Procházet zdrojové kódy

HPCC-22322 Propagate published rowTag and allow client to specify

XML and JSON published row tags were not being passed through
to dafilesrv, and there was no mechanism to allow the client to
override the row tag to be used if they needed (e.g. if there
is no published row tag).

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith před 6 roky
rodič
revize
5ae0d4b157

+ 61 - 10
fs/dafsserver/dafsserver.cpp

@@ -1542,28 +1542,28 @@ class CRemoteCsvReadActivity : public CRemoteExternalFormatReadActivity
 public:
 public:
     CRemoteCsvReadActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc)
     CRemoteCsvReadActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc)
     {
     {
-        maxRowSize = config.getPropInt64("maxRowSize", defaultMaxCsvRowSize) * 1024 * 1024;
-        preserveWhitespace = config.getPropBool("preserveWhitespace");
+        maxRowSize = config.getPropInt64("ActivityOptions/maxRowSize", defaultMaxCsvRowSize) * 1024 * 1024;
+        preserveWhitespace = config.getPropBool("ActivityOptions/preserveWhitespace");
 
 
-        if (!config.getProp("csvQuote", csvQuote))
+        if (!config.getProp("ActivityOptions/csvQuote", csvQuote))
         {
         {
             if (!fileDesc->queryProperties().getProp("@csvQuote", csvQuote))
             if (!fileDesc->queryProperties().getProp("@csvQuote", csvQuote))
                 csvQuote.append("\"");
                 csvQuote.append("\"");
         }
         }
-        if (!config.getProp("csvSeparate", csvSeparate))
+        if (!config.getProp("ActivityOptions/csvSeparate", csvSeparate))
         {
         {
             if (!fileDesc->queryProperties().getProp("@csvSeparate", csvSeparate))
             if (!fileDesc->queryProperties().getProp("@csvSeparate", csvSeparate))
                 csvSeparate.append("\\,");
                 csvSeparate.append("\\,");
         }
         }
-        if (!config.getProp("csvTerminate", csvTerminate))
+        if (!config.getProp("ActivityOptions/csvTerminate", csvTerminate))
         {
         {
             if (!fileDesc->queryProperties().getProp("@csvTerminate", csvTerminate))
             if (!fileDesc->queryProperties().getProp("@csvTerminate", csvTerminate))
                 csvTerminate.append("\\n,\\r\\n");
                 csvTerminate.append("\\n,\\r\\n");
         }
         }
-        if (!config.getProp("csvEscape", csvEscape))
+        if (!config.getProp("ActivityOptions/csvEscape", csvEscape))
             fileDesc->queryProperties().getProp("@csvEscape", csvEscape);
             fileDesc->queryProperties().getProp("@csvEscape", csvEscape);
 
 
-        headerLines = config.getPropInt64("headerLines"); // really this should be a published attribute too
+        headerLines = config.getPropInt64("ActivityOptions/headerLines"); // really this should be a published attribute too
     }
     }
     virtual StringBuffer &getInfoStr(StringBuffer &out) const override
     virtual StringBuffer &getInfoStr(StringBuffer &out) const override
     {
     {
@@ -1615,7 +1615,6 @@ class CRemoteMarkupReadActivity : public CRemoteExternalFormatReadActivity, impl
     Linked<IColumnProvider> lastMatch;
     Linked<IColumnProvider> lastMatch;
     Owned<IXMLParse> xmlParser;
     Owned<IXMLParse> xmlParser;
 
 
-    StringBuffer xpath;
     bool noRoot = false;
     bool noRoot = false;
     bool useXmlContents = false;
     bool useXmlContents = false;
 
 
@@ -1820,12 +1819,15 @@ class CRemoteMarkupReadActivity : public CRemoteExternalFormatReadActivity, impl
 
 
         return true;
         return true;
     }
     }
+protected:
+    StringBuffer xpath;
+    StringBuffer customRowTag;
 public:
 public:
     IMPLEMENT_IINTERFACE_USING(PARENT);
     IMPLEMENT_IINTERFACE_USING(PARENT);
 
 
     CRemoteMarkupReadActivity(IPropertyTree &config, IFileDescriptor *fileDesc, ThorActivityKind _kind) : PARENT(config, fileDesc), kind(_kind)
     CRemoteMarkupReadActivity(IPropertyTree &config, IFileDescriptor *fileDesc, ThorActivityKind _kind) : PARENT(config, fileDesc), kind(_kind)
     {
     {
-        config.getProp("xpath", xpath);
+        config.getProp("ActivityOptions/rowTag", customRowTag);
         noRoot = config.getPropBool("noRoot");
         noRoot = config.getPropBool("noRoot");
     }
     }
     IColumnProvider *queryMatch() const { return lastMatch; }
     IColumnProvider *queryMatch() const { return lastMatch; }
@@ -1882,6 +1884,11 @@ class CRemoteXmlReadActivity : public CRemoteMarkupReadActivity
 public:
 public:
     CRemoteXmlReadActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc, TAKxmlread)
     CRemoteXmlReadActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc, TAKxmlread)
     {
     {
+        xpath.set("/Dataset/");
+        if (customRowTag.isEmpty()) // no override
+            fileDesc->queryProperties().getProp("@rowTag", xpath);
+        else
+            xpath.append(customRowTag);
     }
     }
 };
 };
 
 
@@ -1892,6 +1899,11 @@ class CRemoteJsonReadActivity : public CRemoteMarkupReadActivity
 public:
 public:
     CRemoteJsonReadActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc, TAKjsonread)
     CRemoteJsonReadActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc, TAKjsonread)
     {
     {
+        xpath.set("/");
+        if (customRowTag.isEmpty()) // no override
+            fileDesc->queryProperties().getProp("@rowTag", xpath);
+        else
+            xpath.append(customRowTag);
     }
     }
 };
 };
 
 
@@ -4308,7 +4320,46 @@ public:
          *    "f1" : "real"
          *    "f1" : "real"
          *   }
          *   }
          *  }
          *  }
-         * }
+         * OR
+         * {
+         *  "format" : "xml",
+         *  "node" : {
+         *   "kind" : "xmlread",
+         *   "fileName": "examplefilename",
+         *   "keyFilter" : "f1='1    '",
+         *   "input" : {
+         *    "f1" : "string5",
+         *    "f2" : "string5"
+         *   },
+         *   "output" : {
+         *    "f2" : "string",
+         *    "f1" : "real"
+         *   }
+         *   "ActivityOptions" : { // usually not required, options here may override file meta info.
+         *    "rowTag" : "/Dataset/OtherRow"
+         *   }
+         *  }
+         * OR
+         * {
+         *  "format" : "xml",
+         *  "node" : {
+         *   "kind" : "csvread",
+         *   "fileName": "examplefilename",
+         *   "keyFilter" : "f1='1    '",
+         *   "input" : {
+         *    "f1" : "string5",
+         *    "f2" : "string5"
+         *   },
+         *   "output" : {
+         *    "f2" : "string",
+         *    "f1" : "real"
+         *   }
+         *   "ActivityOptions" : { // usually not required, options here may override file meta info.
+         *    "csvQuote" : "\"",
+         *    "csvSeparate" : ","
+         *    "csvTerminate" : "\\n,\\r\\n",
+         *   }
+         *  }
          * OR
          * OR
          * {
          * {
          *  "format" : "xml",
          *  "format" : "xml",

+ 14 - 11
fs/dafsstream/dafsstream.cpp

@@ -138,6 +138,7 @@ class CDFUFile : public CSimpleInterfaceOf<IDFUFileAccess>, implements IDFUFileA
     DFUFileOption fileOptions = defaultFileOptions;
     DFUFileOption fileOptions = defaultFileOptions;
     StringAttr commCompType = defaultCompCompression;
     StringAttr commCompType = defaultCompCompression;
     unsigned rowStreamReplyLimitKb = 1024; // 1MB
     unsigned rowStreamReplyLimitKb = 1024; // 1MB
+    Owned<IPropertyTree> options;
 
 
 public:
 public:
     IMPLEMENT_IINTERFACE_USING(PARENT);
     IMPLEMENT_IINTERFACE_USING(PARENT);
@@ -302,6 +303,10 @@ public:
     {
     {
         return rowStreamReplyLimitKb;
         return rowStreamReplyLimitKb;
     }
     }
+    const IPropertyTree *queryOptions() const
+    {
+        return options;
+    }
 // IDFUFileAccessExt
 // IDFUFileAccessExt
     virtual IOutputMetaData *queryMeta() const override
     virtual IOutputMetaData *queryMeta() const override
     {
     {
@@ -465,6 +470,12 @@ public:
     {
     {
         expirySecs = secs;
         expirySecs = secs;
     }
     }
+    virtual void setOption(const char *key, const char *value) override
+    {
+        if (!options)
+            options.setown(createPTree());
+        options->setProp(key, value);
+    }
 // NB: the intention is for a IDFUFileAccess to be used to create instances for multiple parts, but not to mix types.
 // NB: the intention is for a IDFUFileAccess to be used to create instances for multiple parts, but not to mix types.
     virtual IDFUFilePartReader *createFilePartReader(unsigned p, unsigned copy, IOutputMetaData *outMeta, bool preserveGrouping) override;
     virtual IDFUFilePartReader *createFilePartReader(unsigned p, unsigned copy, IOutputMetaData *outMeta, bool preserveGrouping) override;
     virtual IDFUFilePartWriter *createFilePartWriter(unsigned p) override;
     virtual IDFUFilePartWriter *createFilePartWriter(unsigned p) override;
@@ -597,7 +608,6 @@ public:
             requestTree->setProp("commCompression", file->queryCommCompressionType());
             requestTree->setProp("commCompression", file->queryCommCompressionType());
         requestNode = requestTree->setPropTree("node");
         requestNode = requestTree->setPropTree("node");
 
 
-
         // NB: these are 1 based
         // NB: these are 1 based
         requestNode->setPropInt("filePart", part+1);
         requestNode->setPropInt("filePart", part+1);
         requestNode->setPropInt("filePartCopy", copy+1);
         requestNode->setPropInt("filePartCopy", copy+1);
@@ -606,6 +616,9 @@ public:
         JBASE64_Encode(binLayout.toByteArray(), binLayout.length(), typeInfoStr, false);
         JBASE64_Encode(binLayout.toByteArray(), binLayout.length(), typeInfoStr, false);
         requestNode->setProp("inputBin", typeInfoStr.str()); // on disk meta
         requestNode->setProp("inputBin", typeInfoStr.str()); // on disk meta
         requestNode->setProp("metaInfo", file->queryMetaInfoBlob());
         requestNode->setProp("metaInfo", file->queryMetaInfoBlob());
+        const IPropertyTree *options = file->queryOptions();
+        if (options)
+            requestNode->addPropTree("ActivityOptions", createPTreeFromIPT(options));
     }
     }
     virtual void beforeDispose() override
     virtual void beforeDispose() override
     {
     {
@@ -888,16 +901,6 @@ public:
             requestNode->setPropBool("outputGrouped", preserveGrouping);
             requestNode->setPropBool("outputGrouped", preserveGrouping);
         }
         }
 
 
-        // JCSMORE these are defaults, but should be picked up from file->queryFileDescriptor()
-        switch (file->queryType())
-        {
-            case dft_xml:
-                requestNode->setProp("xpath", "/Dataset/Row");
-                break;
-            case dft_json:
-                requestNode->setProp("xpath", "/Row");
-                break;
-        }
         switch (file->queryType())
         switch (file->queryType())
         {
         {
             case dft_xml:
             case dft_xml:

+ 1 - 0
fs/dafsstream/dafsstream.hpp

@@ -120,6 +120,7 @@ interface DAFSCLIENT_API IDFUFileAccess : extends IInterface
 // NB: these changes effect future creation of IDFUFilePartReader or IDFUFilePartWriter instances
 // NB: these changes effect future creation of IDFUFilePartReader or IDFUFilePartWriter instances
     virtual void setStreamReplyLimitK(unsigned k) = 0;
     virtual void setStreamReplyLimitK(unsigned k) = 0;
     virtual void setExpirySecs(unsigned secs) = 0;
     virtual void setExpirySecs(unsigned secs) = 0;
+    virtual void setOption(const char *key, const char *value) = 0;
 
 
 // NB: the intention is for a IDFUFileAccess to be used to create instances for multiple parts, but not to mix types.
 // NB: the intention is for a IDFUFileAccess to be used to create instances for multiple parts, but not to mix types.
     virtual IDFUFilePartReader *createFilePartReader(unsigned p, unsigned copy=0, IOutputMetaData *outMeta=nullptr, bool preserveGrouping=false) = 0;
     virtual IDFUFilePartReader *createFilePartReader(unsigned p, unsigned copy=0, IOutputMetaData *outMeta=nullptr, bool preserveGrouping=false) = 0;