csvsplitter.cpp 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "jregexp.hpp"
  15. #include "jlib.hpp"
  16. #include "jexcept.hpp"
  17. #include "junicode.hpp"
  18. #include "eclhelper.hpp"
  19. #ifdef _USE_ICU
  20. #include "unicode/uchar.h"
  21. #endif
  22. #include "csvsplitter.hpp"
  23. #include "eclrtl.hpp"
  24. #include "roxiemem.hpp"
  25. using roxiemem::OwnedRoxieString;
  26. // A line longer than 2MB (MAX_SENSIBLE_CSV_LINE_LENGTH) is more likely to be a bug than real data - so require an explicit override
  // Starting size of the internal copy buffer in setFieldRange() when neither an existing buffer nor a non-zero maxCsvSize supplies one.
  27. #define DEFAULT_CSV_LINE_LENGTH 2048
  // 0x200000 == 2MB. setFieldRange() compares the required buffer size against this when the buffer must grow.
  28. #define MAX_SENSIBLE_CSV_LINE_LENGTH 0x200000
  29. CSVSplitter::CSVSplitter()
  30. {
  31. lengths = NULL;
  32. data = NULL;
  33. numQuotes = 0;
  34. internalBuffer = NULL;
  35. maxColumns = 0;
  36. internalOffset = 0;
  37. }
  38. CSVSplitter::~CSVSplitter()
  39. {
  40. delete [] lengths;
  41. delete [] data;
  42. free(internalBuffer);
  43. }
  44. void CSVSplitter::addQuote(const char * text)
  45. {
  46. //Allow '' to remove quoting.
  47. if (text && *text)
  48. matcher.addEntry(text, QUOTE+(numQuotes++<<8));
  49. }
  50. void CSVSplitter::addSeparator(const char * text)
  51. {
  52. if (text && *text)
  53. matcher.addEntry(text, SEPARATOR);
  54. }
  55. void CSVSplitter::addTerminator(const char * text)
  56. {
  57. matcher.addEntry(text, TERMINATOR);
  58. }
  59. void CSVSplitter::addEscape(const char * text)
  60. {
  61. matcher.addEntry(text, ESCAPE);
  62. }
  63. void CSVSplitter::reset()
  64. {
  65. matcher.reset();
  66. delete [] lengths;
  67. delete [] data;
  68. free(internalBuffer);
  69. lengths = NULL;
  70. data = NULL;
  71. numQuotes = 0;
  72. internalBuffer = NULL;
  73. internalOffset = 0;
  74. sizeInternal = 0;
  75. maxCsvSize = 0;
  76. }
  77. void CSVSplitter::init(unsigned _maxColumns, ICsvParameters * csvInfo, const char * dfsQuotes, const char * dfsSeparators, const char * dfsTerminators, const char * dfsEscapes)
  78. {
  79. reset();
  80. maxCsvSize = csvInfo->queryMaxSize();
  81. maxColumns = _maxColumns;
  82. lengths = new unsigned [maxColumns+1]; // NB: One larger to remove some tests in main loop...
  83. data = new const byte * [maxColumns+1];
  84. unsigned idx;
  85. unsigned flags = csvInfo->getFlags();
  86. if (dfsQuotes && (flags & ICsvParameters::defaultQuote))
  87. addActionList(matcher, dfsQuotes, QUOTE);
  88. else
  89. {
  90. for (idx=0;;idx++)
  91. {
  92. OwnedRoxieString text(csvInfo->getQuote(idx));
  93. if (!text)
  94. break;
  95. addQuote(text);
  96. }
  97. }
  98. if (dfsSeparators && (flags & ICsvParameters::defaultSeparate))
  99. addActionList(matcher, dfsSeparators, SEPARATOR);
  100. else
  101. {
  102. for (idx=0;;idx++)
  103. {
  104. OwnedRoxieString text(csvInfo->getSeparator(idx));
  105. if (!text)
  106. break;
  107. addSeparator(text);
  108. }
  109. }
  110. if (dfsTerminators && (flags & ICsvParameters::defaultTerminate))
  111. addActionList(matcher, dfsTerminators, TERMINATOR);
  112. else
  113. {
  114. for (idx=0;;idx++)
  115. {
  116. OwnedRoxieString text(csvInfo->getTerminator(idx));
  117. if (!text)
  118. break;
  119. addTerminator(text);
  120. }
  121. }
  122. if (dfsEscapes && (flags & ICsvParameters::defaultEscape))
  123. addActionList(matcher, dfsEscapes, ESCAPE);
  124. else
  125. {
  126. for (idx=0;;idx++)
  127. {
  128. OwnedRoxieString text(csvInfo->getEscape(idx));
  129. if (!text)
  130. break;
  131. addEscape(text);
  132. }
  133. }
  134. //MORE Should this be configurable??
  135. if (!(flags & ICsvParameters::preserveWhitespace))
  136. {
  137. matcher.queryAddEntry(1, " ", WHITESPACE);
  138. matcher.queryAddEntry(1, "\t", WHITESPACE);
  139. }
  140. }
  141. void CSVSplitter::setFieldRange(const byte * start, const byte * end, unsigned curColumn, unsigned quoteToStrip, bool unescape)
  142. {
  143. size32_t sizeOriginal = (size32_t)(end - start);
  144. //If the field doesn't contain quotes or escape characters, then we can directly store a pointer to the original.
  145. if (!quoteToStrip && !unescape)
  146. {
  147. lengths[curColumn] = sizeOriginal;
  148. data[curColumn] = start;
  149. return;
  150. }
  151. // Either quoting or escaping will need to copy into a local buffer.
  152. size32_t sizeUsed = internalOffset;
  153. size32_t sizeRequired = sizeUsed + sizeOriginal;
  154. if (sizeRequired > sizeInternal)
  155. {
  156. if (sizeInternal == 0)
  157. sizeInternal = maxCsvSize;
  158. //Check again to allow an explicit size to override the maximum sensible line limit
  159. if (sizeRequired > sizeInternal)
  160. {
  161. if (sizeInternal == 0)
  162. sizeInternal = DEFAULT_CSV_LINE_LENGTH;
  163. else if (sizeRequired > MAX_SENSIBLE_CSV_LINE_LENGTH)
  164. throw MakeStringException(99, "CSV File contains a line > %u characters. Use MAXLENGTH to override the maximum length.", sizeRequired);
  165. //Cannot overflow as long as MAX_SENSIBLE_CSV_LINE_LENGTH < 0x80...
  166. while (sizeRequired > sizeInternal)
  167. sizeInternal *= 2;
  168. }
  169. byte * newBuffer = (byte *)realloc(internalBuffer, sizeInternal);
  170. if (!newBuffer)
  171. throw MakeStringException(99, "Failed to allocate CSV read buffer of %u bytes", sizeInternal);
  172. //The buffer has been reallocated, so we need to patch up any fields with pointers into the old buffer
  173. if (internalBuffer)
  174. {
  175. for (unsigned i=0; i < curColumn; i++)
  176. {
  177. byte * cur = (byte *)data[i];
  178. if ((cur >= internalBuffer) && (cur < internalBuffer + sizeInternal))
  179. data[i] = (cur - internalBuffer) + newBuffer;
  180. }
  181. }
  182. internalBuffer = newBuffer;
  183. }
  184. data[curColumn] = internalBuffer + internalOffset;
  185. const byte * lastCopied = start;
  186. const byte *cur;
  187. for (cur = start; cur != end; )
  188. {
  189. unsigned matchLen;
  190. unsigned match = matcher.getMatch((size32_t)(end-cur), (const char *)cur, matchLen);
  191. switch (match & 255)
  192. {
  193. case NONE:
  194. matchLen = 1;
  195. break;
  196. case WHITESPACE:
  197. case SEPARATOR:
  198. case TERMINATOR:
  199. break;
  200. case ESCAPE:
  201. {
  202. const byte * next = cur + matchLen;
  203. if (next != end)
  204. {
  205. //Copy all the data up to this escape character, start copying from the next character
  206. memcpy(internalBuffer + internalOffset, lastCopied, cur-lastCopied);
  207. internalOffset += (cur-lastCopied);
  208. lastCopied = cur+matchLen;
  209. //Don't treat the next character specially
  210. unsigned nextMatchLen;
  211. unsigned nextMatch = matcher.getMatch((size32_t)(end-next), (const char *)next, nextMatchLen);
  212. if (nextMatchLen == 0)
  213. nextMatchLen = 1;
  214. matchLen += nextMatchLen;
  215. }
  216. break;
  217. }
  218. case QUOTE:
  219. {
  220. const byte * next = cur + matchLen;
  221. if ((match == quoteToStrip) && (next != end))
  222. {
  223. unsigned nextMatchLen;
  224. unsigned nextMatch = matcher.getMatch((size32_t)(end-next), (const char *)next, nextMatchLen);
  225. if (nextMatch == match)
  226. {
  227. memcpy(internalBuffer + internalOffset, lastCopied, next-lastCopied);
  228. internalOffset += (next-lastCopied);
  229. matchLen += nextMatchLen;
  230. lastCopied = cur+matchLen;
  231. }
  232. }
  233. break;
  234. }
  235. }
  236. cur += matchLen;
  237. }
  238. memcpy(internalBuffer + internalOffset, lastCopied, cur-lastCopied);
  239. internalOffset += (cur-lastCopied);
  240. lengths[curColumn] = (size32_t)(internalBuffer + internalOffset - data[curColumn]);
  241. }
  242. size32_t CSVSplitter::splitLine(size32_t maxLength, const byte * start)
  243. {
  244. unsigned curColumn = 0;
  245. unsigned quote = 0;
  246. unsigned quoteToStrip = 0;
  247. const byte * cur = start;
  248. const byte * end = start + maxLength;
  249. const byte * firstGood = start;
  250. const byte * lastGood = start;
  251. bool lastEscape = false;
  252. internalOffset = 0;
  253. while (cur != end)
  254. {
  255. unsigned matchLen;
  256. unsigned match = matcher.getMatch((size32_t)(end-cur), (const char *)cur, matchLen);
  257. switch (match & 255)
  258. {
  259. case NONE:
  260. cur++; // matchLen == 0;
  261. lastGood = cur;
  262. break;
  263. case WHITESPACE:
  264. //Skip leading whitespace
  265. if (quote)
  266. lastGood = cur+matchLen;
  267. else if (cur == firstGood)
  268. {
  269. firstGood = cur+matchLen;
  270. lastGood = cur+matchLen;
  271. }
  272. break;
  273. case SEPARATOR:
  274. // Quoted separator
  275. if ((curColumn < maxColumns) && (quote == 0))
  276. {
  277. setFieldRange(firstGood, lastGood, curColumn, quoteToStrip, lastEscape);
  278. lastEscape = false;
  279. quoteToStrip = 0;
  280. curColumn++;
  281. firstGood = cur + matchLen;
  282. }
  283. lastGood = cur+matchLen;
  284. break;
  285. case TERMINATOR:
  286. if (quote == 0) // Is this a good idea? Means a mismatched quote is not fixed by EOL
  287. {
  288. setFieldRange(firstGood, lastGood, curColumn, quoteToStrip, lastEscape);
  289. lastEscape = false;
  290. while (++curColumn < maxColumns)
  291. lengths[curColumn] = 0;
  292. return (size32_t)(cur + matchLen - start);
  293. }
  294. lastGood = cur+matchLen;
  295. break;
  296. case QUOTE:
  297. // Quoted quote
  298. if (quote == 0)
  299. {
  300. if (cur == firstGood)
  301. {
  302. quote = match;
  303. firstGood = cur+matchLen;
  304. }
  305. lastGood = cur+matchLen;
  306. }
  307. else
  308. {
  309. if (quote == match)
  310. {
  311. const byte * next = cur + matchLen;
  312. //Check for double quotes
  313. if ((next != end))
  314. {
  315. unsigned nextMatchLen;
  316. unsigned nextMatch = matcher.getMatch((size32_t)(end-next), (const char *)next, nextMatchLen);
  317. if (nextMatch == quote)
  318. {
  319. quoteToStrip = quote;
  320. matchLen += nextMatchLen;
  321. lastGood = cur+matchLen;
  322. }
  323. else
  324. quote = 0;
  325. }
  326. else
  327. quote = 0;
  328. }
  329. else
  330. lastGood = cur+matchLen;
  331. }
  332. break;
  333. case ESCAPE:
  334. lastEscape = true;
  335. lastGood = cur+matchLen;
  336. // If this escape is at the end, proceed to field range
  337. if (lastGood == end)
  338. break;
  339. // Skip escape and ignore the next match
  340. cur += matchLen;
  341. match = matcher.getMatch((size32_t)(end-cur), (const char *)cur, matchLen);
  342. if ((match & 255) == NONE)
  343. matchLen = 1;
  344. lastGood += matchLen;
  345. break;
  346. }
  347. cur += matchLen;
  348. }
  349. setFieldRange(firstGood, lastGood, curColumn, quoteToStrip, lastEscape);
  350. while (++curColumn < maxColumns)
  351. lengths[curColumn] = 0;
  352. return (size32_t)(end - start);
  353. }
  354. //=====================================================================================================
  355. void CSVOutputStream::beginLine()
  356. {
  357. clear();
  358. prefix = NULL;
  359. }
  360. void CSVOutputStream::endLine()
  361. {
  362. append(terminator);
  363. }
  364. void CSVOutputStream::init(ICsvParameters * args, bool _oldOutputFormat)
  365. {
  366. if (args->queryEBCDIC())
  367. throw MakeStringException(99, "EBCDIC CSV output not yet implemented");
  368. OwnedRoxieString rs;
  369. quote.set(rs.setown(args->getQuote(0)));
  370. separator.set(rs.setown(args->getSeparator(0)));
  371. terminator.set(rs.setown(args->getTerminator(0)));
  372. escape.set(rs.setown(args->getEscape(0)));
  373. oldOutputFormat = _oldOutputFormat||!quote.length();
  374. }
  375. void CSVOutputStream::writeUnicode(size32_t len, const UChar * data)
  376. {
  377. unsigned utf8Length;
  378. char * utf8Data = NULL;
  379. rtlUnicodeToCodepageX(utf8Length, utf8Data, len, data, "utf-8");
  380. writeString(utf8Length, utf8Data);
  381. rtlFree(utf8Data);
  382. }
  383. #ifndef _USE_ICU
  384. static inline bool u_isspace(UChar next) { return isspace((byte)next); }
  385. #endif
  386. void CSVOutputStream::writeUtf8(size32_t len, const char * data)
  387. {
  388. append(prefix);
  389. if (oldOutputFormat) {
  390. append(quote).append(rtlUtf8Size(len, data), data).append(quote);
  391. }
  392. else if (len) {
  393. // is this OTT?
  394. // not sure if best way but generate an array of utf8 sizes
  395. MemoryAttr ma;
  396. size32_t * cl;
  397. if (len>256)
  398. cl = (size32_t *)ma.allocate(sizeof(size32_t)*len);
  399. else
  400. cl = (size32_t *)alloca(sizeof(size32_t)*len);
  401. unsigned start=(unsigned)-1;
  402. unsigned end=0;
  403. const byte * s = (const byte *)data;
  404. unsigned i;
  405. for (i=0;i<len;i++) {
  406. const byte *p=s;
  407. UChar next = readUtf8Character(sizeof(UChar), s);
  408. cl[i] = (size32_t)(s-p);
  409. if (!u_isspace(next)) {
  410. end = i;
  411. if (start==(unsigned)-1)
  412. start = i;
  413. }
  414. }
  415. const byte *e=s;
  416. // do trim
  417. if (start!=(unsigned)-1) {
  418. for (i=0;i<start;i++)
  419. data += *(cl++);
  420. len -= start;
  421. end -= start;
  422. end++;
  423. while (end<len)
  424. e -= cl[--len];
  425. }
  426. // now see if need quoting by looking for separator, terminator or quote
  427. // I *think* this can be done with memcmps as has to be exact
  428. size32_t sl = separator.length();
  429. size32_t tl = terminator.length();
  430. size32_t ql = quote.length();
  431. bool needquote=false;
  432. s = (const byte *)data;
  433. for (i=0;i<len;i++) {
  434. size32_t l = (size32_t)(e-s);
  435. if (sl&&(l>=sl)&&(memcmp(separator.get(),s,sl)==0)) {
  436. needquote = true;
  437. break;
  438. }
  439. if (tl&&(l>=tl)&&(memcmp(terminator.get(),s,tl)==0)) {
  440. needquote = true;
  441. break;
  442. }
  443. if ((l>=ql)&&(memcmp(quote.get(),s,ql)==0)) {
  444. needquote = true;
  445. break;
  446. }
  447. s+=cl[i];
  448. }
  449. if (needquote) {
  450. append(quote);
  451. s = (const byte *)data;
  452. for (i=0;i<len;i++) {
  453. size32_t l = (size32_t)(e-s);
  454. if ((l>=ql)&&(memcmp(quote.get(),s,ql)==0))
  455. append(quote);
  456. append(cl[i],(const char *)s);
  457. s+=cl[i];
  458. }
  459. append(quote);
  460. }
  461. else
  462. append((size32_t)(e-(const byte *)data),data);
  463. }
  464. prefix = separator;
  465. }
  466. void CSVOutputStream::writeString(size32_t len, const char * data)
  467. {
  468. append(prefix);
  469. if (oldOutputFormat) {
  470. append(quote).append(len, data).append(quote);
  471. }
  472. else if (len) {
  473. // New format (as per GS)
  474. // first trim
  475. while (len&&(*data==' ')) {
  476. len--;
  477. data++;
  478. }
  479. while (len&&(data[len-1]==' '))
  480. len--;
  481. // now see if need quoting by looking for separator, terminator or quote
  482. size32_t sl = separator.length();
  483. size32_t tl = terminator.length();
  484. size32_t ql = quote.length();
  485. bool needquote=false;
  486. const char *s = data;
  487. for (unsigned l=len;l>0;l--) {
  488. if (sl&&(l>=sl)&&(memcmp(separator.get(),s,sl)==0)) {
  489. needquote = true;
  490. break;
  491. }
  492. if (tl&&(l>=tl)&&(memcmp(terminator.get(),s,tl)==0)) {
  493. needquote = true;
  494. break;
  495. }
  496. if ((l>=ql)&&(memcmp(quote.get(),s,ql)==0)) {
  497. needquote = true;
  498. break;
  499. }
  500. s++;
  501. }
  502. if (needquote) {
  503. append(quote);
  504. const char *s = data;
  505. for (unsigned l=len;l>0;l--) {
  506. if ((l>=ql)&&(memcmp(quote.get(),s,ql)==0))
  507. append(quote);
  508. append(*(s++));
  509. }
  510. append(quote);
  511. }
  512. else
  513. append(len,data);
  514. }
  515. prefix = separator;
  516. }
  517. void CSVOutputStream::writeHeaderLn(size32_t len, const char * data)
  518. {
  519. append(len,data);
  520. if (!oldOutputFormat&&len) {
  521. size32_t tl = terminator.length();
  522. if ((tl>len)||(memcmp(data+len-tl,terminator.get(),tl)!=0))
  523. endLine();
  524. }
  525. }