ccddebug.cpp 59 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2013 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "jlib.hpp"
  15. #include "ccd.hpp"
  16. #include "ccdcontext.hpp"
  17. #include "ccddali.hpp"
  18. #include "ccdquery.hpp"
  19. #include "ccdqueue.ipp"
  20. #include "ccdsnmp.hpp"
  21. #include "ccdstate.hpp"
  22. #include "thorstrand.hpp"
  23. using roxiemem::IRowManager;
  24. //=======================================================================================================================
  25. class InputProbe : implements IEngineRowStream, implements IFinalRoxieInput, implements IRoxieProbe, public CInterface // base class for the edge probes used for tracing and debugging....
  26. {
  27. protected:
  28. IFinalRoxieInput *in;
  29. IEngineRowStream *inStream;
  30. unsigned sourceId;
  31. unsigned sourceIdx;
  32. unsigned targetId;
  33. unsigned targetIdx;
  34. unsigned iteration;
  35. unsigned channel;
  36. IOutputMetaData *inMeta;
  37. IDebuggableContext *debugContext;
  38. unsigned rowCount;
  39. unsigned totalRowCount;
  40. size32_t maxRowSize;
  41. bool everStarted;
  42. bool hasStarted;
  43. bool hasStopped;
  44. public:
  45. IMPLEMENT_IINTERFACE
  46. InputProbe(IFinalRoxieInput *_in, IDebuggableContext *_debugContext,
  47. unsigned _sourceId, unsigned _sourceIdx, unsigned _targetId, unsigned _targetIdx, unsigned _iteration, unsigned _channel)
  48. : in(_in), debugContext(_debugContext),
  49. sourceId(_sourceId), sourceIdx(_sourceIdx), targetId(_targetId), targetIdx(_targetIdx), iteration(_iteration), channel(_channel)
  50. {
  51. inStream = NULL;
  52. hasStarted = false;
  53. everStarted = false;
  54. hasStopped = false;
  55. rowCount = 0;
  56. totalRowCount = 0;
  57. maxRowSize = 0;
  58. inMeta = NULL;
  59. }
  60. virtual IInputSteppingMeta * querySteppingMeta()
  61. {
  62. return in->querySteppingMeta();
  63. }
  64. virtual bool gatherConjunctions(ISteppedConjunctionCollector & collector)
  65. {
  66. return in->gatherConjunctions(collector);
  67. }
  68. virtual IStrandJunction *getOutputStreams(IRoxieSlaveContext *ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, const StrandOptions * consumerOptions, bool consumerOrdered, IOrderedCallbackCollection * orderedCallbacks)
  69. {
  70. assertex (!idx);
  71. //Need to call
  72. //extern IEngineRowStream *connectSingleStream(IRoxieSlaveContext *ctx, IFinalRoxieInput *input, unsigned idx, Owned<IStrandJunction> &junction, bool consumerOrdered)
  73. PointerArrayOf<IEngineRowStream> instreams;
  74. Owned<IStrandJunction> junction = in->getOutputStreams(ctx, sourceIdx, instreams, NULL, consumerOrdered, nullptr);
  75. //MORE: This needs to create a junction if instreams > 1
  76. // We forced to single, so should not be getting anything but a single stream back
  77. assertex(junction==NULL);
  78. assertex(instreams.length()==1);
  79. inStream = instreams.item(0);
  80. // Return a single stream too...
  81. streams.append(this);
  82. return NULL;
  83. }
  84. virtual void resetEOF()
  85. {
  86. inStream->resetEOF();
  87. }
  88. virtual unsigned numConcreteOutputs() const
  89. {
  90. return in->numConcreteOutputs();
  91. }
  92. virtual IFinalRoxieInput * queryConcreteInput(unsigned idx)
  93. {
  94. // MORE - not sure what is right here!
  95. if (in->queryConcreteInput(idx) == in)
  96. {
  97. assertex(idx==0);
  98. return this;
  99. }
  100. else
  101. return in->queryConcreteInput(idx);
  102. }
  103. virtual IEngineRowStream *queryConcreteOutputStream(unsigned whichInput)
  104. {
  105. return this;
  106. }
  107. virtual IStrandJunction *queryConcreteOutputJunction(unsigned idx) const
  108. {
  109. return nullptr;
  110. }
  111. virtual IRoxieServerActivity *queryActivity()
  112. {
  113. return in->queryActivity();
  114. }
  115. virtual IIndexReadActivityInfo *queryIndexReadActivity()
  116. {
  117. return in->queryIndexReadActivity();
  118. }
  119. virtual IOutputMetaData * queryOutputMeta() const
  120. {
  121. return in->queryOutputMeta();
  122. }
  123. IInputBase &queryInput()
  124. {
  125. return *this;
  126. }
  127. virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
  128. {
  129. // NOTE: totalRowCount/maxRowSize not reset, as we want them cumulative when working in a child query.
  130. rowCount = 0;
  131. hasStarted = true;
  132. hasStopped = false;
  133. everStarted = true;
  134. in->start(parentExtractSize, parentExtract, paused);
  135. inMeta = in->queryOutputMeta();
  136. assertex(inMeta);
  137. }
  138. virtual void stop()
  139. {
  140. hasStopped = true;
  141. inStream->stop();
  142. }
  143. virtual void reset()
  144. {
  145. hasStarted = false;
  146. in->reset();
  147. }
  148. virtual unsigned __int64 queryTotalCycles() const
  149. {
  150. return in->queryTotalCycles();
  151. }
  152. virtual const void *nextRow()
  153. {
  154. const void *ret = inStream->nextRow();
  155. if (ret)
  156. {
  157. size32_t size = inMeta->getRecordSize(ret);
  158. if (size > maxRowSize)
  159. maxRowSize = size;
  160. rowCount++;
  161. totalRowCount++;
  162. }
  163. return ret;
  164. }
  165. virtual const void * nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
  166. {
  167. const void *ret = inStream->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
  168. if (ret && wasCompleteMatch) // GH is this test right?
  169. {
  170. size32_t size = inMeta->getRecordSize(ret);
  171. if (size > maxRowSize)
  172. maxRowSize = size;
  173. rowCount++;
  174. totalRowCount++;
  175. }
  176. return ret;
  177. }
  178. };
  179. class TraceProbe : public InputProbe
  180. {
  181. public:
  182. TraceProbe(IFinalRoxieInput *_in, unsigned _sourceId, unsigned _targetId, unsigned _sourceIdx, unsigned _targetIdx, unsigned _iteration, unsigned _channel)
  183. : InputProbe(_in, NULL, _sourceId, _sourceIdx, _targetId, _targetIdx, _iteration, _channel)
  184. {
  185. }
  186. bool matches(IPropertyTree &edge, bool forNode)
  187. {
  188. if (forNode)
  189. {
  190. unsigned id = edge.getPropInt("@id", 0);
  191. if (id && (id == sourceId || id == targetId))
  192. {
  193. return true;
  194. }
  195. }
  196. else
  197. {
  198. unsigned id = edge.getPropInt("@source", 0);
  199. if (id && id == sourceId)
  200. {
  201. id = edge.getPropInt("@target", 0);
  202. if (id && id == targetId)
  203. {
  204. unsigned idx = edge.getPropInt("att[@name=\"_sourceIndex\"]/@value", 0);
  205. if (idx == sourceIdx)
  206. return true;
  207. }
  208. }
  209. id = edge.getPropInt("att[@name=\"_sourceActivity\"]/@value");
  210. if (id && id == sourceId)
  211. {
  212. id = edge.getPropInt("att[@name=\"_targetActivity\"]/@value");
  213. if (id && id == targetId)
  214. {
  215. unsigned idx = edge.getPropInt("att[@name=\"_sourceIndex\"]/@value", 0);
  216. if (idx == sourceIdx)
  217. return true;
  218. }
  219. }
  220. }
  221. return false;
  222. }
  223. const void * _next(const void *inputRow)
  224. {
  225. const byte *ret = (const byte *) inputRow;
  226. if (ret && probeAllRows)
  227. {
  228. CommonXmlWriter xmlwrite(XWFnoindent|XWFtrim|XWFopt);
  229. if (inMeta && inMeta->hasXML())
  230. inMeta->toXML(ret, xmlwrite);
  231. DBGLOG("ROW: [%d->%d] {%p} %s", sourceId, targetId, ret, xmlwrite.str());
  232. }
  233. return ret;
  234. }
  235. virtual const void * nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
  236. {
  237. // MORE - should probably only note them when wasCompleteMatch is true?
  238. return _next(InputProbe::nextRowGE(seek, numFields, wasCompleteMatch, stepExtra));
  239. }
  240. virtual const void *nextRow()
  241. {
  242. return _next(InputProbe::nextRow());
  243. }
  244. void getNodeProgressInfo(IPropertyTree &node)
  245. {
  246. // node is the input (or possibly output) of this probe edge
  247. unsigned started = everStarted;
  248. putStatsValue(&node, "_roxieStarted", "sum", started);
  249. unsigned id = node.getPropInt("@id", 0);
  250. bool isOutput = (id != 0) && (id != sourceId);
  251. unsigned totalTime = (unsigned) (cycle_to_nanosec(in->queryTotalCycles())/1000);
  252. if (isOutput)
  253. totalTime += 10; // Fudge factor - I don't really know the times but this makes the graph more useable than not supplying a totalTime value
  254. if (totalTime)
  255. putStatsValue(&node, "totalTime", "sum", totalTime);
  256. unsigned localTime = isOutput ? 10 : (unsigned) (cycle_to_nanosec(in->queryActivity()->queryLocalCycles())/1000); // Fudge factor - I don't really know the times but this makes the graph more useable than not supplying a localTime value
  257. if (localTime)
  258. putStatsValue(&node, "localTime", "sum", localTime);
  259. }
  260. void getEdgeProgressInfo(IPropertyTree &edge)
  261. {
  262. putStatsValue(&edge, "_roxieStarted", "sum", hasStarted);
  263. if (hasStarted)
  264. {
  265. putStatsValue(&edge, "count", "sum", totalRowCount);
  266. putStatsValue(&edge, "maxrowsize", "max", maxRowSize);
  267. }
  268. }
  269. };
  270. class CProbeManager : implements IProbeManager, public CInterface
  271. {
  272. IArrayOf<IFinalRoxieInput> probes; // May want to replace with hash table at some point....
  273. public:
  274. IMPLEMENT_IINTERFACE;
  275. IRoxieProbe *createProbe(IInputBase *in, IActivityBase *inAct, IActivityBase *outAct, unsigned sourceIdx, unsigned targetIdx, unsigned iteration)
  276. {
  277. unsigned idIn = inAct->queryId();
  278. unsigned idOut = outAct->queryId();
  279. TraceProbe *probe = new TraceProbe(static_cast<IFinalRoxieInput*>(in), idIn, idOut, sourceIdx, targetIdx, iteration, 0);
  280. probes.append(*probe);
  281. return probe;
  282. }
  283. TraceProbe *findProbe(IPropertyTree &edge, bool forNode, unsigned &startat)
  284. {
  285. // MORE - this is n-squared on number of edges in the graph. Could get painful - recode if needed
  286. // However I think that the "startat" cache probably prevents the pain
  287. unsigned probeCount = probes.ordinality();
  288. unsigned search = probeCount;
  289. unsigned idx = startat;
  290. while (search--)
  291. {
  292. idx++;
  293. if (idx>=probeCount) idx = 0;
  294. TraceProbe &p = static_cast<TraceProbe &> (probes.item(idx));
  295. if (p.matches(edge, forNode))
  296. {
  297. startat = idx;
  298. return &p;
  299. }
  300. }
  301. return NULL;
  302. }
  303. virtual void noteSink(IActivityBase *)
  304. {
  305. }
  306. virtual IDebugGraphManager *queryDebugManager()
  307. {
  308. return NULL;
  309. }
  310. virtual void noteDependency(IActivityBase *sourceActivity, unsigned sourceIndex, unsigned controlId, const char *edgeId, IActivityBase *targetActivity)
  311. {
  312. }
  313. virtual IProbeManager *startChildGraph(unsigned childGraphId, IActivityBase *parent)
  314. {
  315. return LINK(this);
  316. }
  317. virtual void endChildGraph(IProbeManager *child, IActivityBase *parent)
  318. {
  319. }
  320. virtual void deleteGraph(IArrayOf<IActivityBase> *activities, IArrayOf<IInputBase> *goers)
  321. {
  322. if (goers)
  323. {
  324. ForEachItemIn(probeIdx, *goers)
  325. {
  326. TraceProbe &probe = (TraceProbe &) goers->item(probeIdx);
  327. probes.zap(probe);
  328. }
  329. }
  330. }
  331. virtual void setNodeProperty(IActivityBase *node, const char *propName, const char *propVvalue)
  332. {
  333. // MORE - we could note these in probe mode too...
  334. }
  335. virtual void setNodePropertyInt(IActivityBase *node, const char *propName, unsigned __int64 propVvalue)
  336. {
  337. // MORE - we could note these in probe mode too...
  338. }
  339. virtual void getProbeResponse(IPropertyTree *query)
  340. {
  341. Owned<IPropertyTreeIterator> graphs = query->getElements("Graph");
  342. ForEach(*graphs)
  343. {
  344. IPropertyTree &graph = graphs->query();
  345. Owned<IPropertyTreeIterator> subgraphs = graph.getElements("xgmml/graph");
  346. ForEach(*subgraphs)
  347. {
  348. IPropertyTree &subgraph = subgraphs->query();
  349. Owned<IPropertyTreeIterator> nodes = subgraph.getElements(".//node");
  350. unsigned startat = 0;
  351. ForEach(*nodes)
  352. {
  353. IPropertyTree &node = nodes->query();
  354. TraceProbe *currentProbe = findProbe(node, true, startat);
  355. if (currentProbe)
  356. {
  357. currentProbe->getNodeProgressInfo(node);
  358. }
  359. }
  360. Owned<IPropertyTreeIterator> edges = subgraph.getElements(".//edge");
  361. startat = 0;
  362. ForEach(*edges)
  363. {
  364. IPropertyTree &edge = edges->query();
  365. if (edge.getPropInt("att[@name='_dependsOn']/@value", 0) != 0)
  366. {
  367. const char *targetNode = edge.queryProp("att[@name='_targetActivity']/@value");
  368. if (targetNode)
  369. {
  370. StringBuffer xpath;
  371. IPropertyTree *target = query->queryPropTree(xpath.append(".//node[@id='").append(targetNode).append("']"));
  372. if (target)
  373. {
  374. unsigned started = target->getPropInt("att[@name='_roxieStarted']/@value", 0);
  375. IPropertyTree *att = edge.queryPropTree("att[@name=\"_roxieStarted\"]");
  376. if (!att)
  377. {
  378. att = edge.addPropTree("att", createPTree());
  379. att->setProp("@name", "_roxieStarted");
  380. }
  381. else
  382. started += att->getPropInt("@value");
  383. att->setPropInt("@value", started);
  384. }
  385. }
  386. }
  387. else
  388. {
  389. TraceProbe *currentProbe = findProbe(edge, false, startat);
  390. if (currentProbe)
  391. {
  392. currentProbe->getEdgeProgressInfo(edge);
  393. }
  394. else
  395. {
  396. const char *targetNode = edge.queryProp("att[@name='_targetActivity']/@value");
  397. if (targetNode)
  398. {
  399. StringBuffer xpath;
  400. IPropertyTree *target = query->queryPropTree(xpath.append(".//node[@id='").append(targetNode).append("']"));
  401. if (target)
  402. {
  403. unsigned started = target->getPropInt("att[@name='_roxieStarted']/@value", 0);
  404. IPropertyTree *att = edge.queryPropTree("att[@name=\"_roxieStarted\"]");
  405. if (!att)
  406. {
  407. att = edge.addPropTree("att", createPTree());
  408. att->setProp("@name", "_roxieStarted");
  409. }
  410. else
  411. started += att->getPropInt("@value");
  412. att->setPropInt("@value", started);
  413. }
  414. }
  415. }
  416. }
  417. }
  418. }
  419. }
  420. }
  421. };
  422. typedef const IInterface *CIptr;
  423. typedef MapBetween<unsigned, unsigned, CIptr, CIptr> ProxyMap;
  424. static ProxyMap *registeredProxies;
  425. static CriticalSection proxyLock;
  426. static memsize_t nextProxyId = 1;
  427. static memsize_t registerProxyId(const IInterface * object)
  428. {
  429. CriticalBlock b(proxyLock);
  430. if (!registeredProxies)
  431. registeredProxies = new ProxyMap;
  432. registeredProxies->setValue(nextProxyId, object);
  433. return nextProxyId++;
  434. }
  435. static void unregisterProxyId(memsize_t id)
  436. {
  437. // CriticalBlock b(proxyLock); done by caller
  438. if (registeredProxies)
  439. {
  440. registeredProxies->remove(id);
  441. if (!registeredProxies->count())
  442. {
  443. delete registeredProxies;
  444. registeredProxies = NULL;
  445. }
  446. }
  447. }
  448. static const IInterface *getProxy(memsize_t id)
  449. {
  450. CriticalBlock b(proxyLock);
  451. if (registeredProxies)
  452. {
  453. CIptr *proxy = registeredProxies->getValue(id);
  454. if (proxy)
  455. return LINK(*proxy);
  456. }
  457. return NULL;
  458. }
  459. //copied to eclagent, needs to be made common
  460. class DebugProbe : public InputProbe, implements IActivityDebugContext
  461. {
  462. Owned<IGlobalEdgeRecord> edgeRecord;
  463. ICopyArrayOf<IBreakpointInfo> breakpoints;
  464. HistoryRow *history;
  465. unsigned lastSequence;
  466. unsigned historySize;
  467. unsigned historyCapacity;
  468. unsigned nextHistorySlot;
  469. mutable memsize_t proxyId; // MORE - do we need a critsec to protect too?
  470. DebugActivityRecord *sourceAct;
  471. DebugActivityRecord *targetAct;
  472. StringAttr edgeId;
  473. bool forceEOF;
  474. bool EOGseen;
  475. bool EOGsent;
  476. static void putAttributeUInt(IXmlWriter *output, const char *name, unsigned value)
  477. {
  478. output->outputBeginNested("att", false);
  479. output->outputCString(name, "@name");
  480. output->outputInt(value, sizeof(int), "@value");
  481. output->outputEndNested("att");
  482. }
  483. void rowToXML(IXmlWriter *output, const void *row, unsigned sequence, unsigned rowCount, bool skipped, bool limited, bool eof, bool eog) const
  484. {
  485. output->outputBeginNested("Row", true);
  486. output->outputInt(sequence, sizeof(int), "@seq");
  487. if (skipped)
  488. output->outputBool(true, "@skip");
  489. if (limited)
  490. output->outputBool(true, "@limit");
  491. if (eof)
  492. output->outputBool(true, "@eof");
  493. if (eog)
  494. output->outputBool(true, "@eog");
  495. if (row)
  496. {
  497. output->outputInt(rowCount, sizeof(int), "@count");
  498. IOutputMetaData *meta = queryOutputMeta();
  499. output->outputInt(meta->getRecordSize(row), sizeof(int), "@size");
  500. meta->toXML((const byte *) row, *output);
  501. }
  502. output->outputEndNested("Row");
  503. }
  504. public:
  505. DebugProbe(IInputBase *_in, unsigned _sourceId, unsigned _sourceIdx, DebugActivityRecord *_sourceAct, unsigned _targetId, unsigned _targetIdx, DebugActivityRecord *_targetAct, unsigned _iteration, unsigned _channel, IDebuggableContext *_debugContext)
  506. : InputProbe(static_cast<IFinalRoxieInput*>(_in), _debugContext, _sourceId, _sourceIdx, _targetId, _targetIdx, _iteration, _channel),
  507. sourceAct(_sourceAct), targetAct(_targetAct)
  508. {
  509. historyCapacity = debugContext->getDefaultHistoryCapacity();
  510. nextHistorySlot = 0;
  511. if (historyCapacity)
  512. history = new HistoryRow [historyCapacity];
  513. else
  514. history = NULL;
  515. historySize = 0;
  516. lastSequence = 0;
  517. StringBuffer idText;
  518. idText.appendf("%d_%d", sourceId, sourceIdx);
  519. edgeRecord.setown(debugContext->getEdgeRecord(idText));
  520. if (iteration || channel)
  521. idText.appendf(".%d", iteration);
  522. if (channel)
  523. idText.appendf("#%d", channel);
  524. edgeId.set(idText);
  525. debugContext->checkDelayedBreakpoints(this);
  526. forceEOF = false;
  527. EOGseen = false;
  528. EOGsent = false;
  529. proxyId = 0;
  530. }
  531. ~DebugProbe()
  532. {
  533. if (history)
  534. {
  535. for (unsigned idx = 0; idx < historyCapacity; idx++)
  536. ReleaseRoxieRow(history[idx].row);
  537. delete [] history;
  538. }
  539. ForEachItemIn(bpIdx, breakpoints)
  540. {
  541. breakpoints.item(bpIdx).removeEdge(*this);
  542. }
  543. }
  544. virtual void Link() const
  545. {
  546. CInterface::Link();
  547. }
  548. virtual bool Release() const
  549. {
  550. CriticalBlock b(proxyLock);
  551. if (!IsShared())
  552. {
  553. if (proxyId)
  554. unregisterProxyId(proxyId);
  555. }
  556. return CInterface::Release();
  557. }
  558. virtual memsize_t queryProxyId() const
  559. {
  560. if (!proxyId)
  561. proxyId = registerProxyId((const IActivityDebugContext *) this);
  562. return proxyId;
  563. }
  564. virtual void resetEOF()
  565. {
  566. forceEOF = false;
  567. EOGseen = false;
  568. EOGsent = false;
  569. InputProbe::resetEOF();
  570. }
  571. virtual const char *queryEdgeId() const
  572. {
  573. return edgeId.get();
  574. }
  575. virtual const char *querySourceId() const
  576. {
  577. UNIMPLEMENTED;
  578. }
  579. virtual void printEdge(IXmlWriter *output, unsigned startRow, unsigned numRows) const
  580. {
  581. output->outputBeginNested("edge", true);
  582. output->outputString(edgeId.length(), edgeId.get(), "@edgeId");
  583. if (startRow < historySize)
  584. {
  585. if (numRows > historySize - startRow)
  586. numRows = historySize - startRow;
  587. while (numRows)
  588. {
  589. IHistoryRow *rowData = queryHistoryRow(startRow+numRows-1);
  590. assertex(rowData);
  591. rowToXML(output, rowData->queryRow(), rowData->querySequence(), rowData->queryRowCount(), rowData->wasSkipped(), rowData->wasLimited(), rowData->wasEof(), rowData->wasEog());
  592. numRows--;
  593. }
  594. }
  595. output->outputEndNested("edge");
  596. }
  597. virtual void searchHistories(IXmlWriter *output, IRowMatcher *matcher, bool fullRows)
  598. {
  599. IOutputMetaData *meta = queryOutputMeta();
  600. bool anyMatchedYet = false;
  601. if (matcher->canMatchAny(meta))
  602. {
  603. for (unsigned i = 0; i < historySize; i++)
  604. {
  605. IHistoryRow *rowData = queryHistoryRow(i);
  606. assertex(rowData);
  607. const void *row = rowData->queryRow();
  608. if (row)
  609. {
  610. matcher->reset();
  611. meta->toXML((const byte *) rowData->queryRow(), *matcher);
  612. if (matcher->matched())
  613. {
  614. if (!anyMatchedYet)
  615. {
  616. output->outputBeginNested("edge", true);
  617. output->outputString(edgeId.length(), edgeId.get(), "@edgeId");
  618. anyMatchedYet = true;
  619. }
  620. if (fullRows)
  621. rowToXML(output, rowData->queryRow(), rowData->querySequence(), rowData->queryRowCount(), rowData->wasSkipped(), rowData->wasLimited(), rowData->wasEof(), rowData->wasEog());
  622. else
  623. {
  624. output->outputBeginNested("Row", true);
  625. output->outputInt(rowData->querySequence(), sizeof(int), "@sequence");
  626. output->outputInt(rowData->queryRowCount(), sizeof(int), "@count");
  627. output->outputEndNested("Row");
  628. }
  629. }
  630. }
  631. }
  632. if (anyMatchedYet)
  633. output->outputEndNested("edge");
  634. }
  635. }
  636. virtual void getXGMML(IXmlWriter *output) const
  637. {
  638. output->outputBeginNested("edge", false);
  639. sourceAct->outputId(output, "@source");
  640. targetAct->outputId(output, "@target");
  641. output->outputString(edgeId.length(), edgeId.get(), "@id");
  642. if (sourceIdx)
  643. putAttributeUInt(output, "_sourceIndex", sourceIdx);
  644. putAttributeUInt(output, "count", rowCount); //changed from totalRowCount
  645. putAttributeUInt(output, "maxRowSize", maxRowSize);
  646. putAttributeUInt(output, "_roxieStarted", everStarted);
  647. putAttributeUInt(output, "_started", hasStarted);
  648. putAttributeUInt(output, "_stopped", hasStopped);
  649. putAttributeUInt(output, "_eofSeen", forceEOF);
  650. if (breakpoints.ordinality())
  651. putAttributeUInt(output, "_breakpoints", breakpoints.ordinality());
  652. output->outputEndNested("edge");
  653. }
  654. virtual IOutputMetaData *queryOutputMeta() const
  655. {
  656. return InputProbe::queryOutputMeta();
  657. }
  658. virtual IActivityDebugContext *queryInputActivity() const
  659. {
  660. IFinalRoxieInput *x = in;
  661. while (x && QUERYINTERFACE(x->queryConcreteInput(0), IActivityDebugContext)==NULL)
  662. x = x->queryConcreteInput(0)->queryActivity()->queryInput(0);
  663. return x ? QUERYINTERFACE(x->queryConcreteInput(0), IActivityDebugContext) : NULL;
  664. }
  665. // NOTE - these functions are threadsafe because only called when query locked by debugger.
  666. // Even though this thread may not yet be blocked on the debugger's critsec, because all manipulation (including setting history rows) is from
  667. // within debugger it is ok.
  668. virtual unsigned queryHistorySize() const
  669. {
  670. return historySize;
  671. }
  672. virtual IHistoryRow *queryHistoryRow(unsigned idx) const
  673. {
  674. assertex(idx < historySize);
  675. int slotNo = nextHistorySlot - idx - 1;
  676. if (slotNo < 0)
  677. slotNo += historyCapacity;
  678. return &history[slotNo];
  679. }
  680. virtual unsigned queryHistoryCapacity() const
  681. {
  682. return historyCapacity;
  683. }
  684. virtual unsigned queryLastSequence() const
  685. {
  686. return lastSequence;
  687. }
  688. virtual IBreakpointInfo *debuggerCallback(unsigned sequence, const void *row)
  689. {
  690. // First put the row into the history buffer...
  691. lastSequence = sequence;
  692. if (historyCapacity)
  693. {
  694. ReleaseClearRoxieRow(history[nextHistorySlot].row);
  695. if (row) LinkRoxieRow(row);
  696. history[nextHistorySlot].sequence = sequence; // MORE - timing might be interesting too, but would need to exclude debug wait time somehow...
  697. history[nextHistorySlot].row = row;
  698. history[nextHistorySlot].rowCount = rowCount;
  699. if (!row)
  700. {
  701. if (forceEOF)
  702. history[nextHistorySlot].setEof();
  703. else
  704. history[nextHistorySlot].setEog();
  705. }
  706. if (historySize < historyCapacity)
  707. historySize++;
  708. nextHistorySlot++;
  709. if (nextHistorySlot==historyCapacity)
  710. nextHistorySlot = 0;
  711. }
  712. // Now check breakpoints...
  713. ForEachItemIn(idx, breakpoints)
  714. {
  715. IBreakpointInfo &bp = breakpoints.item(idx);
  716. if (bp.matches(row, forceEOF, rowCount, queryOutputMeta())) // should optimize to only call queryOutputMeta once - but not that common to have multiple breakpoints
  717. return &bp;
  718. }
  719. return NULL;
  720. }
  721. virtual void setHistoryCapacity(unsigned newCapacity)
  722. {
  723. if (newCapacity != historyCapacity)
  724. {
  725. HistoryRow *newHistory;
  726. if (newCapacity)
  727. {
  728. unsigned copyCount = historySize;
  729. if (copyCount > newCapacity)
  730. copyCount = newCapacity;
  731. newHistory = new HistoryRow [newCapacity];
  732. unsigned slot = 0;
  733. while (copyCount--)
  734. {
  735. IHistoryRow *oldrow = queryHistoryRow(copyCount);
  736. newHistory[slot].sequence = oldrow->querySequence();
  737. newHistory[slot].row = oldrow->queryRow();
  738. newHistory[slot].rowCount = oldrow->queryRowCount();
  739. if (newHistory[slot].row)
  740. LinkRoxieRow(newHistory[slot].row);
  741. slot++;
  742. }
  743. historySize = slot;
  744. nextHistorySlot = slot;
  745. if (nextHistorySlot==historyCapacity)
  746. nextHistorySlot = 0;
  747. }
  748. else
  749. {
  750. newHistory = NULL;
  751. historySize = 0;
  752. nextHistorySlot = 0;
  753. }
  754. for (unsigned idx = 0; idx < historyCapacity; idx++)
  755. ReleaseRoxieRow(history[idx].row);
  756. delete [] history;
  757. history = newHistory;
  758. historyCapacity = newCapacity;
  759. }
  760. }
  761. virtual void clearHistory()
  762. {
  763. for (unsigned idx = 0; idx < historyCapacity; idx++)
  764. ReleaseClearRoxieRow(history[idx].row);
  765. historySize = 0;
  766. nextHistorySlot = 0;
  767. }
  768. virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
  769. {
  770. forceEOF = false;
  771. EOGseen = false;
  772. EOGsent = false;
  773. if (!hasStarted)
  774. {
  775. lastSequence = debugContext->querySequence();
  776. edgeRecord->incrementCount(0, lastSequence);
  777. }
  778. InputProbe::start(parentExtractSize, parentExtract, paused);
  779. }
  780. virtual void reset()
  781. {
  782. InputProbe::reset();
  783. sourceAct->updateTimes(debugContext->querySequence());
  784. targetAct->updateTimes(debugContext->querySequence());
  785. }
  786. virtual void stop()
  787. {
  788. InputProbe::stop();
  789. sourceAct->updateTimes(debugContext->querySequence());
  790. targetAct->updateTimes(debugContext->querySequence());
  791. }
  792. virtual const void *nextRow()
  793. {
  794. // Code is a little complex to avoid interpreting a skip on all rows in a group as EOF
  795. try
  796. {
  797. if (forceEOF)
  798. return NULL;
  799. loop
  800. {
  801. const void *ret = InputProbe::nextRow();
  802. if (!ret)
  803. {
  804. if (EOGseen)
  805. forceEOF = true;
  806. else
  807. EOGseen = true;
  808. }
  809. else
  810. EOGseen = false;
  811. if (ret)
  812. edgeRecord->incrementCount(1, debugContext->querySequence());
  813. BreakpointActionMode action = debugContext->checkBreakpoint(DebugStateEdge, this, ret);
  814. if (action == BreakpointActionSkip && !forceEOF)
  815. {
  816. if (historyCapacity)
  817. queryHistoryRow(0)->setSkipped();
  818. if (ret)
  819. {
  820. edgeRecord->incrementCount(-1, debugContext->querySequence());
  821. ReleaseClearRoxieRow(ret);
  822. rowCount--;
  823. }
  824. continue;
  825. }
  826. else if (action == BreakpointActionLimit)
  827. {
  828. // This return value implies that we should not return the current row NOR should we return any more...
  829. forceEOF = true;
  830. if (ret)
  831. edgeRecord->incrementCount(-1, debugContext->querySequence());
  832. ReleaseClearRoxieRow(ret);
  833. if (historyCapacity)
  834. queryHistoryRow(0)->setLimited();
  835. rowCount--;
  836. }
  837. if (forceEOF || ret || !EOGsent)
  838. {
  839. EOGsent = (ret == NULL);
  840. sourceAct->updateTimes(debugContext->querySequence());
  841. targetAct->updateTimes(debugContext->querySequence());
  842. return ret;
  843. }
  844. }
  845. }
  846. catch (IException *E)
  847. {
  848. debugContext->checkBreakpoint(DebugStateException, this, E);
  849. throw;
  850. }
  851. }
  852. virtual const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
  853. {
  854. // MORE - not sure that skip is safe here? Should the incomplete matches even be returned?
  855. // Code is a little complex to avoid interpreting a skip on all rows in a group as EOF
  856. // MORE - should probably only note them when wasCompleteMatch is true?
  857. try
  858. {
  859. if (forceEOF)
  860. return NULL;
  861. loop
  862. {
  863. const void *ret = InputProbe::nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
  864. if (!ret)
  865. {
  866. if (EOGseen)
  867. forceEOF = true;
  868. else
  869. EOGseen = true;
  870. }
  871. else
  872. EOGseen = false;
  873. if (ret)
  874. edgeRecord->incrementCount(1, debugContext->querySequence());
  875. BreakpointActionMode action = debugContext->checkBreakpoint(DebugStateEdge, this, ret);
  876. if (action == BreakpointActionSkip && !forceEOF)
  877. {
  878. if (ret)
  879. edgeRecord->incrementCount(-1, debugContext->querySequence());
  880. ReleaseClearRoxieRow(ret);
  881. if (historyCapacity)
  882. queryHistoryRow(0)->setSkipped();
  883. rowCount--;
  884. continue;
  885. }
  886. else if (action == BreakpointActionLimit)
  887. {
  888. // This return value implies that we should not return the current row NOR should we return any more...
  889. forceEOF = true;
  890. if (ret)
  891. edgeRecord->incrementCount(-1, debugContext->querySequence());
  892. ReleaseClearRoxieRow(ret);
  893. if (historyCapacity)
  894. queryHistoryRow(0)->setLimited();
  895. rowCount--;
  896. }
  897. if (forceEOF || ret || !EOGsent)
  898. {
  899. EOGsent = (ret == NULL);
  900. sourceAct->updateTimes(debugContext->querySequence());
  901. targetAct->updateTimes(debugContext->querySequence());
  902. return ret;
  903. }
  904. }
  905. }
  906. catch (IException *E)
  907. {
  908. debugContext->checkBreakpoint(DebugStateException, this, E);
  909. throw;
  910. }
  911. }
  912. virtual void setBreakpoint(IBreakpointInfo &bp)
  913. {
  914. if (bp.canMatchAny(queryOutputMeta()))
  915. {
  916. breakpoints.append(bp);
  917. bp.noteEdge(*this);
  918. }
  919. }
  920. virtual void removeBreakpoint(IBreakpointInfo &bp)
  921. {
  922. breakpoints.zap(bp);
  923. bp.removeEdge(*this);
  924. }
  925. };
  926. extern IProbeManager *createProbeManager()
  927. {
  928. return new CProbeManager;
  929. }
  930. IDebugGraphManager *createProxyDebugGraphManager(unsigned graphId, unsigned channel, memsize_t remoteGraphId);
  931. class CRoxieDebugGraphManager : extends CBaseDebugGraphManager
  932. {
  933. unsigned subId;
  934. public:
  935. CRoxieDebugGraphManager(IDebuggableContext *_debugContext, unsigned _id, const char *_graphName, unsigned _subId)
  936. : CBaseDebugGraphManager(_debugContext, _id, _graphName), subId(_subId)
  937. {
  938. }
  939. bool Release() const
  940. {
  941. CriticalBlock b(proxyLock);
  942. if (!IsShared())
  943. {
  944. if (!id)
  945. debugContext->releaseManager(const_cast<CRoxieDebugGraphManager*> (this));
  946. if (proxyId)
  947. unregisterProxyId(proxyId);
  948. }
  949. return CInterface::Release();
  950. }
  951. virtual IRoxieProbe *createProbe(IInputBase *in, IActivityBase *sourceAct, IActivityBase *targetAct, unsigned sourceIdx, unsigned targetIdx, unsigned iteration)
  952. {
  953. CriticalBlock b(crit);
  954. if (!iteration)
  955. iteration = subId;
  956. unsigned channel = debugContext->queryChannel();
  957. unsigned sourceId = sourceAct->queryId();
  958. unsigned targetId = targetAct->queryId();
  959. DebugActivityRecord *sourceActRecord = noteActivity(sourceAct, iteration, channel, debugContext->querySequence());
  960. DebugActivityRecord *targetActRecord = noteActivity(targetAct, iteration, channel, debugContext->querySequence());
  961. DebugProbe *probe = new DebugProbe(in, sourceId, sourceIdx, sourceActRecord, targetId, targetIdx, targetActRecord, iteration, channel, debugContext);
  962. #ifdef _DEBUG
  963. DBGLOG("Creating probe for edge id %s in graphManager %p", probe->queryEdgeId(), this);
  964. #endif
  965. assertex(!allProbes.getValue(probe->queryEdgeId()));
  966. allProbes.setValue(probe->queryEdgeId(), (IActivityDebugContext *) probe);
  967. probe->Release(); // the allProbes map will have linked, and is enough to ensure lifespan...
  968. return probe;
  969. }
  970. virtual memsize_t queryProxyId() const
  971. {
  972. if (!proxyId)
  973. proxyId = registerProxyId((const IDebugGraphManager *) this);
  974. return proxyId;
  975. }
  976. virtual void deserializeProxyGraphs(DebugState state, MemoryBuffer &buff, IActivityBase *parentActivity, unsigned channel)
  977. {
  978. Linked<DebugActivityRecord> parentNode = allActivities.getValue(parentActivity);
  979. assertex(parentNode != NULL);
  980. unsigned numChildren;
  981. buff.read(numChildren);
  982. while (numChildren--)
  983. {
  984. unsigned remoteId;
  985. memsize_t proxyId;
  986. buff.read(remoteId);
  987. __uint64 tmp;
  988. buff.read(tmp);
  989. proxyId = (memsize_t)tmp; // can't serialize memsize_t
  990. bool found = false;
  991. ForEachItemIn(idx, parentNode->childGraphs)
  992. {
  993. IDebugGraphManager &child = parentNode->childGraphs.item(idx);
  994. if (child.queryProxyId() == proxyId)
  995. {
  996. found = true;
  997. if (state == DebugStateGraphFinished)
  998. {
  999. parentNode->childGraphs.remove(idx);
  1000. debugContext->noteGraphChanged();
  1001. }
  1002. break;
  1003. }
  1004. }
  1005. if (!found && state != DebugStateGraphFinished)
  1006. {
  1007. IDebugGraphManager *proxy = createProxyDebugGraphManager(remoteId, channel, proxyId);
  1008. childGraphs.append(*LINK(proxy));
  1009. parentNode->childGraphs.append(*proxy);
  1010. debugContext->noteGraphChanged();
  1011. }
  1012. }
  1013. }
  1014. virtual IProbeManager *startChildGraph(unsigned childGraphId, IActivityBase *parent)
  1015. {
  1016. CriticalBlock b(crit);
  1017. if (childGraphId || parent)
  1018. {
  1019. CRoxieDebugGraphManager *childManager = new CRoxieDebugGraphManager(debugContext, childGraphId, NULL, parent ? parent->queryId() : 0);
  1020. IDebugGraphManager *graph = childManager;
  1021. childGraphs.append(*LINK(graph));
  1022. debugContext->noteGraphChanged();
  1023. return childManager;
  1024. }
  1025. else
  1026. return LINK(this);
  1027. }
  1028. virtual void deleteGraph(IArrayOf<IActivityBase> *activities, IArrayOf<IInputBase> *probes)
  1029. {
  1030. CriticalBlock b(crit);
  1031. if (activities)
  1032. {
  1033. ForEachItemIn(idx, *activities)
  1034. {
  1035. IActivityBase &activity = activities->item(idx);
  1036. if (activity.isSink())
  1037. sinks.zap(activity);
  1038. Linked<DebugActivityRecord> node = allActivities.getValue(&activity);
  1039. if (node)
  1040. allActivities.remove(&activity);
  1041. }
  1042. }
  1043. if (probes)
  1044. {
  1045. IArrayOf<IFinalRoxieInput>* fprobes = (IArrayOf<IFinalRoxieInput>*)(probes);
  1046. ForEachItemIn(probeIdx, *fprobes)
  1047. {
  1048. DebugProbe &probe = (DebugProbe &) fprobes->item(probeIdx);
  1049. #ifdef _DEBUG
  1050. DBGLOG("removing probe for edge id %s in graphManager %p", probe.queryEdgeId(), this);
  1051. #endif
  1052. allProbes.remove(probe.queryEdgeId());
  1053. }
  1054. }
  1055. debugContext->noteGraphChanged();
  1056. }
  1057. };
  1058. extern IProbeManager *createDebugManager(IDebuggableContext *debugContext, const char *graphName)
  1059. {
  1060. return new CRoxieDebugGraphManager(debugContext, 0, graphName, 0);
  1061. }
  // Identifies which debug request a serialized packet carries. DebugRequestBase::serialize()
  // writes the value as a single byte and doDebugRequest() switches on it, so the numeric
  // values below (identical to the implicit numbering) should not be changed.
  enum DebugRequestType
  {
      DEBUGREQUEST_OUTPUTCHILDGRAPH         = 0,
      DEBUGREQUEST_OUTPUTLINKSFORCHILDGRAPH = 1,
      DEBUGREQUEST_LOOKUPACTIVITYBYEDGEID   = 2,
      DEBUGREQUEST_PRINTEDGE                = 3,
      DEBUGREQUEST_SETBREAKPOINT            = 4,
      DEBUGREQUEST_SEARCHHISTORIES          = 5,
      DEBUGREQUEST_GETRESETGLOBALCOUNTS     = 6
  };
  1072. struct DebugRequestBase : public CInterface, implements IInterface
  1073. {
  1074. protected:
  1075. DebugRequestType function;
  1076. memsize_t proxyId; // MORE - at some point should really make this an int instead - but need to look into whether ever used to represent a pointer
  1077. public:
  1078. IMPLEMENT_IINTERFACE;
  1079. DebugRequestBase(DebugRequestType _function, memsize_t _proxyId) : function(_function), proxyId(_proxyId) {}
  1080. DebugRequestBase(MemoryBuffer &serialized)
  1081. {
  1082. byte fval;
  1083. serialized.read(fval); function = (DebugRequestType) fval;
  1084. unsigned __int64 tmp;
  1085. serialized.read(tmp); // can't serilalize memsize_t
  1086. proxyId = (memsize_t) tmp;
  1087. }
  1088. virtual void serialize(MemoryBuffer &buf)
  1089. {
  1090. buf.append((byte) function);
  1091. buf.append((unsigned __int64) proxyId); // can't serialize memsize_t
  1092. }
  1093. virtual void executeRequest(IXmlWriter *output) = 0;
  1094. inline IDebugGraphManager *getManager()
  1095. {
  1096. return (IDebugGraphManager *) getProxy(proxyId);
  1097. }
  1098. inline IActivityDebugContext *getActivity()
  1099. {
  1100. return (IActivityDebugContext *) getProxy(proxyId);
  1101. }
  1102. void inactive(IXmlWriter *output)
  1103. {
  1104. // MORE - what should I do here?
  1105. }
  1106. };
  1107. struct DebugRequestOutputChildGraph : public DebugRequestBase
  1108. {
  1109. private:
  1110. unsigned sequence;
  1111. public:
  1112. DebugRequestOutputChildGraph(memsize_t _proxyId, unsigned _sequence) : DebugRequestBase(DEBUGREQUEST_OUTPUTCHILDGRAPH, _proxyId), sequence(_sequence)
  1113. {
  1114. }
  1115. DebugRequestOutputChildGraph(MemoryBuffer &serialized) : DebugRequestBase(serialized)
  1116. {
  1117. serialized.read(sequence);
  1118. }
  1119. virtual void serialize(MemoryBuffer &buf)
  1120. {
  1121. DebugRequestBase::serialize(buf);
  1122. buf.append(sequence);
  1123. }
  1124. virtual void executeRequest(IXmlWriter *output)
  1125. {
  1126. Owned<IDebugGraphManager> manager = getManager();
  1127. if (manager)
  1128. manager->outputChildGraph(output, sequence);
  1129. else
  1130. inactive(output);
  1131. }
  1132. };
  1133. struct DebugRequestWithId : public DebugRequestBase
  1134. {
  1135. StringAttr id;
  1136. public:
  1137. DebugRequestWithId(DebugRequestType _function, memsize_t _proxyId, const char *_id) : DebugRequestBase(_function, _proxyId), id(_id)
  1138. {
  1139. }
  1140. DebugRequestWithId(MemoryBuffer &serialized) : DebugRequestBase(serialized)
  1141. {
  1142. serialized.read(id);
  1143. }
  1144. virtual void serialize(MemoryBuffer &buf)
  1145. {
  1146. DebugRequestBase::serialize(buf);
  1147. buf.append(id);
  1148. }
  1149. };
  1150. struct DebugRequestOutputLinksForChildGraph : public DebugRequestWithId
  1151. {
  1152. public:
  1153. DebugRequestOutputLinksForChildGraph(memsize_t _proxyId, const char *_id) : DebugRequestWithId(DEBUGREQUEST_OUTPUTLINKSFORCHILDGRAPH, _proxyId, _id) {}
  1154. DebugRequestOutputLinksForChildGraph(MemoryBuffer &serialized) : DebugRequestWithId(serialized)
  1155. {
  1156. }
  1157. virtual void executeRequest(IXmlWriter *output)
  1158. {
  1159. Owned<IDebugGraphManager> manager = getManager();
  1160. if (manager)
  1161. manager->outputLinksForChildGraph(output, id);
  1162. else
  1163. inactive(output);
  1164. }
  1165. };
  1166. struct DebugRequestLookupActivityByEdgeId : public DebugRequestWithId
  1167. {
  1168. public:
  1169. DebugRequestLookupActivityByEdgeId(memsize_t _proxyId, const char *_id) : DebugRequestWithId(DEBUGREQUEST_LOOKUPACTIVITYBYEDGEID, _proxyId, _id) {}
  1170. DebugRequestLookupActivityByEdgeId(MemoryBuffer &serialized) : DebugRequestWithId(serialized)
  1171. {
  1172. }
  1173. virtual void executeRequest(IXmlWriter *output)
  1174. {
  1175. Owned<IDebugGraphManager> manager = getManager();
  1176. if (manager)
  1177. {
  1178. output->outputBeginNested("Result", true);
  1179. IActivityDebugContext *edge = manager->lookupActivityByEdgeId(id);
  1180. if (edge)
  1181. output->outputInt(edge->queryProxyId(), sizeof(int), "@proxyId");
  1182. output->outputEndNested("Result");
  1183. }
  1184. else
  1185. inactive(output);
  1186. }
  1187. };
  1188. struct DebugRequestPrintEdge : public DebugRequestBase
  1189. {
  1190. private:
  1191. unsigned startRow;
  1192. unsigned numRows;
  1193. public:
  1194. DebugRequestPrintEdge(memsize_t _proxyId, unsigned _startRow, unsigned _numRows)
  1195. : DebugRequestBase(DEBUGREQUEST_PRINTEDGE, _proxyId), startRow(_startRow), numRows(_numRows)
  1196. {
  1197. }
  1198. DebugRequestPrintEdge(MemoryBuffer &serialized) : DebugRequestBase(serialized)
  1199. {
  1200. serialized.read(startRow);
  1201. serialized.read(numRows);
  1202. }
  1203. virtual void serialize(MemoryBuffer &buf)
  1204. {
  1205. DebugRequestBase::serialize(buf);
  1206. buf.append(startRow);
  1207. buf.append(numRows);
  1208. }
  1209. virtual void executeRequest(IXmlWriter *output)
  1210. {
  1211. Owned<IActivityDebugContext> activity = getActivity();
  1212. if (activity)
  1213. activity->printEdge(output, startRow, numRows);
  1214. else
  1215. inactive(output);
  1216. }
  1217. };
  1218. struct DebugRequestSetRemoveBreakpoint : public DebugRequestBase
  1219. {
  1220. private:
  1221. Linked<IBreakpointInfo> bp;
  1222. bool isRemove;
  1223. public:
  1224. inline DebugRequestSetRemoveBreakpoint(memsize_t _proxyId, IBreakpointInfo &_bp, bool _isRemove)
  1225. : DebugRequestBase(DEBUGREQUEST_SETBREAKPOINT, _proxyId), bp(&_bp), isRemove(_isRemove)
  1226. {
  1227. }
  1228. DebugRequestSetRemoveBreakpoint(MemoryBuffer &serialized) : DebugRequestBase(serialized)
  1229. {
  1230. bp.setown(new CBreakpointInfo(serialized));
  1231. serialized.read(isRemove);
  1232. }
  1233. virtual void serialize(MemoryBuffer &buf)
  1234. {
  1235. DebugRequestBase::serialize(buf);
  1236. bp->serialize(buf);
  1237. buf.append(isRemove);
  1238. }
  1239. virtual void executeRequest(IXmlWriter *output)
  1240. {
  1241. Owned<IDebugGraphManager> manager = getManager();
  1242. if (manager)
  1243. {
  1244. if (isRemove)
  1245. manager->queryContext()->removeBreakpoint(*bp);
  1246. else
  1247. manager->queryContext()->addBreakpoint(*bp.getLink());
  1248. }
  1249. else
  1250. inactive(output);
  1251. }
  1252. };
  1253. struct DebugRequestSearchHistories : public DebugRequestBase
  1254. {
  1255. private:
  1256. Linked<IRowMatcher> matcher;
  1257. bool fullRows;
  1258. public:
  1259. inline DebugRequestSearchHistories(memsize_t _proxyId, IRowMatcher *_matcher, bool _fullRows)
  1260. : DebugRequestBase(DEBUGREQUEST_SEARCHHISTORIES, _proxyId), matcher(_matcher), fullRows(_fullRows)
  1261. {
  1262. }
  1263. DebugRequestSearchHistories(MemoryBuffer &serialized) : DebugRequestBase(serialized)
  1264. {
  1265. matcher.setown(createRowMatcher(serialized));
  1266. serialized.read(fullRows);
  1267. }
  1268. virtual void serialize(MemoryBuffer &buf)
  1269. {
  1270. DebugRequestBase::serialize(buf);
  1271. matcher->serialize(buf);
  1272. buf.append(fullRows);
  1273. }
  1274. virtual void executeRequest(IXmlWriter *output)
  1275. {
  1276. Owned<IDebugGraphManager> manager = getManager();
  1277. if (manager)
  1278. manager->searchHistories(output, matcher, fullRows);
  1279. else
  1280. inactive(output);
  1281. }
  1282. };
  1283. class DebugRequestGetResetGlobalCounts : public DebugRequestBase
  1284. {
  1285. public:
  1286. inline DebugRequestGetResetGlobalCounts(memsize_t _proxyId)
  1287. : DebugRequestBase(DEBUGREQUEST_GETRESETGLOBALCOUNTS, _proxyId)
  1288. {
  1289. }
  1290. DebugRequestGetResetGlobalCounts(MemoryBuffer &serialized) : DebugRequestBase(serialized)
  1291. {
  1292. }
  1293. virtual void executeRequest(IXmlWriter *output)
  1294. {
  1295. Owned<IDebugGraphManager> manager = getManager();
  1296. if (manager)
  1297. manager->queryContext()->debugCounts(output, 0, true);
  1298. else
  1299. inactive(output);
  1300. }
  1301. };
  1302. extern void doDebugRequest(IRoxieQueryPacket *packet, const IRoxieContextLogger &logctx)
  1303. {
  1304. RoxiePacketHeader newHeader(packet->queryHeader(), ROXIE_DEBUGREQUEST);
  1305. Owned<IMessagePacker> output = ROQ->createOutputStream(newHeader, true, logctx);
  1306. unsigned contextLength = packet->getContextLength();
  1307. Owned<DebugRequestBase> request;
  1308. MemoryBuffer serialized;
  1309. serialized.setBuffer(contextLength, (void*) packet->queryContextData(), false);
  1310. byte fval;
  1311. serialized.read(fval);
  1312. serialized.reset();
  1313. CommonXmlWriter xml(0);
  1314. switch ((DebugRequestType) fval)
  1315. {
  1316. case DEBUGREQUEST_OUTPUTCHILDGRAPH:
  1317. request.setown(new DebugRequestOutputChildGraph(serialized));
  1318. break;
  1319. case DEBUGREQUEST_OUTPUTLINKSFORCHILDGRAPH:
  1320. request.setown(new DebugRequestOutputLinksForChildGraph(serialized));
  1321. break;
  1322. case DEBUGREQUEST_LOOKUPACTIVITYBYEDGEID:
  1323. request.setown(new DebugRequestLookupActivityByEdgeId(serialized));
  1324. break;
  1325. case DEBUGREQUEST_PRINTEDGE:
  1326. request.setown(new DebugRequestPrintEdge(serialized));
  1327. break;
  1328. case DEBUGREQUEST_SETBREAKPOINT:
  1329. request.setown(new DebugRequestSetRemoveBreakpoint(serialized));
  1330. break;
  1331. case DEBUGREQUEST_SEARCHHISTORIES:
  1332. request.setown(new DebugRequestSearchHistories(serialized));
  1333. break;
  1334. case DEBUGREQUEST_GETRESETGLOBALCOUNTS:
  1335. request.setown(new DebugRequestGetResetGlobalCounts(serialized));
  1336. break;
  1337. default: throwUnexpected();
  1338. }
  1339. request->executeRequest(&xml);
  1340. void *ret = output->getBuffer(xml.length()+1, true);
  1341. memcpy(ret, xml.str(), xml.length()+1);
  1342. output->putBuffer(ret, xml.length()+1, true);
  1343. output->flush(true);
  1344. }
  1345. class CProxyDebugContext : public CInterface
  1346. {
  1347. protected:
  1348. memsize_t proxyId;
  1349. unsigned channel;
  1350. Owned<StringContextLogger> logctx;
  1351. void sendProxyRequest(IXmlWriter *output, DebugRequestBase &request) const
  1352. {
  1353. RemoteActivityId id(ROXIE_DEBUGREQUEST, 0);
  1354. ruid_t ruid = getNextRuid();
  1355. RoxiePacketHeader header(id, ruid, channel, 0);
  1356. MemoryBuffer b;
  1357. b.append(sizeof(header), &header);
  1358. b.append ((char) LOGGING_FLAGSPRESENT);
  1359. b.append("PROXY"); // MORE - a better log prefix might be good...
  1360. request.serialize(b);
  1361. Owned<IRowManager> rowManager = roxiemem::createRowManager(1, NULL, *logctx, NULL);
  1362. Owned<IMessageCollator> mc = ROQ->queryReceiveManager()->createMessageCollator(rowManager, ruid);
  1363. Owned<IRoxieQueryPacket> packet = createRoxiePacket(b);
  1364. ROQ->sendPacket(packet, *logctx);
  1365. for (unsigned retries = 1; retries <= MAX_DEBUGREQUEST_RETRIES; retries++)
  1366. {
  1367. bool anyActivity = false;
  1368. Owned<IMessageResult> mr = mc->getNextResult(DEBUGREQUEST_TIMEOUT, anyActivity);
  1369. if (mr)
  1370. {
  1371. unsigned roxieHeaderLen;
  1372. const RoxiePacketHeader *header = (const RoxiePacketHeader *) mr->getMessageHeader(roxieHeaderLen);
  1373. Owned<IMessageUnpackCursor> mu = mr->getCursor(rowManager);
  1374. if (header->activityId == ROXIE_EXCEPTION)
  1375. throwRemoteException(mu);
  1376. assertex(header->activityId == ROXIE_DEBUGREQUEST);
  1377. RecordLengthType *rowlen = (RecordLengthType *) mu->getNext(sizeof(RecordLengthType));
  1378. assertex(rowlen);
  1379. RecordLengthType len = *rowlen;
  1380. ReleaseRoxieRow(rowlen);
  1381. const char * reply = (const char *) mu->getNext(len);
  1382. if (output)
  1383. {
  1384. output->outputString(0, NULL, NULL);
  1385. output->outputQuoted(reply);
  1386. }
  1387. ReleaseRoxieRow(reply);
  1388. ROQ->queryReceiveManager()->detachCollator(mc);
  1389. mc.clear();
  1390. return;
  1391. }
  1392. else if (!anyActivity)
  1393. {
  1394. DBGLOG("Retrying debug request");
  1395. ROQ->sendPacket(packet, *logctx);
  1396. }
  1397. }
  1398. ROQ->queryReceiveManager()->detachCollator(mc);
  1399. mc.clear();
  1400. throwUnexpected(); // MORE - better error
  1401. }
  1402. public:
  1403. CProxyDebugContext(unsigned _channel, memsize_t _proxyId) : channel(_channel), proxyId(_proxyId)
  1404. {
  1405. logctx.setown(new StringContextLogger("CProxyDebugContext"));
  1406. }
  1407. };
  1408. class CProxyActivityDebugContext : public CProxyDebugContext, implements IActivityDebugContext
  1409. {
  1410. StringAttr edgeId;
  1411. public:
  1412. IMPLEMENT_IINTERFACE;
  1413. CProxyActivityDebugContext(unsigned _channel, memsize_t _proxyId, const char *_edgeId)
  1414. : CProxyDebugContext(_channel, _proxyId), edgeId(_edgeId)
  1415. {
  1416. }
  1417. virtual unsigned queryLastSequence() const { UNIMPLEMENTED; };
  1418. virtual IActivityDebugContext *queryInputActivity() const { UNIMPLEMENTED; };
  1419. virtual void getXGMML(IXmlWriter *output) const { UNIMPLEMENTED; };
  1420. virtual void searchHistories(IXmlWriter *output, IRowMatcher *matcher, bool fullRows) { UNIMPLEMENTED; }
  1421. virtual unsigned queryHistorySize() const { UNIMPLEMENTED; };
  1422. virtual IHistoryRow *queryHistoryRow(unsigned idx) const { UNIMPLEMENTED; };
  1423. virtual unsigned queryHistoryCapacity() const { UNIMPLEMENTED; };
  1424. virtual IBreakpointInfo *debuggerCallback(unsigned sequence, const void *row)
  1425. {
  1426. // was done on slave, don't do here too
  1427. return NULL;
  1428. };
  1429. virtual void setHistoryCapacity(unsigned newCapacity) { UNIMPLEMENTED; };
  1430. virtual void clearHistory() { UNIMPLEMENTED; };
  1431. virtual void printEdge(IXmlWriter *output, unsigned startRow, unsigned numRows) const
  1432. {
  1433. DebugRequestPrintEdge request(proxyId, startRow, numRows);
  1434. sendProxyRequest(output, request);
  1435. };
  1436. virtual void setBreakpoint(IBreakpointInfo &bp) { throwUnexpected(); }
  1437. virtual void removeBreakpoint(IBreakpointInfo &bp) { throwUnexpected(); };
  1438. virtual const char *queryEdgeId() const
  1439. {
  1440. return edgeId;
  1441. };
  1442. virtual const char *querySourceId() const { UNIMPLEMENTED; };
  1443. virtual unsigned queryChildGraphId() const { UNIMPLEMENTED; };
  1444. virtual memsize_t queryProxyId() const { UNIMPLEMENTED; };
  1445. };
  1446. class CProxyDebugGraphManager : public CProxyDebugContext, implements IDebugGraphManager
  1447. {
  1448. unsigned id;
  1449. StringBuffer idString;
  1450. MapStringToMyClass<IActivityDebugContext> edgeProxies;
  1451. public:
  1452. IMPLEMENT_IINTERFACE;
  1453. CProxyDebugGraphManager(unsigned _id, unsigned _channel, memsize_t _proxyId)
  1454. : CProxyDebugContext(_channel, _proxyId), id(_id)
  1455. {
  1456. idString.append(_id).append('#').append(channel);
  1457. }
  1458. virtual IActivityDebugContext *lookupActivityByEdgeId(const char *edgeId)
  1459. {
  1460. IActivityDebugContext *edge = edgeProxies.getValue(edgeId);
  1461. if (!edge)
  1462. {
  1463. const char *channelTail = strrchr(edgeId, '#');
  1464. if (channelTail && atoi(channelTail+1)==channel)
  1465. {
  1466. DebugRequestLookupActivityByEdgeId request(proxyId, edgeId);
  1467. CommonXmlWriter reply(0);
  1468. sendProxyRequest(&reply, request);
  1469. Owned<IPropertyTree> response = createPTreeFromXMLString(reply.str());
  1470. if (response)
  1471. {
  1472. memsize_t proxyId = (memsize_t) response->getPropInt64("@proxyId", 0);
  1473. if (proxyId)
  1474. {
  1475. edge = new CProxyActivityDebugContext(channel, proxyId, edgeId);
  1476. edgeProxies.setValue(edgeId, edge);
  1477. ::Release(edge);
  1478. }
  1479. }
  1480. }
  1481. }
  1482. return edge;
  1483. }
  1484. virtual const char *queryGraphName() const { UNIMPLEMENTED; }
  1485. virtual void getXGMML(IXmlWriter *output, unsigned sequence, bool isActive)
  1486. {
  1487. throwUnexpected();
  1488. }
  1489. virtual void setBreakpoint(IBreakpointInfo &bp)
  1490. {
  1491. DebugRequestSetRemoveBreakpoint request(proxyId, bp, false);
  1492. sendProxyRequest(NULL, request);
  1493. }
  1494. virtual void removeBreakpoint(IBreakpointInfo &bp)
  1495. {
  1496. DebugRequestSetRemoveBreakpoint request(proxyId, bp, true);
  1497. sendProxyRequest(NULL, request);
  1498. }
  1499. virtual void setHistoryCapacity(unsigned newCapacity) { UNIMPLEMENTED; }
  1500. virtual void clearHistories() { UNIMPLEMENTED; }
  1501. virtual void searchHistories(IXmlWriter *output, IRowMatcher *matcher, bool fullRows)
  1502. {
  1503. DebugRequestSearchHistories request(proxyId, matcher, fullRows);
  1504. sendProxyRequest(output, request);
  1505. }
  1506. virtual void setNodeProperty(IActivityBase *node, const char *propName, const char *propVvalue)
  1507. {
  1508. // MORE - should I do anything here?
  1509. }
  1510. virtual DebugActivityRecord *getNodeByActivityBase(IActivityBase *activity) const
  1511. {
  1512. // MORE - should I do anything here?
  1513. return NULL;
  1514. }
  1515. virtual void noteSlaveGraph(IActivityBase *parentActivity, unsigned graphId, unsigned channel, memsize_t remoteGraphId)
  1516. {
  1517. UNIMPLEMENTED; // MORE - can this happen? nested graphs?
  1518. }
  1519. virtual memsize_t queryProxyId() const
  1520. {
  1521. return proxyId;
  1522. }
  1523. virtual const char *queryIdString() const
  1524. {
  1525. return idString.str();
  1526. }
  1527. virtual unsigned queryId() const
  1528. {
  1529. return id;
  1530. }
  1531. virtual void outputChildGraph(IXmlWriter *output, unsigned sequence)
  1532. {
  1533. DebugRequestOutputChildGraph request(proxyId, sequence);
  1534. sendProxyRequest(output, request);
  1535. }
  1536. virtual void outputLinksForChildGraph(IXmlWriter *output, const char *parentId)
  1537. {
  1538. DebugRequestOutputLinksForChildGraph request(proxyId, parentId);
  1539. sendProxyRequest(output, request);
  1540. }
  1541. virtual void serializeProxyGraphs(MemoryBuffer &buff)
  1542. {
  1543. UNIMPLEMENTED;
  1544. }
  1545. virtual void deserializeProxyGraphs(DebugState state, MemoryBuffer &buff, IActivityBase *parentActivity, unsigned channel)
  1546. {
  1547. UNIMPLEMENTED;
  1548. }
  1549. virtual IDebuggableContext *queryContext() const
  1550. {
  1551. UNIMPLEMENTED;
  1552. }
  1553. virtual void mergeRemoteCounts(IDebuggableContext *into) const
  1554. {
  1555. DebugRequestGetResetGlobalCounts request(proxyId);
  1556. CommonXmlWriter reply(0);
  1557. reply.outputBeginNested("Counts", true);
  1558. sendProxyRequest(&reply, request);
  1559. reply.outputEndNested("Counts"); // strange way to do it...
  1560. Owned<IPropertyTree> response = createPTreeFromXMLString(reply.str());
  1561. if (response)
  1562. {
  1563. Owned<IPropertyTreeIterator> edges = response->getElements("edge");
  1564. ForEach(*edges)
  1565. {
  1566. IPropertyTree &edge = edges->query();
  1567. const char *edgeId = edge.queryProp("@edgeId");
  1568. unsigned edgeCount = edge.getPropInt("@count");
  1569. Owned<IGlobalEdgeRecord> thisEdge = into->getEdgeRecord(edgeId);
  1570. thisEdge->incrementCount(edgeCount, into->querySequence());
  1571. }
  1572. }
  1573. }
  1574. };
  1575. IDebugGraphManager *createProxyDebugGraphManager(unsigned graphId, unsigned channel, memsize_t remoteGraphId)
  1576. {
  1577. return new CProxyDebugGraphManager(graphId, channel, remoteGraphId);
  1578. }