roxiehelper.hpp 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef ROXIEHELPER_HPP
  14. #define ROXIEHELPER_HPP
  15. #include "rtlformat.hpp"
  16. #include "thorherror.h"
  17. #include "thorxmlwrite.hpp"
  18. #include "roxiehelper.ipp"
  19. #include "roxiemem.hpp"
  20. #include "mpbase.hpp"
  21. #include "workunit.hpp"
  22. #include "thorhelper.hpp"
  23. //=========================================================================================
  24. void parseHttpParameterString(IProperties *p, const char *str);
  25. enum class HttpMethod {NONE, GET, POST};
  26. enum class HttpCompression {NONE, GZIP, DEFLATE, ZLIB_DEFLATE};
  27. class THORHELPER_API HttpHelper : public CInterface
  28. {
  29. private:
  30. HttpMethod method;
  31. bool useEnvelope = false;
  32. StringAttr version;
  33. StringAttr url;
  34. StringAttr authToken;
  35. StringAttr contentType;
  36. StringAttr queryName;
  37. StringArray pathNodes;
  38. StringArray *validTargets;
  39. Owned<IProperties> parameters;
  40. Owned<IProperties> form;
  41. Owned<IProperties> reqHeaders;
  42. private:
  43. inline void setHttpHeaderValue(StringAttr &s, const char *v, bool ignoreExt)
  44. {
  45. if (!v || !*v)
  46. return;
  47. unsigned len=0;
  48. while (v[len] && v[len]!='\r' && (!ignoreExt || v[len]!=';'))
  49. len++;
  50. if (len)
  51. s.set(v, len);
  52. }
  53. void parseURL();
  54. public:
  55. HttpHelper(StringArray *_validTargets) : method(HttpMethod::NONE), validTargets(_validTargets) {parameters.setown(createProperties(true));}
  56. inline bool isHttp() { return method!=HttpMethod::NONE; }
  57. inline bool isHttpGet(){ return method==HttpMethod::GET; }
  58. inline bool allowKeepAlive()
  59. {
  60. const char *connection = queryRequestHeader("Connection");
  61. if (!connection)
  62. return !streq(version, "1.0");
  63. return strieq(connection, "Keep-Alive");
  64. }
  65. inline bool isControlUrl()
  66. {
  67. const char *control = queryTarget();
  68. return (control && strieq(control, "control"));
  69. }
  70. inline HttpCompression getReqCompression()
  71. {
  72. const char *encoding = queryRequestHeader("Content-Encoding");
  73. if (encoding)
  74. {
  75. if (strieq(encoding, "gzip") || strieq(encoding, "x-gzip"))
  76. return HttpCompression::GZIP;
  77. if (strieq(encoding, "deflate") || strieq(encoding, "x-deflate"))
  78. return HttpCompression::DEFLATE;
  79. }
  80. return HttpCompression::NONE;
  81. }
  82. inline HttpCompression getRespCompression()
  83. {
  84. const char *encoding = queryRequestHeader("Accept-Encoding");
  85. if (encoding)
  86. {
  87. StringArray encodingList;
  88. encodingList.appendList(encoding, ",");
  89. if (encodingList.contains("gzip"))
  90. return HttpCompression::GZIP;
  91. if (encodingList.contains("deflate"))
  92. return HttpCompression::DEFLATE;
  93. //The reason gzip is preferred is that deflate can mean either of two formats
  94. //"x-deflate" isn't any clearer, but since either works either way, we can use the alternate name
  95. //to our advantage. Differentiating here just gives us a way of allowing clients to specify
  96. //in case they can't handle one or the other (e.g. SOAPUI can't handle ZLIB_DEFLATE which I think
  97. //is the "most proper" choice)
  98. if (encodingList.contains("x-deflate"))
  99. return HttpCompression::ZLIB_DEFLATE;
  100. }
  101. return HttpCompression::NONE;
  102. }
  103. bool getUseEnvelope(){return useEnvelope;}
  104. void setUseEnvelope(bool _useEnvelope){useEnvelope=_useEnvelope;}
  105. bool getTrim() {return parameters->getPropBool(".trim", true); /*http currently defaults to true, maintain compatibility */}
  106. void setHttpMethod(HttpMethod _method) { method = _method; }
  107. const char *queryAuthToken() { return authToken.str(); }
  108. const char *queryTarget() { return (pathNodes.length()) ? pathNodes.item(0) : NULL; }
  109. const char *queryQueryName()
  110. {
  111. if (!queryName.isEmpty())
  112. return queryName.str();
  113. if (!pathNodes.isItem(1))
  114. return nullptr;
  115. const char *name = pathNodes.item(1);
  116. const char *at = strchr(name, ';');
  117. if (!at)
  118. queryName.set(name);
  119. else
  120. queryName.set(name, at-name-1);
  121. return queryName.str();
  122. }
  123. inline const char *queryRequestHeader(const char *header)
  124. {
  125. if (!reqHeaders)
  126. return nullptr;
  127. return reqHeaders->queryProp(header);
  128. }
  129. inline void setAuthToken(const char *v)
  130. {
  131. setHttpHeaderValue(authToken, v, false);
  132. };
  133. const char *queryContentType() { return contentType.str(); };
  134. inline void setContentType(const char *v)
  135. {
  136. setHttpHeaderValue(contentType, v, true);
  137. };
  138. inline void parseHTTPRequestLine(const char *v)
  139. {
  140. const char *end = strstr(v, " HTTP");
  141. if (end)
  142. {
  143. url.set(v, end - v);
  144. parseURL();
  145. v=end+5;
  146. if (*v=='/')
  147. {
  148. end=strstr(++v, "\r\n");
  149. if (end)
  150. version.set(v, end-v);
  151. }
  152. }
  153. }
  154. void parseRequestHeaders(const char *headers);
  155. inline bool isFormPost()
  156. {
  157. return (strnicmp(queryContentType(), "application/x-www-form-urlencoded", strlen("application/x-www-form-urlencoded"))==0);
  158. }
  159. TextMarkupFormat getUrlResponseFormat()
  160. {
  161. if (pathNodes.length()>2 && strieq(pathNodes.item(2), "json"))
  162. return MarkupFmt_JSON;
  163. return MarkupFmt_XML;
  164. }
  165. TextMarkupFormat getContentTypeMlFormat()
  166. {
  167. if (!contentType.length())
  168. {
  169. TextMarkupFormat fmt = getUrlResponseFormat();
  170. if (fmt == MarkupFmt_JSON)
  171. contentType.set("application/json");
  172. else
  173. contentType.set("text/xml");
  174. return fmt;
  175. }
  176. return (strieq(queryContentType(), "application/json")) ? MarkupFmt_JSON : MarkupFmt_XML;
  177. }
  178. TextMarkupFormat queryResponseMlFormat()
  179. {
  180. if (isFormPost())
  181. return getUrlResponseFormat();
  182. return getContentTypeMlFormat();
  183. }
  184. TextMarkupFormat queryRequestMlFormat()
  185. {
  186. if (isHttpGet() || isFormPost())
  187. return MarkupFmt_URL;
  188. return getContentTypeMlFormat();
  189. }
  190. IProperties *queryUrlParameters(){return parameters;}
  191. bool validateTarget(const char *target)
  192. {
  193. if (!target)
  194. return false;
  195. if (validTargets && validTargets->contains(target))
  196. return true;
  197. if (strieq(target, "control") && (isHttpGet() || isFormPost()))
  198. return true;
  199. return false;
  200. }
  201. inline void checkTarget()
  202. {
  203. const char *target = queryTarget();
  204. if (!target || !*target)
  205. throw MakeStringException(THORHELPER_DATA_ERROR, "HTTP-GET Target not specified");
  206. else if (!validateTarget(target))
  207. throw MakeStringException(THORHELPER_DATA_ERROR, "HTTP-GET Target not found");
  208. }
  209. inline void checkSetFormPostContent(const char *content)
  210. {
  211. while ((*content==' ') || (*content=='\t'))
  212. content++;
  213. if (*content=='<')
  214. {
  215. contentType.set("text/xml"); //backward compatible. Some clients have a bug where XML is sent as "form-urlenncoded"
  216. return;
  217. }
  218. checkTarget();
  219. if (!form)
  220. form.setown(createProperties(false));
  221. parseHttpParameterString(form, content);
  222. }
  223. IPropertyTree *createPTreeFromParameters(byte flags)
  224. {
  225. if (!pathNodes.isItem(1))
  226. throw MakeStringException(THORHELPER_DATA_ERROR, "HTTP-GET Query not specified");
  227. StringBuffer query;
  228. appendDecodedURL(query, pathNodes.item(1));
  229. aindex_t count = pathNodes.ordinality();
  230. if (count>2)
  231. for (aindex_t x = 2; x<count; ++x)
  232. appendDecodedURL(query.append('/'), pathNodes.item(x));
  233. return createPTreeFromHttpParameters(query, form ? form : parameters, true, false, (ipt_flags) flags);
  234. }
  235. bool isMappedToInputParameter()
  236. {
  237. if (isHttp())
  238. {
  239. aindex_t count = pathNodes.ordinality();
  240. if (count>2)
  241. for (aindex_t x = 2; x<count; ++x)
  242. if (strncmp(pathNodes.item(x), "input(", 6)==0)
  243. return true;
  244. }
  245. return false;
  246. }
  247. IPropertyTree *checkAddWrapperForAdaptiveInput(IPropertyTree *content, byte flags)
  248. {
  249. if (!isMappedToInputParameter())
  250. return content;
  251. if (!pathNodes.isItem(1))
  252. throw MakeStringException(THORHELPER_DATA_ERROR, "HTTP-GET Query not specified");
  253. StringBuffer query;
  254. appendDecodedURL(query, pathNodes.item(1));
  255. aindex_t count = pathNodes.ordinality();
  256. if (count>2)
  257. for (aindex_t x = 2; x<count; ++x)
  258. appendDecodedURL(query.append('/'), pathNodes.item(x));
  259. return createPTreeFromHttpPath(query, content, false, (ipt_flags) flags);
  260. }
  261. void getResultFilterAndTag(StringAttr &filter, StringAttr &tag)
  262. {
  263. if (!isHttp())
  264. return;
  265. aindex_t count = pathNodes.ordinality();
  266. if (count<=2)
  267. return;
  268. StringBuffer temp;
  269. for (aindex_t x = 2; x<count; ++x)
  270. {
  271. if (strncmp(pathNodes.item(x), "result(", 6)==0)
  272. checkParseUrlPathNodeValue(pathNodes.item(x), temp, filter);
  273. else if (strncmp(pathNodes.item(x), "tag(", 4)==0)
  274. checkParseUrlPathNodeValue(pathNodes.item(x), temp, tag);
  275. }
  276. }
  277. };
  278. //==============================================================================================================
  279. //MORE: This should just contain the algorithm, and use a separate field for stable|spilling|parallel
  280. //Should be implemented in a subsequent pull request, which also uses ALGORITHM('x') instead of requiring STABLE/UNSTABLE
  281. typedef enum {
  282. heapSortAlgorithm, // heap sort
  283. insertionSortAlgorithm, // insertion sort - purely for comparison (no longer supported)
  284. quickSortAlgorithm, // jlib implementation of quicksort
  285. stableQuickSortAlgorithm, // jlib version of quick sort that uses an extra array indirect to ensure it is stable
  286. spillingQuickSortAlgorithm, // quickSortAlgorithm with the ability to spill
  287. stableSpillingQuickSortAlgorithm, // stableQuickSortAlgorithm with the ability to spill
  288. mergeSortAlgorithm, // stable merge sort
  289. spillingMergeSortAlgorithm, // stable merge sort that can spill to disk
  290. parallelMergeSortAlgorithm, // parallel version of stable merge sort
  291. spillingParallelMergeSortAlgorithm, // parallel version of stable merge sort that can spill to disk
  292. tbbQuickSortAlgorithm, // (parallel) quick sort implemented by the TBB libraries
  293. tbbStableQuickSortAlgorithm, // stable version of tbbQuickSortAlgorithm
  294. parallelQuickSortAlgorithm, // parallel version of the internal quicksort implementation (for comparison)
  295. parallelStableQuickSortAlgorithm, // stable version of parallelQuickSortAlgorithm
  296. unknownSortAlgorithm
  297. } RoxieSortAlgorithm;
  298. interface ISortAlgorithm : extends IInterface
  299. {
  300. virtual void prepare(IEngineRowStream *input) = 0;
  301. virtual const void *next() = 0;
  302. virtual void reset() = 0;
  303. virtual void getSortedGroup(ConstPointerArray & result) = 0;
  304. virtual cycle_t getElapsedCycles(bool reset) = 0;
  305. };
  306. extern THORHELPER_API ISortAlgorithm *createQuickSortAlgorithm(ICompare *_compare);
  307. extern THORHELPER_API ISortAlgorithm *createStableQuickSortAlgorithm(ICompare *_compare);
  308. extern THORHELPER_API ISortAlgorithm *createParallelQuickSortAlgorithm(ICompare *_compare);
  309. extern THORHELPER_API ISortAlgorithm *createParallelStableQuickSortAlgorithm(ICompare *_compare);
  310. extern THORHELPER_API ISortAlgorithm *createHeapSortAlgorithm(ICompare *_compare);
  311. extern THORHELPER_API ISortAlgorithm *createSpillingQuickSortAlgorithm(ICompare *_compare, roxiemem::IRowManager &_rowManager, IOutputMetaData * _rowMeta, ICodeContext *_ctx, const char *_tempDirectory, unsigned _activityId, bool _stable);
  312. extern THORHELPER_API ISortAlgorithm *createMergeSortAlgorithm(ICompare *_compare);
  313. extern THORHELPER_API ISortAlgorithm *createParallelMergeSortAlgorithm(ICompare *_compare);
  314. extern THORHELPER_API ISortAlgorithm *createTbbQuickSortAlgorithm(ICompare *_compare);
  315. extern THORHELPER_API ISortAlgorithm *createTbbStableQuickSortAlgorithm(ICompare *_compare);
  316. extern THORHELPER_API ISortAlgorithm *createSortAlgorithm(RoxieSortAlgorithm algorithm, ICompare *_compare, roxiemem::IRowManager &_rowManager, IOutputMetaData * _rowMeta, ICodeContext *_ctx, const char *_tempDirectory, unsigned _activityId);
  317. //=========================================================================================
  318. interface IGroupedInput : extends IEngineRowStream // MORE rename to IGroupedRowStream
  319. {
  320. };
  321. extern THORHELPER_API IGroupedInput *createGroupedInputReader(IEngineRowStream *_input, const ICompare *_groupCompare);
  322. extern THORHELPER_API IGroupedInput *createDegroupedInputReader(IEngineRowStream *_input);
  323. extern THORHELPER_API IGroupedInput *createSortedInputReader(IEngineRowStream *_input, ISortAlgorithm *_sorter);
  324. extern THORHELPER_API IGroupedInput *createSortedGroupedInputReader(IEngineRowStream *_input, const ICompare *_groupCompare, ISortAlgorithm *_sorter);
  325. //=========================================================================================
  326. interface SafeSocket : extends IInterface
  327. {
  328. virtual ISocket *querySocket() = 0;
  329. virtual size32_t write(const void *buf, size32_t size, bool takeOwnership=false) = 0;
  330. virtual bool readBlock(MemoryBuffer &ret, unsigned maxBlockSize, unsigned timeout = (unsigned) WAIT_FOREVER) = 0;
  331. virtual bool readBlock(StringBuffer &ret, unsigned timeout, HttpHelper *pHttpHelper, bool &, bool &, unsigned maxBlockSize) = 0;
  332. virtual void checkSendHttpException(HttpHelper &httphelper, IException *E, const char *queryName) = 0;
  333. virtual void sendSoapException(IException *E, const char *queryName) = 0;
  334. virtual void sendJsonException(IException *E, const char *queryName) = 0;
  335. virtual void setHttpMode(const char *queryName, bool arrayMode, HttpHelper &httphelper) = 0;
  336. virtual void setHttpMode(bool mode) = 0;
  337. virtual void setHttpKeepAlive(bool val) = 0;
  338. virtual void setHeartBeat() = 0;
  339. virtual bool sendHeartBeat(const IContextLogger &logctx) = 0;
  340. virtual void flush() = 0;
  341. virtual unsigned bytesOut() const = 0;
  342. virtual bool checkConnection() const = 0;
  343. virtual void sendException(const char *source, unsigned code, const char *message, bool isBlocked, const IContextLogger &logctx) = 0;
  344. // To be removed and replaced with better mechanism when SafeSocket merged with the new output sequencer...
  345. // until then you may need to lock using this if you are making multiple calls and they need to stay together in the output
  346. virtual CriticalSection &queryCrit() = 0;
  347. virtual void setAdaptiveRoot(bool adaptive)=0;
  348. virtual bool getAdaptiveRoot()=0;
  349. };
  350. class THORHELPER_API CSafeSocket : implements SafeSocket, public CInterface
  351. {
  352. protected:
  353. Linked<ISocket> sock;
  354. bool httpMode;
  355. bool httpKeepAlive = false;
  356. bool heartbeat;
  357. bool adaptiveRoot = false;
  358. TextMarkupFormat mlResponseFmt = MarkupFmt_Unknown;
  359. HttpCompression respCompression = HttpCompression::NONE;
  360. StringAttr contentHead;
  361. StringAttr contentTail;
  362. PointerArray queued;
  363. UnsignedArray lengths;
  364. unsigned sent;
  365. CriticalSection crit;
  366. public:
  367. IMPLEMENT_IINTERFACE;
  368. CSafeSocket(ISocket *_sock);
  369. ~CSafeSocket();
  370. ISocket *querySocket() { return sock; }
  371. virtual CriticalSection &queryCrit() { return crit; };
  372. size32_t write(const void *buf, size32_t size, bool takeOwnership=false);
  373. bool readBlock(MemoryBuffer &ret, unsigned maxBlockSize, unsigned timeout = (unsigned) WAIT_FOREVER);
  374. bool readBlock(StringBuffer &ret, unsigned timeout, HttpHelper *pHttpHelper, bool &, bool &, unsigned maxBlockSize);
  375. void setHttpMode(const char *queryName, bool arrayMode, HttpHelper &httphelper);
  376. void setHttpMode(bool mode) override {httpMode = mode;}
  377. virtual void setHttpKeepAlive(bool val) { httpKeepAlive = val; }
  378. void setAdaptiveRoot(bool adaptive){adaptiveRoot=adaptive;}
  379. bool getAdaptiveRoot(){return adaptiveRoot;}
  380. void checkSendHttpException(HttpHelper &httphelper, IException *E, const char *queryName);
  381. void sendSoapException(IException *E, const char *queryName);
  382. void sendJsonException(IException *E, const char *queryName);
  383. void setHeartBeat();
  384. bool sendHeartBeat(const IContextLogger &logctx);
  385. void flush();
  386. unsigned bytesOut() const;
  387. bool checkConnection() const;
  388. void sendException(const char *source, unsigned code, const char *message, bool isBlocked, const IContextLogger &logctx);
  389. };
  390. //==============================================================================================================
  391. class THORHELPER_API FlushingStringBuffer : extends CInterface, implements IXmlStreamFlusher, implements IInterface
  392. {
  393. // MORE this code is yukky. Overdue for cleanup!
  394. protected:
  395. SafeSocket *sock;
  396. StringBuffer name;
  397. StringBuffer tail;
  398. unsigned sequenceNumber;
  399. unsigned rowCount;
  400. unsigned emptyLength;
  401. const IContextLogger &logctx;
  402. CriticalSection crit;
  403. PointerArray queued;
  404. UnsignedArray lengths;
  405. bool first = true;
  406. bool needsFlush(bool closing);
  407. public:
  408. TextMarkupFormat mlFmt; // controls whether xml/json elements are output
  409. bool isRaw; // controls whether output as binary or ascii
  410. bool isBlocked;
  411. bool isHttp;
  412. bool isSoap;
  413. bool isEmpty;
  414. bool extend;
  415. bool tagClosed;
  416. StringAttr queryName;
  417. StringBuffer s;
  418. IMPLEMENT_IINTERFACE;
  419. FlushingStringBuffer(SafeSocket *_sock, bool _isBlocked, TextMarkupFormat _mlFmt, bool _isRaw, bool _isHttp, const IContextLogger &_logctx);
  420. ~FlushingStringBuffer();
  421. virtual void append(char data) {append(1, &data);}
  422. virtual void append(const char *data);
  423. virtual void append(unsigned len, const char *data);
  424. virtual void append(double data);
  425. virtual void appendf(const char *format, ...) __attribute__((format(printf, 2, 3)));
  426. virtual void encodeString(const char *x, unsigned len, bool utf8=false);
  427. virtual void encodeData(const void *data, unsigned len);
  428. virtual void flushXML(StringBuffer &current, bool isClosing)
  429. {
  430. flushXML(current, isClosing, nullptr);
  431. }
  432. void flushXML(StringBuffer &current, bool isClosing, const char *delim);
  433. virtual void flush(bool closing) ;
  434. virtual void addPayload(StringBuffer &s, unsigned int reserve=0);
  435. virtual void *getPayload(size32_t &length);
  436. virtual void startBlock();
  437. virtual void startDataset(const char *elementName, const char *resultName, unsigned sequence, bool _extend = false, const IProperties *xmlns=NULL, bool adaptive=false);
  438. virtual void startScalar(const char *resultName, unsigned sequence, bool simpleTag=false, const char *simplename=nullptr);
  439. virtual void setScalarInt(const char *resultName, unsigned sequence, __int64 value, unsigned size);
  440. virtual void setScalarUInt(const char *resultName, unsigned sequence, unsigned __int64 value, unsigned size);
  441. virtual void incrementRowCount();
  442. void setTail(const char *value){tail.set(value);}
  443. const char *queryResultName(){return name;}
  444. };
  445. class THORHELPER_API FlushingJsonBuffer : public FlushingStringBuffer
  446. {
  447. protected:
  448. bool extend;
  449. public:
  450. FlushingJsonBuffer(SafeSocket *_sock, bool _isBlocked, bool _isHttp, const IContextLogger &_logctx, bool _extend = false) :
  451. FlushingStringBuffer(_sock, _isBlocked, MarkupFmt_JSON, false, _isHttp, _logctx), extend(_extend)
  452. {
  453. }
  454. void append(double data);
  455. void encodeString(const char *x, unsigned len, bool utf8=false);
  456. void encodeData(const void *data, unsigned len);
  457. void startDataset(const char *elementName, const char *resultName, unsigned sequence, bool _extend = false, const IProperties *xmlns=NULL, bool adaptive=false);
  458. void startScalar(const char *resultName, unsigned sequence, bool simpleTag, const char *simplename=nullptr);
  459. virtual void setScalarInt(const char *resultName, unsigned sequence, __int64 value, unsigned size, bool simpleTag = false, const char *simplename=nullptr);
  460. virtual void setScalarUInt(const char *resultName, unsigned sequence, unsigned __int64 value, unsigned size, bool simpleTag = false, const char *simplename=nullptr);
  461. virtual void flushXML(StringBuffer &current, bool isClosing)
  462. {
  463. FlushingStringBuffer::flushXML(current, isClosing, (extend) ? "," : nullptr);
  464. }
  465. };
  466. inline const char *getFormatName(TextMarkupFormat fmt)
  467. {
  468. if (fmt==MarkupFmt_XML)
  469. return "xml";
  470. if (fmt==MarkupFmt_JSON)
  471. return "json";
  472. return "raw";
  473. }
  474. //==============================================================================================================
  475. class THORHELPER_API OwnedRowArray
  476. {
  477. public:
  478. OwnedRowArray() {}
  479. ~OwnedRowArray() { clear(); }
  480. void clear();
  481. void clearPart(aindex_t from, aindex_t to);
  482. void replace(const void * row, aindex_t pos);
  483. void append(const void * row) { buff.append(row); }
  484. aindex_t ordinality() const { return buff.ordinality(); }
  485. const void * * getArray() { return buff.getArray(); }
  486. bool isItem(aindex_t pos) const { return buff.isItem(pos); }
  487. const void * item(aindex_t pos) { return buff.item(pos); }
  488. const void * itemClear(aindex_t pos) { const void * ret = buff.item(pos); buff.replace(NULL, pos); return ret; }
  489. protected:
  490. ConstPointerArray buff;
  491. };
  492. //==============================================================================================================
  493. interface IFileDescriptor;
  494. interface IAgentContext;
  495. class THORHELPER_API ClusterWriteHandler : public CInterface
  496. {
  497. public:
  498. ClusterWriteHandler(char const * _logicalName, char const * _activityType);
  499. void addCluster(char const * cluster);
  500. void getLocalPhysicalFilename(StringAttr & out) const;
  501. void splitPhysicalFilename(StringBuffer & dir, StringBuffer & base) const;
  502. void copyPhysical(IFile * source, bool noCopy) const;
  503. void setDescriptorParts(IFileDescriptor * desc, char const * basename, IPropertyTree * attrs) const;
  504. void finish(IFile * file) const;
  505. void getClusters(StringArray &clusters) const;
  506. private:
  507. virtual void getTempFilename(StringAttr & out) const ;
  508. private:
  509. StringAttr logicalName;
  510. StringBuffer physicalName;
  511. StringBuffer physicalDir;
  512. StringBuffer physicalBase;
  513. StringAttr activityType;
  514. StringAttr localClusterName;
  515. Owned<IGroup> localCluster;
  516. IArrayOf<IGroup> remoteNodes;
  517. StringArray remoteClusters;
  518. };
  519. //==============================================================================================================
  520. THORHELPER_API StringBuffer & mangleHelperFileName(StringBuffer & out, const char * in, const char * wuid, unsigned int flags);
  521. THORHELPER_API StringBuffer & mangleLocalTempFilename(StringBuffer & out, char const * in);
  522. THORHELPER_API StringBuffer & expandLogicalFilename(StringBuffer & logicalName, const char * fname, IConstWorkUnit * wu, bool resolveLocally, bool ignoreForeignPrefix);
  523. THORHELPER_API ISectionTimer * queryNullSectionTimer();
  524. #endif // ROXIEHELPER_HPP