thorpipe.cpp 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jliball.hpp"
  14. #include "thorpipe.hpp"
  15. #include "thorxmlread.hpp"
  16. #include "thorxmlwrite.hpp"
  17. #include "thorcommon.ipp"
  18. #include "csvsplitter.hpp"
  19. #include "rtlread_imp.hpp"
  20. #include "rtlds_imp.hpp"
  21. #include "roxiemem.hpp"
  22. using roxiemem::OwnedRoxieString;
  23. //=====================================================================================================
  24. class CPipeErrorHelper : public Thread, implements IPipeErrorHelper
  25. {
  26. private:
  27. StringBuffer errorOutput;
  28. Linked<IPipeProcess> pipe;
  29. public:
  30. IMPLEMENT_IINTERFACE;
  31. int run()
  32. {
  33. char buffer[10001];
  34. int numErrors = 0;
  35. size32_t read;
  36. char *errorLine;
  37. while (true)
  38. {
  39. read = pipe->readError(10000,buffer);
  40. if ((read == 0) || (read == (size32_t)-1))
  41. break;
  42. if (numErrors < 100)
  43. {
  44. buffer[read] = '\0';
  45. char *saveptr;
  46. errorLine = strtok_r(buffer, "\n", &saveptr);
  47. errorOutput.append(errorLine).newline();
  48. numErrors++;
  49. while ((numErrors < 100) && (errorLine = strtok_r(NULL, "\n", &saveptr)))
  50. {
  51. errorOutput.append(errorLine).newline();
  52. numErrors++;
  53. }
  54. }
  55. }
  56. return 0;
  57. }
  58. void run(IPipeProcess *_pipe)
  59. {
  60. pipe.set(_pipe);
  61. this->start();
  62. }
  63. void wait()
  64. {
  65. this->join();
  66. }
  67. const char *queryErrorOutput()
  68. {
  69. return errorOutput.str();
  70. }
  71. };
  72. //=====================================================================================================
  73. IPipeErrorHelper * createPipeErrorHelper()
  74. {
  75. return new CPipeErrorHelper();
  76. }
  77. //=====================================================================================================
  78. #define PIPE_BUFSIZE 0x8000
  79. class CBufferedReadRowStream : public CInterface, implements IReadRowStream
  80. {
  81. public:
  82. IMPLEMENT_IINTERFACE;
  83. CBufferedReadRowStream(IEngineRowAllocator * _rowAllocator) : rowAllocator(_rowAllocator)
  84. {
  85. }
  86. virtual bool eos()
  87. {
  88. return pipeStream->eos();
  89. }
  90. virtual void setStream(ISimpleReadStream * in)
  91. {
  92. if (in)
  93. {
  94. pipeStream.setown(createSimpleSerialStream(in, PIPE_BUFSIZE));
  95. rowSource.setStream(pipeStream);
  96. }
  97. else
  98. {
  99. rowSource.setStream(NULL);
  100. pipeStream.clear();
  101. }
  102. }
  103. protected:
  104. Owned<ISerialStream> pipeStream;
  105. CThorStreamDeserializerSource rowSource;
  106. IEngineRowAllocator * rowAllocator;
  107. };
  108. class CReadRowBinaryStream : public CBufferedReadRowStream
  109. {
  110. public:
  111. CReadRowBinaryStream(IEngineRowAllocator * _rowAllocator, IOutputRowDeserializer * _rowDeserializer)
  112. : CBufferedReadRowStream(_rowAllocator), rowDeserializer(_rowDeserializer)
  113. {
  114. }
  115. virtual const void * next()
  116. {
  117. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  118. size32_t size = rowDeserializer->deserialize(rowBuilder, rowSource);
  119. return rowBuilder.finalizeRowClear(size);
  120. }
  121. private:
  122. IOutputRowDeserializer * rowDeserializer;
  123. };
  124. class CReadRowCSVStream : extends CBufferedReadRowStream
  125. {
  126. public:
  127. IMPLEMENT_IINTERFACE;
  128. CReadRowCSVStream(IEngineRowAllocator * _rowAllocator, ICsvToRowTransformer * _csvTransformer)
  129. : CBufferedReadRowStream(_rowAllocator), csvTransformer(_csvTransformer)
  130. {
  131. ICsvParameters * csvInfo = csvTransformer->queryCsvParameters();
  132. //MORE: This value is never used. Should it be asserting(headerLines == 0)
  133. unsigned int headerLines = csvInfo->queryHeaderLen();
  134. size32_t max = csvInfo->queryMaxSize();
  135. const char * quotes = NULL;
  136. const char * separators = NULL;
  137. const char * terminators = NULL;
  138. const char * escapes = NULL;
  139. csvSplitter.init(csvTransformer->getMaxColumns(), csvInfo, quotes, separators, terminators, escapes);
  140. }
  141. virtual const void * next()
  142. {
  143. if (!pipeStream->eos())
  144. {
  145. size32_t rowSize = 4096; // MORE - make configurable
  146. size32_t maxRowSize = 10*1024*1024; // MORE - make configurable
  147. size32_t thisLineLength;
  148. loop
  149. {
  150. size32_t avail;
  151. const void *peek = pipeStream->peek(rowSize, avail);
  152. thisLineLength = csvSplitter.splitLine(avail, (const byte *)peek);
  153. if (thisLineLength < rowSize || avail < rowSize)
  154. break;
  155. if (rowSize == maxRowSize)
  156. throw MakeStringException(99, "CSV file contained a line of length greater than %d bytes.", rowSize);
  157. if (rowSize >= maxRowSize/2)
  158. rowSize = maxRowSize;
  159. else
  160. rowSize += rowSize;
  161. }
  162. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  163. unsigned thisSize;
  164. unsigned __int64 fpos=0;
  165. thisSize = csvTransformer->transform(rowBuilder, csvSplitter.queryLengths(), (const char * *)csvSplitter.queryData(), fpos);
  166. pipeStream->skip(thisLineLength);
  167. if (thisSize)
  168. return rowBuilder.finalizeRowClear(thisSize);
  169. }
  170. return NULL;
  171. }
  172. private:
  173. ICsvToRowTransformer * csvTransformer;
  174. CSVSplitter csvSplitter;
  175. };
  176. class CReadRowXMLStream : public CInterface, implements IReadRowStream, implements IXMLSelect, implements IThorDiskCallback
  177. {
  178. public:
  179. IMPLEMENT_IINTERFACE;
  180. CReadRowXMLStream(IEngineRowAllocator * _rowAllocator, IXmlToRowTransformer * _xmlTransformer, const char * _iteratorPath, unsigned _pipeFlags)
  181. : rowAllocator(_rowAllocator), xmlTransformer(_xmlTransformer), iteratorPath(_iteratorPath), pipeFlags(_pipeFlags)
  182. {
  183. }
  184. virtual void setStream(ISimpleReadStream * _in)
  185. {
  186. in.set(_in);
  187. bool noRoot = (pipeFlags & TPFreadnoroot) != 0;
  188. bool useContents = (pipeFlags & TPFreadusexmlcontents) != 0;
  189. if (in)
  190. xmlParser.setown(createXMLParse(*in, iteratorPath, *this, noRoot?ptr_noRoot:ptr_none, useContents));
  191. else
  192. xmlParser.clear();
  193. }
  194. //iface IXMLSelect
  195. virtual void match(IColumnProvider &entry, offset_t startOffset, offset_t endOffset)
  196. {
  197. lastMatch.set(&entry);
  198. }
  199. virtual bool eos()
  200. {
  201. return !ensureNext();
  202. }
  203. virtual const void * next()
  204. {
  205. loop
  206. {
  207. if (!ensureNext())
  208. return NULL;
  209. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  210. unsigned sizeGot = xmlTransformer->transform(rowBuilder, lastMatch, this);
  211. lastMatch.clear();
  212. if (sizeGot)
  213. return rowBuilder.finalizeRowClear(sizeGot);
  214. }
  215. }
  216. bool ensureNext()
  217. {
  218. while (!lastMatch && xmlParser)
  219. {
  220. if (!xmlParser->next())
  221. return false;
  222. }
  223. return lastMatch != NULL;
  224. }
  225. //interface IThorDiskCallback
  226. virtual unsigned __int64 getFilePosition(const void * row) { return 0; }
  227. virtual unsigned __int64 getLocalFilePosition(const void * row) { return 0; }
  228. virtual const char * queryLogicalFilename(const void * row) { return ""; }
  229. private:
  230. IXmlToRowTransformer * xmlTransformer;
  231. IEngineRowAllocator * rowAllocator;
  232. Owned<ISimpleReadStream> in;
  233. Owned<IXMLParse> xmlParser;
  234. Owned<IColumnProvider> lastMatch;
  235. StringAttr iteratorPath;
  236. unsigned pipeFlags;
  237. };
  238. IReadRowStream *createReadRowStream(IEngineRowAllocator * _rowAllocator, IOutputRowDeserializer * _rowDeserializer, IXmlToRowTransformer * _xmlTransformer, ICsvToRowTransformer * _csvTransformer, const char * iteratorPath, unsigned pipeFlags)
  239. {
  240. if (_xmlTransformer)
  241. return new CReadRowXMLStream(_rowAllocator, _xmlTransformer, iteratorPath, pipeFlags);
  242. else if (_csvTransformer)
  243. return new CReadRowCSVStream(_rowAllocator, _csvTransformer);
  244. else
  245. return new CReadRowBinaryStream(_rowAllocator, _rowDeserializer);
  246. }
  247. //=====================================================================================================
  248. // MORE - should really split into three implementations - XML, CSV and RAW
  249. class THORHELPER_API CPipeWriteXformHelper : implements IPipeWriteXformHelper, public CInterface //Transforms output before being written to pipe. Currently CSV and XML output supported
  250. {
  251. CSVOutputStream csvWriter;
  252. IHThorCsvWriteExtra * csvWriterExtra;
  253. IHThorXmlWriteExtra * xmlWriterExtra;
  254. IOutputRowSerializer *rawSerializer;
  255. StringBuffer header;
  256. StringBuffer footer;
  257. StringBuffer rowTag;
  258. unsigned flags;
  259. public:
  260. CPipeWriteXformHelper(unsigned _flags, IHThorXmlWriteExtra * _xmlWriterExtra, IHThorCsvWriteExtra * _csvWriterExtra, IOutputRowSerializer *_rawSerializer)
  261. : flags(_flags), xmlWriterExtra(_xmlWriterExtra), csvWriterExtra(_csvWriterExtra), rawSerializer(_rawSerializer) {};
  262. IMPLEMENT_IINTERFACE;
  263. virtual void writeHeader(IPipeProcess * pipe)
  264. {
  265. if (header.length())
  266. pipe->write(header.length(),header.str());
  267. }
  268. virtual void writeFooter(IPipeProcess * pipe)
  269. {
  270. if (footer.length())
  271. pipe->write(footer.length(),footer.str());
  272. }
  273. virtual void ready()
  274. {
  275. if (flags & TPFwritexmltopipe)
  276. {
  277. assertex(xmlWriterExtra);
  278. OwnedRoxieString xmlpath(xmlWriterExtra->getXmlIteratorPath());
  279. if (!xmlpath)
  280. rowTag.append("Row");
  281. else
  282. {
  283. const char *path = xmlpath;
  284. if (*path == '/')
  285. path++;
  286. if (strchr(path, '/'))
  287. UNIMPLEMENTED; // more what do we do with /mydata/row
  288. rowTag.append(path);
  289. }
  290. //getHeader/footer can return a tag name, or NULL (indicates to use the default tag), or "" (do not use header/footer)
  291. if (!(flags & TPFwritenoroot))
  292. {
  293. OwnedRoxieString hdr(xmlWriterExtra->getHeader());
  294. if (hdr == NULL)
  295. header.append("<Dataset>\n");
  296. else
  297. header.append(hdr);
  298. OwnedRoxieString ftr(xmlWriterExtra->getFooter());
  299. if (ftr == NULL)
  300. footer.append("</Dataset>\n");
  301. else
  302. footer.append(ftr);
  303. }
  304. }
  305. else if (flags & TPFwritecsvtopipe)
  306. {
  307. assertex(csvWriterExtra);
  308. ICsvParameters * csv = csvWriterExtra->queryCsvParameters();
  309. csvWriter.init(csv, false);
  310. OwnedRoxieString hdr(csv->getHeader());
  311. if (hdr)
  312. {
  313. csvWriter.beginLine();
  314. csvWriter.writeHeaderLn(strlen(hdr), hdr);
  315. header.append(csvWriter.str());
  316. }
  317. OwnedRoxieString ftr(csv->getFooter());
  318. if (ftr)
  319. {
  320. csvWriter.beginLine();
  321. csvWriter.writeHeaderLn(strlen(ftr), ftr);//MORE: no writeFooterLn method, is writeHeaderLn ok?
  322. footer.append(csvWriter.str());
  323. }
  324. }
  325. }
  326. virtual void writeTranslatedText(const void * row, IPipeProcess * pipe)
  327. {
  328. if (xmlWriterExtra)
  329. {
  330. CommonXmlWriter xmlWriter(xmlWriterExtra->getXmlFlags());
  331. xmlWriter.outputBeginNested(rowTag, false);
  332. xmlWriterExtra->toXML((const byte *)row, xmlWriter);
  333. xmlWriter.outputEndNested(rowTag);
  334. pipe->write(xmlWriter.length(), xmlWriter.str());
  335. }
  336. else if (csvWriterExtra)
  337. {
  338. csvWriter.beginLine();
  339. csvWriterExtra->writeRow((const byte *)row, &csvWriter);
  340. csvWriter.endLine();
  341. pipe->write(csvWriter.length(), csvWriter.str());
  342. }
  343. else
  344. {
  345. MemoryBuffer myBuff;
  346. CThorDemoRowSerializer serializerTarget(myBuff);
  347. rawSerializer->serialize(serializerTarget, (const byte *) row);
  348. pipe->write(myBuff.length(), myBuff.toByteArray());
  349. }
  350. }
  351. };
  352. extern THORHELPER_API IPipeWriteXformHelper *createPipeWriteXformHelper(unsigned _flags, IHThorXmlWriteExtra * _xmlWriterExtra, IHThorCsvWriteExtra * _csvWriterExtra, IOutputRowSerializer *_rawSerializer)
  353. {
  354. return new CPipeWriteXformHelper(_flags, _xmlWriterExtra, _csvWriterExtra, _rawSerializer);
  355. }
  356. //=====================================================================================================