浏览代码

Merge pull request #3396 from rengolin/dfs-spray

HPCC-3092 Add ESCAPE option to CSV dataset

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 12 年之前
父节点
当前提交
aa1217b642

+ 95 - 17
common/thorhelper/csvsplitter.cpp

@@ -32,7 +32,7 @@ CSVSplitter::CSVSplitter()
     lengths = NULL;
     data = NULL;
     numQuotes = 0;
-    unquotedBuffer = NULL;
+    internalBuffer = NULL;
     maxColumns = 0;
     curUnquoted = NULL;
 }
@@ -41,7 +41,7 @@ CSVSplitter::~CSVSplitter()
 {
     delete [] lengths;
     delete [] data;
-    free(unquotedBuffer);
+    free(internalBuffer);
 }
 
 void CSVSplitter::addQuote(const char * text)
@@ -62,25 +62,29 @@ void CSVSplitter::addTerminator(const char * text)
     matcher.addEntry(text, TERMINATOR);
 }
 
+void CSVSplitter::addEscape(const char * text)
+{
+    matcher.addEntry(text, ESCAPE);
+}
 
 void CSVSplitter::reset()
 {
     matcher.reset();
     delete [] lengths;
     delete [] data;
-    free(unquotedBuffer);
+    free(internalBuffer);
     lengths = NULL;
     data = NULL;
     numQuotes = 0;
-    unquotedBuffer = NULL;
+    internalBuffer = NULL;
     maxCsvSize = 0;
 }
 
-void CSVSplitter::init(unsigned _maxColumns, ICsvParameters * csvInfo, const char * dfsQuotes, const char * dfsSeparators, const char * dfsTerminators)
+void CSVSplitter::init(unsigned _maxColumns, ICsvParameters * csvInfo, const char * dfsQuotes, const char * dfsSeparators, const char * dfsTerminators, const char * dfsEscapes)
 {
     reset();
     maxCsvSize = csvInfo->queryMaxSize();
-    unquotedBuffer = (byte *)malloc(maxCsvSize);
+    internalBuffer = (byte *)malloc(maxCsvSize);
 
     maxColumns = _maxColumns;
     lengths = new unsigned [maxColumns+1];      // NB: One larger to remove some tests in main loop...
@@ -127,6 +131,23 @@ void CSVSplitter::init(unsigned _maxColumns, ICsvParameters * csvInfo, const cha
         }
     }
 
+    // Old workunits won't have queryEscape. MORE: deprecate on the next major version
+    if (flags & ICsvParameters::supportsEscape)
+    {
+        if (dfsEscapes && (flags & ICsvParameters::defaultEscape))
+            addActionList(matcher, dfsEscapes, ESCAPE);
+        else
+        {
+            for (idx=0;;idx++)
+            {
+                const char * text = csvInfo->queryEscape(idx);
+                if (!text)
+                    break;
+                addEscape(text);
+            }
+        }
+    }
+
     //MORE Should this be configurable??
     if (!(flags & ICsvParameters::preserveWhitespace))
     {
@@ -135,14 +156,17 @@ void CSVSplitter::init(unsigned _maxColumns, ICsvParameters * csvInfo, const cha
     }
 }
 
-
-void CSVSplitter::setFieldRange(const byte * start, const byte * end, unsigned curColumn, unsigned quoteToStrip)
+void CSVSplitter::setFieldRange(const byte * start, const byte * end, unsigned curColumn, unsigned quoteToStrip, bool unescape)
 {
+    // Either quoting or escaping will use the local buffer
+    if ((quoteToStrip || unescape) &&
+        (unsigned)(curUnquoted - internalBuffer) + (unsigned)(end - start) > maxCsvSize)
+        throw MakeStringException(99, "MAXLENGTH for CSV file is not large enough");
+
+    // point to the beginning of the local (possibly changed) buffer, for escaping later
+    byte * curUnescaped = curUnquoted;
     if (quoteToStrip)
     {
-        if ((unsigned)(curUnquoted - unquotedBuffer) + (unsigned)(end - start) > maxCsvSize)
-            throw MakeStringException(99, "MAXLENGTH for CSV file is not large enough");
-
         data[curColumn] = curUnquoted;
         const byte * lastCopied = start;
         const byte *cur;
@@ -187,8 +211,42 @@ done:
     }
     else
     {
-        data[curColumn] = start;
         lengths[curColumn] = (size32_t)(end-start);
+        // Only if ESCAPEs were detected in the input
+        if (unescape)
+        {
+            // Need to copy original to a local string (using allocated buffer)
+            memcpy(curUnescaped, start, lengths[curColumn]);
+            data[curColumn] = curUnescaped;
+            // and update the buffer pointer, to re-use on next iteration
+            curUnquoted = curUnescaped + lengths[curColumn];
+        }
+        else
+        {
+            data[curColumn] = start;
+            return;
+        }
+    }
+    // Un-escape string, if necessary. At this point the field has already been
+    // copied into the internal buffer (by the quote-stripping or the plain-copy
+    // path above), so it can be edited in place.
+    if (unescape)
+    {
+        byte * cur = curUnescaped; // data[curColumn] is already pointing here one way or another
+        byte * end = cur + lengths[curColumn];
+        for (; cur < end; cur++)
+        {
+            unsigned matchLen;
+            unsigned match = matcher.getMatch((size32_t)(end-cur), (const char *)cur, matchLen);
+            if ((match & 255) == ESCAPE)
+            {
+                // Only the bytes that follow the escape sequence, [cur+matchLen, end),
+                // are shifted down. Using end-cur+matchLen here would read and write
+                // past the end of the field (and possibly past the internal buffer).
+                ptrdiff_t restLen = end-(cur+matchLen);
+                memmove(cur, cur+matchLen, restLen);
+                end -= matchLen;
+                lengths[curColumn] -= matchLen;
+                // Avoid having cur past end
+                if (cur == end)
+                    break;
+                // cur now points at the escaped character; the loop increment
+                // steps over it so it is never treated as another escape.
+            }
+        }
     }
 }
 
@@ -201,7 +259,8 @@ size32_t CSVSplitter::splitLine(size32_t maxLength, const byte * start)
     const byte * end = start + maxLength;
     const byte * firstGood = start;
     const byte * lastGood = start;
-    curUnquoted = unquotedBuffer;
+    bool lastEscape = false;
+    curUnquoted = internalBuffer;
 
     while (cur != end)
     {
@@ -214,7 +273,7 @@ size32_t CSVSplitter::splitLine(size32_t maxLength, const byte * start)
             lastGood = cur;
             break;
         case WHITESPACE:
-            //Skip leading whitepace
+            //Skip leading whitespace
             if (quote)
                 lastGood = cur+matchLen;
             else if (cur == firstGood)
@@ -224,9 +283,11 @@ size32_t CSVSplitter::splitLine(size32_t maxLength, const byte * start)
             }
             break;
         case SEPARATOR:
+            // A separator only ends the field when we are not inside quotes
             if ((curColumn < maxColumns) && (quote == 0))
             {
-                setFieldRange(firstGood, lastGood, curColumn, quoteToStrip);
+                setFieldRange(firstGood, lastGood, curColumn, quoteToStrip, lastEscape);
+                lastEscape = false;
                 quoteToStrip = 0;
                 curColumn++;
                 firstGood = cur + matchLen;
@@ -236,7 +297,8 @@ size32_t CSVSplitter::splitLine(size32_t maxLength, const byte * start)
         case TERMINATOR:
             if (quote == 0) // Is this a good idea? Means a mismatched quote is not fixed by EOL
             {
-                setFieldRange(firstGood, lastGood, curColumn, quoteToStrip);
+                setFieldRange(firstGood, lastGood, curColumn, quoteToStrip, lastEscape);
+                lastEscape = false;
                 while (++curColumn < maxColumns)
                     lengths[curColumn] = 0;
                 return (size32_t)(cur + matchLen - start);
@@ -244,6 +306,7 @@ size32_t CSVSplitter::splitLine(size32_t maxLength, const byte * start)
             lastGood = cur+matchLen;
             break;
         case QUOTE:
+            // Quoted quote
             if (quote == 0)
             {
                 if (cur == firstGood)
@@ -279,11 +342,25 @@ size32_t CSVSplitter::splitLine(size32_t maxLength, const byte * start)
                     lastGood = cur+matchLen;
             }
             break;
+        case ESCAPE:
+            lastEscape = true;
+            lastGood = cur+matchLen;
+            // If this escape is at the end, proceed to field range
+            if (lastGood == end)
+                break;
+
+            // Skip escape and ignore the next match
+            cur += matchLen;
+            match = matcher.getMatch((size32_t)(end-cur), (const char *)cur, matchLen);
+            if ((match & 255) == NONE)
+                matchLen = 1;
+            lastGood += matchLen;
+            break;
         }
         cur += matchLen;
     }
 
-    setFieldRange(firstGood, lastGood, curColumn, quoteToStrip);
+    setFieldRange(firstGood, lastGood, curColumn, quoteToStrip, lastEscape);
     while (++curColumn < maxColumns)
         lengths[curColumn] = 0;
     return (size32_t)(end - start);
@@ -310,6 +387,7 @@ void CSVOutputStream::init(ICsvParameters * args, bool _oldOutputFormat)
     quote.set(args->queryQuote(0));
     separator.set(args->querySeparator(0));
     terminator.set(args->queryTerminator(0));
+    escape.set(args->queryEscape(0));
     oldOutputFormat = _oldOutputFormat||!quote.length();
 }
 

+ 35 - 4
common/thorhelper/csvsplitter.hpp

@@ -32,6 +32,35 @@
 #include "eclhelper.hpp"
 #include "unicode/utf.h"
 
+/**
+ * CSVSplitter - splits CSV files into fields and rows.
+ *
+ * CSV files are text based records that can have user defined syntax for quoting,
+ * escaping, separating fields and rows. According to RFC-4180, there isn't a
+ * standard way of building CSV files, however, there is a set of general rules
+ * that most implementations seem to follow. This makes it hard to implement a CSV
+ * parser, since even if you follow the RFC, you might not read some files as the
+ * producer intended.
+ *
+ * The general rules are:
+ *  * rows are separated by EOL
+ *  * fields are separated by comma
+ *  * special text must be enclosed by quotes
+ *  * there must be a form of escaping quotes
+ *
+ * However, this implementation allows for user-specified quotes, (field) separators,
+ * terminators (row separators), whitespace and (multi-char) escaping sequences, so
+ * it should be possible to accommodate most files that deviate from the norm, while
+ * still reading the files correctly by default.
+ *
+ * One important rule is that any special behaviour should be enclosed by quotes, so
+ * you don't need to account for escaping separators or terminators when they're not
+ * themselves quoted. This, and non-matching quotes, should be considered syntax errors
+ * and the producer should then fix their output.
+ *
+ * Also, many CSV producers (including commercial databases) use backslash (\) as the
+ * escape character, while the RFC mentions re-using quotes (""). We implement both.
+ */
 class THORHELPER_API CSVSplitter
 {
 public:
@@ -41,8 +70,9 @@ public:
     void addQuote(const char * text);
     void addSeparator(const char * text);
     void addTerminator(const char * text);
+    void addEscape(const char * text);
 
-    void init(unsigned maxColumns, ICsvParameters * csvInfo, const char * dfsQuotes, const char * dfsSeparators, const char * dfsTerminators);
+    void init(unsigned maxColumns, ICsvParameters * csvInfo, const char * dfsQuotes, const char * dfsSeparators, const char * dfsTerminators, const char * dfsEscapes);
     void reset();
     size32_t splitLine(size32_t maxLen, const byte * start);
 
@@ -50,16 +80,16 @@ public:
     inline const byte * * queryData() { return data; }
 
 protected:
-    void setFieldRange(const byte * start, const byte * end, unsigned curColumn, unsigned quoteToStrip);
+    void setFieldRange(const byte * start, const byte * end, unsigned curColumn, unsigned quoteToStrip, bool unescape);
 
 protected:
-    enum { NONE=0, SEPARATOR=1, TERMINATOR=2, WHITESPACE=3, QUOTE=4 };
+    enum { NONE=0, SEPARATOR=1, TERMINATOR=2, WHITESPACE=3, QUOTE=4, ESCAPE=5 };
     unsigned            maxColumns;
     StringMatcher       matcher;
     unsigned            numQuotes;
     unsigned *          lengths;
     const byte * *      data;
-    byte *              unquotedBuffer;
+    byte *              internalBuffer;
     byte *              curUnquoted;
     unsigned            maxCsvSize;
 };
@@ -83,6 +113,7 @@ protected:
     StringAttr separator;
     StringAttr terminator;
     StringAttr quote;
+    StringAttr escape;
     const char * prefix;
     bool oldOutputFormat;
 };

+ 2 - 1
common/thorhelper/thorpipe.cpp

@@ -163,7 +163,8 @@ public:
         const char * quotes = NULL;
         const char * separators = NULL;
         const char * terminators = NULL;
-        csvSplitter.init(csvTransformer->getMaxColumns(), csvInfo, quotes, separators, terminators);
+        const char * escapes = NULL;
+        csvSplitter.init(csvTransformer->getMaxColumns(), csvInfo, quotes, separators, terminators, escapes);
     }
 
     virtual const void * next()

+ 1 - 1
common/thorhelper/thorxmlread.cpp

@@ -672,7 +672,7 @@ IDataVal & CCsvToRawTransformer::transform(IDataVal & result, size32_t len, cons
 {
     CSVSplitter csvSplitter;
 
-    csvSplitter.init(rowTransformer->getMaxColumns(), rowTransformer->queryCsvParameters(), NULL, NULL, NULL);
+    csvSplitter.init(rowTransformer->getMaxColumns(), rowTransformer->queryCsvParameters(), NULL, NULL, NULL, NULL);
 
     unsigned maxRecordSize = rowTransformer->queryRecordSize()->getRecordSize(NULL);
     const byte *finger = (const byte *) text;

+ 2 - 0
ecl/hql/hqlatoms.cpp

@@ -302,6 +302,7 @@ _ATOM _selectors_Atom;
 _ATOM _selectorSequence_Atom;
 _ATOM selfAtom;
 _ATOM separatorAtom;
+_ATOM escapeAtom;
 _ATOM sequenceAtom;
 _ATOM _sequence_Atom;
 _ATOM sequentialAtom;
@@ -715,6 +716,7 @@ MODULE_INIT(INIT_PRIORITY_HQLATOM)
     MAKEATOM(template);
     MAKEATOM(terminate);
     MAKEATOM(terminator);
+    MAKEATOM(escape);
     MAKEATOM(thor);
     MAKEATOM(threshold);
     MAKEATOM(timeout);

+ 1 - 0
ecl/hql/hqlatoms.hpp

@@ -341,6 +341,7 @@ extern HQL_API _ATOM tempAtom;
 extern HQL_API _ATOM templateAtom;
 extern HQL_API _ATOM terminateAtom;
 extern HQL_API _ATOM terminatorAtom;
+extern HQL_API _ATOM escapeAtom;
 extern HQL_API _ATOM thorAtom;
 extern HQL_API _ATOM thresholdAtom;
 extern HQL_API _ATOM timeoutAtom;

+ 7 - 0
ecl/hql/hqlgram.y

@@ -411,6 +411,7 @@ static void eclsyntaxerror(HqlGram * parser, const char * s, short yystate, int
   TAN
   TANH
   TERMINATOR
+  ESCAPE
   THEN
   THISNODE
   THOR
@@ -9513,6 +9514,12 @@ csvOption
                             $$.setExpr(createExprAttribute(terminatorAtom, createComma($3.getExpr(), createAttribute(quoteAtom))));
                             $$.setPosition($1);
                         }
+    | ESCAPE '(' expression ')'
+                        {
+                            parser->normalizeExpression($3);
+                            $$.setExpr(createExprAttribute(escapeAtom, $3.getExpr()));
+                            $$.setPosition($1);
+                        }
     | NOTRIM
                         {
                             $$.setExpr(createAttribute(noTrimAtom));

+ 1 - 0
ecl/hql/hqllex.l

@@ -903,6 +903,7 @@ TABLE               { RETURNSYM(TABLE); }
 TAN                 { RETURNSYM(TAN); }
 TANH                { RETURNSYM(TANH); }
 TERMINATOR          { RETURNSYM(TERMINATOR); }
+ESCAPE              { RETURNSYM(ESCAPE); }
 THEN                { RETURNSYM(THEN); }
 THISNODE            { RETURNSYM(THISNODE); }
 THOR                { RETURNSYM(THOR); }

+ 6 - 0
ecl/hqlcpp/hqlhtcpp.cpp

@@ -9141,6 +9141,7 @@ void HqlCppTranslator::buildCsvParameters(BuildCtx & subctx, IHqlExpression * cs
     IHqlExpression * headerAttr = queryProperty(headerAtom, attrs);
     IHqlExpression * terminator = queryProperty(terminatorAtom, attrs);
     IHqlExpression * separator = queryProperty(separatorAtom, attrs);
+    IHqlExpression * escape = queryProperty(escapeAtom, attrs);
     if (headerAttr)
     {
         IHqlExpression * header = queryRealChild(headerAttr, 0);
@@ -9199,11 +9200,16 @@ void HqlCppTranslator::buildCsvParameters(BuildCtx & subctx, IHqlExpression * cs
     buildCsvListFunc(classctx, "queryQuote", queryProperty(quoteAtom, attrs), isReading ? "'" : NULL);
     buildCsvListFunc(classctx, "querySeparator", separator, ",");
     buildCsvListFunc(classctx, "queryTerminator", terminator, isReading ? "\r\n|\n" : "\n");
+    buildCsvListFunc(classctx, "queryEscape", escape, NULL);
 
     StringBuffer flags;
+    // Backward-compatible flag supportsEscape should be deprecated in the next major version
+    flags.append("|supportsEscape");
+    // Proper flags
     if (!queryProperty(quoteAtom, attrs))       flags.append("|defaultQuote");
     if (!queryProperty(separatorAtom, attrs))   flags.append("|defaultSeparate");
     if (!queryProperty(terminatorAtom, attrs))  flags.append("|defaultTerminate");
+    if (!queryProperty(escapeAtom, attrs))      flags.append("|defaultEscape");
     if (singleHeader)                           flags.append("|singleHeaderFooter");
     if (manyHeader)                             flags.append("|manyHeaderFooter");
     if (queryProperty(noTrimAtom, attrs))       flags.append("|preserveWhitespace");

+ 4 - 1
ecl/hthor/hthor.cpp

@@ -860,6 +860,7 @@ void CHThorCsvWriteActivity::setFormat(IFileDescriptor * desc)
     desc->queryProperties().setProp("@csvSeparate", separator.str());
     desc->queryProperties().setProp("@csvQuote", csvInfo->queryQuote(0));
     desc->queryProperties().setProp("@csvTerminate", csvInfo->queryTerminator(0));
+    desc->queryProperties().setProp("@csvEscape", csvInfo->queryEscape(0));
     desc->queryProperties().setProp("@format","utf8n");
 }
 
@@ -8489,6 +8490,7 @@ void CHThorCsvReadActivity::gatherInfo(IFileDescriptor * fd)
     const char * quotes = NULL;
     const char * separators = NULL;
     const char * terminators = NULL;
+    const char * escapes = NULL;
     IDistributedFile * dFile = ldFile?ldFile->queryDistributedFile():NULL;
     if (dFile)  //only makes sense for distributed (non local) files
     {
@@ -8496,8 +8498,9 @@ void CHThorCsvReadActivity::gatherInfo(IFileDescriptor * fd)
         quotes = options.queryProp("@csvQuote");
         separators = options.queryProp("@csvSeparate");
         terminators = options.queryProp("@csvTerminate");
+        escapes = options.queryProp("@csvEscape");
     }
-    csvSplitter.init(helper.getMaxColumns(), csvInfo, quotes, separators, terminators);
+    csvSplitter.init(helper.getMaxColumns(), csvInfo, quotes, separators, terminators, escapes);
 }
 
 void CHThorCsvReadActivity::calcFixedDiskRecordSize()

+ 3 - 1
ecl/hthor/hthorkey.cpp

@@ -2455,12 +2455,14 @@ public:
         const char * quotes = NULL;
         const char * separators = NULL;
         const char * terminators = NULL;
+        const char * escapes = NULL;
         if (dFile)
         {
             IPropertyTree & options = dFile->queryAttributes();
             quotes = options.queryProp("@csvQuote");
             separators = options.queryProp("@csvSeparate");
             terminators = options.queryProp("@csvTerminate");
+            escapes = options.queryProp("@csvEscape");
             agent.logFileAccess(dFile, "HThor", "READ");
         }
         else
@@ -2471,7 +2473,7 @@ public:
             agent.addWuException(buff.str(), 0, ExceptionSeverityWarning, "hthor");
         }
             
-        csvSplitter.init(_arg.getMaxColumns(), csvInfo, quotes, separators, terminators);
+        csvSplitter.init(_arg.getMaxColumns(), csvInfo, quotes, separators, terminators, escapes);
     }
 
     ~CHThorCsvFetchActivity()

+ 6 - 2
roxie/ccd/ccdactivities.cpp

@@ -1455,6 +1455,7 @@ public:
         const char *quotes = NULL;
         const char *separators = NULL;
         const char *terminators = NULL;
+        const char *escapes = NULL;
         CSVSplitter csvSplitter;
         if (datafile)
         {
@@ -1464,9 +1465,10 @@ public:
                 quotes = options->queryProp("@csvQuote");
                 separators = options->queryProp("@csvSeparate");
                 terminators = options->queryProp("@csvTerminate");
+                escapes = options->queryProp("@csvEscape");
             }
         }
-        csvSplitter.init(helper->getMaxColumns(), csvInfo, quotes, separators, terminators);
+        csvSplitter.init(helper->getMaxColumns(), csvInfo, quotes, separators, terminators, escapes);
         while (!aborted)
         {
             // MORE - there are rumours of a  csvSplitter that operates on a stream... if/when it exists, this should use it
@@ -4383,6 +4385,7 @@ public:
         const char * quotes = NULL;
         const char * separators = NULL;
         const char * terminators = NULL;
+        const char * escapes = NULL;
 
         const IResolvedFile *fileInfo = varFileInfo ? varFileInfo : factory->datafile;
         if (fileInfo)
@@ -4393,12 +4396,13 @@ public:
                 quotes = options->queryProp("@csvQuote");
                 separators = options->queryProp("@csvSeparate");
                 terminators = options->queryProp("@csvTerminate");
+                escapes = options->queryProp("@csvEscape");
             }
         }
 
         IHThorCsvFetchArg *h = (IHThorCsvFetchArg *) helper;
         ICsvParameters *csvInfo = h->queryCsvParameters();
-        csvSplitter.init(_maxColumns, csvInfo, quotes, separators, terminators);
+        csvSplitter.init(_maxColumns, csvInfo, quotes, separators, terminators, escapes);
     }
 
     virtual size32_t doFetch(ARowBuilder & rowBuilder, offset_t pos, offset_t rawpos, void *inputData)

+ 10 - 5
roxie/ccd/ccdserver.cpp

@@ -10921,6 +10921,7 @@ public:
         props.setProp("@csvSeparate", separator.str());
         props.setProp("@csvQuote", csvParameters->queryQuote(0));
         props.setProp("@csvTerminate", csvParameters->queryTerminator(0));
+        props.setProp("@csvEscape", csvParameters->queryEscape(0));
     }
 
     virtual bool isOutputTransformed() const { return true; }
@@ -20036,12 +20037,13 @@ class CRoxieServerCsvReadActivity : public CRoxieServerDiskReadBaseActivity
     const char *quotes;
     const char *separators;
     const char *terminators;
+    const char *escapes;
 public:
     CRoxieServerCsvReadActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, const RemoteActivityId &_remoteId,
                                 unsigned _numParts, bool _isLocal, bool _sorted, bool _maySkip, IInMemoryIndexManager *_manager,
-                                const char *_quotes, const char *_separators, const char *_terminators)
+                                const char *_quotes, const char *_separators, const char *_terminators, const char *_escapes)
         : CRoxieServerDiskReadBaseActivity(_factory, _probeManager, _remoteId, _numParts, _isLocal, _sorted, _maySkip, _manager),
-          quotes(_quotes), separators(_separators), terminators(_terminators)
+          quotes(_quotes), separators(_separators), terminators(_terminators), escapes(_escapes)
     {
         compoundHelper = NULL;
         readHelper = (IHThorCsvReadArg *)&helper;
@@ -20073,9 +20075,10 @@ public:
                         quotes = options->queryProp("@csvQuote");
                         separators = options->queryProp("@csvSeparate");
                         terminators = options->queryProp("@csvTerminate");
+                        escapes = options->queryProp("@csvEscape");
                     }
                 }
-                csvSplitter.init(readHelper->getMaxColumns(), csvInfo, quotes, separators, terminators);
+                csvSplitter.init(readHelper->getMaxColumns(), csvInfo, quotes, separators, terminators, escapes);
             }
         }
     }
@@ -20593,6 +20596,7 @@ public:
     const char *quotes;
     const char *separators;
     const char *terminators;
+    const char *escapes;
 
     CRoxieServerDiskReadActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, const RemoteActivityId &_remoteId, IPropertyTree &_graphNode)
         : CRoxieServerActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind), remoteId(_remoteId)
@@ -20602,7 +20606,7 @@ public:
         sorted = (helper->getFlags() & TDRunsorted) == 0;
         variableFileName = (helper->getFlags() & (TDXvarfilename|TDXdynamicfilename)) != 0;
         maySkip = (helper->getFlags() & (TDRkeyedlimitskips|TDRkeyedlimitcreates|TDRlimitskips|TDRlimitcreates)) != 0;
-        quotes = separators = terminators = NULL;
+        quotes = separators = terminators = escapes = NULL;
         if (!variableFileName)
         {
             bool isOpt = (helper->getFlags() & TDRoptional) != 0;
@@ -20624,6 +20628,7 @@ public:
                         quotes = options->queryProp("@csvQuote");
                         separators = options->queryProp("@csvSeparate");
                         terminators = options->queryProp("@csvTerminate");
+                        escapes = options->queryProp("@csvEscape");
                     }
                 }
                 else
@@ -20639,7 +20644,7 @@ public:
         {
         case TAKcsvread:
             return new CRoxieServerCsvReadActivity(this, _probeManager, remoteId, numParts, isLocal, sorted, maySkip, manager,
-                                                   quotes, separators, terminators);
+                                                   quotes, separators, terminators, escapes);
         case TAKxmlread:
             return new CRoxieServerXmlReadActivity(this, _probeManager, remoteId, numParts, isLocal, sorted, maySkip, manager);
         case TAKdiskread:

+ 13 - 1
rtl/include/eclhelper.hpp

@@ -1912,7 +1912,18 @@ typedef IHThorLocalResultWriteArg IHThorLocalResultSpillArg;
 
 struct ICsvParameters
 {
-    enum { defaultQuote = 1, defaultSeparate = 2, defaultTerminate = 4, hasUnicode = 8, singleHeaderFooter = 16, preserveWhitespace = 32, manyHeaderFooter = 64, }; // flags values
+    enum
+    {
+        defaultQuote =        0x0001,
+        defaultSeparate =     0x0002,
+        defaultTerminate =    0x0004,
+        hasUnicode =          0x0008,
+        singleHeaderFooter =  0x0010,
+        preserveWhitespace =  0x0020,
+        manyHeaderFooter =    0x0040,
+        defaultEscape =       0x0080,
+        supportsEscape =      0x0100, // MORE: deprecate on next major version
+    }; // flags values
     virtual unsigned     getFlags() = 0;
     virtual bool         queryEBCDIC() = 0;
     virtual const char * queryHeader()              { return NULL; }
@@ -1921,6 +1932,7 @@ struct ICsvParameters
     virtual const char * queryQuote(unsigned idx) = 0;
     virtual const char * querySeparator(unsigned idx) = 0;
     virtual const char * queryTerminator(unsigned idx) = 0;
+    virtual const char * queryEscape(unsigned idx) = 0;
     virtual const char * queryFooter()              { return NULL; }
 };
 

+ 76 - 0
testing/ecl/csv-escaped.ecl

@@ -0,0 +1,76 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+rec := RECORD
+  string foo;
+  integer id;
+  string bar;
+END;
+
+// Default is no escape
+orig := DATASET([{'this is an \\\'escaped\\\' string', 10, 'while this is not'}], rec);
+OUTPUT(orig, , 'regress::csv-orig', OVERWRITE, CSV);
+escaped := DATASET('regress::csv-orig', rec, CSV);
+OUTPUT(escaped);
+
+// Standard escape
+orig2 := DATASET([{'this is an \\\'escaped\\\' string', 10, 'while this is not'}], rec);
+OUTPUT(orig2, , 'regress::csv-escaped', OVERWRITE, CSV);
+escaped2 := DATASET('regress::csv-escaped', rec, CSV(ESCAPE('\\')));
+OUTPUT(escaped2);
+
+// Multi-char escape
+orig3 := DATASET([{'this is an -=-\'escaped-=-\' string', 10, 'while this is not'}], rec);
+OUTPUT(orig3, , 'regress::csv-escaped-multi', OVERWRITE, CSV);
+escaped3 := DATASET('regress::csv-escaped-multi', rec, CSV(ESCAPE('-=-')));
+OUTPUT(escaped3);
+
+// Escape the escape
+orig4 := DATASET([{'escape the \\\\ escape', 10, 'escape at the end \\\\'}], rec);
+OUTPUT(orig4, , 'regress::csv-escaped-escape', OVERWRITE, CSV);
+escaped4 := DATASET('regress::csv-escaped-escape', rec, CSV(ESCAPE('\\')));
+OUTPUT(escaped4);
+
+// Multi-escapes in a row
+orig5 := DATASET([{'multiple escapes \\\\\\\\ in a row', 10, 'multiple at end \\\\\\\\'}], rec);
+OUTPUT(orig5, , 'regress::csv-escaped-many', OVERWRITE, CSV);
+escaped5 := DATASET('regress::csv-escaped-many', rec, CSV(ESCAPE('\\')));
+OUTPUT(escaped5);
+
+// Many escapes
+orig6 := DATASET([{'many escapes like \\\'\\\' \\\'  \\\' and \\\\\\\\ \\\\ \\\\  \\\\  \\\\ escape', 10, 'escape at the end \\\''}], rec);
+OUTPUT(orig6, , 'regress::csv-escaped-many-more', OVERWRITE, CSV);
+escaped6 := DATASET('regress::csv-escaped-many-more', rec, CSV(ESCAPE('\\')));
+OUTPUT(escaped6);
+
+// Escape separator
+orig7 := DATASET([{'escaping \\, the \\,\\, \\, \\, separator', 10, 'escape at the end \\,'}], rec);
+OUTPUT(orig7, , 'regress::csv-escaped-separator', OVERWRITE, CSV);
+escaped7 := DATASET('regress::csv-escaped-separator', rec, CSV(ESCAPE('\\')));
+OUTPUT(escaped7);
+
+// Escape with quotes
+orig8 := DATASET([{'\'escaping\'\'the quote\'', 10, 'au naturel'}], rec);
+OUTPUT(orig8, , 'regress::csv-escaped-escaped', OVERWRITE, CSV);
+escaped8 := DATASET('regress::csv-escaped-escaped', rec, CSV);
+OUTPUT(escaped8);
+
+// Escape with quotes with ESCAPE()
+orig9 := DATASET([{'\'escaping\'\'the quote\'', 10, 'with user defined escape'}], rec);
+OUTPUT(orig9, , 'regress::csv-escaped-escaped2', OVERWRITE, CSV);
+escaped9 := DATASET('regress::csv-escaped-escaped2', rec, CSV(ESCAPE('\\')));
+OUTPUT(escaped9);

+ 45 - 0
testing/ecl/key/csv-escaped.xml

@@ -0,0 +1,45 @@
+<Dataset name='Result 1'>
+</Dataset>
+<Dataset name='Result 2'>
+ <Row><foo>this is an \&apos;escaped\&apos; string</foo><id>10</id><bar>while this is not</bar></Row>
+</Dataset>
+<Dataset name='Result 3'>
+</Dataset>
+<Dataset name='Result 4'>
+ <Row><foo>this is an &apos;escaped&apos; string</foo><id>10</id><bar>while this is not</bar></Row>
+</Dataset>
+<Dataset name='Result 5'>
+</Dataset>
+<Dataset name='Result 6'>
+ <Row><foo>this is an &apos;escaped&apos; string</foo><id>10</id><bar>while this is not</bar></Row>
+</Dataset>
+<Dataset name='Result 7'>
+</Dataset>
+<Dataset name='Result 8'>
+ <Row><foo>escape the \ escape</foo><id>10</id><bar>escape at the end \</bar></Row>
+</Dataset>
+<Dataset name='Result 9'>
+</Dataset>
+<Dataset name='Result 10'>
+ <Row><foo>multiple escapes \\ in a row</foo><id>10</id><bar>multiple at end \\</bar></Row>
+</Dataset>
+<Dataset name='Result 11'>
+</Dataset>
+<Dataset name='Result 12'>
+ <Row><foo>many escapes like &apos;&apos; &apos;  &apos; and \\ \ \  \  \ escape</foo><id>10</id><bar>escape at the end &apos;</bar></Row>
+</Dataset>
+<Dataset name='Result 13'>
+</Dataset>
+<Dataset name='Result 14'>
+ <Row><foo>escaping , the ,, , , separator</foo><id>10</id><bar>escape at the end ,</bar></Row>
+</Dataset>
+<Dataset name='Result 15'>
+</Dataset>
+<Dataset name='Result 16'>
+ <Row><foo>escaping&apos;the quote</foo><id>10</id><bar>au naturel</bar></Row>
+</Dataset>
+<Dataset name='Result 17'>
+</Dataset>
+<Dataset name='Result 18'>
+ <Row><foo>escaping&apos;the quote</foo><id>10</id><bar>with user defined escape</bar></Row>
+</Dataset>

+ 2 - 0
thorlcr/activities/csvread/thcsvread.cpp

@@ -46,6 +46,8 @@ public:
             else dst.append(false);
             if (fileDesc->queryProperties().hasProp("@csvTerminate")) dst.append(true).append(fileDesc->queryProperties().queryProp("@csvTerminate"));
             else dst.append(false);
+            if (fileDesc->queryProperties().hasProp("@csvEscape")) dst.append(true).append(fileDesc->queryProperties().queryProp("@csvEscape"));
+            else dst.append(false);
         }
         if (headerLines)
         {

+ 4 - 2
thorlcr/activities/csvread/thcsvrslave.cpp

@@ -36,7 +36,7 @@
 class CCsvReadSlaveActivity : public CDiskReadSlaveActivityBase, public CThorDataLink
 {
     IHThorCsvReadArg *helper;
-    StringAttr csvQuote, csvSeparate, csvTerminate;
+    StringAttr csvQuote, csvSeparate, csvTerminate, csvEscape;
     Owned<IRowStream> out;
     rowcount_t limit;
     rowcount_t stopAfter;
@@ -85,7 +85,7 @@ class CCsvReadSlaveActivity : public CDiskReadSlaveActivityBase, public CThorDat
             readFinished = false;
             //Initialise information...
             ICsvParameters * csvInfo = activity.helper->queryCsvParameters();
-            csvSplitter.init(activity.helper->getMaxColumns(), csvInfo, activity.csvQuote, activity.csvSeparate, activity.csvTerminate);
+            csvSplitter.init(activity.helper->getMaxColumns(), csvInfo, activity.csvQuote, activity.csvSeparate, activity.csvTerminate, activity.csvEscape);
         }
         virtual void setPart(IPartDescriptor *partDesc, unsigned partNoSerialized)
         {
@@ -315,6 +315,8 @@ public:
             if (b) data.read(csvSeparate);
             data.read(b);
             if (b) data.read(csvTerminate);
+            data.read(b);
+            if (b) data.read(csvEscape);
         }
         if (headerLines)
         {

+ 1 - 0
thorlcr/activities/diskwrite/thdiskwrite.cpp

@@ -77,6 +77,7 @@ public:
         props.setProp("@csvSeparate", separator.str());
         props.setProp("@csvQuote", csvParameters->queryQuote(0));
         props.setProp("@csvTerminate", csvParameters->queryTerminator(0));
+        props.setProp("@csvEscape", csvParameters->queryEscape(0));
 
         CWriteMasterBase::done(); // will publish
     }

+ 2 - 1
thorlcr/activities/fetch/thfetchslave.cpp

@@ -542,7 +542,8 @@ public:
         const char * quotes = lFProps->hasProp("@csvQuote")?lFProps->queryProp("@csvQuote"):NULL;
         const char * separators = lFProps->hasProp("@csvSeparate")?lFProps->queryProp("@csvSeparate"):NULL;
         const char * terminators = lFProps->hasProp("@csvTerminate")?lFProps->queryProp("@csvTerminate"):NULL;      
-        csvSplitter.init(helper->getMaxColumns(), csvInfo, quotes, separators, terminators);
+        const char * escapes = lFProps->hasProp("@csvEscape")?lFProps->queryProp("@csvEscape"):NULL;
+        csvSplitter.init(helper->getMaxColumns(), csvInfo, quotes, separators, terminators, escapes);
     }
     virtual size32_t fetch(ARowBuilder & rowBuilder, const void *keyRow, unsigned filePartIndex, unsigned __int64 localFpos, unsigned __int64 fpos)
     {