csvsplitter.cpp 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "jregexp.hpp"
  15. #include "jlib.hpp"
  16. #include "jexcept.hpp"
  17. #include "junicode.hpp"
  18. #include "jfile.hpp"
  19. #include "eclhelper.hpp"
  20. #ifdef _USE_ICU
  21. #include "unicode/uchar.h"
  22. #endif
  23. #include "csvsplitter.hpp"
  24. #include "eclrtl.hpp"
  25. #include "roxiemem.hpp"
  26. using roxiemem::OwnedRoxieString;
  27. // If you have lines more than 2Mb in length it is more likely to be a bug - so require an explicit override
  28. #define DEFAULT_CSV_LINE_LENGTH 2048
  29. #define MAX_SENSIBLE_CSV_LINE_LENGTH 0x200000
  30. CSVSplitter::CSVSplitter()
  31. {
  32. lengths = NULL;
  33. data = NULL;
  34. numQuotes = 0;
  35. internalBuffer = NULL;
  36. maxColumns = 0;
  37. internalOffset = 0;
  38. }
  39. CSVSplitter::~CSVSplitter()
  40. {
  41. delete [] lengths;
  42. delete [] data;
  43. free(internalBuffer);
  44. }
  45. void CSVSplitter::addQuote(const char * text)
  46. {
  47. //Allow '' to remove quoting.
  48. if (text && *text)
  49. matcher.addEntry(text, QUOTE+(numQuotes++<<8));
  50. }
  51. void CSVSplitter::addSeparator(const char * text)
  52. {
  53. if (text && *text)
  54. matcher.addEntry(text, SEPARATOR);
  55. }
  56. void CSVSplitter::addTerminator(const char * text)
  57. {
  58. matcher.addEntry(text, TERMINATOR);
  59. }
  60. void CSVSplitter::addItem(MatchItem item, const char * text)
  61. {
  62. if (text)
  63. matcher.addEntry(text, item);
  64. }
  65. void CSVSplitter::addEscape(const char * text)
  66. {
  67. matcher.queryAddEntry((size32_t)strlen(text), text, ESCAPE);
  68. }
  69. void CSVSplitter::addWhitespace()
  70. {
  71. matcher.queryAddEntry(1, " ", WHITESPACE);
  72. matcher.queryAddEntry(1, "\t", WHITESPACE);
  73. }
  74. void CSVSplitter::reset()
  75. {
  76. matcher.reset();
  77. delete [] lengths;
  78. delete [] data;
  79. free(internalBuffer);
  80. lengths = NULL;
  81. data = NULL;
  82. numQuotes = 0;
  83. internalBuffer = NULL;
  84. internalOffset = 0;
  85. sizeInternal = 0;
  86. maxCsvSize = 0;
  87. }
  88. void CSVSplitter::init(unsigned _maxColumns, ICsvParameters * csvInfo, const char * dfsQuotes, const char * dfsSeparators, const char * dfsTerminators, const char * dfsEscapes)
  89. {
  90. reset();
  91. maxCsvSize = csvInfo->queryMaxSize();
  92. maxColumns = _maxColumns;
  93. lengths = new unsigned [maxColumns+1]; // NB: One larger to remove some tests in main loop...
  94. data = new const byte * [maxColumns+1];
  95. unsigned idx;
  96. unsigned flags = csvInfo->getFlags();
  97. if (dfsQuotes && (flags & ICsvParameters::defaultQuote))
  98. addActionList(matcher, dfsQuotes, QUOTE);
  99. else
  100. {
  101. for (idx=0;;idx++)
  102. {
  103. OwnedRoxieString text(csvInfo->getQuote(idx));
  104. if (!text)
  105. break;
  106. addQuote(text);
  107. }
  108. }
  109. if (dfsSeparators && (flags & ICsvParameters::defaultSeparate))
  110. addActionList(matcher, dfsSeparators, SEPARATOR);
  111. else
  112. {
  113. for (idx=0;;idx++)
  114. {
  115. OwnedRoxieString text(csvInfo->getSeparator(idx));
  116. if (!text)
  117. break;
  118. addSeparator(text);
  119. }
  120. }
  121. if (dfsTerminators && (flags & ICsvParameters::defaultTerminate))
  122. addActionList(matcher, dfsTerminators, TERMINATOR);
  123. else
  124. {
  125. for (idx=0;;idx++)
  126. {
  127. OwnedRoxieString text(csvInfo->getTerminator(idx));
  128. if (!text)
  129. break;
  130. addTerminator(text);
  131. }
  132. }
  133. if (dfsEscapes && (flags & ICsvParameters::defaultEscape))
  134. addActionList(matcher, dfsEscapes, ESCAPE);
  135. else
  136. {
  137. for (idx=0;;idx++)
  138. {
  139. OwnedRoxieString text(csvInfo->getEscape(idx));
  140. if (!text)
  141. break;
  142. addEscape(text);
  143. }
  144. }
  145. //MORE Should this be configurable??
  146. if (!(flags & ICsvParameters::preserveWhitespace))
  147. addWhitespace();
  148. }
  149. void CSVSplitter::init(unsigned _maxColumns, size32_t _maxCsvSize, const char *quotes, const char *separators, const char *terminators, const char *escapes, bool preserveWhitespace)
  150. {
  151. reset();
  152. maxCsvSize = _maxCsvSize;
  153. maxColumns = _maxColumns;
  154. lengths = new unsigned [maxColumns+1]; // NB: One larger to remove some tests in main loop...
  155. data = new const byte * [maxColumns+1];
  156. unsigned idx;
  157. if (quotes)
  158. addActionList(matcher, quotes, QUOTE);
  159. if (separators)
  160. addActionList(matcher, separators, SEPARATOR);
  161. if (terminators)
  162. addActionList(matcher, terminators, TERMINATOR);
  163. if (escapes)
  164. addActionList(matcher, escapes, ESCAPE);
  165. if (!preserveWhitespace)
  166. addWhitespace();
  167. }
  168. void CSVSplitter::setFieldRange(const byte * start, const byte * end, unsigned curColumn, unsigned quoteToStrip, bool unescape)
  169. {
  170. size32_t sizeOriginal = (size32_t)(end - start);
  171. //If the field doesn't contain quotes or escape characters, then we can directly store a pointer to the original.
  172. if (!quoteToStrip && !unescape)
  173. {
  174. lengths[curColumn] = sizeOriginal;
  175. data[curColumn] = start;
  176. return;
  177. }
  178. // Either quoting or escaping will need to copy into a local buffer.
  179. size32_t sizeUsed = internalOffset;
  180. size32_t sizeRequired = sizeUsed + sizeOriginal;
  181. if (sizeRequired > sizeInternal)
  182. {
  183. if (sizeInternal == 0)
  184. sizeInternal = maxCsvSize;
  185. //Check again to allow an explicit size to override the maximum sensible line limit
  186. if (sizeRequired > sizeInternal)
  187. {
  188. if (sizeInternal == 0)
  189. sizeInternal = DEFAULT_CSV_LINE_LENGTH;
  190. else if (sizeRequired > MAX_SENSIBLE_CSV_LINE_LENGTH)
  191. throw MakeStringException(99, "CSV File contains a line > %u characters. Use MAXLENGTH to override the maximum length.", sizeRequired);
  192. //Cannot overflow as long as MAX_SENSIBLE_CSV_LINE_LENGTH < 0x80...
  193. while (sizeRequired > sizeInternal)
  194. sizeInternal *= 2;
  195. }
  196. byte * newBuffer = (byte *)realloc(internalBuffer, sizeInternal);
  197. if (!newBuffer)
  198. throw MakeStringException(99, "Failed to allocate CSV read buffer of %u bytes", sizeInternal);
  199. //The buffer has been reallocated, so we need to patch up any fields with pointers into the old buffer
  200. if (internalBuffer)
  201. {
  202. for (unsigned i=0; i < curColumn; i++)
  203. {
  204. byte * cur = (byte *)data[i];
  205. if ((cur >= internalBuffer) && (cur < internalBuffer + sizeInternal))
  206. data[i] = (cur - internalBuffer) + newBuffer;
  207. }
  208. }
  209. internalBuffer = newBuffer;
  210. }
  211. data[curColumn] = internalBuffer + internalOffset;
  212. const byte * lastCopied = start;
  213. const byte *cur;
  214. for (cur = start; cur != end; )
  215. {
  216. unsigned matchLen;
  217. unsigned match = matcher.getMatch((size32_t)(end-cur), (const char *)cur, matchLen);
  218. switch (match & 255)
  219. {
  220. case NONE:
  221. matchLen = 1;
  222. break;
  223. case WHITESPACE:
  224. case SEPARATOR:
  225. case TERMINATOR:
  226. break;
  227. case ESCAPE:
  228. {
  229. const byte * next = cur + matchLen;
  230. if (next != end)
  231. {
  232. //Copy all the data up to this escape character, start copying from the next character
  233. memcpy(internalBuffer + internalOffset, lastCopied, cur-lastCopied);
  234. internalOffset += (cur-lastCopied);
  235. lastCopied = cur+matchLen;
  236. //Don't treat the next character specially
  237. unsigned nextMatchLen;
  238. unsigned nextMatch = matcher.getMatch((size32_t)(end-next), (const char *)next, nextMatchLen);
  239. if (nextMatchLen == 0)
  240. nextMatchLen = 1;
  241. matchLen += nextMatchLen;
  242. }
  243. break;
  244. }
  245. case QUOTE:
  246. {
  247. const byte * next = cur + matchLen;
  248. if ((match == quoteToStrip) && (next != end))
  249. {
  250. unsigned nextMatchLen;
  251. unsigned nextMatch = matcher.getMatch((size32_t)(end-next), (const char *)next, nextMatchLen);
  252. if (nextMatch == match)
  253. {
  254. memcpy(internalBuffer + internalOffset, lastCopied, next-lastCopied);
  255. internalOffset += (next-lastCopied);
  256. matchLen += nextMatchLen;
  257. lastCopied = cur+matchLen;
  258. }
  259. }
  260. break;
  261. }
  262. }
  263. cur += matchLen;
  264. }
  265. memcpy(internalBuffer + internalOffset, lastCopied, cur-lastCopied);
  266. internalOffset += (cur-lastCopied);
  267. lengths[curColumn] = (size32_t)(internalBuffer + internalOffset - data[curColumn]);
  268. }
  269. unsigned CSVSplitter::splitLine(ISerialStream *stream, size32_t maxRowSize)
  270. {
  271. if (stream->eos())
  272. return 0;
  273. size32_t minRequired = 4096; // MORE - make configurable
  274. size32_t thisLineLength;
  275. for (;;)
  276. {
  277. size32_t avail;
  278. const void *peek = stream->peek(minRequired, avail);
  279. thisLineLength = splitLine(avail, (const byte *)peek);
  280. if (thisLineLength < avail || avail < minRequired)
  281. break;
  282. if (minRequired == maxRowSize)
  283. throw MakeStringException(99, "Stream contained a line of length greater than %u bytes.", maxRowSize);
  284. if (avail > minRequired*2)
  285. minRequired = avail+minRequired;
  286. else
  287. minRequired += minRequired;
  288. if (minRequired >= maxRowSize/2)
  289. minRequired = maxRowSize;
  290. }
  291. return thisLineLength;
  292. }
// Split one CSV line, read from at most maxLength bytes at 'start', into columns.
// For each field, setFieldRange() records its text. Leading whitespace, trailing unquoted whitespace
// and the enclosing quotes are excluded from the range. Columns missing from the line get length 0.
// Once maxColumns separators have been seen, further separators are not split. The remaining text
// (separators included) is stored in the spare slot at index maxColumns.
// Returns the bytes consumed including the terminator, or maxLength if no unquoted terminator was found.
size32_t CSVSplitter::splitLine(size32_t maxLength, const byte * start)
{
    unsigned curColumn = 0;
    unsigned quote = 0;             // match id of the currently open quote; 0 when outside quotes
    unsigned quoteToStrip = 0;      // set when the current field contains a doubled quote to collapse
    const byte * cur = start;
    const byte * end = start + maxLength;
    const byte * firstGood = start; // first byte of the current field's text
    const byte * lastGood = start;  // one past the last byte of the current field's text
    bool lastEscape = false;        // the current field contains an escape sequence
    internalOffset = 0;             // the internal buffer is refilled from the start for every line
    while (cur != end)
    {
        unsigned matchLen;
        unsigned match = matcher.getMatch((size32_t)(end-cur), (const char *)cur, matchLen);
        switch (match & 255)
        {
        case NONE:
            cur++; // matchLen == 0;
            lastGood = cur;
            break;
        case WHITESPACE:
            //Skip leading whitespace
            // (inside quotes whitespace is kept; trailing unquoted whitespace never extends lastGood)
            if (quote)
                lastGood = cur+matchLen;
            else if (cur == firstGood)
            {
                firstGood = cur+matchLen;
                lastGood = cur+matchLen;
            }
            break;
        case SEPARATOR:
            // Quoted separator
            // An unquoted separator ends the field (until the spare column is reached); otherwise it is field text.
            if ((curColumn < maxColumns) && (quote == 0))
            {
                setFieldRange(firstGood, lastGood, curColumn, quoteToStrip, lastEscape);
                lastEscape = false;
                quoteToStrip = 0;
                curColumn++;
                firstGood = cur + matchLen;
            }
            lastGood = cur+matchLen;
            break;
        case TERMINATOR:
            if (quote == 0) // Is this a good idea? Means a mismatched quote is not fixed by EOL
            {
                // End of line: store the last field and zero the lengths of any missing columns.
                setFieldRange(firstGood, lastGood, curColumn, quoteToStrip, lastEscape);
                lastEscape = false;
                while (++curColumn < maxColumns)
                    lengths[curColumn] = 0;
                return (size32_t)(cur + matchLen - start);
            }
            lastGood = cur+matchLen;
            break;
        case QUOTE:
            // Quoted quote
            if (quote == 0)
            {
                // A quote at the start of a field opens quoting (and is excluded from the text);
                // elsewhere it is treated as ordinary text.
                if (cur == firstGood)
                {
                    quote = match;
                    firstGood = cur+matchLen;
                }
                lastGood = cur+matchLen;
            }
            else
            {
                if (quote == match)
                {
                    const byte * next = cur + matchLen;
                    //Check for double quotes
                    if ((next != end))
                    {
                        unsigned nextMatchLen;
                        unsigned nextMatch = matcher.getMatch((size32_t)(end-next), (const char *)next, nextMatchLen);
                        if (nextMatch == quote)
                        {
                            // Doubled quote inside quotes: keep both for now; setFieldRange collapses them.
                            quoteToStrip = quote;
                            matchLen += nextMatchLen;
                            lastGood = cur+matchLen;
                        }
                        else
                            quote = 0;  // closing quote - lastGood is not advanced, so it is excluded
                    }
                    else
                        quote = 0;
                }
                else
                    lastGood = cur+matchLen;    // a different quote string inside quotes is plain text
            }
            break;
        case ESCAPE:
            lastEscape = true;
            lastGood = cur+matchLen;
            // If this escape is at the end, proceed to field range
            if (lastGood == end)
                break;
            // Skip escape and ignore the next match
            cur += matchLen;
            match = matcher.getMatch((size32_t)(end-cur), (const char *)cur, matchLen);
            if ((match & 255) == NONE)
                matchLen = 1;
            lastGood += matchLen;
            break;
        }
        cur += matchLen;
    }
    // Ran out of input without an unquoted terminator: the remaining text forms the final field.
    setFieldRange(firstGood, lastGood, curColumn, quoteToStrip, lastEscape);
    while (++curColumn < maxColumns)
        lengths[curColumn] = 0;
    return (size32_t)(end - start);
}
  405. //=====================================================================================================
  406. void CSVOutputStream::beginLine()
  407. {
  408. clear();
  409. prefix = NULL;
  410. }
  411. void CSVOutputStream::endLine()
  412. {
  413. append(terminator);
  414. }
  415. void CSVOutputStream::init(ICsvParameters * args, bool _oldOutputFormat)
  416. {
  417. if (args->queryEBCDIC())
  418. throw MakeStringException(99, "EBCDIC CSV output not yet implemented");
  419. OwnedRoxieString rs;
  420. quote.set(rs.setown(args->getQuote(0)));
  421. separator.set(rs.setown(args->getSeparator(0)));
  422. terminator.set(rs.setown(args->getTerminator(0)));
  423. escape.set(rs.setown(args->getEscape(0)));
  424. oldOutputFormat = _oldOutputFormat||!quote.length();
  425. }
  426. void CSVOutputStream::writeUnicode(size32_t len, const UChar * data)
  427. {
  428. unsigned utf8Length;
  429. char * utf8Data = NULL;
  430. rtlUnicodeToCodepageX(utf8Length, utf8Data, len, data, "utf-8");
  431. writeString(utf8Length, utf8Data);
  432. rtlFree(utf8Data);
  433. }
  434. #ifndef _USE_ICU
  435. static inline bool u_isspace(UChar next) { return isspace((byte)next); }
  436. #endif
  437. void CSVOutputStream::writeUtf8(size32_t len, const char * data)
  438. {
  439. append(prefix);
  440. if (oldOutputFormat) {
  441. append(quote).append(rtlUtf8Size(len, data), data).append(quote);
  442. }
  443. else if (len) {
  444. // is this OTT?
  445. // not sure if best way but generate an array of utf8 sizes
  446. MemoryAttr ma;
  447. size32_t * cl;
  448. if (len>256)
  449. cl = (size32_t *)ma.allocate(sizeof(size32_t)*len);
  450. else
  451. cl = (size32_t *)alloca(sizeof(size32_t)*len);
  452. unsigned start=(unsigned)-1;
  453. unsigned end=0;
  454. const byte * s = (const byte *)data;
  455. unsigned i;
  456. for (i=0;i<len;i++) {
  457. const byte *p=s;
  458. UChar next = readUtf8Character(sizeof(UChar), s);
  459. cl[i] = (size32_t)(s-p);
  460. if (next != ' ') {
  461. end = i;
  462. if (start==(unsigned)-1)
  463. start = i;
  464. }
  465. }
  466. const byte *e=s;
  467. // do trim
  468. if (start!=(unsigned)-1) {
  469. for (i=0;i<start;i++)
  470. data += *(cl++);
  471. len -= start;
  472. end -= start;
  473. end++;
  474. while (end<len)
  475. e -= cl[--len];
  476. }
  477. // now see if need quoting by looking for separator, terminator or quote
  478. // I *think* this can be done with memcmps as has to be exact
  479. size32_t sl = separator.length();
  480. size32_t tl = terminator.length();
  481. size32_t ql = quote.length();
  482. bool needquote=false;
  483. s = (const byte *)data;
  484. for (i=0;i<len;i++) {
  485. size32_t l = (size32_t)(e-s);
  486. if (sl&&(l>=sl)&&(memcmp(separator.get(),s,sl)==0)) {
  487. needquote = true;
  488. break;
  489. }
  490. if (tl&&(l>=tl)&&(memcmp(terminator.get(),s,tl)==0)) {
  491. needquote = true;
  492. break;
  493. }
  494. if ((l>=ql)&&(memcmp(quote.get(),s,ql)==0)) {
  495. needquote = true;
  496. break;
  497. }
  498. s+=cl[i];
  499. }
  500. if (needquote) {
  501. append(quote);
  502. s = (const byte *)data;
  503. for (i=0;i<len;i++) {
  504. size32_t l = (size32_t)(e-s);
  505. if ((l>=ql)&&(memcmp(quote.get(),s,ql)==0))
  506. append(quote);
  507. append(cl[i],(const char *)s);
  508. s+=cl[i];
  509. }
  510. append(quote);
  511. }
  512. else
  513. append((size32_t)(e-(const byte *)data),data);
  514. }
  515. prefix = separator;
  516. }
  517. void CSVOutputStream::writeString(size32_t len, const char * data)
  518. {
  519. append(prefix);
  520. if (oldOutputFormat) {
  521. append(quote).append(len, data).append(quote);
  522. }
  523. else if (len) {
  524. // New format (as per GS)
  525. // first trim
  526. while (len&&(*data==' ')) {
  527. len--;
  528. data++;
  529. }
  530. while (len&&(data[len-1]==' '))
  531. len--;
  532. // now see if need quoting by looking for separator, terminator or quote
  533. size32_t sl = separator.length();
  534. size32_t tl = terminator.length();
  535. size32_t ql = quote.length();
  536. bool needquote=false;
  537. const char *s = data;
  538. for (unsigned l=len;l>0;l--) {
  539. if (sl&&(l>=sl)&&(memcmp(separator.get(),s,sl)==0)) {
  540. needquote = true;
  541. break;
  542. }
  543. if (tl&&(l>=tl)&&(memcmp(terminator.get(),s,tl)==0)) {
  544. needquote = true;
  545. break;
  546. }
  547. if ((l>=ql)&&(memcmp(quote.get(),s,ql)==0)) {
  548. needquote = true;
  549. break;
  550. }
  551. s++;
  552. }
  553. if (needquote) {
  554. append(quote);
  555. const char *s = data;
  556. for (unsigned l=len;l>0;l--) {
  557. if ((l>=ql)&&(memcmp(quote.get(),s,ql)==0))
  558. append(quote);
  559. append(*(s++));
  560. }
  561. append(quote);
  562. }
  563. else
  564. append(len,data);
  565. }
  566. prefix = separator;
  567. }
  568. void CSVOutputStream::writeHeaderLn(size32_t len, const char * data)
  569. {
  570. append(len,data);
  571. if (!oldOutputFormat&&len) {
  572. size32_t tl = terminator.length();
  573. if ((tl>len)||(memcmp(data+len-tl,terminator.get(),tl)!=0))
  574. endLine();
  575. }
  576. }