Przeglądaj źródła

HPCC-8862 Add support for csv escape characters to sprayVariable

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 12 lat temu
rodzic
commit
e7461362d2

+ 4 - 4
ecllibrary/std/File.ecl

@@ -353,8 +353,8 @@ EXPORT DfuPlusExec(varstring cmdline) :=
 EXPORT SprayFixed(varstring sourceIP, varstring sourcePath, integer4 recordSize, varstring destinationGroup, varstring destinationLogicalName, integer4 timeOut=-1, varstring espServerIpPort=GETENV('ws_fs_server'), integer4 maxConnections=-1, boolean allowoverwrite=false, boolean replicate=false, boolean compress=false) :=
     lib_fileservices.FileServices.SprayFixed(sourceIP, sourcePath, recordSize, destinationGroup, destinationLogicalName, timeOut, espServerIpPort, maxConnections, allowoverwrite, replicate, compress);
 
-EXPORT SprayVariable(varstring sourceIP, varstring sourcePath, integer4 sourceMaxRecordSize=8192, varstring sourceCsvSeparate='\\,', varstring sourceCsvTerminate='\\n,\\r\\n', varstring sourceCsvQuote='\'', varstring destinationGroup, varstring destinationLogicalName, integer4 timeOut=-1, varstring espServerIpPort=GETENV('ws_fs_server'), integer4 maxConnections=-1, boolean allowoverwrite=false, boolean replicate=false, boolean compress=false) :=
-    lib_fileservices.FileServices.SprayVariable(sourceIP, sourcePath, sourceMaxRecordSize, sourceCsvSeparate, sourceCsvTerminate, sourceCsvQuote, destinationGroup, destinationLogicalName, timeOut, espServerIpPort, maxConnections, allowoverwrite, replicate, compress);
+EXPORT SprayVariable(varstring sourceIP, varstring sourcePath, integer4 sourceMaxRecordSize=8192, varstring sourceCsvSeparate='\\,', varstring sourceCsvTerminate='\\n,\\r\\n', varstring sourceCsvQuote='\'', varstring destinationGroup, varstring destinationLogicalName, integer4 timeOut=-1, varstring espServerIpPort=GETENV('ws_fs_server'), integer4 maxConnections=-1, boolean allowoverwrite=false, boolean replicate=false, boolean compress=false, varstring sourceCsvEscape='') :=
+    lib_fileservices.FileServices.SprayVariable(sourceIP, sourcePath, sourceMaxRecordSize, sourceCsvSeparate, sourceCsvTerminate, sourceCsvQuote, destinationGroup, destinationLogicalName, timeOut, espServerIpPort, maxConnections, allowoverwrite, replicate, compress, sourceCsvEscape);
 
 EXPORT SprayXml(varstring sourceIP, varstring sourcePath, integer4 sourceMaxRecordSize=8192, varstring sourceRowTag, varstring sourceEncoding='utf8', varstring destinationGroup, varstring destinationLogicalName, integer4 timeOut=-1, varstring espServerIpPort=GETENV('ws_fs_server'), integer4 maxConnections=-1, boolean allowoverwrite=false, boolean replicate=false, boolean compress=false) :=
     lib_fileservices.FileServices.SprayXml(sourceIP, sourcePath, sourceMaxRecordSize, sourceRowTag, sourceEncoding, destinationGroup, destinationLogicalName, timeOut, espServerIpPort, maxConnections, allowoverwrite, replicate, compress);
@@ -371,8 +371,8 @@ EXPORT Replicate(varstring logicalName, integer4 timeOut=-1, varstring espServer
 EXPORT varstring fSprayFixed(varstring sourceIP, varstring sourcePath, integer4 recordSize, varstring destinationGroup, varstring destinationLogicalName, integer4 timeOut=-1, varstring espServerIpPort=GETENV('ws_fs_server'), integer4 maxConnections=-1, boolean allowoverwrite=false, boolean replicate=false, boolean compress=false) :=
     lib_fileservices.FileServices.fSprayFixed(sourceIP, sourcePath, recordSize, destinationGroup, destinationLogicalName, timeOut, espServerIpPort, maxConnections, allowoverwrite, replicate, compress);
 
-EXPORT varstring fSprayVariable(varstring sourceIP, varstring sourcePath, integer4 sourceMaxRecordSize=8192, varstring sourceCsvSeparate='\\,', varstring sourceCsvTerminate='\\n,\\r\\n', varstring sourceCsvQuote='\'', varstring destinationGroup, varstring destinationLogicalName, integer4 timeOut=-1, varstring espServerIpPort=GETENV('ws_fs_server'), integer4 maxConnections=-1, boolean allowoverwrite=false, boolean replicate=false, boolean compress=false) :=
-    lib_fileservices.FileServices.fSprayVariable(sourceIP, sourcePath, sourceMaxRecordSize, sourceCsvSeparate, sourceCsvTerminate, sourceCsvQuote, destinationGroup, destinationLogicalName, timeOut, espServerIpPort, maxConnections, allowoverwrite, replicate, compress);
+EXPORT varstring fSprayVariable(varstring sourceIP, varstring sourcePath, integer4 sourceMaxRecordSize=8192, varstring sourceCsvSeparate='\\,', varstring sourceCsvTerminate='\\n,\\r\\n', varstring sourceCsvQuote='\'', varstring destinationGroup, varstring destinationLogicalName, integer4 timeOut=-1, varstring espServerIpPort=GETENV('ws_fs_server'), integer4 maxConnections=-1, boolean allowoverwrite=false, boolean replicate=false, boolean compress=false, varstring sourceCsvEscape='') :=
+    lib_fileservices.FileServices.fSprayVariable(sourceIP, sourcePath, sourceMaxRecordSize, sourceCsvSeparate, sourceCsvTerminate, sourceCsvQuote, destinationGroup, destinationLogicalName, timeOut, espServerIpPort, maxConnections, allowoverwrite, replicate, compress, sourceCsvEscape);
 
 EXPORT varstring fSprayXml(varstring sourceIP, varstring sourcePath, integer4 sourceMaxRecordSize=8192, varstring sourceRowTag, varstring sourceEncoding='utf8', varstring destinationGroup, varstring destinationLogicalName, integer4 timeOut=-1, varstring espServerIpPort=GETENV('ws_fs_server'), integer4 maxConnections=-1, boolean allowoverwrite=false, boolean replicate=false, boolean compress=false) :=
     lib_fileservices.FileServices.fSprayXml(sourceIP, sourcePath, sourceMaxRecordSize, sourceRowTag, sourceEncoding, destinationGroup, destinationLogicalName, timeOut, espServerIpPort, maxConnections, allowoverwrite, replicate, compress);

Plik diff jest za duży
+ 25 - 8
plugins/fileservices/fileservices.cpp


+ 2 - 0
plugins/fileservices/fileservices.hpp

@@ -51,6 +51,7 @@ FILESERVICES_API char * FILESERVICES_CALL fsCmdProcess(const char *prog, const c
 FILESERVICES_API void FILESERVICES_CALL fsCmdProcess2(unsigned & tgtLen, char * & tgt, const char *prog, unsigned srcLen, const char * src);
 FILESERVICES_API void FILESERVICES_CALL fsSprayFixed(ICodeContext *ctx, const char * sourceIP, const char * sourcePath, int recordSize, const char * destinationGroup, const char * destinationLogicalName, int timeOut, const char * espServerIpPort, int maxConnections, bool overwrite=false, bool replicate=false, bool compress=false);
 FILESERVICES_API void FILESERVICES_CALL fsSprayVariable(ICodeContext *ctx, const char * sourceIP, const char * sourcePath, int sourceMaxRecordSize, const char * sourceCsvSeparate, const char * sourceCsvTerminate, const char * sourceCsvQuote, const char * destinationGroup, const char * destinationLogicalName, int timeOut, const char * espServerIpPort, int maxConnections, bool overwrite=false, bool replicate=false, bool compress=false);
+FILESERVICES_API void FILESERVICES_CALL fsSprayVariable2(ICodeContext *ctx, const char * sourceIP, const char * sourcePath, int sourceMaxRecordSize, const char * sourceCsvSeparate, const char * sourceCsvTerminate, const char * sourceCsvQuote, const char * destinationGroup, const char * destinationLogicalName, int timeOut, const char * espServerIpPort, int maxConnections, bool overwrite, bool replicate, bool compress, const char * sourceCsvEscape);
 FILESERVICES_API void FILESERVICES_CALL fsSprayXml(ICodeContext *ctx, const char * sourceIP, const char * sourcePath, int sourceMaxRecordSize, const char *sourceRowTag, const char *sourceEncoding, const char * destinationGroup, const char * destinationLogicalName, int timeOut, const char * espServerIpPort, int maxConnections, bool overwrite, bool replicate, bool compress=false);
 FILESERVICES_API void FILESERVICES_CALL fsDespray(ICodeContext *ctx, const char * sourceLogicalName, const char * destinationIP, const char * destinationPath, int timeOut, const char * espServerIpPort, int maxConnections, bool overwrite=false);
 FILESERVICES_API void FILESERVICES_CALL fsCopy(ICodeContext *ctx, const char * sourceLogicalName, const char *destinationGroup, const char * destinationLogicalName, const char * sourceDali, int timeOut, const char * espServerIpPort, int maxConnections, bool overwrite, bool replicate, bool asSuperfile, bool compress, bool forcePush, int transferBufferSize);
@@ -76,6 +77,7 @@ FILESERVICES_API void  FILESERVICES_CALL fsMonitorLogicalFileName(ICodeContext *
 FILESERVICES_API void  FILESERVICES_CALL fsMonitorFile(ICodeContext *ctx, const char *eventname, const char *ip, const char *filename, bool sub, int shotcount, const char * espServerIpPort);
 FILESERVICES_API char * FILESERVICES_CALL fsfSprayFixed(ICodeContext *ctx, const char * sourceIP, const char * sourcePath, int recordSize, const char * destinationGroup, const char * destinationLogicalName, int timeOut, const char * espServerIpPort, int maxConnections, bool overwrite=false, bool replicate=false, bool compress=false);
 FILESERVICES_API char * FILESERVICES_CALL fsfSprayVariable(ICodeContext *ctx, const char * sourceIP, const char * sourcePath, int sourceMaxRecordSize, const char * sourceCsvSeparate, const char * sourceCsvTerminate, const char * sourceCsvQuote, const char * destinationGroup, const char * destinationLogicalName, int timeOut, const char * espServerIpPort, int maxConnections, bool overwrite=false, bool replicate=false, bool compress=false);
+FILESERVICES_API char * FILESERVICES_CALL fsfSprayVariable2(ICodeContext *ctx, const char * sourceIP, const char * sourcePath, int sourceMaxRecordSize, const char * sourceCsvSeparate, const char * sourceCsvTerminate, const char * sourceCsvQuote, const char * destinationGroup, const char * destinationLogicalName, int timeOut, const char * espServerIpPort, int maxConnections, bool overwrite, bool replicate, bool compress, const char * sourceCsvEscape);
 FILESERVICES_API char * FILESERVICES_CALL fsfSprayXml(ICodeContext *ctx, const char * sourceIP, const char * sourcePath, int sourceMaxRecordSize, const char *sourceRowTag, const char *sourceEncoding, const char * destinationGroup, const char * destinationLogicalName, int timeOut, const char * espServerIpPort, int maxConnections, bool overwrite, bool replicate, bool compress=false);
 FILESERVICES_API char * FILESERVICES_CALL fsfDespray(ICodeContext *ctx, const char * sourceLogicalName, const char * destinationIP, const char * destinationPath, int timeOut, const char * espServerIpPort, int maxConnections, bool overwrite=false);
 FILESERVICES_API char * FILESERVICES_CALL fsfCopy(ICodeContext *ctx, const char * sourceLogicalName, const char *destinationGroup, const char * destinationLogicalName, const char * sourceDali, int timeOut, const char * espServerIpPort, int maxConnections, bool overwrite, bool replicate, bool asSuperfile, bool compress, bool forcePush, int transferBufferSize);

+ 5 - 0
testing/ecl/key/spray_test2.xml

@@ -0,0 +1,5 @@
+<Dataset name='Result 1'>
+ <Row><name>foo</name><age>10</age><good>true</good></Row>
+ <Row><name>bar</name><age>12</age><good>false</good></Row>
+ <Row><name>baz</name><age>32</age><good>true</good></Row>
+</Dataset>

+ 41 - 0
testing/ecl/spray_test2.ecl

@@ -0,0 +1,41 @@
+/*##############################################################################
+
+    Copyright (C) 2013 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+import Std.File AS FileServices;
+
+// This is not an engine test, but a DFU.
+// Doesn't matter much which engine does it, so we restrict to only one
+
+//noRoxie
+//noThorLCR
+//noThor
+
+SrcIP := 'localhost';
+File := tempFiles + '/spray_test.txt';
+ClusterName := 'mythor';
+DestFile := '~regress::spray_test2.txt';
+ESPportIP := 'http://127.0.0.1:8010/FileSpray';
+
+FileServices.SprayVariable(SrcIP,
+                        File,
+                        destinationGroup := ClusterName,
+                        destinationLogicalName := DestFile,
+                        espServerIpPort := ESPportIP,
+                        ALLOWOVERWRITE := true);
+
+ds := DATASET(DestFile, { string name, unsigned age, boolean good }, csv);
+output(ds);