浏览代码

HPCC-9481 CSV record definition auto discovery

Part I.

Add
- implementation of CSV record structure discovery
- new parameter to signal that the CSV file contains field definitions in its first row
- new parameter handling to WU environment
- new parameter handling to DFUPlus

Signed-off-by: Attila Vamos <attila.vamos@gmail.com>
Attila Vamos 11 年之前
父节点
当前提交
f104c05c5a

+ 4 - 0
dali/dfu/dfurun.cpp

@@ -1213,6 +1213,10 @@ public:
                         if (options->getFailIfNoSourceFile())
                             opttree->setPropBool("@failIfNoSourceFile", true);
 
+                        if (options->getRecordStructurePresent())
+                            opttree->setPropBool("@recordStructurePresent", true);
+
+
                         Owned<IFileDescriptor> fdesc = destination->getFileDescriptor(iskey,options->getSuppressNonKeyRepeats()&&!iskey);
                         if (fdesc) {
                             if (options->getSubfileCopy()) {// need to set destination compressed or not

+ 9 - 0
dali/dfu/dfuwu.cpp

@@ -2028,6 +2028,15 @@ public:
         queryRoot()->setPropBool("@failIfNoSourceFile",val);
     }
 
+    bool getRecordStructurePresent() const // Getter for the "record structure present" spray option.
+    {
+        return queryRoot()->getPropBool("@recordStructurePresent"); // Value is read from the "@recordStructurePresent" attribute of the options root tree.
+    }
+
+    void setRecordStructurePresent(bool val) // Setter for the "record structure present" spray option.
+    {
+        queryRoot()->setPropBool("@recordStructurePresent",val); // Stores val in the "@recordStructurePresent" attribute of the options root tree.
+    }
 };
 
 class CExceptionIterator: public CInterface, implements IExceptionIterator

+ 3 - 0
dali/dfu/dfuwu.hpp

@@ -168,6 +168,8 @@ interface IConstDFUoptions : extends IInterface
 
     virtual IPropertyTree *queryTree() const = 0;                   // used by DFU server
     virtual bool getFailIfNoSourceFile() const = 0;
+
+    virtual bool getRecordStructurePresent() const = 0;
 };
 
 interface IDFUoptions : extends IConstDFUoptions
@@ -203,6 +205,7 @@ interface IDFUoptions : extends IConstDFUoptions
     virtual void setSubfileCopy(bool val=true) = 0;                             // i.e. called by supercopy
     virtual void setEncDec(const char *enc,const char *dec) = 0;
     virtual void setFailIfNoSourceFile(bool val=false) = 0;
+    virtual void setRecordStructurePresent(bool val=false) = 0;
 };
 
 interface IConstDFUfileSpec: extends IInterface

+ 3 - 0
dali/dfuplus/dfuplus.cpp

@@ -433,6 +433,9 @@ bool CDfuPlusHelper::variableSpray(const char* srcxml,const char* srcip,const ch
     if(globals->hasProp("failIfNoSourceFile"))
         req->setFailIfNoSourceFile(globals->getPropBool("failIfNoSourceFile",false));
 
+    if(globals->hasProp("recordStructurePresent"))
+        req->setRecordStructurePresent(globals->getPropBool("recordStructurePresent",false));
+
     if(srcxml == NULL)
         info("\nVariable spraying from %s on %s to %s\n", srcfile, srcip, dstname);
     else

+ 1 - 0
dali/dfuplus/main.cpp

@@ -74,6 +74,7 @@ void handleSyntax()
     out.append("            terminator=<terminator> -- optional, default is \\r,\\r\\n\n");
     out.append("            quote=<quote> -- optional, default is '\n");
     out.append("            escape=<escape> -- optional, no default value \n");
+    out.append("            recordstructurepresent=0|1 -- optional, field names don't present in first row \n");
     out.append("        options for xml:\n");
     out.append("            rowtag=rowTag -- required\n");
     out.append("            encoding=utf8|utf8n|utf16|utf16le|utf16be|utf32|utf32le|utf32be -- optional, default is utf8\n");

+ 46 - 0
dali/ft/daftformat.cpp

@@ -572,6 +572,10 @@ CCsvPartitioner::CCsvPartitioner(const FileFormat & _format) : CInputBasePartiti
 
     matcher.queryAddEntry(1, " ", WHITESPACE);
     matcher.queryAddEntry(1, "\t", WHITESPACE);
+    recordStructure.append("fileRec := RECORD\n");
+    isRecordStructurePresent = false;
+    fieldCount = 0;
+    isFirstRow = true;
 }
 
 size32_t CCsvPartitioner::getSplitRecordSize(const byte * start, unsigned maxToRead, bool processFullBuffer, bool ateof)
@@ -610,6 +614,26 @@ size32_t CCsvPartitioner::getSplitRecordSize(const byte * start, unsigned maxToR
             // Quoted separator
             if (quote == 0)
             {
+                if (isFirstRow)
+                {
+                    ++fieldCount;
+                    recordStructure.append("STRING ");
+                    // If record structure present in the first row and we have at least one character
+                    // long string then it will be this field name.
+                    // Otherwise we use "fieldx" (where x is the number of this field) as name.
+                    // This prevents to generate wrong record structure if field name(s) missing:
+                    // e.g: first row -> fieldA,fieldB,,fieldC,\n
+                    if (isRecordStructurePresent && (0 < lastGood-firstGood))
+                    {
+                        recordStructure.append((const char*)firstGood, 0, lastGood-firstGood);
+                    }
+                    else
+                    {
+                        recordStructure.append("field");
+                        recordStructure.append(fieldCount);
+                    }
+                    recordStructure.append(";\n");
+                }
                 lastEscape = false;
                 quoteToStrip = 0;
                 firstGood = cur + matchLen;
@@ -619,6 +643,28 @@ size32_t CCsvPartitioner::getSplitRecordSize(const byte * start, unsigned maxToR
         case TERMINATOR:
             if (quote == 0) // Is this a good idea? Means a mismatched quote is not fixed by EOL
             {
+               if (isFirstRow)
+               {
+                   // TODO For further improvement we can use second
+                   // row to check discovered record structure (field count).
+                   isFirstRow = false;
+
+                   // Process last field
+                   ++fieldCount;
+                   recordStructure.append("STRING ");
+                   if (isRecordStructurePresent&& (0 < lastGood-firstGood))
+                   {
+                       recordStructure.append((const char*)firstGood, 0, lastGood-firstGood);
+                   }
+                   else
+                   {
+                       recordStructure.append("field");
+                       recordStructure.append(fieldCount);
+                   }
+                   recordStructure.append(";\n");
+                   recordStructure.append("end;");
+               }
+
                if (processFullBuffer)
                {
                    last = cur + matchLen;

+ 7 - 0
dali/ft/daftformat.ipp

@@ -229,6 +229,9 @@ public:
     CCsvPartitioner(const FileFormat & _format);
 
     virtual void setTarget(IOutputProcessor * _target);
+    virtual void getRecordStructure(StringBuffer & _recordStructure) { _recordStructure = recordStructure; } // Assigns the member recordStructure buffer to the caller-supplied out-parameter.
+    virtual void setRecordStructurePresent( bool _isRecordStructurePresent) {isRecordStructurePresent = _isRecordStructurePresent;} // Stores the flag in the isRecordStructurePresent member.
+    virtual unsigned getFieldCount(void) { return fieldCount; } // Returns the current value of the fieldCount member.
 
 protected:
     virtual size32_t getSplitRecordSize(const byte * record, unsigned maxToRead, bool processFullBuffer, bool ateof);
@@ -243,6 +246,10 @@ protected:
     unsigned        maxElementLength;
     FileFormat      format;
     StringMatcher   matcher;
+    bool            isRecordStructurePresent;
+    StringBuffer    recordStructure;
+    unsigned        fieldCount;
+    bool            isFirstRow;
 };
 
 

+ 18 - 0
dali/ft/filecopy.cpp

@@ -1130,6 +1130,13 @@ void FileSprayer::calculateSprayPartition()
         const SocketEndpoint & ep = cur.filename.queryEndpoint();
         IFormatPartitioner * partitioner = createFormatPartitioner(ep, srcFormat, tgtFormat, calcOutput, queryFixedSlave(), wuid);
 
+        // CSV record structure discovery of every source
+        if (srcFormat.type == FFTcsv)
+        {
+            bool isRecordStructurePresent = options->getPropBool("@recordStructurePresent", false);
+            ((CCsvQuickPartitioner *)partitioner)->setRecordStructurePresent(isRecordStructurePresent);
+        }
+
         RemoteFilename name;
         name.set(cur.filename);
         setCanAccessDirectly(name);
@@ -1159,6 +1166,17 @@ void FileSprayer::calculateSprayPartition()
 
     ForEachItemIn(idx2, partitioners)
         partitioners.item(idx2).getResults(partition);
+
+    if (srcFormat.type == FFTcsv)
+    {
+        // Store discovered CSV record structure into target logical file.
+        StringBuffer recStru;
+        CCsvQuickPartitioner * partitioner = (CCsvQuickPartitioner *)&partitioners.item(0);
+        partitioner->getRecordStructure(recStru);
+        IDistributedFile * target = distributedTarget.get();
+        target->setECL(recStru.str());
+    }
+
 }
 
 void FileSprayer::calculateOutputOffsets()

文件差异内容过多而无法显示
+ 8 - 8
ecllibrary/std/File.ecl


+ 7 - 1
esp/scm/ws_fs.ecm

@@ -82,6 +82,8 @@ ESPStruct [nil_remove] DFUWorkunit
     string decrypt;
 
     [min_ver("1.08")] bool failIfNoSourceFile(false);
+    
+    [min_ver("1.09")] bool recordStructurePresent(false);
 };
 
 ESPStruct DFUException
@@ -303,6 +305,8 @@ ESPrequest [nil_remove] SprayFixed
     bool   wrap(false);
 
     [min_ver("1.08")] bool failIfNoSourceFile(false);
+    
+    [min_ver("1.09")] bool recordStructurePresent(false);
 
 };
 
@@ -348,6 +352,8 @@ ESPrequest [nil_remove] SprayVariable
     string decrypt;
 
     [min_ver("1.08")] bool failIfNoSourceFile(false);
+    
+    [min_ver("1.09")] bool recordStructurePresent(false);
 
 };
 
@@ -599,7 +605,7 @@ ESPresponse [exceptions_inline] UploadFilesResponse
 };
 
 ESPservice [
-    version("1.08"), default_client_version("1.08"),
+    version("1.09"), default_client_version("1.09"),
     exceptions_inline("./smc_xslt/exceptions.xslt")] FileSpray
 {
     ESPuses ESPstruct DFUWorkunit;

+ 6 - 0
esp/services/ws_fs/ws_fsService.cpp

@@ -1960,6 +1960,9 @@ bool CFileSprayEx::onSprayFixed(IEspContext &context, IEspSprayFixed &req, IEspS
         if (req.getFailIfNoSourceFile())
             options->setFailIfNoSourceFile(true);
 
+        if (req.getRecordStructurePresent())
+            options->setRecordStructurePresent(true);
+
         resp.setWuid(wu->queryId());
         resp.setRedirectUrl(StringBuffer("/FileSpray/GetDFUWorkunit?wuid=").append(wu->queryId()).str());
         submitDFUWorkUnit(wu.getClear());
@@ -2123,6 +2126,9 @@ bool CFileSprayEx::onSprayVariable(IEspContext &context, IEspSprayVariable &req,
         if (req.getFailIfNoSourceFile())
             options->setFailIfNoSourceFile(true);
 
+        if (req.getRecordStructurePresent())
+            options->setRecordStructurePresent(true);
+
         resp.setWuid(wu->queryId());
         resp.setRedirectUrl(StringBuffer("/FileSpray/GetDFUWorkunit?wuid=").append(wu->queryId()).str());
         submitDFUWorkUnit(wu.getClear());

文件差异内容过多而无法显示
+ 20 - 7
plugins/fileservices/fileservices.cpp


文件差异内容过多而无法显示
+ 2 - 0
plugins/fileservices/fileservices.hpp