thorpipe.cpp 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jliball.hpp"
  14. #include "thorpipe.hpp"
  15. #include "thorxmlread.hpp"
  16. #include "thorxmlwrite.hpp"
  17. #include "thorcommon.ipp"
  18. #include "csvsplitter.hpp"
  19. #include "rtlread_imp.hpp"
  20. #include "rtlds_imp.hpp"
  21. #include "rtlformat.hpp"
  22. #include "roxiemem.hpp"
  23. using roxiemem::OwnedRoxieString;
  24. //=====================================================================================================
  25. class CPipeErrorHelper : public Thread, implements IPipeErrorHelper
  26. {
  27. private:
  28. StringBuffer errorOutput;
  29. Linked<IPipeProcess> pipe;
  30. public:
  31. IMPLEMENT_IINTERFACE;
  32. int run()
  33. {
  34. char buffer[10001];
  35. int numErrors = 0;
  36. size32_t read;
  37. char *errorLine;
  38. while (true)
  39. {
  40. read = pipe->readError(10000,buffer);
  41. if ((read == 0) || (read == (size32_t)-1))
  42. break;
  43. if (numErrors < 100)
  44. {
  45. buffer[read] = '\0';
  46. char *saveptr;
  47. errorLine = strtok_r(buffer, "\n", &saveptr);
  48. errorOutput.append(errorLine).newline();
  49. numErrors++;
  50. while ((numErrors < 100) && (errorLine = strtok_r(NULL, "\n", &saveptr)))
  51. {
  52. errorOutput.append(errorLine).newline();
  53. numErrors++;
  54. }
  55. }
  56. }
  57. return 0;
  58. }
  59. void run(IPipeProcess *_pipe)
  60. {
  61. pipe.set(_pipe);
  62. this->start();
  63. }
  64. void wait()
  65. {
  66. this->join();
  67. }
  68. const char *queryErrorOutput()
  69. {
  70. return errorOutput.str();
  71. }
  72. };
  73. //=====================================================================================================
  74. IPipeErrorHelper * createPipeErrorHelper()
  75. {
  76. return new CPipeErrorHelper();
  77. }
  78. //=====================================================================================================
  79. #define PIPE_BUFSIZE 0x8000
  80. class CBufferedReadRowStream : implements IReadRowStream, public CInterface
  81. {
  82. public:
  83. IMPLEMENT_IINTERFACE;
  84. CBufferedReadRowStream(IEngineRowAllocator * _rowAllocator) : rowAllocator(_rowAllocator)
  85. {
  86. }
  87. virtual bool eos()
  88. {
  89. return pipeStream->eos();
  90. }
  91. virtual void setStream(ISimpleReadStream * in)
  92. {
  93. if (in)
  94. {
  95. pipeStream.setown(createSimpleSerialStream(in, PIPE_BUFSIZE));
  96. rowSource.setStream(pipeStream);
  97. }
  98. else
  99. {
  100. rowSource.setStream(NULL);
  101. pipeStream.clear();
  102. }
  103. }
  104. protected:
  105. Owned<ISerialStream> pipeStream;
  106. CThorStreamDeserializerSource rowSource;
  107. IEngineRowAllocator * rowAllocator;
  108. };
  109. class CReadRowBinaryStream : public CBufferedReadRowStream
  110. {
  111. public:
  112. CReadRowBinaryStream(IEngineRowAllocator * _rowAllocator, IOutputRowDeserializer * _rowDeserializer)
  113. : CBufferedReadRowStream(_rowAllocator), rowDeserializer(_rowDeserializer)
  114. {
  115. }
  116. virtual const void * next()
  117. {
  118. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  119. size32_t size = rowDeserializer->deserialize(rowBuilder, rowSource);
  120. return rowBuilder.finalizeRowClear(size);
  121. }
  122. private:
  123. IOutputRowDeserializer * rowDeserializer;
  124. };
  125. class CReadRowCSVStream : extends CBufferedReadRowStream
  126. {
  127. public:
  128. CReadRowCSVStream(IEngineRowAllocator * _rowAllocator, ICsvToRowTransformer * _csvTransformer)
  129. : CBufferedReadRowStream(_rowAllocator), csvTransformer(_csvTransformer)
  130. {
  131. ICsvParameters * csvInfo = csvTransformer->queryCsvParameters();
  132. //MORE: This value is never used. Should it be asserting(headerLines == 0)
  133. unsigned int headerLines = csvInfo->queryHeaderLen();
  134. size32_t max = csvInfo->queryMaxSize();
  135. const char * quotes = NULL;
  136. const char * separators = NULL;
  137. const char * terminators = NULL;
  138. const char * escapes = NULL;
  139. csvSplitter.init(csvTransformer->getMaxColumns(), csvInfo, quotes, separators, terminators, escapes);
  140. }
  141. virtual const void * next()
  142. {
  143. size32_t maxRowSize = 10*1024*1024; // MORE - make configurable
  144. unsigned thisLineLength = csvSplitter.splitLine(pipeStream, maxRowSize);
  145. if (thisLineLength)
  146. {
  147. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  148. unsigned thisSize;
  149. unsigned __int64 fpos=0;
  150. thisSize = csvTransformer->transform(rowBuilder, csvSplitter.queryLengths(), (const char * *)csvSplitter.queryData(), fpos);
  151. pipeStream->skip(thisLineLength);
  152. if (thisSize)
  153. return rowBuilder.finalizeRowClear(thisSize);
  154. }
  155. return nullptr;
  156. }
  157. private:
  158. ICsvToRowTransformer * csvTransformer;
  159. CSVSplitter csvSplitter;
  160. };
  161. class CReadRowXMLStream : implements IReadRowStream, implements IXMLSelect, implements IThorDiskCallback, public CInterface
  162. {
  163. public:
  164. IMPLEMENT_IINTERFACE;
  165. CReadRowXMLStream(IEngineRowAllocator * _rowAllocator, IXmlToRowTransformer * _xmlTransformer, const char * _iteratorPath, unsigned _pipeFlags)
  166. : rowAllocator(_rowAllocator), xmlTransformer(_xmlTransformer), iteratorPath(_iteratorPath), pipeFlags(_pipeFlags)
  167. {
  168. }
  169. virtual void setStream(ISimpleReadStream * _in)
  170. {
  171. in.set(_in);
  172. bool noRoot = (pipeFlags & TPFreadnoroot) != 0;
  173. bool useContents = (pipeFlags & TPFreadusexmlcontents) != 0;
  174. if (in)
  175. xmlParser.setown(createXMLParse(*in, iteratorPath, *this, noRoot?ptr_noRoot:ptr_none, useContents));
  176. else
  177. xmlParser.clear();
  178. }
  179. //iface IXMLSelect
  180. virtual void match(IColumnProvider &entry, offset_t startOffset, offset_t endOffset)
  181. {
  182. lastMatch.set(&entry);
  183. }
  184. virtual bool eos()
  185. {
  186. return !ensureNext();
  187. }
  188. virtual const void * next()
  189. {
  190. for (;;)
  191. {
  192. if (!ensureNext())
  193. return NULL;
  194. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  195. unsigned sizeGot = xmlTransformer->transform(rowBuilder, lastMatch, this);
  196. lastMatch.clear();
  197. if (sizeGot)
  198. return rowBuilder.finalizeRowClear(sizeGot);
  199. }
  200. }
  201. bool ensureNext()
  202. {
  203. while (!lastMatch && xmlParser)
  204. {
  205. if (!xmlParser->next())
  206. return false;
  207. }
  208. return lastMatch != NULL;
  209. }
  210. //interface IThorDiskCallback
  211. virtual unsigned __int64 getFilePosition(const void * row) { return 0; }
  212. virtual unsigned __int64 getLocalFilePosition(const void * row) { return 0; }
  213. virtual const char * queryLogicalFilename(const void * row) { return ""; }
  214. virtual const byte * lookupBlob(unsigned __int64 id) override { throwUnexpected(); }
  215. private:
  216. IXmlToRowTransformer * xmlTransformer;
  217. IEngineRowAllocator * rowAllocator;
  218. Owned<ISimpleReadStream> in;
  219. Owned<IXMLParse> xmlParser;
  220. Owned<IColumnProvider> lastMatch;
  221. StringAttr iteratorPath;
  222. unsigned pipeFlags;
  223. };
  224. IReadRowStream *createReadRowStream(IEngineRowAllocator * _rowAllocator, IOutputRowDeserializer * _rowDeserializer, IXmlToRowTransformer * _xmlTransformer, ICsvToRowTransformer * _csvTransformer, const char * iteratorPath, unsigned pipeFlags)
  225. {
  226. if (_xmlTransformer)
  227. return new CReadRowXMLStream(_rowAllocator, _xmlTransformer, iteratorPath, pipeFlags);
  228. else if (_csvTransformer)
  229. return new CReadRowCSVStream(_rowAllocator, _csvTransformer);
  230. else
  231. return new CReadRowBinaryStream(_rowAllocator, _rowDeserializer);
  232. }
  233. //=====================================================================================================
  234. // MORE - should really split into three implementations - XML, CSV and RAW
  235. class THORHELPER_API CPipeWriteXformHelper : implements IPipeWriteXformHelper, public CInterface //Transforms output before being written to pipe. Currently CSV and XML output supported
  236. {
  237. CSVOutputStream csvWriter;
  238. IHThorCsvWriteExtra * csvWriterExtra;
  239. IHThorXmlWriteExtra * xmlWriterExtra;
  240. IOutputRowSerializer *rawSerializer;
  241. StringBuffer header;
  242. StringBuffer footer;
  243. StringBuffer rowTag;
  244. unsigned flags;
  245. public:
  246. CPipeWriteXformHelper(unsigned _flags, IHThorXmlWriteExtra * _xmlWriterExtra, IHThorCsvWriteExtra * _csvWriterExtra, IOutputRowSerializer *_rawSerializer)
  247. : flags(_flags), xmlWriterExtra(_xmlWriterExtra), csvWriterExtra(_csvWriterExtra), rawSerializer(_rawSerializer) {};
  248. IMPLEMENT_IINTERFACE;
  249. virtual void writeHeader(IPipeProcess * pipe)
  250. {
  251. if (header.length())
  252. pipe->write(header.length(),header.str());
  253. }
  254. virtual void writeFooter(IPipeProcess * pipe)
  255. {
  256. if (footer.length())
  257. pipe->write(footer.length(),footer.str());
  258. }
  259. virtual void ready()
  260. {
  261. if (flags & TPFwritexmltopipe)
  262. {
  263. assertex(xmlWriterExtra);
  264. OwnedRoxieString xmlpath(xmlWriterExtra->getXmlIteratorPath());
  265. if (!xmlpath)
  266. rowTag.append("Row");
  267. else
  268. {
  269. const char *path = xmlpath;
  270. if (*path == '/')
  271. path++;
  272. if (strchr(path, '/'))
  273. UNIMPLEMENTED; // more what do we do with /mydata/row
  274. rowTag.append(path);
  275. }
  276. //getHeader/footer can return a tag name, or NULL (indicates to use the default tag), or "" (do not use header/footer)
  277. if (!(flags & TPFwritenoroot))
  278. {
  279. OwnedRoxieString hdr(xmlWriterExtra->getHeader());
  280. if (hdr == NULL)
  281. header.append("<Dataset>\n");
  282. else
  283. header.append(hdr);
  284. OwnedRoxieString ftr(xmlWriterExtra->getFooter());
  285. if (ftr == NULL)
  286. footer.append("</Dataset>\n");
  287. else
  288. footer.append(ftr);
  289. }
  290. }
  291. else if (flags & TPFwritecsvtopipe)
  292. {
  293. assertex(csvWriterExtra);
  294. ICsvParameters * csv = csvWriterExtra->queryCsvParameters();
  295. csvWriter.init(csv, false);
  296. OwnedRoxieString hdr(csv->getHeader());
  297. if (hdr)
  298. {
  299. csvWriter.beginLine();
  300. csvWriter.writeHeaderLn(strlen(hdr), hdr);
  301. header.append(csvWriter.str());
  302. }
  303. OwnedRoxieString ftr(csv->getFooter());
  304. if (ftr)
  305. {
  306. csvWriter.beginLine();
  307. csvWriter.writeHeaderLn(strlen(ftr), ftr);//MORE: no writeFooterLn method, is writeHeaderLn ok?
  308. footer.append(csvWriter.str());
  309. }
  310. }
  311. }
  312. virtual void writeTranslatedText(const void * row, IPipeProcess * pipe)
  313. {
  314. if (xmlWriterExtra)
  315. {
  316. CommonXmlWriter xmlWriter(xmlWriterExtra->getXmlFlags());
  317. xmlWriter.outputBeginNested(rowTag, false);
  318. xmlWriterExtra->toXML((const byte *)row, xmlWriter);
  319. xmlWriter.outputEndNested(rowTag);
  320. pipe->write(xmlWriter.length(), xmlWriter.str());
  321. }
  322. else if (csvWriterExtra)
  323. {
  324. csvWriter.beginLine();
  325. csvWriterExtra->writeRow((const byte *)row, &csvWriter);
  326. csvWriter.endLine();
  327. pipe->write(csvWriter.length(), csvWriter.str());
  328. }
  329. else
  330. {
  331. MemoryBuffer myBuff;
  332. CThorDemoRowSerializer serializerTarget(myBuff);
  333. rawSerializer->serialize(serializerTarget, (const byte *) row);
  334. pipe->write(myBuff.length(), myBuff.toByteArray());
  335. }
  336. }
  337. };
  338. extern THORHELPER_API IPipeWriteXformHelper *createPipeWriteXformHelper(unsigned _flags, IHThorXmlWriteExtra * _xmlWriterExtra, IHThorCsvWriteExtra * _csvWriterExtra, IOutputRowSerializer *_rawSerializer)
  339. {
  340. return new CPipeWriteXformHelper(_flags, _xmlWriterExtra, _csvWriterExtra, _rawSerializer);
  341. }
  342. //=====================================================================================================