roxiehelper.hpp 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef ROXIEHELPER_HPP
  14. #define ROXIEHELPER_HPP
  15. #include "thorherror.h"
  16. #include "thorxmlwrite.hpp"
  17. #include "roxiehelper.ipp"
  18. #include "roxiemem.hpp"
  19. #include "mpbase.hpp"
  20. #include "workunit.hpp"
  21. #include "thorhelper.hpp"
  22. //=========================================================================================
  // Parses the parameter string str into p. HttpHelper::setFormContent() uses it for
  // "application/x-www-form-urlencoded" bodies (presumably "a=1&b=2" encoding - TODO confirm).
  void parseHttpParameterString(IProperties *p, const char *str);
  // HTTP method of the current request, recorded via HttpHelper::setHttpMethod().
  // NONE means the request did not arrive over HTTP (see HttpHelper::isHttp()).
  enum class HttpMethod {NONE, GET, POST};
  25. class THORHELPER_API HttpHelper : public CInterface
  26. {
  27. private:
  28. HttpMethod method;
  29. bool useEnvelope = false;
  30. StringAttr url;
  31. StringAttr authToken;
  32. StringAttr contentType;
  33. StringAttr queryName;
  34. StringArray pathNodes;
  35. StringArray *validTargets;
  36. Owned<IProperties> parameters;
  37. Owned<IProperties> form;
  38. private:
  39. inline void setHttpHeaderValue(StringAttr &s, const char *v, bool ignoreExt)
  40. {
  41. if (!v || !*v)
  42. return;
  43. unsigned len=0;
  44. while (v[len] && v[len]!='\r' && (!ignoreExt || v[len]!=';'))
  45. len++;
  46. if (len)
  47. s.set(v, len);
  48. }
  49. void parseURL();
  50. public:
  51. IMPLEMENT_IINTERFACE;
  52. HttpHelper(StringArray *_validTargets) : validTargets(_validTargets), method(HttpMethod::NONE) {parameters.setown(createProperties(true));}
  53. inline bool isHttp() { return method!=HttpMethod::NONE; }
  54. inline bool isHttpGet(){ return method==HttpMethod::GET; }
  55. inline bool isControlUrl()
  56. {
  57. const char *control = queryTarget();
  58. return (control && strieq(control, "control"));
  59. }
  60. bool getUseEnvelope(){return useEnvelope;}
  61. void setUseEnvelope(bool _useEnvelope){useEnvelope=_useEnvelope;}
  62. bool getTrim() {return parameters->getPropBool(".trim", true); /*http currently defaults to true, maintain compatibility */}
  63. void setHttpMethod(HttpMethod _method) { method = _method; }
  64. const char *queryAuthToken() { return authToken.str(); }
  65. const char *queryTarget() { return (pathNodes.length()) ? pathNodes.item(0) : NULL; }
  66. const char *queryQueryName()
  67. {
  68. if (!queryName.isEmpty())
  69. return queryName.str();
  70. if (!pathNodes.isItem(1))
  71. return nullptr;
  72. const char *name = pathNodes.item(1);
  73. const char *at = strchr(name, ';');
  74. if (!at)
  75. queryName.set(name);
  76. else
  77. queryName.set(name, at-name-1);
  78. return queryName.str();
  79. }
  80. inline void setAuthToken(const char *v)
  81. {
  82. setHttpHeaderValue(authToken, v, false);
  83. };
  84. const char *queryContentType() { return contentType.str(); };
  85. inline void setContentType(const char *v)
  86. {
  87. setHttpHeaderValue(contentType, v, true);
  88. };
  89. inline void parseHTTPRequestLine(const char *v)
  90. {
  91. const char *end = strstr(v, " HTTP");
  92. if (end)
  93. {
  94. url.set(v, end - v);
  95. parseURL();
  96. }
  97. }
  98. inline bool isFormPost()
  99. {
  100. return (strnicmp(queryContentType(), "application/x-www-form-urlencoded", strlen("application/x-www-form-urlencoded"))==0);
  101. }
  102. TextMarkupFormat getUrlResponseFormat()
  103. {
  104. if (pathNodes.length()>2 && strieq(pathNodes.item(2), "json"))
  105. return MarkupFmt_JSON;
  106. return MarkupFmt_XML;
  107. }
  108. TextMarkupFormat getContentTypeMlFormat()
  109. {
  110. if (!contentType.length())
  111. {
  112. TextMarkupFormat fmt = getUrlResponseFormat();
  113. if (fmt == MarkupFmt_JSON)
  114. contentType.set("application/json");
  115. else
  116. contentType.set("text/xml");
  117. return fmt;
  118. }
  119. return (strieq(queryContentType(), "application/json")) ? MarkupFmt_JSON : MarkupFmt_XML;
  120. }
  121. TextMarkupFormat queryResponseMlFormat()
  122. {
  123. if (isFormPost())
  124. return getUrlResponseFormat();
  125. return getContentTypeMlFormat();
  126. }
  127. TextMarkupFormat queryRequestMlFormat()
  128. {
  129. if (isHttpGet() || isFormPost())
  130. return MarkupFmt_URL;
  131. return getContentTypeMlFormat();
  132. }
  133. IProperties *queryUrlParameters(){return parameters;}
  134. bool validateTarget(const char *target)
  135. {
  136. if (!target)
  137. return false;
  138. if (validTargets && validTargets->contains(target))
  139. return true;
  140. if (strieq(target, "control") && (isHttpGet() || isFormPost()))
  141. return true;
  142. return false;
  143. }
  144. inline void checkTarget()
  145. {
  146. const char *target = queryTarget();
  147. if (!target || !*target)
  148. throw MakeStringException(THORHELPER_DATA_ERROR, "HTTP-GET Target not specified");
  149. else if (!validateTarget(target))
  150. throw MakeStringException(THORHELPER_DATA_ERROR, "HTTP-GET Target not found");
  151. }
  152. inline void setFormContent(const char *content)
  153. {
  154. if (!form)
  155. form.setown(createProperties(false));
  156. parseHttpParameterString(form, content);
  157. }
  158. IPropertyTree *createPTreeFromParameters(byte flags)
  159. {
  160. if (!pathNodes.isItem(1))
  161. throw MakeStringException(THORHELPER_DATA_ERROR, "HTTP-GET Query not specified");
  162. StringBuffer query;
  163. appendDecodedURL(query, pathNodes.item(1));
  164. aindex_t count = pathNodes.ordinality();
  165. if (count>2)
  166. for (aindex_t x = 2; x<count; ++x)
  167. appendDecodedURL(query.append('/'), pathNodes.item(x));
  168. return createPTreeFromHttpParameters(query, form ? form : parameters, true, false, (ipt_flags) flags);
  169. }
  170. bool isMappedToInputParameter()
  171. {
  172. if (isHttp())
  173. {
  174. aindex_t count = pathNodes.ordinality();
  175. if (count>2)
  176. for (aindex_t x = 2; x<count; ++x)
  177. if (strncmp(pathNodes.item(x), "input(", 6)==0)
  178. return true;
  179. }
  180. return false;
  181. }
  182. IPropertyTree *checkAddWrapperForAdaptiveInput(IPropertyTree *content, byte flags)
  183. {
  184. if (!isMappedToInputParameter())
  185. return content;
  186. if (!pathNodes.isItem(1))
  187. throw MakeStringException(THORHELPER_DATA_ERROR, "HTTP-GET Query not specified");
  188. StringBuffer query;
  189. appendDecodedURL(query, pathNodes.item(1));
  190. aindex_t count = pathNodes.ordinality();
  191. if (count>2)
  192. for (aindex_t x = 2; x<count; ++x)
  193. appendDecodedURL(query.append('/'), pathNodes.item(x));
  194. return createPTreeFromHttpPath(query, content, false, (ipt_flags) flags);
  195. }
  196. void getResultFilterAndTag(StringAttr &filter, StringAttr &tag)
  197. {
  198. if (!isHttp())
  199. return;
  200. aindex_t count = pathNodes.ordinality();
  201. if (count<=2)
  202. return;
  203. StringBuffer temp;
  204. for (aindex_t x = 2; x<count; ++x)
  205. {
  206. if (strncmp(pathNodes.item(x), "result(", 6)==0)
  207. checkParseUrlPathNodeValue(pathNodes.item(x), temp, filter);
  208. else if (strncmp(pathNodes.item(x), "tag(", 4)==0)
  209. checkParseUrlPathNodeValue(pathNodes.item(x), temp, tag);
  210. }
  211. }
  212. };
  213. //==============================================================================================================
  // Selects a sort implementation; passed to createSortAlgorithm().
  //MORE: This should just contain the algorithm, and use a separate field for stable|spilling|parallel
  //Should be implemented in a subsequent pull request, which also uses ALGORITHM('x') instead of requiring STABLE/UNSTABLE
  // NOTE: values are implicitly numbered in declaration order. Add new algorithms just before
  // unknownSortAlgorithm rather than reordering, in case numeric values are stored or compared elsewhere.
  typedef enum {
      heapSortAlgorithm, // heap sort
      insertionSortAlgorithm, // insertion sort - purely for comparison (no longer supported)
      quickSortAlgorithm, // jlib implementation of quicksort
      stableQuickSortAlgorithm, // jlib version of quick sort that uses an extra array indirect to ensure it is stable
      spillingQuickSortAlgorithm, // quickSortAlgorithm with the ability to spill
      stableSpillingQuickSortAlgorithm, // stableQuickSortAlgorithm with the ability to spill
      mergeSortAlgorithm, // stable merge sort
      spillingMergeSortAlgorithm, // stable merge sort that can spill to disk
      parallelMergeSortAlgorithm, // parallel version of stable merge sort
      spillingParallelMergeSortAlgorithm, // parallel version of stable merge sort that can spill to disk
      tbbQuickSortAlgorithm, // (parallel) quick sort implemented by the TBB libraries
      tbbStableQuickSortAlgorithm, // stable version of tbbQuickSortAlgorithm
      parallelQuickSortAlgorithm, // parallel version of the internal quicksort implementation (for comparison)
      parallelStableQuickSortAlgorithm, // stable version of parallelQuickSortAlgorithm
      unknownSortAlgorithm
  } RoxieSortAlgorithm;
  // Pluggable sort implementation, created by the create*SortAlgorithm() factories.
  // Per-method notes are inferred from names and signatures only - TODO confirm against implementations.
  interface ISortAlgorithm : extends IInterface
  {
      virtual void prepare(IEngineRowStream *input) = 0;              // presumably reads and sorts the rows of input
      virtual const void *next() = 0;                                  // presumably the next row in sorted order
      virtual void reset() = 0;
      virtual void getSortedGroup(ConstPointerArray & result) = 0;    // returns the sorted rows via result
      virtual cycle_t getElapsedCycles(bool reset) = 0;               // time spent, in cycles; reset presumably restarts the count
  };
  // Factories for the ISortAlgorithm implementations listed in RoxieSortAlgorithm.
  // The spilling variant additionally takes a row manager, row metadata, code context, temp
  // directory and activity id (presumably what it needs to spill rows to disk - TODO confirm).
  extern THORHELPER_API ISortAlgorithm *createQuickSortAlgorithm(ICompare *_compare);
  extern THORHELPER_API ISortAlgorithm *createStableQuickSortAlgorithm(ICompare *_compare);
  extern THORHELPER_API ISortAlgorithm *createParallelQuickSortAlgorithm(ICompare *_compare);
  extern THORHELPER_API ISortAlgorithm *createParallelStableQuickSortAlgorithm(ICompare *_compare);
  extern THORHELPER_API ISortAlgorithm *createHeapSortAlgorithm(ICompare *_compare);
  extern THORHELPER_API ISortAlgorithm *createSpillingQuickSortAlgorithm(ICompare *_compare, roxiemem::IRowManager &_rowManager, IOutputMetaData * _rowMeta, ICodeContext *_ctx, const char *_tempDirectory, unsigned _activityId, bool _stable);
  extern THORHELPER_API ISortAlgorithm *createMergeSortAlgorithm(ICompare *_compare);
  extern THORHELPER_API ISortAlgorithm *createParallelMergeSortAlgorithm(ICompare *_compare);
  extern THORHELPER_API ISortAlgorithm *createTbbQuickSortAlgorithm(ICompare *_compare);
  extern THORHELPER_API ISortAlgorithm *createTbbStableQuickSortAlgorithm(ICompare *_compare);
  // Creates the implementation selected by algorithm; the extra arguments are presumably
  // only used by the spilling variants.
  extern THORHELPER_API ISortAlgorithm *createSortAlgorithm(RoxieSortAlgorithm algorithm, ICompare *_compare, roxiemem::IRowManager &_rowManager, IOutputMetaData * _rowMeta, ICodeContext *_ctx, const char *_tempDirectory, unsigned _activityId);
  252. //=========================================================================================
  // Row stream returned by the reader factories below; adds nothing to IEngineRowStream yet.
  interface IGroupedInput : extends IEngineRowStream // MORE rename to IGroupedRowStream
  {
  };
  // Wrap _input. Judging by name and parameters (implementations are out of line): group using
  // _groupCompare, remove grouping, sort with _sorter, or sort within groups.
  extern THORHELPER_API IGroupedInput *createGroupedInputReader(IEngineRowStream *_input, const ICompare *_groupCompare);
  extern THORHELPER_API IGroupedInput *createDegroupedInputReader(IEngineRowStream *_input);
  extern THORHELPER_API IGroupedInput *createSortedInputReader(IEngineRowStream *_input, ISortAlgorithm *_sorter);
  extern THORHELPER_API IGroupedInput *createSortedGroupedInputReader(IEngineRowStream *_input, const ICompare *_groupCompare, ISortAlgorithm *_sorter);
  260. //=========================================================================================
  // Response channel over an ISocket, with helpers for writing data and sending
  // SOAP/JSON/HTTP-formatted exceptions and heartbeats. Implemented by CSafeSocket.
  interface SafeSocket : extends IInterface
  {
      virtual ISocket *querySocket() = 0;
      virtual size32_t write(const void *buf, size32_t size, bool takeOwnership=false) = 0;
      // Both readBlock overloads return bool; meaning of the result and of the two unnamed
      // bool& out-parameters is defined by the implementation - TODO document from CSafeSocket.
      virtual bool readBlock(MemoryBuffer &ret, unsigned maxBlockSize, unsigned timeout = (unsigned) WAIT_FOREVER) = 0;
      virtual bool readBlock(StringBuffer &ret, unsigned timeout, HttpHelper *pHttpHelper, bool &, bool &, unsigned maxBlockSize) = 0;
      virtual void checkSendHttpException(HttpHelper &httphelper, IException *E, const char *queryName) = 0;
      virtual void sendSoapException(IException *E, const char *queryName) = 0;
      virtual void sendJsonException(IException *E, const char *queryName) = 0;
      virtual void setHttpMode(const char *queryName, bool arrayMode, HttpHelper &httphelper) = 0;
      virtual void setHeartBeat() = 0;
      virtual bool sendHeartBeat(const IContextLogger &logctx) = 0;
      virtual void flush() = 0;
      virtual unsigned bytesOut() const = 0;
      virtual bool checkConnection() const = 0;
      virtual void sendException(const char *source, unsigned code, const char *message, bool isBlocked, const IContextLogger &logctx) = 0;
      // To be removed and replaced with a better mechanism when SafeSocket is merged with the new output sequencer...
      // until then you may need to lock using this if you are making multiple calls and they need to stay together in the output
      virtual CriticalSection &queryCrit() = 0;
      virtual void setAdaptiveRoot(bool adaptive)=0;
      virtual bool getAdaptiveRoot()=0;
  };
  283. class THORHELPER_API CSafeSocket : public CInterface, implements SafeSocket
  284. {
  285. protected:
  286. Linked<ISocket> sock;
  287. bool httpMode;
  288. bool heartbeat;
  289. bool adaptiveRoot = false;
  290. TextMarkupFormat mlResponseFmt = MarkupFmt_Unknown;
  291. StringAttr contentHead;
  292. StringAttr contentTail;
  293. PointerArray queued;
  294. UnsignedArray lengths;
  295. unsigned sent;
  296. CriticalSection crit;
  297. public:
  298. IMPLEMENT_IINTERFACE;
  299. CSafeSocket(ISocket *_sock);
  300. ~CSafeSocket();
  301. ISocket *querySocket() { return sock; }
  302. virtual CriticalSection &queryCrit() { return crit; };
  303. size32_t write(const void *buf, size32_t size, bool takeOwnership=false);
  304. bool readBlock(MemoryBuffer &ret, unsigned maxBlockSize, unsigned timeout = (unsigned) WAIT_FOREVER);
  305. bool readBlock(StringBuffer &ret, unsigned timeout, HttpHelper *pHttpHelper, bool &, bool &, unsigned maxBlockSize);
  306. void setHttpMode(const char *queryName, bool arrayMode, HttpHelper &httphelper);
  307. void setAdaptiveRoot(bool adaptive){adaptiveRoot=adaptive;}
  308. bool getAdaptiveRoot(){return adaptiveRoot;}
  309. void checkSendHttpException(HttpHelper &httphelper, IException *E, const char *queryName);
  310. void sendSoapException(IException *E, const char *queryName);
  311. void sendJsonException(IException *E, const char *queryName);
  312. void setHeartBeat();
  313. bool sendHeartBeat(const IContextLogger &logctx);
  314. void flush();
  315. unsigned bytesOut() const;
  316. bool checkConnection() const;
  317. void sendException(const char *source, unsigned code, const char *message, bool isBlocked, const IContextLogger &logctx);
  318. };
  319. //==============================================================================================================
  // Result output buffer that is flushed to a SafeSocket (sock). The public s presumably holds
  // the pending text; how values are written depends on mlFmt (XML/JSON) and isRaw (see their
  // comments). Member functions are defined out of line, so the notes below are inferred from
  // names and declarations only.
  // NOTE(review): IInterface is listed explicitly as a base alongside IXmlStreamFlusher -
  // confirm whether IXmlStreamFlusher already extends IInterface.
  class THORHELPER_API FlushingStringBuffer : extends CInterface, implements IXmlStreamFlusher, implements IInterface
  {
      // MORE this code is yukky. Overdue for cleanup!
  protected:
      SafeSocket *sock;            // raw pointer; ownership is not visible in this header
      StringBuffer name;           // returned by queryResultName()
      StringBuffer tail;           // set by setTail()
      unsigned sequenceNumber;
      unsigned rowCount;           // presumably advanced by incrementRowCount()
      unsigned emptyLength;
      const IContextLogger &logctx;
      CriticalSection crit;
      PointerArray queued;         // presumably output blocks awaiting sending, with sizes in lengths
      UnsignedArray lengths;
      bool needsFlush(bool closing);
  public:
      TextMarkupFormat mlFmt; // controls whether xml/json elements are output
      bool isRaw; // controls whether output as binary or ascii
      bool isBlocked;
      bool isHttp;
      bool isSoap;
      bool isEmpty;
      bool extend;
      bool trim;
      bool tagClosed;
      StringAttr queryName;
      StringBuffer s;
      IMPLEMENT_IINTERFACE;
      FlushingStringBuffer(SafeSocket *_sock, bool _isBlocked, TextMarkupFormat _mlFmt, bool _isRaw, bool _isHttp, const IContextLogger &_logctx);
      ~FlushingStringBuffer();
      // Single characters are forwarded to the (length, data) overload.
      virtual void append(char data) {append(1, &data);}
      virtual void append(const char *data);
      virtual void append(unsigned len, const char *data);
      virtual void append(double data);
      virtual void appendf(const char *format, ...) __attribute__((format(printf, 2, 3)));
      virtual void encodeString(const char *x, unsigned len, bool utf8=false);
      virtual void encodeData(const void *data, unsigned len);
      virtual void flushXML(StringBuffer &current, bool isClosing);
      virtual void flush(bool closing) ;
      virtual void addPayload(StringBuffer &s, unsigned int reserve=0);
      virtual void *getPayload(size32_t &length);
      virtual void startBlock();
      virtual void startDataset(const char *elementName, const char *resultName, unsigned sequence, bool _extend = false, const IProperties *xmlns=NULL, bool adaptive=false);
      virtual void startScalar(const char *resultName, unsigned sequence, bool simpleTag=false, const char *simplename=nullptr);
      virtual void setScalarInt(const char *resultName, unsigned sequence, __int64 value, unsigned size);
      virtual void setScalarUInt(const char *resultName, unsigned sequence, unsigned __int64 value, unsigned size);
      virtual void incrementRowCount();
      void setTail(const char *value){tail.set(value);}
      const char *queryResultName(){return name;}
  };
  370. class THORHELPER_API FlushingJsonBuffer : public FlushingStringBuffer
  371. {
  372. public:
  373. FlushingJsonBuffer(SafeSocket *_sock, bool _isBlocked, bool _isHttp, const IContextLogger &_logctx) :
  374. FlushingStringBuffer(_sock, _isBlocked, MarkupFmt_JSON, false, _isHttp, _logctx)
  375. {
  376. }
  377. void append(double data);
  378. void encodeString(const char *x, unsigned len, bool utf8=false);
  379. void encodeData(const void *data, unsigned len);
  380. void startDataset(const char *elementName, const char *resultName, unsigned sequence, bool _extend = false, const IProperties *xmlns=NULL, bool adaptive=false);
  381. void startScalar(const char *resultName, unsigned sequence, bool simpleTag, const char *simplename=nullptr);
  382. virtual void setScalarInt(const char *resultName, unsigned sequence, __int64 value, unsigned size, bool simpleTag = false, const char *simplename=nullptr);
  383. virtual void setScalarUInt(const char *resultName, unsigned sequence, unsigned __int64 value, unsigned size, bool simpleTag = false, const char *simplename=nullptr);
  384. };
  385. inline const char *getFormatName(TextMarkupFormat fmt)
  386. {
  387. if (fmt==MarkupFmt_XML)
  388. return "xml";
  389. if (fmt==MarkupFmt_JSON)
  390. return "json";
  391. return "raw";
  392. }
  393. //==============================================================================================================
  394. class THORHELPER_API OwnedRowArray
  395. {
  396. public:
  397. OwnedRowArray() {}
  398. ~OwnedRowArray() { clear(); }
  399. void clear();
  400. void clearPart(aindex_t from, aindex_t to);
  401. void replace(const void * row, aindex_t pos);
  402. void append(const void * row) { buff.append(row); }
  403. aindex_t ordinality() const { return buff.ordinality(); }
  404. const void * * getArray() { return buff.getArray(); }
  405. bool isItem(aindex_t pos) const { return buff.isItem(pos); }
  406. const void * item(aindex_t pos) { return buff.item(pos); }
  407. const void * itemClear(aindex_t pos) { const void * ret = buff.item(pos); buff.replace(NULL, pos); return ret; }
  408. protected:
  409. ConstPointerArray buff;
  410. };
  411. //==============================================================================================================
  interface IFileDescriptor;
  interface IAgentContext; // not referenced elsewhere in this header
  // Tracks the local and additional (remote) clusters that a logical file is written to,
  // together with its physical name/directory/base. Methods are defined out of line; the
  // notes below are inferred from names and signatures - TODO confirm.
  class THORHELPER_API ClusterWriteHandler : public CInterface
  {
  public:
      ClusterWriteHandler(char const * _logicalName, char const * _activityType);
      void addCluster(char const * cluster);                       // presumably records a target cluster (local or remote)
      void getLocalPhysicalFilename(StringAttr & out) const;
      void splitPhysicalFilename(StringBuffer & dir, StringBuffer & base) const;
      void copyPhysical(IFile * source, bool noCopy) const;        // presumably copies source to the remote clusters
      void setDescriptorParts(IFileDescriptor * desc, char const * basename, IPropertyTree * attrs) const;
      void finish(IFile * file) const;
      void getClusters(StringArray &clusters) const;
  private:
      // Private but virtual: derived classes can still override it.
      virtual void getTempFilename(StringAttr & out) const ;
  private:
      StringAttr logicalName;
      StringBuffer physicalName;
      StringBuffer physicalDir;
      StringBuffer physicalBase;
      StringAttr activityType;
      StringAttr localClusterName;
      Owned<IGroup> localCluster;
      IArrayOf<IGroup> remoteNodes;
      StringArray remoteClusters;
  };
  438. //==============================================================================================================
  // File-name helpers; each appends its result to, and returns, the StringBuffer passed first.
  // Exact mangling/expansion rules live in the implementation - TODO document them here.
  THORHELPER_API StringBuffer & mangleHelperFileName(StringBuffer & out, const char * in, const char * wuid, unsigned int flags);
  THORHELPER_API StringBuffer & mangleLocalTempFilename(StringBuffer & out, char const * in);
  THORHELPER_API StringBuffer & expandLogicalFilename(StringBuffer & logicalName, const char * fname, IConstWorkUnit * wu, bool resolveLocally, bool ignoreForeignPrefix);
  442. #endif // ROXIEHELPER_HPP