소스 검색

Merge pull request #5539 from AttilaVamos/HPCC-10961-feature

HPCC-10961 Add a new DFU server parameter called "quotedTerminator" to improve the speed of CSV file sprays.

Reviewed-By: Jake Smith <jake.smith@lexisnexis.com>
Reviewed-By: Gavin Halliday <gavin.halliday@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 11 년 전
부모
커밋
43fd9643f9

+ 2 - 0
dali/dfu/dfurun.cpp

@@ -1219,6 +1219,8 @@ public:
                         if (options->getRecordStructurePresent())
                             opttree->setPropBool("@recordStructurePresent", true);
 
+                        opttree->setPropBool("@quotedTerminator", options->getQuotedTerminator());
+
                         Owned<IFileDescriptor> fdesc = destination->getFileDescriptor(iskey,options->getSuppressNonKeyRepeats()&&!iskey);
                         if (fdesc) {
                             if (options->getSubfileCopy()) {// need to set destination compressed or not

+ 15 - 2
dali/dfu/dfuwu.cpp

@@ -1323,7 +1323,7 @@ public:
         return CDFUfileformat::decode(queryProperties()->queryProp("@format"));
     }
 
-    virtual void getCsvOptions(StringBuffer &separate,StringBuffer &terminate,StringBuffer &quote,StringBuffer &escape) const
+    virtual void getCsvOptions(StringBuffer &separate,StringBuffer &terminate,StringBuffer &quote,StringBuffer &escape,bool &quotedTerminator) const
     {
         IPropertyTree *t = queryProperties();
         const char *sep=t->queryProp("@csvSeparate");
@@ -1335,9 +1335,10 @@ public:
         const char *esc=t->queryProp("@csvEscape");
         if (esc && *esc)
             escape.set(esc);
+        quotedTerminator = t->getPropBool("@quotedTerminator", true);
     }
 
-    void setCsvOptions(const char *separate,const char *terminate,const char *quote,const char *escape)
+    void setCsvOptions(const char *separate,const char *terminate,const char *quote,const char *escape,bool quotedTerminator)
     {
         IPropertyTree *t = queryUpdateProperties();
         if (separate && *separate)
@@ -1348,6 +1349,7 @@ public:
             t->setProp("@csvQuote",quote);
         if (escape && *escape)
             t->setProp("@csvEscape",escape);
+        t->setPropBool("@quotedTerminator", quotedTerminator);
     }
 
     StringBuffer &getRowTag(StringBuffer &str)const 
@@ -2044,6 +2046,17 @@ public:
     {
         queryRoot()->setPropBool("@recordStructurePresent",val);
     }
+
+    bool getQuotedTerminator() const // reads "@quotedTerminator" from the tree returned by queryRoot()
+    {
+        return queryRoot()->getPropBool("@quotedTerminator", true); // absent => true, consistent with getCsvOptions(), setQuotedTerminator(bool val=true) and FileFormat's default
+    }
+
+    void setQuotedTerminator(bool val) // stores val as "@quotedTerminator" on the tree returned by queryRoot()
+    {
+        queryRoot()->setPropBool("@quotedTerminator",val);
+    }
+
 };
 
 class CExceptionIterator: public CInterface, implements IExceptionIterator

+ 5 - 2
dali/dfu/dfuwu.hpp

@@ -170,6 +170,8 @@ interface IConstDFUoptions : extends IInterface
     virtual bool getFailIfNoSourceFile() const = 0;
 
     virtual bool getRecordStructurePresent() const = 0;
+
+    virtual bool getQuotedTerminator() const = 0;
 };
 
 interface IDFUoptions : extends IConstDFUoptions
@@ -206,6 +208,7 @@ interface IDFUoptions : extends IConstDFUoptions
     virtual void setEncDec(const char *enc,const char *dec) = 0;
     virtual void setFailIfNoSourceFile(bool val=false) = 0;
     virtual void setRecordStructurePresent(bool val=false) = 0;
+    virtual void setQuotedTerminator(bool val=true) = 0;
 };
 
 interface IConstDFUfileSpec: extends IInterface
@@ -224,7 +227,7 @@ interface IConstDFUfileSpec: extends IInterface
     virtual StringBuffer &getPartUrl(unsigned clustnum,unsigned partidx, StringBuffer &url,bool iskey=false) const = 0; // idx 0 based
     virtual RemoteFilename &getPartFilename(unsigned clustnum,unsigned partidx, RemoteFilename &rfn, bool iskey=false) const = 0; // idx 0 based
     virtual IPropertyTree *queryPartProperties(unsigned partidx) const = 0;
-    virtual void getCsvOptions(StringBuffer &separate,StringBuffer &terminate,StringBuffer &quote,StringBuffer &escape) const = 0;
+    virtual void getCsvOptions(StringBuffer &separate,StringBuffer &terminate,StringBuffer &quote,StringBuffer &escape,bool &quotedTerminator) const = 0;
     virtual StringBuffer &getRowTag(StringBuffer &str)const =0;
     virtual void setForeignDali(const SocketEndpoint &ep)=0; // only used for source of copy (for inter-dali copy)
     virtual bool getForeignDali(SocketEndpoint &ep) const =0;
@@ -262,7 +265,7 @@ interface IDFUfileSpec: extends IConstDFUfileSpec
     virtual void setRecordSize(size32_t size) = 0; // may need to be supplied for non 1-1 splits
     virtual void setMaxRecordSize(size32_t size) = 0; 
     virtual void setFormat(DFUfileformat format) = 0; 
-    virtual void setCsvOptions(const char *separate=NULL,const char *terminate=NULL,const char *quote=NULL,const char *escape=NULL) = 0;  // NULL for default
+    virtual void setCsvOptions(const char *separate=NULL,const char *terminate=NULL,const char *quote=NULL,const char *escape=NULL,bool quotedTerminator=true) = 0;  // NULL for default
     virtual void setRowTag(const char *str) = 0;
     virtual void setFromXML(const char *xml) = 0;
     virtual void setCompressed(bool set) = 0;

+ 3 - 0
dali/dfuplus/dfuplus.cpp

@@ -436,6 +436,9 @@ bool CDfuPlusHelper::variableSpray(const char* srcxml,const char* srcip,const ch
     if(globals->hasProp("recordStructurePresent"))
         req->setRecordStructurePresent(globals->getPropBool("recordStructurePresent",false));
 
+    if(globals->hasProp("quotedTerminator"))
+        req->setQuotedTerminator(globals->getPropBool("quotedTerminator",true));
+
     if(srcxml == NULL)
         info("\nVariable spraying from %s on %s to %s\n", srcfile, srcip, dstname);
     else

+ 1 - 0
dali/dfuplus/main.cpp

@@ -76,6 +76,7 @@ void handleSyntax()
     out.append("            quote=<quote> -- optional, default is '\n");
     out.append("            escape=<escape> -- optional, no default value \n");
     out.append("            recordstructurepresent=0|1 -- optional, default is 0 (no field names in first row) \n");
+    out.append("            quotedTerminator=1|0 -- optional, default is 1 (quoted terminators in rows) \n");
     out.append("        options for xml:\n");
     out.append("            rowtag=rowTag -- required\n");
     out.append("            encoding=utf8|utf8n|utf16|utf16le|utf16be|utf32|utf32le|utf32be -- optional, default is utf8\n");

+ 2 - 2
dali/ft/daftformat.cpp

@@ -2081,7 +2081,7 @@ IFormatPartitioner * createFormatPartitioner(const SocketEndpoint & ep, const Fi
         case FFTblocked:
             return new CSimpleBlockedPartitioner(sameFormats);
         case FFTcsv:
-            if (srcFormat.hasQuote())
+            if (srcFormat.hasQuote() && srcFormat.hasQuotedTerminator())
                 return new CCsvPartitioner(srcFormat);
             else
                 return new CCsvQuickPartitioner(srcFormat, sameFormats);
@@ -2091,7 +2091,7 @@ IFormatPartitioner * createFormatPartitioner(const SocketEndpoint & ep, const Fi
                 return new CXmlQuickPartitioner(srcFormat, sameFormats);
             else
             {
-                if (srcFormat.hasQuote())
+                if (srcFormat.hasQuote() && srcFormat.hasQuotedTerminator())
                     return new CUtfPartitioner(srcFormat);
                 else
                     return new CUtfQuickPartitioner(srcFormat, sameFormats);

+ 5 - 5
dali/ft/daftformat.ipp

@@ -268,8 +268,8 @@ class DALIFT_API CCsvQuickPartitioner : public CCsvPartitioner
 {
 public:
     CCsvQuickPartitioner(const FileFormat & _format, bool _noTranslation) 
-        : CCsvPartitioner(_format) 
-    { 
+        : CCsvPartitioner(_format)
+    {
         noTranslation = _noTranslation;
     }
 
@@ -289,7 +289,7 @@ public:
     CUtfPartitioner(const FileFormat & _format);
 
     virtual void setTarget(IOutputProcessor * _target);
-    
+
     virtual void getRecordStructure(StringBuffer & _recordStructure) { _recordStructure = recordStructure; }
     virtual void setRecordStructurePresent( bool _isRecordStructurePresent) {isRecordStructurePresent = _isRecordStructurePresent;}
 
@@ -300,7 +300,7 @@ protected:
     {
         return getSplitRecordSize(record,maxToRead,processFullBuffer,true);
     }
-    
+
 private:
     void storeFieldName(const char * start, unsigned len);
 
@@ -311,7 +311,7 @@ protected:
     StringMatcher   matcher;
     unsigned        unitSize;
     UtfReader::UtfFormat utfFormat;
-    
+
     bool            isRecordStructurePresent;
     StringBuffer    recordStructure;
     unsigned        fieldCount;

+ 1 - 0
dali/ft/filecopy.cpp

@@ -1132,6 +1132,7 @@ void FileSprayer::calculateSprayPartition()
         FilePartInfo & cur = sources.item(idx);
         cur.filename.getRemotePath(remoteFilename.clear());
 
+        srcFormat.quotedTerminator = options->getPropBool("@quotedTerminator", true);
         LOG(MCdebugInfoDetail, job, "Partition %d(%s)", idx, remoteFilename.str());
         const SocketEndpoint & ep = cur.filename.queryEndpoint();
         IFormatPartitioner * partitioner = createFormatPartitioner(ep, srcFormat, tgtFormat, calcOutput, queryFixedSlave(), wuid);

+ 6 - 1
dali/ft/filecopy.hpp

@@ -43,7 +43,8 @@ enum { FTactionpull, FTactionpush, FTactionpartition, FTactiondirectory, FTactio
 class DALIFT_API FileFormat
 {
 public:
-    FileFormat(FileFormatType _type = FFTunknown, unsigned _recordSize = 0) { set(_type, _recordSize); maxRecordSize = 0;}
+    FileFormat(FileFormatType _type = FFTunknown, unsigned _recordSize = 0) // sets type/recordSize via set(); zeroes maxRecordSize
+            { set(_type, _recordSize); maxRecordSize = 0; quotedTerminator = true;} // quotedTerminator starts as true
 
     void deserialize(MemoryBuffer & in);
     void deserializeExtra(MemoryBuffer & in, unsigned version);
@@ -57,6 +58,7 @@ public:
     void set(FileFormatType _type, unsigned _recordSize = 0) { type = _type, recordSize = _recordSize; }
     void set(const FileFormat & src);
     bool hasQuote() const                           { return (quote == NULL) || (*quote != '\0'); }
+    bool hasQuotedTerminator() const                { return quotedTerminator; } // checked together with hasQuote() by createFormatPartitioner to choose the full vs. quick CSV/UTF partitioner
 
 public:
     FileFormatType      type;
@@ -67,6 +69,9 @@ public:
     StringAttr          terminate;
     StringAttr          escape;
     StringAttr          rowTag;
+
+    //This value isn't serialized/deserialized.
+    bool                quotedTerminator;
 };
 UtfReader::UtfFormat getUtfFormatType(FileFormatType type);
 bool sameEncoding(const FileFormat & src, const FileFormat & tgt);

+ 1 - 0
dali/ft/ftbase.cpp

@@ -431,6 +431,7 @@ void FileFormat::set(const FileFormat & src)
     terminate.set(src.terminate);
     escape.set(src.escape);
     rowTag.set(src.rowTag);
+    quotedTerminator = src.quotedTerminator;
 }
 
 

파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
+ 8 - 8
ecllibrary/std/File.ecl


+ 8 - 4
esp/scm/ws_fs.ecm

@@ -82,8 +82,10 @@ ESPStruct [nil_remove] DFUWorkunit
     string decrypt;
 
     [min_ver("1.08")] bool failIfNoSourceFile(false);
-    
+
     [min_ver("1.09")] bool recordStructurePresent(false);
+
+    [min_ver("1.10")] bool quotedTerminator(true);
 };
 
 ESPStruct DFUException
@@ -306,9 +308,10 @@ ESPrequest [nil_remove] SprayFixed
     bool   wrap(false);
 
     [min_ver("1.08")] bool failIfNoSourceFile(false);
-    
+
     [min_ver("1.09")] bool recordStructurePresent(false);
 
+    [min_ver("1.10")] bool quotedTerminator(true);
 };
 
 ESPresponse [exceptions_inline] 
@@ -353,9 +356,10 @@ ESPrequest [nil_remove] SprayVariable
     string decrypt;
 
     [min_ver("1.08")] bool failIfNoSourceFile(false);
-    
+
     [min_ver("1.09")] bool recordStructurePresent(false);
 
+    [min_ver("1.10")] bool quotedTerminator(true);
 };
 
 ESPresponse [exceptions_inline] 
@@ -606,7 +610,7 @@ ESPresponse [exceptions_inline] UploadFilesResponse
 };
 
 ESPservice [
-    version("1.09"), default_client_version("1.09"),
+    version("1.10"), default_client_version("1.10"),
     exceptions_inline("./smc_xslt/exceptions.xslt")] FileSpray
 {
     ESPuses ESPstruct DFUWorkunit;

+ 7 - 2
esp/services/ws_fs/ws_fsService.cpp

@@ -355,7 +355,8 @@ static void DeepAssign(IEspContext &context, IConstDFUWorkUnit *src, IEspDFUWork
         if (version >= 1.04 && (file->getFormat() == DFUff_csv))
         {
             StringBuffer separate, terminate, quote, escape;
-            file->getCsvOptions(separate,terminate,quote, escape);
+            bool quotedTerminator;
+            file->getCsvOptions(separate,terminate,quote, escape, quotedTerminator);
             if(separate.length() > 0)
                 dest.setSourceCsvSeparate(separate.str());
             if(terminate.length() > 0)
@@ -364,6 +365,8 @@ static void DeepAssign(IEspContext &context, IConstDFUWorkUnit *src, IEspDFUWork
                 dest.setSourceCsvQuote(quote.str());
             if((version >= 1.05) && (escape.length() > 0))
                 dest.setSourceCsvEscape(escape.str());
+            if(version >=1.10)
+                dest.setQuotedTerminator(quotedTerminator);
         }
     }
 
@@ -2071,7 +2074,9 @@ bool CFileSprayEx::onSprayVariable(IEspContext &context, IEspSprayVariable &req,
             const char* cq = req.getSourceCsvQuote();
             if(cq== NULL)
                 cq = "'";
-            source->setCsvOptions(cs, ct, cq, req.getSourceCsvEscape());
+            source->setCsvOptions(cs, ct, cq, req.getSourceCsvEscape(), req.getQuotedTerminator());
+
+            options->setQuotedTerminator(req.getQuotedTerminator());
         }
 
         destination->setLogicalName(destname);

파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
+ 21 - 9
plugins/fileservices/fileservices.cpp


파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
+ 1 - 0
plugins/fileservices/fileservices.hpp