Browse Source

HPCC-9481 CSV record definition auto discovery

Rework aftewr second review.

Signed-off-by: Attila Vamos <attila.vamos@gmail.com>
Attila Vamos 11 years ago
parent
commit
d986db24ef
6 changed files with 73 additions and 48 deletions
  1. 0 1
      dali/dfu/dfurun.cpp
  2. 1 1
      dali/dfuplus/main.cpp
  3. 56 31
      dali/ft/daftformat.cpp
  4. 2 0
      dali/ft/daftformat.hpp
  5. 7 1
      dali/ft/daftformat.ipp
  6. 7 14
      dali/ft/filecopy.cpp

+ 0 - 1
dali/dfu/dfurun.cpp

@@ -1216,7 +1216,6 @@ public:
                         if (options->getRecordStructurePresent())
                             opttree->setPropBool("@recordStructurePresent", true);
 
-
                         Owned<IFileDescriptor> fdesc = destination->getFileDescriptor(iskey,options->getSuppressNonKeyRepeats()&&!iskey);
                         if (fdesc) {
                             if (options->getSubfileCopy()) {// need to set destination compressed or not

+ 1 - 1
dali/dfuplus/main.cpp

@@ -74,7 +74,7 @@ void handleSyntax()
     out.append("            terminator=<terminator> -- optional, default is \\r,\\r\\n\n");
     out.append("            quote=<quote> -- optional, default is '\n");
     out.append("            escape=<escape> -- optional, no default value \n");
-    out.append("            recordstructurepresent=0|1 -- optional, field names don't present in first row \n");
+    out.append("            recordstructurepresent=0|1 -- optional, default is 0 (no field names in first row) \n");
     out.append("        options for xml:\n");
     out.append("            rowtag=rowTag -- required\n");
     out.append("            encoding=utf8|utf8n|utf16|utf16le|utf16be|utf32|utf32le|utf32be -- optional, default is utf8\n");

+ 56 - 31
dali/ft/daftformat.cpp

@@ -161,6 +161,17 @@ void CPartitioner::setTarget(IOutputProcessor * _target)
     target.set(_target);
 }
 
+void CPartitioner::setRecordStructurePresent(bool _recordStructurePresent)
+{
+
+}
+
+void CPartitioner::getRecordStructure(StringBuffer & _recordStructure)
+{
+    _recordStructure.clear();
+}
+
+
 //----------------------------------------------------------------------------
 
 
@@ -572,12 +583,42 @@ CCsvPartitioner::CCsvPartitioner(const FileFormat & _format) : CInputBasePartiti
 
     matcher.queryAddEntry(1, " ", WHITESPACE);
     matcher.queryAddEntry(1, "\t", WHITESPACE);
-    recordStructure.append("fileRec := RECORD\n");
+    recordStructure.append("RECORD\n");
     isRecordStructurePresent = false;
     fieldCount = 0;
     isFirstRow = true;
 }
 
+void CCsvPartitioner::storeFieldName(const char * start, unsigned len)
+{
+    ++fieldCount;
+    recordStructure.append("    STRING ");
+    // If record structure present in the first row and we have at least one character
+    // long string then it will be this field name.
+    // Otherwise we use "fieldx" (where x is the number of this field) as name.
+    // This prevents to generate wrong record structure if field name(s) missing:
+    // e.g: first row -> fieldA,fieldB,,fieldC,\n
+
+    // Check the field name
+    StringBuffer fieldName;
+    fieldName.append(start, 0, len);
+    fieldName.trim();
+
+    if (isRecordStructurePresent && (0 < fieldName.length() ))
+    {
+        fieldName.replace('-', '_');
+        fieldName.replace(' ', '_');
+
+        recordStructure.append(fieldName);
+    }
+    else
+    {
+        recordStructure.append("field");
+        recordStructure.append(fieldCount);
+    }
+    recordStructure.append(";\n");
+}
+
 size32_t CCsvPartitioner::getSplitRecordSize(const byte * start, unsigned maxToRead, bool processFullBuffer, bool ateof)
 {
     //more complicated processing of quotes etc....
@@ -616,24 +657,9 @@ size32_t CCsvPartitioner::getSplitRecordSize(const byte * start, unsigned maxToR
             {
                 if (isFirstRow)
                 {
-                    ++fieldCount;
-                    recordStructure.append("STRING ");
-                    // If record structure present in the first row and we have at least one character
-                    // long string then it will be this field name.
-                    // Otherwise we use "fieldx" (where x is the number of this field) as name.
-                    // This prevents to generate wrong record structure if field name(s) missing:
-                    // e.g: first row -> fieldA,fieldB,,fieldC,\n
-                    if (isRecordStructurePresent && (0 < lastGood-firstGood))
-                    {
-                        recordStructure.append((const char*)firstGood, 0, lastGood-firstGood);
-                    }
-                    else
-                    {
-                        recordStructure.append("field");
-                        recordStructure.append(fieldCount);
-                    }
-                    recordStructure.append(";\n");
+                    storeFieldName((const char*)firstGood, lastGood-firstGood);
                 }
+
                 lastEscape = false;
                 quoteToStrip = 0;
                 firstGood = cur + matchLen;
@@ -650,19 +676,8 @@ size32_t CCsvPartitioner::getSplitRecordSize(const byte * start, unsigned maxToR
                    isFirstRow = false;
 
                    // Process last field
-                   ++fieldCount;
-                   recordStructure.append("STRING ");
-                   if (isRecordStructurePresent&& (0 < lastGood-firstGood))
-                   {
-                       recordStructure.append((const char*)firstGood, 0, lastGood-firstGood);
-                   }
-                   else
-                   {
-                       recordStructure.append("field");
-                       recordStructure.append(fieldCount);
-                   }
-                   recordStructure.append(";\n");
-                   recordStructure.append("end;");
+                   storeFieldName((const char*)firstGood, lastGood-firstGood);
+                   recordStructure.append("END;");
                }
 
                if (processFullBuffer)
@@ -1441,6 +1456,16 @@ void CRemotePartitioner::setSource(unsigned _whichInput, const RemoteFilename &
     decryptKey.set(_decryptKey);
 }
 
+void CRemotePartitioner::setRecordStructurePresent(bool _recordStructurePresent)
+{
+
+}
+
+void CRemotePartitioner::getRecordStructure(StringBuffer & _recordStructure)
+{
+    _recordStructure.clear();
+}
+
 
 //== Output Processors ======================================================
 

+ 2 - 0
dali/ft/daftformat.hpp

@@ -58,6 +58,8 @@ public:
     virtual void setPartitionRange(offset_t _totalSize, offset_t _thisOffset, offset_t _thisSize, unsigned _thisHeaderSize, unsigned _numParts) = 0;
     virtual void setSource(unsigned _whichInput, const RemoteFilename & _fullPath, bool compressedInput, const char *decryptKey) = 0;
     virtual void setTarget(IOutputProcessor * _target) = 0;
+    virtual void setRecordStructurePresent(bool _recordStructurePresent) = 0;
+    virtual void getRecordStructure(StringBuffer & _recordStructure) = 0;
 };
 
 interface IFormatProcessor : public IFormatPartitioner

+ 7 - 1
dali/ft/daftformat.ipp

@@ -37,6 +37,8 @@ public:
     virtual void setPartitionRange(offset_t _totalSize, offset_t _thisOffset, offset_t _thisSize, unsigned _thisHeaderSize, unsigned _numParts);
     virtual void setSource(unsigned _whichInput, const RemoteFilename & _fullPath, bool compressedInput, const char *decryptKey);
     virtual void setTarget(IOutputProcessor * _target);
+    virtual void setRecordStructurePresent(bool _recordStructurePresent);
+    virtual void getRecordStructure(StringBuffer & _recordStructure);
 
 protected:
     virtual void findSplitPoint(offset_t curOffset, PartitionCursor & cursor) = 0;
@@ -232,7 +234,6 @@ public:
 
     virtual void getRecordStructure(StringBuffer & _recordStructure) { _recordStructure = recordStructure; }
     virtual void setRecordStructurePresent( bool _isRecordStructurePresent) {isRecordStructurePresent = _isRecordStructurePresent;}
-    virtual unsigned getFieldCount(void) { return fieldCount; }
 
 protected:
     virtual size32_t getSplitRecordSize(const byte * record, unsigned maxToRead, bool processFullBuffer, bool ateof);
@@ -242,6 +243,9 @@ protected:
         return getSplitRecordSize(record,maxToRead,processFullBuffer,true);
     }
 
+private:
+	void storeFieldName(const char * start, unsigned len);
+	
 protected:
     enum { NONE=0, SEPARATOR=1, TERMINATOR=2, WHITESPACE=3, QUOTE=4, ESCAPE=5 };
     unsigned        maxElementLength;
@@ -419,6 +423,8 @@ public:
     virtual void setPartitionRange(offset_t _totalSize, offset_t _thisOffset, offset_t _thisSize, unsigned _thisHeaderSize, unsigned _numParts);
     virtual void setSource(unsigned _whichInput, const RemoteFilename & _fullPath, bool compressedInput, const char *decryptKey);
     virtual void setTarget(IOutputProcessor * _target) { UNIMPLEMENTED; }
+    virtual void setRecordStructurePresent(bool _recordStructurePresent);
+    virtual void getRecordStructure(StringBuffer & _recordStructure);
 
 protected:
     void callRemote();

+ 7 - 14
dali/ft/filecopy.cpp

@@ -1132,11 +1132,8 @@ void FileSprayer::calculateSprayPartition()
 
 
         // CSV record structure discovery of every source
-        if (srcFormat.type == FFTcsv)
-        {
-            bool isRecordStructurePresent = options->getPropBool("@recordStructurePresent", false);
-            ((CCsvQuickPartitioner *)partitioner)->setRecordStructurePresent(isRecordStructurePresent);
-        }
+        bool isRecordStructurePresent = options->getPropBool("@recordStructurePresent", false);
+        partitioner->setRecordStructurePresent(isRecordStructurePresent);
 
         RemoteFilename name;
         name.set(cur.filename);
@@ -1168,15 +1165,11 @@ void FileSprayer::calculateSprayPartition()
     ForEachItemIn(idx2, partitioners)
         partitioners.item(idx2).getResults(partition);
 
-    if (srcFormat.type == FFTcsv)
-    {
-        // Store discovered CSV record structure into target logical file.
-        StringBuffer recStru;
-        CCsvQuickPartitioner * partitioner = (CCsvQuickPartitioner *)&partitioners.item(0);
-        partitioner->getRecordStructure(recStru);
-        IDistributedFile * target = distributedTarget.get();
-        target->setECL(recStru.str());
-    }
+    // Store discovered CSV record structure into target logical file.
+    StringBuffer recStru;
+    partitioners.item(0).getRecordStructure(recStru);
+    IDistributedFile * target = distributedTarget.get();
+    target->setECL(recStru.str());
 
 }