ccdcontext.cpp 111 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2013 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "jlib.hpp"
  15. #include "nbcd.hpp"
  16. #include "rtlread_imp.hpp"
  17. #include "thorplugin.hpp"
  18. #include "thorxmlread.hpp"
  19. #include "roxiemem.hpp"
  20. #include "ccd.hpp"
  21. #include "ccdcontext.hpp"
  22. #include "ccddebug.hpp"
  23. #include "ccddali.hpp"
  24. #include "ccdquery.hpp"
  25. #include "ccdqueue.ipp"
  26. #include "ccdsnmp.hpp"
  27. #include "ccdstate.hpp"
  28. using roxiemem::IRowManager;
  29. //=======================================================================================================================
  30. #define DEBUGEE_TIMEOUT 10000
  31. class CSlaveDebugContext : public CBaseDebugContext
  32. {
  33. /*
  34. Some thoughts on slave debugging
  35. 1. Something like a ping can be used to get data from slave when needed
  36. 2. Should disable IBYTI processing (always use primary) - DONE
  37. and server-side caching - DONE
  38. 3. Roxie server can know what slave transactions are pending by intercepting the sends - no need for slave to call back just to indicate start of slave subgraph
  39. 4. There is a problem when a slave hits a breakpoint in that the breakpoint cound have been deleted by the time it gets a chance to tell the Roxie server - can't
  40. happen in local case because of the critical block at the head of checkBreakpoint but the local copy of BPs out on slave CAN get out of date. Should we care?
  41. Should there be a "Sorry, your breakpoints are out of date, here's the new set" response?
  42. Actually what we do is recheck the BP on the server, and ensure that breakpoint indexes are persistant. DONE
  43. 5. We need to serialize over our graph info if changed since last time.
  44. 6. I think we need to change implementation of debugGraph to support children. Then we have a place to put a proxy for a remote one.
  45. - id's should probably be structured so we can use a hash table at each level
  46. */
  47. const RoxiePacketHeader &header;
  48. memsize_t parentActivity;
  49. unsigned channel;
  50. int debugSequence;
  51. CriticalSection crit;
  52. const IRoxieContextLogger &logctx; // hides base class definition with more derived class pointer
  53. public:
  54. CSlaveDebugContext(IRoxieSlaveContext *_ctx, const IRoxieContextLogger &_logctx, RoxiePacketHeader &_header)
  55. : CBaseDebugContext(_logctx), header(_header), logctx(_logctx)
  56. {
  57. channel = header.channel;
  58. debugSequence = 0;
  59. parentActivity = 0;
  60. }
  61. void init(const IRoxieQueryPacket *_packet)
  62. {
  63. unsigned traceLength = _packet->getTraceLength();
  64. assertex(traceLength);
  65. const byte *traceInfo = _packet->queryTraceInfo();
  66. assertex((*traceInfo & LOGGING_DEBUGGERACTIVE) != 0);
  67. unsigned debugLen = *(unsigned short *) (traceInfo + 1);
  68. MemoryBuffer b;
  69. b.setBuffer(debugLen, (char *) (traceInfo + 1 + sizeof(unsigned short)), false);
  70. deserialize(b);
  71. __uint64 tmp; // can't serialize memsize_t
  72. b.read(tmp); // note - this is written by the RemoteAdaptor not by the serialize....
  73. parentActivity = (memsize_t)tmp;
  74. }
  75. virtual unsigned queryChannel() const
  76. {
  77. return channel;
  78. }
  79. virtual BreakpointActionMode checkBreakpoint(DebugState state, IActivityDebugContext *probe, const void *extra)
  80. {
  81. return CBaseDebugContext::checkBreakpoint(state, probe, extra);
  82. }
  83. virtual void waitForDebugger(DebugState state, IActivityDebugContext *probe)
  84. {
  85. StringBuffer debugIdString;
  86. CriticalBlock b(crit); // Make sure send sequentially - don't know if this is strictly needed...
  87. debugSequence++;
  88. debugIdString.appendf(".debug.%x", debugSequence);
  89. IPendingCallback *callback = ROQ->notePendingCallback(header, debugIdString.str()); // note that we register before the send to avoid a race.
  90. try
  91. {
  92. RoxiePacketHeader newHeader(header, ROXIE_DEBUGCALLBACK);
  93. loop // retry indefinately, as more than likely Roxie server is waiting for user input ...
  94. {
  95. Owned<IMessagePacker> output = ROQ->createOutputStream(newHeader, true, logctx);
  96. // These are deserialized in onDebugCallback..
  97. MemoryBuffer debugInfo;
  98. debugInfo.append(debugSequence);
  99. debugInfo.append((char) state);
  100. if (state==DebugStateGraphFinished)
  101. {
  102. debugInfo.append(globalCounts.count());
  103. HashIterator edges(globalCounts);
  104. ForEach(edges)
  105. {
  106. IGlobalEdgeRecord *edge = globalCounts.mapToValue(&edges.query());
  107. debugInfo.append((const char *) edges.query().getKey());
  108. debugInfo.append(edge->queryCount());
  109. }
  110. }
  111. debugInfo.append(currentBreakpointUID);
  112. debugInfo.append((__uint64)parentActivity); // can't serialize memsize_t
  113. debugInfo.append(channel);
  114. assertex (currentGraph); // otherwise what am I remote debugging?
  115. currentGraph->serializeProxyGraphs(debugInfo);
  116. debugInfo.append(probe ? probe->queryEdgeId() : "");
  117. char *buf = (char *) output->getBuffer(debugInfo.length(), true);
  118. memcpy(buf, debugInfo.toByteArray(), debugInfo.length());
  119. output->putBuffer(buf, debugInfo.length(), true);
  120. output->flush(true);
  121. output.clear();
  122. if (callback->wait(5000))
  123. break;
  124. }
  125. if (traceLevel > 6)
  126. { StringBuffer s; DBGLOG("Processing information from Roxie server in response to %s", newHeader.toString(s).str()); }
  127. MemoryBuffer &serverData = callback->queryData();
  128. deserialize(serverData);
  129. }
  130. catch (...)
  131. {
  132. ROQ->removePendingCallback(callback);
  133. throw;
  134. }
  135. ROQ->removePendingCallback(callback);
  136. }
  137. virtual IRoxieQueryPacket *onDebugCallback(const RoxiePacketHeader &header, size32_t len, char *data)
  138. {
  139. // MORE - Implies a server -> slave child -> slave grandchild type situation - need to pass call on to Roxie server (rather as I do for file callback)
  140. UNIMPLEMENTED;
  141. }
  142. virtual bool onDebuggerTimeout()
  143. {
  144. throwUnexpected();
  145. }
  146. virtual void debugCounts(IXmlWriter *output, unsigned sinceSequence, bool reset)
  147. {
  148. // This gives info for the global view - accumulated counts for all instances, plus the graph as fetched from the workunit
  149. HashIterator edges(globalCounts);
  150. ForEach(edges)
  151. {
  152. IGlobalEdgeRecord *edge = globalCounts.mapToValue(&edges.query());
  153. if (edge->queryLastSequence() && (!sinceSequence || edge->queryLastSequence() > sinceSequence))
  154. {
  155. output->outputBeginNested("edge", true);
  156. output->outputCString((const char *) edges.query().getKey(), "@edgeId");
  157. output->outputUInt(edge->queryCount(), "@count");
  158. output->outputEndNested("edge");
  159. }
  160. if (reset)
  161. edge->reset();
  162. }
  163. }
  164. };
  165. //=======================================================================================================================
  166. class CRoxieWorkflowMachine : public WorkflowMachine
  167. {
  168. public:
  169. CRoxieWorkflowMachine(IPropertyTree *_workflowInfo, bool _doOnce, const IRoxieContextLogger &_logctx) : WorkflowMachine(_logctx)
  170. {
  171. workflowInfo = _workflowInfo;
  172. doOnce = _doOnce;
  173. }
  174. protected:
  175. virtual void begin()
  176. {
  177. // MORE - should pre-do more of this work
  178. unsigned count = 0;
  179. Owned<IConstWorkflowItemIterator> iter = createWorkflowItemIterator(workflowInfo);
  180. for(iter->first(); iter->isValid(); iter->next())
  181. count++;
  182. workflow.setown(createWorkflowItemArray(count));
  183. for(iter->first(); iter->isValid(); iter->next())
  184. {
  185. IConstWorkflowItem *item = iter->query();
  186. bool isOnce = (item->queryMode() == WFModeOnce);
  187. workflow->addClone(item);
  188. if (isOnce != doOnce)
  189. workflow->queryWfid(item->queryWfid()).setState(WFStateDone);
  190. }
  191. }
  192. virtual void end()
  193. {
  194. workflow.clear();
  195. }
  196. virtual void schedulingStart() { throw MakeStringException(ROXIE_UNIMPLEMENTED_ERROR, "Scheduling not supported in roxie"); }
  197. virtual bool schedulingPull() { throw MakeStringException(ROXIE_UNIMPLEMENTED_ERROR, "Scheduling not supported in roxie"); }
  198. virtual bool schedulingPullStop() { throw MakeStringException(ROXIE_UNIMPLEMENTED_ERROR, "Scheduling not supported in roxie"); }
  199. virtual void reportContingencyFailure(char const * type, IException * e) {}
  200. virtual void checkForAbort(unsigned wfid, IException * handling) {}
  201. virtual void doExecutePersistItem(IRuntimeWorkflowItem & item) { throw MakeStringException(ROXIE_UNIMPLEMENTED_ERROR, "Persists not supported in roxie"); }
  202. private:
  203. IPropertyTree *workflowInfo;
  204. bool doOnce;
  205. };
  206. WorkflowMachine *createRoxieWorkflowMachine(IPropertyTree *_workflowInfo, bool _doOnce, const IRoxieContextLogger &_logctx)
  207. {
  208. return new CRoxieWorkflowMachine(_workflowInfo, _doOnce, _logctx);
  209. }
  210. //=======================================================================================================================
  211. typedef byte *row_t;
  212. typedef row_t * rowset_t;
  213. class DeserializedDataReader : public CInterface, implements IWorkUnitRowReader
  214. {
  215. const rowset_t data;
  216. size32_t count;
  217. unsigned idx;
  218. public:
  219. IMPLEMENT_IINTERFACE;
  220. DeserializedDataReader(size32_t _count, rowset_t _data)
  221. : data(_data), count(_count)
  222. {
  223. idx = 0;
  224. }
  225. virtual const void * nextInGroup()
  226. {
  227. if (idx < count)
  228. {
  229. const void *row = data[idx];
  230. if (row)
  231. LinkRoxieRow(row);
  232. idx++;
  233. return row;
  234. }
  235. return NULL;
  236. }
  237. virtual void getResultRowset(size32_t & tcount, byte * * & tgt)
  238. {
  239. tcount = count;
  240. if (data)
  241. rtlLinkRowset(data);
  242. tgt = data;
  243. }
  244. };
  245. class CDeserializedResultStore : public CInterface, implements IDeserializedResultStore
  246. {
  247. PointerArrayOf<row_t> stored;
  248. UnsignedArray counts;
  249. PointerIArrayOf<IOutputMetaData> metas;
  250. mutable SpinLock lock;
  251. public:
  252. IMPLEMENT_IINTERFACE;
  253. ~CDeserializedResultStore()
  254. {
  255. ForEachItemIn(idx, stored)
  256. {
  257. rowset_t rows = stored.item(idx);
  258. if (rows)
  259. {
  260. rtlReleaseRowset(counts.item(idx), (byte**) rows);
  261. }
  262. }
  263. }
  264. virtual int addResult(size32_t count, rowset_t data, IOutputMetaData *meta)
  265. {
  266. SpinBlock b(lock);
  267. stored.append(data);
  268. counts.append(count);
  269. metas.append(meta);
  270. return stored.ordinality()-1;
  271. }
  272. virtual void queryResult(int id, size32_t &count, rowset_t &data) const
  273. {
  274. count = counts.item(id);
  275. data = stored.item(id);
  276. }
  277. virtual IWorkUnitRowReader *createDeserializedReader(int id) const
  278. {
  279. return new DeserializedDataReader(counts.item(id), stored.item(id));
  280. }
  281. virtual void serialize(unsigned & tlen, void * & tgt, int id, ICodeContext *codectx) const
  282. {
  283. IOutputMetaData *meta = metas.item(id);
  284. rowset_t data = stored.item(id);
  285. size32_t count = counts.item(id);
  286. MemoryBuffer result;
  287. Owned<IOutputRowSerializer> rowSerializer = meta->createDiskSerializer(codectx, 0); // NOTE - we don't have a meaningful activity id. Only used for error reporting.
  288. bool grouped = meta->isGrouped();
  289. for (size32_t idx = 0; idx<count; idx++)
  290. {
  291. const byte *row = data[idx];
  292. if (grouped && idx)
  293. result.append(row == NULL);
  294. if (row)
  295. {
  296. CThorDemoRowSerializer serializerTarget(result);
  297. rowSerializer->serialize(serializerTarget, row);
  298. }
  299. }
  300. tlen = result.length();
  301. tgt= result.detach();
  302. }
  303. };
  304. extern IDeserializedResultStore *createDeserializedResultStore()
  305. {
  306. return new CDeserializedResultStore;
  307. }
  308. class WorkUnitRowReaderBase : public CInterface, implements IWorkUnitRowReader
  309. {
  310. protected:
  311. Linked<IEngineRowAllocator> rowAllocator;
  312. bool isGrouped;
  313. public:
  314. IMPLEMENT_IINTERFACE;
  315. WorkUnitRowReaderBase(IEngineRowAllocator *_rowAllocator, bool _isGrouped)
  316. : rowAllocator(_rowAllocator), isGrouped(_isGrouped)
  317. {
  318. }
  319. virtual void getResultRowset(size32_t & tcount, byte * * & tgt)
  320. {
  321. bool atEOG = true;
  322. RtlLinkedDatasetBuilder builder(rowAllocator);
  323. loop
  324. {
  325. const void *ret = nextInGroup();
  326. if (!ret)
  327. {
  328. if (atEOG || !isGrouped)
  329. break;
  330. atEOG = true;
  331. }
  332. else
  333. atEOG = false;
  334. builder.appendOwn(ret);
  335. }
  336. tcount = builder.getcount();
  337. tgt = builder.linkrows();
  338. }
  339. };
  340. class RawDataReader : public WorkUnitRowReaderBase
  341. {
  342. protected:
  343. const IRoxieContextLogger &logctx;
  344. byte *bufferBase;
  345. MemoryBuffer blockBuffer;
  346. Owned<ISerialStream> bufferStream;
  347. CThorStreamDeserializerSource rowSource;
  348. bool eof;
  349. bool eogPending;
  350. Owned<IOutputRowDeserializer> rowDeserializer;
  351. virtual bool nextBlock(unsigned & tlen, void * & tgt, void * & base) = 0;
  352. bool reload()
  353. {
  354. free(bufferBase);
  355. size32_t lenData;
  356. bufferBase = NULL;
  357. void *tempData, *base;
  358. eof = !nextBlock(lenData, tempData, base);
  359. bufferBase = (byte *) base;
  360. blockBuffer.setBuffer(lenData, tempData, false);
  361. return !eof;
  362. }
  363. public:
  364. RawDataReader(ICodeContext *codeContext, IEngineRowAllocator *_rowAllocator, bool _isGrouped, const IRoxieContextLogger &_logctx)
  365. : WorkUnitRowReaderBase(_rowAllocator, _isGrouped), logctx(_logctx)
  366. {
  367. eof = false;
  368. eogPending = false;
  369. bufferBase = NULL;
  370. rowDeserializer.setown(rowAllocator->createDiskDeserializer(codeContext));
  371. bufferStream.setown(createMemoryBufferSerialStream(blockBuffer));
  372. rowSource.setStream(bufferStream);
  373. }
  374. ~RawDataReader()
  375. {
  376. if (bufferBase)
  377. free(bufferBase);
  378. }
  379. virtual const void *nextInGroup()
  380. {
  381. if (eof)
  382. return NULL;
  383. if (rowSource.eos() && !reload())
  384. return NULL;
  385. if (eogPending)
  386. {
  387. eogPending = false;
  388. return NULL;
  389. }
  390. #if 0
  391. // MORE - think a bit about what happens on incomplete rows - I think deserializer will throw an exception?
  392. unsigned thisSize = meta.getRecordSize(data+cursor);
  393. if (thisSize > lenData-cursor)
  394. {
  395. CTXLOG("invalid stored dataset - incomplete row at end");
  396. throw MakeStringException(ROXIE_DATA_ERROR, "invalid stored dataset - incomplete row at end");
  397. }
  398. #endif
  399. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  400. size32_t size = rowDeserializer->deserialize(rowBuilder, rowSource);
  401. if (isGrouped)
  402. rowSource.read(sizeof(bool), &eogPending);
  403. atomic_inc(&rowsIn);
  404. return rowBuilder.finalizeRowClear(size);
  405. }
  406. };
  407. class InlineRawDataReader : public RawDataReader
  408. {
  409. Linked<IPropertyTree> xml;
  410. public:
  411. InlineRawDataReader(ICodeContext *codeContext, IEngineRowAllocator *_rowAllocator, bool _isGrouped, const IRoxieContextLogger &_logctx, IPropertyTree *_xml)
  412. : RawDataReader(codeContext, _rowAllocator, _isGrouped, _logctx), xml(_xml)
  413. {
  414. }
  415. virtual bool nextBlock(unsigned & tlen, void * & tgt, void * & base)
  416. {
  417. base = tgt = NULL;
  418. if (xml)
  419. {
  420. MemoryBuffer result;
  421. xml->getPropBin(NULL, result);
  422. tlen = result.length();
  423. base = tgt = result.detach();
  424. xml.clear();
  425. return tlen != 0;
  426. }
  427. else
  428. {
  429. tlen = 0;
  430. return false;
  431. }
  432. }
  433. };
  434. class StreamedRawDataReader : public RawDataReader
  435. {
  436. SafeSocket &client;
  437. StringAttr id;
  438. offset_t offset;
  439. public:
  440. StreamedRawDataReader(ICodeContext *codeContext, IEngineRowAllocator *_rowAllocator, bool _isGrouped, const IRoxieContextLogger &_logctx, SafeSocket &_client, const char *_id)
  441. : RawDataReader(codeContext, _rowAllocator, _isGrouped, _logctx), client(_client), id(_id)
  442. {
  443. offset = 0;
  444. }
  445. virtual bool nextBlock(unsigned & tlen, void * & tgt, void * & base)
  446. {
  447. try
  448. {
  449. #ifdef FAKE_EXCEPTIONS
  450. if (offset > 0x10000)
  451. throw MakeStringException(ROXIE_INTERNAL_ERROR, "TEST EXCEPTION");
  452. #endif
  453. // Go request from the socket
  454. MemoryBuffer request;
  455. request.reserve(sizeof(size32_t));
  456. request.append('D');
  457. offset_t loffset = offset;
  458. _WINREV(loffset);
  459. request.append(sizeof(loffset), &loffset);
  460. request.append(strlen(id)+1, id);
  461. size32_t len = request.length() - sizeof(size32_t);
  462. len |= 0x80000000;
  463. _WINREV(len);
  464. *(size32_t *) request.toByteArray() = len;
  465. client.write(request.toByteArray(), request.length());
  466. // Note: I am the only thread reading (we only support a single input dataset in roxiepipe mode)
  467. MemoryBuffer reply;
  468. client.readBlock(reply, readTimeout);
  469. tlen = reply.length();
  470. // MORE - not very robust!
  471. // skip past block header
  472. if (tlen > 0)
  473. {
  474. tgt = base = reply.detach();
  475. tgt = ((char *)base) + 9;
  476. tgt = strchr((char *)tgt, '\0') + 1;
  477. tlen -= ((char *)tgt - (char *)base);
  478. offset += tlen;
  479. }
  480. else
  481. tgt = base = NULL;
  482. return tlen != 0;
  483. }
  484. catch (IException *E)
  485. {
  486. StringBuffer text;
  487. E->errorMessage(text);
  488. int errCode = E->errorCode();
  489. E->Release();
  490. IException *ee = MakeStringException(MSGAUD_internal, errCode, "%s", text.str());
  491. logctx.logOperatorException(ee, __FILE__, __LINE__, "Exception caught in RawDataReader::nextBlock");
  492. throw ee;
  493. }
  494. catch (...)
  495. {
  496. logctx.logOperatorException(NULL, __FILE__, __LINE__, "Unknown exception caught in RawDataReader::nextBlock");
  497. throw;
  498. }
  499. }
  500. };
  501. class InlineXmlDataReader : public WorkUnitRowReaderBase
  502. {
  503. Linked<IPropertyTree> xml;
  504. Owned <XmlColumnProvider> columns;
  505. Owned<IPropertyTreeIterator> rows;
  506. IXmlToRowTransformer &rowTransformer;
  507. public:
  508. IMPLEMENT_IINTERFACE;
  509. InlineXmlDataReader(IXmlToRowTransformer &_rowTransformer, IPropertyTree *_xml, IEngineRowAllocator *_rowAllocator, bool _isGrouped)
  510. : WorkUnitRowReaderBase(_rowAllocator, _isGrouped), xml(_xml), rowTransformer(_rowTransformer)
  511. {
  512. columns.setown(new XmlDatasetColumnProvider);
  513. rows.setown(xml->getElements("Row")); // NOTE - the 'hack for Gordon' as found in thorxmlread is not implemented here. Does it need to be ?
  514. rows->first();
  515. }
  516. virtual const void *nextInGroup()
  517. {
  518. if (rows->isValid())
  519. {
  520. columns->setRow(&rows->query());
  521. rows->next();
  522. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  523. NullDiskCallback callback;
  524. size_t outSize = rowTransformer.transform(rowBuilder, columns, &callback);
  525. return rowBuilder.finalizeRowClear(outSize);
  526. }
  527. return NULL;
  528. }
  529. };
  530. //---------------------------------------------------------------------------------------
  531. class CSlaveContext : public CInterface, implements IRoxieSlaveContext, implements ICodeContext, implements roxiemem::ITimeLimiter, implements IRowAllocatorMetaActIdCacheCallback
  532. {
protected:
    mutable Owned<IRowAllocatorMetaActIdCache> allocatorMetaCache; // created in the constructor and passed to createRowManager
    Owned<IRowManager> rowManager; // NOTE: the order of destruction here is significant. For leak check to work destroy this BEFORE allAllocators, but after most other things
    Owned <IDebuggableContext> debugContext; // set by the constructor only when the debugger is active
    const IQueryFactory *factory; // query factory used to look up graphs, libraries and the query dll
    Owned<IProbeManager> probeManager; // must be destroyed after childGraphs
    MapXToMyClass<unsigned, unsigned, IActivityGraph> childGraphs; // id -> child graph, filled by noteChildGraph, emptied by endGraph
    Owned<IActivityGraph> graph; // graph currently being executed (set by beginGraph, cleared by endGraph)
    unsigned priority;
    StringBuffer authToken; // returned by queryAuthToken
    Owned<IPropertyTree> probeQuery; // when set, beginGraph creates a probe manager and runGraph collects the probe response
    RoxiePacketHeader *header; // header of the packet passed to the constructor, or NULL if there was none
    unsigned lastWuAbortCheck; // msTick() value at the last workunit abort poll in checkAbort
    unsigned startTime; // msTick() value taken in the constructor
    unsigned timeLimit; // in ms; 0 disables the time limit test in checkAbort
    unsigned totSlavesReplyLen; // accumulated by addSlavesReplyLen under statsCrit
    unsigned ctxParallelJoinPreload;
    unsigned ctxFullKeyedJoinPreload;
    unsigned ctxKeyedJoinPreload;
    unsigned ctxConcatPreload;
    unsigned ctxFetchPreload;
    unsigned ctxPrefetchProjectPreload;
    bool traceActivityTimes; // enables the LOG_TIMING trace in noteProcessed
    bool checkingHeap; // selects the CRC row allocator in createAllocator
    Owned<IConstWorkUnit> workUnit; // optional; most workunit-related methods are no-ops when this is not set
    Owned<IRoxieDaliHelper> daliHelperLink;
    CriticalSection statsCrit; // protects totSlavesReplyLen
    const IRoxieContextLogger &logctx; // all logging calls on this context are forwarded here
protected:
    CriticalSection resultsCrit;
    PointerIArrayOf<FlushingStringBuffer> resultMap;
    bool exceptionLogged; // set by checkAbort once the time-limit exception has been logged
    bool aborted; // set by notifyAbort, polled (without the lock) by checkAbort
    CriticalSection abortLock; // NOTE: we don't bother to get lock when just reading to see whether to abort
    Owned<IException> exception; // exception recorded by notifyAbort, or created on demand in checkAbort
  568. static void _toXML(IPropertyTree *tree, StringBuffer &xgmml, unsigned indent)
  569. {
  570. if (tree->getPropInt("att[@name='_roxieStarted']/@value", 1) == 0)
  571. return;
  572. if (0 && tree->getPropInt("att[@name='_kind']/@value", 0) == 496)
  573. {
  574. Owned<IPropertyTreeIterator> sub = tree->getElements(".//att[@name='_roxieStarted']");
  575. bool foundOne = false;
  576. ForEach(*sub)
  577. {
  578. if (sub->query().getPropInt("@value", 1)==0)
  579. {
  580. foundOne = true;
  581. break;
  582. }
  583. }
  584. if (!foundOne)
  585. return;
  586. }
  587. const char *name = tree->queryName();
  588. xgmml.pad(indent).append('<').append(name);
  589. Owned<IAttributeIterator> it = tree->getAttributes(true);
  590. if (it->first())
  591. {
  592. do
  593. {
  594. const char *key = it->queryName();
  595. xgmml.append(' ').append(key+1).append("=\"");
  596. encodeXML(it->queryValue(), xgmml, ENCODE_NEWLINES);
  597. xgmml.append("\"");
  598. }
  599. while (it->next());
  600. }
  601. Owned<IPropertyTreeIterator> sub = tree->getElements("*", iptiter_sort);
  602. if (!sub->first())
  603. {
  604. xgmml.append("/>\n");
  605. }
  606. else
  607. {
  608. xgmml.append(">\n");
  609. for(; sub->isValid(); sub->next())
  610. _toXML(&sub->query(), xgmml, indent+1);
  611. xgmml.pad(indent).append("</").append(name).append(">\n");
  612. }
  613. }
  614. public:
  615. IMPLEMENT_IINTERFACE;
  616. CSlaveContext(const IQueryFactory *_factory, const IRoxieContextLogger &_logctx, unsigned _timeLimit, memsize_t _memoryLimit, IRoxieQueryPacket *_packet, bool _traceActivityTimes, bool _debuggerActive, bool _checkingHeap)
  617. : factory(_factory), logctx(_logctx)
  618. {
  619. if (_packet)
  620. header = &_packet->queryHeader();
  621. else
  622. header = NULL;
  623. timeLimit = _timeLimit;
  624. startTime = lastWuAbortCheck = msTick();
  625. ctxParallelJoinPreload = 0;
  626. ctxFullKeyedJoinPreload = 0;
  627. ctxKeyedJoinPreload = 0;
  628. ctxConcatPreload = 0;
  629. ctxFetchPreload = 0;
  630. ctxPrefetchProjectPreload = 0;
  631. traceActivityTimes = _traceActivityTimes;
  632. temporaries = NULL;
  633. deserializedResultStore = NULL;
  634. rereadResults = NULL;
  635. xmlStoredDatasetReadFlags = ptr_none;
  636. if (_debuggerActive)
  637. {
  638. CSlaveDebugContext *slaveDebugContext = new CSlaveDebugContext(this, logctx, *header);
  639. slaveDebugContext->init(_packet);
  640. debugContext.setown(slaveDebugContext);
  641. probeManager.setown(createDebugManager(debugContext, "slaveDebugger"));
  642. }
  643. checkingHeap = _checkingHeap;
  644. aborted = false;
  645. allocatorMetaCache.setown(createRowAllocatorCache(this));
  646. rowManager.setown(roxiemem::createRowManager(_memoryLimit, this, logctx, allocatorMetaCache, false));
  647. //MORE: If checking heap required then should have
  648. //rowManager.setown(createCheckingHeap(rowManager)) or something similar.
  649. }
    // Releases the raw-pointer members; the Owned<>/Linked<> members clean up themselves.
    ~CSlaveContext()
    {
        ::Release(rereadResults);
        ::Release(temporaries);
        ::Release(deserializedResultStore);
    }
  656. // interface IRoxieServerContext
    // The logging/statistics methods below all forward to logctx unchanged.
    virtual void noteStatistic(unsigned statCode, unsigned __int64 value, unsigned count) const
    {
        logctx.noteStatistic(statCode, value, count);
    }
    // printf-style trace; packages the variadic arguments and forwards to logctx.CTXLOGva
    virtual void CTXLOG(const char *format, ...) const
    {
        va_list args;
        va_start(args, format);
        logctx.CTXLOGva(format, args);
        va_end(args);
    }
    virtual void CTXLOGva(const char *format, va_list args) const
    {
        logctx.CTXLOGva(format, args);
    }
    virtual void CTXLOGa(TracingCategory category, const char *prefix, const char *text) const
    {
        logctx.CTXLOGa(category, prefix, text);
    }
    // Variadic front end for logOperatorExceptionVA
    virtual void logOperatorException(IException *E, const char *file, unsigned line, const char *format, ...) const
    {
        va_list args;
        va_start(args, format);
        logctx.logOperatorExceptionVA(E, file, line, format, args);
        va_end(args);
    }
    virtual void logOperatorExceptionVA(IException *E, const char *file, unsigned line, const char *format, va_list args) const
    {
        logctx.logOperatorExceptionVA(E, file, line, format, args);
    }
    // Variadic front end for CTXLOGaeva
    virtual void CTXLOGae(IException *E, const char *file, unsigned line, const char *prefix, const char *format, ...) const
    {
        va_list args;
        va_start(args, format);
        logctx.CTXLOGaeva(E, file, line, prefix, format, args);
        va_end(args);
    }
    virtual void CTXLOGaeva(IException *E, const char *file, unsigned line, const char *prefix, const char *format, va_list args) const
    {
        logctx.CTXLOGaeva(E, file, line, prefix, format, args);
    }
  698. virtual void CTXLOGl(LogItem *log) const
  699. {
  700. if (log->isStatistics())
  701. {
  702. logctx.noteStatistic(log->getStatCode(), log->getStatValue(), (unsigned)log->getStatCount());
  703. log->Release();
  704. }
  705. else
  706. logctx.CTXLOGl(log);
  707. }
  708. virtual unsigned logString(const char *text) const
  709. {
  710. if (text && *text)
  711. {
  712. CTXLOG("USER: %s", text);
  713. return strlen(text);
  714. }
  715. else
  716. return 0;
  717. }
    // Returns the logger this context forwards to.
    virtual const IContextLogger &queryContextLogger() const
    {
        return logctx;
    }
    // The remaining accessors simply report logctx's own settings.
    virtual StringBuffer &getLogPrefix(StringBuffer &ret) const
    {
        return logctx.getLogPrefix(ret);
    }
    virtual bool isIntercepted() const
    {
        return logctx.isIntercepted();
    }
    virtual bool isBlind() const
    {
        return logctx.isBlind();
    }
    virtual unsigned queryTraceLevel() const
    {
        return logctx.queryTraceLevel();
    }
  738. virtual void noteProcessed(const IRoxieContextLogger &activityContext, const IRoxieServerActivity *activity, unsigned _idx, unsigned _processed, unsigned __int64 _totalCycles, unsigned __int64 _localCycles) const
  739. {
  740. if (traceActivityTimes)
  741. {
  742. StringBuffer text, prefix;
  743. text.appendf("%s outputIdx %d processed %d total %d us local %d us",
  744. getActivityText(activity->getKind()), _idx, _processed, (unsigned) (cycle_to_nanosec(_totalCycles)/1000), (unsigned)(cycle_to_nanosec(_localCycles)/1000));
  745. activityContext.getLogPrefix(prefix);
  746. CTXLOGa(LOG_TIMING, prefix.str(), text.str());
  747. }
  748. if (graphProgress)
  749. {
  750. IPropertyTree& nodeProgress = graphProgress->updateNode(activity->querySubgraphId(), activity->queryId());
  751. nodeProgress.setPropInt64("@totalTime", (unsigned) (cycle_to_nanosec(_totalCycles)/1000));
  752. nodeProgress.setPropInt64("@localTime", (unsigned) (cycle_to_nanosec(_localCycles)/1000));
  753. if (_processed)
  754. {
  755. StringBuffer edgeId;
  756. edgeId.append(activity->queryId()).append('_').append(_idx);
  757. IPropertyTree& edgeProgress = graphProgress->updateEdge(activity->querySubgraphId(), edgeId.str());
  758. edgeProgress.setPropInt64("@count", _processed);
  759. }
  760. }
  761. }
    // True if per-activity timing trace was requested at construction.
    virtual bool queryTimeActivities() const
    {
        return traceActivityTimes;
    }
    // True if heap checking was requested at construction.
    virtual bool queryCheckingHeap() const
    {
        return checkingHeap;
    }
  770. virtual void checkAbort()
  771. {
  772. // MORE - really should try to apply limits at slave end too
  773. #ifdef __linux__
  774. if (linuxYield)
  775. sched_yield();
  776. #endif
  777. #ifdef _DEBUG
  778. if (shuttingDown)
  779. throw MakeStringException(ROXIE_FORCE_SHUTDOWN, "Roxie is shutting down");
  780. #endif
  781. if (aborted) // NOTE - don't bother getting lock before reading this (for speed) - a false read is very unlikely and not a problem
  782. {
  783. CriticalBlock b(abortLock);
  784. if (!exception)
  785. exception.setown(MakeStringException(ROXIE_INTERNAL_ERROR, "Query was aborted"));
  786. throw exception.getLink();
  787. }
  788. if (graph)
  789. graph->checkAbort();
  790. if (timeLimit && (msTick() - startTime > timeLimit))
  791. {
  792. unsigned oldLimit = timeLimit;
  793. //timeLimit = 0; // to prevent exceptions in cleanup - this means only one arm gets stopped!
  794. CriticalBlock b(abortLock);
  795. IException *E = MakeStringException(ROXIE_TIMEOUT, "Query %s exceeded time limit (%d ms) - terminated", factory->queryQueryName(), oldLimit);
  796. if (!exceptionLogged)
  797. {
  798. logOperatorException(E, NULL, 0, NULL);
  799. exceptionLogged = true;
  800. }
  801. throw E;
  802. }
  803. if (workUnit && (msTick() - lastWuAbortCheck > 5000))
  804. {
  805. CriticalBlock b(abortLock);
  806. if (workUnit->aborting())
  807. {
  808. if (!exception)
  809. exception.setown(MakeStringException(ROXIE_INTERNAL_ERROR, "Query was aborted"));
  810. throw exception.getLink();
  811. }
  812. lastWuAbortCheck = msTick();
  813. }
  814. }
  815. virtual void notifyAbort(IException *E)
  816. {
  817. CriticalBlock b(abortLock);
  818. if (!aborted && QUERYINTERFACE(E, InterruptedSemaphoreException) == NULL)
  819. {
  820. aborted = true;
  821. exception.set(E);
  822. setWUState(WUStateAborting);
  823. }
  824. }
  825. virtual void setWUState(WUState state)
  826. {
  827. if (workUnit)
  828. {
  829. WorkunitUpdate w(&workUnit->lock());
  830. w->setState(state);
  831. }
  832. }
  833. virtual bool checkWuAborted()
  834. {
  835. return workUnit && workUnit->aborting();
  836. }
    // Preload accessors: each returns the matching ctx*Preload member (all are
    // set to 0 by the constructor of this class).
    virtual unsigned parallelJoinPreload()
    {
        return ctxParallelJoinPreload;
    }
    virtual unsigned concatPreload()
    {
        return ctxConcatPreload;
    }
    virtual unsigned fetchPreload()
    {
        return ctxFetchPreload;
    }
    virtual unsigned fullKeyedJoinPreload()
    {
        return ctxFullKeyedJoinPreload;
    }
    virtual unsigned keyedJoinPreload()
    {
        return ctxKeyedJoinPreload;
    }
    virtual unsigned prefetchProjectPreload()
    {
        return ctxPrefetchProjectPreload;
    }
    // Returns the authorisation token text; the pointer refers to the authToken member's buffer.
    const char *queryAuthToken()
    {
        return authToken.str();
    }
  865. virtual void noteChildGraph(unsigned id, IActivityGraph *childGraph)
  866. {
  867. if (queryTraceLevel() > 10)
  868. CTXLOG("CSlaveContext %p noteChildGraph %d=%p", this, id, childGraph);
  869. childGraphs.setValue(id, childGraph);
  870. }
  871. virtual IActivityGraph *getLibraryGraph(const LibraryCallFactoryExtra &extra, IRoxieServerActivity *parentActivity)
  872. {
  873. if (extra.embedded)
  874. {
  875. return factory->lookupGraph(extra.libraryName, probeManager, *this, parentActivity);
  876. }
  877. else
  878. {
  879. Owned<IQueryFactory> libraryQuery = factory->lookupLibrary(extra.libraryName, extra.interfaceHash, *this);
  880. assertex(libraryQuery);
  881. return libraryQuery->lookupGraph("graph1", probeManager, *this, parentActivity);
  882. }
  883. }
    // Looks up the named graph and prepares it for execution.
    // When debugging, the probe manager is replaced with a debug manager for this
    // graph and the graph-create / graph-start breakpoints are checked; otherwise a
    // plain probe manager is created if probing was requested.
    void beginGraph(const char *graphName)
    {
        if (debugContext)
        {
            probeManager.clear(); // Hack!
            probeManager.setown(createDebugManager(debugContext, graphName));
            debugContext->checkBreakpoint(DebugStateGraphCreate, NULL, graphName);
        }
        else if (probeAllRows || probeQuery != NULL)
            probeManager.setown(createProbeManager());
        // The probe manager must be in place before the graph is looked up, since it is passed to lookupGraph
        graph.setown(factory->lookupGraph(graphName, probeManager, *this, NULL));
        graph->onCreate(this, NULL); // MORE - is that right
        if (debugContext)
            debugContext->checkBreakpoint(DebugStateGraphStart, NULL, graphName);
    }
    Owned<IWUGraphProgress> graphProgress; // could make local to endGraph and pass to reset - might be cleaner
    // Tears down the current graph (no-op if there is none).
    // If 'aborting', the graph is aborted first. When a workunit is attached, a
    // graph progress updater is held in graphProgress for the duration of
    // graph->reset() (noteProcessed writes into it while it is set).
    // Finally the graph and all registered child graphs are dropped.
    virtual void endGraph(bool aborting)
    {
        if (graph)
        {
            if (debugContext)
                debugContext->checkBreakpoint(aborting ? DebugStateGraphAbort : DebugStateGraphEnd, NULL, graph->queryName());
            if (aborting)
                graph->abort();
            WorkunitUpdate progressWorkUnit(NULL);
            Owned<IConstWUGraphProgress> progress;
            if (workUnit)
            {
                progressWorkUnit.setown(&workUnit->lock());
                // NOTE(review): progress is used without a NULL check - confirm getGraphProgress cannot return NULL
                progress.setown(progressWorkUnit->getGraphProgress(graph->queryName()));
                graphProgress.setown(progress->update());
            }
            graph->reset();
            if (graphProgress)
            {
                // Release the updater before the read-only progress object it was obtained from
                graphProgress.clear();
                progress.clear();
            }
            graph.clear();
            childGraphs.kill();
        }
    }
    // Executes the current graph. If a probe query is set, the probe response is
    // collected whether execution succeeds or throws; any exception is rethrown.
    void runGraph()
    {
        try
        {
            graph->execute();
            if (probeQuery)
                graph->getProbeResponse(probeQuery);
        }
        catch(...)
        {
            if (probeQuery)
                graph->getProbeResponse(probeQuery);
            throw;
        }
    }
  941. virtual void executeGraph(const char * name, bool realThor, size32_t parentExtractSize, const void * parentExtract)
  942. {
  943. assertex(parentExtractSize == 0);
  944. if (queryTraceLevel() > 8)
  945. CTXLOG("Executing graph %s", name);
  946. assertex(!realThor);
  947. bool created = false;
  948. try
  949. {
  950. beginGraph(name);
  951. created = true;
  952. runGraph();
  953. }
  954. catch (IException *e)
  955. {
  956. if (e->errorAudience() == MSGAUD_operator)
  957. EXCLOG(e, "Exception thrown in query - cleaning up"); // if an IException is throw let EXCLOG determine if a trap should be generated
  958. else
  959. {
  960. StringBuffer s;
  961. CTXLOG("Exception thrown in query - cleaning up: %d: %s", e->errorCode(), e->errorMessage(s).str());
  962. }
  963. if (created)
  964. endGraph(true);
  965. CTXLOG("Done cleaning up");
  966. throw;
  967. }
  968. catch (...)
  969. {
  970. CTXLOG("Exception thrown in query - cleaning up");
  971. if (created)
  972. endGraph(true);
  973. CTXLOG("Done cleaning up");
  974. throw;
  975. }
  976. endGraph(false);
  977. }
  978. virtual IActivityGraph * queryChildGraph(unsigned id)
  979. {
  980. if (queryTraceLevel() > 10)
  981. CTXLOG("CSlaveContext %p resolveChildGraph %d", this, id);
  982. IActivityGraph *childGraph = childGraphs.getValue(id);
  983. assertex(childGraph);
  984. return childGraph;
  985. }
    // Returns a linked reference to the child-graph interface of the graph
    // registered under activityId ('colocal' is not used here).
    virtual IThorChildGraph * resolveChildQuery(__int64 activityId, IHThorArg * colocal)
    {
        // NOTE - part of ICodeContext interface
        return LINK(queryChildGraph((unsigned) activityId)->queryChildGraph());
    }
    // Returns the local-graph results interface of the graph registered under id (no LINK applied here).
    virtual IEclGraphResults * resolveLocalQuery(__int64 id)
    {
        return queryChildGraph((unsigned) id)->queryLocalGraph();
    }
    // Returns the row manager created by the constructor.
    virtual IRowManager &queryRowManager()
    {
        return *rowManager;
    }
  999. virtual void addSlavesReplyLen(unsigned len)
  1000. {
  1001. CriticalBlock b(statsCrit); // MORE: change to atomic_add, or may not need it at all?
  1002. totSlavesReplyLen += len;
  1003. }
  1004. virtual const char *loadResource(unsigned id)
  1005. {
  1006. ILoadedDllEntry *dll = factory->queryDll();
  1007. return (const char *) dll->getResource(id);
  1008. }
  1009. virtual const IResolvedFile *resolveLFN(const char *filename, bool isOpt)
  1010. {
  1011. CDateTime cacheDate; // Note - this is empty meaning we don't know...
  1012. return querySlaveDynamicFileCache()->lookupDynamicFile(*this, filename, cacheDate, 0, header, isOpt, false);
  1013. }
  1014. virtual IRoxieWriteHandler *createLFN(const char *filename, bool overwrite, bool extend, const StringArray &clusters)
  1015. {
  1016. throwUnexpected(); // only support writing on the server
  1017. }
  1018. virtual void onFileCallback(const RoxiePacketHeader &header, const char *lfn, bool isOpt, bool isLocal)
  1019. {
  1020. // On a slave, we need to request info using our own header (not the one passed in) and need to get global rather than just local info
  1021. // (possibly we could get just local if the channel matches but not sure there is any point)
  1022. Owned<const IResolvedFile> dFile = resolveLFN(lfn, isOpt);
  1023. if (dFile)
  1024. {
  1025. MemoryBuffer mb;
  1026. mb.append(sizeof(RoxiePacketHeader), &header);
  1027. mb.append(lfn);
  1028. dFile->serializePartial(mb, header.channel, isLocal);
  1029. ((RoxiePacketHeader *) mb.toByteArray())->activityId = ROXIE_FILECALLBACK;
  1030. Owned<IRoxieQueryPacket> reply = createRoxiePacket(mb);
  1031. reply->queryHeader().retries = 0;
  1032. ROQ->sendPacket(reply, *this); // MORE - the caller's log context might be better? Should we unicast? Note that this does not release the packet
  1033. return;
  1034. }
  1035. ROQ->sendAbortCallback(header, lfn, *this);
  1036. throwUnexpected();
  1037. }
    // This object is its own ICodeContext.
    virtual ICodeContext *queryCodeContext()
    {
        return this;
    }
    // This class provides no server context; always NULL here.
    virtual IRoxieServerContext *queryServerContext()
    {
        return NULL;
    }
    // Current probe manager (may be NULL if none has been created).
    virtual IProbeManager *queryProbeManager() const
    {
        return probeManager;
    }
    // Debug context, or NULL when the debugger is not active.
    virtual IDebuggableContext *queryDebugContext() const
    {
        return debugContext;
    }
  1054. virtual char *getOS()
  1055. {
  1056. #ifdef _WIN32
  1057. return strdup("windows");
  1058. #else
  1059. return strdup("linux");
  1060. #endif
  1061. }
    // Builds a row from UTF-8 XML text by delegating to createRowFromXml.
    virtual const void * fromXml(IEngineRowAllocator * rowAllocator, size32_t len, const char * utf8, IXmlToRowTransformer * xmlTransformer, bool stripWhitespace)
    {
        return createRowFromXml(rowAllocator, len, utf8, xmlTransformer, stripWhitespace);
    }
    virtual IEngineContext *queryEngineContext() { return NULL; }
    virtual char *getDaliServers() { throwUnexpected(); }
    // The following from ICodeContext should never be executed in slave activity. If we are on Roxie server (or in child query on slave), they will be implemented by more derived CRoxieServerContext class
    // Each stub below simply invokes throwUnexpected().
    virtual void setResultBool(const char *name, unsigned sequence, bool value) { throwUnexpected(); }
    virtual void setResultData(const char *name, unsigned sequence, int len, const void * data) { throwUnexpected(); }
    virtual void setResultDecimal(const char * stepname, unsigned sequence, int len, int precision, bool isSigned, const void *val) { throwUnexpected(); }
    virtual void setResultInt(const char *name, unsigned sequence, __int64 value) { throwUnexpected(); }
    virtual void setResultRaw(const char *name, unsigned sequence, int len, const void * data) { throwUnexpected(); }
    virtual void setResultReal(const char * stepname, unsigned sequence, double value) { throwUnexpected(); }
    virtual void setResultSet(const char *name, unsigned sequence, bool isAll, size32_t len, const void * data, ISetToXmlTransformer * transformer) { throwUnexpected(); }
    virtual void setResultString(const char *name, unsigned sequence, int len, const char * str) { throwUnexpected(); }
    virtual void setResultUInt(const char *name, unsigned sequence, unsigned __int64 value) { throwUnexpected(); }
    virtual void setResultUnicode(const char *name, unsigned sequence, int len, UChar const * str) { throwUnexpected(); }
    virtual void setResultVarString(const char * name, unsigned sequence, const char * value) { throwUnexpected(); }
    virtual void setResultVarUnicode(const char * name, unsigned sequence, UChar const * value) { throwUnexpected(); }
    virtual unsigned getResultHash(const char * name, unsigned sequence) { throwUnexpected(); }
    virtual void printResults(IXmlWriter *output, const char *name, unsigned sequence) { throwUnexpected(); }
    virtual char *getWuid() { throwUnexpected(); }
    virtual void getExternalResultRaw(unsigned & tlen, void * & tgt, const char * wuid, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { throwUnexpected(); }
    virtual char * getExpandLogicalName(const char * logicalName) { throwUnexpected(); }
    virtual void addWuException(const char * text, unsigned code, unsigned severity, const char * source) { throwUnexpected(); }
    virtual void addWuAssertFailure(unsigned code, const char * text, const char * filename, unsigned lineno, unsigned column, bool isAbort) { throwUnexpected(); }
    virtual IUserDescriptor *queryUserDescriptor() { throwUnexpected(); }
    virtual unsigned __int64 getDatasetHash(const char * name, unsigned __int64 hash) { throwUnexpected(); }
    virtual unsigned getNodes() { throwUnexpected(); }
    virtual unsigned getNodeNum() { throwUnexpected(); }
    virtual char *getFilePart(const char *logicalPart, bool create=false) { throwUnexpected(); }
    virtual unsigned __int64 getFileOffset(const char *logicalPart) { throwUnexpected(); }
    virtual IDistributedFileTransaction *querySuperFileTransaction() { throwUnexpected(); }
    virtual char *getJobName() { throwUnexpected(); }
    virtual char *getJobOwner() { throwUnexpected(); }
    virtual char *getClusterName() { throwUnexpected(); }
    virtual char *getGroupName() { throwUnexpected(); }
    virtual char * queryIndexMetaData(char const * lfn, char const * xpath) { throwUnexpected(); }
    virtual unsigned getPriority() const { throwUnexpected(); }
    virtual char *getPlatform() { throwUnexpected(); }
    virtual char *getEnv(const char *name, const char *defaultValue) const { throwUnexpected(); }
    // Returns the allocator for (meta, activityId) from the allocator cache, creating it if needed.
    virtual IEngineRowAllocator *getRowAllocator(IOutputMetaData * meta, unsigned activityId) const
    {
        return allocatorMetaCache->ensure(meta, activityId, roxiemem::RHFnone);
    }
    // String cloning is delegated to the row manager.
    virtual const char *cloneVString(const char *str) const
    {
        return rowManager->cloneVString(str);
    }
    virtual const char *cloneVString(size32_t len, const char *str) const
    {
        return rowManager->cloneVString(len, str);
    }
    // Converts a row to XML text via convertRowToXML.
    virtual void getRowXML(size32_t & lenResult, char * & result, IOutputMetaData & info, const void * row, unsigned flags)
    {
        convertRowToXML(lenResult, result, info, row, flags);
    }
  1119. virtual IWorkUnit *updateWorkUnit() const
  1120. {
  1121. if (workUnit)
  1122. return &workUnit->lock();
  1123. else
  1124. return NULL;
  1125. }
  1126. virtual IConstWorkUnit *queryWorkUnit() const
  1127. {
  1128. return workUnit;
  1129. }
  1130. // roxiemem::IRowAllocatorMetaActIdCacheCallback
  1131. virtual IEngineRowAllocator *createAllocator(IOutputMetaData *meta, unsigned activityId, unsigned id, roxiemem::RoxieHeapFlags flags) const
  1132. {
  1133. if (checkingHeap)
  1134. return createCrcRoxieRowAllocator(*rowManager, meta, activityId, id, flags);
  1135. else
  1136. return createRoxieRowAllocator(*rowManager, meta, activityId, id, flags);
  1137. }
  1138. virtual void getResultRowset(size32_t & tcount, byte * * & tgt, const char * stepname, unsigned sequence, IEngineRowAllocator * _rowAllocator, bool isGrouped, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
  1139. {
  1140. try
  1141. {
  1142. Owned<IWorkUnitRowReader> wuReader = getWorkunitRowReader(stepname, sequence, xmlTransformer, _rowAllocator, isGrouped);
  1143. wuReader->getResultRowset(tcount, tgt);
  1144. }
  1145. catch (IException * e)
  1146. {
  1147. StringBuffer text;
  1148. e->errorMessage(text);
  1149. e->Release();
  1150. throw MakeStringException(ROXIE_DATA_ERROR, "Failed to retrieve data value \"%s\". [%s]", stepname, text.str());
  1151. }
  1152. catch (...)
  1153. {
  1154. throw MakeStringException(ROXIE_DATA_ERROR, "Failed to retrieve data value \"%s\"", stepname);
  1155. }
  1156. }
    // Reads a stored result for use as a dictionary. The rows are fetched
    // (ungrouped) through a workunit row reader exactly as for getResultRowset.
    // NOTE(review): 'hasher' and 'csvTransformer' are not used in this body - confirm
    // that callers do not rely on the rows being arranged by the hasher here.
    // Failures are rethrown as ROXIE_DATA_ERROR exceptions naming the result.
    virtual void getResultDictionary(size32_t & tcount, byte * * & tgt, IEngineRowAllocator * _rowAllocator, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer, IHThorHashLookupInfo * hasher)
    {
        try
        {
            Owned<IWorkUnitRowReader> wuReader = getWorkunitRowReader(stepname, sequence, xmlTransformer, _rowAllocator, false);
            wuReader->getResultRowset(tcount, tgt);
        }
        catch (IException * e)
        {
            StringBuffer text;
            e->errorMessage(text);
            e->Release();
            throw MakeStringException(ROXIE_DATA_ERROR, "Failed to retrieve data value \"%s\".  [%s]", stepname, text.str());
        }
        catch (...)
        {
            throw MakeStringException(ROXIE_DATA_ERROR, "Failed to retrieve data value \"%s\"", stepname);
        }
    }
  1176. virtual bool getResultBool(const char * name, unsigned sequence)
  1177. {
  1178. CriticalBlock b(contextCrit);
  1179. return useContext(sequence).getPropBool(name);
  1180. }
  1181. static unsigned hex2digit(char c)
  1182. {
  1183. // MORE - what about error cases?
  1184. if (c >= 'a')
  1185. return (c - 'a' + 10);
  1186. else if (c >= 'A')
  1187. return (c - 'A' + 10);
  1188. return (c - '0');
  1189. }
  1190. virtual void getResultData(unsigned & tlen, void * & tgt, const char * name, unsigned sequence)
  1191. {
  1192. MemoryBuffer result;
  1193. CriticalBlock b(contextCrit);
  1194. const char *val = useContext(sequence).queryProp(name);
  1195. if (val)
  1196. {
  1197. loop
  1198. {
  1199. char c0 = *val++;
  1200. if (!c0)
  1201. break;
  1202. char c1 = *val++;
  1203. if (!c1)
  1204. break; // Shouldn't really happen - we expect even length
  1205. unsigned c2 = (hex2digit(c0) << 4) | hex2digit(c1);
  1206. result.append((unsigned char) c2);
  1207. }
  1208. }
  1209. tlen = result.length();
  1210. tgt = result.detach();
  1211. }
  1212. virtual void getResultDecimal(unsigned tlen, int precision, bool isSigned, void * tgt, const char * stepname, unsigned sequence)
  1213. {
  1214. if (isSpecialResultSequence(sequence))
  1215. {
  1216. MemoryBuffer m;
  1217. CriticalBlock b(contextCrit);
  1218. useContext(sequence).getPropBin(stepname, m);
  1219. if (m.length())
  1220. {
  1221. assertex(m.length() == tlen);
  1222. m.read(tlen, tgt);
  1223. }
  1224. else
  1225. memset(tgt, 0, tlen);
  1226. }
  1227. else
  1228. {
  1229. StringBuffer x;
  1230. {
  1231. CriticalBlock b(contextCrit);
  1232. useContext(sequence).getProp(stepname, x);
  1233. }
  1234. Decimal d;
  1235. d.setString(x.length(), x.str());
  1236. if (isSigned)
  1237. d.getDecimal(tlen, precision, tgt);
  1238. else
  1239. d.getUDecimal(tlen, precision, tgt);
  1240. }
  1241. }
  1242. virtual __int64 getResultInt(const char * name, unsigned sequence)
  1243. {
  1244. CriticalBlock b(contextCrit);
  1245. const char *val = useContext(sequence).queryProp(name);
  1246. if (val)
  1247. {
  1248. // NOTE - we use this rather than getPropInt64 since it handles uint64 values up to MAX_UINT better (for our purposes)
  1249. return rtlStrToInt8(strlen(val), val);
  1250. }
  1251. else
  1252. return 0;
  1253. }
  1254. virtual double getResultReal(const char * name, unsigned sequence)
  1255. {
  1256. CriticalBlock b(contextCrit);
  1257. IPropertyTree &ctx = useContext(sequence);
  1258. double ret = 0;
  1259. if (ctx.hasProp(name))
  1260. {
  1261. if (ctx.isBinary(name))
  1262. {
  1263. MemoryBuffer buf;
  1264. ctx.getPropBin(name, buf);
  1265. buf.read(ret);
  1266. }
  1267. else
  1268. {
  1269. const char *val = ctx.queryProp(name);
  1270. if (val)
  1271. ret = atof(val);
  1272. }
  1273. }
  1274. return ret;
  1275. }
  1276. virtual void getResultSet(bool & tisAll, unsigned & tlen, void * & tgt, const char *stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
  1277. {
  1278. try
  1279. {
  1280. CriticalBlock b(contextCrit);
  1281. IPropertyTree &ctx = useContext(sequence);
  1282. IPropertyTree *val = ctx.queryPropTree(stepname);
  1283. doExtractRawResultX(tlen, tgt, val, sequence, xmlTransformer, csvTransformer, true);
  1284. tisAll = val ? val->getPropBool("@isAll", false) : false;
  1285. }
  1286. catch (IException * e)
  1287. {
  1288. StringBuffer text;
  1289. e->errorMessage(text);
  1290. e->Release();
  1291. throw MakeStringException(ROXIE_DATA_ERROR, "Failed to retrieve set \"%s\". [%s]", stepname, text.str());
  1292. }
  1293. catch (...)
  1294. {
  1295. throw MakeStringException(ROXIE_DATA_ERROR, "Failed to retrieve set \"%s\"", stepname);
  1296. }
  1297. }
  1298. virtual void getResultRaw(unsigned & tlen, void * & tgt, const char *stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
  1299. {
  1300. try
  1301. {
  1302. CriticalBlock b(contextCrit);
  1303. IPropertyTree &ctx = useContext(sequence);
  1304. IPropertyTree *val = ctx.queryPropTree(stepname);
  1305. doExtractRawResultX(tlen, tgt, val, sequence, xmlTransformer, csvTransformer, false);
  1306. }
  1307. catch (IException * e)
  1308. {
  1309. StringBuffer text;
  1310. e->errorMessage(text);
  1311. e->Release();
  1312. throw MakeStringException(ROXIE_DATA_ERROR, "Failed to retrieve data value \"%s\". [%s]", stepname, text.str());
  1313. }
  1314. catch (...)
  1315. {
  1316. throw MakeStringException(ROXIE_DATA_ERROR, "Failed to retrieve data value \"%s\"", stepname);
  1317. }
  1318. }
  1319. virtual void getResultString(unsigned & tlen, char * & tgt, const char * name, unsigned sequence)
  1320. {
  1321. MemoryBuffer x;
  1322. bool isBinary;
  1323. {
  1324. CriticalBlock b(contextCrit);
  1325. IPropertyTree &ctx = useContext(sequence);
  1326. isBinary = ctx.isBinary(name);
  1327. ctx.getPropBin(name, x);
  1328. }
  1329. if (isBinary) // No utf8 translation if previously set via setResultString
  1330. {
  1331. tlen = x.length();
  1332. tgt = (char *) x.detach();
  1333. }
  1334. else
  1335. rtlUtf8ToStrX(tlen, tgt, rtlUtf8Length(x.length(), x.toByteArray()), x.toByteArray());
  1336. }
  1337. virtual void getResultStringF(unsigned tlen, char * tgt, const char * name, unsigned sequence)
  1338. {
  1339. MemoryBuffer x;
  1340. bool isBinary;
  1341. {
  1342. CriticalBlock b(contextCrit);
  1343. IPropertyTree &ctx = useContext(sequence);
  1344. isBinary = ctx.isBinary(name);
  1345. ctx.getPropBin(name, x);
  1346. }
  1347. if (isBinary)
  1348. rtlStrToStr(tlen, tgt, x.length(), x.toByteArray());
  1349. else
  1350. rtlUtf8ToStr(tlen, tgt, rtlUtf8Length(x.length(), x.toByteArray()), x.toByteArray());
  1351. }
  1352. virtual void getResultUnicode(unsigned & tlen, UChar * & tgt, const char * name, unsigned sequence)
  1353. {
  1354. StringBuffer x;
  1355. {
  1356. CriticalBlock b(contextCrit);
  1357. useContext(sequence).getProp(name, x);
  1358. }
  1359. tgt = rtlCodepageToVUnicodeX(x.length(), x.str(), "utf-8");
  1360. tlen = rtlUnicodeStrlen(tgt);
  1361. }
  1362. virtual char *getResultVarString(const char * name, unsigned sequence)
  1363. {
  1364. CriticalBlock b(contextCrit);
  1365. IPropertyTree &ctx = useContext(sequence);
  1366. bool isBinary = ctx.isBinary(name);
  1367. if (isBinary)
  1368. {
  1369. StringBuffer s;
  1370. ctx.getProp(name, s);
  1371. return s.detach();
  1372. }
  1373. else
  1374. {
  1375. MemoryBuffer x;
  1376. ctx.getPropBin(name, x);
  1377. return rtlUtf8ToVStr(rtlUtf8Length(x.length(), x.toByteArray()), x.toByteArray());
  1378. }
  1379. }
  1380. virtual UChar *getResultVarUnicode(const char * name, unsigned sequence)
  1381. {
  1382. StringBuffer x;
  1383. CriticalBlock b(contextCrit);
  1384. useContext(sequence).getProp(name, x);
  1385. return rtlVCodepageToVUnicodeX(x.str(), "utf-8");
  1386. }
  // Data members used by the result accessors and by useContext()/useResultStore().
  1387. protected:
  // Taken (via CriticalBlock) around accesses to the property trees and result store below.
  1388. mutable CriticalSection contextCrit;
  // Tree returned by useContext() for ResultSequenceStored; useContext() throws if it is not set.
  1389. Owned<IPropertyTree> context;
  // Tree for ResultSequenceInternal values; created on first use in useContext().
  1390. IPropertyTree *temporaries;
  // Tree used for every other sequence value (the default case of useContext()); created on first use.
  1391. IPropertyTree *rereadResults;
  // Reader options handed to CXmlToRawTransformer when extracting XML-format values.
  1392. PTreeReaderOptions xmlStoredDatasetReadFlags;
  // Store for "deserialized"-format results; created on first use in useResultStore().
  1393. CDeserializedResultStore *deserializedResultStore;
  1394. IPropertyTree &useContext(unsigned sequence)
  1395. {
  1396. checkAbort();
  1397. switch (sequence)
  1398. {
  1399. case ResultSequenceStored:
  1400. if (context)
  1401. return *context;
  1402. else
  1403. throw MakeStringException(ROXIE_CODEGEN_ERROR, "Code generation error - attempting to access stored variable on slave");
  1404. case ResultSequencePersist:
  1405. throwUnexpected(); // Do not expect to see in Roxie
  1406. case ResultSequenceInternal:
  1407. {
  1408. CriticalBlock b(contextCrit);
  1409. if (!temporaries)
  1410. temporaries = createPTree();
  1411. return *temporaries;
  1412. }
  1413. case ResultSequenceOnce:
  1414. {
  1415. return factory->queryOnceContext(logctx);
  1416. }
  1417. default:
  1418. {
  1419. CriticalBlock b(contextCrit);
  1420. if (!rereadResults)
  1421. rereadResults = createPTree();
  1422. return *rereadResults;
  1423. }
  1424. }
  1425. }
  1426. IDeserializedResultStore &useResultStore(unsigned sequence)
  1427. {
  1428. checkAbort();
  1429. switch (sequence)
  1430. {
  1431. case ResultSequenceOnce:
  1432. return factory->queryOnceResultStore();
  1433. default:
  1434. // No need to have separate stores for other temporaries...
  1435. CriticalBlock b(contextCrit);
  1436. if (!deserializedResultStore)
  1437. deserializedResultStore = new CDeserializedResultStore;
  1438. return *deserializedResultStore;
  1439. }
  1440. }
  1441. void doExtractRawResultX(unsigned & tlen, void * & tgt, IPropertyTree *val, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer, bool isSet)
  1442. {
  1443. tgt = NULL;
  1444. tlen = 0;
  1445. if (val)
  1446. {
  1447. if (val->isBinary())
  1448. {
  1449. MemoryBuffer m;
  1450. val->getPropBin(NULL, m);
  1451. tlen = m.length();
  1452. tgt= m.detach();
  1453. }
  1454. else
  1455. {
  1456. const char *format = val->queryProp("@format");
  1457. if (!format || strcmp(format, "xml")==0)
  1458. {
  1459. assertex(xmlTransformer);
  1460. Variable2IDataVal result(&tlen, &tgt);
  1461. CXmlToRawTransformer rawTransformer(*xmlTransformer, xmlStoredDatasetReadFlags);
  1462. rawTransformer.transformTree(result, *val, !isSet);
  1463. }
  1464. else if (strcmp(format, "deserialized")==0)
  1465. {
  1466. IDeserializedResultStore &resultStore = useResultStore(sequence);
  1467. resultStore.serialize(tlen, tgt, val->getPropInt("@id", -1), queryCodeContext());
  1468. }
  1469. else if (strcmp(format, "csv")==0)
  1470. {
  1471. // MORE - never tested this code.....
  1472. assertex(csvTransformer);
  1473. Variable2IDataVal result(&tlen, &tgt);
  1474. MemoryBuffer m;
  1475. val->getPropBin(NULL, m);
  1476. CCsvToRawTransformer rawCsvTransformer(*csvTransformer);
  1477. rawCsvTransformer.transform(result, m.length(), m.toByteArray(), !isSet);
  1478. }
  1479. else
  1480. throw MakeStringException(ROXIE_UNIMPLEMENTED_ERROR, "no transform function available");
  1481. }
  1482. }
  1483. }
  1484. virtual IWorkUnitRowReader *createStreamedRawRowReader(IEngineRowAllocator *rowAllocator, bool isGrouped, const char *id)
  1485. {
  1486. throwUnexpected(); // Should only see on server
  1487. }
  1488. virtual IWorkUnitRowReader *getWorkunitRowReader(const char *stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, IEngineRowAllocator *rowAllocator, bool isGrouped)
  1489. {
  1490. try
  1491. {
  1492. CriticalBlock b(contextCrit);
  1493. IPropertyTree &ctx = useContext(sequence);
  1494. IPropertyTree *val = ctx.queryPropTree(stepname);
  1495. if (val)
  1496. {
  1497. const char *id = val->queryProp("@id");
  1498. const char *format = val->queryProp("@format");
  1499. if (id)
  1500. {
  1501. if (!format || strcmp(format, "raw") == 0)
  1502. {
  1503. return createStreamedRawRowReader(rowAllocator, isGrouped, id);
  1504. }
  1505. else if (strcmp(format, "deserialized") == 0)
  1506. {
  1507. IDeserializedResultStore &resultStore = useResultStore(sequence);
  1508. return resultStore.createDeserializedReader(atoi(id));
  1509. }
  1510. else
  1511. throwUnexpected();
  1512. }
  1513. else
  1514. {
  1515. if (!format || strcmp(format, "xml") == 0)
  1516. {
  1517. if (xmlTransformer)
  1518. return new InlineXmlDataReader(*xmlTransformer, val, rowAllocator, isGrouped);
  1519. }
  1520. else if (strcmp(format, "raw") == 0)
  1521. {
  1522. return new InlineRawDataReader(queryCodeContext(), rowAllocator, isGrouped, logctx, val);
  1523. }
  1524. else
  1525. throwUnexpected();
  1526. }
  1527. }
  1528. }
  1529. catch (IException * e)
  1530. {
  1531. StringBuffer text;
  1532. e->errorMessage(text);
  1533. e->Release();
  1534. throw MakeStringException(ROXIE_DATA_ERROR, "Failed to retrieve data value %s. [%s]", stepname, text.str());
  1535. }
  1536. catch (...)
  1537. {
  1538. throw MakeStringException(ROXIE_DATA_ERROR, "Failed to retrieve data value %s", stepname);
  1539. }
  1540. throw MakeStringException(ROXIE_DATA_ERROR, "Failed to retrieve data value %s", stepname);
  1541. }
  1542. };
  1543. IRoxieSlaveContext *createSlaveContext(const IQueryFactory *_factory, const SlaveContextLogger &_logctx, unsigned _timeLimit, memsize_t _memoryLimit, IRoxieQueryPacket *packet)
  1544. {
  1545. return new CSlaveContext(_factory, _logctx, _timeLimit, _memoryLimit, packet, _logctx.queryTraceActivityTimes(), _logctx.queryDebuggerActive(), _logctx.queryCheckingHeap());
  1546. }
  1547. class CRoxieServerDebugContext : extends CBaseServerDebugContext
  1548. {
  1549. // Some questions:
  1550. // 1. Do we let all threads go even when say step? Probably... (may allow a thread to be suspended at some point)
  1551. // 2. Doesn't that then make a bit of a mockery of step (when there are multiple threads active)... I _think_ it actually means we DON'T try to wait for all
  1552. // threads to hit a stop, but allow any that hit stop while we are paused to be queued up to be returned by step.... perhaps actually stop them in critsec rather than
  1553. // semaphore and it all becomes easier to code... Anything calling checkBreakPoint while program state is "in debugger" will block on that critSec.
  1554. // 3. I think we need to recheck breakpoints on Roxie server but just check not deleted
  1555. public:
  1556. IRoxieSlaveContext *ctx;
  1557. CRoxieServerDebugContext(IRoxieSlaveContext *_ctx, const IContextLogger &_logctx, IPropertyTree *_queryXGMML, SafeSocket &_client)
  1558. : CBaseServerDebugContext(_logctx, _queryXGMML, _client), ctx(_ctx)
  1559. {
  1560. }
  1561. void debugCounts(IXmlWriter *output, unsigned sinceSequence, bool reset)
  1562. {
  1563. CriticalBlock b(debugCrit);
  1564. if (running)
  1565. throw MakeStringException(ROXIE_DEBUG_ERROR, "Command not available while query is running");
  1566. if (currentGraph)
  1567. currentGraph->mergeRemoteCounts(this);
  1568. CBaseServerDebugContext::debugCounts(output, sinceSequence, reset);
  1569. }
  1570. virtual void waitForDebugger(DebugState state, IActivityDebugContext *probe)
  1571. {
  1572. ctx->setWUState(WUStateDebugPaused);
  1573. CBaseServerDebugContext::waitForDebugger(state, probe);
  1574. ctx->setWUState(WUStateDebugRunning);
  1575. }
  1576. virtual bool onDebuggerTimeout()
  1577. {
  1578. return ctx->checkWuAborted();
  1579. }
  1580. virtual void debugInitialize(const char *id, const char *_queryName, bool _breakAtStart)
  1581. {
  1582. CBaseServerDebugContext::debugInitialize(id, _queryName, _breakAtStart);
  1583. queryRoxieDebugSessionManager().registerDebugId(id, this);
  1584. }
  1585. virtual void debugTerminate()
  1586. {
  1587. CriticalBlock b(debugCrit);
  1588. assertex(running);
  1589. currentState = DebugStateUnloaded;
  1590. running = false;
  1591. queryRoxieDebugSessionManager().deregisterDebugId(debugId);
  1592. if (debuggerActive)
  1593. {
  1594. debuggerSem.signal(debuggerActive);
  1595. debuggerActive = 0;
  1596. }
  1597. }
  1598. virtual IRoxieQueryPacket *onDebugCallback(const RoxiePacketHeader &header, size32_t len, char *data)
  1599. {
  1600. MemoryBuffer slaveInfo;
  1601. slaveInfo.setBuffer(len, data, false);
  1602. unsigned debugSequence;
  1603. slaveInfo.read(debugSequence);
  1604. {
  1605. CriticalBlock b(breakCrit); // we want to wait until it's our turn before updating the graph info or the counts get ahead of the current row and life is confusing
  1606. char slaveStateChar;
  1607. slaveInfo.read(slaveStateChar);
  1608. DebugState slaveState = (DebugState) slaveStateChar;
  1609. if (slaveState==DebugStateGraphFinished)
  1610. {
  1611. unsigned numCounts;
  1612. slaveInfo.read(numCounts);
  1613. while (numCounts)
  1614. {
  1615. StringAttr edgeId;
  1616. unsigned edgeCount;
  1617. slaveInfo.read(edgeId);
  1618. slaveInfo.read(edgeCount);
  1619. Owned<IGlobalEdgeRecord> thisEdge = getEdgeRecord(edgeId);
  1620. thisEdge->incrementCount(edgeCount, sequence);
  1621. numCounts--;
  1622. }
  1623. }
  1624. slaveInfo.read(currentBreakpointUID);
  1625. memsize_t slaveActivity;
  1626. unsigned channel;
  1627. __uint64 tmp;
  1628. slaveInfo.read(tmp);
  1629. slaveActivity = (memsize_t)tmp;
  1630. slaveInfo.read(channel);
  1631. assertex(currentGraph);
  1632. currentGraph->deserializeProxyGraphs(slaveState, slaveInfo, (IActivityBase *) slaveActivity, channel);
  1633. if (slaveState != DebugStateGraphFinished) // MORE - this is debatable - may (at least sometimes) want a child graph finished to be a notified event...
  1634. {
  1635. StringBuffer slaveActivityId;
  1636. slaveInfo.read(slaveActivityId);
  1637. IActivityDebugContext *slaveActivityCtx = slaveActivityId.length() ? currentGraph->lookupActivityByEdgeId(slaveActivityId.str()) : NULL;
  1638. BreakpointActionMode action = checkBreakpoint(slaveState, slaveActivityCtx , NULL);
  1639. }
  1640. }
  1641. MemoryBuffer mb;
  1642. mb.append(sizeof(RoxiePacketHeader), &header);
  1643. StringBuffer debugIdString;
  1644. debugIdString.appendf(".debug.%x", debugSequence);
  1645. mb.append(debugIdString.str());
  1646. serialize(mb);
  1647. Owned<IRoxieQueryPacket> reply = createRoxiePacket(mb);
  1648. reply->queryHeader().activityId = ROXIE_DEBUGCALLBACK;
  1649. reply->queryHeader().retries = 0;
  1650. return reply.getClear();
  1651. }
  1652. virtual void debugPrintVariable(IXmlWriter *output, const char *name, const char *type) const
  1653. {
  1654. CriticalBlock b(debugCrit);
  1655. if (running)
  1656. throw MakeStringException(ROXIE_DEBUG_ERROR, "Command not available while query is running");
  1657. output->outputBeginNested("Variables", true);
  1658. if (!type || stricmp(type, "temporary"))
  1659. {
  1660. output->outputBeginNested("Temporary", true);
  1661. ctx->printResults(output, name, (unsigned) ResultSequenceInternal);
  1662. output->outputEndNested("Temporary");
  1663. }
  1664. if (!type || stricmp(type, "global"))
  1665. {
  1666. output->outputBeginNested("Global", true);
  1667. ctx->printResults(output, name, (unsigned) ResultSequenceStored);
  1668. output->outputEndNested("Global");
  1669. }
  1670. output->outputEndNested("Variables");
  1671. }
  1672. };
  1673. class CRoxieServerContext : public CSlaveContext, implements IRoxieServerContext, implements IGlobalCodeContext
  1674. {
  // Factory passed to every constructor; init() reads the time limits from it.
  1675. const IQueryFactory *serverQueryFactory;
  // Held in checkDaliConnection() and while results are written to the workunit.
  1676. CriticalSection daliUpdateCrit;
  // Markup format handed to result buffers; init() sets MarkupFmt_XML.
  1677. TextMarkupFormat mlFmt;
  // When true, results are appended as raw bytes instead of text.
  1678. bool isRaw;
  // When true, checkAbort() sends a heartbeat to the client once more than 30000 ticks have passed since the last one.
  1679. bool sendHeartBeats;
  // Elapsed-time threshold compared in checkAbort() before logging "SLOW"; 0 disables the check.
  1680. unsigned warnTimeLimit;
  // Tick value (msTick) of the last client connection check made by checkAbort().
  1681. unsigned lastSocketCheckTime;
  // Tick value (msTick) of the last heartbeat sent by checkAbort().
  1682. unsigned lastHeartBeat;
  1683. protected:
  // Created by the constructors via the factory; process() runs it when set.
  1684. Owned<WorkflowMachine> workflow;
  // NOTE(review): not referenced in the part of the class visible here - purpose to be confirmed.
  1685. mutable MapStringToMyClass<IResolvedFile> fileCache;
  // Client socket for results; NULL after init() and when startWorkUnit() finds "@outputToSocket" false.
  1686. SafeSocket *client;
  // Passed to each FlushingStringBuffer; doPostProcess() only flushes results when this and isRaw are both false.
  1687. bool isBlocked;
  // Set from HttpHelper::isHttp(); passed to each FlushingStringBuffer and copied to its isSoap flag.
  1688. bool isHttp;
  // Selects the XML writer flags returned by getXmlFlags() and is copied to each result buffer.
  1689. bool trim;
  1690. void doPostProcess()
  1691. {
  1692. CriticalBlock b(resultsCrit); // Probably not needed
  1693. if (!isRaw && !isBlocked)
  1694. {
  1695. ForEachItemIn(seq, resultMap)
  1696. {
  1697. FlushingStringBuffer *result = resultMap.item(seq);
  1698. if (result)
  1699. result->flush(true);
  1700. }
  1701. }
  1702. if (probeQuery)
  1703. {
  1704. FlushingStringBuffer response(client, isBlocked, MarkupFmt_XML, false, isHttp, *this);
  1705. // create output stream
  1706. response.startDataset("_Probe", NULL, (unsigned) -1); // initialize it
  1707. // loop through all of the graphs and create a _Probe to output each xgmml
  1708. Owned<IPropertyTreeIterator> graphs = probeQuery->getElements("Graph");
  1709. ForEach(*graphs)
  1710. {
  1711. IPropertyTree &graph = graphs->query();
  1712. StringBuffer xgmml;
  1713. _toXML(&graph, xgmml, 0);
  1714. response.append("\n");
  1715. response.append(xgmml.str());
  1716. }
  1717. }
  1718. }
  1719. void addWuException(IException *E)
  1720. {
  1721. if (workUnit)
  1722. ::addWuException(workUnit, E);
  1723. }
  1724. void init()
  1725. {
  1726. client = NULL;
  1727. totSlavesReplyLen = 0;
  1728. mlFmt = MarkupFmt_XML;
  1729. isRaw = false;
  1730. isBlocked = false;
  1731. isHttp = false;
  1732. trim = false;
  1733. priority = 0;
  1734. sendHeartBeats = false;
  1735. timeLimit = serverQueryFactory->getTimeLimit();
  1736. if (!timeLimit)
  1737. timeLimit = defaultTimeLimit[priority];
  1738. warnTimeLimit = serverQueryFactory->getWarnTimeLimit();
  1739. if (!warnTimeLimit)
  1740. warnTimeLimit = defaultWarnTimeLimit[priority];
  1741. lastSocketCheckTime = startTime;
  1742. lastHeartBeat = startTime;
  1743. ctxParallelJoinPreload = defaultParallelJoinPreload;
  1744. ctxFullKeyedJoinPreload = defaultFullKeyedJoinPreload;
  1745. ctxKeyedJoinPreload = defaultKeyedJoinPreload;
  1746. ctxConcatPreload = defaultConcatPreload;
  1747. ctxFetchPreload = defaultFetchPreload;
  1748. ctxPrefetchProjectPreload = defaultPrefetchProjectPreload;
  1749. traceActivityTimes = false;
  1750. }
  1751. void startWorkUnit()
  1752. {
  1753. WorkunitUpdate wu(&workUnit->lock());
  1754. if (!context->getPropBool("@outputToSocket", false))
  1755. client = NULL;
  1756. SCMStringBuffer wuParams;
  1757. if (workUnit->getXmlParams(wuParams).length())
  1758. {
  1759. // Merge in params from WU. Ones on command line take precedence though...
  1760. Owned<IPropertyTree> wuParamTree = createPTreeFromXMLString(wuParams.str(), ipt_caseInsensitive);
  1761. Owned<IPropertyTreeIterator> params = wuParamTree ->getElements("*");
  1762. ForEach(*params)
  1763. {
  1764. IPropertyTree &param = params->query();
  1765. if (!context->hasProp(param.queryName()))
  1766. context->addPropTree(param.queryName(), LINK(&param));
  1767. }
  1768. }
  1769. if (workUnit->getDebugValueBool("Debug", false))
  1770. {
  1771. bool breakAtStart = workUnit->getDebugValueBool("BreakAtStart", true);
  1772. wu->setState(WUStateDebugRunning);
  1773. SCMStringBuffer wuid;
  1774. initDebugMode(breakAtStart, workUnit->getWuid(wuid).str());
  1775. }
  1776. else
  1777. wu->setState(WUStateRunning);
  1778. }
  1779. void initDebugMode(bool breakAtStart, const char *debugUID)
  1780. {
  1781. if (!debugPermitted || !ownEP.port)
  1782. throw MakeStringException(ROXIE_ACCESS_ERROR, "Debug queries are not permitted on this system");
  1783. debugContext.setown(new CRoxieServerDebugContext(this, logctx, factory->cloneQueryXGMML(), *client));
  1784. debugContext->debugInitialize(debugUID, factory->queryQueryName(), breakAtStart);
  1785. if (workUnit)
  1786. {
  1787. WorkunitUpdate wu(&workUnit->lock());
  1788. wu->setDebugAgentListenerPort(ownEP.port); //tells debugger what port to write commands to
  1789. StringBuffer sb;
  1790. ownEP.getIpText(sb);
  1791. wu->setDebugAgentListenerIP(sb); //tells debugger what IP to write commands to
  1792. }
  1793. timeLimit = 0;
  1794. warnTimeLimit = 0;
  1795. }
  1796. public:
  1797. IMPLEMENT_IINTERFACE;
  1798. CRoxieServerContext(const IQueryFactory *_factory, const IRoxieContextLogger &_logctx)
  1799. : CSlaveContext(_factory, _logctx, 0, 0, NULL, false, false, false), serverQueryFactory(_factory)
  1800. {
  1801. init();
  1802. rowManager->setMemoryLimit(serverQueryFactory->getMemoryLimit());
  1803. workflow.setown(_factory->createWorkflowMachine(true, logctx));
  1804. context.setown(createPTree(ipt_caseInsensitive));
  1805. }
  1806. CRoxieServerContext(IConstWorkUnit *_workUnit, const IQueryFactory *_factory, const IRoxieContextLogger &_logctx)
  1807. : CSlaveContext(_factory, _logctx, 0, 0, NULL, false, false, false), serverQueryFactory(_factory)
  1808. {
  1809. init();
  1810. workUnit.set(_workUnit);
  1811. rowManager->setMemoryLimit(serverQueryFactory->getMemoryLimit());
  1812. workflow.setown(_factory->createWorkflowMachine(false, logctx));
  1813. context.setown(createPTree(ipt_caseInsensitive));
  1814. startWorkUnit();
  1815. }
  1816. CRoxieServerContext(IPropertyTree *_context, const IQueryFactory *_factory, SafeSocket &_client, TextMarkupFormat _mlFmt, bool _isRaw, bool _isBlocked, HttpHelper &httpHelper, bool _trim, unsigned _priority, const IRoxieContextLogger &_logctx, PTreeReaderOptions _xmlReadFlags)
  1817. : CSlaveContext(_factory, _logctx, 0, 0, NULL, false, false, false), serverQueryFactory(_factory)
  1818. {
  1819. init();
  1820. context.set(_context);
  1821. client = &_client;
  1822. mlFmt = _mlFmt;
  1823. isRaw = _isRaw;
  1824. isBlocked = _isBlocked;
  1825. isHttp = httpHelper.isHttp();
  1826. trim = _trim;
  1827. priority = _priority;
  1828. xmlStoredDatasetReadFlags = _xmlReadFlags;
  1829. sendHeartBeats = enableHeartBeat && isRaw && isBlocked && priority==0;
  1830. timeLimit = context->getPropInt("_TimeLimit", timeLimit);
  1831. warnTimeLimit = context->getPropInt("_warnTimeLimit", warnTimeLimit);
  1832. const char *wuid = context->queryProp("@wuid");
  1833. if (wuid)
  1834. {
  1835. IRoxieDaliHelper *daliHelper = checkDaliConnection();
  1836. assertex(daliHelper );
  1837. workUnit.setown(daliHelper->attachWorkunit(wuid, _factory->queryDll()));
  1838. if (!workUnit)
  1839. throw MakeStringException(ROXIE_DALI_ERROR, "Failed to open workunit %s", wuid);
  1840. startWorkUnit();
  1841. }
  1842. else if (context->getPropBool("@debug", false))
  1843. {
  1844. bool breakAtStart = context->getPropBool("@break", true);
  1845. const char *debugUID = context->queryProp("@uid");
  1846. if (debugUID && *debugUID)
  1847. initDebugMode(breakAtStart, debugUID);
  1848. }
  1849. else if (context->getPropBool("_Probe", false))
  1850. probeQuery.setown(_factory->cloneQueryXGMML());
  1851. // MORE some of these might be appropriate in wu case too?
  1852. rowManager->setActivityTracking(context->getPropBool("_TraceMemory", false));
  1853. rowManager->setMemoryLimit((memsize_t) context->getPropInt64("_MemoryLimit", _factory->getMemoryLimit()));
  1854. authToken.append(httpHelper.queryAuthToken());
  1855. workflow.setown(_factory->createWorkflowMachine(false, logctx));
  1856. ctxParallelJoinPreload = context->getPropInt("_ParallelJoinPreload", defaultParallelJoinPreload);
  1857. ctxFullKeyedJoinPreload = context->getPropInt("_FullKeyedJoinPreload", defaultFullKeyedJoinPreload);
  1858. ctxKeyedJoinPreload = context->getPropInt("_KeyedJoinPreload", defaultKeyedJoinPreload);
  1859. ctxConcatPreload = context->getPropInt("_ConcatPreload", defaultConcatPreload);
  1860. ctxFetchPreload = context->getPropInt("_FetchPreload", defaultFetchPreload);
  1861. ctxPrefetchProjectPreload = context->getPropInt("_PrefetchProjectPreload", defaultPrefetchProjectPreload);
  1862. traceActivityTimes = context->getPropBool("_TraceActivityTimes", false) || context->getPropBool("@timing", false);
  1863. checkingHeap = context->getPropBool("_CheckingHeap", defaultCheckingHeap) || context->getPropBool("@checkingHeap", defaultCheckingHeap);
  1864. }
  1865. virtual roxiemem::IRowManager &queryRowManager()
  1866. {
  1867. return *rowManager;
  1868. }
  1869. virtual IRoxieDaliHelper *checkDaliConnection()
  1870. {
  1871. CriticalBlock b(daliUpdateCrit);
  1872. if (!daliHelperLink)
  1873. daliHelperLink.setown(::connectToDali());
  1874. return daliHelperLink;
  1875. }
  1876. virtual void checkAbort()
  1877. {
  1878. CSlaveContext::checkAbort();
  1879. unsigned ticksNow = msTick();
  1880. if (warnTimeLimit)
  1881. {
  1882. unsigned elapsed = ticksNow - startTime;
  1883. if (elapsed > warnTimeLimit)
  1884. {
  1885. CriticalBlock b(abortLock);
  1886. if (elapsed > warnTimeLimit) // we don't want critsec on the first check (for efficiency) but check again inside the critsec
  1887. {
  1888. logOperatorException(NULL, NULL, 0, "SLOW (%d ms): %s", elapsed, factory->queryQueryName());
  1889. warnTimeLimit = elapsed + warnTimeLimit;
  1890. }
  1891. }
  1892. }
  1893. if (client)
  1894. {
  1895. if (socketCheckInterval)
  1896. {
  1897. if (ticksNow - lastSocketCheckTime > socketCheckInterval)
  1898. {
  1899. CriticalBlock b(abortLock);
  1900. if (!client->checkConnection())
  1901. throw MakeStringException(ROXIE_CLIENT_CLOSED, "Client socket closed");
  1902. lastSocketCheckTime = ticksNow;
  1903. }
  1904. }
  1905. if (sendHeartBeats)
  1906. {
  1907. unsigned hb = ticksNow - lastHeartBeat;
  1908. if (hb > 30000)
  1909. {
  1910. lastHeartBeat = msTick();
  1911. client->sendHeartBeat(*this);
  1912. }
  1913. }
  1914. }
  1915. }
  1916. virtual unsigned getXmlFlags() const
  1917. {
  1918. return trim ? XWFtrim|XWFopt : XWFexpandempty;
  1919. }
  1920. virtual unsigned getMemoryUsage()
  1921. {
  1922. return rowManager->getMemoryUsage();
  1923. }
  1924. virtual unsigned getSlavesReplyLen()
  1925. {
  1926. return totSlavesReplyLen;
  1927. }
  1928. virtual void process()
  1929. {
  1930. EclProcessFactory pf = (EclProcessFactory) factory->queryDll()->getEntry("createProcess");
  1931. Owned<IEclProcess> p = pf();
  1932. try
  1933. {
  1934. if (debugContext)
  1935. debugContext->checkBreakpoint(DebugStateReady, NULL, NULL);
  1936. if (workflow)
  1937. workflow->perform(this, p);
  1938. else
  1939. p->perform(this, 0);
  1940. }
  1941. catch(WorkflowException *E)
  1942. {
  1943. if (debugContext)
  1944. debugContext->checkBreakpoint(DebugStateException, NULL, static_cast<IException *>(E));
  1945. addWuException(E);
  1946. doPostProcess();
  1947. throw;
  1948. }
  1949. catch(IException *E)
  1950. {
  1951. if (debugContext)
  1952. debugContext->checkBreakpoint(DebugStateException, NULL, E);
  1953. addWuException(E);
  1954. doPostProcess();
  1955. throw;
  1956. }
  1957. catch(...)
  1958. {
  1959. if (debugContext)
  1960. debugContext->checkBreakpoint(DebugStateFailed, NULL, NULL);
  1961. if (workUnit)
  1962. {
  1963. Owned<IException> E = MakeStringException(ROXIE_INTERNAL_ERROR, "Unknown exception caught in CRoxieServerContext::process");
  1964. addWuException(E);
  1965. }
  1966. doPostProcess();
  1967. throw;
  1968. }
  1969. if (debugContext)
  1970. debugContext->checkBreakpoint(DebugStateFinished, NULL, NULL);
  1971. doPostProcess();
  1972. }
  1973. virtual void done(bool failed)
  1974. {
  1975. if (debugContext)
  1976. debugContext->debugTerminate();
  1977. setWUState(aborted ? WUStateAborted : (failed ? WUStateFailed : WUStateCompleted));
  1978. }
  1979. virtual ICodeContext *queryCodeContext()
  1980. {
  1981. return this;
  1982. }
  1983. virtual IRoxieServerContext *queryServerContext()
  1984. {
  1985. return this;
  1986. }
  1987. virtual IGlobalCodeContext *queryGlobalCodeContext()
  1988. {
  1989. return this;
  1990. }
  1991. // interface ICodeContext
  1992. virtual FlushingStringBuffer *queryResult(unsigned sequence)
  1993. {
  1994. if (!client && workUnit)
  1995. return NULL; // when outputting to workunit only, don't output anything to stdout
  1996. CriticalBlock procedure(resultsCrit);
  1997. while (!resultMap.isItem(sequence))
  1998. resultMap.append(NULL);
  1999. FlushingStringBuffer *result = resultMap.item(sequence);
  2000. if (!result)
  2001. {
  2002. result = new FlushingStringBuffer(client, isBlocked, mlFmt, isRaw, isHttp, *this);
  2003. result->isSoap = isHttp;
  2004. result->trim = trim;
  2005. result->queryName.set(context->queryName());
  2006. resultMap.replace(result, sequence);
  2007. }
  2008. return result;
  2009. }
  2010. virtual char *getDaliServers()
  2011. {
  2012. //MORE: Should this now be implemented using IRoxieDaliHelper?
  2013. throwUnexpected();
  2014. }
  2015. virtual void setResultBool(const char *name, unsigned sequence, bool value)
  2016. {
  2017. if (isSpecialResultSequence(sequence))
  2018. {
  2019. CriticalBlock b(contextCrit);
  2020. useContext(sequence).setPropBool(name, value);
  2021. }
  2022. else
  2023. {
  2024. FlushingStringBuffer *r = queryResult(sequence);
  2025. if (r)
  2026. {
  2027. r->startScalar(name, sequence);
  2028. if (isRaw)
  2029. r->append(sizeof(value), (char *)&value);
  2030. else
  2031. r->append(value ? "true" : "false");
  2032. }
  2033. }
  2034. if (workUnit)
  2035. {
  2036. try
  2037. {
  2038. WorkunitUpdate wu(&workUnit->lock());
  2039. CriticalBlock b(daliUpdateCrit); // MORE - do we really need these locks?
  2040. wu->setResultBool(name, sequence, value);
  2041. }
  2042. catch(IException *e)
  2043. {
  2044. StringBuffer text;
  2045. e->errorMessage(text);
  2046. CTXLOG("Error trying to update dali: %s", text.str());
  2047. e->Release();
  2048. }
  2049. catch(...)
  2050. {
  2051. CTXLOG("Unknown exception trying to update dali");
  2052. }
  2053. }
  2054. }
  2055. virtual void setResultData(const char *name, unsigned sequence, int len, const void * data)
  2056. {
  2057. static char hexchar[] = "0123456789ABCDEF";
  2058. if (isSpecialResultSequence(sequence))
  2059. {
  2060. StringBuffer s;
  2061. const byte *field = (const byte *) data;
  2062. for (int i = 0; i < len; i++)
  2063. s.append(hexchar[field[i] >> 4]).append(hexchar[field[i] & 0x0f]);
  2064. CriticalBlock b(contextCrit);
  2065. IPropertyTree &ctx = useContext(sequence);
  2066. ctx.setProp(name, s.str());
  2067. }
  2068. else
  2069. {
  2070. FlushingStringBuffer *r = queryResult(sequence);
  2071. if (r)
  2072. {
  2073. r->startScalar(name, sequence);
  2074. if (isRaw)
  2075. r->append(len, (const char *) data);
  2076. else
  2077. {
  2078. const byte *field = (const byte *) data;
  2079. for (int i = 0; i < len; i++)
  2080. {
  2081. r->append(hexchar[field[i] >> 4]);
  2082. r->append(hexchar[field[i] & 0x0f]);
  2083. }
  2084. }
  2085. }
  2086. }
  2087. if (workUnit)
  2088. {
  2089. try
  2090. {
  2091. WorkunitUpdate wu(&workUnit->lock());
  2092. CriticalBlock b(daliUpdateCrit);
  2093. wu->setResultData(name, sequence, len, data);
  2094. }
  2095. catch(IException *e)
  2096. {
  2097. StringBuffer text;
  2098. e->errorMessage(text);
  2099. CTXLOG("Error trying to update dali: %s", text.str());
  2100. e->Release();
  2101. }
  2102. catch(...)
  2103. {
  2104. CTXLOG("Unknown exception trying to update dali");
  2105. }
  2106. }
  2107. }
  2108. virtual void appendResultDeserialized(const char *name, unsigned sequence, size32_t count, rowset_t data, bool extend, IOutputMetaData *meta)
  2109. {
  2110. CriticalBlock b(contextCrit);
  2111. IPropertyTree &ctx = useContext(sequence);
  2112. IDeserializedResultStore &resultStore = useResultStore(sequence);
  2113. IPropertyTree *val = ctx.queryPropTree(name);
  2114. if (extend && val)
  2115. {
  2116. int oldId = val->getPropInt("@id", -1);
  2117. const char * oldFormat = val->queryProp("@format");
  2118. assertex(oldId != -1);
  2119. assertex(oldFormat && strcmp(oldFormat, "deserialized")==0);
  2120. size32_t oldCount;
  2121. rowset_t oldData;
  2122. resultStore.queryResult(oldId, oldCount, oldData);
  2123. Owned<IEngineRowAllocator> allocator = createRoxieRowAllocator(*rowManager, meta, 0, 0, roxiemem::RHFnone);
  2124. RtlLinkedDatasetBuilder builder(allocator);
  2125. builder.appendRows(oldCount, oldData);
  2126. builder.appendRows(count, data);
  2127. rtlReleaseRowset(count, data);
  2128. val->setPropInt("@id", resultStore.addResult(builder.getcount(), builder.linkrows(), meta));
  2129. }
  2130. else
  2131. {
  2132. if (!val)
  2133. val = ctx.addPropTree(name, createPTree());
  2134. val->setProp("@format", "deserialized");
  2135. val->setPropInt("@id", resultStore.addResult(count, data, meta));
  2136. }
  2137. }
  2138. virtual void appendResultRawContext(const char *name, unsigned sequence, int len, const void * data, int numRows, bool extend, bool saveInContext)
  2139. {
  2140. if (saveInContext)
  2141. {
  2142. CriticalBlock b(contextCrit);
  2143. IPropertyTree &ctx = useContext(sequence);
  2144. ctx.appendPropBin(name, len, data);
  2145. ctx.queryPropTree(name)->setProp("@format", "raw");
  2146. }
  2147. if (workUnit)
  2148. {
  2149. try
  2150. {
  2151. WorkunitUpdate wu(&workUnit->lock());
  2152. CriticalBlock b(daliUpdateCrit);
  2153. wu->setResultDataset(name, sequence, len, data, numRows, extend);
  2154. }
  2155. catch(IException *e)
  2156. {
  2157. StringBuffer text;
  2158. e->errorMessage(text);
  2159. CTXLOG("Error trying to update dali: %s", text.str());
  2160. e->Release();
  2161. }
  2162. catch(...)
  2163. {
  2164. CTXLOG("Unknown exception trying to update dali");
  2165. }
  2166. }
  2167. }
  2168. virtual void setResultRaw(const char *name, unsigned sequence, int len, const void * data)
  2169. {
  2170. if (isSpecialResultSequence(sequence))
  2171. {
  2172. CriticalBlock b(contextCrit);
  2173. IPropertyTree &ctx = useContext(sequence);
  2174. ctx.setPropBin(name, len, data);
  2175. ctx.queryPropTree(name)->setProp("@format", "raw");
  2176. }
  2177. else
  2178. {
  2179. FlushingStringBuffer *r = queryResult(sequence);
  2180. if (r)
  2181. {
  2182. r->startScalar(name, sequence);
  2183. if (isRaw)
  2184. r->append(len, (const char *) data);
  2185. else
  2186. UNIMPLEMENTED;
  2187. }
  2188. }
  2189. if (workUnit)
  2190. {
  2191. try
  2192. {
  2193. WorkunitUpdate wu(&workUnit->lock());
  2194. CriticalBlock b(daliUpdateCrit);
  2195. wu->setResultRaw(name, sequence, len, data);
  2196. }
  2197. catch(IException *e)
  2198. {
  2199. StringBuffer text;
  2200. e->errorMessage(text);
  2201. CTXLOG("Error trying to update dali: %s", text.str());
  2202. e->Release();
  2203. }
  2204. catch(...)
  2205. {
  2206. CTXLOG("Unknown exception trying to update dali");
  2207. }
  2208. }
  2209. }
  2210. virtual void setResultSet(const char *name, unsigned sequence, bool isAll, size32_t len, const void * data, ISetToXmlTransformer * transformer)
  2211. {
  2212. if (isSpecialResultSequence(sequence))
  2213. {
  2214. CriticalBlock b(contextCrit);
  2215. IPropertyTree &ctx = useContext(sequence);
  2216. ctx.setPropBin(name, len, data);
  2217. ctx.queryPropTree(name)->setProp("@format", "raw");
  2218. ctx.queryPropTree(name)->setPropBool("@isAll", isAll);
  2219. }
  2220. else
  2221. {
  2222. FlushingStringBuffer *r = queryResult(sequence);
  2223. if (r)
  2224. {
  2225. r->startScalar(name, sequence);
  2226. if (isRaw)
  2227. r->append(len, (char *)data);
  2228. else if (mlFmt==MarkupFmt_XML)
  2229. {
  2230. assertex(transformer);
  2231. CommonXmlWriter writer(getXmlFlags()|XWFnoindent, 0);
  2232. transformer->toXML(isAll, len, (byte *)data, writer);
  2233. r->append(writer.str());
  2234. }
  2235. else if (mlFmt==MarkupFmt_JSON)
  2236. {
  2237. assertex(transformer);
  2238. CommonJsonWriter writer(getXmlFlags()|XWFnoindent, 0);
  2239. transformer->toXML(isAll, len, (byte *)data, writer);
  2240. r->append(writer.str());
  2241. }
  2242. else
  2243. {
  2244. assertex(transformer);
  2245. r->append('[');
  2246. if (isAll)
  2247. r->appendf("*]");
  2248. else
  2249. {
  2250. SimpleOutputWriter x;
  2251. transformer->toXML(isAll, len, (const byte *) data, x);
  2252. r->appendf("%s]", x.str());
  2253. }
  2254. }
  2255. }
  2256. }
  2257. if (workUnit)
  2258. {
  2259. try
  2260. {
  2261. WorkunitUpdate wu(&workUnit->lock());
  2262. CriticalBlock b(daliUpdateCrit);
  2263. wu->setResultSet(name, sequence, isAll, len, data, transformer);
  2264. }
  2265. catch(IException *e)
  2266. {
  2267. StringBuffer text;
  2268. e->errorMessage(text);
  2269. CTXLOG("Error trying to update dali: %s", text.str());
  2270. e->Release();
  2271. }
  2272. catch(...)
  2273. {
  2274. CTXLOG("Unknown exception trying to update dali");
  2275. }
  2276. }
  2277. }
  2278. virtual void setResultXml(const char *name, unsigned sequence, const char *xml)
  2279. {
  2280. CriticalBlock b(contextCrit);
  2281. useContext(sequence).setPropTree(name, createPTreeFromXMLString(xml, ipt_caseInsensitive));
  2282. }
  2283. virtual void setResultDecimal(const char *name, unsigned sequence, int len, int precision, bool isSigned, const void *val)
  2284. {
  2285. if (isSpecialResultSequence(sequence))
  2286. {
  2287. MemoryBuffer m;
  2288. serializeFixedData(len, val, m);
  2289. CriticalBlock b(contextCrit);
  2290. useContext(sequence).setPropBin(name, m.length(), m.toByteArray());
  2291. }
  2292. else
  2293. {
  2294. FlushingStringBuffer *r = queryResult(sequence);
  2295. if (r)
  2296. {
  2297. r->startScalar(name, sequence);
  2298. if (isRaw)
  2299. r->append(len, (char *)val);
  2300. else
  2301. {
  2302. StringBuffer s;
  2303. if (isSigned)
  2304. outputXmlDecimal(val, len, precision, NULL, s);
  2305. else
  2306. outputXmlUDecimal(val, len, precision, NULL, s);
  2307. r->append(s);
  2308. }
  2309. }
  2310. }
  2311. if (workUnit)
  2312. {
  2313. try
  2314. {
  2315. WorkunitUpdate wu(&workUnit->lock());
  2316. CriticalBlock b(daliUpdateCrit);
  2317. wu->setResultDecimal(name, sequence, len, precision, isSigned, val);
  2318. }
  2319. catch(IException *e)
  2320. {
  2321. StringBuffer text;
  2322. e->errorMessage(text);
  2323. CTXLOG("Error trying to update dali: %s", text.str());
  2324. e->Release();
  2325. }
  2326. catch(...)
  2327. {
  2328. CTXLOG("Unknown exception trying to update dali");
  2329. }
  2330. }
  2331. }
  2332. virtual void setResultInt(const char *name, unsigned sequence, __int64 value)
  2333. {
  2334. if (isSpecialResultSequence(sequence))
  2335. {
  2336. CriticalBlock b(contextCrit);
  2337. useContext(sequence).setPropInt64(name, value);
  2338. }
  2339. else
  2340. {
  2341. FlushingStringBuffer *r = queryResult(sequence);
  2342. if (r)
  2343. {
  2344. r->startScalar(name, sequence);
  2345. if (isRaw)
  2346. r->append(sizeof(value), (char *)&value);
  2347. else
  2348. r->appendf("%"I64F"d", value);
  2349. }
  2350. }
  2351. if (workUnit)
  2352. {
  2353. try
  2354. {
  2355. WorkunitUpdate wu(&workUnit->lock());
  2356. CriticalBlock b(daliUpdateCrit);
  2357. wu->setResultInt(name, sequence, value);
  2358. }
  2359. catch(IException *e)
  2360. {
  2361. StringBuffer text;
  2362. e->errorMessage(text);
  2363. CTXLOG("Error trying to update dali: %s", text.str());
  2364. e->Release();
  2365. }
  2366. catch(...)
  2367. {
  2368. CTXLOG("Unknown exception trying to update dali");
  2369. }
  2370. }
  2371. }
  2372. virtual void setResultUInt(const char *name, unsigned sequence, unsigned __int64 value)
  2373. {
  2374. if (isSpecialResultSequence(sequence))
  2375. {
  2376. CriticalBlock b(contextCrit);
  2377. useContext(sequence).setPropInt64(name, value);
  2378. }
  2379. else
  2380. {
  2381. FlushingStringBuffer *r = queryResult(sequence);
  2382. if (r)
  2383. {
  2384. r->startScalar(name, sequence);
  2385. if (isRaw)
  2386. r->append(sizeof(value), (char *)&value);
  2387. else
  2388. r->appendf("%"I64F"u", value);
  2389. }
  2390. }
  2391. if (workUnit)
  2392. {
  2393. try
  2394. {
  2395. WorkunitUpdate wu(&workUnit->lock());
  2396. CriticalBlock b(daliUpdateCrit);
  2397. wu->setResultUInt(name, sequence, value);
  2398. }
  2399. catch(IException *e)
  2400. {
  2401. StringBuffer text;
  2402. e->errorMessage(text);
  2403. CTXLOG("Error trying to update dali: %s", text.str());
  2404. e->Release();
  2405. }
  2406. catch(...)
  2407. {
  2408. CTXLOG("Unknown exception trying to update dali");
  2409. }
  2410. }
  2411. }
  2412. virtual void setResultReal(const char *name, unsigned sequence, double value)
  2413. {
  2414. if (isSpecialResultSequence(sequence))
  2415. {
  2416. CriticalBlock b(contextCrit);
  2417. useContext(sequence).setPropBin(name, sizeof(value), &value);
  2418. }
  2419. else
  2420. {
  2421. StringBuffer v;
  2422. v.append(value);
  2423. FlushingStringBuffer *r = queryResult(sequence);
  2424. if (r)
  2425. {
  2426. r->startScalar(name, sequence);
  2427. if (r->isRaw)
  2428. r->append(sizeof(value), (char *)&value);
  2429. else
  2430. r->appendf("%s", v.str());
  2431. }
  2432. }
  2433. if (workUnit)
  2434. {
  2435. try
  2436. {
  2437. WorkunitUpdate wu(&workUnit->lock());
  2438. CriticalBlock b(daliUpdateCrit);
  2439. wu->setResultReal(name, sequence, value);
  2440. }
  2441. catch(IException *e)
  2442. {
  2443. StringBuffer text;
  2444. e->errorMessage(text);
  2445. CTXLOG("Error trying to update dali: %s", text.str());
  2446. e->Release();
  2447. }
  2448. catch(...)
  2449. {
  2450. CTXLOG("Unknown exception trying to update dali");
  2451. }
  2452. }
  2453. }
  2454. virtual void setResultString(const char *name, unsigned sequence, int len, const char * str)
  2455. {
  2456. if (isSpecialResultSequence(sequence))
  2457. {
  2458. CriticalBlock b(contextCrit);
  2459. useContext(sequence).setPropBin(name, len, str);
  2460. }
  2461. else
  2462. {
  2463. FlushingStringBuffer *r = queryResult(sequence);
  2464. if (r)
  2465. {
  2466. r->startScalar(name, sequence);
  2467. if (r->isRaw)
  2468. {
  2469. r->append(len, str);
  2470. }
  2471. else
  2472. {
  2473. r->encodeXML(str, 0, len);
  2474. }
  2475. }
  2476. }
  2477. if (workUnit)
  2478. {
  2479. try
  2480. {
  2481. WorkunitUpdate wu(&workUnit->lock());
  2482. CriticalBlock b(daliUpdateCrit);
  2483. wu->setResultString(name, sequence, len, str);
  2484. }
  2485. catch(IException *e)
  2486. {
  2487. StringBuffer text;
  2488. e->errorMessage(text);
  2489. CTXLOG("Error trying to update dali: %s", text.str());
  2490. e->Release();
  2491. }
  2492. catch(...)
  2493. {
  2494. CTXLOG("Unknown exception trying to update dali");
  2495. }
  2496. }
  2497. }
  2498. virtual void setResultUnicode(const char *name, unsigned sequence, int len, UChar const * str)
  2499. {
  2500. if (isSpecialResultSequence(sequence))
  2501. {
  2502. rtlDataAttr buff;
  2503. unsigned bufflen = 0;
  2504. rtlUnicodeToCodepageX(bufflen, buff.refstr(), len, str, "utf-8");
  2505. CriticalBlock b(contextCrit);
  2506. useContext(sequence).setPropBin(name, bufflen, buff.getstr());
  2507. }
  2508. else
  2509. {
  2510. FlushingStringBuffer *r = queryResult(sequence);
  2511. if (r)
  2512. {
  2513. r->startScalar(name, sequence);
  2514. if (r->isRaw)
  2515. {
  2516. r->append(len*2, (const char *) str);
  2517. }
  2518. else
  2519. {
  2520. rtlDataAttr buff;
  2521. unsigned bufflen = 0;
  2522. rtlUnicodeToCodepageX(bufflen, buff.refstr(), len, str, "utf-8");
  2523. r->encodeXML(buff.getstr(), 0, bufflen, true); // output as UTF-8
  2524. }
  2525. }
  2526. }
  2527. if (workUnit)
  2528. {
  2529. try
  2530. {
  2531. WorkunitUpdate wu(&workUnit->lock());
  2532. CriticalBlock b(daliUpdateCrit);
  2533. wu->setResultUnicode(name, sequence, len, str);
  2534. }
  2535. catch(IException *e)
  2536. {
  2537. StringBuffer text;
  2538. e->errorMessage(text);
  2539. CTXLOG("Error trying to update dali: %s", text.str());
  2540. e->Release();
  2541. }
  2542. catch(...)
  2543. {
  2544. CTXLOG("Unknown exception trying to update dali");
  2545. }
  2546. }
  2547. }
  2548. virtual void setResultVarString(const char * name, unsigned sequence, const char * value)
  2549. {
  2550. setResultString(name, sequence, strlen(value), value);
  2551. }
  2552. virtual void setResultVarUnicode(const char * name, unsigned sequence, UChar const * value)
  2553. {
  2554. setResultUnicode(name, sequence, rtlUnicodeStrlen(value), value);
  2555. }
  2556. virtual void setResultDataset(const char * name, unsigned sequence, size32_t len, const void *data, unsigned numRows, bool extend)
  2557. {
  2558. appendResultRawContext(name, sequence, len, data, numRows, extend, true);
  2559. }
  2560. virtual IWorkUnitRowReader *createStreamedRawRowReader(IEngineRowAllocator *rowAllocator, bool isGrouped, const char *id)
  2561. {
  2562. return new StreamedRawDataReader(this, rowAllocator, isGrouped, logctx, *client, id);
  2563. }
  2564. virtual void printResults(IXmlWriter *output, const char *name, unsigned sequence)
  2565. {
  2566. CriticalBlock b(contextCrit);
  2567. IPropertyTree &tree = useContext(sequence);
  2568. if (name)
  2569. {
  2570. const char *val = tree.queryProp(name);
  2571. if (val)
  2572. output->outputCString(val, name);
  2573. }
  2574. else
  2575. {
  2576. StringBuffer hack;
  2577. toXML(&tree, hack);
  2578. output->outputString(0, NULL, NULL); // Hack upon hack...
  2579. output->outputQuoted(hack.str());
  2580. }
  2581. }
  2582. virtual const IResolvedFile *resolveLFN(const char *fileName, bool isOpt)
  2583. {
  2584. CriticalBlock b(contextCrit);
  2585. StringBuffer expandedName;
  2586. expandLogicalFilename(expandedName, fileName, workUnit, false);
  2587. Linked<const IResolvedFile> ret = fileCache.getValue(expandedName);
  2588. if (!ret)
  2589. {
  2590. ret.setown(factory->queryPackage().lookupFileName(fileName, isOpt, false, false, workUnit));
  2591. if (ret)
  2592. {
  2593. IResolvedFile *add = const_cast<IResolvedFile *>(ret.get());
  2594. fileCache.setValue(expandedName, add);
  2595. }
  2596. }
  2597. return ret.getClear();
  2598. }
  2599. virtual IRoxieWriteHandler *createLFN(const char *filename, bool overwrite, bool extend, const StringArray &clusters)
  2600. {
  2601. return factory->queryPackage().createFileName(filename, overwrite, extend, clusters, workUnit);
  2602. }
  2603. virtual void endGraph(bool aborting)
  2604. {
  2605. fileCache.kill();
  2606. CSlaveContext::endGraph(aborting);
  2607. }
  2608. virtual void onFileCallback(const RoxiePacketHeader &header, const char *lfn, bool isOpt, bool isLocal)
  2609. {
  2610. Owned<const IResolvedFile> dFile = resolveLFN(lfn, isOpt);
  2611. if (dFile)
  2612. {
  2613. MemoryBuffer mb;
  2614. mb.append(sizeof(RoxiePacketHeader), &header);
  2615. mb.append(lfn);
  2616. dFile->serializePartial(mb, header.channel, isLocal);
  2617. ((RoxiePacketHeader *) mb.toByteArray())->activityId = ROXIE_FILECALLBACK;
  2618. Owned<IRoxieQueryPacket> reply = createRoxiePacket(mb);
  2619. reply->queryHeader().retries = 0;
  2620. ROQ->sendPacket(reply, *this); // MORE - the caller's log context might be better? Should we unicast? Note that this does not release the packet
  2621. return;
  2622. }
  2623. ROQ->sendAbortCallback(header, lfn, *this);
  2624. throwUnexpected();
  2625. }
  2626. virtual void getExternalResultRaw(unsigned & tlen, void * & tgt, const char * wuid, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
  2627. {
  2628. UNIMPLEMENTED;
  2629. }
  2630. virtual void addWuException(const char * text, unsigned code, unsigned _severity, const char * source)
  2631. {
  2632. WUExceptionSeverity severity = (WUExceptionSeverity) _severity;
  2633. CTXLOG("%s", text);
  2634. if (severity > ExceptionSeverityInformation)
  2635. OERRLOG("%d - %s", code, text);
  2636. if (workUnit)
  2637. {
  2638. WorkunitUpdate wu(&workUnit->lock());
  2639. addExceptionToWorkunit(wu, severity, source, code, text, NULL, 0 ,0);
  2640. }
  2641. }
  2642. virtual void addWuAssertFailure(unsigned code, const char * text, const char * filename, unsigned lineno, unsigned column, bool isAbort)
  2643. {
  2644. CTXLOG("%s", text);
  2645. OERRLOG("%d - %s", code, text);
  2646. if (workUnit)
  2647. {
  2648. WorkunitUpdate wu(&workUnit->lock());
  2649. addExceptionToWorkunit(wu, ExceptionSeverityError, "user", code, text, filename, lineno, column);
  2650. }
  2651. if (isAbort)
  2652. rtlFailOnAssert(); // minimal implementation
  2653. }
  2654. IUserDescriptor *queryUserDescriptor()
  2655. {
  2656. return NULL; // TBD - Richard, where do user credentials for a roxie query come from
  2657. }
  2658. virtual bool isResult(const char * name, unsigned sequence)
  2659. {
  2660. CriticalBlock b(contextCrit);
  2661. return useContext(sequence).hasProp(name);
  2662. }
  2663. virtual char *getClusterName() { throwUnexpected(); }
  2664. virtual char *getGroupName() { throwUnexpected(); }
  2665. virtual char * queryIndexMetaData(char const * lfn, char const * xpath) { throwUnexpected(); }
  2666. virtual char *getEnv(const char *name, const char *defaultValue) const
  2667. {
  2668. return serverQueryFactory->getEnv(name, defaultValue);
  2669. }
  2670. virtual char *getJobName()
  2671. {
  2672. return strdup(factory->queryQueryName());
  2673. }
  2674. virtual char *getJobOwner() { throwUnexpected(); }
  2675. virtual char *getPlatform()
  2676. {
  2677. return strdup("roxie");
  2678. }
  2679. virtual char *getWuid()
  2680. {
  2681. if (workUnit)
  2682. {
  2683. SCMStringBuffer wuid;
  2684. workUnit->getWuid(wuid);
  2685. return strdup(wuid.str());
  2686. }
  2687. else
  2688. {
  2689. throw MakeStringException(ROXIE_INVALID_ACTION, "No workunit associated with this context");
  2690. }
  2691. }
  2692. // persist-related code - usage of persist should have been caught and rejected at codegen time
  2693. virtual char * getExpandLogicalName(const char * logicalName) { throwUnexpected(); }
  2694. virtual IRemoteConnection *startPersist(const char * name) { throwUnexpected(); }
  2695. virtual void finishPersist(IRemoteConnection *) { throwUnexpected(); }
  2696. virtual void clearPersist(const char * logicalName) { throwUnexpected(); }
  2697. virtual void updatePersist(IRemoteConnection *persistLock, const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC) { throwUnexpected(); }
  2698. virtual void checkPersistMatches(const char * logicalName, unsigned eclCRC) { throwUnexpected(); }
  2699. virtual void setWorkflowCondition(bool value) { if(workflow) workflow->setCondition(value); }
  2700. virtual void returnPersistVersion(const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC, bool isFile) { throwUnexpected(); }
  2701. virtual void fail(int code, const char *text)
  2702. {
  2703. addWuException(text, code, 2, "user");
  2704. }
  2705. virtual unsigned getWorkflowId() { return workflow->queryCurrentWfid(); }
  2706. virtual void doNotify(char const * code, char const * extra) { UNIMPLEMENTED; }
  2707. virtual void doNotify(char const * code, char const * extra, const char * target) { UNIMPLEMENTED; }
  2708. virtual void doWait(unsigned code, char const * extra) { UNIMPLEMENTED; }
  2709. virtual void doWaitCond(unsigned code, char const * extra, int sequence, char const * alias, unsigned wfid) { UNIMPLEMENTED; }
  2710. virtual unsigned __int64 getDatasetHash(const char * name, unsigned __int64 hash) { throwUnexpected(); }
  2711. virtual int queryLastFailCode() { UNIMPLEMENTED; }
  2712. virtual void getLastFailMessage(size32_t & outLen, char * &outStr, const char * tag) { UNIMPLEMENTED; }
  2713. virtual void getEventName(size32_t & outLen, char * & outStr) { UNIMPLEMENTED; }
  2714. virtual void getEventExtra(size32_t & outLen, char * & outStr, const char * tag) { UNIMPLEMENTED; }
  2715. virtual bool fileExists(const char * filename) { throwUnexpected(); }
  2716. virtual void deleteFile(const char * logicalName) { throwUnexpected(); }
  2717. virtual unsigned getNodes() { return numChannels; }
  2718. virtual unsigned getNodeNum() { return 1; }
  2719. virtual char *getFilePart(const char *logicalPart, bool create=false) { UNIMPLEMENTED; }
  2720. virtual unsigned __int64 getFileOffset(const char *logicalPart) { throwUnexpected(); }
  2721. virtual IDistributedFileTransaction *querySuperFileTransaction() { UNIMPLEMENTED; }
  2722. virtual void flush(unsigned seqNo) { throwUnexpected(); }
  2723. virtual unsigned getPriority() const { return priority; }
  2724. virtual bool outputResultsToWorkUnit() const { return workUnit != NULL; }
  2725. virtual bool outputResultsToSocket() const { return client != NULL; }
  2726. virtual void selectCluster(const char * cluster) { throwUnexpected(); }
  2727. virtual void restoreCluster() { throwUnexpected(); }
  2728. };
  2729. //================================================================================================
  2730. class CSoapRoxieServerContext : public CRoxieServerContext
  2731. {
  2732. private:
  2733. StringAttr queryName;
  2734. public:
  2735. CSoapRoxieServerContext(IPropertyTree *_context, const IQueryFactory *_factory, SafeSocket &_client, HttpHelper &httpHelper, unsigned _priority, const IRoxieContextLogger &_logctx, PTreeReaderOptions xmlReadFlags)
  2736. : CRoxieServerContext(_context, _factory, _client, MarkupFmt_XML, false, false, httpHelper, true, _priority, _logctx, xmlReadFlags)
  2737. {
  2738. queryName.set(_context->queryName());
  2739. }
  2740. virtual void process()
  2741. {
  2742. EclProcessFactory pf = (EclProcessFactory) factory->queryDll()->getEntry("createProcess");
  2743. Owned<IEclProcess> p = pf();
  2744. if (workflow)
  2745. workflow->perform(this, p);
  2746. else
  2747. p->perform(this, 0);
  2748. }
  2749. virtual void flush(unsigned seqNo)
  2750. {
  2751. CriticalBlock b(resultsCrit);
  2752. CriticalBlock b1(client->queryCrit());
  2753. StringBuffer responseHead, responseTail;
  2754. responseHead.append("<").append(queryName).append("Response");
  2755. responseHead.append(" sequence=\"").append(seqNo).append("\"");
  2756. responseHead.append(" xmlns=\"urn:hpccsystems:ecl:").appendLower(queryName.length(), queryName.sget()).append("\">");
  2757. responseHead.append("<Results><Result>");
  2758. unsigned len = responseHead.length();
  2759. client->write(responseHead.detach(), len, true);
  2760. ForEachItemIn(seq, resultMap)
  2761. {
  2762. FlushingStringBuffer *result = resultMap.item(seq);
  2763. if (result)
  2764. {
  2765. result->flush(true);
  2766. for(;;)
  2767. {
  2768. size32_t length;
  2769. void *payload = result->getPayload(length);
  2770. if (!length)
  2771. break;
  2772. client->write(payload, length, true);
  2773. }
  2774. }
  2775. }
  2776. responseTail.append("</Result></Results>");
  2777. responseTail.append("</").append(queryName).append("Response>");
  2778. len = responseTail.length();
  2779. client->write(responseTail.detach(), len, true);
  2780. }
  2781. };
  2782. class CJsonRoxieServerContext : public CRoxieServerContext
  2783. {
  2784. private:
  2785. StringAttr queryName;
  2786. public:
  2787. CJsonRoxieServerContext(IPropertyTree *_context, const IQueryFactory *_factory, SafeSocket &_client, HttpHelper &httpHelper, unsigned _priority, const IRoxieContextLogger &_logctx, PTreeReaderOptions xmlReadFlags)
  2788. : CRoxieServerContext(_context, _factory, _client, MarkupFmt_JSON, false, false, httpHelper, true, _priority, _logctx, xmlReadFlags)
  2789. {
  2790. queryName.set(_context->queryName());
  2791. }
  2792. virtual void process()
  2793. {
  2794. EclProcessFactory pf = (EclProcessFactory) factory->queryDll()->getEntry("createProcess");
  2795. Owned<IEclProcess> p = pf();
  2796. if (workflow)
  2797. workflow->perform(this, p);
  2798. else
  2799. p->perform(this, 0);
  2800. }
  2801. virtual void flush(unsigned seqNo)
  2802. {
  2803. CriticalBlock b(resultsCrit);
  2804. CriticalBlock b1(client->queryCrit());
  2805. StringBuffer responseHead, responseTail;
  2806. appendfJSONName(responseHead, "%sResponse", queryName.get()).append(" {");
  2807. appendJSONValue(responseHead, "sequence", seqNo);
  2808. appendJSONName(responseHead, "Results").append(" {");
  2809. unsigned len = responseHead.length();
  2810. client->write(responseHead.detach(), len, true);
  2811. bool needDelimiter = false;
  2812. ForEachItemIn(seq, resultMap)
  2813. {
  2814. FlushingStringBuffer *result = resultMap.item(seq);
  2815. if (result)
  2816. {
  2817. result->flush(true);
  2818. for(;;)
  2819. {
  2820. size32_t length;
  2821. void *payload = result->getPayload(length);
  2822. if (!length)
  2823. break;
  2824. if (needDelimiter)
  2825. {
  2826. StringAttr s(","); //write() will take ownership of buffer
  2827. size32_t len = s.length();
  2828. client->write((void *)s.detach(), len, true);
  2829. needDelimiter=false;
  2830. }
  2831. client->write(payload, length, true);
  2832. }
  2833. needDelimiter=true;
  2834. }
  2835. }
  2836. responseTail.append("}}");
  2837. len = responseTail.length();
  2838. client->write(responseTail.detach(), len, true);
  2839. }
  2840. virtual FlushingStringBuffer *queryResult(unsigned sequence)
  2841. {
  2842. if (!client && workUnit)
  2843. return NULL; // when outputting to workunit only, don't output anything to stdout
  2844. CriticalBlock procedure(resultsCrit);
  2845. while (!resultMap.isItem(sequence))
  2846. resultMap.append(NULL);
  2847. FlushingStringBuffer *result = resultMap.item(sequence);
  2848. if (!result)
  2849. {
  2850. result = new FlushingJsonBuffer(client, isBlocked, isHttp, *this);
  2851. result->trim = trim;
  2852. result->queryName.set(context->queryName());
  2853. resultMap.replace(result, sequence);
  2854. }
  2855. return result;
  2856. }
  2857. };
  2858. IRoxieServerContext *createRoxieServerContext(IPropertyTree *context, const IQueryFactory *factory, SafeSocket &client, bool isXml, bool isRaw, bool isBlocked, HttpHelper &httpHelper, bool trim, unsigned priority, const IRoxieContextLogger &_logctx, PTreeReaderOptions readFlags)
  2859. {
  2860. if (httpHelper.isHttp())
  2861. {
  2862. if (httpHelper.queryContentFormat()==MarkupFmt_JSON)
  2863. return new CJsonRoxieServerContext(context, factory, client, httpHelper, priority, _logctx, readFlags);
  2864. return new CSoapRoxieServerContext(context, factory, client, httpHelper, priority, _logctx, readFlags);
  2865. }
  2866. else
  2867. return new CRoxieServerContext(context, factory, client, isXml ? MarkupFmt_XML : MarkupFmt_Unknown, isRaw, isBlocked, httpHelper, trim, priority, _logctx, readFlags);
  2868. }
  2869. IRoxieServerContext *createOnceServerContext(const IQueryFactory *factory, const IRoxieContextLogger &_logctx)
  2870. {
  2871. return new CRoxieServerContext(factory, _logctx);
  2872. }
  2873. IRoxieServerContext *createWorkUnitServerContext(IConstWorkUnit *wu, const IQueryFactory *factory, const IRoxieContextLogger &_logctx)
  2874. {
  2875. return new CRoxieServerContext(wu, factory, _logctx);
  2876. }