ccdprotocol.cpp 76 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2016 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "jlib.hpp"
  15. #include "jthread.hpp"
  16. #include "rtlcommon.hpp"
  17. #include "roxie.hpp"
  18. #include "roxiehelper.hpp"
  19. #include "ccdprotocol.hpp"
  20. #include "securesocket.hpp"
  21. //================================================================================================================================
  22. IHpccProtocolListener *createProtocolListener(const char *protocol, IHpccProtocolMsgSink *sink, unsigned port, unsigned listenQueue, const char *certFile, const char *keyFile, const char *passPhrase);
  23. class CHpccProtocolPlugin : implements IHpccProtocolPlugin, public CInterface
  24. {
  25. public:
  26. IMPLEMENT_IINTERFACE;
  27. CHpccProtocolPlugin(IHpccProtocolPluginContext &ctx)
  28. {
  29. targetNames.appendListUniq(ctx.ctxQueryProp("@querySets"), ",");
  30. targetAliases.setown(createProperties());
  31. StringArray tempList;
  32. tempList.appendListUniq(ctx.ctxQueryProp("@targetAliases"), ",");
  33. ForEachItemIn(i, tempList)
  34. {
  35. const char *alias = tempList.item(i);
  36. const char *eq = strchr(alias, '=');
  37. if (eq)
  38. {
  39. StringAttr name(alias, eq-alias);
  40. if (!targetNames.contains(name))
  41. targetAliases->setProp(name.str(), ++eq);
  42. }
  43. }
  44. maxBlockSize = ctx.ctxGetPropInt("@maxBlockSize", 10000000);
  45. defaultXmlReadFlags = ctx.ctxGetPropBool("@defaultStripLeadingWhitespace", true) ? ptr_ignoreWhiteSpace : ptr_none;
  46. trapTooManyActiveQueries = ctx.ctxGetPropBool("@trapTooManyActiveQueries", true);
  47. numRequestArrayThreads = ctx.ctxGetPropInt("@requestArrayThreads", 5);
  48. maxHttpConnectionRequests = ctx.ctxGetPropInt("@maxHttpConnectionRequests", 10);
  49. maxHttpKeepAliveWait = ctx.ctxGetPropInt("@maxHttpKeepAliveWait", 5000);
  50. }
  51. IHpccProtocolListener *createListener(const char *protocol, IHpccProtocolMsgSink *sink, unsigned port, unsigned listenQueue, const char *config, const char *certFile=nullptr, const char *keyFile=nullptr, const char *passPhrase=nullptr)
  52. {
  53. return createProtocolListener(protocol, sink, port, listenQueue, certFile, keyFile, passPhrase);
  54. }
  55. public:
  56. StringArray targetNames;
  57. Owned<IProperties> targetAliases;
  58. PTreeReaderOptions defaultXmlReadFlags;
  59. unsigned maxBlockSize;
  60. unsigned numRequestArrayThreads;
  61. unsigned maxHttpConnectionRequests = 10;
  62. unsigned maxHttpKeepAliveWait = 5000;
  63. bool trapTooManyActiveQueries;
  64. };
  65. Owned<CHpccProtocolPlugin> global;
  66. class ProtocolListener : public Thread, implements IHpccProtocolListener, implements IThreadFactory
  67. {
  68. public:
  69. IMPLEMENT_IINTERFACE;
  70. ProtocolListener(IHpccProtocolMsgSink *_sink) : Thread("RoxieListener")
  71. {
  72. running = false;
  73. sink.setown(dynamic_cast<IHpccNativeProtocolMsgSink*>(_sink));
  74. }
  75. virtual IHpccProtocolMsgSink *queryMsgSink()
  76. {
  77. return sink;
  78. }
  79. static void updateAffinity()
  80. {
  81. #ifdef CPU_ZERO
  82. if (sched_getaffinity(0, sizeof(cpu_set_t), &cpuMask))
  83. {
  84. if (traceLevel)
  85. DBGLOG("Unable to get CPU affinity - thread affinity settings will be ignored");
  86. cpuCores = 0;
  87. lastCore = 0;
  88. CPU_ZERO(&cpuMask);
  89. }
  90. else
  91. {
  92. #if __GLIBC__ > 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 6)
  93. cpuCores = CPU_COUNT(&cpuMask);
  94. #else
  95. cpuCores = 0;
  96. unsigned setSize = CPU_SETSIZE;
  97. while (setSize--)
  98. {
  99. if (CPU_ISSET(setSize, &cpuMask))
  100. ++cpuCores;
  101. }
  102. #endif /* GLIBC */
  103. if (traceLevel)
  104. traceAffinity(&cpuMask);
  105. }
  106. #endif
  107. }
  108. virtual void start()
  109. {
  110. // Note we allow a few additional threads than requested - these are the threads that return "Too many active queries" responses
  111. pool.setown(createThreadPool("RoxieSocketWorkerPool", this, NULL, sink->getPoolSize()+5, INFINITE));
  112. assertex(!running);
  113. Thread::start();
  114. started.wait();
  115. }
  116. virtual bool stop(unsigned timeout)
  117. {
  118. if (running)
  119. {
  120. running = false;
  121. join();
  122. Release();
  123. }
  124. return pool->joinAll(false, timeout);
  125. }
  126. virtual bool suspend(bool suspendIt)
  127. {
  128. return sink->suspend(suspendIt);
  129. }
  130. void setThreadAffinity(int numCores)
  131. {
  132. #ifdef CPU_ZERO
  133. // Note - strictly speaking not threadsafe but any race conditions are (a) unlikely and (b) harmless
  134. if (cpuCores)
  135. {
  136. if (numCores > 0 && numCores < cpuCores)
  137. {
  138. cpu_set_t threadMask;
  139. CPU_ZERO(&threadMask);
  140. unsigned cores = 0;
  141. unsigned offset = lastCore;
  142. unsigned core;
  143. for (core = 0; core < CPU_SETSIZE; core++)
  144. {
  145. unsigned useCore = (core + offset) % CPU_SETSIZE;
  146. if (CPU_ISSET(useCore, &cpuMask))
  147. {
  148. CPU_SET(useCore, &threadMask);
  149. cores++;
  150. if (cores == numCores)
  151. {
  152. lastCore = useCore+1;
  153. break;
  154. }
  155. }
  156. }
  157. if (traceLevel > 3)
  158. traceAffinity(&threadMask);
  159. pthread_setaffinity_np(GetCurrentThreadId(), sizeof(cpu_set_t), &threadMask);
  160. }
  161. else
  162. {
  163. if (traceLevel > 3)
  164. traceAffinity(&cpuMask);
  165. pthread_setaffinity_np(GetCurrentThreadId(), sizeof(cpu_set_t), &cpuMask);
  166. }
  167. }
  168. #endif
  169. }
  170. protected:
  171. bool running;
  172. Semaphore started;
  173. Owned<IThreadPool> pool;
  174. Linked<IHpccNativeProtocolMsgSink> sink;
  175. #ifdef CPU_ZERO
  176. static cpu_set_t cpuMask;
  177. static unsigned cpuCores;
  178. static unsigned lastCore;
  179. private:
  180. static void traceAffinity(cpu_set_t *mask)
  181. {
  182. StringBuffer trace;
  183. for (unsigned core = 0; core < CPU_SETSIZE; core++)
  184. {
  185. if (CPU_ISSET(core, mask))
  186. trace.appendf(",%d", core);
  187. }
  188. if (trace.length())
  189. DBGLOG("Process affinity is set to use core(s) %s", trace.str()+1);
  190. }
  191. #endif
  192. };
  193. #ifdef CPU_ZERO
  194. cpu_set_t ProtocolListener::cpuMask;
  195. unsigned ProtocolListener::cpuCores;
  196. unsigned ProtocolListener::lastCore;
  197. #endif
  198. class ProtocolSocketListener : public ProtocolListener
  199. {
  200. unsigned port;
  201. unsigned listenQueue;
  202. Owned<ISocket> socket;
  203. SocketEndpoint ep;
  204. StringAttr protocol;
  205. StringAttr certFile;
  206. StringAttr keyFile;
  207. StringAttr passPhrase;
  208. Owned<ISecureSocketContext> secureContext;
  209. public:
  210. ProtocolSocketListener(IHpccProtocolMsgSink *_sink, unsigned _port, unsigned _listenQueue, const char *_protocol, const char *_certFile, const char *_keyFile, const char *_passPhrase)
  211. : ProtocolListener(_sink)
  212. {
  213. port = _port;
  214. listenQueue = _listenQueue;
  215. ep.set(port, queryHostIP());
  216. protocol.set(_protocol);
  217. certFile.set(_certFile);
  218. keyFile.set(_keyFile);
  219. passPhrase.set(_passPhrase);
  220. }
  221. IHpccProtocolMsgSink *queryMsgSink()
  222. {
  223. return sink.get();
  224. }
  225. virtual bool stop(unsigned timeout)
  226. {
  227. if (socket)
  228. socket->cancel_accept();
  229. return ProtocolListener::stop(timeout);
  230. }
  231. virtual void disconnectQueue()
  232. {
  233. // This is for dali queues only
  234. }
  235. virtual void stopListening()
  236. {
  237. // Not threadsafe, but we only call this when generating a core file... what's the worst that can happen?
  238. try
  239. {
  240. DBGLOG("Closing listening socket %d", port);
  241. socket.clear();
  242. DBGLOG("Closed listening socket %d", port);
  243. }
  244. catch(...)
  245. {
  246. }
  247. }
  248. virtual void runOnce(const char *query);
  249. virtual void cleanupSocket(ISocket *sock)
  250. {
  251. if (!sock)
  252. return;
  253. try
  254. {
  255. sock->shutdown();
  256. }
  257. catch (IException *e)
  258. {
  259. e->Release();
  260. }
  261. try
  262. {
  263. sock->close();
  264. }
  265. catch (IException *e)
  266. {
  267. e->Release();
  268. }
  269. }
  270. virtual int run()
  271. {
  272. DBGLOG("ProtocolSocketListener (%d threads) listening to socket on port %d", sink->getPoolSize(), port);
  273. socket.setown(ISocket::create(port, listenQueue));
  274. running = true;
  275. started.signal();
  276. while (running)
  277. {
  278. Owned<ISocket> client = socket->accept(true);
  279. Owned<ISecureSocket> ssock;
  280. if (client)
  281. {
  282. if (streq(protocol.str(), "ssl"))
  283. {
  284. #ifdef _USE_OPENSSL
  285. try
  286. {
  287. if (!secureContext)
  288. secureContext.setown(createSecureSocketContextEx(certFile.get(), keyFile.get(), passPhrase.get(), ServerSocket));
  289. ssock.setown(secureContext->createSecureSocket(client.getClear()));
  290. int loglevel = 0;
  291. if (traceLevel > 1)
  292. loglevel = traceLevel;
  293. int status = ssock->secure_accept(loglevel);
  294. if (status < 0)
  295. {
  296. // secure_accept may also DBGLOG() errors ...
  297. cleanupSocket(ssock);
  298. continue;
  299. }
  300. }
  301. catch (IException *E)
  302. {
  303. StringBuffer s;
  304. E->errorMessage(s);
  305. WARNLOG("%s", s.str());
  306. E->Release();
  307. cleanupSocket(ssock);
  308. ssock.clear();
  309. cleanupSocket(client);
  310. client.clear();
  311. continue;
  312. }
  313. catch (...)
  314. {
  315. WARNLOG("ProtocolSocketListener failure to establish secure connection");
  316. cleanupSocket(ssock);
  317. ssock.clear();
  318. cleanupSocket(client);
  319. client.clear();
  320. continue;
  321. }
  322. client.setown(ssock.getClear());
  323. #else
  324. WARNLOG("ProtocolSocketListener failure to establish secure connection: OpenSSL disabled in build");
  325. continue;
  326. #endif
  327. }
  328. client->set_linger(-1);
  329. pool->start(client.getClear());
  330. }
  331. }
  332. DBGLOG("ProtocolSocketListener closed query socket");
  333. return 0;
  334. }
  335. virtual IPooledThread *createNew();
  336. virtual const SocketEndpoint &queryEndpoint() const
  337. {
  338. return ep;
  339. }
  340. virtual unsigned queryPort() const
  341. {
  342. return port;
  343. }
  344. };
  345. class ProtocolQueryWorker : public CInterface, implements IPooledThread
  346. {
  347. public:
  348. IMPLEMENT_IINTERFACE;
  349. ProtocolQueryWorker(ProtocolListener *_listener) : listener(_listener)
  350. {
  351. qstart = msTick();
  352. time(&startTime);
  353. }
  354. // interface IPooledThread
  355. virtual void init(void *) override
  356. {
  357. qstart = msTick();
  358. time(&startTime);
  359. }
  360. virtual bool canReuse() const override
  361. {
  362. return true;
  363. }
  364. virtual bool stop() override
  365. {
  366. ERRLOG("RoxieQueryWorker stopped with queries active");
  367. return true;
  368. }
  369. protected:
  370. ProtocolListener *listener;
  371. unsigned qstart;
  372. time_t startTime;
  373. };
  // Selects how the adaptive REST writers treat the outermost array/row.
  enum class AdaptiveRoot
  {
      NamedArray,   // JSON writer emits the field name followed by '['
      RootArray,    // JSON writer emits a bare '[' at the root
      ExtendArray,  // JSON writer emits no brackets at the root
      FirstRow      // writers stop flushing once the first top-level row is closed
  };
  375. class AdaptiveRESTJsonWriter : public CommonJsonWriter
  376. {
  377. AdaptiveRoot model;
  378. unsigned depth = 0;
  379. public:
  380. AdaptiveRESTJsonWriter(AdaptiveRoot _model, unsigned _flags, unsigned _initialIndent, IXmlStreamFlusher *_flusher) :
  381. CommonJsonWriter(_flags, _initialIndent, _flusher), model(_model)
  382. {
  383. }
  384. virtual void outputBeginArray(const char *fieldname)
  385. {
  386. prepareBeginArray(fieldname);
  387. if (model == AdaptiveRoot::NamedArray || arrays.length()>1)
  388. appendJSONName(out, fieldname).append('[');
  389. else if (model == AdaptiveRoot::RootArray)
  390. out.append('[');
  391. }
  392. void outputEndArray(const char *fieldname)
  393. {
  394. arrays.pop();
  395. checkFormat(false, true, -1);
  396. if (arrays.length() || (model != AdaptiveRoot::FirstRow && model != AdaptiveRoot::ExtendArray))
  397. out.append(']');
  398. const char * sep = (fieldname) ? strchr(fieldname, '/') : NULL;
  399. while (sep)
  400. {
  401. out.append('}');
  402. sep = strchr(sep+1, '/');
  403. }
  404. }
  405. void outputBeginNested(const char *fieldname, bool nestChildren)
  406. {
  407. CommonJsonWriter::outputBeginNested(fieldname, nestChildren);
  408. if (model == AdaptiveRoot::FirstRow)
  409. depth++;
  410. }
  411. void outputEndNested(const char *fieldname)
  412. {
  413. CommonJsonWriter::outputEndNested(fieldname);
  414. if (model == AdaptiveRoot::FirstRow)
  415. {
  416. depth--;
  417. if (fieldname && streq(fieldname, "Row") && depth==0)
  418. {
  419. flush(true);
  420. flusher = nullptr;
  421. }
  422. }
  423. }
  424. };
  425. class AdaptiveRESTXmlWriter : public CommonXmlWriter
  426. {
  427. StringAttr tag;
  428. AdaptiveRoot model = AdaptiveRoot::NamedArray;
  429. unsigned depth = 0;
  430. public:
  431. AdaptiveRESTXmlWriter(AdaptiveRoot _model, const char *tagname, unsigned _flags, unsigned _initialIndent, IXmlStreamFlusher *_flusher) :
  432. CommonXmlWriter(_flags, _initialIndent, _flusher), tag(tagname), model(_model)
  433. {
  434. }
  435. void outputBeginNested(const char *fieldname, bool nestChildren)
  436. {
  437. if (model == AdaptiveRoot::FirstRow)
  438. {
  439. if (!depth && tag.length())
  440. fieldname = tag.str();
  441. depth++;
  442. }
  443. CommonXmlWriter::outputBeginNested(fieldname, nestChildren);
  444. }
  445. void outputEndNested(const char *fieldname)
  446. {
  447. if (model == AdaptiveRoot::FirstRow)
  448. {
  449. depth--;
  450. if (!depth)
  451. {
  452. CommonXmlWriter::outputEndNested(tag.length() ? tag.str() : fieldname);
  453. flush(true);
  454. flusher = nullptr;
  455. return;
  456. }
  457. }
  458. CommonXmlWriter::outputEndNested(fieldname);
  459. }
  460. };
  461. IXmlWriterExt * createAdaptiveRESTWriterExt(AdaptiveRoot model, const char *tagname, unsigned _flags, unsigned _initialIndent, IXmlStreamFlusher *_flusher, XMLWriterType xmlType)
  462. {
  463. if (xmlType==WTJSON)
  464. return new AdaptiveRESTJsonWriter(model, _flags, _initialIndent, _flusher);
  465. return new AdaptiveRESTXmlWriter(model, tagname, _flags, _initialIndent, _flusher);
  466. }
  467. //================================================================================================================
  468. class CHpccNativeResultsWriter : implements IHpccNativeProtocolResultsWriter, public CInterface
  469. {
  470. protected:
  471. SafeSocket *client;
  472. CriticalSection resultsCrit;
  473. IPointerArrayOf<FlushingStringBuffer> resultMap;
  474. StringAttr queryName;
  475. StringAttr tagName;
  476. StringAttr resultFilter;
  477. const IContextLogger &logctx;
  478. Owned<FlushingStringBuffer> probe;
  479. TextMarkupFormat mlFmt;
  480. PTreeReaderOptions xmlReadFlags;
  481. bool isBlocked;
  482. bool isRaw;
  483. bool isHTTP;
  484. bool trim; // MORE - this is never set!
  485. bool adaptiveRoot = false;
  486. bool onlyUseFirstRow = false;
  487. public:
  488. IMPLEMENT_IINTERFACE;
  489. CHpccNativeResultsWriter(const char *queryname, SafeSocket *_client, bool _isBlocked, TextMarkupFormat _mlFmt, bool _isRaw, bool _isHTTP, const IContextLogger &_logctx, PTreeReaderOptions _xmlReadFlags) :
  490. client(_client), queryName(queryname), logctx(_logctx), mlFmt(_mlFmt), xmlReadFlags(_xmlReadFlags), isBlocked(_isBlocked), isRaw(_isRaw), isHTTP(_isHTTP)
  491. {
  492. }
  493. ~CHpccNativeResultsWriter()
  494. {
  495. }
  496. inline void setAdaptiveRoot(){adaptiveRoot = true; client->setAdaptiveRoot(true);}
  497. inline void setTagName(const char *tag){tagName.set(tag);}
  498. inline void setOnlyUseFirstRow(){onlyUseFirstRow = true;}
  499. inline void setResultFilter(const char *_resultFilter){resultFilter.set(_resultFilter);}
  500. virtual FlushingStringBuffer *queryResult(unsigned sequence, bool extend=false)
  501. {
  502. CriticalBlock procedure(resultsCrit);
  503. while (!resultMap.isItem(sequence))
  504. resultMap.append(NULL);
  505. FlushingStringBuffer *result = resultMap.item(sequence);
  506. if (!result)
  507. {
  508. if (mlFmt==MarkupFmt_JSON)
  509. result = new FlushingJsonBuffer(client, isBlocked, isHTTP, logctx, extend);
  510. else
  511. result = new FlushingStringBuffer(client, isBlocked, mlFmt, isRaw, isHTTP, logctx);
  512. result->isSoap = isHTTP;
  513. result->trim = trim;
  514. result->queryName.set(queryName);
  515. resultMap.replace(result, sequence);
  516. }
  517. return result;
  518. }
  519. virtual FlushingStringBuffer *createFlushingBuffer()
  520. {
  521. return new FlushingStringBuffer(client, isBlocked, mlFmt, isRaw, isHTTP, logctx);
  522. }
  523. bool checkAdaptiveResult(const char *name)
  524. {
  525. if (!adaptiveRoot)
  526. return false;
  527. if (!resultFilter || !*resultFilter)
  528. return true;
  529. return (streq(resultFilter, name));
  530. }
  531. virtual IXmlWriter *addDataset(const char *name, unsigned sequence, const char *elementName, bool &appendRawData, unsigned writeFlags, bool _extend, const IProperties *xmlns)
  532. {
  533. FlushingStringBuffer *response = queryResult(sequence, _extend);
  534. if (response)
  535. {
  536. appendRawData = response->isRaw;
  537. bool adaptive = checkAdaptiveResult(name);
  538. if (adaptive)
  539. {
  540. elementName = nullptr;
  541. if (response->mlFmt!=MarkupFmt_JSON && !onlyUseFirstRow && tagName.length())
  542. elementName = tagName.str();
  543. }
  544. response->startDataset(elementName, name, sequence, _extend, xmlns, adaptive);
  545. if (response->mlFmt==MarkupFmt_XML || response->mlFmt==MarkupFmt_JSON)
  546. {
  547. if (response->mlFmt==MarkupFmt_JSON)
  548. writeFlags |= XWFnoindent;
  549. AdaptiveRoot rootType = AdaptiveRoot::ExtendArray;
  550. if (adaptive)
  551. {
  552. if (onlyUseFirstRow)
  553. rootType = AdaptiveRoot::FirstRow;
  554. else
  555. rootType = AdaptiveRoot::RootArray;
  556. }
  557. Owned<IXmlWriter> xmlwriter = createAdaptiveRESTWriterExt(rootType, tagName, writeFlags, 1, response, (response->mlFmt==MarkupFmt_JSON) ? WTJSON : WTStandard);
  558. xmlwriter->outputBeginArray("Row");
  559. return xmlwriter.getClear();
  560. }
  561. }
  562. return NULL;
  563. }
  564. virtual void finalizeXmlRow(unsigned sequence)
  565. {
  566. if (mlFmt==MarkupFmt_XML || mlFmt==MarkupFmt_JSON)
  567. {
  568. FlushingStringBuffer *r = queryResult(sequence);
  569. if (r)
  570. {
  571. r->incrementRowCount();
  572. r->flush(false);
  573. }
  574. }
  575. }
  576. inline void startScalar(FlushingStringBuffer *r, const char *name, unsigned sequence)
  577. {
  578. if (checkAdaptiveResult(name))
  579. {
  580. r->startScalar(name, sequence, true, tagName.length() ? tagName.str() : name);
  581. return;
  582. }
  583. r->startScalar(name, sequence);
  584. }
  585. virtual void appendRaw(unsigned sequence, unsigned len, const char *data)
  586. {
  587. FlushingStringBuffer *r = queryResult(sequence);
  588. if (r)
  589. r->append(len, data);
  590. }
  591. virtual void appendRawRow(unsigned sequence, unsigned len, const char *data)
  592. {
  593. FlushingStringBuffer *r = queryResult(sequence);
  594. if (r)
  595. {
  596. r->append(len, data);
  597. r->incrementRowCount();
  598. r->flush(false);
  599. }
  600. }
  601. virtual void appendSimpleRow(unsigned sequence, const char *str)
  602. {
  603. FlushingStringBuffer *r = queryResult(sequence);
  604. if (r)
  605. r->append(str);
  606. }
  607. virtual void setResultBool(const char *name, unsigned sequence, bool value)
  608. {
  609. FlushingStringBuffer *r = queryResult(sequence);
  610. if (r)
  611. {
  612. startScalar(r, name, sequence);
  613. if (isRaw)
  614. r->append(sizeof(value), (char *)&value);
  615. else
  616. r->append(value ? "true" : "false");
  617. }
  618. }
  619. virtual void setResultData(const char *name, unsigned sequence, int len, const void * data)
  620. {
  621. FlushingStringBuffer *r = queryResult(sequence);
  622. if (r)
  623. {
  624. startScalar(r, name, sequence);
  625. r->encodeData(data, len);
  626. }
  627. }
  628. virtual void setResultRaw(const char *name, unsigned sequence, int len, const void * data)
  629. {
  630. FlushingStringBuffer *r = queryResult(sequence);
  631. if (r)
  632. {
  633. startScalar(r, name, sequence);
  634. if (isRaw)
  635. r->append(len, (const char *) data);
  636. else
  637. UNIMPLEMENTED;
  638. }
  639. }
  640. virtual void setResultSet(const char *name, unsigned sequence, bool isAll, size32_t len, const void * data, ISetToXmlTransformer * transformer)
  641. {
  642. FlushingStringBuffer *r = queryResult(sequence);
  643. if (r)
  644. {
  645. startScalar(r, name, sequence);
  646. if (isRaw)
  647. r->append(len, (char *)data);
  648. else if (mlFmt==MarkupFmt_XML)
  649. {
  650. assertex(transformer);
  651. CommonXmlWriter writer(xmlReadFlags|XWFnoindent, 0, r);
  652. transformer->toXML(isAll, len, (byte *)data, writer);
  653. }
  654. else if (mlFmt==MarkupFmt_JSON)
  655. {
  656. assertex(transformer);
  657. CommonJsonWriter writer(xmlReadFlags|XWFnoindent, 0, r);
  658. transformer->toXML(isAll, len, (byte *)data, writer);
  659. }
  660. else
  661. {
  662. assertex(transformer);
  663. r->append('[');
  664. if (isAll)
  665. r->appendf("*]");
  666. else
  667. {
  668. SimpleOutputWriter x;
  669. transformer->toXML(isAll, len, (const byte *) data, x);
  670. r->appendf("%s]", x.str());
  671. }
  672. }
  673. }
  674. }
  675. virtual void setResultDecimal(const char *name, unsigned sequence, int len, int precision, bool isSigned, const void *val)
  676. {
  677. FlushingStringBuffer *r = queryResult(sequence);
  678. if (r)
  679. {
  680. startScalar(r, name, sequence);
  681. if (isRaw)
  682. r->append(len, (char *)val);
  683. else
  684. {
  685. StringBuffer s;
  686. if (isSigned)
  687. outputXmlDecimal(val, len, precision, NULL, s);
  688. else
  689. outputXmlUDecimal(val, len, precision, NULL, s);
  690. r->append(s);
  691. }
  692. }
  693. }
  694. virtual void setResultInt(const char *name, unsigned sequence, __int64 value, unsigned size)
  695. {
  696. FlushingStringBuffer *r = queryResult(sequence);
  697. if (r)
  698. {
  699. if (isRaw)
  700. {
  701. startScalar(r, name, sequence);
  702. r->append(sizeof(value), (char *)&value);
  703. }
  704. else
  705. r->setScalarInt(name, sequence, value, size);
  706. }
  707. }
  708. virtual void setResultUInt(const char *name, unsigned sequence, unsigned __int64 value, unsigned size)
  709. {
  710. FlushingStringBuffer *r = queryResult(sequence);
  711. if (r)
  712. {
  713. if (isRaw)
  714. {
  715. startScalar(r, name, sequence);
  716. r->append(sizeof(value), (char *)&value);
  717. }
  718. else
  719. r->setScalarUInt(name, sequence, value, size);
  720. }
  721. }
  722. virtual void setResultReal(const char *name, unsigned sequence, double value)
  723. {
  724. FlushingStringBuffer *r = queryResult(sequence);
  725. if (r)
  726. {
  727. startScalar(r, name, sequence);
  728. r->append(value);
  729. }
  730. }
  731. virtual void setResultString(const char *name, unsigned sequence, int len, const char * str)
  732. {
  733. FlushingStringBuffer *r = queryResult(sequence);
  734. if (r)
  735. {
  736. startScalar(r, name, sequence);
  737. if (r->isRaw)
  738. {
  739. r->append(len, str);
  740. }
  741. else
  742. {
  743. r->encodeString(str, len);
  744. }
  745. }
  746. }
  747. virtual void setResultUnicode(const char *name, unsigned sequence, int len, UChar const * str)
  748. {
  749. FlushingStringBuffer *r = queryResult(sequence);
  750. if (r)
  751. {
  752. startScalar(r, name, sequence);
  753. if (r->isRaw)
  754. {
  755. r->append(len*2, (const char *) str);
  756. }
  757. else
  758. {
  759. rtlDataAttr buff;
  760. unsigned bufflen = 0;
  761. rtlUnicodeToCodepageX(bufflen, buff.refstr(), len, str, "utf-8");
  762. r->encodeString(buff.getstr(), bufflen, true); // output as UTF-8
  763. }
  764. }
  765. }
  766. virtual void setResultVarString(const char * name, unsigned sequence, const char * value)
  767. {
  768. setResultString(name, sequence, strlen(value), value);
  769. }
  770. virtual void setResultVarUnicode(const char * name, unsigned sequence, UChar const * value)
  771. {
  772. setResultUnicode(name, sequence, rtlUnicodeStrlen(value), value);
  773. }
  774. virtual void flush()
  775. {
  776. ForEachItemIn(seq, resultMap)
  777. {
  778. FlushingStringBuffer *result = resultMap.item(seq);
  779. if (result)
  780. result->flush(true);
  781. }
  782. }
  783. virtual void finalize(unsigned seqNo, const char *delim, const char *filter, bool *needDelimiter)
  784. {
  785. ForEachItemIn(seq, resultMap)
  786. {
  787. FlushingStringBuffer *result = resultMap.item(seq);
  788. if (result && (!filter || !*filter || streq(filter, result->queryResultName())))
  789. {
  790. result->flush(true);
  791. for(;;)
  792. {
  793. size32_t length;
  794. void *payload = result->getPayload(length);
  795. if (!length)
  796. break;
  797. if (needDelimiter && *needDelimiter)
  798. {
  799. StringAttr s(delim); //write() will take ownership of buffer
  800. size32_t len = s.length();
  801. client->write((void *)s.detach(), len, true);
  802. *needDelimiter=false;
  803. }
  804. client->write(payload, length, true);
  805. }
  806. if (delim && needDelimiter)
  807. *needDelimiter=true;
  808. }
  809. }
  810. }
  811. virtual void appendProbeGraph(const char *xml)
  812. {
  813. if (!xml)
  814. {
  815. if (probe)
  816. probe.clear();
  817. return;
  818. }
  819. if (!probe)
  820. {
  821. probe.setown(new FlushingStringBuffer(client, isBlocked, MarkupFmt_XML, false, isHTTP, logctx));
  822. probe->startDataset("_Probe", NULL, (unsigned) -1); // initialize it
  823. }
  824. probe->append("\n");
  825. probe->append(xml);
  826. }
  827. };
  828. class CHpccXmlResultsWriter : public CHpccNativeResultsWriter
  829. {
  830. public:
  831. CHpccXmlResultsWriter(const char *queryname, SafeSocket *_client, bool _isHTTP, const IContextLogger &_logctx, PTreeReaderOptions _xmlReadFlags) :
  832. CHpccNativeResultsWriter(queryname, _client, false, MarkupFmt_XML, false, _isHTTP, _logctx, _xmlReadFlags)
  833. {
  834. }
  835. virtual void addContent(TextMarkupFormat fmt, const char *content, const char *name)
  836. {
  837. StringBuffer xml;
  838. if (!content || !*content)
  839. return;
  840. if (fmt==MarkupFmt_JSON)
  841. {
  842. Owned<IPropertyTree> convertPT = createPTreeFromXMLString(content, ipt_fast);
  843. if (name && *name)
  844. appendXMLOpenTag(xml, name);
  845. toXML(convertPT, xml, 0, 0);
  846. if (name && *name)
  847. appendXMLCloseTag(xml, name);
  848. }
  849. }
  850. virtual void finalize(unsigned seqNo)
  851. {
  852. if (!isHTTP)
  853. {
  854. flush();
  855. return;
  856. }
  857. CriticalBlock b(resultsCrit);
  858. CriticalBlock b1(client->queryCrit());
  859. StringBuffer responseHead, responseTail;
  860. responseHead.append("<Results><Result>");
  861. unsigned len = responseHead.length();
  862. client->write(responseHead.detach(), len, true);
  863. ForEachItemIn(seq, resultMap)
  864. {
  865. FlushingStringBuffer *result = resultMap.item(seq);
  866. if (result)
  867. {
  868. result->flush(true);
  869. for(;;)
  870. {
  871. size32_t length;
  872. void *payload = result->getPayload(length);
  873. if (!length)
  874. break;
  875. client->write(payload, length, true);
  876. }
  877. }
  878. }
  879. responseTail.append("</Result></Results>");
  880. len = responseTail.length();
  881. client->write(responseTail.detach(), len, true);
  882. }
  883. };
  884. class CHpccJsonResultsWriter : public CHpccNativeResultsWriter
  885. {
  886. public:
  887. CHpccJsonResultsWriter(const char *queryname, SafeSocket *_client, const IContextLogger &_logctx, PTreeReaderOptions _xmlReadFlags) :
  888. CHpccNativeResultsWriter(queryname, _client, false, MarkupFmt_JSON, false, true, _logctx, _xmlReadFlags)
  889. {
  890. }
  891. virtual FlushingStringBuffer *createFlushingBuffer()
  892. {
  893. return new FlushingJsonBuffer(client, isBlocked, isHTTP, logctx);
  894. }
  895. virtual void finalize(unsigned seqNo)
  896. {
  897. CriticalBlock b(resultsCrit);
  898. CriticalBlock b1(client->queryCrit());
  899. StringBuffer responseHead, responseTail;
  900. appendJSONName(responseHead, "Results").append(" {");
  901. unsigned len = responseHead.length();
  902. client->write(responseHead.detach(), len, true);
  903. bool needDelimiter = false;
  904. ForEachItemIn(seq, resultMap)
  905. {
  906. FlushingStringBuffer *result = resultMap.item(seq);
  907. if (result)
  908. {
  909. result->flush(true);
  910. for(;;)
  911. {
  912. size32_t length;
  913. void *payload = result->getPayload(length);
  914. if (!length)
  915. break;
  916. if (needDelimiter)
  917. {
  918. StringAttr s(","); //write() will take ownership of buffer
  919. size32_t len = s.length();
  920. client->write((void *)s.detach(), len, true);
  921. needDelimiter=false;
  922. }
  923. client->write(payload, length, true);
  924. }
  925. needDelimiter=true;
  926. }
  927. }
  928. responseTail.append("}");
  929. len = responseTail.length();
  930. client->write(responseTail.detach(), len, true);
  931. }
  932. };
  933. class CHpccNativeProtocolResponse : implements IHpccNativeProtocolResponse, public CInterface
  934. {
  935. protected:
  936. SafeSocket *client;
  937. StringAttr queryName;
  938. StringArray resultFilter;
  939. StringBuffer rootTag;
  940. const IContextLogger &logctx;
  941. TextMarkupFormat mlFmt;
  942. PTreeReaderOptions xmlReadFlags;
  943. Owned<CHpccNativeResultsWriter> results; //hpcc results section
  944. IPointerArrayOf<FlushingStringBuffer> contentsMap; //other sections
  945. CriticalSection contentsCrit;
  946. unsigned protocolFlags;
  947. bool isHTTP;
  948. public:
  949. IMPLEMENT_IINTERFACE;
  950. CHpccNativeProtocolResponse(const char *queryname, SafeSocket *_client, TextMarkupFormat _mlFmt, unsigned flags, bool _isHTTP, const IContextLogger &_logctx, PTreeReaderOptions _xmlReadFlags, const char *_resultFilterString, const char *_rootTag) :
  951. client(_client), queryName(queryname), logctx(_logctx), mlFmt(_mlFmt), xmlReadFlags(_xmlReadFlags), protocolFlags(flags), isHTTP(_isHTTP), rootTag(_rootTag)
  952. {
  953. resultFilter.appendList(_resultFilterString, ".");
  954. if (!rootTag.length() && resultFilter.length())
  955. rootTag.set(resultFilter.item(0)).replace(' ', '_');
  956. }
  957. ~CHpccNativeProtocolResponse()
  958. {
  959. }
  960. virtual unsigned getFlags()
  961. {
  962. return protocolFlags;
  963. }
  964. inline bool getIsRaw()
  965. {
  966. return (protocolFlags & HPCC_PROTOCOL_NATIVE_RAW);
  967. }
  968. inline bool getIsBlocked()
  969. {
  970. return (protocolFlags & HPCC_PROTOCOL_BLOCKED);
  971. }
  972. inline bool getTrim()
  973. {
  974. return (protocolFlags & HPCC_PROTOCOL_TRIM);
  975. }
  976. virtual FlushingStringBuffer *queryAppendContentBuffer()
  977. {
  978. CriticalBlock procedure(contentsCrit);
  979. FlushingStringBuffer *content;
  980. if (mlFmt==MarkupFmt_JSON)
  981. content = new FlushingJsonBuffer(client, getIsBlocked(), isHTTP, logctx);
  982. else
  983. content = new FlushingStringBuffer(client, getIsBlocked(), mlFmt, getIsRaw(), isHTTP, logctx);
  984. content->isSoap = isHTTP;
  985. content->trim = getTrim();
  986. content->queryName.set(queryName);
  987. if (!isHTTP)
  988. content->startBlock();
  989. contentsMap.append(content);
  990. return content;
  991. }
  992. virtual IHpccProtocolResultsWriter *queryHpccResultsSection()
  993. {
  994. if (!results)
  995. {
  996. results.setown(new CHpccNativeResultsWriter(queryName, client, getIsBlocked(), mlFmt, getIsRaw(), isHTTP, logctx, xmlReadFlags));
  997. if (rootTag.length())
  998. results->setTagName(rootTag);
  999. if (resultFilter.length())
  1000. {
  1001. results->setAdaptiveRoot();
  1002. results->setResultFilter(resultFilter.item(0));
  1003. }
  1004. if (resultFilter.isItem(1) && strieq("row", resultFilter.item(1)))
  1005. results->setOnlyUseFirstRow();
  1006. }
  1007. return results;
  1008. }
  1009. virtual void appendContent(TextMarkupFormat mlFmt, const char *content, const char *name=NULL)
  1010. {
  1011. throwUnexpected();
  1012. }
  1013. virtual IXmlWriter *writeAppendContent(const char *name = NULL)
  1014. {
  1015. throwUnexpected();
  1016. }
  1017. virtual void finalize(unsigned seqNo)
  1018. {
  1019. flush();
  1020. if (!isHTTP)
  1021. {
  1022. unsigned replyLen = 0;
  1023. client->write(&replyLen, sizeof(replyLen));
  1024. }
  1025. }
  1026. virtual bool checkConnection()
  1027. {
  1028. if (client)
  1029. return client->checkConnection();
  1030. else
  1031. return true;
  1032. }
  1033. virtual void sendHeartBeat()
  1034. {
  1035. if (client)
  1036. client->sendHeartBeat(logctx);
  1037. }
  1038. virtual SafeSocket *querySafeSocket()
  1039. {
  1040. return client;
  1041. }
  1042. virtual void flush()
  1043. {
  1044. if (results)
  1045. results->flush();
  1046. ForEachItemIn(i, contentsMap)
  1047. contentsMap.item(i)->flush(true);
  1048. }
  1049. virtual void appendProbeGraph(const char *xml)
  1050. {
  1051. if (results)
  1052. results->appendProbeGraph(xml);
  1053. }
  1054. };
  1055. class CHpccJsonResponse : public CHpccNativeProtocolResponse
  1056. {
  1057. private:
  1058. bool needDelimiter = false;
  1059. public:
  1060. CHpccJsonResponse(const char *queryname, SafeSocket *_client, unsigned flags, bool _isHttp, const IContextLogger &_logctx, PTreeReaderOptions _xmlReadFlags, const char *_resultFilter, const char *_rootTag) :
  1061. CHpccNativeProtocolResponse(queryname, _client, MarkupFmt_JSON, flags, _isHttp, _logctx, _xmlReadFlags, _resultFilter, _rootTag)
  1062. {
  1063. }
  1064. virtual IHpccProtocolResultsWriter *getHpccResultsSection()
  1065. {
  1066. if (!results)
  1067. results.setown(new CHpccJsonResultsWriter(queryName, client, logctx, xmlReadFlags));
  1068. return results;
  1069. }
  1070. virtual void appendContent(TextMarkupFormat mlFmt, const char *content, const char *name=NULL)
  1071. {
  1072. if (mlFmt!=MarkupFmt_XML && mlFmt!=MarkupFmt_JSON)
  1073. return;
  1074. StringBuffer json;
  1075. if (mlFmt==MarkupFmt_XML)
  1076. {
  1077. Owned<IPropertyTree> convertPT = createPTreeFromXMLString(StringBuffer("<Control>").append(content).append("</Control>"), ipt_fast);
  1078. toJSON(convertPT, json, 0, 0);
  1079. content = json.str();
  1080. }
  1081. FlushingStringBuffer *contentBuffer = queryAppendContentBuffer();
  1082. StringBuffer tag;
  1083. if (name && *name)
  1084. appendJSONName(tag, name);
  1085. contentBuffer->append(tag);
  1086. contentBuffer->append(content);
  1087. }
  1088. virtual IXmlWriter *writeAppendContent(const char *name = NULL)
  1089. {
  1090. FlushingStringBuffer *content = queryAppendContentBuffer();
  1091. if (name && *name)
  1092. {
  1093. StringBuffer tag;
  1094. appendJSONName(tag, name);
  1095. content->append(tag);
  1096. }
  1097. Owned<IXmlWriter> xmlwriter = createIXmlWriterExt(XWFnoindent, 1, content, WTJSON);
  1098. return xmlwriter.getClear();
  1099. }
  1100. void outputContent()
  1101. {
  1102. ForEachItemIn(seq, contentsMap)
  1103. {
  1104. FlushingStringBuffer *content = contentsMap.item(seq);
  1105. if (content)
  1106. {
  1107. content->flush(true);
  1108. for(;;)
  1109. {
  1110. size32_t length;
  1111. void *payload = content->getPayload(length);
  1112. if (!length)
  1113. break;
  1114. if (needDelimiter)
  1115. {
  1116. StringAttr s(","); //write() will take ownership of buffer
  1117. size32_t len = s.length();
  1118. client->write((void *)s.detach(), len, true);
  1119. needDelimiter=false;
  1120. }
  1121. client->write(payload, length, true);
  1122. }
  1123. needDelimiter=true;
  1124. }
  1125. }
  1126. }
  1127. virtual void finalize(unsigned seqNo)
  1128. {
  1129. if (!isHTTP)
  1130. {
  1131. CHpccNativeProtocolResponse::finalize(seqNo);
  1132. return;
  1133. }
  1134. CriticalBlock b(contentsCrit);
  1135. CriticalBlock b1(client->queryCrit());
  1136. StringBuffer responseHead, responseTail;
  1137. if (!resultFilter.ordinality() && !(protocolFlags & HPCC_PROTOCOL_CONTROL))
  1138. {
  1139. StringBuffer name(queryName.get());
  1140. if (isHTTP)
  1141. name.append("Response");
  1142. appendJSONName(responseHead, name.str()).append(" {");
  1143. appendJSONValue(responseHead, "sequence", seqNo);
  1144. appendJSONName(responseHead, "Results").append(" {");
  1145. unsigned len = responseHead.length();
  1146. client->write(responseHead.detach(), len, true);
  1147. }
  1148. if (results)
  1149. results->finalize(seqNo, ",", resultFilter.ordinality() ? resultFilter.item(0) : NULL, &needDelimiter);
  1150. if (!resultFilter.ordinality())
  1151. outputContent();
  1152. if (!resultFilter.ordinality() && !(protocolFlags & HPCC_PROTOCOL_CONTROL))
  1153. {
  1154. responseTail.append("}}");
  1155. unsigned len = responseTail.length();
  1156. client->write(responseTail.detach(), len, true);
  1157. }
  1158. }
  1159. };
  1160. class CHpccXmlResponse : public CHpccNativeProtocolResponse
  1161. {
  1162. public:
  1163. CHpccXmlResponse(const char *queryname, SafeSocket *_client, unsigned flags, bool _isHTTP, const IContextLogger &_logctx, PTreeReaderOptions _xmlReadFlags, const char *_resultFilter, const char *_rootTag) :
  1164. CHpccNativeProtocolResponse(queryname, _client, MarkupFmt_XML, flags, _isHTTP, _logctx, _xmlReadFlags, _resultFilter, _rootTag)
  1165. {
  1166. }
  1167. virtual IHpccProtocolResultsWriter *getHpccResultsSection()
  1168. {
  1169. if (!results)
  1170. results.setown(new CHpccXmlResultsWriter(queryName, client, isHTTP, logctx, xmlReadFlags));
  1171. return results;
  1172. }
  1173. virtual void appendContent(TextMarkupFormat mlFmt, const char *content, const char *name=NULL)
  1174. {
  1175. if (mlFmt!=MarkupFmt_XML && mlFmt!=MarkupFmt_JSON)
  1176. return;
  1177. StringBuffer xml;
  1178. if (mlFmt==MarkupFmt_JSON)
  1179. {
  1180. Owned<IPropertyTree> convertPT = createPTreeFromJSONString(content, ipt_fast);
  1181. toXML(convertPT, xml, 0, 0);
  1182. content = xml.str();
  1183. }
  1184. FlushingStringBuffer *contentBuffer = queryAppendContentBuffer();
  1185. if (name && *name)
  1186. {
  1187. StringBuffer tag;
  1188. appendXMLOpenTag(tag, name);
  1189. contentBuffer->append(tag.append('\n'));
  1190. appendXMLCloseTag(tag.clear(), name);
  1191. contentBuffer->setTail(tag.append('\n'));
  1192. }
  1193. contentBuffer->append(content);
  1194. }
  1195. virtual IXmlWriter *writeAppendContent(const char *name = NULL)
  1196. {
  1197. FlushingStringBuffer *content = queryAppendContentBuffer();
  1198. StringBuffer tag;
  1199. if (name && *name)
  1200. {
  1201. appendXMLOpenTag(tag, name);
  1202. content->append(tag);
  1203. appendXMLCloseTag(tag.clear(), name);
  1204. content->setTail(tag);
  1205. }
  1206. Owned<IXmlWriter> xmlwriter = createIXmlWriterExt(0, 1, content, WTStandard);
  1207. return xmlwriter.getClear();
  1208. }
  1209. void outputContent()
  1210. {
  1211. ForEachItemIn(seq, contentsMap)
  1212. {
  1213. FlushingStringBuffer *content = contentsMap.item(seq);
  1214. if (content)
  1215. {
  1216. content->flush(true);
  1217. if (!this->isHTTP)
  1218. continue;
  1219. for(;;)
  1220. {
  1221. size32_t length;
  1222. void *payload = content->getPayload(length);
  1223. if (!length)
  1224. break;
  1225. client->write(payload, length, true);
  1226. }
  1227. }
  1228. }
  1229. }
  1230. virtual void finalize(unsigned seqNo)
  1231. {
  1232. if (!isHTTP)
  1233. {
  1234. CHpccNativeProtocolResponse::finalize(seqNo);
  1235. return;
  1236. }
  1237. CriticalBlock b(contentsCrit);
  1238. CriticalBlock b1(client->queryCrit());
  1239. StringBuffer responseHead, responseTail;
  1240. if (!resultFilter.ordinality() && !(protocolFlags & HPCC_PROTOCOL_CONTROL))
  1241. {
  1242. responseHead.append("<").append(queryName);
  1243. responseHead.append("Response").append(" xmlns=\"urn:hpccsystems:ecl:").appendLower(queryName.length(), queryName.str()).append('\"');
  1244. responseHead.append(" sequence=\"").append(seqNo).append("\"><Results><Result>");
  1245. unsigned len = responseHead.length();
  1246. client->write(responseHead.detach(), len, true);
  1247. }
  1248. if (results)
  1249. results->finalize(seqNo, NULL, resultFilter.ordinality() ? resultFilter.item(0) : NULL, nullptr);
  1250. if (!resultFilter.ordinality())
  1251. outputContent();
  1252. if (!resultFilter.ordinality() && !(protocolFlags & HPCC_PROTOCOL_CONTROL))
  1253. {
  1254. responseTail.append("</Result></Results></").append(queryName);
  1255. if (isHTTP)
  1256. responseTail.append("Response");
  1257. responseTail.append('>');
  1258. unsigned len = responseTail.length();
  1259. client->write(responseTail.detach(), len, true);
  1260. }
  1261. }
  1262. };
  1263. IHpccProtocolResponse *createProtocolResponse(const char *queryname, SafeSocket *client, HttpHelper &httpHelper, const IContextLogger &logctx, unsigned protocolFlags, PTreeReaderOptions xmlReadFlags)
  1264. {
  1265. StringAttr filter, tag;
  1266. httpHelper.getResultFilterAndTag(filter, tag);
  1267. if ((protocolFlags & HPCC_PROTOCOL_NATIVE_RAW) || (protocolFlags & HPCC_PROTOCOL_NATIVE_ASCII))
  1268. return new CHpccNativeProtocolResponse(queryname, client, MarkupFmt_Unknown, protocolFlags, false, logctx, xmlReadFlags, filter, tag);
  1269. else if (httpHelper.queryResponseMlFormat()==MarkupFmt_JSON)
  1270. return new CHpccJsonResponse(queryname, client, protocolFlags, httpHelper.isHttp(), logctx, xmlReadFlags, filter, tag);
  1271. return new CHpccXmlResponse(queryname, client, protocolFlags, httpHelper.isHttp(), logctx, xmlReadFlags, filter, tag);
  1272. }
  1273. class CHttpRequestAsyncFor : public CInterface, public CAsyncFor
  1274. {
  1275. private:
  1276. const char *queryName, *queryText, *querySetName;
  1277. const IContextLogger &logctx;
  1278. IArrayOf<IPropertyTree> &requestArray;
  1279. Linked<IHpccProtocolMsgSink> sink;
  1280. Linked<IHpccProtocolMsgContext> msgctx;
  1281. SafeSocket &client;
  1282. HttpHelper &httpHelper;
  1283. PTreeReaderOptions xmlReadFlags;
  1284. unsigned &memused;
  1285. unsigned &slaveReplyLen;
  1286. CriticalSection crit;
  1287. unsigned flags;
  1288. public:
  1289. CHttpRequestAsyncFor(const char *_queryName, IHpccProtocolMsgSink *_sink, IHpccProtocolMsgContext *_msgctx, IArrayOf<IPropertyTree> &_requestArray,
  1290. SafeSocket &_client, HttpHelper &_httpHelper, unsigned _flags, unsigned &_memused, unsigned &_slaveReplyLen, const char *_queryText, const IContextLogger &_logctx, PTreeReaderOptions _xmlReadFlags, const char *_querySetName)
  1291. : sink(_sink), msgctx(_msgctx), requestArray(_requestArray), client(_client), httpHelper(_httpHelper), memused(_memused),
  1292. slaveReplyLen(_slaveReplyLen), logctx(_logctx), xmlReadFlags(_xmlReadFlags), querySetName(_querySetName), flags(_flags)
  1293. {
  1294. queryName = _queryName;
  1295. queryText = _queryText;
  1296. }
  1297. void onException(IException *E)
  1298. {
  1299. //if (!logctx.isBlind())
  1300. // logctx.CTXLOG("FAILED: %s", queryText);
  1301. StringBuffer error("EXCEPTION: ");
  1302. E->errorMessage(error);
  1303. DBGLOG("%s", error.str());
  1304. client.checkSendHttpException(httpHelper, E, queryName);
  1305. E->Release();
  1306. }
  1307. void Do(unsigned idx)
  1308. {
  1309. try
  1310. {
  1311. IPropertyTree &request = requestArray.item(idx);
  1312. Owned<IHpccProtocolResponse> protocol = createProtocolResponse(request.queryName(), &client, httpHelper, logctx, flags, xmlReadFlags);
  1313. sink->onQueryMsg(msgctx, &request, protocol, flags, xmlReadFlags, querySetName, idx, memused, slaveReplyLen);
  1314. }
  1315. catch (IException * E)
  1316. {
  1317. onException(E);
  1318. }
  1319. catch (...)
  1320. {
  1321. onException(MakeStringException(ROXIE_INTERNAL_ERROR, "Unknown exception"));
  1322. }
  1323. }
  1324. };
  // Whitespace policy requested by a query: Default until a request says otherwise,
  // Strip or Preserve once a stripWhitespaceFromStoredDataset setting has been read.
  enum class WhiteSpaceHandling { Default, Strip, Preserve };
  1331. class QueryNameExtractor : implements IPTreeNotifyEvent, public CInterface
  1332. {
  1333. public:
  1334. TextMarkupFormat mlFmt;
  1335. StringAttr prefix;
  1336. StringAttr name;
  1337. unsigned headerDepth;
  1338. bool isSoap;
  1339. bool isRequestArray;
  1340. bool isRequest = false;
  1341. WhiteSpaceHandling whitespace=WhiteSpaceHandling::Default;
  1342. bool more;
  1343. public:
  1344. IMPLEMENT_IINTERFACE;
  1345. QueryNameExtractor(TextMarkupFormat _mlFmt) : mlFmt(_mlFmt), headerDepth(0), isSoap(false), isRequestArray(false), more(true)
  1346. {
  1347. }
  1348. void extractName(HttpHelper &httpHelper, const char *msg, const IContextLogger &logctx, const char *peer, unsigned port)
  1349. {
  1350. const char *urlName = httpHelper.queryQueryName(); //"Adaptive REST" query name and attrs can come from URL
  1351. if (httpHelper.isHttpGet() || httpHelper.isMappedToInputParameter()) //these types can't have roxie attrs in the content body
  1352. {
  1353. name.set(urlName); //if blank will return error as expected
  1354. return;
  1355. }
  1356. Owned<IPullPTreeReader> parser;
  1357. if (mlFmt==MarkupFmt_JSON)
  1358. parser.setown(createPullJSONStringReader(msg, *this));
  1359. else if (mlFmt==MarkupFmt_XML)
  1360. parser.setown(createPullXMLStringReader(msg, *this));
  1361. if (!parser)
  1362. return;
  1363. while (more && parser->next());
  1364. if (urlName && *urlName)
  1365. {
  1366. name.set(urlName);
  1367. return;
  1368. }
  1369. if (name.isEmpty())
  1370. {
  1371. const char *fmt = mlFmt==MarkupFmt_XML ? "XML" : "JSON";
  1372. IException *E = MakeStringException(-1, "ERROR: Invalid %s queryName not found - received from %s:%d - %s", fmt, peer, port, msg);
  1373. logctx.logOperatorException(E, __FILE__, __LINE__, "Invalid query %s", fmt);
  1374. throw E;
  1375. }
  1376. String nameStr(name.get());
  1377. if (nameStr.endsWith("RequestArray"))
  1378. {
  1379. isRequestArray = true;
  1380. name.set(nameStr.str(), nameStr.length() - strlen("RequestArray"));
  1381. }
  1382. else if (nameStr.endsWith("Request"))
  1383. {
  1384. isRequest = true;
  1385. name.set(nameStr.str(), nameStr.length() - strlen("Request"));
  1386. }
  1387. }
  1388. virtual void beginNode(const char *tag, offset_t startOffset)
  1389. {
  1390. if (streq(tag, "__object__"))
  1391. return;
  1392. const char *local = strchr(tag, ':');
  1393. if (local)
  1394. local++;
  1395. else
  1396. local = tag;
  1397. if (mlFmt==MarkupFmt_XML)
  1398. {
  1399. if (!isSoap && streq(local, "Envelope"))
  1400. {
  1401. isSoap=true;
  1402. return;
  1403. }
  1404. if (isSoap && streq(local, "Header"))
  1405. {
  1406. headerDepth++;
  1407. return;
  1408. }
  1409. if (isSoap && !headerDepth && streq(local, "Body"))
  1410. return;
  1411. }
  1412. if (!headerDepth)
  1413. {
  1414. name.set(local);
  1415. if (tag!=local)
  1416. prefix.set(tag, local-tag-1);
  1417. }
  1418. }
  1419. virtual void newAttribute(const char *attr, const char *value)
  1420. {
  1421. if (!name.isEmpty() && strieq(attr, "@stripWhitespaceFromStoredDataset"))
  1422. {
  1423. whitespace = strToBool(value) ? WhiteSpaceHandling::Strip : WhiteSpaceHandling::Preserve;
  1424. more = false;
  1425. }
  1426. }
  1427. virtual void beginNodeContent(const char *tag)
  1428. {
  1429. if (!name.isEmpty())
  1430. more = false;
  1431. }
  1432. virtual void endNode(const char *tag, unsigned length, const void *value, bool binary, offset_t endOffset)
  1433. {
  1434. if (!name.isEmpty())
  1435. more = false;
  1436. else if (headerDepth) //will never be true if !isSoap
  1437. {
  1438. const char *local = strchr(tag, ':');
  1439. if (local)
  1440. local++;
  1441. else
  1442. local = tag;
  1443. if (streq(local, "Header"))
  1444. headerDepth--;
  1445. }
  1446. }
  1447. };
  // File-local holder for the active query limiter factory.
  // NOTE(review): not referenced in this part of the file - presumably assigned and used by the listener code elsewhere; confirm.
  1448. static Owned<IActiveQueryLimiterFactory> queryLimiterFactory;
  1449. class RoxieSocketWorker : public ProtocolQueryWorker
  1450. {
  1451. SocketEndpoint ep;
  1452. Owned<SafeSocket> client;
  1453. Owned<IHpccNativeProtocolMsgSink> sink;
  1454. public:
  1455. RoxieSocketWorker(ProtocolSocketListener *_pool, SocketEndpoint &_ep)
  1456. : ProtocolQueryWorker(_pool), ep(_ep)
  1457. {
  1458. sink.set(dynamic_cast<IHpccNativeProtocolMsgSink*>(_pool->queryMsgSink()));
  1459. }
  1460. // interface IPooledThread
  1461. virtual void init(void *_r) override
  1462. {
  1463. client.setown(new CSafeSocket((ISocket *) _r));
  1464. ProtocolQueryWorker::init(_r);
  1465. }
  1466. virtual void threadmain() override
  1467. {
  1468. doMain("");
  1469. }
  1470. virtual void runOnce(const char *query)
  1471. {
  1472. doMain(query);
  1473. }
  1474. private:
  1475. static void sendHttpServerTooBusy(SafeSocket &client, const IContextLogger &logctx)
  1476. {
  1477. StringBuffer message;
  1478. message.append("HTTP/1.0 503 Server Too Busy\r\n\r\n");
  1479. message.append("Server too busy, please try again later");
  1480. StringBuffer err("Too many active queries"); // write out Too many active queries - make searching for this error consistent
  1481. if (!global->trapTooManyActiveQueries)
  1482. {
  1483. err.appendf(" %s", message.str());
  1484. logctx.CTXLOG("%s", err.str());
  1485. }
  1486. else
  1487. {
  1488. IException *E = MakeStringException(ROXIE_TOO_MANY_QUERIES, "%s", err.str());
  1489. logctx.logOperatorException(E, __FILE__, __LINE__, "%s", message.str());
  1490. E->Release();
  1491. }
  1492. try
  1493. {
  1494. client.setHttpMode(false); //For historical reasons HTTP mode really means SOAP/JSON, we want raw HTTP here. Should be made more clear
  1495. client.write(message.str(), message.length());
  1496. }
  1497. catch (IException *E)
  1498. {
  1499. logctx.logOperatorException(E, __FILE__, __LINE__, "Exception caught in sendHttpServerTooBusy");
  1500. E->Release();
  1501. }
  1502. catch (...)
  1503. {
  1504. logctx.logOperatorException(NULL, __FILE__, __LINE__, "sendHttpServerTooBusy write failed (Unknown exception)");
  1505. }
  1506. }
  1507. void skipProtocolRoot(Owned<IPropertyTree> &queryPT, HttpHelper &httpHelper, const char *queryName)
  1508. {
  1509. if (queryPT)
  1510. {
  1511. const char *tagName = queryPT->queryName();
  1512. if (httpHelper.isHttp())
  1513. {
  1514. if (httpHelper.queryRequestMlFormat()==MarkupFmt_JSON)
  1515. {
  1516. if (strieq(tagName, "__array__"))
  1517. throw MakeStringException(ROXIE_DATA_ERROR, "JSON request array not implemented");
  1518. if (strieq(tagName, "__object__"))
  1519. {
  1520. queryPT.setown(queryPT->getPropTree("*[1]"));
  1521. if (!queryPT)
  1522. throw MakeStringException(ROXIE_DATA_ERROR, "Malformed JSON request (missing Body)");
  1523. }
  1524. }
  1525. else
  1526. {
  1527. if (strieq(tagName, "envelope"))
  1528. queryPT.setown(queryPT->getPropTree("Body/*"));
  1529. else if (!strnicmp(httpHelper.queryContentType(), "application/soap", strlen("application/soap")))
  1530. throw MakeStringException(ROXIE_DATA_ERROR, "Malformed SOAP request");
  1531. if (!queryPT)
  1532. throw MakeStringException(ROXIE_DATA_ERROR, "Malformed SOAP request (missing Body)");
  1533. queryPT->removeProp("@xmlns:m");
  1534. queryPT->renameProp("/", queryName); // reset the name of the tree
  1535. }
  1536. }
  1537. }
  1538. else
  1539. throw MakeStringException(ROXIE_DATA_ERROR, "Malformed request");
  1540. }
  1541. void sanitizeQuery(Owned<IPropertyTree> &queryPT, StringAttr &queryName, StringBuffer &saniText, HttpHelper &httpHelper, const char *&uid, bool &isBlind, bool &isDebug)
  1542. {
  1543. if (queryPT)
  1544. {
  1545. // convert to XML with attribute values in single quotes - makes replaying queries easier
  1546. uid = queryPT->queryProp("@uid");
  1547. if (!uid)
  1548. uid = queryPT->queryProp("_TransactionId");
  1549. isBlind = queryPT->getPropBool("@blind", false) || queryPT->getPropBool("_blind", false);
  1550. isDebug = queryPT->getPropBool("@debug") || queryPT->getPropBool("_Probe", false);
  1551. toXML(queryPT, saniText, 0, isBlind ? (XML_SingleQuoteAttributeValues | XML_Sanitize) : XML_SingleQuoteAttributeValues);
  1552. }
  1553. }
  1554. void createQueryPTree(Owned<IPropertyTree> &queryPT, HttpHelper &httpHelper, const char *text, byte flags, byte options, const char *queryName)
  1555. {
  1556. StringBuffer logxml;
  1557. if (httpHelper.queryRequestMlFormat()==MarkupFmt_URL)
  1558. {
  1559. queryPT.setown(httpHelper.createPTreeFromParameters(flags));
  1560. toXML(queryPT, logxml);
  1561. DBGLOG("%s", logxml.str());
  1562. return;
  1563. }
  1564. if (httpHelper.queryRequestMlFormat()==MarkupFmt_JSON)
  1565. queryPT.setown(createPTreeFromJSONString(text, flags, (PTreeReaderOptions) options));
  1566. else
  1567. queryPT.setown(createPTreeFromXMLString(text, flags, (PTreeReaderOptions) options));
  1568. queryPT.setown(httpHelper.checkAddWrapperForAdaptiveInput(queryPT.getClear(), flags));
  1569. skipProtocolRoot(queryPT, httpHelper, queryName);
  1570. if (queryPT->hasProp("_stripWhitespaceFromStoredDataset"))
  1571. {
  1572. bool stripTag = queryPT->getPropBool("_stripWhitespaceFromStoredDataset");
  1573. bool stripFlag = (options & ptr_ignoreWhiteSpace) != 0;
  1574. if (stripTag != stripFlag)
  1575. {
  1576. if (stripTag)
  1577. options |= ptr_ignoreWhiteSpace;
  1578. else
  1579. options &= ~ptr_ignoreWhiteSpace;
  1580. //The tag _stripWhitespaceFromStoredDataset can appear anywhere at the same level as query inputs
  1581. //it can't be checked until after parsing the full request, so if it changes the parse flags
  1582. //we have to parse the request again now
  1583. createQueryPTree(queryPT, httpHelper, text, flags, options, queryName);
  1584. }
  1585. }
  1586. }
  1587. void doMain(const char *runQuery)
  1588. {
  1589. StringBuffer rawText(runQuery);
  1590. unsigned memused = 0;
  1591. IpAddress peer;
  1592. bool continuationNeeded = false;
  1593. bool isStatus = false;
  1594. unsigned remainingHttpConnectionRequests = global->maxHttpConnectionRequests ? global->maxHttpConnectionRequests : 1;
  1595. unsigned readWait = WAIT_FOREVER;
  1596. Owned<IHpccProtocolMsgContext> msgctx = sink->createMsgContext(startTime);
  1597. IContextLogger &logctx = *msgctx->queryLogContext();
  1598. readAnother:
  1599. unsigned slavesReplyLen = 0;
  1600. StringArray allTargets;
  1601. sink->getTargetNames(allTargets);
  1602. HttpHelper httpHelper(&allTargets);
  1603. try
  1604. {
  1605. if (client)
  1606. {
  1607. client->querySocket()->getPeerAddress(peer);
  1608. if (!client->readBlock(rawText.clear(), readWait, &httpHelper, continuationNeeded, isStatus, global->maxBlockSize))
  1609. {
  1610. if (traceLevel > 8)
  1611. {
  1612. StringBuffer b;
  1613. DBGLOG("No data reading query from socket");
  1614. }
  1615. client.clear();
  1616. return;
  1617. }
  1618. }
  1619. if (continuationNeeded)
  1620. {
  1621. qstart = msTick();
  1622. time(&startTime);
  1623. }
  1624. }
  1625. catch (IException * E)
  1626. {
  1627. if (traceLevel > 0)
  1628. {
  1629. StringBuffer b;
  1630. DBGLOG("Error reading query from socket: %s", E->errorMessage(b).str());
  1631. }
  1632. E->Release();
  1633. client.clear();
  1634. return;
  1635. }
  1636. bool isHTTP = httpHelper.isHttp();
  1637. if (isHTTP)
  1638. {
  1639. if (httpHelper.allowKeepAlive())
  1640. client->setHttpKeepAlive(remainingHttpConnectionRequests > 1);
  1641. else
  1642. remainingHttpConnectionRequests = 1;
  1643. }
  1644. TextMarkupFormat mlResponseFmt = MarkupFmt_Unknown;
  1645. TextMarkupFormat mlRequestFmt = MarkupFmt_Unknown;
  1646. if (!isStatus)
  1647. {
  1648. if (!isHTTP)
  1649. mlResponseFmt = mlRequestFmt = MarkupFmt_XML;
  1650. else
  1651. {
  1652. mlResponseFmt = httpHelper.queryResponseMlFormat();
  1653. mlRequestFmt = httpHelper.queryRequestMlFormat();
  1654. const char *value = httpHelper.queryRequestHeader(logctx.queryGlobalIdHttpHeader());
  1655. if (!value || !*value)
  1656. value = httpHelper.queryRequestHeader("HPCC-Global-Id"); //always support receiving in the HPCC form
  1657. if (value && *value)
  1658. msgctx->setTransactionId(value, true); //logged and forwarded through SOAPCALL/HTTPCALL
  1659. value = httpHelper.queryRequestHeader(logctx.queryCallerIdHttpHeader());
  1660. if (!value || !*value)
  1661. value = httpHelper.queryRequestHeader("HPCC-Caller-Id");
  1662. if (value && *value)
  1663. msgctx->setCallerId(value); //only logged
  1664. }
  1665. }
  1666. bool failed = false;
  1667. bool isRequest = false;
  1668. bool isRequestArray = false;
  1669. bool isBlind = false;
  1670. bool isDebug = false;
  1671. unsigned protocolFlags = isHTTP ? 0 : HPCC_PROTOCOL_NATIVE;
  1672. Owned<IPropertyTree> queryPT;
  1673. StringBuffer sanitizedText;
  1674. StringBuffer peerStr;
  1675. peer.getIpText(peerStr);
  1676. const char *uid = "-";
  1677. StringAttr queryName;
  1678. StringAttr queryPrefix;
  1679. WhiteSpaceHandling whitespace = WhiteSpaceHandling::Default;
  1680. try
  1681. {
  1682. if (httpHelper.isHttpGet() || httpHelper.isFormPost())
  1683. {
  1684. queryName.set(httpHelper.queryQueryName());
  1685. if (httpHelper.isControlUrl())
  1686. queryPrefix.set("control");
  1687. }
  1688. else if (mlRequestFmt==MarkupFmt_XML || mlRequestFmt==MarkupFmt_JSON)
  1689. {
  1690. QueryNameExtractor extractor(mlRequestFmt);
  1691. extractor.extractName(httpHelper, rawText.str(), logctx, peerStr, ep.port);
  1692. queryName.set(extractor.name);
  1693. queryPrefix.set(extractor.prefix);
  1694. whitespace = extractor.whitespace;
  1695. isRequest = extractor.isRequest;
  1696. isRequestArray = extractor.isRequestArray;
  1697. if (httpHelper.isHttp())
  1698. httpHelper.setUseEnvelope(extractor.isSoap);
  1699. }
  1700. if (streq(queryPrefix.str(), "control"))
  1701. {
  1702. if (httpHelper.isHttp())
  1703. client->setHttpMode(queryName, false, httpHelper);
  1704. bool aclupdate = strieq(queryName, "aclupdate"); //ugly
  1705. byte iptFlags = aclupdate ? ipt_caseInsensitive|ipt_fast : ipt_fast;
  1706. createQueryPTree(queryPT, httpHelper, rawText, iptFlags, (PTreeReaderOptions)(ptr_ignoreWhiteSpace|ptr_ignoreNameSpaces), queryName);
  1707. //IPropertyTree *root = queryPT;
  1708. if (!strchr(queryName, ':'))
  1709. {
  1710. VStringBuffer fullname("control:%s", queryName.str()); //just easier to keep for debugging and internal checking
  1711. queryPT->renameProp("/", fullname);
  1712. }
  1713. Owned<IHpccProtocolResponse> protocol = createProtocolResponse(queryPT->queryName(), client, httpHelper, logctx, protocolFlags | HPCC_PROTOCOL_CONTROL, global->defaultXmlReadFlags);
  1714. sink->onControlMsg(msgctx, queryPT, protocol);
  1715. protocol->finalize(0);
  1716. if (streq(queryName, "lock") || streq(queryName, "childlock"))
  1717. goto readAnother;
  1718. }
  1719. else if (isStatus)
  1720. {
  1721. client->write("OK", 2);
  1722. }
  1723. else
  1724. {
  1725. StringBuffer querySetName;
  1726. if (isHTTP)
  1727. {
  1728. client->setHttpMode(queryName, isRequestArray, httpHelper);
  1729. querySetName.set(httpHelper.queryTarget());
  1730. if (querySetName.length())
  1731. {
  1732. const char *target = global->targetAliases->queryProp(querySetName.str());
  1733. if (target)
  1734. querySetName.set(target);
  1735. }
  1736. }
  1737. msgctx->initQuery(querySetName, queryName); //needed here to allow checking hash options
  1738. if (whitespace == WhiteSpaceHandling::Default) //value in the request wins
  1739. whitespace = msgctx->getStripWhitespace() ? WhiteSpaceHandling::Strip : WhiteSpaceHandling::Preserve; //might be changed by hash option, returns default otherwise
  1740. unsigned readFlags = (unsigned) global->defaultXmlReadFlags | ptr_ignoreNameSpaces;
  1741. readFlags &= ~ptr_ignoreWhiteSpace;
  1742. readFlags |= (whitespace == WhiteSpaceHandling::Strip ? ptr_ignoreWhiteSpace : ptr_none);
  1743. try
  1744. {
  1745. createQueryPTree(queryPT, httpHelper, rawText.str(), ipt_caseInsensitive|ipt_fast, (PTreeReaderOptions)readFlags, queryName);
  1746. }
  1747. catch (IException *E)
  1748. {
  1749. logctx.logOperatorException(E, __FILE__, __LINE__, "Invalid XML received from %s:%d - %s", peerStr.str(), listener->queryPort(), rawText.str());
  1750. logctx.CTXLOG("ERROR: Invalid XML received from %s:%d - %s", peerStr.str(), listener->queryPort(), rawText.str());
  1751. throw;
  1752. }
  1753. uid = NULL;
  1754. sanitizeQuery(queryPT, queryName, sanitizedText, httpHelper, uid, isBlind, isDebug);
  1755. if (uid)
  1756. msgctx->setTransactionId(uid, false);
  1757. else
  1758. uid = "-";
  1759. sink->checkAccess(peer, queryName, sanitizedText, isBlind);
  1760. if (isDebug)
  1761. msgctx->verifyAllowDebug();
  1762. isBlind = msgctx->checkSetBlind(isBlind);
  1763. if (msgctx->logFullQueries())
  1764. {
  1765. StringBuffer soapStr;
  1766. (isRequest) ? soapStr.append("SoapRequest") : (isRequestArray) ? soapStr.append("SoapRequest") : soapStr.clear();
  1767. logctx.CTXLOG("%s %s:%d %s %s %s", isBlind ? "BLIND:" : "QUERY:", peerStr.str(), listener->queryPort(), uid, soapStr.str(), sanitizedText.str());
  1768. }
  1769. if (strieq(queryPrefix.str(), "debug"))
  1770. {
  1771. FlushingStringBuffer response(client, false, MarkupFmt_XML, false, isHTTP, logctx);
  1772. response.startDataset("Debug", NULL, (unsigned) -1);
  1773. CommonXmlWriter out(0, 1);
  1774. sink->onDebugMsg(msgctx, uid, queryPT, out);
  1775. response.append(out.str());
  1776. }
  1777. Owned<IActiveQueryLimiter> l;
  1778. if (queryLimiterFactory)
  1779. l.setown(queryLimiterFactory->create(listener));
  1780. if (l && !l->isAccepted())
  1781. {
  1782. if (isHTTP)
  1783. {
  1784. sendHttpServerTooBusy(*client, logctx);
  1785. logctx.CTXLOG("FAILED: %s", sanitizedText.str());
  1786. logctx.CTXLOG("EXCEPTION: Too many active queries");
  1787. }
  1788. else
  1789. {
  1790. IException *e = MakeStringException(ROXIE_TOO_MANY_QUERIES, "Too many active queries");
  1791. if (msgctx->trapTooManyActiveQueries())
  1792. logctx.logOperatorException(e, __FILE__, __LINE__, NULL);
  1793. throw e;
  1794. }
  1795. }
  1796. else
  1797. {
  1798. int bindCores = queryPT->getPropInt("@bindCores", msgctx->getBindCores());
  1799. if (bindCores > 0)
  1800. listener->setThreadAffinity(bindCores);
  1801. IArrayOf<IPropertyTree> requestArray;
  1802. if (isHTTP)
  1803. {
  1804. if (isRequestArray)
  1805. {
  1806. StringBuffer reqIterString;
  1807. reqIterString.append(queryName).append("Request");
  1808. Owned<IPropertyTreeIterator> reqIter = queryPT->getElements(reqIterString.str());
  1809. ForEach(*reqIter)
  1810. {
  1811. IPropertyTree *fixedreq = createPTree(queryName, ipt_caseInsensitive|ipt_fast);
  1812. Owned<IPropertyTreeIterator> iter = reqIter->query().getElements("*");
  1813. ForEach(*iter)
  1814. {
  1815. fixedreq->addPropTree(iter->query().queryName(), LINK(&iter->query()));
  1816. }
  1817. requestArray.append(*fixedreq);
  1818. }
  1819. }
  1820. else
  1821. {
  1822. IPropertyTree *fixedreq = createPTree(queryName, ipt_caseInsensitive|ipt_fast);
  1823. Owned<IPropertyTreeIterator> iter = queryPT->getElements("*");
  1824. ForEach(*iter)
  1825. {
  1826. fixedreq->addPropTree(iter->query().queryName(), LINK(&iter->query()));
  1827. }
  1828. requestArray.append(*fixedreq);
  1829. msgctx->setIntercept(queryPT->getPropBool("@log", false));
  1830. msgctx->setTraceLevel(queryPT->getPropInt("@traceLevel", logctx.queryTraceLevel()));
  1831. }
  1832. if (httpHelper.getTrim())
  1833. protocolFlags |= HPCC_PROTOCOL_TRIM;
  1834. }
  1835. else
  1836. {
  1837. const char *format = queryPT->queryProp("@format");
  1838. if (format)
  1839. {
  1840. if (stricmp(format, "raw") == 0)
  1841. {
  1842. protocolFlags |= HPCC_PROTOCOL_NATIVE_RAW;
  1843. if (client) //not stand alone roxie exe
  1844. protocolFlags |= HPCC_PROTOCOL_BLOCKED;
  1845. mlResponseFmt = MarkupFmt_Unknown;
  1846. }
  1847. else if (stricmp(format, "bxml") == 0)
  1848. {
  1849. protocolFlags |= HPCC_PROTOCOL_BLOCKED;
  1850. mlResponseFmt = MarkupFmt_XML;
  1851. }
  1852. else if (stricmp(format, "ascii") == 0)
  1853. {
  1854. protocolFlags |= HPCC_PROTOCOL_NATIVE_ASCII;
  1855. mlResponseFmt = MarkupFmt_Unknown;
  1856. }
  1857. else if (stricmp(format, "xml") != 0) // xml is the default
  1858. throw MakeStringException(ROXIE_INVALID_INPUT, "Unsupported format specified: %s", format);
  1859. }
  1860. if (queryPT->getPropBool("@trim", false))
  1861. protocolFlags |= HPCC_PROTOCOL_TRIM;
  1862. msgctx->setIntercept(queryPT->getPropBool("@log", false));
  1863. msgctx->setTraceLevel(queryPT->getPropInt("@traceLevel", logctx.queryTraceLevel()));
  1864. }
  1865. msgctx->noteQueryActive();
  1866. if (isHTTP)
  1867. {
  1868. CHttpRequestAsyncFor af(queryName, sink, msgctx, requestArray, *client, httpHelper, protocolFlags, memused, slavesReplyLen, sanitizedText, logctx, (PTreeReaderOptions)readFlags, querySetName);
  1869. af.For(requestArray.length(), global->numRequestArrayThreads);
  1870. }
  1871. else
  1872. {
  1873. Owned<IHpccProtocolResponse> protocol = createProtocolResponse(queryPT->queryName(), client, httpHelper, logctx, protocolFlags, (PTreeReaderOptions)readFlags);
  1874. sink->onQueryMsg(msgctx, queryPT, protocol, protocolFlags, (PTreeReaderOptions)readFlags, querySetName, 0, memused, slavesReplyLen);
  1875. }
  1876. }
  1877. }
  1878. }
  1879. catch (IException * E)
  1880. {
  1881. failed = true;
  1882. logctx.CTXLOG("FAILED: %s", sanitizedText.str());
  1883. StringBuffer error;
  1884. E->errorMessage(error);
  1885. logctx.CTXLOG("EXCEPTION: %s", error.str());
  1886. unsigned code = E->errorCode();
  1887. if (QUERYINTERFACE(E, ISEH_Exception))
  1888. code = ROXIE_INTERNAL_ERROR;
  1889. else if (QUERYINTERFACE(E, IOutOfMemException))
  1890. code = ROXIE_MEMORY_ERROR;
  1891. if (client)
  1892. {
  1893. if (isHTTP)
  1894. client->checkSendHttpException(httpHelper, E, queryName);
  1895. else
  1896. client->sendException("Roxie", code, error.str(), (protocolFlags & HPCC_PROTOCOL_NATIVE_RAW), logctx);
  1897. }
  1898. else
  1899. {
  1900. fprintf(stderr, "EXCEPTION: %s\n", error.str());
  1901. }
  1902. E->Release();
  1903. }
  1904. #ifndef _DEBUG
  1905. catch(...)
  1906. {
  1907. failed = true;
  1908. logctx.CTXLOG("FAILED: %s", sanitizedText.str());
  1909. logctx.CTXLOG("EXCEPTION: Unknown exception");
  1910. {
  1911. if (isHTTP)
  1912. {
  1913. Owned<IException> E = MakeStringException(ROXIE_INTERNAL_ERROR, "Unknown exception");
  1914. client->checkSendHttpException(httpHelper, E, queryName);
  1915. }
  1916. else
  1917. client->sendException("Roxie", ROXIE_INTERNAL_ERROR, "Unknown exception", (protocolFlags & HPCC_PROTOCOL_BLOCKED), logctx);
  1918. }
  1919. }
  1920. #endif
  1921. if (isHTTP)
  1922. {
  1923. try
  1924. {
  1925. client->flush();
  1926. }
  1927. catch (IException * E)
  1928. {
  1929. StringBuffer error("RoxieSocketWorker failed to write to socket ");
  1930. E->errorMessage(error);
  1931. logctx.CTXLOG("%s", error.str());
  1932. E->Release();
  1933. }
  1934. catch(...)
  1935. {
  1936. logctx.CTXLOG("RoxieSocketWorker failed to write to socket (Unknown exception)");
  1937. }
  1938. }
  1939. unsigned bytesOut = client? client->bytesOut() : 0;
  1940. unsigned elapsed = msTick() - qstart;
  1941. sink->noteQuery(msgctx.get(), peerStr, failed, bytesOut, elapsed, memused, slavesReplyLen, continuationNeeded);
  1942. if (continuationNeeded)
  1943. {
  1944. rawText.clear();
  1945. goto readAnother;
  1946. }
  1947. else
  1948. {
  1949. try
  1950. {
  1951. if (client && !isHTTP && !isStatus)
  1952. {
  1953. if (msgctx->getIntercept())
  1954. {
  1955. FlushingStringBuffer response(client, (protocolFlags & HPCC_PROTOCOL_BLOCKED), mlResponseFmt, (protocolFlags & HPCC_PROTOCOL_NATIVE_RAW), false, logctx);
  1956. response.startDataset("Tracing", NULL, (unsigned) -1);
  1957. msgctx->outputLogXML(response);
  1958. }
  1959. unsigned replyLen = 0;
  1960. client->write(&replyLen, sizeof(replyLen));
  1961. }
  1962. if (--remainingHttpConnectionRequests > 0)
  1963. {
  1964. readWait = global->maxHttpKeepAliveWait;
  1965. goto readAnother;
  1966. }
  1967. client.clear();
  1968. }
  1969. catch (IException * E)
  1970. {
  1971. StringBuffer error("RoxieSocketWorker failed to close socket ");
  1972. E->errorMessage(error);
  1973. logctx.CTXLOG("%s", error.str()); // MORE - audience?
  1974. E->Release();
  1975. }
  1976. catch(...)
  1977. {
  1978. logctx.CTXLOG("RoxieSocketWorker failed to close socket (Unknown exception)"); // MORE - audience?
  1979. }
  1980. }
  1981. }
  1982. };
  1983. IPooledThread *ProtocolSocketListener::createNew()
  1984. {
  1985. return new RoxieSocketWorker(this, ep);
  1986. }
  1987. void ProtocolSocketListener::runOnce(const char *query)
  1988. {
  1989. Owned<RoxieSocketWorker> p = new RoxieSocketWorker(this, ep);
  1990. p->runOnce(query);
  1991. }
  1992. IHpccProtocolListener *createProtocolListener(const char *protocol, IHpccProtocolMsgSink *sink, unsigned port, unsigned listenQueue, const char *certFile=nullptr, const char *keyFile=nullptr, const char *passPhrase=nullptr)
  1993. {
  1994. if (traceLevel)
  1995. DBGLOG("Creating Roxie socket listener, protocol %s, pool size %d, listen queue %d%s", protocol, sink->getPoolSize(), listenQueue, sink->getIsSuspended() ? " SUSPENDED":"");
  1996. return new ProtocolSocketListener(sink, port, listenQueue, protocol, certFile, keyFile, passPhrase);
  1997. }
  1998. extern IHpccProtocolPlugin *loadHpccProtocolPlugin(IHpccProtocolPluginContext *ctx, IActiveQueryLimiterFactory *_limiterFactory)
  1999. {
  2000. if (!queryLimiterFactory)
  2001. queryLimiterFactory.set(_limiterFactory);
  2002. if (global)
  2003. return global.getLink();
  2004. if (!ctx)
  2005. return NULL;
  2006. global.setown(new CHpccProtocolPlugin(*ctx));
  2007. return global.getLink();
  2008. }
  2009. extern void unloadHpccProtocolPlugin()
  2010. {
  2011. queryLimiterFactory.clear();
  2012. global.clear();
  2013. }
  2014. //================================================================================================================================