ccdcontext.cpp 150 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2013 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "jlib.hpp"
  15. #include "wujobq.hpp"
  16. #include "nbcd.hpp"
  17. #include "rtlread_imp.hpp"
  18. #include "thorplugin.hpp"
  19. #include "thorxmlread.hpp"
  20. #include "thorstats.hpp"
  21. #include "roxiemem.hpp"
  22. #include "eventqueue.hpp"
  23. #include "ccd.hpp"
  24. #include "ccdcontext.hpp"
  25. #include "ccddebug.hpp"
  26. #include "ccddali.hpp"
  27. #include "ccdquery.hpp"
  28. #include "ccdqueue.ipp"
  29. #include "ccdsnmp.hpp"
  30. #include "ccdstate.hpp"
  31. #include "roxiehelper.hpp"
  32. #include "enginecontext.hpp"
  33. using roxiemem::IRowManager;
  34. //=======================================================================================================================
  35. #define DEBUGEE_TIMEOUT 10000
  36. class CSlaveDebugContext : public CBaseDebugContext
  37. {
  38. /*
  39. Some thoughts on slave debugging
  40. 1. Something like a ping can be used to get data from slave when needed
  41. 2. Should disable IBYTI processing (always use primary) - DONE
  42. and server-side caching - DONE
  43. 3. Roxie server can know what slave transactions are pending by intercepting the sends - no need for slave to call back just to indicate start of slave subgraph
  44. 4. There is a problem when a slave hits a breakpoint in that the breakpoint could have been deleted by the time it gets a chance to tell the Roxie server - can't
  45. happen in local case because of the critical block at the head of checkBreakpoint but the local copy of BPs out on slave CAN get out of date. Should we care?
  46. Should there be a "Sorry, your breakpoints are out of date, here's the new set" response?
  47. Actually what we do is recheck the BP on the server, and ensure that breakpoint indexes are persistent. DONE
  48. 5. We need to serialize over our graph info if changed since last time.
  49. 6. I think we need to change implementation of debugGraph to support children. Then we have a place to put a proxy for a remote one.
  50. - id's should probably be structured so we can use a hash table at each level
  51. */
  52. const RoxiePacketHeader &header;
  53. memsize_t parentActivity;
  54. unsigned channel;
  55. int debugSequence;
  56. CriticalSection crit;
  57. const IRoxieContextLogger &logctx; // hides base class definition with more derived class pointer
  58. public:
  59. CSlaveDebugContext(IRoxieSlaveContext *_ctx, const IRoxieContextLogger &_logctx, RoxiePacketHeader &_header)
  60. : CBaseDebugContext(_logctx), header(_header), logctx(_logctx)
  61. {
  62. channel = header.channel;
  63. debugSequence = 0;
  64. parentActivity = 0;
  65. }
  66. void init(const IRoxieQueryPacket *_packet)
  67. {
  68. unsigned traceLength = _packet->getTraceLength();
  69. assertex(traceLength);
  70. const byte *traceInfo = _packet->queryTraceInfo();
  71. assertex((*traceInfo & LOGGING_DEBUGGERACTIVE) != 0);
  72. unsigned debugLen = *(unsigned short *) (traceInfo + 1);
  73. MemoryBuffer b;
  74. b.setBuffer(debugLen, (char *) (traceInfo + 1 + sizeof(unsigned short)), false);
  75. deserialize(b);
  76. __uint64 tmp; // can't serialize memsize_t
  77. b.read(tmp); // note - this is written by the RemoteAdaptor not by the serialize....
  78. parentActivity = (memsize_t)tmp;
  79. }
  80. virtual unsigned queryChannel() const
  81. {
  82. return channel;
  83. }
  84. virtual BreakpointActionMode checkBreakpoint(DebugState state, IActivityDebugContext *probe, const void *extra)
  85. {
  86. return CBaseDebugContext::checkBreakpoint(state, probe, extra);
  87. }
  88. virtual void waitForDebugger(DebugState state, IActivityDebugContext *probe)
  89. {
  90. StringBuffer debugIdString;
  91. CriticalBlock b(crit); // Make sure send sequentially - don't know if this is strictly needed...
  92. debugSequence++;
  93. debugIdString.appendf(".debug.%x", debugSequence);
  94. IPendingCallback *callback = ROQ->notePendingCallback(header, debugIdString.str()); // note that we register before the send to avoid a race.
  95. try
  96. {
  97. RoxiePacketHeader newHeader(header, ROXIE_DEBUGCALLBACK, 0); // subchannel not relevant
  98. for (;;) // retry indefinitely, as more than likely Roxie server is waiting for user input ...
  99. {
  100. Owned<IMessagePacker> output = ROQ->createOutputStream(newHeader, true, logctx);
  101. // These are deserialized in onDebugCallback..
  102. MemoryBuffer debugInfo;
  103. debugInfo.append(debugSequence);
  104. debugInfo.append((char) state);
  105. if (state==DebugStateGraphFinished)
  106. {
  107. debugInfo.append(globalCounts.count());
  108. HashIterator edges(globalCounts);
  109. ForEach(edges)
  110. {
  111. IGlobalEdgeRecord *edge = globalCounts.mapToValue(&edges.query());
  112. debugInfo.append((const char *) edges.query().getKey());
  113. debugInfo.append(edge->queryCount());
  114. }
  115. }
  116. debugInfo.append(currentBreakpointUID);
  117. debugInfo.append((__uint64)parentActivity); // can't serialize memsize_t
  118. debugInfo.append(channel);
  119. assertex (currentGraph); // otherwise what am I remote debugging?
  120. currentGraph->serializeProxyGraphs(debugInfo);
  121. debugInfo.append(probe ? probe->queryEdgeId() : "");
  122. char *buf = (char *) output->getBuffer(debugInfo.length(), true);
  123. memcpy(buf, debugInfo.toByteArray(), debugInfo.length());
  124. output->putBuffer(buf, debugInfo.length(), true);
  125. output->flush();
  126. output.clear();
  127. if (callback->wait(5000))
  128. break;
  129. }
  130. if (traceLevel > 6)
  131. { StringBuffer s; DBGLOG("Processing information from Roxie server in response to %s", newHeader.toString(s).str()); }
  132. MemoryBuffer &serverData = callback->queryData();
  133. deserialize(serverData);
  134. }
  135. catch (...)
  136. {
  137. ROQ->removePendingCallback(callback);
  138. throw;
  139. }
  140. ROQ->removePendingCallback(callback);
  141. }
  142. virtual IRoxieQueryPacket *onDebugCallback(const RoxiePacketHeader &header, size32_t len, char *data)
  143. {
  144. // MORE - Implies a server -> slave child -> slave grandchild type situation - need to pass call on to Roxie server (rather as I do for file callback)
  145. UNIMPLEMENTED;
  146. }
  147. virtual bool onDebuggerTimeout()
  148. {
  149. throwUnexpected();
  150. }
  151. virtual void debugCounts(IXmlWriter *output, unsigned sinceSequence, bool reset)
  152. {
  153. // This gives info for the global view - accumulated counts for all instances, plus the graph as fetched from the workunit
  154. HashIterator edges(globalCounts);
  155. ForEach(edges)
  156. {
  157. IGlobalEdgeRecord *edge = globalCounts.mapToValue(&edges.query());
  158. if (edge->queryLastSequence() && (!sinceSequence || edge->queryLastSequence() > sinceSequence))
  159. {
  160. output->outputBeginNested("edge", true);
  161. output->outputCString((const char *) edges.query().getKey(), "@edgeId");
  162. output->outputUInt(edge->queryCount(), sizeof(unsigned), "@count");
  163. output->outputEndNested("edge");
  164. }
  165. if (reset)
  166. edge->reset();
  167. }
  168. }
  169. };
  170. //=======================================================================================================================
  171. #define DEFAULT_PERSIST_COPIES (-1)
  172. #define PERSIST_LOCK_TIMEOUT 10000
  173. #define PERSIST_LOCK_SLEEP 5000
  174. class CRoxieWorkflowMachine : public WorkflowMachine
  175. {
  176. class PersistVersion : public CInterface
  177. {
  178. public:
  179. PersistVersion(char const * _logicalName, unsigned _eclCRC, unsigned __int64 _allCRC, bool _isFile) : logicalName(_logicalName), eclCRC(_eclCRC), allCRC(_allCRC), isFile(_isFile) {}
  180. StringAttr logicalName;
  181. unsigned eclCRC;
  182. unsigned __int64 allCRC;
  183. bool isFile;
  184. };
  185. public:
  186. CRoxieWorkflowMachine(IPropertyTree *_workflowInfo, IConstWorkUnit *_wu, bool _doOnce, const IRoxieContextLogger &_logctx)
  187. : WorkflowMachine(_logctx)
  188. {
  189. workunit = _wu;
  190. workflowInfo = _workflowInfo;
  191. doOnce = _doOnce;
  192. }
  193. void returnPersistVersion(char const * logicalName, unsigned eclCRC, unsigned __int64 allCRC, bool isFile)
  194. {
  195. persist.setown(new PersistVersion(logicalName, eclCRC, allCRC, isFile));
  196. }
  197. protected:
  198. virtual void begin()
  199. {
  200. // MORE - should pre-do more of this work
  201. unsigned count = 0;
  202. if (workunit)
  203. {
  204. workflow.setown(workunit->getWorkflowClone());
  205. }
  206. else
  207. {
  208. Owned<IConstWorkflowItemIterator> iter = createWorkflowItemIterator(workflowInfo);
  209. for(iter->first(); iter->isValid(); iter->next())
  210. count++;
  211. workflow.setown(createWorkflowItemArray(count));
  212. for(iter->first(); iter->isValid(); iter->next())
  213. {
  214. IConstWorkflowItem *item = iter->query();
  215. bool isOnce = (item->queryMode() == WFModeOnce);
  216. workflow->addClone(item);
  217. if (isOnce != doOnce)
  218. workflow->queryWfid(item->queryWfid()).setState(WFStateDone);
  219. }
  220. }
  221. }
  222. virtual void end()
  223. {
  224. if (workunit)
  225. {
  226. WorkunitUpdate w(&workunit->lock());
  227. w->syncRuntimeWorkflow(workflow);
  228. }
  229. workflow.clear();
  230. }
  231. virtual void schedulingStart()
  232. {
  233. if (!workunit)
  234. throw MakeStringException(ROXIE_UNIMPLEMENTED_ERROR, "Scheduling not supported when running predeployed queries");
  235. if (!wfconn)
  236. wfconn.setown(getWorkflowScheduleConnection(workunit->queryWuid()));
  237. wfconn->lock();
  238. wfconn->setActive();
  239. wfconn->pull(workflow);
  240. wfconn->unlock();
  241. }
  242. virtual bool schedulingPull()
  243. {
  244. if (!workunit)
  245. throw MakeStringException(ROXIE_UNIMPLEMENTED_ERROR, "Scheduling not supported when running predeployed queries");
  246. wfconn->lock();
  247. bool more = wfconn->pull(workflow);
  248. wfconn->unlock();
  249. return more;
  250. }
  251. virtual bool schedulingPullStop()
  252. {
  253. if (!workunit)
  254. throw MakeStringException(ROXIE_UNIMPLEMENTED_ERROR, "Scheduling not supported when running predeployed queries");
  255. wfconn->lock();
  256. bool more = wfconn->pull(workflow);
  257. if(!more) wfconn->resetActive();
  258. wfconn->unlock();
  259. return more;
  260. }
  261. virtual void noteTiming(unsigned wfid, timestamp_type startTime, stat_type elapsedNs)
  262. {
  263. if (!workunit)
  264. return;
  265. WorkunitUpdate wu(&workunit->lock());
  266. StringBuffer scope;
  267. scope.append(WorkflowScopePrefix).append(wfid);
  268. updateWorkunitStat(wu, SSTworkflow, scope, StWhenStarted, nullptr, startTime, 0);
  269. updateWorkunitStat(wu, SSTworkflow, scope, StTimeElapsed, nullptr, elapsedNs, 0);
  270. }
  271. virtual void reportContingencyFailure(char const * type, IException * e)
  272. {
  273. if (workunit)
  274. {
  275. StringBuffer msg;
  276. msg.append(type).append(" clause failed (execution will continue): ").append(e->errorCode()).append(": ");
  277. e->errorMessage(msg);
  278. WorkunitUpdate wu(&workunit->lock());
  279. addExceptionToWorkunit(wu, SeverityWarning, "user", e->errorCode(), msg.str(), NULL, 0, 0, 0);
  280. }
  281. }
  282. virtual void checkForAbort(unsigned wfid, IException * handling)
  283. {
  284. if (workunit && workunit->aborting())
  285. {
  286. if(handling)
  287. {
  288. StringBuffer msg;
  289. msg.append("Abort takes precedence over error: ").append(handling->errorCode()).append(": ");
  290. handling->errorMessage(msg);
  291. msg.append(" (in item ").append(wfid).append(")");
  292. WorkunitUpdate wu(&workunit->lock());
  293. addExceptionToWorkunit(wu, SeverityWarning, "user", handling->errorCode(), msg.str(), NULL, 0, 0, 0);
  294. handling->Release();
  295. }
  296. throw new WorkflowException(0, "Workunit abort request received", wfid, WorkflowException::ABORT, MSGAUD_user);
  297. }
  298. }
  299. virtual void doExecutePersistItem(IRuntimeWorkflowItem & item)
  300. {
  301. if (!workunit)
  302. {
  303. throw MakeStringException(0, "PERSIST not supported when running predeployed queries");
  304. }
  305. unsigned wfid = item.queryWfid();
  306. // Old persist model requires dependencies to be executed BEFORE checking if the persist is up to date
  307. // Defaults to old model, in case executing a WU that is created by earlier eclcc
  308. if (!workunit->getDebugValueBool("expandPersistInputDependencies", false))
  309. doExecuteItemDependencies(item, wfid);
  310. SCMStringBuffer name;
  311. const char *logicalName = item.getPersistName(name).str();
  312. int maxPersistCopies = item.queryPersistCopies();
  313. if (maxPersistCopies < 0)
  314. maxPersistCopies = DEFAULT_PERSIST_COPIES;
  315. Owned<IRemoteConnection> persistLock;
  316. persistLock.setown(startPersist(logicalName));
  317. doExecuteItemDependency(item, item.queryPersistWfid(), wfid, true); // generated code should end up calling back to returnPersistVersion, which sets persist
  318. if (!persist)
  319. {
  320. StringBuffer errmsg;
  321. errmsg.append("Internal error in generated code: for wfid ").append(wfid).append(", persist CRC wfid ").append(item.queryPersistWfid()).append(" did not call returnPersistVersion");
  322. throw MakeStringExceptionDirect(0, errmsg.str());
  323. }
  324. Owned<PersistVersion> thisPersist = persist.getClear();
  325. if (strcmp(logicalName, thisPersist->logicalName.get()) != 0)
  326. {
  327. StringBuffer errmsg;
  328. errmsg.append("Failed workflow/persist consistency check: wfid ").append(wfid).append(", WU persist name ").append(logicalName).append(", runtime persist name ").append(thisPersist->logicalName.get());
  329. throw MakeStringExceptionDirect(0, errmsg.str());
  330. }
  331. if (workunit->getDebugValueInt("freezepersists", 0) != 0)
  332. {
  333. checkPersistMatches(logicalName, thisPersist->eclCRC);
  334. }
  335. else if(!isPersistUptoDate(persistLock, item, logicalName, thisPersist->eclCRC, thisPersist->allCRC, thisPersist->isFile))
  336. {
  337. // New persist model allows dependencies to be executed AFTER checking if the persist is up to date
  338. if (workunit->getDebugValueBool("expandPersistInputDependencies", false))
  339. doExecuteItemDependencies(item, wfid);
  340. if (maxPersistCopies > 0)
  341. deleteLRUPersists(logicalName, (unsigned) maxPersistCopies-1);
  342. doExecuteItem(item, wfid);
  343. updatePersist(persistLock, logicalName, thisPersist->eclCRC, thisPersist->allCRC);
  344. }
  345. logctx.CTXLOG("Finished persists - add to read lock list");
  346. persistReadLocks.append(*persistLock.getClear());
  347. }
  348. bool getPersistTime(time_t & when, IRuntimeWorkflowItem & item)
  349. {
  350. SCMStringBuffer name;
  351. const char *logicalName = item.getPersistName(name).str();
  352. StringBuffer whenName;
  353. expandLogicalFilename(whenName, logicalName, workunit, false, false);
  354. whenName.append("$when");
  355. if (!isResult(whenName, ResultSequencePersist))
  356. return false;
  357. when = getResultInt(whenName, ResultSequencePersist);
  358. return true;
  359. }
  360. virtual void doExecuteCriticalItem(IRuntimeWorkflowItem & item)
  361. {
  362. if (!workunit)
  363. throw MakeStringException(0, "CRITICAL not supported when running predeployed queries");
  364. unsigned wfid = item.queryWfid();
  365. SCMStringBuffer name;
  366. const char *criticalName = item.getCriticalName(name).str();
  367. Owned<IRemoteConnection> rlock = obtainCriticalLock(criticalName);
  368. if (!rlock.get())
  369. throw MakeStringException(0, "Cannot obtain Critical section lock");
  370. doExecuteItemDependencies(item, wfid);
  371. doExecuteItem(item, wfid);
  372. releaseCriticalLock(rlock);
  373. }
  374. private:
  375. bool isResult(const char *name, unsigned sequence)
  376. {
  377. Owned<IConstWUResult> r = getWorkUnitResult(workunit, name, sequence);
  378. return r != NULL && r->getResultStatus() != ResultStatusUndefined;
  379. }
  380. unsigned getResultHash(const char * name, unsigned sequence)
  381. {
  382. Owned<IConstWUResult> r = getWorkUnitResult(workunit, name, sequence);
  383. if (!r)
  384. throw MakeStringException(ROXIE_INTERNAL_ERROR, "Failed to retrieve hash value %s from workunit", name);
  385. return r->getResultHash();
  386. }
  387. unsigned __int64 getResultInt(const char * name, unsigned sequence)
  388. {
  389. Owned<IConstWUResult> r = getWorkUnitResult(workunit, name, sequence);
  390. if (!r)
  391. throw MakeStringException(ROXIE_INTERNAL_ERROR, "Failed to retrieve persist value %s from workunit", name);
  392. return r->getResultInt();
  393. }
  394. void setResultInt(const char * name, unsigned sequence, unsigned __int64 value, unsigned size)
  395. {
  396. WorkunitUpdate w(&workunit->lock());
  397. w->setResultInt(name, sequence, value);
  398. }
  399. inline bool fileExists(const char *lfn)
  400. {
  401. Owned<IDistributedFile> f = queryDistributedFileDirectory().lookup(lfn, queryUserDescriptor(), false, false, false, nullptr, defaultPrivilegedUser);
  402. if (f)
  403. return true;
  404. return false;
  405. }
  406. inline IUserDescriptor *queryUserDescriptor()
  407. {
  408. if (workunit)
  409. return workunit->queryUserDescriptor();//ad-hoc mode
  410. else
  411. {
  412. Owned<IRoxieDaliHelper> daliHelper = connectToDali(false);
  413. if (daliHelper)
  414. return daliHelper->queryUserDescriptor();//predeployed query mode
  415. }
  416. return NULL;
  417. }
  418. bool checkPersistUptoDate(IRuntimeWorkflowItem & item, const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC, bool isFile, StringBuffer &errText)
  419. {
  420. StringBuffer lfn, crcName, eclName;
  421. expandLogicalFilename(lfn, logicalName, workunit, false, false);
  422. crcName.append(lfn).append("$crc");
  423. eclName.append(lfn).append("$eclcrc");
  424. if (!isResult(lfn, ResultSequencePersist))
  425. errText.appendf("Building PERSIST('%s'): It hasn't been calculated before", logicalName);
  426. else if (isFile && !fileExists(lfn))
  427. errText.appendf("Rebuilding PERSIST('%s'): Persistent file does not exist", logicalName);
  428. else if (!item.queryPersistRefresh())
  429. {
  430. errText.appendf("Not rebuilding PERSIST('%s'): due to REFRESH(false)", logicalName);
  431. return true;
  432. }
  433. else if (!isResult(crcName, ResultSequencePersist))
  434. errText.appendf("Rebuilding PERSIST('%s'): Saved CRC isn't present", logicalName);
  435. else
  436. {
  437. unsigned savedEclCRC = (unsigned) getResultInt(eclName, ResultSequencePersist);
  438. unsigned __int64 savedCRC = (unsigned __int64)getResultInt(crcName, ResultSequencePersist);
  439. if (savedEclCRC != eclCRC)
  440. errText.appendf("Rebuilding PERSIST('%s'): ECL has changed", logicalName);
  441. else if (savedCRC != allCRC)
  442. errText.appendf("Rebuilding PERSIST('%s'): Input files have changed", logicalName);
  443. else if (isItemOlderThanInputPersists(item))
  444. errText.appendf("Rebuilding PERSIST('%s'): Input persists are more recent", logicalName);
  445. else
  446. return true;
  447. }
  448. return false;
  449. }
  450. bool changePersistLockMode(IRemoteConnection *persistLock, unsigned mode, const char * name, bool repeat)
  451. {
  452. logctx.CTXLOG("Waiting to change persist lock to %s for %s", (mode == RTM_LOCK_WRITE) ? "write" : "read", name); // MORE - pass a logctx around?
  453. //When converting a read lock to a write lock so the persist can be rebuilt hold onto the lock as short as
  454. //possible. Otherwise lots of workunits each trying to convert read locks to write locks will mean
  455. //that the read lock is never released by all the workunits at the same time, so no workunit can progress.
  456. unsigned timeout = repeat ? PERSIST_LOCK_TIMEOUT : 0;
  457. for (;;)
  458. {
  459. try
  460. {
  461. persistLock->changeMode(mode, timeout);
  462. logctx.CTXLOG("Changed persist lock");
  463. return true;
  464. }
  465. catch(ISDSException *E)
  466. {
  467. if (SDSExcpt_LockTimeout != E->errorCode())
  468. throw E;
  469. E->Release();
  470. }
  471. if (!repeat)
  472. {
  473. logctx.CTXLOG("Failed to convert persist lock");
  474. return false;
  475. }
  476. //This is only executed when converting write->read. There is significant doubt whether the changeMode()
  477. //can ever fail - and whether the execution can ever get here.
  478. logctx.CTXLOG("Waiting to convert persist lock"); // MORE - give a chance to abort
  479. }
  480. }
  481. IRemoteConnection *getPersistReadLock(const char * logicalName)
  482. {
  483. StringBuffer lfn;
  484. expandLogicalFilename(lfn, logicalName, workunit, false, false);
  485. if (!lfn.length())
  486. throw MakeStringException(0, "Invalid persist name used : '%s'", logicalName);
  487. const char * name = lfn;
  488. StringBuffer xpath;
  489. xpath.append("/PersistRunLocks/");
  490. if (isdigit(*name))
  491. xpath.append("_");
  492. for (const char * cur = name;*cur;cur++)
  493. xpath.append(isalnum(*cur) ? *cur : '_');
  494. logctx.CTXLOG("Waiting for persist read lock for %s", name);
  495. Owned<IRemoteConnection> persistLock;
  496. for (;;)
  497. {
  498. try
  499. {
  500. unsigned mode = RTM_CREATE_QUERY | RTM_LOCK_READ;
  501. if (queryDaliServerVersion().compare("1.4") >= 0)
  502. mode |= RTM_DELETE_ON_DISCONNECT;
  503. persistLock.setown(querySDS().connect(xpath.str(), myProcessSession(), mode, PERSIST_LOCK_TIMEOUT));
  504. }
  505. catch(ISDSException *E)
  506. {
  507. if (SDSExcpt_LockTimeout != E->errorCode())
  508. throw E;
  509. E->Release();
  510. }
  511. if (persistLock)
  512. break;
  513. logctx.CTXLOG("Waiting for persist read lock"); // MORE - give a chance to abort
  514. }
  515. logctx.CTXLOG("Obtained persist read lock");
  516. return persistLock.getClear();
  517. }
  518. void setBlockedOnPersist(const char * logicalName)
  519. {
  520. StringBuffer s;
  521. s.append("Waiting for persist ").append(logicalName);
  522. WorkunitUpdate w(&workunit->lock());
  523. w->setState(WUStateBlocked);
  524. w->setStateEx(s.str());
  525. }
  526. bool isPersistUptoDate(Owned<IRemoteConnection> &persistLock, IRuntimeWorkflowItem & item, const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC, bool isFile)
  527. {
  528. //Loop trying to get a write lock - if it fails, then release the read lock, otherwise
  529. //you can get a deadlock with several things waiting to read, and none being able to write.
  530. bool rebuildAllPersists = false; // Useful for debugging purposes
  531. for (;;)
  532. {
  533. StringBuffer dummy;
  534. if (checkPersistUptoDate(item, logicalName, eclCRC, allCRC, isFile, dummy) && !rebuildAllPersists)
  535. {
  536. if (dummy.length())
  537. logctx.CTXLOG("%s", dummy.str());
  538. else
  539. logctx.CTXLOG("PERSIST('%s') is up to date", logicalName);
  540. return true;
  541. }
  542. //Get a write lock
  543. setBlockedOnPersist(logicalName);
  544. if (changePersistLockMode(persistLock, RTM_LOCK_WRITE, logicalName, false))
  545. break;
  546. //failed to get a write lock, so release our read lock
  547. persistLock.clear();
  548. MilliSleep(PERSIST_LOCK_SLEEP + (getRandom()%PERSIST_LOCK_SLEEP));
  549. persistLock.setown(getPersistReadLock(logicalName));
  550. }
  551. WorkunitUpdate w(&workunit->lock());
  552. w->setState(WUStateRunning);
  553. //Check again whether up to date, someone else might have updated it!
  554. StringBuffer errText;
  555. if (checkPersistUptoDate(item, logicalName, eclCRC, allCRC, isFile, errText) && !rebuildAllPersists)
  556. {
  557. if (errText.length())
  558. {
  559. errText.append(" (after being calculated by another job)");
  560. logctx.CTXLOG("%s", errText.str());
  561. }
  562. else
  563. logctx.CTXLOG("PERSIST('%s') is up to date (after being calculated by another job)", logicalName);
  564. changePersistLockMode(persistLock, RTM_LOCK_READ, logicalName, true);
  565. return true;
  566. }
  567. if (errText.length())
  568. logctx.CTXLOG("%s", errText.str());
  569. return false;
  570. }
  571. void updatePersist(IRemoteConnection *persistLock, const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC)
  572. {
  573. StringBuffer lfn, crcName, eclName, whenName;
  574. expandLogicalFilename(lfn, logicalName, workunit, false, false);
  575. crcName.append(lfn).append("$crc");
  576. eclName.append(lfn).append("$eclcrc");
  577. whenName.append(lfn).append("$when");
  578. setResultInt(crcName, ResultSequencePersist, allCRC, sizeof(int));
  579. setResultInt(eclName, ResultSequencePersist, eclCRC, sizeof(int));
  580. setResultInt(whenName, ResultSequencePersist, time(NULL), sizeof(int));
  581. logctx.CTXLOG("Convert persist write lock to read lock");
  582. changePersistLockMode(persistLock, RTM_LOCK_READ, logicalName, true);
  583. }
  584. IRemoteConnection *startPersist(const char * logicalName)
  585. {
  586. setBlockedOnPersist(logicalName);
  587. IRemoteConnection *persistLock = getPersistReadLock(logicalName);
  588. WorkunitUpdate w(&workunit->lock());
  589. w->setState(WUStateRunning);
  590. return persistLock;
  591. }
  592. void checkPersistMatches(const char * logicalName, unsigned eclCRC)
  593. {
  594. StringBuffer lfn, eclName;
  595. expandLogicalFilename(lfn, logicalName, workunit, true, false);
  596. eclName.append(lfn).append("$eclcrc");
  597. if (!isResult(lfn, ResultSequencePersist))
  598. throw MakeStringException(ROXIE_INTERNAL_ERROR, "Frozen PERSIST('%s') hasn't been calculated ", logicalName);
  599. if (isResult(eclName, ResultSequencePersist) && (getResultInt(eclName, ResultSequencePersist) != eclCRC))
  600. throw MakeStringException(ROXIE_INTERNAL_ERROR, "Frozen PERSIST('%s') ECL has changed", logicalName);
  601. StringBuffer msg;
  602. msg.append("Frozen PERSIST('").append(logicalName).append("') is up to date");
  603. logctx.CTXLOG("%s", msg.str());
  604. }
  605. static int comparePersistAccess(IInterface * const *_a, IInterface * const *_b)
  606. {
  607. IPropertyTree *a = *(IPropertyTree **)_a;
  608. IPropertyTree *b = *(IPropertyTree **)_b;
  609. const char *accessedA = a->queryProp("@accessed");
  610. const char *accessedB = b->queryProp("@accessed");
  611. if (accessedA && accessedB)
  612. return strcmp(accessedB, accessedA);
  613. else if (accessedB)
  614. return -1;
  615. else if (accessedA)
  616. return 1;
  617. else
  618. return 0;
  619. }
  620. void deleteLRUPersists(const char * logicalName, unsigned keep)
  621. {
  622. StringBuffer lfn;
  623. expandLogicalFilename(lfn, logicalName, workunit, false, false);
  624. logicalName = lfn.str();
  625. const char *tail = strrchr(logicalName, '_'); // Locate the trailing double-underbar
  626. assertex(tail);
  627. StringBuffer head(tail-logicalName+1, logicalName);
  628. head.append("p*"); // Multi-mode persist names end with __pNNNNNNN
  629. restart: // If things change beneath us as we are deleting, repeat the process
  630. IArrayOf<IPropertyTree> persists;
  631. Owned<IDFAttributesIterator> iter = queryDistributedFileDirectory().getDFAttributesIterator(head,queryUserDescriptor(),false,false,NULL);
  632. ForEach(*iter)
  633. {
  634. IPropertyTree &pt = iter->query();
  635. const char *name = pt.queryProp("@name");
  636. if (stricmp(name, logicalName) == 0) // Don't include the one we are intending to recreate in the LRU list (keep value does not include it)
  637. continue;
  638. if (pt.getPropBool("@persistent", false))
  639. {
  640. // Paranoia - check as far as we can that it really is another instance of this persist
  641. tail = strrchr(name, '_'); // Locate the trailing double-underbar
  642. assertex(tail);
  643. tail++;
  644. bool crcSuffix = (*tail++=='p');
  645. while (crcSuffix && *tail)
  646. {
  647. if (!isdigit(*tail))
  648. crcSuffix = false;
  649. tail++;
  650. }
  651. if (crcSuffix)
  652. persists.append(*LINK(&pt));
  653. }
  654. }
  655. if (persists.ordinality() > keep)
  656. {
  657. persists.sort(comparePersistAccess);
  658. while (persists.ordinality() > keep)
  659. {
  660. Owned<IPropertyTree> oldest = &persists.popGet();
  661. const char *oldAccessTime = oldest->queryProp("@accessed");
  662. VStringBuffer goer("~%s", oldest->queryProp("@name")); // Make sure we don't keep adding the scope
  663. Owned<IRemoteConnection> persistLock = getPersistReadLock(goer);
  664. while (!changePersistLockMode(persistLock, RTM_LOCK_WRITE, goer, false))
  665. {
  666. persistLock.clear();
  667. MilliSleep(PERSIST_LOCK_SLEEP + (getRandom()%PERSIST_LOCK_SLEEP));
  668. persistLock.setown(getPersistReadLock(goer));
  669. }
  670. Owned<IDistributedFile> f = queryDistributedFileDirectory().lookup(goer, queryUserDescriptor(), true, false, false, nullptr, defaultPrivilegedUser);
  671. if (!f)
  672. goto restart; // Persist has been deleted since last checked - repeat the whole process
  673. const char *newAccessTime = f->queryAttributes().queryProp("@accessed");
  674. if (oldAccessTime && newAccessTime && !streq(oldAccessTime, newAccessTime))
  675. goto restart; // Persist has been accessed since last checked - repeat the whole process
  676. else if (newAccessTime && !oldAccessTime)
  677. goto restart; // Persist has been accessed since last checked - repeat the whole process
  678. DBGLOG("Deleting LRU persist %s (last accessed at %s)", goer.str(), oldAccessTime);
  679. f->detach();
  680. }
  681. }
  682. }
  683. IRemoteConnection *obtainCriticalLock(const char *name)
  684. {
  685. StringBuffer xpath;
  686. xpath.append("/WorkUnitCriticalLocks/").append(name);
  687. return querySDS().connect(xpath.str(), myProcessSession(), RTM_CREATE | RTM_LOCK_WRITE | RTM_DELETE_ON_DISCONNECT, INFINITE);
  688. }
  689. void releaseCriticalLock(IRemoteConnection *criticalLock)
  690. {
  691. if(criticalLock && queryDaliServerVersion().compare("1.3") < 0)
  692. criticalLock->close(true);
  693. }
  694. IConstWorkUnit *workunit;
  695. IPropertyTree *workflowInfo;
  696. Owned<IWorkflowScheduleConnection> wfconn;
  697. Owned<PersistVersion> persist;
  698. IArray persistReadLocks;
  699. bool doOnce;
  700. };
  701. CRoxieWorkflowMachine *createRoxieWorkflowMachine(IPropertyTree *_workflowInfo, IConstWorkUnit *_wu, bool _doOnce, const IRoxieContextLogger &_logctx)
  702. {
  703. return new CRoxieWorkflowMachine(_workflowInfo, _wu, _doOnce, _logctx);
  704. }
  705. //=======================================================================================================================
  706. typedef const byte *row_t;
  707. typedef const byte ** rowset_t;
  708. class DeserializedDataReader : implements IWorkUnitRowReader, public CInterface
  709. {
  710. rowset_t data;
  711. size32_t count;
  712. unsigned idx;
  713. public:
  714. IMPLEMENT_IINTERFACE;
  715. DeserializedDataReader(size32_t _count, rowset_t _data)
  716. : data(_data), count(_count)
  717. {
  718. idx = 0;
  719. }
  720. virtual const void * nextRow() override
  721. {
  722. if (idx < count)
  723. {
  724. const void *row = data[idx];
  725. if (row)
  726. LinkRoxieRow(row);
  727. idx++;
  728. return row;
  729. }
  730. return NULL;
  731. }
  732. virtual void getResultRowset(size32_t & tcount, const byte * * & tgt) override
  733. {
  734. tcount = count;
  735. if (data)
  736. rtlLinkRowset(data);
  737. tgt = data;
  738. }
  739. };
  740. class CDeserializedResultStore : implements IDeserializedResultStore, public CInterface
  741. {
  742. PointerArrayOf<row_t> stored;
  743. UnsignedArray counts;
  744. IPointerArrayOf<IOutputMetaData> metas;
  745. mutable SpinLock lock;
  746. public:
  747. IMPLEMENT_IINTERFACE;
  748. ~CDeserializedResultStore()
  749. {
  750. ForEachItemIn(idx, stored)
  751. {
  752. rowset_t rows = stored.item(idx);
  753. if (rows)
  754. {
  755. rtlReleaseRowset(counts.item(idx), rows);
  756. }
  757. }
  758. }
  759. virtual int addResult(size32_t count, rowset_t data, IOutputMetaData *meta) override
  760. {
  761. SpinBlock b(lock);
  762. stored.append(data);
  763. counts.append(count);
  764. metas.append(meta);
  765. return stored.ordinality()-1;
  766. }
  767. virtual void queryResult(int id, size32_t &count, rowset_t &data) const override
  768. {
  769. count = counts.item(id);
  770. data = stored.item(id);
  771. }
  772. virtual IWorkUnitRowReader *createDeserializedReader(int id) const override
  773. {
  774. return new DeserializedDataReader(counts.item(id), stored.item(id));
  775. }
  776. virtual void serialize(unsigned & tlen, void * & tgt, int id, ICodeContext *codectx) const override
  777. {
  778. IOutputMetaData *meta = metas.item(id);
  779. rowset_t data = stored.item(id);
  780. size32_t count = counts.item(id);
  781. MemoryBuffer result;
  782. Owned<IOutputRowSerializer> rowSerializer = meta->createDiskSerializer(codectx, 0); // NOTE - we don't have a meaningful activity id. Only used for error reporting.
  783. bool grouped = meta->isGrouped();
  784. for (size32_t idx = 0; idx<count; idx++)
  785. {
  786. const byte *row = data[idx];
  787. if (grouped && idx)
  788. result.append(row == NULL);
  789. if (row)
  790. {
  791. CThorDemoRowSerializer serializerTarget(result);
  792. rowSerializer->serialize(serializerTarget, row);
  793. }
  794. }
  795. tlen = result.length();
  796. tgt= result.detach();
  797. }
  798. };
  799. extern IDeserializedResultStore *createDeserializedResultStore()
  800. {
  801. return new CDeserializedResultStore;
  802. }
  803. class WorkUnitRowReaderBase : implements IWorkUnitRowReader, public CInterface
  804. {
  805. protected:
  806. bool isGrouped;
  807. Linked<IEngineRowAllocator> rowAllocator;
  808. public:
  809. IMPLEMENT_IINTERFACE;
  810. WorkUnitRowReaderBase(IEngineRowAllocator *_rowAllocator, bool _isGrouped)
  811. : isGrouped(_isGrouped), rowAllocator(_rowAllocator)
  812. {
  813. }
  814. virtual void getResultRowset(size32_t & tcount, const byte * * & tgt) override
  815. {
  816. bool atEOG = true;
  817. RtlLinkedDatasetBuilder builder(rowAllocator);
  818. for (;;)
  819. {
  820. const void *ret = nextRow();
  821. if (!ret)
  822. {
  823. if (atEOG || !isGrouped)
  824. break;
  825. atEOG = true;
  826. }
  827. else
  828. atEOG = false;
  829. builder.appendOwn(ret);
  830. }
  831. tcount = builder.getcount();
  832. tgt = builder.linkrows();
  833. }
  834. };
  835. class RawDataReader : public WorkUnitRowReaderBase
  836. {
  837. protected:
  838. const IRoxieContextLogger &logctx;
  839. byte *bufferBase;
  840. MemoryBuffer blockBuffer;
  841. Owned<ISerialStream> bufferStream;
  842. CThorStreamDeserializerSource rowSource;
  843. bool eof;
  844. bool eogPending;
  845. Owned<IOutputRowDeserializer> rowDeserializer;
  846. virtual bool nextBlock(unsigned & tlen, void * & tgt, void * & base) = 0;
  847. bool reload()
  848. {
  849. free(bufferBase);
  850. size32_t lenData;
  851. bufferBase = NULL;
  852. void *tempData, *base;
  853. eof = !nextBlock(lenData, tempData, base);
  854. bufferBase = (byte *) base;
  855. blockBuffer.setBuffer(lenData, tempData, false);
  856. return !eof;
  857. }
  858. public:
  859. RawDataReader(ICodeContext *codeContext, IEngineRowAllocator *_rowAllocator, bool _isGrouped, const IRoxieContextLogger &_logctx)
  860. : WorkUnitRowReaderBase(_rowAllocator, _isGrouped), logctx(_logctx)
  861. {
  862. eof = false;
  863. eogPending = false;
  864. bufferBase = NULL;
  865. rowDeserializer.setown(rowAllocator->createDiskDeserializer(codeContext));
  866. bufferStream.setown(createMemoryBufferSerialStream(blockBuffer));
  867. rowSource.setStream(bufferStream);
  868. }
  869. ~RawDataReader()
  870. {
  871. if (bufferBase)
  872. free(bufferBase);
  873. }
  874. virtual const void *nextRow() override
  875. {
  876. if (eof)
  877. return NULL;
  878. if (rowSource.eos() && !reload())
  879. return NULL;
  880. if (eogPending)
  881. {
  882. eogPending = false;
  883. return NULL;
  884. }
  885. #if 0
  886. // MORE - think a bit about what happens on incomplete rows - I think deserializer will throw an exception?
  887. unsigned thisSize = meta.getRecordSize(data+cursor);
  888. if (thisSize > lenData-cursor)
  889. {
  890. CTXLOG("invalid stored dataset - incomplete row at end");
  891. throw MakeStringException(ROXIE_DATA_ERROR, "invalid stored dataset - incomplete row at end");
  892. }
  893. #endif
  894. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  895. size32_t size = rowDeserializer->deserialize(rowBuilder, rowSource);
  896. if (isGrouped)
  897. rowSource.read(sizeof(bool), &eogPending);
  898. rowsIn++;
  899. return rowBuilder.finalizeRowClear(size);
  900. }
  901. };
  902. class InlineRawDataReader : public RawDataReader
  903. {
  904. Linked<IPropertyTree> xml;
  905. public:
  906. InlineRawDataReader(ICodeContext *codeContext, IEngineRowAllocator *_rowAllocator, bool _isGrouped, const IRoxieContextLogger &_logctx, IPropertyTree *_xml)
  907. : RawDataReader(codeContext, _rowAllocator, _isGrouped, _logctx), xml(_xml)
  908. {
  909. }
  910. virtual bool nextBlock(unsigned & tlen, void * & tgt, void * & base) override
  911. {
  912. base = tgt = NULL;
  913. if (xml)
  914. {
  915. MemoryBuffer result;
  916. xml->getPropBin(NULL, result);
  917. tlen = result.length();
  918. base = tgt = result.detach();
  919. xml.clear();
  920. return tlen != 0;
  921. }
  922. else
  923. {
  924. tlen = 0;
  925. return false;
  926. }
  927. }
  928. };
  929. class StreamedRawDataReader : public RawDataReader
  930. {
  931. SafeSocket &client;
  932. StringAttr id;
  933. offset_t offset;
  934. public:
  935. StreamedRawDataReader(ICodeContext *codeContext, IEngineRowAllocator *_rowAllocator, bool _isGrouped, const IRoxieContextLogger &_logctx, SafeSocket &_client, const char *_id)
  936. : RawDataReader(codeContext, _rowAllocator, _isGrouped, _logctx), client(_client), id(_id)
  937. {
  938. offset = 0;
  939. }
  940. virtual bool nextBlock(unsigned & tlen, void * & tgt, void * & base) override
  941. {
  942. try
  943. {
  944. #ifdef FAKE_EXCEPTIONS
  945. if (offset > 0x10000)
  946. throw MakeStringException(ROXIE_INTERNAL_ERROR, "TEST EXCEPTION");
  947. #endif
  948. // Go request from the socket
  949. MemoryBuffer request;
  950. request.reserve(sizeof(size32_t));
  951. request.append('D');
  952. offset_t loffset = offset;
  953. _WINREV(loffset);
  954. request.append(sizeof(loffset), &loffset);
  955. request.append(strlen(id)+1, id);
  956. size32_t len = request.length() - sizeof(size32_t);
  957. len |= 0x80000000;
  958. _WINREV(len);
  959. *(size32_t *) request.toByteArray() = len;
  960. client.write(request.toByteArray(), request.length());
  961. // Note: I am the only thread reading (we only support a single input dataset in roxiepipe mode)
  962. MemoryBuffer reply;
  963. client.readBlock(reply, readTimeout);
  964. tlen = reply.length();
  965. // MORE - not very robust!
  966. // skip past block header
  967. if (tlen > 0)
  968. {
  969. tgt = base = reply.detach();
  970. tgt = ((char *)base) + 9;
  971. tgt = strchr((char *)tgt, '\0') + 1;
  972. tlen -= ((char *)tgt - (char *)base);
  973. offset += tlen;
  974. }
  975. else
  976. tgt = base = NULL;
  977. return tlen != 0;
  978. }
  979. catch (IException *E)
  980. {
  981. StringBuffer text;
  982. E->errorMessage(text);
  983. int errCode = E->errorCode();
  984. E->Release();
  985. IException *ee = MakeStringException(MSGAUD_programmer, errCode, "%s", text.str());
  986. logctx.logOperatorException(ee, __FILE__, __LINE__, "Exception caught in RawDataReader::nextBlock");
  987. throw ee;
  988. }
  989. catch (...)
  990. {
  991. logctx.logOperatorException(NULL, __FILE__, __LINE__, "Unknown exception caught in RawDataReader::nextBlock");
  992. throw;
  993. }
  994. }
  995. };
  996. class WuResultDataReader : public RawDataReader
  997. {
  998. Owned<IConstWUResult> result;
  999. IXmlToRowTransformer *rowTransformer;
  1000. public:
  1001. WuResultDataReader(ICodeContext *codeContext, IEngineRowAllocator *_rowAllocator, bool _isGrouped, const IRoxieContextLogger &_logctx, IConstWUResult *_result, IXmlToRowTransformer *_rowTransformer)
  1002. : RawDataReader(codeContext, _rowAllocator, _isGrouped, _logctx), result(_result), rowTransformer(_rowTransformer)
  1003. {
  1004. }
  1005. virtual bool nextBlock(unsigned & tlen, void * & tgt, void * & base) override
  1006. {
  1007. tgt = NULL;
  1008. base = NULL;
  1009. if (result)
  1010. {
  1011. Variable2IDataVal r(&tlen, &tgt);
  1012. Owned<IXmlToRawTransformer> rawXmlTransformer = createXmlRawTransformer(rowTransformer);
  1013. result->getResultRaw(r, rawXmlTransformer, NULL);
  1014. base = tgt;
  1015. result.clear();
  1016. return tlen != 0;
  1017. }
  1018. else
  1019. {
  1020. tlen = 0;
  1021. return false;
  1022. }
  1023. }
  1024. };
  1025. class InlineXmlDataReader : public WorkUnitRowReaderBase
  1026. {
  1027. Linked<IPropertyTree> xml;
  1028. Owned <XmlColumnProvider> columns;
  1029. Owned<IPropertyTreeIterator> rows;
  1030. IXmlToRowTransformer &rowTransformer;
  1031. public:
  1032. InlineXmlDataReader(IXmlToRowTransformer &_rowTransformer, IPropertyTree *_xml, IEngineRowAllocator *_rowAllocator, bool _isGrouped)
  1033. : WorkUnitRowReaderBase(_rowAllocator, _isGrouped), xml(_xml), rowTransformer(_rowTransformer)
  1034. {
  1035. columns.setown(new XmlDatasetColumnProvider);
  1036. rows.setown(xml->getElements("Row")); // NOTE - the 'hack for Gordon' as found in thorxmlread is not implemented here. Does it need to be ?
  1037. rows->first();
  1038. }
  1039. virtual const void *nextRow() override
  1040. {
  1041. if (rows->isValid())
  1042. {
  1043. columns->setRow(&rows->query());
  1044. rows->next();
  1045. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  1046. NullDiskCallback callback;
  1047. size_t outSize = rowTransformer.transform(rowBuilder, columns, &callback);
  1048. return rowBuilder.finalizeRowClear(outSize);
  1049. }
  1050. return NULL;
  1051. }
  1052. };
  1053. //---------------------------------------------------------------------------------------
  1054. static const StatisticsMapping graphStatistics({}); // Mapping built from an empty list; passed to globalStats in the CRoxieContextBase constructor
  1055. class CRoxieContextBase : implements IRoxieSlaveContext, implements ICodeContext, implements roxiemem::ITimeLimiter, implements IRowAllocatorMetaActIdCacheCallback, public CInterface
  1056. {
  1057. protected:
  1058. Owned<IWUGraphStats> graphStats; // This needs to be destroyed very late (particularly, after the childgraphs)
  1059. mutable Owned<IRowAllocatorMetaActIdCache> allocatorMetaCache; // created in the constructor; serves getRowAllocator()/getRowAllocatorEx()
  1060. Owned<IRowManager> rowManager; // NOTE: the order of destruction here is significant. For leak check to work destroy this BEFORE allAllocators, but after most other things
  1061. Owned <IDebuggableContext> debugContext; // when set, beginGraph()/endGraph() check breakpoints through it
  1062. const IQueryFactory *factory; // set from the constructor argument; used to look up graphs, libraries, options and the query dll
  1063. Owned<IProbeManager> probeManager; // must be destroyed after childGraphs
  1064. MapXToMyClass<unsigned, unsigned, IActivityGraph> childGraphs; // filled by noteChildGraph(), emptied by cleanupGraphs()
  1065. Owned<IActivityGraph> graph; // graph set up by beginGraph(); cleared by cleanupGraphs()
  1066. StringBuffer authToken; // returned by queryAuthToken()
  1067. Owned<IPropertyTree> probeQuery; // when set, runGraph() asks the graph for a probe response
  1068. unsigned lastWuAbortCheck; // msTick() of the last workunit abort poll made by checkAbort()
  1069. unsigned startTime; // msTick() at construction; checkAbort() measures options.timeLimit from it
  1070. unsigned totSlavesReplyLen; // accumulated by addSlavesReplyLen() under statsCrit
  1071. CCycleTimer elapsedTimer;
  1072. QueryOptions options; // copied from factory->queryOptions() in the constructor
  1073. Owned<IConstWorkUnit> workUnit; // when set, graph timings are written to it and it is polled for aborts
  1074. Owned<IRoxieDaliHelper> daliHelperLink;
  1075. Owned<IDistributedFileTransaction> superfileTransaction;
  1076. mutable CriticalSection statsCrit; // held while updating totSlavesReplyLen
  1077. const IRoxieContextLogger &logctx; // logger that the logging/statistics methods forward to
  1078. protected:
  1079. bool exceptionLogged; // set once checkAbort() has logged the time-limit exception
  1080. bool aborted; // set by notifyAbort(); tested by checkAbort()
  1081. CriticalSection abortLock; // NOTE: we don't bother to get lock when just reading to see whether to abort
  1082. Owned<IException> exception; // recorded by notifyAbort() or created in checkAbort(); thrown (linked) by checkAbort()
  1083. static void _toXML(IPropertyTree *tree, StringBuffer &xgmml, unsigned indent)
  1084. {
  1085. if (tree->getPropInt("att[@name='_roxieStarted']/@value", 1) == 0)
  1086. return;
  1087. if (0 && tree->getPropInt("att[@name='_kind']/@value", 0) == 496)
  1088. {
  1089. Owned<IPropertyTreeIterator> sub = tree->getElements(".//att[@name='_roxieStarted']");
  1090. bool foundOne = false;
  1091. ForEach(*sub)
  1092. {
  1093. if (sub->query().getPropInt("@value", 1)==0)
  1094. {
  1095. foundOne = true;
  1096. break;
  1097. }
  1098. }
  1099. if (!foundOne)
  1100. return;
  1101. }
  1102. const char *name = tree->queryName();
  1103. xgmml.pad(indent).append('<').append(name);
  1104. Owned<IAttributeIterator> it = tree->getAttributes(true);
  1105. if (it->first())
  1106. {
  1107. do
  1108. {
  1109. const char *key = it->queryName();
  1110. xgmml.append(' ').append(key+1).append("=\"");
  1111. encodeXML(it->queryValue(), xgmml, ENCODE_NEWLINES);
  1112. xgmml.append("\"");
  1113. }
  1114. while (it->next());
  1115. }
  1116. Owned<IPropertyTreeIterator> sub = tree->getElements("*", iptiter_sort);
  1117. if (!sub->first())
  1118. {
  1119. xgmml.append("/>\n");
  1120. }
  1121. else
  1122. {
  1123. xgmml.append(">\n");
  1124. for(; sub->isValid(); sub->next())
  1125. _toXML(&sub->query(), xgmml, indent+1);
  1126. xgmml.pad(indent).append("</").append(name).append(">\n");
  1127. }
  1128. }
  1129. public:
  1130. IMPLEMENT_IINTERFACE;
  1131. CRoxieContextBase(const IQueryFactory *_factory, const IRoxieContextLogger &_logctx)
  1132. : factory(_factory), options(factory->queryOptions()), logctx(_logctx), globalStats(graphStatistics)
  1133. {
  1134. startTime = lastWuAbortCheck = msTick();
  1135. persists = NULL;
  1136. temporaries = NULL;
  1137. deserializedResultStore = NULL;
  1138. rereadResults = NULL;
  1139. xmlStoredDatasetReadFlags = ptr_none;
  1140. aborted = false;
  1141. exceptionLogged = false;
  1142. totSlavesReplyLen = 0;
  1143. allocatorMetaCache.setown(createRowAllocatorCache(this));
  1144. rowManager.setown(roxiemem::createRowManager(options.memoryLimit, this, logctx, allocatorMetaCache, false));
  1145. //MORE: If checking heap required then should have
  1146. //rowManager.setown(createCheckingHeap(rowManager)) or something similar.
  1147. }
  1148. ~CRoxieContextBase()
  1149. {
  1150. ::Release(rereadResults);
  1151. ::Release(persists);
  1152. ::Release(temporaries);
  1153. ::Release(deserializedResultStore);
  1154. }
  1155. // interface IRoxieServerContext
  1156. virtual bool collectingDetailedStatistics() const
  1157. {
  1158. return (workUnit != nullptr);
  1159. }
  1160. virtual void noteStatistic(StatisticKind kind, unsigned __int64 value) const
  1161. {
  1162. logctx.noteStatistic(kind, value);
  1163. }
  1164. virtual void mergeStats(const CRuntimeStatisticCollection &from) const
  1165. {
  1166. logctx.mergeStats(from);
  1167. }
  1168. virtual void gatherStats(CRuntimeStatisticCollection & merged) const override
  1169. {
  1170. logctx.gatherStats(merged);
  1171. }
  1172. virtual void CTXLOGa(TracingCategory category, const char *prefix, const char *text) const
  1173. {
  1174. logctx.CTXLOGa(category, prefix, text);
  1175. }
  1176. virtual void CTXLOGaeva(IException *E, const char *file, unsigned line, const char *prefix, const char *format, va_list args) const
  1177. {
  1178. logctx.CTXLOGaeva(E, file, line, prefix, format, args);
  1179. }
  1180. virtual void CTXLOGl(LogItem *log) const
  1181. {
  1182. logctx.CTXLOGl(log);
  1183. }
  1184. virtual unsigned logString(const char *text) const
  1185. {
  1186. if (text && *text)
  1187. {
  1188. CTXLOG("USER: %s", text);
  1189. return strlen(text);
  1190. }
  1191. else
  1192. return 0;
  1193. }
  1194. virtual const IContextLogger &queryContextLogger() const
  1195. {
  1196. return logctx;
  1197. }
  1198. virtual StringBuffer &getLogPrefix(StringBuffer &ret) const
  1199. {
  1200. return logctx.getLogPrefix(ret);
  1201. }
  1202. virtual bool isIntercepted() const
  1203. {
  1204. return logctx.isIntercepted();
  1205. }
  1206. virtual bool isBlind() const
  1207. {
  1208. return logctx.isBlind();
  1209. }
  1210. virtual unsigned queryTraceLevel() const
  1211. {
  1212. return logctx.queryTraceLevel();
  1213. }
  1214. virtual void setGlobalId(const char *id, SocketEndpoint &ep, unsigned pid) override
  1215. {
  1216. const_cast<IRoxieContextLogger&>(logctx).setGlobalId(id, ep, pid);
  1217. }
  1218. virtual void setCallerId(const char *id) override
  1219. {
  1220. const_cast<IRoxieContextLogger&>(logctx).setCallerId(id);
  1221. }
  1222. virtual const char *queryGlobalId() const
  1223. {
  1224. return logctx.queryGlobalId();
  1225. }
  1226. virtual const char *queryCallerId() const override
  1227. {
  1228. return logctx.queryCallerId();
  1229. }
  1230. virtual const char *queryLocalId() const
  1231. {
  1232. return logctx.queryLocalId();
  1233. }
  1234. virtual void setHttpIdHeaders(const char *global, const char *caller)
  1235. {
  1236. const_cast<IRoxieContextLogger&>(logctx).setHttpIdHeaders(global, caller);
  1237. }
  1238. virtual const char *queryGlobalIdHttpHeader() const
  1239. {
  1240. return logctx.queryGlobalIdHttpHeader();
  1241. }
  1242. virtual const char *queryCallerIdHttpHeader() const
  1243. {
  1244. return logctx.queryCallerIdHttpHeader();
  1245. }
  1246. virtual void checkAbort()
  1247. {
  1248. // MORE - really should try to apply limits at slave end too
  1249. #ifdef __linux__
  1250. if (linuxYield)
  1251. sched_yield();
  1252. #endif
  1253. #ifdef _DEBUG
  1254. if (shuttingDown)
  1255. throw MakeStringException(ROXIE_FORCE_SHUTDOWN, "Roxie is shutting down");
  1256. #endif
  1257. if (aborted) // NOTE - don't bother getting lock before reading this (for speed) - a false read is very unlikely and not a problem
  1258. {
  1259. CriticalBlock b(abortLock);
  1260. if (!exception)
  1261. exception.setown(MakeStringException(ROXIE_INTERNAL_ERROR, "Query was aborted"));
  1262. throw exception.getLink();
  1263. }
  1264. if (graph)
  1265. graph->checkAbort();
  1266. if (options.timeLimit && (msTick() - startTime > options.timeLimit))
  1267. {
  1268. unsigned oldLimit = options.timeLimit;
  1269. //timeLimit = 0; // to prevent exceptions in cleanup - this means only one arm gets stopped!
  1270. CriticalBlock b(abortLock);
  1271. IException *E = MakeStringException(ROXIE_TIMEOUT, "Query %s exceeded time limit (%d ms) - terminated", factory->queryQueryName(), oldLimit);
  1272. if (!exceptionLogged)
  1273. {
  1274. logOperatorException(E, NULL, 0, NULL);
  1275. exceptionLogged = true;
  1276. }
  1277. throw E;
  1278. }
  1279. if (workUnit && (msTick() - lastWuAbortCheck > 5000))
  1280. {
  1281. CriticalBlock b(abortLock);
  1282. if (workUnit->aborting())
  1283. {
  1284. if (!exception)
  1285. exception.setown(MakeStringException(ROXIE_INTERNAL_ERROR, "Query was aborted"));
  1286. throw exception.getLink();
  1287. }
  1288. lastWuAbortCheck = msTick();
  1289. }
  1290. }
  1291. virtual unsigned checkInterval() const
  1292. {
  1293. unsigned interval = MAX_ABORT_CHECK_INTERVAL;
  1294. if (options.timeLimit)
  1295. {
  1296. interval = options.timeLimit / 10;
  1297. if (interval < MIN_ABORT_CHECK_INTERVAL)
  1298. interval = MIN_ABORT_CHECK_INTERVAL;
  1299. if (interval > MAX_ABORT_CHECK_INTERVAL)
  1300. interval = MAX_ABORT_CHECK_INTERVAL;
  1301. }
  1302. return interval;
  1303. }
  1304. virtual void notifyAbort(IException *E)
  1305. {
  1306. CriticalBlock b(abortLock);
  1307. if (!aborted && QUERYINTERFACE(E, InterruptedSemaphoreException) == NULL)
  1308. {
  1309. aborted = true;
  1310. exception.set(E);
  1311. setWUState(WUStateAborting);
  1312. }
  1313. }
  1314. virtual void setWUState(WUState state)
  1315. {
  1316. if (workUnit)
  1317. {
  1318. WorkunitUpdate w(&workUnit->lock());
  1319. w->setState(state);
  1320. }
  1321. }
  1322. virtual bool checkWuAborted()
  1323. {
  1324. return workUnit && workUnit->aborting();
  1325. }
  1326. virtual const QueryOptions &queryOptions() const
  1327. {
  1328. return options;
  1329. }
  1330. const char *queryAuthToken()
  1331. {
  1332. return authToken.str();
  1333. }
  1334. virtual void noteChildGraph(unsigned id, IActivityGraph *childGraph)
  1335. {
  1336. if (queryTraceLevel() > 10)
  1337. CTXLOG("CSlaveContext %p noteChildGraph %d=%p", this, id, childGraph);
  1338. childGraphs.setValue(id, childGraph);
  1339. }
  1340. virtual IActivityGraph *getLibraryGraph(const LibraryCallFactoryExtra &extra, IRoxieServerActivity *parentActivity)
  1341. {
  1342. if (extra.embedded)
  1343. {
  1344. return factory->lookupGraph(this, extra.embeddedGraphName, probeManager, *this, parentActivity);
  1345. }
  1346. else
  1347. {
  1348. Owned<IQueryFactory> libraryQuery = factory->lookupLibrary(extra.libraryName, extra.interfaceHash, *this);
  1349. assertex(libraryQuery);
  1350. return libraryQuery->lookupGraph(this, "graph1", probeManager, *this, parentActivity);
  1351. }
  1352. }
  1353. void beginGraph(const char *graphName)
  1354. {
  1355. if (debugContext)
  1356. {
  1357. probeManager.clear(); // Hack!
  1358. probeManager.setown(createDebugManager(debugContext, graphName));
  1359. debugContext->checkBreakpoint(DebugStateGraphCreate, NULL, graphName);
  1360. }
  1361. else if (probeAllRows || probeQuery != NULL)
  1362. probeManager.setown(createProbeManager());
  1363. graph.setown(factory->lookupGraph(this, graphName, probeManager, *this, NULL));
  1364. graph->onCreate(NULL); // MORE - is that right
  1365. if (debugContext)
  1366. debugContext->checkBreakpoint(DebugStateGraphStart, NULL, graphName);
  1367. if (collectingDetailedStatistics())
  1368. graphStats.setown(workUnit->updateStats(graph->queryName(), SCTroxie, queryStatisticsComponentName(), getWorkflowId(), 0));
  1369. }
  1370. virtual void endGraph(unsigned __int64 startTimeStamp, cycle_t startCycles, bool aborting)
  1371. {
  1372. if (graph)
  1373. {
  1374. IException * error = NULL;
  1375. try
  1376. {
  1377. unsigned __int64 elapsedTime = cycle_to_nanosec(get_cycles_now() - startCycles);
  1378. if (debugContext)
  1379. debugContext->checkBreakpoint(aborting ? DebugStateGraphAbort : DebugStateGraphEnd, NULL, graph->queryName());
  1380. if (aborting)
  1381. graph->abort();
  1382. if (workUnit)
  1383. {
  1384. const char * graphName = graph->queryName();
  1385. StringBuffer graphDesc;
  1386. formatGraphTimerLabel(graphDesc, graphName);
  1387. WorkunitUpdate progressWorkUnit(&workUnit->lock());
  1388. StringBuffer graphScope;
  1389. graphScope.append(WorkflowScopePrefix).append(getWorkflowId()).append(":").append(graphName);
  1390. progressWorkUnit->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, graphScope, StWhenStarted, NULL, startTimeStamp, 1, 0, StatsMergeAppend);
  1391. updateWorkunitStat(progressWorkUnit, SSTgraph, graphScope, StTimeElapsed, graphDesc, elapsedTime);
  1392. addTimeStamp(progressWorkUnit, SSTgraph, graphName, StWhenFinished, getWorkflowId());
  1393. }
  1394. graph->reset();
  1395. }
  1396. catch (IException * e)
  1397. {
  1398. error = e;
  1399. }
  1400. cleanupGraphs();
  1401. graphStats.clear();
  1402. if (error)
  1403. throw error;
  1404. }
  1405. }
  1406. void cleanupGraphs()
  1407. {
  1408. IStatisticGatherer * builder = nullptr;
  1409. if (graphStats)
  1410. builder = &graphStats->queryStatsBuilder();
  1411. if (graph)
  1412. graph->gatherStatistics(builder);
  1413. SuperHashIteratorOf<decltype(childGraphs)::ELEMENT> iter(childGraphs);
  1414. ForEach(iter)
  1415. {
  1416. IActivityGraph * curChildGraph = static_cast<IActivityGraph *>(iter.query().getValue());
  1417. curChildGraph->gatherStatistics(builder);
  1418. }
  1419. graph.clear();
  1420. childGraphs.kill();
  1421. }
  1422. void runGraph()
  1423. {
  1424. try
  1425. {
  1426. graph->execute();
  1427. if (probeQuery)
  1428. graph->getProbeResponse(probeQuery);
  1429. }
  1430. catch(...)
  1431. {
  1432. if (probeQuery)
  1433. graph->getProbeResponse(probeQuery);
  1434. throw;
  1435. }
  1436. }
  1437. virtual void executeGraph(const char * name, bool realThor, size32_t parentExtractSize, const void * parentExtract)
  1438. {
  1439. assertex(parentExtractSize == 0);
  1440. if (queryTraceLevel() > 8)
  1441. CTXLOG("Executing graph %s", name);
  1442. if (realThor)
  1443. {
  1444. executeThorGraph(name);
  1445. }
  1446. else
  1447. {
  1448. bool created = false;
  1449. cycle_t startCycles = get_cycles_now();
  1450. unsigned __int64 startTimeStamp = getTimeStampNowValue();
  1451. try
  1452. {
  1453. beginGraph(name);
  1454. created = true;
  1455. runGraph();
  1456. }
  1457. catch (IException *e)
  1458. {
  1459. if (e->errorAudience() == MSGAUD_operator)
  1460. EXCLOG(e, "Exception thrown in query - cleaning up"); // if an IException is throw let EXCLOG determine if a trap should be generated
  1461. else
  1462. {
  1463. StringBuffer s;
  1464. CTXLOG("Exception thrown in query - cleaning up: %d: %s", e->errorCode(), e->errorMessage(s).str());
  1465. }
  1466. if (created) // Partially-created graphs are liable to crash if you call abort() on them...
  1467. endGraph(startTimeStamp, startCycles, true);
  1468. else
  1469. {
  1470. // Bit of a hack... needed to avoid pure virtual calls if these are left to the CRoxieContextBase destructor
  1471. cleanupGraphs();
  1472. }
  1473. CTXLOG("Done cleaning up");
  1474. throw;
  1475. }
  1476. catch (...)
  1477. {
  1478. CTXLOG("Exception thrown in query - cleaning up");
  1479. if (created)
  1480. endGraph(startTimeStamp, startCycles, true);
  1481. else
  1482. {
  1483. // Bit of a hack... needed to avoid pure virtual calls if these are left to the CRoxieContextBase destructor
  1484. cleanupGraphs();
  1485. }
  1486. CTXLOG("Done cleaning up");
  1487. throw;
  1488. }
  1489. endGraph(startTimeStamp, startCycles, false);
  1490. }
  1491. }
  1492. virtual IActivityGraph * queryChildGraph(unsigned id)
  1493. {
  1494. if (queryTraceLevel() > 10)
  1495. CTXLOG("CSlaveContext %p resolveChildGraph %d", this, id);
  1496. if (id == 0)
  1497. return graph;
  1498. IActivityGraph *childGraph = childGraphs.getValue(id);
  1499. assertex(childGraph);
  1500. return childGraph;
  1501. }
  1502. virtual IThorChildGraph * resolveChildQuery(__int64 activityId, IHThorArg * colocal)
  1503. {
  1504. // NOTE - part of ICodeContext interface
  1505. return LINK(queryChildGraph((unsigned) activityId)->queryChildGraph());
  1506. }
  1507. virtual IEclGraphResults * resolveLocalQuery(__int64 id)
  1508. {
  1509. return queryChildGraph((unsigned) id)->queryLocalGraph();
  1510. }
  1511. virtual IRowManager &queryRowManager()
  1512. {
  1513. return *rowManager;
  1514. }
  1515. virtual void addSlavesReplyLen(unsigned len)
  1516. {
  1517. CriticalBlock b(statsCrit); // MORE: change to atomic_add, or may not need it at all?
  1518. totSlavesReplyLen += len;
  1519. }
  1520. virtual const char *loadResource(unsigned id)
  1521. {
  1522. ILoadedDllEntry *dll = factory->queryDll();
  1523. return (const char *) dll->getResource(id);
  1524. }
  1525. virtual ICodeContext *queryCodeContext()
  1526. {
  1527. return this;
  1528. }
  1529. virtual IProbeManager *queryProbeManager() const
  1530. {
  1531. return probeManager;
  1532. }
  1533. virtual IDebuggableContext *queryDebugContext() const
  1534. {
  1535. return debugContext;
  1536. }
  1537. virtual char *getOS()
  1538. {
  1539. #ifdef _WIN32
  1540. return strdup("windows");
  1541. #else
  1542. return strdup("linux");
  1543. #endif
  1544. }
  1545. virtual const void * fromXml(IEngineRowAllocator * rowAllocator, size32_t len, const char * utf8, IXmlToRowTransformer * xmlTransformer, bool stripWhitespace)
  1546. {
  1547. return createRowFromXml(rowAllocator, len, utf8, xmlTransformer, stripWhitespace);
  1548. }
  1549. virtual const void * fromJson(IEngineRowAllocator * rowAllocator, size32_t len, const char * utf8, IXmlToRowTransformer * xmlTransformer, bool stripWhitespace)
  1550. {
  1551. return createRowFromJson(rowAllocator, len, utf8, xmlTransformer, stripWhitespace);
  1552. }
  1553. virtual IEngineContext *queryEngineContext() { return NULL; }
  1554. virtual char *getDaliServers() { throwUnexpected(); }
  1555. virtual unsigned getWorkflowId() { return 0; } // this is a virtual which is implemented in IGlobalContext
  1556. // The following from ICodeContext should never be executed in slave activity. If we are on Roxie server, they will be implemented by more derived CRoxieServerContext class
  1557. virtual void setResultBool(const char *name, unsigned sequence, bool value) { throwUnexpected(); }
  1558. virtual void setResultData(const char *name, unsigned sequence, int len, const void * data) { throwUnexpected(); }
  1559. virtual void setResultDecimal(const char * stepname, unsigned sequence, int len, int precision, bool isSigned, const void *val) { throwUnexpected(); }
  1560. virtual void setResultInt(const char *name, unsigned sequence, __int64 value, unsigned size) { throwUnexpected(); }
  1561. virtual void setResultRaw(const char *name, unsigned sequence, int len, const void * data) { throwUnexpected(); }
  1562. virtual void setResultReal(const char * stepname, unsigned sequence, double value) { throwUnexpected(); }
  1563. virtual void setResultSet(const char *name, unsigned sequence, bool isAll, size32_t len, const void * data, ISetToXmlTransformer * transformer) { throwUnexpected(); }
  1564. virtual void setResultString(const char *name, unsigned sequence, int len, const char * str) { throwUnexpected(); }
  1565. virtual void setResultUInt(const char *name, unsigned sequence, unsigned __int64 value, unsigned size) { throwUnexpected(); }
  1566. virtual void setResultUnicode(const char *name, unsigned sequence, int len, UChar const * str) { throwUnexpected(); }
  1567. virtual void setResultVarString(const char * name, unsigned sequence, const char * value) { throwUnexpected(); }
  1568. virtual void setResultVarUnicode(const char * name, unsigned sequence, UChar const * value) { throwUnexpected(); }
  1569. virtual unsigned getResultHash(const char * name, unsigned sequence) { throwUnexpected(); }
  1570. virtual unsigned getExternalResultHash(const char * wuid, const char * name, unsigned sequence) { throwUnexpected(); }
  1571. virtual void printResults(IXmlWriter *output, const char *name, unsigned sequence) { throwUnexpected(); }
  1572. virtual char *getWuid() { throwUnexpected(); }
  1573. virtual void getExternalResultRaw(unsigned & tlen, void * & tgt, const char * wuid, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { throwUnexpected(); }
  1574. virtual char * getExpandLogicalName(const char * logicalName) { throwUnexpected(); }
  1575. virtual void addWuException(const char * text, unsigned code, unsigned severity, const char * source) { throwUnexpected(); }
  1576. virtual void addWuAssertFailure(unsigned code, const char * text, const char * filename, unsigned lineno, unsigned column, bool isAbort) { throwUnexpected(); }
  1577. virtual IUserDescriptor *queryUserDescriptor() { throwUnexpected(); }
  1578. virtual unsigned __int64 getDatasetHash(const char * name, unsigned __int64 hash) { throwUnexpected(); }
  1579. virtual unsigned getGraphLoopCounter() const override { return 0; }
  1580. virtual unsigned getNodes() { throwUnexpected(); }
  1581. virtual unsigned getNodeNum() { throwUnexpected(); }
  1582. virtual char *getFilePart(const char *logicalPart, bool create=false) { throwUnexpected(); }
  1583. virtual unsigned __int64 getFileOffset(const char *logicalPart) { throwUnexpected(); }
  1584. virtual IDistributedFileTransaction *querySuperFileTransaction() { throwUnexpected(); }
  1585. virtual char *getJobName() { throwUnexpected(); }
  1586. virtual char *getJobOwner() { throwUnexpected(); }
  1587. virtual char *getClusterName() { throwUnexpected(); }
  1588. virtual char *getGroupName() { throwUnexpected(); }
  1589. virtual char * queryIndexMetaData(char const * lfn, char const * xpath) { throwUnexpected(); }
  1590. virtual unsigned getPriority() const { throwUnexpected(); }
  1591. virtual char *getPlatform() { throwUnexpected(); }
  1592. virtual char *getEnv(const char *name, const char *defaultValue) const { throwUnexpected(); }
  1593. virtual IEngineRowAllocator *getRowAllocator(IOutputMetaData * meta, unsigned activityId) const
  1594. {
  1595. return allocatorMetaCache->ensure(meta, activityId, roxiemem::RHFnone);
  1596. }
  1597. virtual IEngineRowAllocator *getRowAllocatorEx(IOutputMetaData * meta, unsigned activityId, unsigned flags) const
  1598. {
  1599. return allocatorMetaCache->ensure(meta, activityId, (roxiemem::RoxieHeapFlags)flags);
  1600. }
  1601. virtual IEngineRowAllocator *getRowAllocatorEx(IOutputMetaData * meta, unsigned activityId, roxiemem::RoxieHeapFlags flags) const
  1602. {
  1603. return allocatorMetaCache->ensure(meta, activityId, flags);
  1604. }
  1605. virtual const char *cloneVString(const char *str) const
  1606. {
  1607. return rowManager->cloneVString(str);
  1608. }
  1609. virtual const char *cloneVString(size32_t len, const char *str) const
  1610. {
  1611. return rowManager->cloneVString(len, str);
  1612. }
  1613. virtual void getRowXML(size32_t & lenResult, char * & result, IOutputMetaData & info, const void * row, unsigned flags)
  1614. {
  1615. convertRowToXML(lenResult, result, info, row, flags);
  1616. }
  1617. virtual void getRowJSON(size32_t & lenResult, char * & result, IOutputMetaData & info, const void * row, unsigned flags)
  1618. {
  1619. convertRowToJSON(lenResult, result, info, row, flags);
  1620. }
  1621. virtual IWorkUnit *updateWorkUnit() const
  1622. {
  1623. if (workUnit)
  1624. return &workUnit->lock();
  1625. else
  1626. return NULL;
  1627. }
  1628. virtual IConstWorkUnit *queryWorkUnit() const
  1629. {
  1630. return workUnit;
  1631. }
  1632. // roxiemem::IRowAllocatorMetaActIdCacheCallback
  1633. virtual IEngineRowAllocator *createAllocator(IRowAllocatorMetaActIdCache * cache, IOutputMetaData *meta, unsigned activityId, unsigned id, roxiemem::RoxieHeapFlags flags) const
  1634. {
  1635. if (options.checkingHeap)
  1636. return createCrcRoxieRowAllocator(cache, *rowManager, meta, activityId, id, flags);
  1637. else
  1638. return createRoxieRowAllocator(cache, *rowManager, meta, activityId, id, flags);
  1639. }
  1640. virtual void getResultRowset(size32_t & tcount, const byte * * & tgt, const char * stepname, unsigned sequence, IEngineRowAllocator * _rowAllocator, bool isGrouped, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) override
  1641. {
  1642. try
  1643. {
  1644. Owned<IWorkUnitRowReader> wuReader = getWorkunitRowReader(NULL, stepname, sequence, xmlTransformer, _rowAllocator, isGrouped);
  1645. wuReader->getResultRowset(tcount, tgt);
  1646. }
  1647. catch (IException * e)
  1648. {
  1649. StringBuffer text;
  1650. e->errorMessage(text);
  1651. e->Release();
  1652. throw MakeStringException(ROXIE_DATA_ERROR, "Failed to retrieve data value \"%s\". [%s]", stepname, text.str());
  1653. }
  1654. catch (...)
  1655. {
  1656. throw MakeStringException(ROXIE_DATA_ERROR, "Failed to retrieve data value \"%s\"", stepname);
  1657. }
  1658. }
  1659. virtual void getResultDictionary(size32_t & tcount, const byte * * & tgt, IEngineRowAllocator * _rowAllocator, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer, IHThorHashLookupInfo * hasher) override
  1660. {
  1661. try
  1662. {
  1663. Owned<IWorkUnitRowReader> wuReader = getWorkunitRowReader(NULL, stepname, sequence, xmlTransformer, _rowAllocator, false);
  1664. wuReader->getResultRowset(tcount, tgt);
  1665. }
  1666. catch (IException * e)
  1667. {
  1668. StringBuffer text;
  1669. e->errorMessage(text);
  1670. e->Release();
  1671. throw MakeStringException(ROXIE_DATA_ERROR, "Failed to retrieve data value \"%s\". [%s]", stepname, text.str());
  1672. }
  1673. catch (...)
  1674. {
  1675. throw MakeStringException(ROXIE_DATA_ERROR, "Failed to retrieve data value \"%s\"", stepname);
  1676. }
  1677. }
  1678. virtual bool getResultBool(const char * name, unsigned sequence)
  1679. {
  1680. CriticalBlock b(contextCrit);
  1681. return useContext(sequence).getPropBool(name);
  1682. }
  // Converts one hexadecimal digit character to its numeric value.
  // Characters from 'a' upwards are measured from 'a' (+10), characters from 'A'
  // up to (but excluding) 'a' are measured from 'A' (+10), everything else from '0'.
  // MORE - no validation is done: a non-hex character yields an out-of-range value.
  static unsigned hex2digit(char c)
  {
      int value;
      if (c < 'A')
          value = c - '0';
      else if (c < 'a')
          value = c - 'A' + 10;
      else
          value = c - 'a' + 10;
      return value;
  }
  1692. virtual void getResultData(unsigned & tlen, void * & tgt, const char * name, unsigned sequence)
  1693. {
  1694. MemoryBuffer result;
  1695. CriticalBlock b(contextCrit);
  1696. const char *val = useContext(sequence).queryProp(name);
  1697. if (val)
  1698. {
  1699. for (;;)
  1700. {
  1701. char c0 = *val++;
  1702. if (!c0)
  1703. break;
  1704. char c1 = *val++;
  1705. if (!c1)
  1706. break; // Shouldn't really happen - we expect even length
  1707. unsigned c2 = (hex2digit(c0) << 4) | hex2digit(c1);
  1708. result.append((unsigned char) c2);
  1709. }
  1710. }
  1711. tlen = result.length();
  1712. tgt = result.detach();
  1713. }
  1714. virtual void getResultDecimal(unsigned tlen, int precision, bool isSigned, void * tgt, const char * stepname, unsigned sequence)
  1715. {
  1716. if (isSpecialResultSequence(sequence))
  1717. {
  1718. MemoryBuffer m;
  1719. CriticalBlock b(contextCrit);
  1720. useContext(sequence).getPropBin(stepname, m);
  1721. if (m.length())
  1722. {
  1723. assertex(m.length() == tlen);
  1724. m.read(tlen, tgt);
  1725. }
  1726. else
  1727. memset(tgt, 0, tlen);
  1728. }
  1729. else
  1730. {
  1731. StringBuffer x;
  1732. {
  1733. CriticalBlock b(contextCrit);
  1734. useContext(sequence).getProp(stepname, x);
  1735. }
  1736. Decimal d;
  1737. d.setString(x.length(), x.str());
  1738. if (isSigned)
  1739. d.getDecimal(tlen, precision, tgt);
  1740. else
  1741. d.getUDecimal(tlen, precision, tgt);
  1742. }
  1743. }
  1744. virtual __int64 getResultInt(const char * name, unsigned sequence)
  1745. {
  1746. CriticalBlock b(contextCrit);
  1747. const char *val = useContext(sequence).queryProp(name);
  1748. if (val)
  1749. {
  1750. // NOTE - we use this rather than getPropInt64 since it handles uint64 values up to MAX_UINT better (for our purposes)
  1751. return rtlStrToInt8(strlen(val), val);
  1752. }
  1753. else
  1754. return 0;
  1755. }
  1756. virtual double getResultReal(const char * name, unsigned sequence)
  1757. {
  1758. CriticalBlock b(contextCrit);
  1759. IPropertyTree &ctx = useContext(sequence);
  1760. double ret = 0;
  1761. if (ctx.hasProp(name))
  1762. {
  1763. if (ctx.isBinary(name))
  1764. {
  1765. MemoryBuffer buf;
  1766. ctx.getPropBin(name, buf);
  1767. buf.read(ret);
  1768. }
  1769. else
  1770. {
  1771. const char *val = ctx.queryProp(name);
  1772. if (val)
  1773. ret = atof(val);
  1774. }
  1775. }
  1776. return ret;
  1777. }
  1778. virtual void getResultSet(bool & tisAll, unsigned & tlen, void * & tgt, const char *stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
  1779. {
  1780. try
  1781. {
  1782. CriticalBlock b(contextCrit);
  1783. IPropertyTree &ctx = useContext(sequence);
  1784. IPropertyTree *val = ctx.queryPropTree(stepname);
  1785. doExtractRawResultX(tlen, tgt, val, sequence, xmlTransformer, csvTransformer, true);
  1786. tisAll = val ? val->getPropBool("@isAll", false) : false;
  1787. }
  1788. catch (IException * e)
  1789. {
  1790. StringBuffer text;
  1791. e->errorMessage(text);
  1792. e->Release();
  1793. throw MakeStringException(ROXIE_DATA_ERROR, "Failed to retrieve set \"%s\". [%s]", stepname, text.str());
  1794. }
  1795. catch (...)
  1796. {
  1797. throw MakeStringException(ROXIE_DATA_ERROR, "Failed to retrieve set \"%s\"", stepname);
  1798. }
  1799. }
  1800. virtual void getResultRaw(unsigned & tlen, void * & tgt, const char *stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
  1801. {
  1802. try
  1803. {
  1804. CriticalBlock b(contextCrit);
  1805. IPropertyTree &ctx = useContext(sequence);
  1806. IPropertyTree *val = ctx.queryPropTree(stepname);
  1807. doExtractRawResultX(tlen, tgt, val, sequence, xmlTransformer, csvTransformer, false);
  1808. }
  1809. catch (IException * e)
  1810. {
  1811. StringBuffer text;
  1812. e->errorMessage(text);
  1813. e->Release();
  1814. throw MakeStringException(ROXIE_DATA_ERROR, "Failed to retrieve data value \"%s\". [%s]", stepname, text.str());
  1815. }
  1816. catch (...)
  1817. {
  1818. throw MakeStringException(ROXIE_DATA_ERROR, "Failed to retrieve data value \"%s\"", stepname);
  1819. }
  1820. }
  1821. virtual void getResultString(unsigned & tlen, char * & tgt, const char * name, unsigned sequence)
  1822. {
  1823. MemoryBuffer x;
  1824. bool isBinary;
  1825. {
  1826. CriticalBlock b(contextCrit);
  1827. IPropertyTree &ctx = useContext(sequence);
  1828. isBinary = ctx.isBinary(name);
  1829. ctx.getPropBin(name, x);
  1830. }
  1831. if (isBinary) // No utf8 translation if previously set via setResultString
  1832. {
  1833. tlen = x.length();
  1834. tgt = (char *) x.detach();
  1835. }
  1836. else
  1837. rtlUtf8ToStrX(tlen, tgt, rtlUtf8Length(x.length(), x.toByteArray()), x.toByteArray());
  1838. }
  1839. virtual void getResultStringF(unsigned tlen, char * tgt, const char * name, unsigned sequence)
  1840. {
  1841. MemoryBuffer x;
  1842. bool isBinary;
  1843. {
  1844. CriticalBlock b(contextCrit);
  1845. IPropertyTree &ctx = useContext(sequence);
  1846. isBinary = ctx.isBinary(name);
  1847. ctx.getPropBin(name, x);
  1848. }
  1849. if (isBinary)
  1850. rtlStrToStr(tlen, tgt, x.length(), x.toByteArray());
  1851. else
  1852. rtlUtf8ToStr(tlen, tgt, rtlUtf8Length(x.length(), x.toByteArray()), x.toByteArray());
  1853. }
  1854. virtual void getResultUnicode(unsigned & tlen, UChar * & tgt, const char * name, unsigned sequence)
  1855. {
  1856. StringBuffer x;
  1857. {
  1858. CriticalBlock b(contextCrit);
  1859. useContext(sequence).getProp(name, x);
  1860. }
  1861. tgt = rtlCodepageToVUnicodeX(x.length(), x.str(), "utf-8");
  1862. tlen = rtlUnicodeStrlen(tgt);
  1863. }
  1864. virtual char *getResultVarString(const char * name, unsigned sequence)
  1865. {
  1866. CriticalBlock b(contextCrit);
  1867. IPropertyTree &ctx = useContext(sequence);
  1868. bool isBinary = ctx.isBinary(name);
  1869. if (isBinary)
  1870. {
  1871. StringBuffer s;
  1872. ctx.getProp(name, s);
  1873. return s.detach();
  1874. }
  1875. else
  1876. {
  1877. MemoryBuffer x;
  1878. ctx.getPropBin(name, x);
  1879. return rtlUtf8ToVStr(rtlUtf8Length(x.length(), x.toByteArray()), x.toByteArray());
  1880. }
  1881. }
  1882. virtual UChar *getResultVarUnicode(const char * name, unsigned sequence)
  1883. {
  1884. StringBuffer x;
  1885. CriticalBlock b(contextCrit);
  1886. useContext(sequence).getProp(name, x);
  1887. return rtlVCodepageToVUnicodeX(x.str(), "utf-8");
  1888. }
  1889. virtual ISectionTimer * registerTimer(unsigned activityId, const char * name)
  1890. {
  1891. CriticalBlock b(contextCrit);
  1892. ISectionTimer *timer = functionTimers.getValue(name);
  1893. if (!timer)
  1894. {
  1895. timer = ThorSectionTimer::createTimer(globalStats, name);
  1896. functionTimers.setValue(name, timer);
  1897. timer->Release(); // Value returned is not linked
  1898. }
  1899. return timer;
  1900. }
  1901. virtual void addWuExceptionEx(const char * text, unsigned code, unsigned severity, unsigned audience, const char * source) override { throwUnexpected(); }
  protected:
  // Taken (via CriticalBlock) by the getResult* accessors, registerTimer, useContext
  // and useResultStore around access to the members below.
  mutable CriticalSection contextCrit;
  // Tree returned by useContext() for ResultSequenceStored; may be unset, in which
  // case useContext() throws.
  Owned<IPropertyTree> context;
  // The next three trees are created on first use by useContext().
  // NOTE(review): they are raw pointers - their release is not visible in this section.
  IPropertyTree *persists;
  IPropertyTree *temporaries;
  IPropertyTree *rereadResults;
  // Reader options passed to CXmlToRawTransformer in doExtractRawResultX().
  PTreeReaderOptions xmlStoredDatasetReadFlags;
  // Created on first use by useResultStore().
  CDeserializedResultStore *deserializedResultStore;
  // Timers handed out by registerTimer(), keyed by name.
  MapStringToMyClass<ThorSectionTimer> functionTimers;
  // Statistics collection passed to each timer created by registerTimer().
  CRuntimeStatisticCollection globalStats;
  1912. IPropertyTree &useContext(unsigned sequence)
  1913. {
  1914. checkAbort();
  1915. switch ((int) sequence)
  1916. {
  1917. case ResultSequenceStored:
  1918. if (context)
  1919. return *context;
  1920. else
  1921. throw MakeStringException(ROXIE_CODEGEN_ERROR, "Code generation error - attempting to access stored variable on slave");
  1922. case ResultSequencePersist:
  1923. {
  1924. CriticalBlock b(contextCrit);
  1925. if (!persists)
  1926. persists = createPTree(ipt_fast);
  1927. return *persists;
  1928. }
  1929. case ResultSequenceOnce:
  1930. {
  1931. if (!workUnit)
  1932. return factory->queryOnceContext(logctx);
  1933. }
  1934. //fall through
  1935. case ResultSequenceInternal:
  1936. {
  1937. CriticalBlock b(contextCrit);
  1938. if (!temporaries)
  1939. temporaries = createPTree(ipt_fast);
  1940. return *temporaries;
  1941. }
  1942. default:
  1943. {
  1944. CriticalBlock b(contextCrit);
  1945. if (!rereadResults)
  1946. rereadResults = createPTree(ipt_fast);
  1947. return *rereadResults;
  1948. }
  1949. }
  1950. }
  1951. IDeserializedResultStore &useResultStore(unsigned sequence)
  1952. {
  1953. checkAbort();
  1954. switch ((int) sequence)
  1955. {
  1956. case ResultSequenceOnce:
  1957. if (!workUnit)
  1958. return factory->queryOnceResultStore();
  1959. //fall through
  1960. default:
  1961. // No need to have separate stores for other temporaries...
  1962. CriticalBlock b(contextCrit);
  1963. if (!deserializedResultStore)
  1964. deserializedResultStore = new CDeserializedResultStore;
  1965. return *deserializedResultStore;
  1966. }
  1967. }
  1968. void doExtractRawResultX(unsigned & tlen, void * & tgt, IPropertyTree *val, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer, bool isSet)
  1969. {
  1970. tgt = NULL;
  1971. tlen = 0;
  1972. if (val)
  1973. {
  1974. if (val->isBinary())
  1975. {
  1976. MemoryBuffer m;
  1977. val->getPropBin(NULL, m);
  1978. tlen = m.length();
  1979. tgt= m.detach();
  1980. }
  1981. else
  1982. {
  1983. const char *format = val->queryProp("@format");
  1984. if (!format || strcmp(format, "xml")==0)
  1985. {
  1986. assertex(xmlTransformer);
  1987. Variable2IDataVal result(&tlen, &tgt);
  1988. CXmlToRawTransformer rawTransformer(*xmlTransformer, xmlStoredDatasetReadFlags);
  1989. rawTransformer.transformTree(result, *val, !isSet);
  1990. }
  1991. else if (strcmp(format, "deserialized")==0)
  1992. {
  1993. IDeserializedResultStore &resultStore = useResultStore(sequence);
  1994. resultStore.serialize(tlen, tgt, val->getPropInt("@id", -1), queryCodeContext());
  1995. }
  1996. else if (strcmp(format, "csv")==0)
  1997. {
  1998. // MORE - never tested this code.....
  1999. assertex(csvTransformer);
  2000. Variable2IDataVal result(&tlen, &tgt);
  2001. MemoryBuffer m;
  2002. val->getPropBin(NULL, m);
  2003. CCsvToRawTransformer rawCsvTransformer(*csvTransformer);
  2004. rawCsvTransformer.transform(result, m.length(), m.toByteArray(), !isSet);
  2005. }
  2006. else
  2007. throw MakeStringException(ROXIE_UNIMPLEMENTED_ERROR, "no transform function available");
  2008. }
  2009. }
  2010. }
  2011. virtual IWorkUnitRowReader *createStreamedRawRowReader(IEngineRowAllocator *rowAllocator, bool isGrouped, const char *id)
  2012. {
  2013. throwUnexpected(); // Should only see on server
  2014. }
  2015. virtual IWorkUnitRowReader *getWorkunitRowReader(const char *wuid, const char *stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, IEngineRowAllocator *rowAllocator, bool isGrouped)
  2016. {
  2017. try
  2018. {
  2019. if (wuid)
  2020. {
  2021. Owned<IRoxieDaliHelper> daliHelper = connectToDali();
  2022. if (daliHelper && daliHelper->connected())
  2023. {
  2024. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  2025. Owned<IConstWorkUnit> externalWU = factory->openWorkUnit(wuid);
  2026. if (!externalWU)
  2027. throw MakeStringException(ROXIE_FILE_ERROR, "Failed to open workunit %s", wuid);
  2028. externalWU->remoteCheckAccess(queryUserDescriptor(), false);
  2029. Owned<IConstWUResult> wuResult = getWorkUnitResult(externalWU, stepname, sequence);
  2030. if (!wuResult)
  2031. throw MakeStringException(ROXIE_FILE_ERROR, "Failed to find value %s:%d in workunit %s", stepname ? stepname : "(null)", sequence, wuid);
  2032. return new WuResultDataReader(queryCodeContext(), rowAllocator, isGrouped, logctx, wuResult.getClear(), xmlTransformer);
  2033. }
  2034. else
  2035. throw MakeStringException(ROXIE_DALI_ERROR, "WorkUnit read: no dali connection available");
  2036. }
  2037. else
  2038. {
  2039. CriticalBlock b(contextCrit);
  2040. IPropertyTree &ctx = useContext(sequence);
  2041. IPropertyTree *val = ctx.queryPropTree(stepname);
  2042. if (val)
  2043. {
  2044. const char *id = val->queryProp("@id");
  2045. const char *format = val->queryProp("@format");
  2046. if (id)
  2047. {
  2048. if (!format || strcmp(format, "raw") == 0)
  2049. {
  2050. return createStreamedRawRowReader(rowAllocator, isGrouped, id);
  2051. }
  2052. else if (strcmp(format, "deserialized") == 0)
  2053. {
  2054. IDeserializedResultStore &resultStore = useResultStore(sequence);
  2055. return resultStore.createDeserializedReader(atoi(id));
  2056. }
  2057. else
  2058. throwUnexpected();
  2059. }
  2060. else
  2061. {
  2062. if (!format || strcmp(format, "xml") == 0)
  2063. {
  2064. if (xmlTransformer)
  2065. return new InlineXmlDataReader(*xmlTransformer, val, rowAllocator, isGrouped);
  2066. }
  2067. else if (strcmp(format, "raw") == 0)
  2068. {
  2069. return new InlineRawDataReader(queryCodeContext(), rowAllocator, isGrouped, logctx, val);
  2070. }
  2071. else
  2072. throwUnexpected();
  2073. }
  2074. }
  2075. }
  2076. }
  2077. catch (IException * e)
  2078. {
  2079. StringBuffer text;
  2080. e->errorMessage(text);
  2081. e->Release();
  2082. throw MakeStringException(ROXIE_DATA_ERROR, "Failed to retrieve data value %s. [%s]", stepname, text.str());
  2083. }
  2084. catch (...)
  2085. {
  2086. throw MakeStringException(ROXIE_DATA_ERROR, "Failed to retrieve data value %s", stepname);
  2087. }
  2088. throw MakeStringException(ROXIE_DATA_ERROR, "Failed to retrieve data value %s", stepname);
  2089. }
  2090. // Copied from eclgraph.cpp, in the hope that we will be deleting that code soon
  2091. void executeThorGraph(const char *graphName)
  2092. {
  2093. assertex(workUnit);
  2094. StringAttr wuid(workUnit->queryWuid());
  2095. StringAttr owner(workUnit->queryUser());
  2096. StringAttr cluster(workUnit->queryClusterName());
  2097. int priority = workUnit->getPriorityValue();
  2098. unsigned timelimit = workUnit->getDebugValueInt("thorConnectTimeout", defaultThorConnectTimeout);
  2099. Owned<IConstWUClusterInfo> c = getTargetClusterInfo(cluster.str());
  2100. if (!c)
  2101. throw MakeStringException(0, "Invalid thor cluster %s", cluster.str());
  2102. SCMStringBuffer queueName;
  2103. c->getThorQueue(queueName);
  2104. Owned<IJobQueue> jq = createJobQueue(queueName.str());
  2105. Owned<IWorkUnitFactory> wuFactory = getWorkUnitFactory();
  2106. bool resubmit;
  2107. do // loop if pause interrupted graph and needs resubmitting on resume
  2108. {
  2109. resubmit = false; // set if job interrupted in thor
  2110. if (WUStatePaused == workUnit->getState()) // check initial state - and wait if paused
  2111. {
  2112. for (;;)
  2113. {
  2114. WUAction action = wuFactory->waitForWorkUnitAction(wuid, queryWorkUnit()->getAction());
  2115. if (action == WUActionUnknown)
  2116. throw new WorkflowException(0, "Workunit aborting", 0, WorkflowException::ABORT, MSGAUD_user);
  2117. if (action != WUActionPause && action != WUActionPauseNow)
  2118. break;
  2119. }
  2120. }
  2121. setWUState(WUStateBlocked);
  2122. class cPollThread: public Thread // MORE - why do we ned a thread here?
  2123. {
  2124. Semaphore sem;
  2125. bool stopped;
  2126. IJobQueue *jq;
  2127. IConstWorkUnit *wu;
  2128. public:
  2129. bool timedout;
  2130. CTimeMon tm;
  2131. cPollThread(IJobQueue *_jq, IConstWorkUnit *_wu, unsigned timelimit)
  2132. : tm(timelimit)
  2133. {
  2134. stopped = false;
  2135. jq = _jq;
  2136. wu = _wu;
  2137. timedout = false;
  2138. }
  2139. ~cPollThread()
  2140. {
  2141. stop();
  2142. }
  2143. int run()
  2144. {
  2145. while (!stopped) {
  2146. sem.wait(ABORT_POLL_PERIOD);
  2147. if (stopped)
  2148. break;
  2149. if (tm.timedout()) {
  2150. timedout = true;
  2151. stopped = true;
  2152. jq->cancelInitiateConversation();
  2153. }
  2154. else if (wu->aborting()) {
  2155. stopped = true;
  2156. jq->cancelInitiateConversation();
  2157. }
  2158. }
  2159. return 0;
  2160. }
  2161. void stop()
  2162. {
  2163. stopped = true;
  2164. sem.signal();
  2165. }
  2166. } pollthread(jq, workUnit, timelimit*1000);
  2167. pollthread.start();
  2168. PROGLOG("Enqueuing on %s to run wuid=%s, graph=%s, timelimit=%d seconds, priority=%d", queueName.str(), wuid.str(), graphName, timelimit, priority);
  2169. IJobQueueItem* item = createJobQueueItem(wuid.str());
  2170. item->setOwner(owner.str());
  2171. item->setPriority(priority);
  2172. Owned<IConversation> conversation = jq->initiateConversation(item);
  2173. bool got = conversation.get()!=NULL;
  2174. pollthread.stop();
  2175. pollthread.join();
  2176. if (!got)
  2177. {
  2178. if (pollthread.timedout)
  2179. throw MakeStringException(0, "Query %s failed to start within specified timelimit (%d) seconds", wuid.str(), timelimit);
  2180. throw MakeStringException(0, "Query %s cancelled (1)",wuid.str());
  2181. }
  2182. // get the thor ep from whoever picked up
  2183. SocketEndpoint thorMaster;
  2184. MemoryBuffer msg;
  2185. if (!conversation->recv(msg,1000*60)) {
  2186. throw MakeStringException(0, "Query %s cancelled (2)",wuid.str());
  2187. }
  2188. thorMaster.deserialize(msg);
  2189. msg.clear().append(graphName);
  2190. SocketEndpoint myep;
  2191. myep.setLocalHost(0);
  2192. myep.serialize(msg); // only used for tracing
  2193. if (!conversation->send(msg)) {
  2194. StringBuffer s("Failed to send query to Thor on ");
  2195. thorMaster.getUrlStr(s);
  2196. throw MakeStringExceptionDirect(-1, s.str()); // maybe retry?
  2197. }
  2198. StringBuffer eps;
  2199. PROGLOG("Thor on %s running %s",thorMaster.getUrlStr(eps).str(),wuid.str());
  2200. MemoryBuffer reply;
  2201. try
  2202. {
  2203. if (!conversation->recv(reply,INFINITE))
  2204. {
  2205. StringBuffer s("Failed to receive reply from thor ");
  2206. thorMaster.getUrlStr(s);
  2207. throw MakeStringExceptionDirect(-1, s.str());
  2208. }
  2209. }
  2210. catch (IException *e)
  2211. {
  2212. StringBuffer s("Failed to receive reply from thor ");
  2213. thorMaster.getUrlStr(s);
  2214. s.append("; (").append(e->errorCode()).append(", ");
  2215. e->errorMessage(s).append(")");
  2216. e->Release();
  2217. throw MakeStringExceptionDirect(-1, s.str());
  2218. }
  2219. unsigned replyCode;
  2220. reply.read(replyCode);
  2221. switch ((ThorReplyCodes) replyCode)
  2222. {
  2223. case DAMP_THOR_REPLY_PAUSED:
  2224. {
  2225. bool isException ;
  2226. reply.read(isException);
  2227. if (isException)
  2228. {
  2229. Owned<IException> e = deserializeException(reply);
  2230. VStringBuffer str("Pausing job %s caused exception", wuid.str());
  2231. EXCLOG(e, str.str());
  2232. }
  2233. WorkunitUpdate w(&workUnit->lock());
  2234. w->setState(WUStatePaused); // will trigger executeThorGraph to pause next time around.
  2235. WUAction action = w->getAction();
  2236. switch (action)
  2237. {
  2238. case WUActionPause:
  2239. case WUActionPauseNow:
  2240. w->setAction(WUActionUnknown);
  2241. }
  2242. resubmit = true; // JCSMORE - all subgraph _could_ be done, thor will check though and not rerun
  2243. break;
  2244. }
  2245. case DAMP_THOR_REPLY_GOOD:
  2246. break;
  2247. case DAMP_THOR_REPLY_ERROR:
  2248. {
  2249. throw deserializeException(reply);
  2250. }
  2251. case DAMP_THOR_REPLY_ABORT:
  2252. throw new WorkflowException(0,"User abort requested", 0, WorkflowException::ABORT, MSGAUD_user);
  2253. default:
  2254. throwUnexpected();
  2255. }
  2256. workUnit->forceReload();
  2257. }
  2258. while (resubmit); // if pause interrupted job (i.e. with pausenow action), resubmit graph
  2259. }
  2260. };
  2261. //-----------------------------------------------------------------------------------------------
  2262. class CSlaveContext : public CRoxieContextBase
  2263. {
  2264. protected:
  2265. RoxiePacketHeader *header;
  2266. public:
  2267. CSlaveContext(const IQueryFactory *_factory, const SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, bool _hasChildren)
  2268. : CRoxieContextBase(_factory, _logctx)
  2269. {
  2270. if (_packet)
  2271. {
  2272. header = &_packet->queryHeader();
  2273. const byte *traceInfo = _packet->queryTraceInfo();
  2274. options.setFromSlaveLoggingFlags(*traceInfo);
  2275. bool debuggerActive = (*traceInfo & LOGGING_DEBUGGERACTIVE) != 0 && _hasChildren; // No option to debug simple remote activity
  2276. if (debuggerActive)
  2277. {
  2278. CSlaveDebugContext *slaveDebugContext = new CSlaveDebugContext(this, logctx, *header);
  2279. slaveDebugContext->init(_packet);
  2280. debugContext.setown(slaveDebugContext);
  2281. probeManager.setown(createDebugManager(debugContext, "slaveDebugger"));
  2282. }
  2283. }
  2284. else
  2285. {
  2286. assertex(selfTestMode);
  2287. header = nullptr;
  2288. }
  2289. }
  2290. virtual void beforeDispose()
  2291. {
  2292. // NOTE: This is needed to ensure that owned activities are destroyed BEFORE I am,
  2293. // to avoid pure virtual calls when they come to call noteProcessed()
  2294. logctx.mergeStats(globalStats);
  2295. childGraphs.releaseAll();
  2296. }
  2297. virtual IRoxieServerContext *queryServerContext()
  2298. {
  2299. return NULL;
  2300. }
  2301. virtual const IResolvedFile *resolveLFN(const char *filename, bool isOpt, bool isPrivilegedUser)
  2302. {
  2303. CDateTime cacheDate; // Note - this is empty meaning we don't know...
  2304. return querySlaveDynamicFileCache()->lookupDynamicFile(*this, filename, cacheDate, 0, header, isOpt, false);
  2305. }
  2306. virtual IRoxieWriteHandler *createLFN(const char *filename, bool overwrite, bool extend, const StringArray &clusters, bool isPrivilegedUser)
  2307. {
  2308. throwUnexpected(); // only support writing on the server
  2309. }
  2310. virtual void onFileCallback(const RoxiePacketHeader &header, const char *lfn, bool isOpt, bool isLocal, bool isPrivilegedUser)
  2311. {
  2312. // On a slave, we need to request info using our own header (not the one passed in) and need to get global rather than just local info
  2313. // (possibly we could get just local if the channel matches but not sure there is any point)
  2314. Owned<const IResolvedFile> dFile = resolveLFN(lfn, isOpt, isPrivilegedUser);
  2315. if (dFile)
  2316. {
  2317. MemoryBuffer mb;
  2318. mb.append(sizeof(RoxiePacketHeader), &header);
  2319. mb.append(lfn);
  2320. dFile->serializePartial(mb, header.channel, isLocal);
  2321. ((RoxiePacketHeader *) mb.toByteArray())->activityId = ROXIE_FILECALLBACK;
  2322. Owned<IRoxieQueryPacket> reply = createRoxiePacket(mb);
  2323. reply->queryHeader().retries = 0;
  2324. ROQ->sendPacket(reply, *this); // MORE - the caller's log context might be better? Should we unicast? Note that this does not release the packet
  2325. return;
  2326. }
  2327. ROQ->sendAbortCallback(header, lfn, *this);
  2328. throwUnexpected();
  2329. }
  2330. virtual void noteProcessed(unsigned subgraphId, unsigned activityId, unsigned _idx, unsigned _processed, unsigned _strands) const
  2331. {
  2332. const SlaveContextLogger &slaveLogCtx = static_cast<const SlaveContextLogger &>(logctx);
  2333. slaveLogCtx.putStatProcessed(subgraphId, activityId, _idx, _processed, _strands);
  2334. }
  2335. virtual void mergeActivityStats(const CRuntimeStatisticCollection &fromStats, unsigned subgraphId, unsigned activityId) const
  2336. {
  2337. const SlaveContextLogger &slaveLogCtx = static_cast<const SlaveContextLogger &>(logctx);
  2338. slaveLogCtx.putStats(subgraphId, activityId, fromStats);
  2339. }
  2340. };
  2341. IRoxieSlaveContext *createSlaveContext(const IQueryFactory *_factory, const SlaveContextLogger &_logctx, IRoxieQueryPacket *packet, bool hasChildren)
  2342. {
  2343. return new CSlaveContext(_factory, _logctx, packet, hasChildren);
  2344. }
  2345. //-----------------------------------------------------------------------------------------------
  2346. class CRoxieServerDebugContext : extends CBaseServerDebugContext
  2347. {
  2348. // Some questions:
  2349. // 1. Do we let all threads go even when say step? Probably... (may allow a thread to be suspended at some point)
  2350. // 2. Doesn't that then make a bit of a mockery of step (when there are multiple threads active)... I _think_ it actually means we DON'T try to wait for all
  2351. // threads to hit a stop, but allow any that hit stop while we are paused to be queued up to be returned by step.... perhaps actually stop them in critsec rather than
  2352. // semaphore and it all becomes easier to code... Anything calling checkBreakPoint while program state is "in debugger" will block on that critSec.
  2353. // 3. I think we need to recheck breakpoints on Roxie server but just check not deleted
  2354. public:
  2355. IRoxieSlaveContext *ctx;
  2356. CRoxieServerDebugContext(IRoxieSlaveContext *_ctx, const IContextLogger &_logctx, IPropertyTree *_queryXGMML)
  2357. : CBaseServerDebugContext(_logctx, _queryXGMML), ctx(_ctx)
  2358. {
  2359. }
  2360. void debugCounts(IXmlWriter *output, unsigned sinceSequence, bool reset)
  2361. {
  2362. CriticalBlock b(debugCrit);
  2363. if (running)
  2364. throw MakeStringException(ROXIE_DEBUG_ERROR, "Command not available while query is running");
  2365. if (currentGraph)
  2366. currentGraph->mergeRemoteCounts(this);
  2367. CBaseServerDebugContext::debugCounts(output, sinceSequence, reset);
  2368. }
  2369. virtual void waitForDebugger(DebugState state, IActivityDebugContext *probe)
  2370. {
  2371. ctx->setWUState(WUStateDebugPaused);
  2372. CBaseServerDebugContext::waitForDebugger(state, probe);
  2373. ctx->setWUState(WUStateDebugRunning);
  2374. }
  2375. virtual bool onDebuggerTimeout()
  2376. {
  2377. return ctx->checkWuAborted();
  2378. }
  2379. virtual void debugInitialize(const char *id, const char *_queryName, bool _breakAtStart)
  2380. {
  2381. CBaseServerDebugContext::debugInitialize(id, _queryName, _breakAtStart);
  2382. queryRoxieDebugSessionManager().registerDebugId(id, this);
  2383. }
  2384. virtual void debugTerminate()
  2385. {
  2386. CriticalBlock b(debugCrit);
  2387. assertex(running);
  2388. currentState = DebugStateUnloaded;
  2389. running = false;
  2390. queryRoxieDebugSessionManager().deregisterDebugId(debugId);
  2391. if (debuggerActive)
  2392. {
  2393. debuggerSem.signal(debuggerActive);
  2394. debuggerActive = 0;
  2395. }
  2396. }
  2397. virtual IRoxieQueryPacket *onDebugCallback(const RoxiePacketHeader &header, size32_t len, char *data)
  2398. {
  2399. MemoryBuffer slaveInfo;
  2400. slaveInfo.setBuffer(len, data, false);
  2401. unsigned debugSequence;
  2402. slaveInfo.read(debugSequence);
  2403. {
  2404. CriticalBlock b(breakCrit); // we want to wait until it's our turn before updating the graph info or the counts get ahead of the current row and life is confusing
  2405. char slaveStateChar;
  2406. slaveInfo.read(slaveStateChar);
  2407. DebugState slaveState = (DebugState) slaveStateChar;
  2408. if (slaveState==DebugStateGraphFinished)
  2409. {
  2410. unsigned numCounts;
  2411. slaveInfo.read(numCounts);
  2412. while (numCounts)
  2413. {
  2414. StringAttr edgeId;
  2415. unsigned edgeCount;
  2416. slaveInfo.read(edgeId);
  2417. slaveInfo.read(edgeCount);
  2418. Owned<IGlobalEdgeRecord> thisEdge = getEdgeRecord(edgeId);
  2419. thisEdge->incrementCount(edgeCount, sequence);
  2420. numCounts--;
  2421. }
  2422. }
  2423. slaveInfo.read(currentBreakpointUID);
  2424. memsize_t slaveActivity;
  2425. unsigned channel;
  2426. __uint64 tmp;
  2427. slaveInfo.read(tmp);
  2428. slaveActivity = (memsize_t)tmp;
  2429. slaveInfo.read(channel);
  2430. assertex(currentGraph);
  2431. currentGraph->deserializeProxyGraphs(slaveState, slaveInfo, (IActivityBase *) slaveActivity, channel);
  2432. if (slaveState != DebugStateGraphFinished) // MORE - this is debatable - may (at least sometimes) want a child graph finished to be a notified event...
  2433. {
  2434. StringBuffer slaveActivityId;
  2435. slaveInfo.read(slaveActivityId);
  2436. IActivityDebugContext *slaveActivityCtx = slaveActivityId.length() ? currentGraph->lookupActivityByEdgeId(slaveActivityId.str()) : NULL;
  2437. checkBreakpoint(slaveState, slaveActivityCtx , NULL);
  2438. }
  2439. }
  2440. MemoryBuffer mb;
  2441. mb.append(sizeof(RoxiePacketHeader), &header);
  2442. StringBuffer debugIdString;
  2443. debugIdString.appendf(".debug.%x", debugSequence);
  2444. mb.append(debugIdString.str());
  2445. serialize(mb);
  2446. Owned<IRoxieQueryPacket> reply = createRoxiePacket(mb);
  2447. reply->queryHeader().activityId = ROXIE_DEBUGCALLBACK;
  2448. reply->queryHeader().retries = 0;
  2449. return reply.getClear();
  2450. }
  2451. virtual void debugPrintVariable(IXmlWriter *output, const char *name, const char *type) const
  2452. {
  2453. CriticalBlock b(debugCrit);
  2454. if (running)
  2455. throw MakeStringException(ROXIE_DEBUG_ERROR, "Command not available while query is running");
  2456. output->outputBeginNested("Variables", true);
  2457. if (!type || stricmp(type, "temporary"))
  2458. {
  2459. output->outputBeginNested("Temporary", true);
  2460. ctx->printResults(output, name, (unsigned) ResultSequenceInternal);
  2461. output->outputEndNested("Temporary");
  2462. }
  2463. if (!type || stricmp(type, "global"))
  2464. {
  2465. output->outputBeginNested("Global", true);
  2466. ctx->printResults(output, name, (unsigned) ResultSequenceStored);
  2467. output->outputEndNested("Global");
  2468. }
  2469. output->outputEndNested("Variables");
  2470. }
  2471. };
  2472. class CRoxieServerContext : public CRoxieContextBase, implements IRoxieServerContext, implements IGlobalCodeContext, implements IEngineContext
  2473. {
  2474. const IQueryFactory *serverQueryFactory = nullptr;
  2475. IHpccProtocolResponse *protocol = nullptr;
  2476. IHpccProtocolResultsWriter *results = nullptr;
  2477. IHpccNativeProtocolResponse *nativeProtocol = nullptr;
  2478. CriticalSection daliUpdateCrit;
  2479. StringAttr querySetName;
  2480. bool isRaw;
  2481. bool sendHeartBeats;
  2482. unsigned lastSocketCheckTime;
  2483. unsigned lastHeartBeat;
  2484. protected:
  2485. Owned<CRoxieWorkflowMachine> workflow;
  2486. Owned<ITimeReporter> myTimer;
  2487. mutable MapStringToMyClass<IResolvedFile> fileCache;
  2488. StringArray clusterNames;
  2489. int clusterWidth = -1;
  2490. bool isBlocked;
  2491. bool isNative;
  2492. bool trim;
  2493. void doPostProcess()
  2494. {
  2495. if (workUnit)
  2496. {
  2497. WorkunitUpdate w(&workUnit->lock());
  2498. Owned<IStatisticGatherer> builder = createGlobalStatisticGatherer(w);
  2499. globalStats.recordStatistics(*builder);
  2500. }
  2501. logctx.mergeStats(globalStats);
  2502. globalStats.reset();
  2503. if (!protocol)
  2504. return;
  2505. if (!isRaw && !isBlocked)
  2506. protocol->flush();
  2507. if (probeQuery)
  2508. {
  2509. // loop through all of the graphs and create a _Probe to output each xgmml
  2510. Owned<IPropertyTreeIterator> graphs = probeQuery->getElements("Graph");
  2511. ForEach(*graphs)
  2512. {
  2513. IPropertyTree &graph = graphs->query();
  2514. StringBuffer xgmml;
  2515. _toXML(&graph, xgmml, 0);
  2516. protocol->appendProbeGraph(xgmml.str());
  2517. }
  2518. }
  2519. }
  2520. void addWuException(IException *E)
  2521. {
  2522. if (workUnit)
  2523. ::addWuException(workUnit, E);
  2524. }
  2525. void init()
  2526. {
  2527. totSlavesReplyLen = 0;
  2528. isRaw = false;
  2529. isBlocked = false;
  2530. isNative = true;
  2531. sendHeartBeats = false;
  2532. trim = false;
  2533. lastSocketCheckTime = startTime;
  2534. lastHeartBeat = startTime;
  2535. myTimer.setown(createStdTimeReporter());
  2536. }
  2537. void startWorkUnit()
  2538. {
  2539. WorkunitUpdate wu(&workUnit->lock());
  2540. wu->subscribe(SubscribeOptionAbort);
  2541. addTimeStamp(wu, SSTglobal, NULL, StWhenStarted);
  2542. if (!context->getPropBool("@outputToSocket", false))
  2543. protocol = NULL;
  2544. updateSuppliedXmlParams(wu);
  2545. SCMStringBuffer wuParams;
  2546. if (workUnit->getXmlParams(wuParams, false).length())
  2547. {
  2548. // Merge in params from WU. Ones on command line take precedence though...
  2549. Owned<IPropertyTree> wuParamTree = createPTreeFromXMLString(wuParams.str(), ipt_caseInsensitive|ipt_fast);
  2550. Owned<IPropertyTreeIterator> params = wuParamTree->getElements("*");
  2551. ForEach(*params)
  2552. {
  2553. IPropertyTree &param = params->query();
  2554. if (!context->hasProp(param.queryName()))
  2555. context->addPropTree(param.queryName(), LINK(&param));
  2556. }
  2557. }
  2558. if (workUnit->getDebugValueBool("Debug", false))
  2559. {
  2560. bool breakAtStart = workUnit->getDebugValueBool("BreakAtStart", true);
  2561. wu->setState(WUStateDebugRunning);
  2562. initDebugMode(breakAtStart, workUnit->queryWuid());
  2563. }
  2564. else
  2565. wu->setState(WUStateRunning);
  2566. clusterNames.append(workUnit->queryClusterName());
  2567. clusterWidth = -1;
  2568. }
  2569. void initDebugMode(bool breakAtStart, const char *debugUID)
  2570. {
  2571. if (!debugPermitted || !debugEndpoint.port || nativeProtocol)
  2572. throw MakeStringException(ROXIE_ACCESS_ERROR, "Debug query not permitted here");
  2573. debugContext.setown(new CRoxieServerDebugContext(this, logctx, factory->cloneQueryXGMML()));
  2574. debugContext->debugInitialize(debugUID, factory->queryQueryName(), breakAtStart);
  2575. if (workUnit)
  2576. {
  2577. WorkunitUpdate wu(&workUnit->lock());
  2578. wu->setDebugAgentListenerPort(debugEndpoint.port); //tells debugger what port to write commands to
  2579. StringBuffer sb;
  2580. debugEndpoint.getIpText(sb);
  2581. wu->setDebugAgentListenerIP(sb); //tells debugger what IP to write commands to
  2582. }
  2583. options.timeLimit = 0;
  2584. options.warnTimeLimit = 0;
  2585. }
  2586. public:
  2587. IMPLEMENT_IINTERFACE;
  2588. CRoxieServerContext(const IQueryFactory *_factory, const IRoxieContextLogger &_logctx)
  2589. : CRoxieContextBase(_factory, _logctx), serverQueryFactory(_factory), results(NULL)
  2590. {
  2591. init();
  2592. rowManager->setMemoryLimit(options.memoryLimit);
  2593. workflow.setown(_factory->createWorkflowMachine(workUnit, true, logctx));
  2594. context.setown(createPTree(ipt_caseInsensitive|ipt_fast));
  2595. }
  2596. CRoxieServerContext(IConstWorkUnit *_workUnit, const IQueryFactory *_factory, const ContextLogger &_logctx)
  2597. : CRoxieContextBase(_factory, _logctx), serverQueryFactory(_factory), results(NULL)
  2598. {
  2599. init();
  2600. workUnit.set(_workUnit);
  2601. rowManager->setMemoryLimit(options.memoryLimit);
  2602. workflow.setown(_factory->createWorkflowMachine(workUnit, false, logctx));
  2603. context.setown(createPTree(ipt_caseInsensitive|ipt_fast));
  2604. //MORE: Use various debug settings to override settings:
  2605. rowManager->setActivityTracking(workUnit->getDebugValueBool("traceRoxiePeakMemory", false));
  2606. startWorkUnit();
  2607. }
  2608. CRoxieServerContext(IPropertyTree *_context, IHpccProtocolResponse *_protocol, const IQueryFactory *_factory, unsigned flags, const ContextLogger &_logctx, PTreeReaderOptions _xmlReadFlags, const char *_querySetName)
  2609. : CRoxieContextBase(_factory, _logctx), serverQueryFactory(_factory), protocol(_protocol), results(NULL), querySetName(_querySetName)
  2610. {
  2611. init();
  2612. if (protocol)
  2613. {
  2614. nativeProtocol = dynamic_cast<IHpccNativeProtocolResponse*>(protocol);
  2615. results = protocol->queryHpccResultsSection();
  2616. }
  2617. context.set(_context);
  2618. options.setFromContext(context);
  2619. isNative = (flags & HPCC_PROTOCOL_NATIVE);
  2620. isRaw = (flags & HPCC_PROTOCOL_NATIVE_RAW);
  2621. isBlocked = (flags & HPCC_PROTOCOL_BLOCKED);
  2622. trim = (flags & HPCC_PROTOCOL_TRIM);
  2623. xmlStoredDatasetReadFlags = _xmlReadFlags;
  2624. sendHeartBeats = enableHeartBeat && isRaw && isBlocked && options.priority==0;
  2625. const char *wuid = context->queryProp("@wuid");
  2626. if (wuid)
  2627. {
  2628. IRoxieDaliHelper *daliHelper = checkDaliConnection();
  2629. assertex(daliHelper );
  2630. workUnit.setown(daliHelper->attachWorkunit(wuid, _factory->queryDll()));
  2631. if (!workUnit)
  2632. throw MakeStringException(ROXIE_DALI_ERROR, "Failed to open workunit %s", wuid);
  2633. startWorkUnit();
  2634. }
  2635. else if (context->getPropBool("@debug", false))
  2636. {
  2637. bool breakAtStart = context->getPropBool("@break", true);
  2638. const char *debugUID = context->queryProp("@uid");
  2639. if (debugUID && *debugUID)
  2640. initDebugMode(breakAtStart, debugUID);
  2641. }
  2642. else if (context->getPropBool("_Probe", false))
  2643. probeQuery.setown(_factory->cloneQueryXGMML());
  2644. // MORE some of these might be appropriate in wu case too?
  2645. rowManager->setActivityTracking(context->getPropBool("_TraceMemory", false));
  2646. rowManager->setMemoryLimit(options.memoryLimit);
  2647. workflow.setown(_factory->createWorkflowMachine(workUnit, false, logctx));
  2648. }
  2649. virtual roxiemem::IRowManager &queryRowManager()
  2650. {
  2651. return *rowManager;
  2652. }
  2653. virtual IRoxieDaliHelper *checkDaliConnection()
  2654. {
  2655. CriticalBlock b(daliUpdateCrit);
  2656. if (!daliHelperLink)
  2657. daliHelperLink.setown(::connectToDali());
  2658. return daliHelperLink;
  2659. }
  2660. virtual void checkAbort()
  2661. {
  2662. CRoxieContextBase::checkAbort();
  2663. unsigned ticksNow = msTick();
  2664. if (options.warnTimeLimit)
  2665. {
  2666. unsigned elapsed = ticksNow - startTime;
  2667. if (elapsed > options.warnTimeLimit)
  2668. {
  2669. CriticalBlock b(abortLock);
  2670. if (elapsed > options.warnTimeLimit) // we don't want critsec on the first check (for efficiency) but check again inside the critsec
  2671. {
  2672. logOperatorException(NULL, NULL, 0, "SLOW (%d ms): %s", elapsed, factory->queryQueryName());
  2673. options.warnTimeLimit = elapsed + options.warnTimeLimit;
  2674. }
  2675. }
  2676. }
  2677. if (protocol)
  2678. {
  2679. if (socketCheckInterval)
  2680. {
  2681. if (ticksNow - lastSocketCheckTime > socketCheckInterval)
  2682. {
  2683. CriticalBlock b(abortLock);
  2684. if (!protocol->checkConnection())
  2685. throw MakeStringException(ROXIE_CLIENT_CLOSED, "Client socket closed");
  2686. lastSocketCheckTime = ticksNow;
  2687. }
  2688. }
  2689. if (sendHeartBeats)
  2690. {
  2691. unsigned hb = ticksNow - lastHeartBeat;
  2692. if (hb > 30000)
  2693. {
  2694. lastHeartBeat = msTick();
  2695. protocol->sendHeartBeat();
  2696. }
  2697. }
  2698. }
  2699. }
  2700. virtual unsigned getXmlFlags() const
  2701. {
  2702. return trim ? XWFtrim|XWFopt : XWFexpandempty;
  2703. }
  2704. virtual const IProperties *queryXmlns(unsigned seqNo)
  2705. {
  2706. IConstWorkUnit *cw = serverQueryFactory->queryWorkUnit();
  2707. if (cw)
  2708. {
  2709. Owned<IConstWUResult> result = cw->getResultBySequence(seqNo);
  2710. if (result)
  2711. return result->queryResultXmlns(); // This is not safe - result is (theoretically, if not actually) freed!
  2712. }
  2713. return NULL;
  2714. }
  2715. virtual memsize_t getMemoryUsage()
  2716. {
  2717. return rowManager->getMemoryUsage();
  2718. }
  2719. virtual unsigned getSlavesReplyLen()
  2720. {
  2721. return totSlavesReplyLen;
  2722. }
  2723. virtual void process()
  2724. {
  2725. MTIME_SECTION(myTimer, "Process");
  2726. QueryTerminationCleanup threadCleanup;
  2727. EclProcessFactory pf = (EclProcessFactory) factory->queryDll()->getEntry("createProcess");
  2728. Owned<IEclProcess> p = pf();
  2729. try
  2730. {
  2731. if (debugContext)
  2732. debugContext->checkBreakpoint(DebugStateReady, NULL, NULL);
  2733. if (workflow)
  2734. workflow->perform(this, p);
  2735. else
  2736. p->perform(this, 0);
  2737. }
  2738. catch(WorkflowException *E)
  2739. {
  2740. if (debugContext)
  2741. debugContext->checkBreakpoint(DebugStateException, NULL, static_cast<IException *>(E));
  2742. addWuException(E);
  2743. doPostProcess();
  2744. throw;
  2745. }
  2746. catch(IException *E)
  2747. {
  2748. if (debugContext)
  2749. debugContext->checkBreakpoint(DebugStateException, NULL, E);
  2750. addWuException(E);
  2751. doPostProcess();
  2752. throw;
  2753. }
  2754. catch(...)
  2755. {
  2756. if (debugContext)
  2757. debugContext->checkBreakpoint(DebugStateFailed, NULL, NULL);
  2758. if (workUnit)
  2759. {
  2760. Owned<IException> E = MakeStringException(ROXIE_INTERNAL_ERROR, "Unknown exception caught in CRoxieServerContext::process");
  2761. addWuException(E);
  2762. }
  2763. doPostProcess();
  2764. throw;
  2765. }
  2766. if (debugContext)
  2767. debugContext->checkBreakpoint(DebugStateFinished, NULL, NULL);
  2768. doPostProcess();
  2769. }
  2770. virtual void done(bool failed)
  2771. {
  2772. if (debugContext)
  2773. debugContext->debugTerminate();
  2774. if (workUnit)
  2775. {
  2776. if (options.failOnLeaks && !failed)
  2777. {
  2778. cleanupGraphs();
  2779. probeManager.clear();
  2780. ::Release(deserializedResultStore);
  2781. deserializedResultStore = nullptr;
  2782. if (rowManager && rowManager->allocated())
  2783. {
  2784. rowManager->reportLeaks();
  2785. failed = true;
  2786. Owned <IException> E = makeStringException(ROXIE_INTERNAL_ERROR, "Row leaks detected");
  2787. ::addWuException(workUnit, E);
  2788. }
  2789. }
  2790. WorkunitUpdate w(&workUnit->lock());
  2791. if (aborted)
  2792. w->setState(WUStateAborted);
  2793. else if (failed)
  2794. w->setState(WUStateFailed);
  2795. else if (workflow && workflow->hasItemsWaiting())
  2796. w->setState(WUStateWait);
  2797. else
  2798. w->setState(WUStateCompleted);
  2799. if(w->queryEventScheduledCount() > 0 && w->getState() != WUStateWait)
  2800. {
  2801. try
  2802. {
  2803. w->deschedule();
  2804. }
  2805. catch(IException * e)
  2806. {
  2807. int code = e->errorCode();
  2808. VStringBuffer msg("Failed to deschedule workunit %s: ", w->queryWuid());
  2809. e->errorMessage(msg);
  2810. addExceptionToWorkunit(w, SeverityError, "Roxie", code, msg.str(), NULL, 0, 0, 0);
  2811. e->Release();
  2812. OWARNLOG("%s (%d)", msg.str(), code);
  2813. }
  2814. }
  2815. while (clusterNames.ordinality())
  2816. restoreCluster();
  2817. addTimeStamp(w, SSTglobal, NULL, StWhenFinished);
  2818. updateWorkunitTimings(w, myTimer);
  2819. Owned<IStatisticGatherer> gatherer = createGlobalStatisticGatherer(w);
  2820. CRuntimeStatisticCollection merged(allStatistics);
  2821. logctx.gatherStats(merged);
  2822. merged.recordStatistics(*gatherer);
  2823. //MORE: If executed more than once (e.g., scheduled), then TimeElapsed isn't particularly correct.
  2824. gatherer->updateStatistic(StTimeElapsed, elapsedTimer.elapsedNs(), StatsMergeReplace);
  2825. WuStatisticTarget statsTarget(w, "roxie");
  2826. rowManager->reportPeakStatistics(statsTarget, 0);
  2827. }
  2828. }
  2829. virtual ICodeContext *queryCodeContext()
  2830. {
  2831. return this;
  2832. }
  2833. virtual IRoxieServerContext *queryServerContext()
  2834. {
  2835. return this;
  2836. }
  2837. virtual const IQueryFactory *queryQueryFactory() const override
  2838. {
  2839. return factory;
  2840. }
  2841. virtual IGlobalCodeContext *queryGlobalCodeContext()
  2842. {
  2843. return this;
  2844. }
  2845. virtual char *getDaliServers()
  2846. {
  2847. try
  2848. {
  2849. IRoxieDaliHelper *daliHelper = checkDaliConnection();
  2850. if (daliHelper)
  2851. {
  2852. StringBuffer ip;
  2853. daliHelper->getDaliIp(ip);
  2854. return ip.detach();
  2855. }
  2856. }
  2857. catch (IException *E)
  2858. {
  2859. E->Release();
  2860. }
  2861. return strdup("");
  2862. }
  2863. virtual IHpccProtocolResponse *queryProtocol()
  2864. {
  2865. return protocol;
  2866. }
  2867. virtual IEngineContext *queryEngineContext() { return this; }
  2868. virtual DALI_UID getGlobalUniqueIds(unsigned num, SocketEndpoint *_foreignNode)
  2869. {
  2870. if (num==0)
  2871. return 0;
  2872. SocketEndpoint foreignNode;
  2873. if (_foreignNode && !_foreignNode->isNull())
  2874. foreignNode.set(*_foreignNode);
  2875. else
  2876. {
  2877. Owned<IRoxieDaliHelper> dali = ::connectToDali();
  2878. if (!dali)
  2879. return 0;
  2880. StringBuffer daliIp;
  2881. dali->getDaliIp(daliIp);
  2882. foreignNode.set(daliIp.str());
  2883. }
  2884. return ::getGlobalUniqueIds(num, &foreignNode);
  2885. }
  2886. virtual bool allowDaliAccess() const
  2887. {
  2888. Owned<IRoxieDaliHelper> dali = ::connectToDali();
  2889. return dali != nullptr;
  2890. }
  2891. virtual StringBuffer &getQueryId(StringBuffer &result, bool isShared) const
  2892. {
  2893. if (workUnit)
  2894. result.append(workUnit->queryWuid()); // In workunit mode, this works for both shared and non-shared variants
  2895. else if (isShared)
  2896. result.append('Q').append(factory->queryHash());
  2897. else
  2898. logctx.getLogPrefix(result);
  2899. return result;
  2900. }
  2901. virtual const StringArray &queryManifestFiles(const char *type) const override
  2902. {
  2903. ILoadedDllEntry *dll = factory->queryDll();
  2904. StringBuffer id;
  2905. return dll->queryManifestFiles(type, getQueryId(id, true).str());
  2906. }
  2907. mutable CIArrayOf<TerminationCallbackInfo> callbacks;
  2908. mutable CriticalSection callbacksCrit;
  2909. virtual void onTermination(QueryTermCallback callback, const char *key, bool isShared) const
  2910. {
  2911. TerminationCallbackInfo *term(new TerminationCallbackInfo(callback, key));
  2912. if (isShared)
  2913. factory->onTermination(term);
  2914. else
  2915. {
  2916. CriticalBlock b(callbacksCrit);
  2917. callbacks.append(*term);
  2918. }
  2919. }
  2920. virtual void setResultBool(const char *name, unsigned sequence, bool value)
  2921. {
  2922. if (isSpecialResultSequence(sequence))
  2923. {
  2924. CriticalBlock b(contextCrit);
  2925. useContext(sequence).setPropBool(name, value);
  2926. }
  2927. else if (results)
  2928. {
  2929. results->setResultBool(name, sequence, value);
  2930. }
  2931. if (workUnit)
  2932. {
  2933. try
  2934. {
  2935. WorkunitUpdate wu(&workUnit->lock());
  2936. CriticalBlock b(daliUpdateCrit); // MORE - do we really need these locks?
  2937. wu->setResultBool(name, sequence, value);
  2938. }
  2939. catch(IException *e)
  2940. {
  2941. StringBuffer text;
  2942. e->errorMessage(text);
  2943. CTXLOG("Error trying to update dali: %s", text.str());
  2944. e->Release();
  2945. }
  2946. catch(...)
  2947. {
  2948. CTXLOG("Unknown exception trying to update dali");
  2949. }
  2950. }
  2951. }
  2952. virtual void setResultData(const char *name, unsigned sequence, int len, const void * data)
  2953. {
  2954. static char hexchar[] = "0123456789ABCDEF";
  2955. if (isSpecialResultSequence(sequence))
  2956. {
  2957. StringBuffer s;
  2958. const byte *field = (const byte *) data;
  2959. for (int i = 0; i < len; i++)
  2960. s.append(hexchar[field[i] >> 4]).append(hexchar[field[i] & 0x0f]);
  2961. CriticalBlock b(contextCrit);
  2962. IPropertyTree &ctx = useContext(sequence);
  2963. ctx.setProp(name, s.str());
  2964. }
  2965. else if (results)
  2966. {
  2967. results->setResultData(name, sequence, len, data);
  2968. }
  2969. if (workUnit)
  2970. {
  2971. try
  2972. {
  2973. WorkunitUpdate wu(&workUnit->lock());
  2974. CriticalBlock b(daliUpdateCrit);
  2975. wu->setResultData(name, sequence, len, data);
  2976. }
  2977. catch(IException *e)
  2978. {
  2979. StringBuffer text;
  2980. e->errorMessage(text);
  2981. CTXLOG("Error trying to update dali: %s", text.str());
  2982. e->Release();
  2983. }
  2984. catch(...)
  2985. {
  2986. CTXLOG("Unknown exception trying to update dali");
  2987. }
  2988. }
  2989. }
  2990. virtual void appendResultDeserialized(const char *name, unsigned sequence, size32_t count, rowset_t data, bool extend, IOutputMetaData *meta) override
  2991. {
  2992. CriticalBlock b(contextCrit);
  2993. IPropertyTree &ctx = useContext(sequence);
  2994. IDeserializedResultStore &resultStore = useResultStore(sequence);
  2995. IPropertyTree *val = ctx.queryPropTree(name);
  2996. if (extend && val)
  2997. {
  2998. int oldId = val->getPropInt("@id", -1);
  2999. const char * oldFormat = val->queryProp("@format");
  3000. assertex(oldId != -1);
  3001. assertex(oldFormat && strcmp(oldFormat, "deserialized")==0);
  3002. size32_t oldCount;
  3003. rowset_t oldData;
  3004. resultStore.queryResult(oldId, oldCount, oldData);
  3005. Owned<IEngineRowAllocator> allocator = getRowAllocator(meta, 0);
  3006. RtlLinkedDatasetBuilder builder(allocator);
  3007. builder.appendRows(oldCount, oldData);
  3008. builder.appendRows(count, data);
  3009. rtlReleaseRowset(count, data);
  3010. val->setPropInt("@id", resultStore.addResult(builder.getcount(), builder.linkrows(), meta));
  3011. }
  3012. else
  3013. {
  3014. if (!val)
  3015. val = ctx.addPropTree(name, createPTree(ipt_fast));
  3016. val->setProp("@format", "deserialized");
  3017. val->setPropInt("@id", resultStore.addResult(count, data, meta));
  3018. }
  3019. }
  3020. virtual void appendResultRawContext(const char *name, unsigned sequence, int len, const void * data, int numRows, bool extend, bool saveInContext)
  3021. {
  3022. if (saveInContext)
  3023. {
  3024. CriticalBlock b(contextCrit);
  3025. IPropertyTree &ctx = useContext(sequence);
  3026. ctx.appendPropBin(name, len, data);
  3027. ctx.queryPropTree(name)->setProp("@format", "raw");
  3028. }
  3029. if (workUnit)
  3030. {
  3031. try
  3032. {
  3033. WorkunitUpdate wu(&workUnit->lock());
  3034. CriticalBlock b(daliUpdateCrit);
  3035. wu->setResultDataset(name, sequence, len, data, numRows, extend);
  3036. }
  3037. catch(IException *e)
  3038. {
  3039. StringBuffer text;
  3040. e->errorMessage(text);
  3041. CTXLOG("Error trying to update dali: %s", text.str());
  3042. e->Release();
  3043. }
  3044. catch(...)
  3045. {
  3046. CTXLOG("Unknown exception trying to update dali");
  3047. }
  3048. }
  3049. }
  3050. virtual void setResultRaw(const char *name, unsigned sequence, int len, const void * data)
  3051. {
  3052. if (isSpecialResultSequence(sequence))
  3053. {
  3054. CriticalBlock b(contextCrit);
  3055. IPropertyTree &ctx = useContext(sequence);
  3056. ctx.setPropBin(name, len, data);
  3057. ctx.queryPropTree(name)->setProp("@format", "raw");
  3058. }
  3059. else if (results)
  3060. {
  3061. results->setResultRaw(name, sequence, len, data);
  3062. }
  3063. if (workUnit)
  3064. {
  3065. try
  3066. {
  3067. WorkunitUpdate wu(&workUnit->lock());
  3068. CriticalBlock b(daliUpdateCrit);
  3069. wu->setResultRaw(name, sequence, len, data);
  3070. }
  3071. catch(IException *e)
  3072. {
  3073. StringBuffer text;
  3074. e->errorMessage(text);
  3075. CTXLOG("Error trying to update dali: %s", text.str());
  3076. e->Release();
  3077. }
  3078. catch(...)
  3079. {
  3080. CTXLOG("Unknown exception trying to update dali");
  3081. }
  3082. }
  3083. }
  3084. virtual void setResultSet(const char *name, unsigned sequence, bool isAll, size32_t len, const void * data, ISetToXmlTransformer * transformer)
  3085. {
  3086. if (isSpecialResultSequence(sequence))
  3087. {
  3088. CriticalBlock b(contextCrit);
  3089. IPropertyTree &ctx = useContext(sequence);
  3090. ctx.setPropBin(name, len, data);
  3091. ctx.queryPropTree(name)->setProp("@format", "raw");
  3092. ctx.queryPropTree(name)->setPropBool("@isAll", isAll);
  3093. }
  3094. else if (results)
  3095. {
  3096. results->setResultSet(name, sequence, isAll, len, data, transformer);
  3097. }
  3098. if (workUnit)
  3099. {
  3100. try
  3101. {
  3102. WorkunitUpdate wu(&workUnit->lock());
  3103. CriticalBlock b(daliUpdateCrit);
  3104. wu->setResultSet(name, sequence, isAll, len, data, transformer);
  3105. }
  3106. catch(IException *e)
  3107. {
  3108. StringBuffer text;
  3109. e->errorMessage(text);
  3110. CTXLOG("Error trying to update dali: %s", text.str());
  3111. e->Release();
  3112. }
  3113. catch(...)
  3114. {
  3115. CTXLOG("Unknown exception trying to update dali");
  3116. }
  3117. }
  3118. }
  3119. virtual void setResultXml(const char *name, unsigned sequence, const char *xml)
  3120. {
  3121. CriticalBlock b(contextCrit);
  3122. useContext(sequence).setPropTree(name, createPTreeFromXMLString(xml, ipt_caseInsensitive|ipt_fast));
  3123. }
  3124. virtual void setResultDecimal(const char *name, unsigned sequence, int len, int precision, bool isSigned, const void *val)
  3125. {
  3126. if (isSpecialResultSequence(sequence))
  3127. {
  3128. MemoryBuffer m;
  3129. serializeFixedData(len, val, m);
  3130. CriticalBlock b(contextCrit);
  3131. useContext(sequence).setPropBin(name, m.length(), m.toByteArray());
  3132. }
  3133. else if (results)
  3134. {
  3135. results->setResultDecimal(name, sequence, len, precision, isSigned, val);
  3136. }
  3137. if (workUnit)
  3138. {
  3139. try
  3140. {
  3141. WorkunitUpdate wu(&workUnit->lock());
  3142. CriticalBlock b(daliUpdateCrit);
  3143. wu->setResultDecimal(name, sequence, len, precision, isSigned, val);
  3144. }
  3145. catch(IException *e)
  3146. {
  3147. StringBuffer text;
  3148. e->errorMessage(text);
  3149. CTXLOG("Error trying to update dali: %s", text.str());
  3150. e->Release();
  3151. }
  3152. catch(...)
  3153. {
  3154. CTXLOG("Unknown exception trying to update dali");
  3155. }
  3156. }
  3157. }
  3158. virtual void setResultInt(const char *name, unsigned sequence, __int64 value, unsigned size)
  3159. {
  3160. if (isSpecialResultSequence(sequence))
  3161. {
  3162. CriticalBlock b(contextCrit);
  3163. useContext(sequence).setPropInt64(name, value);
  3164. }
  3165. else if (results)
  3166. {
  3167. results->setResultInt(name, sequence, value, size);
  3168. }
  3169. if (workUnit)
  3170. {
  3171. try
  3172. {
  3173. WorkunitUpdate wu(&workUnit->lock());
  3174. CriticalBlock b(daliUpdateCrit);
  3175. wu->setResultInt(name, sequence, value);
  3176. }
  3177. catch(IException *e)
  3178. {
  3179. StringBuffer text;
  3180. e->errorMessage(text);
  3181. CTXLOG("Error trying to update dali: %s", text.str());
  3182. e->Release();
  3183. }
  3184. catch(...)
  3185. {
  3186. CTXLOG("Unknown exception trying to update dali");
  3187. }
  3188. }
  3189. }
  3190. virtual void setResultUInt(const char *name, unsigned sequence, unsigned __int64 value, unsigned size)
  3191. {
  3192. if (isSpecialResultSequence(sequence))
  3193. {
  3194. CriticalBlock b(contextCrit);
  3195. useContext(sequence).setPropInt64(name, value);
  3196. }
  3197. else if (results)
  3198. {
  3199. results->setResultUInt(name, sequence, value, size);
  3200. }
  3201. if (workUnit)
  3202. {
  3203. try
  3204. {
  3205. WorkunitUpdate wu(&workUnit->lock());
  3206. CriticalBlock b(daliUpdateCrit);
  3207. wu->setResultUInt(name, sequence, value);
  3208. }
  3209. catch(IException *e)
  3210. {
  3211. StringBuffer text;
  3212. e->errorMessage(text);
  3213. CTXLOG("Error trying to update dali: %s", text.str());
  3214. e->Release();
  3215. }
  3216. catch(...)
  3217. {
  3218. CTXLOG("Unknown exception trying to update dali");
  3219. }
  3220. }
  3221. }
  3222. virtual void setResultReal(const char *name, unsigned sequence, double value)
  3223. {
  3224. if (isSpecialResultSequence(sequence))
  3225. {
  3226. CriticalBlock b(contextCrit);
  3227. useContext(sequence).setPropBin(name, sizeof(value), &value);
  3228. }
  3229. else if (results)
  3230. {
  3231. results->setResultReal(name, sequence, value);
  3232. }
  3233. if (workUnit)
  3234. {
  3235. try
  3236. {
  3237. WorkunitUpdate wu(&workUnit->lock());
  3238. CriticalBlock b(daliUpdateCrit);
  3239. wu->setResultReal(name, sequence, value);
  3240. }
  3241. catch(IException *e)
  3242. {
  3243. StringBuffer text;
  3244. e->errorMessage(text);
  3245. CTXLOG("Error trying to update dali: %s", text.str());
  3246. e->Release();
  3247. }
  3248. catch(...)
  3249. {
  3250. CTXLOG("Unknown exception trying to update dali");
  3251. }
  3252. }
  3253. }
  3254. virtual void setResultString(const char *name, unsigned sequence, int len, const char * str)
  3255. {
  3256. if (isSpecialResultSequence(sequence))
  3257. {
  3258. CriticalBlock b(contextCrit);
  3259. useContext(sequence).setPropBin(name, len, str);
  3260. }
  3261. else if (results)
  3262. {
  3263. results->setResultString(name, sequence, len, str);
  3264. }
  3265. if (workUnit)
  3266. {
  3267. try
  3268. {
  3269. WorkunitUpdate wu(&workUnit->lock());
  3270. CriticalBlock b(daliUpdateCrit);
  3271. wu->setResultString(name, sequence, len, str);
  3272. }
  3273. catch(IException *e)
  3274. {
  3275. StringBuffer text;
  3276. e->errorMessage(text);
  3277. CTXLOG("Error trying to update dali: %s", text.str());
  3278. e->Release();
  3279. }
  3280. catch(...)
  3281. {
  3282. CTXLOG("Unknown exception trying to update dali");
  3283. }
  3284. }
  3285. }
  3286. virtual void setResultUnicode(const char *name, unsigned sequence, int len, UChar const * str)
  3287. {
  3288. if (isSpecialResultSequence(sequence))
  3289. {
  3290. rtlDataAttr buff;
  3291. unsigned bufflen = 0;
  3292. rtlUnicodeToCodepageX(bufflen, buff.refstr(), len, str, "utf-8");
  3293. CriticalBlock b(contextCrit);
  3294. useContext(sequence).setPropBin(name, bufflen, buff.getstr());
  3295. }
  3296. else if (results)
  3297. {
  3298. results->setResultUnicode(name, sequence, len, str);
  3299. }
  3300. if (workUnit)
  3301. {
  3302. try
  3303. {
  3304. WorkunitUpdate wu(&workUnit->lock());
  3305. CriticalBlock b(daliUpdateCrit);
  3306. wu->setResultUnicode(name, sequence, len, str);
  3307. }
  3308. catch(IException *e)
  3309. {
  3310. StringBuffer text;
  3311. e->errorMessage(text);
  3312. CTXLOG("Error trying to update dali: %s", text.str());
  3313. e->Release();
  3314. }
  3315. catch(...)
  3316. {
  3317. CTXLOG("Unknown exception trying to update dali");
  3318. }
  3319. }
  3320. }
  3321. virtual void setResultVarString(const char * name, unsigned sequence, const char * value)
  3322. {
  3323. setResultString(name, sequence, strlen(value), value);
  3324. }
  3325. virtual void setResultVarUnicode(const char * name, unsigned sequence, UChar const * value)
  3326. {
  3327. setResultUnicode(name, sequence, rtlUnicodeStrlen(value), value);
  3328. }
  3329. virtual void setResultDataset(const char * name, unsigned sequence, size32_t len, const void *data, unsigned numRows, bool extend)
  3330. {
  3331. appendResultRawContext(name, sequence, len, data, numRows, extend, true);
  3332. }
  3333. virtual IWorkUnitRowReader *createStreamedRawRowReader(IEngineRowAllocator *rowAllocator, bool isGrouped, const char *id)
  3334. {
  3335. if (!nativeProtocol)
  3336. throwUnexpected();
  3337. return new StreamedRawDataReader(this, rowAllocator, isGrouped, logctx, *nativeProtocol->querySafeSocket(), id);
  3338. }
  3339. virtual void printResults(IXmlWriter *output, const char *name, unsigned sequence)
  3340. {
  3341. CriticalBlock b(contextCrit);
  3342. IPropertyTree &tree = useContext(sequence);
  3343. if (name)
  3344. {
  3345. const char *val = tree.queryProp(name);
  3346. if (val)
  3347. output->outputCString(val, name);
  3348. }
  3349. else
  3350. {
  3351. StringBuffer hack;
  3352. toXML(&tree, hack);
  3353. output->outputString(0, NULL, NULL); // Hack upon hack...
  3354. output->outputQuoted(hack.str());
  3355. }
  3356. }
  3357. virtual const IResolvedFile *resolveLFN(const char *fileName, bool isOpt, bool isPrivilegedUser)
  3358. {
  3359. CriticalBlock b(contextCrit);
  3360. StringBuffer expandedName;
  3361. expandLogicalFilename(expandedName, fileName, workUnit, false, false);
  3362. Linked<const IResolvedFile> ret = fileCache.getValue(expandedName);
  3363. if (!ret)
  3364. {
  3365. ret.setown(factory->queryPackage().lookupFileName(fileName, isOpt, false, false, workUnit, false, isPrivilegedUser));
  3366. if (ret)
  3367. {
  3368. IResolvedFile *add = const_cast<IResolvedFile *>(ret.get());
  3369. fileCache.setValue(expandedName, add);
  3370. }
  3371. }
  3372. return ret.getClear();
  3373. }
  3374. virtual IRoxieWriteHandler *createLFN(const char *filename, bool overwrite, bool extend, const StringArray &clusters, bool isPrivilegedUser)
  3375. {
  3376. return factory->queryPackage().createFileName(filename, overwrite, extend, clusters, workUnit, isPrivilegedUser);
  3377. }
  3378. virtual void endGraph(unsigned __int64 startTimeStamp, cycle_t startCycles, bool aborting) override
  3379. {
  3380. fileCache.kill();
  3381. CRoxieContextBase::endGraph(startTimeStamp, startCycles, aborting);
  3382. }
  3383. virtual void onFileCallback(const RoxiePacketHeader &header, const char *lfn, bool isOpt, bool isLocal, bool isPrivilegedUser)
  3384. {
  3385. Owned<const IResolvedFile> dFile = resolveLFN(lfn, isOpt, isPrivilegedUser);
  3386. if (dFile)
  3387. {
  3388. MemoryBuffer mb;
  3389. mb.append(sizeof(RoxiePacketHeader), &header);
  3390. mb.append(lfn);
  3391. dFile->serializePartial(mb, header.channel, isLocal);
  3392. ((RoxiePacketHeader *) mb.toByteArray())->activityId = ROXIE_FILECALLBACK;
  3393. Owned<IRoxieQueryPacket> reply = createRoxiePacket(mb);
  3394. reply->queryHeader().retries = 0;
  3395. ROQ->sendPacket(reply, *this); // MORE - the caller's log context might be better? Should we unicast? Note that this does not release the packet
  3396. return;
  3397. }
  3398. ROQ->sendAbortCallback(header, lfn, *this);
  3399. throwUnexpected();
  3400. }
  3401. IConstWUResult *getExternalResult(const char * wuid, const char *name, unsigned sequence)
  3402. {
  3403. Owned <IRoxieDaliHelper> daliHelper = connectToDali();
  3404. if (daliHelper && daliHelper->connected())
  3405. {
  3406. Owned<IConstWorkUnit> externalWU = daliHelper->attachWorkunit(wuid, NULL);
  3407. if (externalWU)
  3408. {
  3409. externalWU->remoteCheckAccess(queryUserDescriptor(), false);
  3410. return getWorkUnitResult(externalWU, name, sequence);
  3411. }
  3412. else
  3413. {
  3414. throw MakeStringException(0, "Missing or invalid workunit name %s in getExternalResult()", nullText(wuid));
  3415. }
  3416. }
  3417. else
  3418. throw MakeStringException(ROXIE_DALI_ERROR, "WorkUnit read: no dali connection available");
  3419. }
  3420. unsigned getExternalResultHash(const char * wuid, const char * name, unsigned sequence)
  3421. {
  3422. Owned<IConstWUResult> r = getExternalResult(wuid, name, sequence);
  3423. if (!r)
  3424. throw MakeStringException(0, "Failed to retrieve hash value %s from workunit %s", name, wuid);
  3425. return r->getResultHash();
  3426. }
  3427. virtual void getExternalResultRaw(unsigned & tlen, void * & tgt, const char * wuid, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
  3428. {
  3429. UNIMPLEMENTED;
  3430. }
  3431. virtual void addWuException(const char * text, unsigned code, unsigned _severity, const char * source)
  3432. {
  3433. addWuExceptionEx(text, code, _severity, MSGAUD_operator, source);
  3434. }
  3435. virtual void addWuExceptionEx(const char * text, unsigned code, unsigned _severity, unsigned audience, const char * source)
  3436. {
  3437. ErrorSeverity severity = (ErrorSeverity) _severity;
  3438. CTXLOG("%s", text);
  3439. if (severity > SeverityInformation)
  3440. LOG(mapToLogMsgCategory(severity, (MessageAudience)audience), "%d - %s", code, text);
  3441. if (workUnit)
  3442. {
  3443. WorkunitUpdate wu(&workUnit->lock());
  3444. addExceptionToWorkunit(wu, severity, source, code, text, NULL, 0 ,0, 0);
  3445. }
  3446. }
  3447. virtual void addWuAssertFailure(unsigned code, const char * text, const char * filename, unsigned lineno, unsigned column, bool isAbort)
  3448. {
  3449. CTXLOG("%s", text);
  3450. OERRLOG("%d - %s", code, text);
  3451. if (workUnit)
  3452. {
  3453. WorkunitUpdate wu(&workUnit->lock());
  3454. addExceptionToWorkunit(wu, SeverityError, "user", code, text, filename, lineno, column, 0);
  3455. }
  3456. if (isAbort)
  3457. throw makeStringException(MSGAUD_user, code, text);
  3458. }
  3459. IUserDescriptor *queryUserDescriptor()
  3460. {
  3461. if (workUnit)
  3462. return workUnit->queryUserDescriptor();//ad-hoc mode
  3463. else
  3464. {
  3465. Owned<IRoxieDaliHelper> daliHelper = connectToDali(false);
  3466. if (daliHelper)
  3467. return daliHelper->queryUserDescriptor();//predeployed query mode
  3468. }
  3469. return NULL;
  3470. }
  3471. virtual bool isResult(const char * name, unsigned sequence)
  3472. {
  3473. CriticalBlock b(contextCrit);
  3474. return useContext(sequence).hasProp(name);
  3475. }
  3476. virtual char *getClusterName()
  3477. {
  3478. if (workUnit)
  3479. {
  3480. return strdup(workUnit->queryClusterName());
  3481. }
  3482. else
  3483. {
  3484. // predeployed queries with no workunit should return the querySet name
  3485. return strdup(querySetName.str()); // StringAttr::str() will return "" rather than NULL
  3486. }
  3487. }
  3488. virtual char *getGroupName()
  3489. {
  3490. StringBuffer groupName;
  3491. if (workUnit && clusterNames.length())
  3492. {
  3493. const char * cluster = clusterNames.tos();
  3494. Owned<IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(cluster);
  3495. if (!clusterInfo)
  3496. throw MakeStringException(-1, "Unknown cluster '%s'", cluster);
  3497. const StringArray &thors = clusterInfo->getThorProcesses();
  3498. if (thors.length())
  3499. {
  3500. StringArray envClusters, envGroups, envTargets, envQueues;
  3501. getEnvironmentThorClusterNames(envClusters, envGroups, envTargets, envQueues);
  3502. ForEachItemIn(i, thors)
  3503. {
  3504. const char *thorName = thors.item(i);
  3505. ForEachItemIn(j, envClusters)
  3506. {
  3507. if (strieq(thorName, envClusters.item(j)))
  3508. {
  3509. const char *envGroup = envGroups.item(j);
  3510. if (groupName.length())
  3511. {
  3512. if (!strieq(groupName, envGroup))
  3513. throw MakeStringException(-1, "getGroupName(): ambiguous groups %s, %s", groupName.str(), envGroup);
  3514. }
  3515. else
  3516. groupName.append(envGroup);
  3517. break;
  3518. }
  3519. }
  3520. }
  3521. }
  3522. else
  3523. {
  3524. StringBufferAdaptor a(groupName);
  3525. clusterInfo->getRoxieProcess(a);
  3526. }
  3527. }
  3528. return groupName.detach();
  3529. }
  3530. virtual char *queryIndexMetaData(char const * lfn, char const * xpath) { throwUnexpected(); }
  3531. virtual char *getEnv(const char *name, const char *defaultValue) const
  3532. {
  3533. return serverQueryFactory->getEnv(name, defaultValue);
  3534. }
  3535. virtual char *getJobName()
  3536. {
  3537. if (workUnit)
  3538. {
  3539. return strdup(workUnit->queryJobName());
  3540. }
  3541. return strdup(factory->queryQueryName());
  3542. }
  3543. virtual char *getJobOwner()
  3544. {
  3545. if (workUnit)
  3546. {
  3547. return strdup(workUnit->queryUser());
  3548. }
  3549. return strdup("");
  3550. }
  3551. virtual char *getPlatform()
  3552. {
  3553. if (clusterNames.length())
  3554. {
  3555. const char * cluster = clusterNames.tos();
  3556. Owned<IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(cluster);
  3557. if (!clusterInfo)
  3558. throw MakeStringException(-1, "Unknown Cluster '%s'", cluster);
  3559. return strdup(clusterTypeString(clusterInfo->getPlatform(), false));
  3560. }
  3561. else
  3562. return strdup("roxie");
  3563. }
  3564. virtual char *getWuid()
  3565. {
  3566. if (workUnit)
  3567. {
  3568. return strdup(workUnit->queryWuid());
  3569. }
  3570. else
  3571. {
  3572. return strdup("");
  3573. }
  3574. }
  3575. // persist-related code
  3576. virtual char * getExpandLogicalName(const char * logicalName)
  3577. {
  3578. StringBuffer lfn;
  3579. expandLogicalFilename(lfn, logicalName, workUnit, false, false);
  3580. return lfn.detach();
  3581. }
  3582. virtual void setWorkflowCondition(bool value) { if(workflow) workflow->setCondition(value); }
  3583. virtual void returnPersistVersion(const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC, bool isFile)
  3584. {
  3585. if (workflow)
  3586. workflow->returnPersistVersion(logicalName, eclCRC, allCRC, isFile);
  3587. }
  3588. virtual void fail(int code, const char *text)
  3589. {
  3590. addWuExceptionEx(text, code, 2, MSGAUD_user, "user");
  3591. }
  3592. virtual unsigned getWorkflowId() { return workflow->queryCurrentWfid(); }
  3593. void doNotify(char const * name, char const * text)
  3594. {
  3595. doNotify(name, text, NULL);
  3596. }
  3597. void doNotify(char const * name, char const * text, const char * target)
  3598. {
  3599. Owned<IRoxieDaliHelper> daliHelper = connectToDali();
  3600. if (daliHelper && daliHelper->connected())
  3601. {
  3602. Owned<IScheduleEventPusher> pusher(getScheduleEventPusher());
  3603. pusher->push(name, text, target);
  3604. }
  3605. else
  3606. throw MakeStringException(ROXIE_DALI_ERROR, "doNotify: no dali connection available");
  3607. }
  3608. static unsigned __int64 crcLogicalFileTime(IDistributedFile * file, unsigned __int64 crc, const char * filename)
  3609. {
  3610. CDateTime dt;
  3611. file->getModificationTime(dt);
  3612. unsigned __int64 modifiedTime = dt.getSimple();
  3613. return rtlHash64Data(sizeof(modifiedTime), &modifiedTime, crc);
  3614. }
  3615. virtual unsigned __int64 getDatasetHash(const char * logicalName, unsigned __int64 crc)
  3616. {
  3617. StringBuffer fullname;
  3618. expandLogicalFilename(fullname, logicalName, workUnit, false, false);
  3619. Owned<IDistributedFile> file = queryDistributedFileDirectory().lookup(fullname.str(),queryUserDescriptor(),false,false,false,nullptr,defaultPrivilegedUser);
  3620. if (file)
  3621. {
  3622. WorkunitUpdate wu = updateWorkUnit();
  3623. wu->noteFileRead(file);
  3624. IDistributedSuperFile * super = file->querySuperFile();
  3625. if (super)
  3626. {
  3627. Owned<IDistributedFileIterator> iter = super->getSubFileIterator(true);
  3628. ForEach(*iter)
  3629. {
  3630. IDistributedFile & cur = iter->query();
  3631. const char * name = cur.queryLogicalName();
  3632. crc = rtlHash64Data(strlen(name), name, crc);
  3633. crc = crcLogicalFileTime(&cur, crc, name);
  3634. }
  3635. }
  3636. else
  3637. crc = crcLogicalFileTime(file, crc, fullname.str());
  3638. }
  3639. return crc;
  3640. }
  3641. virtual int queryLastFailCode()
  3642. {
  3643. if(!workflow)
  3644. return 0;
  3645. return workflow->queryLastFailCode();
  3646. }
  3647. virtual void getLastFailMessage(size32_t & outLen, char * &outStr, const char * tag)
  3648. {
  3649. const char * text = "";
  3650. if(workflow)
  3651. text = workflow->queryLastFailMessage();
  3652. rtlExceptionExtract(outLen, outStr, text, tag);
  3653. }
  3654. virtual void getEventName(size32_t & outLen, char * & outStr)
  3655. {
  3656. const char * text = "";
  3657. if(workflow)
  3658. text = workflow->queryEventName();
  3659. rtlExtractTag(outLen, outStr, text, NULL, "Event");
  3660. }
  3661. virtual void getEventExtra(size32_t & outLen, char * & outStr, const char * tag)
  3662. {
  3663. const char * text = "";
  3664. if(workflow)
  3665. text = workflow->queryEventExtra();
  3666. rtlExtractTag(outLen, outStr, text, tag, "Event");
  3667. }
  3668. virtual bool fileExists(const char * filename) { throwUnexpected(); }
  3669. virtual void deleteFile(const char * logicalName) { throwUnexpected(); }
  3670. virtual unsigned getNodes()
  3671. {
  3672. if (clusterNames.length())
  3673. {
  3674. if (clusterWidth == -1)
  3675. {
  3676. const char * cluster = clusterNames.tos();
  3677. Owned<IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(cluster);
  3678. if (!clusterInfo)
  3679. throw MakeStringException(-1, "Unknown cluster '%s'", cluster);
  3680. if (clusterInfo->getPlatform() == RoxieCluster)
  3681. clusterWidth = numChannels; // We assume it's the current roxie - that's ok so long as roxie's don't call other roxies.
  3682. else
  3683. clusterWidth = clusterInfo->getSize();
  3684. }
  3685. return clusterWidth;
  3686. }
  3687. else
  3688. return numChannels;
  3689. }
  3690. virtual unsigned getNodeNum() { return 0; }
  3691. virtual char *getFilePart(const char *logicalPart, bool create=false) { UNIMPLEMENTED; }
  3692. virtual unsigned __int64 getFileOffset(const char *logicalPart) { throwUnexpected(); }
  3693. virtual IDistributedFileTransaction *querySuperFileTransaction()
  3694. {
  3695. CriticalBlock b(contextCrit);
  3696. if (!superfileTransaction.get())
  3697. superfileTransaction.setown(createDistributedFileTransaction(queryUserDescriptor(), queryCodeContext()));
  3698. return superfileTransaction.get();
  3699. }
  3700. virtual void finalize(unsigned seqNo)
  3701. {
  3702. if (!protocol)
  3703. throwUnexpected();
  3704. protocol->finalize(seqNo);
  3705. }
  3706. virtual unsigned getPriority() const { return options.priority; }
  3707. virtual IConstWorkUnit *queryWorkUnit() const { return workUnit; }
  3708. virtual bool outputResultsToSocket() const { return protocol != NULL; }
  3709. virtual void selectCluster(const char * newCluster)
  3710. {
  3711. if (workUnit)
  3712. {
  3713. const char *oldCluster = workUnit->queryClusterName();
  3714. SCMStringBuffer bStr;
  3715. ClusterType targetClusterType = getClusterType(workUnit->getDebugValue("targetClusterType", bStr).str(), RoxieCluster);
  3716. if (targetClusterType==RoxieCluster)
  3717. {
  3718. if (!streq(oldCluster, newCluster))
  3719. throw MakeStringException(-1, "Error - cannot switch cluster if not targetting thor jobs");
  3720. }
  3721. clusterNames.append(oldCluster);
  3722. WorkunitUpdate wu = updateWorkUnit();
  3723. if (wu)
  3724. wu->setClusterName(newCluster);
  3725. clusterWidth = -1;
  3726. }
  3727. }
  3728. virtual void restoreCluster()
  3729. {
  3730. WorkunitUpdate wu = updateWorkUnit();
  3731. if (wu)
  3732. wu->setClusterName(clusterNames.item(clusterNames.length()-1));
  3733. clusterNames.pop();
  3734. clusterWidth = -1;
  3735. }
  3736. };
  3737. //================================================================================================
  3738. class CSoapRoxieServerContext : public CRoxieServerContext
  3739. {
  3740. private:
  3741. StringAttr queryName;
  3742. public:
  3743. CSoapRoxieServerContext(IPropertyTree *_context, IHpccProtocolResponse *_protocol, const IQueryFactory *_factory, unsigned flags, const ContextLogger &_logctx, PTreeReaderOptions xmlReadFlags, const char *_querySetName)
  3744. : CRoxieServerContext(_context, _protocol, _factory, flags, _logctx, xmlReadFlags, _querySetName)
  3745. {
  3746. queryName.set(_context->queryName());
  3747. }
  3748. virtual void process()
  3749. {
  3750. EclProcessFactory pf = (EclProcessFactory) factory->queryDll()->getEntry("createProcess");
  3751. Owned<IEclProcess> p = pf();
  3752. if (workflow)
  3753. workflow->perform(this, p);
  3754. else
  3755. p->perform(this, 0);
  3756. }
  3757. };
  3758. IRoxieServerContext *createRoxieServerContext(IPropertyTree *context, IHpccProtocolResponse *protocol, const IQueryFactory *factory, unsigned flags, const ContextLogger &_logctx, PTreeReaderOptions readFlags, const char *querySetName)
  3759. {
  3760. if (flags & HPCC_PROTOCOL_NATIVE)
  3761. return new CRoxieServerContext(context, protocol, factory, flags, _logctx, readFlags, querySetName);
  3762. return new CSoapRoxieServerContext(context, protocol, factory, flags, _logctx, readFlags, querySetName);
  3763. }
  3764. IRoxieServerContext *createOnceServerContext(const IQueryFactory *factory, const IRoxieContextLogger &_logctx)
  3765. {
  3766. return new CRoxieServerContext(factory, _logctx);
  3767. }
  3768. IRoxieServerContext *createWorkUnitServerContext(IConstWorkUnit *wu, const IQueryFactory *factory, const ContextLogger &_logctx)
  3769. {
  3770. return new CRoxieServerContext(wu, factory, _logctx);
  3771. }