Parcourir la source

HPCC-17757 Couchbase Plugin: Improve parsing of complex query result

Signed-off-by: Dan S. Camper <dan.camper@lexisnexisrisk.com>
Dan S. Camper il y a 8 ans
Parent
commit
e796a3c01d
2 fichiers modifiés avec 158 ajouts et 37 suppressions
  1. 128 33
      plugins/couchbase/couchbaseembed.cpp
  2. 30 4
      plugins/couchbase/couchbaseembed.hpp

+ 128 - 33
plugins/couchbase/couchbaseembed.cpp

@@ -71,7 +71,7 @@ namespace couchbaseembed
             else if (status.isTemporary())
                 failx("TempErr: %s", status.description());
             else
-                failx("Couchbase err: %s", status.description());
+                failx("Couchbase err: %s (%d)", status.description(), status.errcode());
         }
 
         //consider parsing json result
@@ -899,68 +899,163 @@ namespace couchbaseembed
 
     void CouchbaseRowBuilder::processBeginDataset(const RtlFieldInfo * field)
     {
-        /*
-         *
-         *childRec := RECORD real x; real y; END;
-         *parentRec := RECORD
-         * childRec child1,                        <-- flatens out the childrec, this function would receive a field of name x
-         * dataset(childRec) child2;               <-- keeps nested structure, this funciton would receive a field of name child2
-         *END;
-        */
+        const char * xpath = xpathOrName(field);
 
-        if (getNumFields(field->type->queryChildType()) > 0)
-            m_oNestedField.set(m_oResultRow->queryBranch(field->name->queryStr()));
+        if (xpath && *xpath)
+        {
+            PathTracker     newPathNode(xpath, true);
+            StringBuffer    newXPath;
+
+            constructNewXPath(newXPath, xpath);
+
+            newPathNode.childDatasetRowCount = m_oResultRow->getCount(newXPath);
+            m_pathStack.push_back(newPathNode);
+        }
+        else
+        {
+            failx("processBeginDataset: Field name or xpath missing");
+        }
     }
 
     void CouchbaseRowBuilder::processBeginRow(const RtlFieldInfo * field)
     {
-        m_fieldsProcessedCount = 0;
-        m_rowFieldCount = getNumFields(field->type);
+        const char * xpath = xpathOrName(field);
+
+        if (xpath && *xpath)
+        {
+            if (strncmp(xpath, "<nested row>", 12) == 0)
+            {
+                // Row within child dataset
+                if (m_pathStack.back().isDataset)
+                {
+                    m_pathStack.back().currentDatasetRecord++;
+                }
+                else
+                {
+                    failx("<nested row> received with no outer dataset designated");
+                }
+            }
+            else
+            {
+                m_pathStack.push_back(PathTracker(xpath, false));
+            }
+        }
+        else
+        {
+            failx("processBeginRow: Field name or xpath missing");
+        }
     }
 
     bool CouchbaseRowBuilder::processNextRow(const RtlFieldInfo * field)
     {
-        return m_fieldsProcessedCount + 1 == m_rowFieldCount;
+        return m_pathStack.back().childDatasetRowsProcessed < m_pathStack.back().childDatasetRowCount;
     }
 
     void CouchbaseRowBuilder::processEndDataset(const RtlFieldInfo * field)
     {
-        if(m_oNestedField)
-            m_oNestedField.clear();
+        const char * xpath = xpathOrName(field);
+
+        if (xpath && *xpath)
+        {
+        	if (!m_pathStack.empty() && strcmp(xpath, m_pathStack.back().nodeName.str()) == 0)
+        	{
+            	m_pathStack.pop_back();
+            }
+        }
+        else
+        {
+            failx("processEndDataset: Field name or xpath missing");
+        }
     }
 
     void CouchbaseRowBuilder::processEndRow(const RtlFieldInfo * field)
     {
-        if(m_oNestedField)
-            m_oNestedField.clear();
+        const char * xpath = xpathOrName(field);
+
+        if (xpath && *xpath)
+        {
+        	if (!m_pathStack.empty())
+        	{
+				if (m_pathStack.back().isDataset)
+				{
+					m_pathStack.back().childDatasetRowsProcessed++;
+				}
+				else if (strcmp(xpath, m_pathStack.back().nodeName.str()) == 0)
+				{
+					m_pathStack.pop_back();
+				}
+            }
+        }
+        else
+        {
+            failx("processEndRow: Field name or xpath missing");
+        }
     }
 
     const char * CouchbaseRowBuilder::nextField(const RtlFieldInfo * field)
     {
-        m_fieldsProcessedCount++;
-        if (!m_oResultRow)
-            failx("Missing result row data");
+        const char * xpath = xpathOrName(field);
+
+        if (!xpath || !*xpath)
+        {
+            failx("nextField: Field name or xpath missing");
+        }
+
+        StringBuffer fullXPath;
+        constructNewXPath(fullXPath, xpath);
+
+        return m_oResultRow->queryProp(fullXPath.str());
+    }
+
+    const char * CouchbaseRowBuilder::xpathOrName(const RtlFieldInfo * field) const
+    {
+        const char * xpath = NULL;
+
+        if (field->xpath)
+        {
+            if (field->xpath[0] == xpathCompoundSeparatorChar)
+            {
+                xpath = field->xpath + 1;
+            }
+            else
+            {
+                xpath = field->xpath;
+            }
+        }
+        else
+        {
+            xpath = str(field->name);
+        }
 
-        const char * fieldname = field->name->queryStr();
-        if (!fieldname || !*fieldname)
-            failx("Missing result column metadata (name)");
+        return xpath;
+    }
 
-        if (!m_oResultRow->hasProp(fieldname))
+    void CouchbaseRowBuilder::constructNewXPath(StringBuffer& outXPath, const char * nextNode) const
+    {
+        for (std::vector<PathTracker>::const_iterator iter = m_pathStack.begin(); iter != m_pathStack.end(); iter++)
         {
-            VStringBuffer nxpath("locationData/%s", fieldname);
-            if (m_oNestedField)
+            if (strncmp(iter->nodeName, "<row>", 5) != 0)
             {
-                if (!m_oNestedField->hasProp(fieldname))
+                if (!outXPath.isEmpty())
+                {
+                    outXPath.append("/");
+                }
+                outXPath.append(iter->nodeName);
+                if (iter->isDataset)
                 {
-                    StringBuffer xml;
-                    toXML(m_oResultRow, xml);
-                    failx("Result row does not contain field: %s: %s", fieldname, xml.str());
+                    outXPath.appendf("[%d]", iter->currentDatasetRecord);
                 }
+            }
+        }
 
-                return m_oNestedField->queryProp(fieldname);
+        if (nextNode && *nextNode)
+        {
+            if (!outXPath.isEmpty())
+            {
+                outXPath.append("/");
             }
+            outXPath.append(nextNode);
         }
-        return m_oResultRow->queryProp(fieldname);
     }
 
     class CouchbaseEmbedContext : public CInterfaceOf<IEmbedContext>

+ 30 - 4
plugins/couchbase/couchbaseembed.hpp

@@ -43,6 +43,7 @@
 #include "rtlfield_imp.hpp"
 #include "roxiemem.hpp"
 
+#include <vector>
 
 namespace couchbaseembed
 {
@@ -215,12 +216,37 @@ namespace couchbaseembed
         CouchbaseConnection(const CouchbaseConnection &);
     };
 
+    struct PathTracker
+    {
+        StringBuffer    nodeName;
+        bool            isDataset;
+        unsigned int    currentDatasetRecord;
+        unsigned int    childDatasetRowCount;
+        unsigned int    childDatasetRowsProcessed;
+
+        // Simple constructor
+        PathTracker()
+        {}
+
+        // Constructor given node name and dataset bool
+        PathTracker(const StringBuffer& _nodeName, bool _isDataset)
+            :   nodeName(_nodeName), isDataset(_isDataset), currentDatasetRecord(0), childDatasetRowCount(0), childDatasetRowsProcessed(0)
+        {}
+
+        // Copy constructor
+        PathTracker(const PathTracker& other)
+            :   nodeName(other.nodeName), isDataset(other.isDataset), currentDatasetRecord(other.currentDatasetRecord), childDatasetRowCount(other.childDatasetRowCount), childDatasetRowsProcessed(other.childDatasetRowsProcessed)
+        {}
+    };
+
     class CouchbaseRowBuilder : public CInterfaceOf<IFieldSource>
     {
     public:
-        CouchbaseRowBuilder(IPropertyTree * resultrow) :  m_fieldsProcessedCount(0), m_rowFieldCount(0)
+        CouchbaseRowBuilder(IPropertyTree * resultrow)
         {
             m_oResultRow.set(resultrow);
+            if (!m_oResultRow)
+                failx("Missing result row data");
         }
 
         virtual bool getBooleanResult(const RtlFieldInfo *field);
@@ -253,12 +279,12 @@ namespace couchbaseembed
 
     protected:
         const char * nextField(const RtlFieldInfo * field);
+        const char * xpathOrName(const RtlFieldInfo * field) const;
+        void constructNewXPath(StringBuffer& outXPath, const char * nextNode) const;
     private:
         TokenDeserializer m_tokenDeserializer;
         Owned<IPropertyTree> m_oResultRow;
-        Owned<IPropertyTree> m_oNestedField;
-        int m_fieldsProcessedCount;
-        int m_rowFieldCount;
+        std::vector<PathTracker> m_pathStack;
     };
 
     // Bind Couchbase columns from an ECL record