소스 검색

Merge pull request #4835 from AttilaVamos/HPCC-9481-feature

HPCC-9481 CSV record definition auto discovery

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 11 년 전
부모
커밋
52bfef3407

+ 3 - 0
dali/dfu/dfurun.cpp

@@ -1214,6 +1214,9 @@ public:
                         if (options->getFailIfNoSourceFile())
                             opttree->setPropBool("@failIfNoSourceFile", true);
 
+                        if (options->getRecordStructurePresent())
+                            opttree->setPropBool("@recordStructurePresent", true);
+
                         Owned<IFileDescriptor> fdesc = destination->getFileDescriptor(iskey,options->getSuppressNonKeyRepeats()&&!iskey);
                         if (fdesc) {
                             if (options->getSubfileCopy()) {// need to set destination compressed or not

+ 9 - 0
dali/dfu/dfuwu.cpp

@@ -2028,6 +2028,15 @@ public:
         queryRoot()->setPropBool("@failIfNoSourceFile",val);
     }
 
+    bool getRecordStructurePresent() const
+    {
+        return queryRoot()->getPropBool("@recordStructurePresent");
+    }
+
+    void setRecordStructurePresent(bool val)
+    {
+        queryRoot()->setPropBool("@recordStructurePresent",val);
+    }
 };
 
 class CExceptionIterator: public CInterface, implements IExceptionIterator

+ 3 - 0
dali/dfu/dfuwu.hpp

@@ -168,6 +168,8 @@ interface IConstDFUoptions : extends IInterface
 
     virtual IPropertyTree *queryTree() const = 0;                   // used by DFU server
     virtual bool getFailIfNoSourceFile() const = 0;
+
+    virtual bool getRecordStructurePresent() const = 0;
 };
 
 interface IDFUoptions : extends IConstDFUoptions
@@ -203,6 +205,7 @@ interface IDFUoptions : extends IConstDFUoptions
     virtual void setSubfileCopy(bool val=true) = 0;                             // i.e. called by supercopy
     virtual void setEncDec(const char *enc,const char *dec) = 0;
     virtual void setFailIfNoSourceFile(bool val=false) = 0;
+    virtual void setRecordStructurePresent(bool val=false) = 0;
 };
 
 interface IConstDFUfileSpec: extends IInterface

+ 3 - 0
dali/dfuplus/dfuplus.cpp

@@ -433,6 +433,9 @@ bool CDfuPlusHelper::variableSpray(const char* srcxml,const char* srcip,const ch
     if(globals->hasProp("failIfNoSourceFile"))
         req->setFailIfNoSourceFile(globals->getPropBool("failIfNoSourceFile",false));
 
+    if(globals->hasProp("recordStructurePresent"))
+        req->setRecordStructurePresent(globals->getPropBool("recordStructurePresent",false));
+
     if(srcxml == NULL)
         info("\nVariable spraying from %s on %s to %s\n", srcfile, srcip, dstname);
     else

+ 1 - 0
dali/dfuplus/main.cpp

@@ -75,6 +75,7 @@ void handleSyntax()
     out.append("            terminator=<terminator> -- optional, default is \\r,\\r\\n\n");
     out.append("            quote=<quote> -- optional, default is '\n");
     out.append("            escape=<escape> -- optional, no default value \n");
+    out.append("            recordstructurepresent=0|1 -- optional, default is 0 (no field names in first row) \n");
     out.append("        options for xml:\n");
     out.append("            rowtag=rowTag -- required\n");
     out.append("            encoding=utf8|utf8n|utf16|utf16le|utf16be|utf32|utf32le|utf32be -- optional, default is utf8\n");

+ 71 - 0
dali/ft/daftformat.cpp

@@ -161,6 +161,17 @@ void CPartitioner::setTarget(IOutputProcessor * _target)
     target.set(_target);
 }
 
+void CPartitioner::setRecordStructurePresent(bool _recordStructurePresent)
+{
+
+}
+
+void CPartitioner::getRecordStructure(StringBuffer & _recordStructure)
+{
+    _recordStructure.clear();
+}
+
+
 //----------------------------------------------------------------------------
 
 
@@ -572,6 +583,40 @@ CCsvPartitioner::CCsvPartitioner(const FileFormat & _format) : CInputBasePartiti
 
     matcher.queryAddEntry(1, " ", WHITESPACE);
     matcher.queryAddEntry(1, "\t", WHITESPACE);
+    recordStructure.append("RECORD\n");
+    isRecordStructurePresent = false;
+    fieldCount = 0;
+    isFirstRow = true;
+}
+
+void CCsvPartitioner::storeFieldName(const char * start, unsigned len)
+{
+    ++fieldCount;
+    recordStructure.append("    STRING ");
+    // If record structure present in the first row and we have at least one character
+    // long string then it will be this field name.
+    // Otherwise we use "fieldx" (where x is the number of this field) as name.
+    // This prevents to generate wrong record structure if field name(s) missing:
+    // e.g: first row -> fieldA,fieldB,,fieldC,\n
+
+    // Check the field name
+    StringBuffer fieldName;
+    fieldName.append(start, 0, len);
+    fieldName.trim();
+
+    if (isRecordStructurePresent && (0 < fieldName.length() ))
+    {
+        fieldName.replace('-', '_');
+        fieldName.replace(' ', '_');
+
+        recordStructure.append(fieldName);
+    }
+    else
+    {
+        recordStructure.append("field");
+        recordStructure.append(fieldCount);
+    }
+    recordStructure.append(";\n");
 }
 
 size32_t CCsvPartitioner::getSplitRecordSize(const byte * start, unsigned maxToRead, bool processFullBuffer, bool ateof)
@@ -610,6 +655,11 @@ size32_t CCsvPartitioner::getSplitRecordSize(const byte * start, unsigned maxToR
             // Quoted separator
             if (quote == 0)
             {
+                if (isFirstRow)
+                {
+                    storeFieldName((const char*)firstGood, lastGood-firstGood);
+                }
+
                 lastEscape = false;
                 quoteToStrip = 0;
                 firstGood = cur + matchLen;
@@ -619,6 +669,17 @@ size32_t CCsvPartitioner::getSplitRecordSize(const byte * start, unsigned maxToR
         case TERMINATOR:
             if (quote == 0) // Is this a good idea? Means a mismatched quote is not fixed by EOL
             {
+               if (isFirstRow)
+               {
+                   // TODO For further improvement we can use second
+                   // row to check discovered record structure (field count).
+                   isFirstRow = false;
+
+                   // Process last field
+                   storeFieldName((const char*)firstGood, lastGood-firstGood);
+                   recordStructure.append("END;");
+               }
+
                if (processFullBuffer)
                {
                    last = cur + matchLen;
@@ -1399,6 +1460,16 @@ void CRemotePartitioner::setSource(unsigned _whichInput, const RemoteFilename &
     decryptKey.set(_decryptKey);
 }
 
+void CRemotePartitioner::setRecordStructurePresent(bool _recordStructurePresent)
+{
+
+}
+
+void CRemotePartitioner::getRecordStructure(StringBuffer & _recordStructure)
+{
+    _recordStructure.clear();
+}
+
 
 //== Output Processors ======================================================
 

+ 2 - 0
dali/ft/daftformat.hpp

@@ -58,6 +58,8 @@ public:
     virtual void setPartitionRange(offset_t _totalSize, offset_t _thisOffset, offset_t _thisSize, unsigned _thisHeaderSize, unsigned _numParts) = 0;
     virtual void setSource(unsigned _whichInput, const RemoteFilename & _fullPath, bool compressedInput, const char *decryptKey) = 0;
     virtual void setTarget(IOutputProcessor * _target) = 0;
+    virtual void setRecordStructurePresent(bool _recordStructurePresent) = 0;
+    virtual void getRecordStructure(StringBuffer & _recordStructure) = 0;
 };
 
 interface IFormatProcessor : public IFormatPartitioner

+ 15 - 0
dali/ft/daftformat.ipp

@@ -37,6 +37,8 @@ public:
     virtual void setPartitionRange(offset_t _totalSize, offset_t _thisOffset, offset_t _thisSize, unsigned _thisHeaderSize, unsigned _numParts);
     virtual void setSource(unsigned _whichInput, const RemoteFilename & _fullPath, bool compressedInput, const char *decryptKey);
     virtual void setTarget(IOutputProcessor * _target);
+    virtual void setRecordStructurePresent(bool _recordStructurePresent);
+    virtual void getRecordStructure(StringBuffer & _recordStructure);
 
 protected:
     virtual void findSplitPoint(offset_t curOffset, PartitionCursor & cursor) = 0;
@@ -230,6 +232,9 @@ public:
 
     virtual void setTarget(IOutputProcessor * _target);
 
+    virtual void getRecordStructure(StringBuffer & _recordStructure) { _recordStructure = recordStructure; }
+    virtual void setRecordStructurePresent( bool _isRecordStructurePresent) {isRecordStructurePresent = _isRecordStructurePresent;}
+
 protected:
     virtual size32_t getSplitRecordSize(const byte * record, unsigned maxToRead, bool processFullBuffer, bool ateof);
     virtual size32_t getTransformRecordSize(const byte * record, unsigned maxToRead);
@@ -238,11 +243,19 @@ protected:
         return getSplitRecordSize(record,maxToRead,processFullBuffer,true);
     }
 
+private:
+	void storeFieldName(const char * start, unsigned len);
+	
 protected:
     enum { NONE=0, SEPARATOR=1, TERMINATOR=2, WHITESPACE=3, QUOTE=4, ESCAPE=5 };
     unsigned        maxElementLength;
     FileFormat      format;
     StringMatcher   matcher;
+
+    bool            isRecordStructurePresent;
+    StringBuffer    recordStructure;
+    unsigned        fieldCount;
+    bool            isFirstRow;
 };
 
 
@@ -410,6 +423,8 @@ public:
     virtual void setPartitionRange(offset_t _totalSize, offset_t _thisOffset, offset_t _thisSize, unsigned _thisHeaderSize, unsigned _numParts);
     virtual void setSource(unsigned _whichInput, const RemoteFilename & _fullPath, bool compressedInput, const char *decryptKey);
     virtual void setTarget(IOutputProcessor * _target) { UNIMPLEMENTED; }
+    virtual void setRecordStructurePresent(bool _recordStructurePresent);
+    virtual void getRecordStructure(StringBuffer & _recordStructure);
 
 protected:
     void callRemote();

+ 12 - 0
dali/ft/filecopy.cpp

@@ -1131,6 +1131,11 @@ void FileSprayer::calculateSprayPartition()
         const SocketEndpoint & ep = cur.filename.queryEndpoint();
         IFormatPartitioner * partitioner = createFormatPartitioner(ep, srcFormat, tgtFormat, calcOutput, queryFixedSlave(), wuid);
 
+
+        // CSV record structure discovery of every source
+        bool isRecordStructurePresent = options->getPropBool("@recordStructurePresent", false);
+        partitioner->setRecordStructurePresent(isRecordStructurePresent);
+
         RemoteFilename name;
         name.set(cur.filename);
         setCanAccessDirectly(name);
@@ -1160,6 +1165,13 @@ void FileSprayer::calculateSprayPartition()
 
     ForEachItemIn(idx2, partitioners)
         partitioners.item(idx2).getResults(partition);
+
+    // Store discovered CSV record structure into target logical file.
+    StringBuffer recStru;
+    partitioners.item(0).getRecordStructure(recStru);
+    IDistributedFile * target = distributedTarget.get();
+    target->setECL(recStru.str());
+
 }
 
 void FileSprayer::calculateOutputOffsets()

파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
+ 8 - 8
ecllibrary/std/File.ecl


+ 6 - 0
esp/scm/ws_fs.ecm

@@ -82,6 +82,8 @@ ESPStruct [nil_remove] DFUWorkunit
     string decrypt;
 
     [min_ver("1.08")] bool failIfNoSourceFile(false);
+    
+    [min_ver("1.09")] bool recordStructurePresent(false);
 };
 
 ESPStruct DFUException
@@ -304,6 +306,8 @@ ESPrequest [nil_remove] SprayFixed
     bool   wrap(false);
 
     [min_ver("1.08")] bool failIfNoSourceFile(false);
+    
+    [min_ver("1.09")] bool recordStructurePresent(false);
 
 };
 
@@ -349,6 +353,8 @@ ESPrequest [nil_remove] SprayVariable
     string decrypt;
 
     [min_ver("1.08")] bool failIfNoSourceFile(false);
+    
+    [min_ver("1.09")] bool recordStructurePresent(false);
 
 };
 

+ 6 - 0
esp/services/ws_fs/ws_fsService.cpp

@@ -1954,6 +1954,9 @@ bool CFileSprayEx::onSprayFixed(IEspContext &context, IEspSprayFixed &req, IEspS
         if (req.getFailIfNoSourceFile())
             options->setFailIfNoSourceFile(true);
 
+        if (req.getRecordStructurePresent())
+            options->setRecordStructurePresent(true);
+
         resp.setWuid(wu->queryId());
         resp.setRedirectUrl(StringBuffer("/FileSpray/GetDFUWorkunit?wuid=").append(wu->queryId()).str());
         submitDFUWorkUnit(wu.getClear());
@@ -2117,6 +2120,9 @@ bool CFileSprayEx::onSprayVariable(IEspContext &context, IEspSprayVariable &req,
         if (req.getFailIfNoSourceFile())
             options->setFailIfNoSourceFile(true);
 
+        if (req.getRecordStructurePresent())
+            options->setRecordStructurePresent(true);
+
         resp.setWuid(wu->queryId());
         resp.setRedirectUrl(StringBuffer("/FileSpray/GetDFUWorkunit?wuid=").append(wu->queryId()).str());
         submitDFUWorkUnit(wu.getClear());

파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
+ 20 - 7
plugins/fileservices/fileservices.cpp


파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
+ 2 - 0
plugins/fileservices/fileservices.hpp