ccdprotocol.cpp 62 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2016 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "jlib.hpp"
  15. #include "jthread.hpp"
  16. #include "roxie.hpp"
  17. #include "roxiehelper.hpp"
  18. #include "ccdprotocol.hpp"
  19. //================================================================================================================================
  20. IHpccProtocolListener *createProtocolListener(const char *protocol, IHpccProtocolMsgSink *sink, unsigned port, unsigned listenQueue);
  21. class CHpccProtocolPlugin : public CInterface, implements IHpccProtocolPlugin
  22. {
  23. public:
  24. IMPLEMENT_IINTERFACE;
  25. CHpccProtocolPlugin(IHpccProtocolPluginContext &ctx)
  26. {
  27. targetNames.appendListUniq(ctx.ctxQueryProp("@querySets"), ",");
  28. targetAliases.setown(createProperties());
  29. StringArray tempList;
  30. tempList.appendListUniq(ctx.ctxQueryProp("@targetAliases"), ",");
  31. ForEachItemIn(i, tempList)
  32. {
  33. const char *alias = tempList.item(i);
  34. const char *eq = strchr(alias, '=');
  35. if (eq)
  36. {
  37. StringAttr name(alias, eq-alias);
  38. if (!targetNames.contains(name))
  39. targetAliases->setProp(name.str(), ++eq);
  40. }
  41. }
  42. maxBlockSize = ctx.ctxGetPropInt("@maxBlockSize", 10000000);
  43. defaultXmlReadFlags = ctx.ctxGetPropBool("@defaultStripLeadingWhitespace", true) ? ptr_ignoreWhiteSpace : ptr_none;
  44. trapTooManyActiveQueries = ctx.ctxGetPropBool("@trapTooManyActiveQueries", true);
  45. numRequestArrayThreads = ctx.ctxGetPropInt("@requestArrayThreads", 5);
  46. }
  47. IHpccProtocolListener *createListener(const char *protocol, IHpccProtocolMsgSink *sink, unsigned port, unsigned listenQueue, const char *config)
  48. {
  49. return createProtocolListener(protocol, sink, port, listenQueue);
  50. }
  51. public:
  52. StringArray targetNames;
  53. Owned<IProperties> targetAliases;
  54. PTreeReaderOptions defaultXmlReadFlags;
  55. unsigned maxBlockSize;
  56. unsigned numRequestArrayThreads;
  57. bool trapTooManyActiveQueries;
  58. };
  59. Owned<CHpccProtocolPlugin> global;
  60. class ProtocolListener : public Thread, implements IHpccProtocolListener, implements IThreadFactory
  61. {
  62. public:
  63. IMPLEMENT_IINTERFACE;
  64. ProtocolListener(IHpccProtocolMsgSink *_sink) : Thread("RoxieListener")
  65. {
  66. running = false;
  67. sink.set(dynamic_cast<IHpccNativeProtocolMsgSink*>(_sink));
  68. }
  69. virtual IHpccProtocolMsgSink *queryMsgSink()
  70. {
  71. return sink;
  72. }
  73. static void updateAffinity()
  74. {
  75. #ifdef CPU_ZERO
  76. if (sched_getaffinity(0, sizeof(cpu_set_t), &cpuMask))
  77. {
  78. if (traceLevel)
  79. DBGLOG("Unable to get CPU affinity - thread affinity settings will be ignored");
  80. cpuCores = 0;
  81. lastCore = 0;
  82. CPU_ZERO(&cpuMask);
  83. }
  84. else
  85. {
  86. #if __GLIBC__ > 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 6)
  87. cpuCores = CPU_COUNT(&cpuMask);
  88. #else
  89. cpuCores = 0;
  90. unsigned setSize = CPU_SETSIZE;
  91. while (setSize--)
  92. {
  93. if (CPU_ISSET(setSize, &cpuMask))
  94. ++cpuCores;
  95. }
  96. #endif /* GLIBC */
  97. if (traceLevel)
  98. traceAffinity(&cpuMask);
  99. }
  100. #endif
  101. }
  102. virtual void start()
  103. {
  104. // Note we allow a few additional threads than requested - these are the threads that return "Too many active queries" responses
  105. pool.setown(createThreadPool("RoxieSocketWorkerPool", this, NULL, sink->getPoolSize()+5, INFINITE));
  106. assertex(!running);
  107. Thread::start();
  108. started.wait();
  109. }
  110. virtual bool stop(unsigned timeout)
  111. {
  112. if (running)
  113. {
  114. running = false;
  115. join();
  116. Release();
  117. }
  118. return pool->joinAll(false, timeout);
  119. }
  120. virtual bool suspend(bool suspendIt)
  121. {
  122. return sink->suspend(suspendIt);
  123. }
  124. void setThreadAffinity(int numCores)
  125. {
  126. #ifdef CPU_ZERO
  127. // Note - strictly speaking not threadsafe but any race conditions are (a) unlikely and (b) harmless
  128. if (cpuCores)
  129. {
  130. if (numCores > 0 && numCores < cpuCores)
  131. {
  132. cpu_set_t threadMask;
  133. CPU_ZERO(&threadMask);
  134. unsigned cores = 0;
  135. unsigned offset = lastCore;
  136. unsigned core;
  137. for (core = 0; core < CPU_SETSIZE; core++)
  138. {
  139. unsigned useCore = (core + offset) % CPU_SETSIZE;
  140. if (CPU_ISSET(useCore, &cpuMask))
  141. {
  142. CPU_SET(useCore, &threadMask);
  143. cores++;
  144. if (cores == numCores)
  145. {
  146. lastCore = useCore+1;
  147. break;
  148. }
  149. }
  150. }
  151. if (traceLevel > 3)
  152. traceAffinity(&threadMask);
  153. pthread_setaffinity_np(GetCurrentThreadId(), sizeof(cpu_set_t), &threadMask);
  154. }
  155. else
  156. {
  157. if (traceLevel > 3)
  158. traceAffinity(&cpuMask);
  159. pthread_setaffinity_np(GetCurrentThreadId(), sizeof(cpu_set_t), &cpuMask);
  160. }
  161. }
  162. #endif
  163. }
  164. protected:
  165. bool running;
  166. Semaphore started;
  167. Owned<IThreadPool> pool;
  168. Linked<IHpccNativeProtocolMsgSink> sink;
  169. #ifdef CPU_ZERO
  170. static cpu_set_t cpuMask;
  171. static unsigned cpuCores;
  172. static unsigned lastCore;
  173. private:
  174. static void traceAffinity(cpu_set_t *mask)
  175. {
  176. StringBuffer trace;
  177. for (unsigned core = 0; core < CPU_SETSIZE; core++)
  178. {
  179. if (CPU_ISSET(core, mask))
  180. trace.appendf(",%d", core);
  181. }
  182. if (trace.length())
  183. DBGLOG("Process affinity is set to use core(s) %s", trace.str()+1);
  184. }
  185. #endif
  186. };
  187. #ifdef CPU_ZERO
  188. cpu_set_t ProtocolListener::cpuMask;
  189. unsigned ProtocolListener::cpuCores;
  190. unsigned ProtocolListener::lastCore;
  191. #endif
  192. class ProtocolSocketListener : public ProtocolListener
  193. {
  194. unsigned port;
  195. unsigned listenQueue;
  196. Owned<ISocket> socket;
  197. SocketEndpoint ep;
  198. public:
  199. ProtocolSocketListener(IHpccProtocolMsgSink *_sink, unsigned _port, unsigned _listenQueue)
  200. : ProtocolListener(_sink)
  201. {
  202. port = _port;
  203. listenQueue = _listenQueue;
  204. ep.set(port, queryHostIP());
  205. }
  206. IHpccProtocolMsgSink *queryMsgSink()
  207. {
  208. return sink.get();
  209. }
  210. virtual bool stop(unsigned timeout)
  211. {
  212. if (socket)
  213. socket->cancel_accept();
  214. return ProtocolListener::stop(timeout);
  215. }
  216. virtual void disconnectQueue()
  217. {
  218. // This is for dali queues only
  219. }
  220. virtual void stopListening()
  221. {
  222. // Not threadsafe, but we only call this when generating a core file... what's the worst that can happen?
  223. try
  224. {
  225. DBGLOG("Closing listening socket %d", port);
  226. socket.clear();
  227. DBGLOG("Closed listening socket %d", port);
  228. }
  229. catch(...)
  230. {
  231. }
  232. }
  233. virtual void runOnce(const char *query);
  234. virtual int run()
  235. {
  236. DBGLOG("ProtocolSocketListener (%d threads) listening to socket on port %d", sink->getPoolSize(), port);
  237. socket.setown(ISocket::create(port, listenQueue));
  238. running = true;
  239. started.signal();
  240. while (running)
  241. {
  242. ISocket *client = socket->accept(true);
  243. if (client)
  244. {
  245. client->set_linger(-1);
  246. pool->start(client);
  247. }
  248. }
  249. DBGLOG("ProtocolSocketListener closed query socket");
  250. return 0;
  251. }
  252. virtual IPooledThread *createNew();
  253. virtual const SocketEndpoint &queryEndpoint() const
  254. {
  255. return ep;
  256. }
  257. virtual unsigned queryPort() const
  258. {
  259. return port;
  260. }
  261. };
  262. class ProtocolQueryWorker : public CInterface, implements IPooledThread
  263. {
  264. public:
  265. IMPLEMENT_IINTERFACE;
  266. ProtocolQueryWorker(ProtocolListener *_listener) : listener(_listener)
  267. {
  268. qstart = msTick();
  269. time(&startTime);
  270. }
  271. // interface IPooledThread
  272. virtual void init(void *)
  273. {
  274. qstart = msTick();
  275. time(&startTime);
  276. }
  277. virtual bool canReuse()
  278. {
  279. return true;
  280. }
  281. virtual bool stop()
  282. {
  283. ERRLOG("RoxieQueryWorker stopped with queries active");
  284. return true;
  285. }
  286. protected:
  287. ProtocolListener *listener;
  288. unsigned qstart;
  289. time_t startTime;
  290. };
  291. //================================================================================================================
  292. class CHpccNativeResultsWriter : public CInterface, implements IHpccNativeProtocolResultsWriter
  293. {
  294. protected:
  295. SafeSocket *client;
  296. CriticalSection resultsCrit;
  297. IPointerArrayOf<FlushingStringBuffer> resultMap;
  298. StringAttr queryName;
  299. const IContextLogger &logctx;
  300. Owned<FlushingStringBuffer> probe;
  301. TextMarkupFormat mlFmt;
  302. PTreeReaderOptions xmlReadFlags;
  303. bool isBlocked;
  304. bool isRaw;
  305. bool isHTTP;
  306. bool trim;
  307. bool failed;
  308. public:
  309. IMPLEMENT_IINTERFACE;
  310. CHpccNativeResultsWriter(const char *queryname, SafeSocket *_client, bool _isBlocked, TextMarkupFormat _mlFmt, bool _isRaw, bool _isHTTP, const IContextLogger &_logctx, PTreeReaderOptions _xmlReadFlags) :
  311. client(_client), queryName(queryname), logctx(_logctx), mlFmt(_mlFmt), xmlReadFlags(_xmlReadFlags), isBlocked(_isBlocked), isRaw(_isRaw), isHTTP(_isHTTP)
  312. {
  313. }
  314. ~CHpccNativeResultsWriter()
  315. {
  316. }
  317. virtual FlushingStringBuffer *queryResult(unsigned sequence)
  318. {
  319. CriticalBlock procedure(resultsCrit);
  320. while (!resultMap.isItem(sequence))
  321. resultMap.append(NULL);
  322. FlushingStringBuffer *result = resultMap.item(sequence);
  323. if (!result)
  324. {
  325. if (mlFmt==MarkupFmt_JSON)
  326. result = new FlushingJsonBuffer(client, isBlocked, isHTTP, logctx);
  327. else
  328. result = new FlushingStringBuffer(client, isBlocked, mlFmt, isRaw, isHTTP, logctx);
  329. result->isSoap = isHTTP;
  330. result->trim = trim;
  331. result->queryName.set(queryName);
  332. resultMap.replace(result, sequence);
  333. }
  334. return result;
  335. }
  336. virtual FlushingStringBuffer *createFlushingBuffer()
  337. {
  338. return new FlushingStringBuffer(client, isBlocked, mlFmt, isRaw, isHTTP, logctx);
  339. }
  340. virtual IXmlWriter *addDataset(const char *name, unsigned sequence, const char *elementName, bool &appendRawData, unsigned writeFlags, bool _extend, const IProperties *xmlns)
  341. {
  342. FlushingStringBuffer *response = queryResult(sequence);
  343. if (response)
  344. {
  345. appendRawData = response->isRaw;
  346. response->startDataset(elementName, name, sequence, _extend, xmlns);
  347. if (response->mlFmt==MarkupFmt_XML || response->mlFmt==MarkupFmt_JSON)
  348. {
  349. if (response->mlFmt==MarkupFmt_JSON)
  350. writeFlags |= XWFnoindent;
  351. Owned<IXmlWriter> xmlwriter = createIXmlWriterExt(writeFlags, 1, response, (response->mlFmt==MarkupFmt_JSON) ? WTJSON : WTStandard);
  352. xmlwriter->outputBeginArray("Row");
  353. return xmlwriter.getClear();
  354. }
  355. }
  356. return NULL;
  357. }
  358. virtual void finalizeXmlRow(unsigned sequence)
  359. {
  360. if (mlFmt==MarkupFmt_XML || mlFmt==MarkupFmt_JSON)
  361. {
  362. FlushingStringBuffer *r = queryResult(sequence);
  363. if (r)
  364. {
  365. r->incrementRowCount();
  366. r->flush(false);
  367. }
  368. }
  369. }
  370. virtual void appendRaw(unsigned sequence, unsigned len, const char *data)
  371. {
  372. FlushingStringBuffer *r = queryResult(sequence);
  373. if (r)
  374. r->append(len, data);
  375. }
  376. virtual void appendRawRow(unsigned sequence, unsigned len, const char *data)
  377. {
  378. FlushingStringBuffer *r = queryResult(sequence);
  379. if (r)
  380. {
  381. r->append(len, data);
  382. r->incrementRowCount();
  383. r->flush(false);
  384. }
  385. }
  386. virtual void appendSimpleRow(unsigned sequence, const char *str)
  387. {
  388. FlushingStringBuffer *r = queryResult(sequence);
  389. if (r)
  390. r->append(str);
  391. }
  392. virtual void setResultBool(const char *name, unsigned sequence, bool value)
  393. {
  394. FlushingStringBuffer *r = queryResult(sequence);
  395. if (r)
  396. {
  397. r->startScalar(name, sequence);
  398. if (isRaw)
  399. r->append(sizeof(value), (char *)&value);
  400. else
  401. r->append(value ? "true" : "false");
  402. }
  403. }
  404. virtual void setResultData(const char *name, unsigned sequence, int len, const void * data)
  405. {
  406. FlushingStringBuffer *r = queryResult(sequence);
  407. if (r)
  408. {
  409. r->startScalar(name, sequence);
  410. r->encodeData(data, len);
  411. }
  412. }
  413. virtual void setResultRaw(const char *name, unsigned sequence, int len, const void * data)
  414. {
  415. FlushingStringBuffer *r = queryResult(sequence);
  416. if (r)
  417. {
  418. r->startScalar(name, sequence);
  419. if (isRaw)
  420. r->append(len, (const char *) data);
  421. else
  422. UNIMPLEMENTED;
  423. }
  424. }
  425. virtual void setResultSet(const char *name, unsigned sequence, bool isAll, size32_t len, const void * data, ISetToXmlTransformer * transformer)
  426. {
  427. FlushingStringBuffer *r = queryResult(sequence);
  428. if (r)
  429. {
  430. r->startScalar(name, sequence);
  431. if (isRaw)
  432. r->append(len, (char *)data);
  433. else if (mlFmt==MarkupFmt_XML)
  434. {
  435. assertex(transformer);
  436. CommonXmlWriter writer(xmlReadFlags|XWFnoindent, 0);
  437. transformer->toXML(isAll, len, (byte *)data, writer);
  438. r->append(writer.str());
  439. }
  440. else if (mlFmt==MarkupFmt_JSON)
  441. {
  442. assertex(transformer);
  443. CommonJsonWriter writer(xmlReadFlags|XWFnoindent, 0);
  444. transformer->toXML(isAll, len, (byte *)data, writer);
  445. r->append(writer.str());
  446. }
  447. else
  448. {
  449. assertex(transformer);
  450. r->append('[');
  451. if (isAll)
  452. r->appendf("*]");
  453. else
  454. {
  455. SimpleOutputWriter x;
  456. transformer->toXML(isAll, len, (const byte *) data, x);
  457. r->appendf("%s]", x.str());
  458. }
  459. }
  460. }
  461. }
  462. virtual void setResultDecimal(const char *name, unsigned sequence, int len, int precision, bool isSigned, const void *val)
  463. {
  464. FlushingStringBuffer *r = queryResult(sequence);
  465. if (r)
  466. {
  467. r->startScalar(name, sequence);
  468. if (isRaw)
  469. r->append(len, (char *)val);
  470. else
  471. {
  472. StringBuffer s;
  473. if (isSigned)
  474. outputXmlDecimal(val, len, precision, NULL, s);
  475. else
  476. outputXmlUDecimal(val, len, precision, NULL, s);
  477. r->append(s);
  478. }
  479. }
  480. }
  481. virtual void setResultInt(const char *name, unsigned sequence, __int64 value, unsigned size)
  482. {
  483. FlushingStringBuffer *r = queryResult(sequence);
  484. if (r)
  485. {
  486. if (isRaw)
  487. {
  488. r->startScalar(name, sequence);
  489. r->append(sizeof(value), (char *)&value);
  490. }
  491. else
  492. r->setScalarInt(name, sequence, value, size);
  493. }
  494. }
  495. virtual void setResultUInt(const char *name, unsigned sequence, unsigned __int64 value, unsigned size)
  496. {
  497. FlushingStringBuffer *r = queryResult(sequence);
  498. if (r)
  499. {
  500. if (isRaw)
  501. {
  502. r->startScalar(name, sequence);
  503. r->append(sizeof(value), (char *)&value);
  504. }
  505. else
  506. r->setScalarUInt(name, sequence, value, size);
  507. }
  508. }
  509. virtual void setResultReal(const char *name, unsigned sequence, double value)
  510. {
  511. FlushingStringBuffer *r = queryResult(sequence);
  512. if (r)
  513. {
  514. r->startScalar(name, sequence);
  515. r->append(value);
  516. }
  517. }
  518. virtual void setResultString(const char *name, unsigned sequence, int len, const char * str)
  519. {
  520. FlushingStringBuffer *r = queryResult(sequence);
  521. if (r)
  522. {
  523. r->startScalar(name, sequence);
  524. if (r->isRaw)
  525. {
  526. r->append(len, str);
  527. }
  528. else
  529. {
  530. r->encodeString(str, len);
  531. }
  532. }
  533. }
  534. virtual void setResultUnicode(const char *name, unsigned sequence, int len, UChar const * str)
  535. {
  536. FlushingStringBuffer *r = queryResult(sequence);
  537. if (r)
  538. {
  539. r->startScalar(name, sequence);
  540. if (r->isRaw)
  541. {
  542. r->append(len*2, (const char *) str);
  543. }
  544. else
  545. {
  546. rtlDataAttr buff;
  547. unsigned bufflen = 0;
  548. rtlUnicodeToCodepageX(bufflen, buff.refstr(), len, str, "utf-8");
  549. r->encodeString(buff.getstr(), bufflen, true); // output as UTF-8
  550. }
  551. }
  552. }
  553. virtual void setResultVarString(const char * name, unsigned sequence, const char * value)
  554. {
  555. setResultString(name, sequence, strlen(value), value);
  556. }
  557. virtual void setResultVarUnicode(const char * name, unsigned sequence, UChar const * value)
  558. {
  559. setResultUnicode(name, sequence, rtlUnicodeStrlen(value), value);
  560. }
  561. virtual void flush()
  562. {
  563. ForEachItemIn(seq, resultMap)
  564. {
  565. FlushingStringBuffer *result = resultMap.item(seq);
  566. if (result)
  567. result->flush(true);
  568. }
  569. }
  570. virtual void finalize(unsigned seqNo)
  571. {
  572. ForEachItemIn(seq, resultMap)
  573. {
  574. FlushingStringBuffer *result = resultMap.item(seq);
  575. if (result)
  576. {
  577. result->flush(true);
  578. for(;;)
  579. {
  580. size32_t length;
  581. void *payload = result->getPayload(length);
  582. if (!length)
  583. break;
  584. client->write(payload, length, true);
  585. }
  586. }
  587. }
  588. }
  589. virtual void appendProbeGraph(const char *xml)
  590. {
  591. if (!xml)
  592. {
  593. if (probe)
  594. probe.clear();
  595. return;
  596. }
  597. if (!probe)
  598. {
  599. probe.setown(new FlushingStringBuffer(client, isBlocked, MarkupFmt_XML, false, isHTTP, logctx));
  600. probe->startDataset("_Probe", NULL, (unsigned) -1); // initialize it
  601. }
  602. probe->append("\n");
  603. probe->append(xml);
  604. }
  605. };
  606. class CHpccXmlResultsWriter : public CHpccNativeResultsWriter
  607. {
  608. public:
  609. CHpccXmlResultsWriter(const char *queryname, SafeSocket *_client, bool _isHTTP, const IContextLogger &_logctx, PTreeReaderOptions _xmlReadFlags) :
  610. CHpccNativeResultsWriter(queryname, _client, false, MarkupFmt_XML, false, _isHTTP, _logctx, _xmlReadFlags)
  611. {
  612. }
  613. virtual void addContent(TextMarkupFormat fmt, const char *content, const char *name)
  614. {
  615. StringBuffer xml;
  616. if (!content || !*content)
  617. return;
  618. if (fmt==MarkupFmt_JSON)
  619. {
  620. Owned<IPropertyTree> convertPT = createPTreeFromXMLString(content);
  621. if (name && *name)
  622. appendXMLOpenTag(xml, name);
  623. toXML(convertPT, xml, 0, 0);
  624. if (name && *name)
  625. appendXMLCloseTag(xml, name);
  626. }
  627. }
  628. virtual void finalize(unsigned seqNo)
  629. {
  630. if (!isHTTP)
  631. {
  632. flush();
  633. return;
  634. }
  635. CriticalBlock b(resultsCrit);
  636. CriticalBlock b1(client->queryCrit());
  637. StringBuffer responseHead, responseTail;
  638. responseHead.append("<Results><Result>");
  639. unsigned len = responseHead.length();
  640. client->write(responseHead.detach(), len, true);
  641. ForEachItemIn(seq, resultMap)
  642. {
  643. FlushingStringBuffer *result = resultMap.item(seq);
  644. if (result)
  645. {
  646. result->flush(true);
  647. for(;;)
  648. {
  649. size32_t length;
  650. void *payload = result->getPayload(length);
  651. if (!length)
  652. break;
  653. client->write(payload, length, true);
  654. }
  655. }
  656. }
  657. responseTail.append("</Result></Results>");
  658. len = responseTail.length();
  659. client->write(responseTail.detach(), len, true);
  660. }
  661. };
  662. class CHpccJsonResultsWriter : public CHpccNativeResultsWriter
  663. {
  664. public:
  665. CHpccJsonResultsWriter(const char *queryname, SafeSocket *_client, const IContextLogger &_logctx, PTreeReaderOptions _xmlReadFlags) :
  666. CHpccNativeResultsWriter(queryname, _client, false, MarkupFmt_JSON, false, true, _logctx, _xmlReadFlags)
  667. {
  668. }
  669. virtual FlushingStringBuffer *createFlushingBuffer()
  670. {
  671. return new FlushingJsonBuffer(client, isBlocked, isHTTP, logctx);
  672. }
  673. virtual void finalize(unsigned seqNo)
  674. {
  675. CriticalBlock b(resultsCrit);
  676. CriticalBlock b1(client->queryCrit());
  677. StringBuffer responseHead, responseTail;
  678. appendJSONName(responseHead, "Results").append(" {");
  679. unsigned len = responseHead.length();
  680. client->write(responseHead.detach(), len, true);
  681. bool needDelimiter = false;
  682. ForEachItemIn(seq, resultMap)
  683. {
  684. FlushingStringBuffer *result = resultMap.item(seq);
  685. if (result)
  686. {
  687. result->flush(true);
  688. for(;;)
  689. {
  690. size32_t length;
  691. void *payload = result->getPayload(length);
  692. if (!length)
  693. break;
  694. if (needDelimiter)
  695. {
  696. StringAttr s(","); //write() will take ownership of buffer
  697. size32_t len = s.length();
  698. client->write((void *)s.detach(), len, true);
  699. needDelimiter=false;
  700. }
  701. client->write(payload, length, true);
  702. }
  703. needDelimiter=true;
  704. }
  705. }
  706. responseTail.append("}");
  707. len = responseTail.length();
  708. client->write(responseTail.detach(), len, true);
  709. }
  710. };
  711. class CHpccNativeProtocolResponse : public CInterface, implements IHpccNativeProtocolResponse
  712. {
  713. protected:
  714. SafeSocket *client;
  715. StringAttr queryName;
  716. const IContextLogger &logctx;
  717. TextMarkupFormat mlFmt;
  718. PTreeReaderOptions xmlReadFlags;
  719. Owned<CHpccNativeResultsWriter> results; //hpcc results section
  720. IPointerArrayOf<FlushingStringBuffer> contentsMap; //other sections
  721. CriticalSection contentsCrit;
  722. unsigned protocolFlags;
  723. bool isHTTP;
  724. bool failed;
  725. public:
  726. IMPLEMENT_IINTERFACE;
  727. CHpccNativeProtocolResponse(const char *queryname, SafeSocket *_client, TextMarkupFormat _mlFmt, unsigned flags, bool _isHTTP, const IContextLogger &_logctx, PTreeReaderOptions _xmlReadFlags) :
  728. client(_client), queryName(queryname), logctx(_logctx), mlFmt(_mlFmt), xmlReadFlags(_xmlReadFlags), protocolFlags(flags), isHTTP(_isHTTP)
  729. {
  730. }
  731. ~CHpccNativeProtocolResponse()
  732. {
  733. }
  734. virtual unsigned getFlags()
  735. {
  736. return protocolFlags;
  737. }
  738. inline bool getIsRaw()
  739. {
  740. return (protocolFlags & HPCC_PROTOCOL_NATIVE_RAW);
  741. }
  742. inline bool getIsBlocked()
  743. {
  744. return (protocolFlags & HPCC_PROTOCOL_BLOCKED);
  745. }
  746. inline bool getTrim()
  747. {
  748. return (protocolFlags & HPCC_PROTOCOL_TRIM);
  749. }
  750. virtual FlushingStringBuffer *queryAppendContentBuffer()
  751. {
  752. CriticalBlock procedure(contentsCrit);
  753. FlushingStringBuffer *content;
  754. if (mlFmt==MarkupFmt_JSON)
  755. content = new FlushingJsonBuffer(client, getIsBlocked(), isHTTP, logctx);
  756. else
  757. content = new FlushingStringBuffer(client, getIsBlocked(), mlFmt, getIsRaw(), isHTTP, logctx);
  758. content->isSoap = isHTTP;
  759. content->trim = getTrim();
  760. content->queryName.set(queryName);
  761. if (!isHTTP)
  762. content->startBlock();
  763. contentsMap.append(content);
  764. return content;
  765. }
  766. virtual IHpccProtocolResultsWriter *queryHpccResultsSection()
  767. {
  768. if (!results)
  769. results.setown(new CHpccNativeResultsWriter(queryName, client, getIsBlocked(), mlFmt, getIsRaw(), isHTTP, logctx, xmlReadFlags));
  770. return results;
  771. }
  772. virtual void appendContent(TextMarkupFormat mlFmt, const char *content, const char *name=NULL)
  773. {
  774. throwUnexpected();
  775. }
  776. virtual IXmlWriter *writeAppendContent(const char *name = NULL)
  777. {
  778. throwUnexpected();
  779. }
  780. virtual void finalize(unsigned seqNo)
  781. {
  782. flush();
  783. if (!isHTTP)
  784. {
  785. unsigned replyLen = 0;
  786. client->write(&replyLen, sizeof(replyLen));
  787. }
  788. }
  789. virtual bool checkConnection()
  790. {
  791. return client->checkConnection();
  792. }
  793. virtual void sendHeartBeat()
  794. {
  795. client->sendHeartBeat(logctx);
  796. }
  797. virtual SafeSocket *querySafeSocket()
  798. {
  799. return client;
  800. }
  801. virtual void flush()
  802. {
  803. if (results)
  804. results->flush();
  805. ForEachItemIn(i, contentsMap)
  806. contentsMap.item(i)->flush(true);
  807. }
  808. virtual void appendProbeGraph(const char *xml)
  809. {
  810. if (results)
  811. results->appendProbeGraph(xml);
  812. }
  813. };
  814. class CHpccJsonResponse : public CHpccNativeProtocolResponse
  815. {
  816. public:
  817. CHpccJsonResponse(const char *queryname, SafeSocket *_client, unsigned flags, bool _isHttp, const IContextLogger &_logctx, PTreeReaderOptions _xmlReadFlags) :
  818. CHpccNativeProtocolResponse(queryname, _client, MarkupFmt_JSON, flags, _isHttp, _logctx, _xmlReadFlags)
  819. {
  820. }
  821. virtual IHpccProtocolResultsWriter *getHpccResultsSection()
  822. {
  823. if (!results)
  824. results.setown(new CHpccJsonResultsWriter(queryName, client, logctx, xmlReadFlags));
  825. return results;
  826. }
  827. virtual void appendContent(TextMarkupFormat mlFmt, const char *content, const char *name=NULL)
  828. {
  829. if (mlFmt!=MarkupFmt_XML && mlFmt!=MarkupFmt_JSON)
  830. return;
  831. StringBuffer json;
  832. if (mlFmt==MarkupFmt_XML)
  833. {
  834. Owned<IPropertyTree> convertPT = createPTreeFromXMLString(content);
  835. toJSON(convertPT, json, 0, 0);
  836. content = json.str();
  837. }
  838. FlushingStringBuffer *contentBuffer = queryAppendContentBuffer();
  839. StringBuffer tag;
  840. if (name && *name)
  841. appendJSONName(tag, name);
  842. contentBuffer->append(tag);
  843. contentBuffer->append(content);
  844. }
  845. virtual IXmlWriter *writeAppendContent(const char *name = NULL)
  846. {
  847. FlushingStringBuffer *content = queryAppendContentBuffer();
  848. if (name && *name)
  849. {
  850. StringBuffer tag;
  851. appendJSONName(tag, name);
  852. content->append(tag);
  853. }
  854. Owned<IXmlWriter> xmlwriter = createIXmlWriterExt(XWFnoindent, 1, content, WTJSON);
  855. return xmlwriter.getClear();
  856. }
  857. virtual void outputContent()
  858. {
  859. CriticalBlock b1(client->queryCrit());
  860. bool needDelimiter = false;
  861. ForEachItemIn(seq, contentsMap)
  862. {
  863. FlushingStringBuffer *content = contentsMap.item(seq);
  864. if (content)
  865. {
  866. content->flush(true);
  867. for(;;)
  868. {
  869. size32_t length;
  870. void *payload = content->getPayload(length);
  871. if (!length)
  872. break;
  873. if (needDelimiter)
  874. {
  875. StringAttr s(","); //write() will take ownership of buffer
  876. size32_t len = s.length();
  877. client->write((void *)s.detach(), len, true);
  878. needDelimiter=false;
  879. }
  880. client->write(payload, length, true);
  881. }
  882. needDelimiter=true;
  883. }
  884. }
  885. }
  886. virtual void finalize(unsigned seqNo)
  887. {
  888. if (!isHTTP)
  889. {
  890. CHpccNativeProtocolResponse::finalize(seqNo);
  891. return;
  892. }
  893. CriticalBlock b(contentsCrit);
  894. StringBuffer responseHead, responseTail;
  895. StringBuffer name(queryName.get());
  896. if (isHTTP)
  897. name.append("Response");
  898. appendJSONName(responseHead, name.str()).append(" {");
  899. appendJSONValue(responseHead, "sequence", seqNo);
  900. if (contentsMap.length() || results)
  901. delimitJSON(responseHead);
  902. unsigned len = responseHead.length();
  903. client->write(responseHead.detach(), len, true);
  904. outputContent();
  905. if (results)
  906. results->finalize(seqNo);
  907. responseTail.append("}");
  908. len = responseTail.length();
  909. client->write(responseTail.detach(), len, true);
  910. }
  911. };
  912. class CHpccXmlResponse : public CHpccNativeProtocolResponse
  913. {
  914. public:
  915. CHpccXmlResponse(const char *queryname, SafeSocket *_client, unsigned flags, bool _isHTTP, const IContextLogger &_logctx, PTreeReaderOptions _xmlReadFlags) :
  916. CHpccNativeProtocolResponse(queryname, _client, MarkupFmt_XML, flags, _isHTTP, _logctx, _xmlReadFlags)
  917. {
  918. }
  919. virtual IHpccProtocolResultsWriter *getHpccResultsSection()
  920. {
  921. if (!results)
  922. results.setown(new CHpccXmlResultsWriter(queryName, client, isHTTP, logctx, xmlReadFlags));
  923. return results;
  924. }
  925. virtual void appendContent(TextMarkupFormat mlFmt, const char *content, const char *name=NULL)
  926. {
  927. if (mlFmt!=MarkupFmt_XML && mlFmt!=MarkupFmt_JSON)
  928. return;
  929. StringBuffer xml;
  930. if (mlFmt==MarkupFmt_JSON)
  931. {
  932. Owned<IPropertyTree> convertPT = createPTreeFromJSONString(content);
  933. toXML(convertPT, xml, 0, 0);
  934. content = xml.str();
  935. }
  936. FlushingStringBuffer *contentBuffer = queryAppendContentBuffer();
  937. if (name && *name)
  938. {
  939. StringBuffer tag;
  940. appendXMLOpenTag(tag, name);
  941. contentBuffer->append(tag.append('\n'));
  942. appendXMLCloseTag(tag.clear(), name);
  943. contentBuffer->setTail(tag.append('\n'));
  944. }
  945. contentBuffer->append(content);
  946. }
  947. virtual IXmlWriter *writeAppendContent(const char *name = NULL)
  948. {
  949. FlushingStringBuffer *content = queryAppendContentBuffer();
  950. StringBuffer tag;
  951. if (name && *name)
  952. {
  953. appendXMLOpenTag(tag, name);
  954. content->append(tag);
  955. appendXMLCloseTag(tag.clear(), name);
  956. content->setTail(tag);
  957. }
  958. Owned<IXmlWriter> xmlwriter = createIXmlWriterExt(0, 1, content, WTStandard);
  959. return xmlwriter.getClear();
  960. }
  961. virtual void outputContent()
  962. {
  963. CriticalBlock b1(client->queryCrit());
  964. bool needDelimiter = false;
  965. ForEachItemIn(seq, contentsMap)
  966. {
  967. FlushingStringBuffer *content = contentsMap.item(seq);
  968. if (content)
  969. {
  970. content->flush(true);
  971. if (!this->isHTTP)
  972. continue;
  973. for(;;)
  974. {
  975. size32_t length;
  976. void *payload = content->getPayload(length);
  977. if (!length)
  978. break;
  979. client->write(payload, length, true);
  980. }
  981. }
  982. }
  983. }
  984. virtual void finalize(unsigned seqNo)
  985. {
  986. if (!isHTTP)
  987. {
  988. CHpccNativeProtocolResponse::finalize(seqNo);
  989. return;
  990. }
  991. CriticalBlock b(contentsCrit);
  992. StringBuffer responseHead, responseTail;
  993. responseHead.append("<").append(queryName);
  994. responseHead.append("Response").append(" xmlns=\"urn:hpccsystems:ecl:").appendLower(queryName.length(), queryName.str()).append('\"');
  995. responseHead.append(" sequence=\"").append(seqNo).append("\">");
  996. unsigned len = responseHead.length();
  997. client->write(responseHead.detach(), len, true);
  998. outputContent();
  999. if (results)
  1000. results->finalize(seqNo);
  1001. responseTail.append("</").append(queryName);
  1002. if (isHTTP)
  1003. responseTail.append("Response");
  1004. responseTail.append('>');
  1005. len = responseTail.length();
  1006. client->write(responseTail.detach(), len, true);
  1007. }
  1008. };
  1009. IHpccProtocolResponse *createProtocolResponse(const char *queryname, SafeSocket *client, HttpHelper &httpHelper, const IContextLogger &logctx, unsigned protocolFlags, PTreeReaderOptions xmlReadFlags)
  1010. {
  1011. if (protocolFlags & HPCC_PROTOCOL_NATIVE_RAW || protocolFlags & HPCC_PROTOCOL_NATIVE_ASCII)
  1012. return new CHpccNativeProtocolResponse(queryname, client, MarkupFmt_Unknown, protocolFlags, false, logctx, xmlReadFlags);
  1013. else if (httpHelper.queryContentFormat()==MarkupFmt_JSON)
  1014. return new CHpccJsonResponse(queryname, client, protocolFlags, httpHelper.isHttp(), logctx, xmlReadFlags);
  1015. return new CHpccXmlResponse(queryname, client, protocolFlags, httpHelper.isHttp(), logctx, xmlReadFlags);
  1016. }
  1017. class CHttpRequestAsyncFor : public CInterface, public CAsyncFor
  1018. {
  1019. private:
  1020. const char *queryName, *queryText, *querySetName;
  1021. const IContextLogger &logctx;
  1022. IArrayOf<IPropertyTree> &requestArray;
  1023. Linked<IHpccProtocolMsgSink> sink;
  1024. Linked<IHpccProtocolMsgContext> msgctx;
  1025. SafeSocket &client;
  1026. HttpHelper &httpHelper;
  1027. PTreeReaderOptions xmlReadFlags;
  1028. unsigned &memused;
  1029. unsigned &slaveReplyLen;
  1030. CriticalSection crit;
  1031. unsigned flags;
  1032. public:
  1033. CHttpRequestAsyncFor(const char *_queryName, IHpccProtocolMsgSink *_sink, IHpccProtocolMsgContext *_msgctx, IArrayOf<IPropertyTree> &_requestArray,
  1034. SafeSocket &_client, HttpHelper &_httpHelper, unsigned _flags, unsigned &_memused, unsigned &_slaveReplyLen, const char *_queryText, const IContextLogger &_logctx, PTreeReaderOptions _xmlReadFlags, const char *_querySetName)
  1035. : sink(_sink), msgctx(_msgctx), requestArray(_requestArray), client(_client), httpHelper(_httpHelper), memused(_memused),
  1036. slaveReplyLen(_slaveReplyLen), logctx(_logctx), xmlReadFlags(_xmlReadFlags), querySetName(_querySetName), flags(_flags)
  1037. {
  1038. queryName = _queryName;
  1039. queryText = _queryText;
  1040. }
  1041. IMPLEMENT_IINTERFACE;
  1042. void onException(IException *E)
  1043. {
  1044. //if (!logctx.isBlind())
  1045. // logctx.CTXLOG("FAILED: %s", queryText);
  1046. StringBuffer error("EXCEPTION: ");
  1047. E->errorMessage(error);
  1048. DBGLOG("%s", error.str());
  1049. client.checkSendHttpException(httpHelper, E, queryName);
  1050. E->Release();
  1051. }
  1052. void Do(unsigned idx)
  1053. {
  1054. try
  1055. {
  1056. IPropertyTree &request = requestArray.item(idx);
  1057. Owned<IHpccProtocolResponse> protocol = createProtocolResponse(request.queryName(), &client, httpHelper, logctx, flags, xmlReadFlags);
  1058. sink->onQueryMsg(msgctx, &request, protocol, flags, xmlReadFlags, querySetName, idx, memused, slaveReplyLen);
  1059. }
  1060. catch (IException * E)
  1061. {
  1062. onException(E);
  1063. }
  1064. catch (...)
  1065. {
  1066. onException(MakeStringException(ROXIE_INTERNAL_ERROR, "Unknown exception"));
  1067. }
  1068. }
  1069. };
  //ADF - Not changed yet, but this should eliminate the need to parse the query twice below:
  //the query can be loaded and its parse flags looked up before doing a full parse.
  //If it turns out that more information is needed, this class may be deleted.
  1073. class QueryNameExtractor : public CInterface, implements IPTreeNotifyEvent
  1074. {
  1075. public:
  1076. TextMarkupFormat mlFmt;
  1077. StringAttr prefix;
  1078. StringAttr name;
  1079. unsigned headerDepth;
  1080. bool isSoap;
  1081. bool isRequestArray;
  1082. bool stripWhitespace;
  1083. bool more;
  1084. public:
  1085. IMPLEMENT_IINTERFACE;
  1086. QueryNameExtractor(TextMarkupFormat _mlFmt, bool _stripWhitespace) : mlFmt(_mlFmt), headerDepth(0), isSoap(false), isRequestArray(false), stripWhitespace(_stripWhitespace), more(true)
  1087. {
  1088. }
  1089. void extractName(const char *msg, const IContextLogger &logctx, const char *peer, unsigned port)
  1090. {
  1091. Owned<IPullPTreeReader> parser;
  1092. if (mlFmt==MarkupFmt_JSON)
  1093. parser.setown(createPullJSONStringReader(msg, *this));
  1094. else if (mlFmt==MarkupFmt_XML)
  1095. parser.setown(createPullXMLStringReader(msg, *this));
  1096. if (!parser)
  1097. return;
  1098. while (more && parser->next());
  1099. if (name.isEmpty())
  1100. {
  1101. const char *fmt = mlFmt==MarkupFmt_XML ? "XML" : "JSON";
  1102. IException *E = MakeStringException(-1, "ERROR: Invalid %s received from %s:%d - %s queryName not found", fmt, peer, port, msg);
  1103. logctx.logOperatorException(E, __FILE__, __LINE__, "Invalid query %s", fmt);
  1104. throw E;
  1105. }
  1106. String nameStr(name.get());
  1107. if (nameStr.endsWith("RequestArray"))
  1108. {
  1109. isRequestArray = true;
  1110. name.set(nameStr.str(), nameStr.length() - strlen("RequestArray"));
  1111. }
  1112. else if (nameStr.endsWith("Request"))
  1113. {
  1114. name.set(nameStr.str(), nameStr.length() - strlen("Request"));
  1115. }
  1116. }
  1117. virtual void beginNode(const char *tag, offset_t startOffset)
  1118. {
  1119. if (streq(tag, "__object__"))
  1120. return;
  1121. const char *local = strchr(tag, ':');
  1122. if (local)
  1123. local++;
  1124. else
  1125. local = tag;
  1126. if (mlFmt==MarkupFmt_XML)
  1127. {
  1128. if (!isSoap && streq(local, "Envelope"))
  1129. {
  1130. isSoap=true;
  1131. return;
  1132. }
  1133. if (isSoap && streq(local, "Header"))
  1134. {
  1135. headerDepth++;
  1136. return;
  1137. }
  1138. if (isSoap && !headerDepth && streq(local, "Body"))
  1139. return;
  1140. }
  1141. if (!headerDepth)
  1142. {
  1143. name.set(local);
  1144. if (tag!=local)
  1145. prefix.set(tag, local-tag-1);
  1146. }
  1147. }
  1148. virtual void newAttribute(const char *attr, const char *value)
  1149. {
  1150. if (!name.isEmpty() && streq(attr, "@_stripWhitespaceFromStoredDataset"))
  1151. {
  1152. stripWhitespace = strToBool(value);
  1153. more = false;
  1154. }
  1155. }
  1156. virtual void beginNodeContent(const char *tag)
  1157. {
  1158. if (!name.isEmpty())
  1159. more = false;
  1160. }
  1161. virtual void endNode(const char *tag, unsigned length, const void *value, bool binary, offset_t endOffset)
  1162. {
  1163. if (!name.isEmpty())
  1164. more = false;
  1165. else if (headerDepth && streq(tag, "Header"))
  1166. headerDepth--;
  1167. }
  1168. };
  1169. static Owned<IActiveQueryLimiterFactory> queryLimiterFactory;
  1170. class RoxieSocketWorker : public ProtocolQueryWorker
  1171. {
  1172. SocketEndpoint ep;
  1173. Owned<SafeSocket> client;
  1174. Owned<IHpccNativeProtocolMsgSink> sink;
  1175. public:
  1176. IMPLEMENT_IINTERFACE;
  1177. RoxieSocketWorker(ProtocolSocketListener *_pool, SocketEndpoint &_ep)
  1178. : ProtocolQueryWorker(_pool), ep(_ep)
  1179. {
  1180. sink.set(dynamic_cast<IHpccNativeProtocolMsgSink*>(_pool->queryMsgSink()));
  1181. }
  1182. // interface IPooledThread
  1183. virtual void init(void *_r)
  1184. {
  1185. client.setown(new CSafeSocket((ISocket *) _r));
  1186. ProtocolQueryWorker::init(_r);
  1187. }
  1188. virtual void main()
  1189. {
  1190. doMain("");
  1191. }
  1192. virtual void runOnce(const char *query)
  1193. {
  1194. doMain(query);
  1195. }
  1196. private:
  1197. static void sendHttpServerTooBusy(SafeSocket &client, const IContextLogger &logctx)
  1198. {
  1199. StringBuffer message;
  1200. message.append("HTTP/1.0 503 Server Too Busy\r\n\r\n");
  1201. message.append("Server too busy, please try again later");
  1202. StringBuffer err("Too many active queries"); // write out Too many active queries - make searching for this error consistent
  1203. if (!global->trapTooManyActiveQueries)
  1204. {
  1205. err.appendf(" %s", message.str());
  1206. logctx.CTXLOG("%s", err.str());
  1207. }
  1208. else
  1209. {
  1210. IException *E = MakeStringException(ROXIE_TOO_MANY_QUERIES, "%s", err.str());
  1211. logctx.logOperatorException(E, __FILE__, __LINE__, "%s", message.str());
  1212. E->Release();
  1213. }
  1214. try
  1215. {
  1216. client.write(message.str(), message.length());
  1217. }
  1218. catch (IException *E)
  1219. {
  1220. logctx.logOperatorException(E, __FILE__, __LINE__, "Exception caught in sendHttpServerTooBusy");
  1221. E->Release();
  1222. }
  1223. catch (...)
  1224. {
  1225. logctx.logOperatorException(NULL, __FILE__, __LINE__, "sendHttpServerTooBusy write failed (Unknown exception)");
  1226. }
  1227. }
  1228. void skipProtocolRoot(Owned<IPropertyTree> &queryPT, HttpHelper &httpHelper, StringAttr &queryName, bool &isRequest, bool &isRequestArray)
  1229. {
  1230. if (queryPT)
  1231. {
  1232. queryName.set(queryPT->queryName());
  1233. isRequest = false;
  1234. isRequestArray = false;
  1235. if (httpHelper.isHttp())
  1236. {
  1237. if (httpHelper.queryContentFormat()==MarkupFmt_JSON)
  1238. {
  1239. if (strieq(queryName, "__object__"))
  1240. {
  1241. queryPT.setown(queryPT->getPropTree("*[1]"));
  1242. queryName.set(queryPT->queryName());
  1243. isRequest = true;
  1244. if (!queryPT)
  1245. throw MakeStringException(ROXIE_DATA_ERROR, "Malformed JSON request (missing Body)");
  1246. }
  1247. else if (strieq(queryName, "__array__"))
  1248. throw MakeStringException(ROXIE_DATA_ERROR, "JSON request array not implemented");
  1249. else
  1250. throw MakeStringException(ROXIE_DATA_ERROR, "Malformed JSON request");
  1251. }
  1252. else
  1253. {
  1254. if (strieq(queryName, "envelope"))
  1255. queryPT.setown(queryPT->getPropTree("Body/*"));
  1256. else if (!strnicmp(httpHelper.queryContentType(), "application/soap", strlen("application/soap")))
  1257. throw MakeStringException(ROXIE_DATA_ERROR, "Malformed SOAP request");
  1258. else
  1259. httpHelper.setUseEnvelope(false);
  1260. if (!queryPT)
  1261. throw MakeStringException(ROXIE_DATA_ERROR, "Malformed SOAP request (missing Body)");
  1262. String reqName(queryPT->queryName());
  1263. queryPT->removeProp("@xmlns:m");
  1264. // following code is moved from main() - should be no performance hit
  1265. String requestString("Request");
  1266. String requestArrayString("RequestArray");
  1267. if (reqName.endsWith(requestArrayString))
  1268. {
  1269. isRequestArray = true;
  1270. queryName.set(reqName.str(), reqName.length() - requestArrayString.length());
  1271. }
  1272. else if (reqName.endsWith(requestString))
  1273. {
  1274. isRequest = true;
  1275. queryName.set(reqName.str(), reqName.length() - requestString.length());
  1276. }
  1277. else
  1278. queryName.set(reqName.str());
  1279. queryPT->renameProp("/", queryName.get()); // reset the name of the tree
  1280. }
  1281. }
  1282. }
  1283. }
  1284. void sanitizeQuery(Owned<IPropertyTree> &queryPT, StringAttr &queryName, StringBuffer &saniText, HttpHelper &httpHelper, const char *&uid, bool &isRequest, bool &isRequestArray, bool &isBlind, bool &isDebug)
  1285. {
  1286. if (queryPT)
  1287. {
  1288. skipProtocolRoot(queryPT, httpHelper, queryName, isRequest, isRequestArray);
  1289. // convert to XML with attribute values in single quotes - makes replaying queries easier
  1290. uid = queryPT->queryProp("@uid");
  1291. if (!uid)
  1292. uid = queryPT->queryProp("_TransactionId");
  1293. isBlind = queryPT->getPropBool("@blind", false) || queryPT->getPropBool("_blind", false);
  1294. isDebug = queryPT->getPropBool("@debug") || queryPT->getPropBool("_Probe", false);
  1295. toXML(queryPT, saniText, 0, isBlind ? (XML_SingleQuoteAttributeValues | XML_Sanitize) : XML_SingleQuoteAttributeValues);
  1296. }
  1297. else
  1298. throw MakeStringException(ROXIE_DATA_ERROR, "Malformed request");
  1299. }
  1300. void parseQueryPTFromString(Owned<IPropertyTree> &queryPT, HttpHelper &httpHelper, const char *text, PTreeReaderOptions options)
  1301. {
  1302. if (strieq(httpHelper.queryContentType(), "application/json"))
  1303. queryPT.setown(createPTreeFromJSONString(text, ipt_caseInsensitive, options));
  1304. else
  1305. queryPT.setown(createPTreeFromXMLString(text, ipt_caseInsensitive, options));
  1306. }
  1307. void doMain(const char *runQuery)
  1308. {
  1309. StringBuffer rawText(runQuery);
  1310. unsigned memused = 0;
  1311. IpAddress peer;
  1312. bool continuationNeeded = false;
  1313. bool isStatus = false;
  1314. Owned<IHpccProtocolMsgContext> msgctx = sink->createMsgContext(startTime);
  1315. IContextLogger &logctx = *msgctx->queryLogContext();
  1316. readAnother:
  1317. unsigned slavesReplyLen = 0;
  1318. StringArray allTargets;
  1319. sink->getTargetNames(allTargets);
  1320. HttpHelper httpHelper(&allTargets);
  1321. try
  1322. {
  1323. if (client)
  1324. {
  1325. client->querySocket()->getPeerAddress(peer);
  1326. if (!client->readBlock(rawText.clear(), WAIT_FOREVER, &httpHelper, continuationNeeded, isStatus, global->maxBlockSize))
  1327. {
  1328. if (traceLevel > 8)
  1329. {
  1330. StringBuffer b;
  1331. DBGLOG("No data reading query from socket");
  1332. }
  1333. client.clear();
  1334. return;
  1335. }
  1336. }
  1337. if (continuationNeeded)
  1338. {
  1339. qstart = msTick();
  1340. time(&startTime);
  1341. }
  1342. }
  1343. catch (IException * E)
  1344. {
  1345. if (traceLevel > 0)
  1346. {
  1347. StringBuffer b;
  1348. DBGLOG("Error reading query from socket: %s", E->errorMessage(b).str());
  1349. }
  1350. E->Release();
  1351. client.clear();
  1352. return;
  1353. }
  1354. bool isHTTP = httpHelper.isHttp();
  1355. TextMarkupFormat mlFmt = isHTTP ? httpHelper.queryContentFormat() : MarkupFmt_XML;
  1356. bool failed = false;
  1357. bool isRequest = false;
  1358. bool isRequestArray = false;
  1359. bool isBlind = false;
  1360. bool isDebug = false;
  1361. unsigned protocolFlags = isHTTP ? 0 : HPCC_PROTOCOL_NATIVE;
  1362. Owned<IPropertyTree> queryPT;
  1363. StringBuffer sanitizedText;
  1364. StringBuffer peerStr;
  1365. peer.getIpText(peerStr);
  1366. const char *uid = "-";
  1367. StringAttr queryName;
  1368. StringAttr queryPrefix;
  1369. bool stripWhitespace = msgctx->getStripWhitespace();
  1370. if (mlFmt==MarkupFmt_XML || mlFmt==MarkupFmt_JSON)
  1371. {
  1372. QueryNameExtractor extractor(mlFmt, stripWhitespace);
  1373. extractor.extractName(rawText.str(), logctx, peerStr, ep.port);
  1374. queryName.set(extractor.name);
  1375. queryPrefix.set(extractor.prefix);
  1376. stripWhitespace = extractor.stripWhitespace;
  1377. }
  1378. try
  1379. {
  1380. if (streq(queryPrefix.str(), "control"))
  1381. {
  1382. if (httpHelper.isHttp())
  1383. client->setHttpMode(queryName, false, httpHelper);
  1384. bool aclupdate = strieq(queryName, "aclupdate"); //ugly
  1385. byte iptFlags = aclupdate ? ipt_caseInsensitive : 0;
  1386. if (mlFmt==MarkupFmt_JSON)
  1387. queryPT.setown(createPTreeFromJSONString(rawText.str(), iptFlags, (PTreeReaderOptions)(ptr_ignoreWhiteSpace|ptr_ignoreNameSpaces)));
  1388. else
  1389. queryPT.setown(createPTreeFromXMLString(rawText.str(), iptFlags, (PTreeReaderOptions)(ptr_ignoreWhiteSpace|ptr_ignoreNameSpaces)));
  1390. IPropertyTree *root = queryPT;
  1391. skipProtocolRoot(queryPT, httpHelper, queryName, isRequest, isRequestArray);
  1392. if (!strchr(queryName, ':'))
  1393. {
  1394. VStringBuffer fullname("control:%s", queryName.str()); //just easier to keep for debugging and internal checking
  1395. queryPT->renameProp("/", fullname);
  1396. }
  1397. Owned<IHpccProtocolResponse> protocol = createProtocolResponse(queryPT->queryName(), client, httpHelper, logctx, protocolFlags, global->defaultXmlReadFlags);
  1398. sink->onControlMsg(msgctx, queryPT, protocol);
  1399. protocol->finalize(0);
  1400. if (streq(queryName, "lock") || streq(queryName, "childlock"))
  1401. goto readAnother;
  1402. }
  1403. else if (isStatus)
  1404. {
  1405. client->write("OK", 2);
  1406. }
  1407. else
  1408. {
  1409. unsigned readFlags = (unsigned) global->defaultXmlReadFlags | ptr_ignoreNameSpaces;
  1410. readFlags &= ~ptr_ignoreWhiteSpace;
  1411. readFlags |= (stripWhitespace ? ptr_ignoreWhiteSpace : ptr_none);
  1412. try
  1413. {
  1414. parseQueryPTFromString(queryPT, httpHelper, rawText.str(), (PTreeReaderOptions)readFlags);
  1415. }
  1416. catch (IException *E)
  1417. {
  1418. logctx.logOperatorException(E, __FILE__, __LINE__, "Invalid XML received from %s:%d - %s", peerStr.str(), listener->queryPort(), rawText.str());
  1419. logctx.CTXLOG("ERROR: Invalid XML received from %s:%d - %s", peerStr.str(), listener->queryPort(), rawText.str());
  1420. throw;
  1421. }
  1422. uid = NULL;
  1423. sanitizeQuery(queryPT, queryName, sanitizedText, httpHelper, uid, isRequest, isRequestArray, isBlind, isDebug);
  1424. if (!uid)
  1425. uid = "-";
  1426. sink->checkAccess(peer, queryName, sanitizedText, isBlind);
  1427. if (isDebug)
  1428. msgctx->verifyAllowDebug();
  1429. isBlind = msgctx->checkSetBlind(isBlind);
  1430. if (msgctx->logFullQueries())
  1431. {
  1432. StringBuffer soapStr;
  1433. (isRequest) ? soapStr.append("SoapRequest") : (isRequestArray) ? soapStr.append("SoapRequest") : soapStr.clear();
  1434. logctx.CTXLOG("%s %s:%d %s %s %s", isBlind ? "BLIND:" : "QUERY:", peerStr.str(), listener->queryPort(), uid, soapStr.str(), sanitizedText.str());
  1435. }
  1436. if (strieq(queryPrefix.str(), "debug"))
  1437. {
  1438. FlushingStringBuffer response(client, false, MarkupFmt_XML, false, isHTTP, logctx);
  1439. response.startDataset("Debug", NULL, (unsigned) -1);
  1440. CommonXmlWriter out(0, 1);
  1441. sink->onDebugMsg(msgctx, uid, queryPT, out);
  1442. response.append(out.str());
  1443. }
  1444. Owned<IActiveQueryLimiter> l;
  1445. if (queryLimiterFactory)
  1446. l.setown(queryLimiterFactory->create(listener));
  1447. if (l && !l->isAccepted())
  1448. {
  1449. if (isHTTP)
  1450. {
  1451. sendHttpServerTooBusy(*client, logctx);
  1452. logctx.CTXLOG("FAILED: %s", sanitizedText.str());
  1453. logctx.CTXLOG("EXCEPTION: Too many active queries");
  1454. }
  1455. else
  1456. {
  1457. IException *e = MakeStringException(ROXIE_TOO_MANY_QUERIES, "Too many active queries");
  1458. if (msgctx->trapTooManyActiveQueries())
  1459. logctx.logOperatorException(e, __FILE__, __LINE__, NULL);
  1460. throw e;
  1461. }
  1462. }
  1463. else
  1464. {
  1465. StringBuffer querySetName;
  1466. if (isHTTP)
  1467. {
  1468. client->setHttpMode(queryName, isRequestArray, httpHelper);
  1469. querySetName.set(httpHelper.queryTarget());
  1470. if (querySetName.length())
  1471. {
  1472. const char *target = global->targetAliases->queryProp(querySetName.str());
  1473. if (target)
  1474. querySetName.set(target);
  1475. }
  1476. }
  1477. if (msgctx->initQuery(querySetName, queryName))
  1478. {
  1479. int bindCores = queryPT->getPropInt("@bindCores", msgctx->getBindCores());
  1480. if (bindCores > 0)
  1481. listener->setThreadAffinity(bindCores);
  1482. IArrayOf<IPropertyTree> requestArray;
  1483. if (isHTTP)
  1484. {
  1485. mlFmt = httpHelper.queryContentFormat();
  1486. if (isRequestArray)
  1487. {
  1488. StringBuffer reqIterString;
  1489. reqIterString.append(queryName).append("Request");
  1490. Owned<IPropertyTreeIterator> reqIter = queryPT->getElements(reqIterString.str());
  1491. ForEach(*reqIter)
  1492. {
  1493. IPropertyTree *fixedreq = createPTree(queryName, ipt_caseInsensitive);
  1494. Owned<IPropertyTreeIterator> iter = reqIter->query().getElements("*");
  1495. ForEach(*iter)
  1496. {
  1497. fixedreq->addPropTree(iter->query().queryName(), LINK(&iter->query()));
  1498. }
  1499. requestArray.append(*fixedreq);
  1500. }
  1501. }
  1502. else
  1503. {
  1504. IPropertyTree *fixedreq = createPTree(queryName, ipt_caseInsensitive);
  1505. Owned<IPropertyTreeIterator> iter = queryPT->getElements("*");
  1506. ForEach(*iter)
  1507. {
  1508. fixedreq->addPropTree(iter->query().queryName(), LINK(&iter->query()));
  1509. }
  1510. requestArray.append(*fixedreq);
  1511. }
  1512. if (httpHelper.getTrim())
  1513. protocolFlags |= HPCC_PROTOCOL_TRIM;
  1514. }
  1515. else
  1516. {
  1517. const char *format = queryPT->queryProp("@format");
  1518. if (format)
  1519. {
  1520. if (stricmp(format, "raw") == 0)
  1521. {
  1522. protocolFlags |= HPCC_PROTOCOL_NATIVE_RAW;
  1523. mlFmt = MarkupFmt_Unknown;
  1524. }
  1525. else if (stricmp(format, "bxml") == 0)
  1526. {
  1527. protocolFlags |= HPCC_PROTOCOL_BLOCKED;
  1528. }
  1529. else if (stricmp(format, "ascii") == 0)
  1530. {
  1531. protocolFlags |= HPCC_PROTOCOL_NATIVE_ASCII;
  1532. mlFmt = MarkupFmt_Unknown;
  1533. }
  1534. else if (stricmp(format, "xml") != 0) // xml is the default
  1535. throw MakeStringException(ROXIE_INVALID_INPUT, "Unsupported format specified: %s", format);
  1536. }
  1537. if (queryPT->getPropBool("@trim", false))
  1538. protocolFlags |= HPCC_PROTOCOL_TRIM;
  1539. msgctx->setIntercept(queryPT->getPropBool("@log", false));
  1540. msgctx->setTraceLevel(queryPT->getPropInt("@traceLevel", logctx.queryTraceLevel()));
  1541. }
  1542. msgctx->noteQueryActive();
  1543. if (isHTTP)
  1544. {
  1545. CHttpRequestAsyncFor af(queryName, sink, msgctx, requestArray, *client, httpHelper, protocolFlags, memused, slavesReplyLen, sanitizedText, logctx, (PTreeReaderOptions)readFlags, querySetName);
  1546. af.For(requestArray.length(), global->numRequestArrayThreads);
  1547. }
  1548. else
  1549. {
  1550. Owned<IHpccProtocolResponse> protocol = createProtocolResponse(queryPT->queryName(), client, httpHelper, logctx, protocolFlags, (PTreeReaderOptions)readFlags);
  1551. sink->onQueryMsg(msgctx, queryPT, protocol, protocolFlags, (PTreeReaderOptions)readFlags, querySetName, 0, memused, slavesReplyLen);
  1552. }
  1553. }
  1554. }
  1555. }
  1556. }
  1557. catch (IException * E)
  1558. {
  1559. failed = true;
  1560. logctx.CTXLOG("FAILED: %s", sanitizedText.str());
  1561. StringBuffer error;
  1562. E->errorMessage(error);
  1563. logctx.CTXLOG("EXCEPTION: %s", error.str());
  1564. unsigned code = E->errorCode();
  1565. if (QUERYINTERFACE(E, ISEH_Exception))
  1566. code = ROXIE_INTERNAL_ERROR;
  1567. else if (QUERYINTERFACE(E, IOutOfMemException))
  1568. code = ROXIE_MEMORY_ERROR;
  1569. if (client)
  1570. {
  1571. if (isHTTP)
  1572. client->checkSendHttpException(httpHelper, E, queryName);
  1573. else
  1574. client->sendException("Roxie", code, error.str(), (protocolFlags & HPCC_PROTOCOL_NATIVE_RAW), logctx);
  1575. }
  1576. else
  1577. {
  1578. fprintf(stderr, "EXCEPTION: %s\n", error.str());
  1579. }
  1580. E->Release();
  1581. }
  1582. #ifndef _DEBUG
  1583. catch(...)
  1584. {
  1585. failed = true;
  1586. logctx.CTXLOG("FAILED: %s", sanitizedText.str());
  1587. logctx.CTXLOG("EXCEPTION: Unknown exception");
  1588. {
  1589. if (isHTTP)
  1590. {
  1591. Owned<IException> E = MakeStringException(ROXIE_INTERNAL_ERROR, "Unknown exception");
  1592. client->checkSendHttpException(httpHelper, E, queryName);
  1593. }
  1594. else
  1595. client->sendException("Roxie", ROXIE_INTERNAL_ERROR, "Unknown exception", (protocolFlags & HPCC_PROTOCOL_BLOCKED), logctx);
  1596. }
  1597. }
  1598. #endif
  1599. if (isHTTP)
  1600. {
  1601. try
  1602. {
  1603. client->flush();
  1604. }
  1605. catch (IException * E)
  1606. {
  1607. StringBuffer error("RoxieSocketWorker failed to write to socket ");
  1608. E->errorMessage(error);
  1609. logctx.CTXLOG("%s", error.str());
  1610. E->Release();
  1611. }
  1612. catch(...)
  1613. {
  1614. logctx.CTXLOG("RoxieSocketWorker failed to write to socket (Unknown exception)");
  1615. }
  1616. }
  1617. unsigned bytesOut = client? client->bytesOut() : 0;
  1618. unsigned elapsed = msTick() - qstart;
  1619. if (continuationNeeded)
  1620. {
  1621. rawText.clear();
  1622. goto readAnother;
  1623. }
  1624. else
  1625. {
  1626. try
  1627. {
  1628. if (client && !isHTTP && !isStatus)
  1629. {
  1630. if (msgctx->getIntercept())
  1631. {
  1632. FlushingStringBuffer response(client, (protocolFlags & HPCC_PROTOCOL_BLOCKED), mlFmt, (protocolFlags & HPCC_PROTOCOL_NATIVE_RAW), false, logctx);
  1633. response.startDataset("Tracing", NULL, (unsigned) -1);
  1634. msgctx->outputLogXML(response);
  1635. }
  1636. unsigned replyLen = 0;
  1637. client->write(&replyLen, sizeof(replyLen));
  1638. }
  1639. client.clear();
  1640. }
  1641. catch (IException * E)
  1642. {
  1643. StringBuffer error("RoxieSocketWorker failed to close socket ");
  1644. E->errorMessage(error);
  1645. logctx.CTXLOG("%s", error.str()); // MORE - audience?
  1646. E->Release();
  1647. }
  1648. catch(...)
  1649. {
  1650. logctx.CTXLOG("RoxieSocketWorker failed to close socket (Unknown exception)"); // MORE - audience?
  1651. }
  1652. }
  1653. }
  1654. };
  1655. IPooledThread *ProtocolSocketListener::createNew()
  1656. {
  1657. return new RoxieSocketWorker(this, ep);
  1658. }
  1659. void ProtocolSocketListener::runOnce(const char *query)
  1660. {
  1661. Owned<RoxieSocketWorker> p = new RoxieSocketWorker(this, ep);
  1662. p->runOnce(query);
  1663. }
  1664. IHpccProtocolListener *createProtocolListener(const char *protocol, IHpccProtocolMsgSink *sink, unsigned port, unsigned listenQueue)
  1665. {
  1666. if (traceLevel)
  1667. DBGLOG("Creating Roxie socket listener, protocol %s, pool size %d, listen queue %d%s", protocol, sink->getPoolSize(), listenQueue, sink->getIsSuspended() ? " SUSPENDED":"");
  1668. return new ProtocolSocketListener(sink, port, listenQueue);
  1669. }
  1670. extern IHpccProtocolPlugin *loadHpccProtocolPlugin(IHpccProtocolPluginContext *ctx, IActiveQueryLimiterFactory *_limiterFactory)
  1671. {
  1672. if (!queryLimiterFactory)
  1673. queryLimiterFactory.set(_limiterFactory);
  1674. if (global)
  1675. return global.getLink();
  1676. if (!ctx)
  1677. return NULL;
  1678. global.setown(new CHpccProtocolPlugin(*ctx));
  1679. return global.getLink();
  1680. }
  1681. //================================================================================================================================