roxiehelper.hpp 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef ROXIEHELPER_HPP
  14. #define ROXIEHELPER_HPP
  15. #include "thorxmlwrite.hpp"
  16. #include "roxiehelper.ipp"
  17. #include "roxiemem.hpp"
  18. #include "mpbase.hpp"
  19. #include "workunit.hpp"
  20. #include "thorhelper.hpp"
  21. //=========================================================================================
  22. class THORHELPER_API HttpHelper : public CInterface
  23. {
  24. private:
  25. bool _isHttp;
  26. StringAttr url;
  27. StringAttr authToken;
  28. StringAttr contentType;
  29. StringArray pathNodes;
  30. Owned<IProperties> parameters;
  31. private:
  32. inline void setHttpHeaderValue(StringAttr &s, const char *v, bool ignoreExt)
  33. {
  34. if (!v || !*v)
  35. return;
  36. unsigned len=0;
  37. while (v[len] && v[len]!='\r' && (!ignoreExt || v[len]!=';'))
  38. len++;
  39. if (len)
  40. s.set(v, len);
  41. }
  42. void parseURL();
  43. public:
  44. IMPLEMENT_IINTERFACE;
  45. HttpHelper() { _isHttp = false; parameters.setown(createProperties(true));}
  46. bool isHttp() { return _isHttp; }
  47. bool getTrim() {return parameters->getPropBool(".trim", true); /*http currently defaults to true, maintain compatibility */}
  48. void setIsHttp(bool __isHttp) { _isHttp = __isHttp; }
  49. const char *queryAuthToken() { return authToken.str(); }
  50. const char *queryTarget() { return (pathNodes.length()) ? pathNodes.item(0) : NULL; }
  51. inline void setAuthToken(const char *v)
  52. {
  53. setHttpHeaderValue(authToken, v, false);
  54. };
  55. const char *queryContentType() { return contentType.str(); };
  56. inline void setContentType(const char *v)
  57. {
  58. setHttpHeaderValue(contentType, v, true);
  59. };
  60. inline void parseHTTPRequestLine(const char *v)
  61. {
  62. const char *end = strstr(v, " HTTP");
  63. if (end)
  64. {
  65. url.set(v, end - v);
  66. parseURL();
  67. }
  68. }
  69. TextMarkupFormat queryContentFormat(){return (strieq(queryContentType(), "application/json")) ? MarkupFmt_JSON : MarkupFmt_XML;}
  70. IProperties *queryUrlParameters(){return parameters;}
  71. };
  72. //==============================================================================================================
  73. typedef enum {heapSortAlgorithm, insertionSortAlgorithm, quickSortAlgorithm, stableQuickSortAlgorithm, spillingQuickSortAlgorithm, stableSpillingQuickSortAlgorithm, unknownSortAlgorithm } RoxieSortAlgorithm;
  74. interface ISortAlgorithm : extends IInterface
  75. {
  76. virtual void prepare(IInputBase *input) = 0;
  77. virtual const void *next() = 0;
  78. virtual void reset() = 0;
  79. };
  80. extern THORHELPER_API ISortAlgorithm *createQuickSortAlgorithm(ICompare *_compare);
  81. extern THORHELPER_API ISortAlgorithm *createStableQuickSortAlgorithm(ICompare *_compare);
  82. extern THORHELPER_API ISortAlgorithm *createInsertionSortAlgorithm(ICompare *_compare, roxiemem::IRowManager *_rowManager, unsigned _activityId);
  83. extern THORHELPER_API ISortAlgorithm *createHeapSortAlgorithm(ICompare *_compare);
  84. extern THORHELPER_API ISortAlgorithm *createSpillingQuickSortAlgorithm(ICompare *_compare, roxiemem::IRowManager &_rowManager, IOutputMetaData * _rowMeta, ICodeContext *_ctx, const char *_tempDirectory, unsigned _activityId, bool _stable);
  85. extern THORHELPER_API ISortAlgorithm *createSortAlgorithm(RoxieSortAlgorithm algorithm, ICompare *_compare, roxiemem::IRowManager &_rowManager, IOutputMetaData * _rowMeta, ICodeContext *_ctx, const char *_tempDirectory, unsigned _activityId);
  86. //=========================================================================================
  87. interface IGroupedInput : extends IInterface, extends IInputBase
  88. {
  89. };
  90. extern THORHELPER_API IGroupedInput *createGroupedInputReader(IInputBase *_input, const ICompare *_groupCompare);
  91. extern THORHELPER_API IGroupedInput *createDegroupedInputReader(IInputBase *_input);
  92. extern THORHELPER_API IGroupedInput *createSortedInputReader(IInputBase *_input, ISortAlgorithm *_sorter);
  93. extern THORHELPER_API IGroupedInput *createSortedGroupedInputReader(IInputBase *_input, const ICompare *_groupCompare, ISortAlgorithm *_sorter);
  94. //=========================================================================================
  95. interface SafeSocket : extends IInterface
  96. {
  97. virtual ISocket *querySocket() = 0;
  98. virtual size32_t write(const void *buf, size32_t size, bool takeOwnership=false) = 0;
  99. virtual bool readBlock(MemoryBuffer &ret, unsigned maxBlockSize, unsigned timeout = (unsigned) WAIT_FOREVER) = 0;
  100. virtual bool readBlock(StringBuffer &ret, unsigned timeout, HttpHelper *pHttpHelper, bool &, bool &, unsigned maxBlockSize) = 0;
  101. virtual void setHttpMode(const char *queryName, bool arrayMode, TextMarkupFormat txtfmt) = 0;
  102. virtual void setHeartBeat() = 0;
  103. virtual bool sendHeartBeat(const IContextLogger &logctx) = 0;
  104. virtual void flush() = 0;
  105. virtual unsigned bytesOut() const = 0;
  106. virtual bool checkConnection() const = 0;
  107. virtual void sendException(const char *source, unsigned code, const char *message, bool isBlocked, const IContextLogger &logctx) = 0;
  108. // TO be removed and replaced with better mechanism when SafeSocket merged with tht new output sequencer...
  109. // until then you may need to lock using this if you are making multiple calls and they need to stay together in the output
  110. virtual CriticalSection &queryCrit() = 0;
  111. };
  112. class THORHELPER_API CSafeSocket : public CInterface, implements SafeSocket
  113. {
  114. protected:
  115. Linked<ISocket> sock;
  116. bool httpMode;
  117. bool heartbeat;
  118. TextMarkupFormat mlFmt;
  119. StringAttr contentHead;
  120. StringAttr contentTail;
  121. PointerArray queued;
  122. UnsignedArray lengths;
  123. unsigned sent;
  124. CriticalSection crit;
  125. public:
  126. IMPLEMENT_IINTERFACE;
  127. CSafeSocket(ISocket *_sock);
  128. ~CSafeSocket();
  129. ISocket *querySocket() { return sock; }
  130. virtual CriticalSection &queryCrit() { return crit; };
  131. size32_t write(const void *buf, size32_t size, bool takeOwnership=false);
  132. bool readBlock(MemoryBuffer &ret, unsigned maxBlockSize, unsigned timeout = (unsigned) WAIT_FOREVER);
  133. bool readBlock(StringBuffer &ret, unsigned timeout, HttpHelper *pHttpHelper, bool &, bool &, unsigned maxBlockSize);
  134. void setHttpMode(const char *queryName, bool arrayMode, TextMarkupFormat txtfmt);
  135. void setHeartBeat();
  136. bool sendHeartBeat(const IContextLogger &logctx);
  137. void flush();
  138. unsigned bytesOut() const;
  139. bool checkConnection() const;
  140. void sendException(const char *source, unsigned code, const char *message, bool isBlocked, const IContextLogger &logctx);
  141. };
  142. //==============================================================================================================
  143. class THORHELPER_API FlushingStringBuffer : extends CInterface, implements IXmlStreamFlusher, implements IInterface
  144. {
  145. // MORE this code is yukky. Overdue for cleanup!
  146. protected:
  147. SafeSocket *sock;
  148. StringBuffer name;
  149. StringBuffer tail;
  150. unsigned sequenceNumber;
  151. unsigned rowCount;
  152. unsigned emptyLength;
  153. const IContextLogger &logctx;
  154. CriticalSection crit;
  155. PointerArray queued;
  156. UnsignedArray lengths;
  157. bool needsFlush(bool closing);
  158. void startBlock();
  159. public:
  160. TextMarkupFormat mlFmt; // controls whether xml/json elements are output
  161. bool isRaw; // controls whether output as binary or ascii
  162. bool isBlocked;
  163. bool isHttp;
  164. bool isSoap;
  165. bool isEmpty;
  166. bool extend;
  167. bool trim;
  168. bool tagClosed;
  169. StringAttr queryName;
  170. StringBuffer s;
  171. IMPLEMENT_IINTERFACE;
  172. FlushingStringBuffer(SafeSocket *_sock, bool _isBlocked, TextMarkupFormat _mlFmt, bool _isRaw, bool _isHttp, const IContextLogger &_logctx);
  173. ~FlushingStringBuffer();
  174. virtual void append(char data) {append(1, &data);}
  175. virtual void append(const char *data);
  176. virtual void append(unsigned len, const char *data);
  177. virtual void append(double data);
  178. virtual void appendf(const char *format, ...) __attribute__((format(printf, 2, 3)));
  179. virtual void encodeString(const char *x, unsigned len, bool utf8=false);
  180. virtual void encodeData(const void *data, unsigned len);
  181. virtual void flushXML(StringBuffer &current, bool isClosing);
  182. virtual void flush(bool closing) ;
  183. virtual void addPayload(StringBuffer &s, unsigned int reserve=0);
  184. virtual void *getPayload(size32_t &length);
  185. virtual void startDataset(const char *elementName, const char *resultName, unsigned sequence, bool _extend = false, const IProperties *xmlns=NULL);
  186. virtual void startScalar(const char *resultName, unsigned sequence);
  187. virtual void setScalarInt(const char *resultName, unsigned sequence, __int64 value, unsigned size);
  188. virtual void setScalarUInt(const char *resultName, unsigned sequence, unsigned __int64 value, unsigned size);
  189. virtual void incrementRowCount();
  190. };
  191. class THORHELPER_API FlushingJsonBuffer : public FlushingStringBuffer
  192. {
  193. public:
  194. FlushingJsonBuffer(SafeSocket *_sock, bool _isBlocked, bool _isHttp, const IContextLogger &_logctx) :
  195. FlushingStringBuffer(_sock, _isBlocked, MarkupFmt_JSON, false, _isHttp, _logctx)
  196. {
  197. }
  198. void append(double data);
  199. void encodeString(const char *x, unsigned len, bool utf8=false);
  200. void encodeData(const void *data, unsigned len);
  201. void startDataset(const char *elementName, const char *resultName, unsigned sequence, bool _extend = false, const IProperties *xmlns=NULL);
  202. void startScalar(const char *resultName, unsigned sequence);
  203. virtual void setScalarInt(const char *resultName, unsigned sequence, __int64 value, unsigned size);
  204. virtual void setScalarUInt(const char *resultName, unsigned sequence, unsigned __int64 value, unsigned size);
  205. };
  206. inline const char *getFormatName(TextMarkupFormat fmt)
  207. {
  208. if (fmt==MarkupFmt_XML)
  209. return "xml";
  210. if (fmt==MarkupFmt_JSON)
  211. return "json";
  212. return "raw";
  213. }
  214. //==============================================================================================================
  215. class THORHELPER_API OwnedRowArray
  216. {
  217. public:
  218. OwnedRowArray() {}
  219. ~OwnedRowArray() { clear(); }
  220. void clear();
  221. void clearPart(aindex_t from, aindex_t to);
  222. void replace(const void * row, aindex_t pos);
  223. void append(const void * row) { buff.append(row); }
  224. aindex_t ordinality() const { return buff.ordinality(); }
  225. const void * * getArray() { return buff.getArray(); }
  226. bool isItem(aindex_t pos) const { return buff.isItem(pos); }
  227. const void * item(aindex_t pos) { return buff.item(pos); }
  228. const void * itemClear(aindex_t pos) { const void * ret = buff.item(pos); buff.replace(NULL, pos); return ret; }
  229. protected:
  230. ConstPointerArray buff;
  231. };
  232. //==============================================================================================================
  233. interface IFileDescriptor;
  234. interface IAgentContext;
  235. class THORHELPER_API ClusterWriteHandler : public CInterface
  236. {
  237. public:
  238. ClusterWriteHandler(char const * _logicalName, char const * _activityType);
  239. void addCluster(char const * cluster);
  240. void getLocalPhysicalFilename(StringAttr & out) const;
  241. void splitPhysicalFilename(StringBuffer & dir, StringBuffer & base) const;
  242. void copyPhysical(IFile * source, bool noCopy) const;
  243. void setDescriptorParts(IFileDescriptor * desc, char const * basename, IPropertyTree * attrs) const;
  244. void finish(IFile * file) const;
  245. void getClusters(StringArray &clusters) const;
  246. private:
  247. virtual void getTempFilename(StringAttr & out) const ;
  248. private:
  249. StringAttr logicalName;
  250. StringBuffer physicalName;
  251. StringBuffer physicalDir;
  252. StringBuffer physicalBase;
  253. StringAttr activityType;
  254. StringAttr localClusterName;
  255. Owned<IGroup> localCluster;
  256. IArrayOf<IGroup> remoteNodes;
  257. StringArray remoteClusters;
  258. };
  259. //==============================================================================================================
  260. THORHELPER_API StringBuffer & mangleHelperFileName(StringBuffer & out, const char * in, const char * wuid, unsigned int flags);
  261. THORHELPER_API StringBuffer & mangleLocalTempFilename(StringBuffer & out, char const * in);
  262. THORHELPER_API StringBuffer & expandLogicalFilename(StringBuffer & logicalName, const char * fname, IConstWorkUnit * wu, bool resolveLocally, bool ignoreForeignPrefix);
  263. #endif // ROXIEHELPER_HPP