Переглянути джерело

HPCC-17927 Couchbase plugin: Add support for parsing SET datatypes in query results

Signed-off-by: Dan S. Camper <dan.camper@lexisnexisrisk.com>
Dan S. Camper 8 роки тому
батько
коміт
ad1e215094
2 змінених файлів з 81 додано та 42 видалено
  1. 69 22
      plugins/couchbase/couchbaseembed.cpp
  2. 12 20
      plugins/couchbase/couchbaseembed.hpp

+ 69 - 22
plugins/couchbase/couchbaseembed.cpp

@@ -897,18 +897,45 @@ namespace couchbaseembed
         }
     }
 
+    void CouchbaseRowBuilder::processBeginSet(const RtlFieldInfo * field, bool &isAll)
+    {
+        isAll = false; // ALL not supported
+
+        const char * xpath = xpathOrName(field);
+
+        if (xpath && *xpath)
+        {
+            PathTracker     newPathNode(xpath, CPNTSet);
+            StringBuffer    newXPath;
+
+            constructNewXPath(newXPath, xpath);
+
+            newPathNode.childCount = m_oResultRow->getCount(newXPath);
+            m_pathStack.push_back(newPathNode);
+        }
+        else
+        {
+            failx("processBeginSet: Field name or xpath missing");
+        }
+    }
+
+    bool CouchbaseRowBuilder::processNextSet(const RtlFieldInfo * field)
+    {
+        return m_pathStack.back().childrenProcessed < m_pathStack.back().childCount;
+    }
+
     void CouchbaseRowBuilder::processBeginDataset(const RtlFieldInfo * field)
     {
         const char * xpath = xpathOrName(field);
 
         if (xpath && *xpath)
         {
-            PathTracker     newPathNode(xpath, true);
+            PathTracker     newPathNode(xpath, CPNTDataset);
             StringBuffer    newXPath;
 
             constructNewXPath(newXPath, xpath);
 
-            newPathNode.childDatasetRowCount = m_oResultRow->getCount(newXPath);
+            newPathNode.childCount = m_oResultRow->getCount(newXPath);
             m_pathStack.push_back(newPathNode);
         }
         else
@@ -926,9 +953,9 @@ namespace couchbaseembed
             if (strncmp(xpath, "<nested row>", 12) == 0)
             {
                 // Row within child dataset
-                if (m_pathStack.back().isDataset)
+                if (m_pathStack.back().nodeType == CPNTDataset)
                 {
-                    m_pathStack.back().currentDatasetRecord++;
+                    m_pathStack.back().currentChildIndex++;
                 }
                 else
                 {
@@ -937,7 +964,7 @@ namespace couchbaseembed
             }
             else
             {
-                m_pathStack.push_back(PathTracker(xpath, false));
+                m_pathStack.push_back(PathTracker(xpath, CPNTScalar));
             }
         }
         else
@@ -948,7 +975,17 @@ namespace couchbaseembed
 
     bool CouchbaseRowBuilder::processNextRow(const RtlFieldInfo * field)
     {
-        return m_pathStack.back().childDatasetRowsProcessed < m_pathStack.back().childDatasetRowCount;
+        return m_pathStack.back().childrenProcessed < m_pathStack.back().childCount;
+    }
+
+    void CouchbaseRowBuilder::processEndSet(const RtlFieldInfo * field)
+    {
+        const char * xpath = xpathOrName(field);
+
+        if (xpath && *xpath && !m_pathStack.empty() && strcmp(xpath, m_pathStack.back().nodeName.str()) == 0)
+        {
+            m_pathStack.pop_back();
+        }
     }
 
     void CouchbaseRowBuilder::processEndDataset(const RtlFieldInfo * field)
@@ -957,9 +994,9 @@ namespace couchbaseembed
 
         if (xpath && *xpath)
         {
-        	if (!m_pathStack.empty() && strcmp(xpath, m_pathStack.back().nodeName.str()) == 0)
-        	{
-            	m_pathStack.pop_back();
+            if (!m_pathStack.empty() && strcmp(xpath, m_pathStack.back().nodeName.str()) == 0)
+            {
+                m_pathStack.pop_back();
             }
         }
         else
@@ -974,16 +1011,16 @@ namespace couchbaseembed
 
         if (xpath && *xpath)
         {
-        	if (!m_pathStack.empty())
-        	{
-				if (m_pathStack.back().isDataset)
-				{
-					m_pathStack.back().childDatasetRowsProcessed++;
-				}
-				else if (strcmp(xpath, m_pathStack.back().nodeName.str()) == 0)
-				{
-					m_pathStack.pop_back();
-				}
+            if (!m_pathStack.empty())
+            {
+                if (m_pathStack.back().nodeType == CPNTDataset)
+                {
+                    m_pathStack.back().childrenProcessed++;
+                }
+                else if (strcmp(xpath, m_pathStack.back().nodeName.str()) == 0)
+                {
+                    m_pathStack.pop_back();
+                }
             }
         }
         else
@@ -1002,7 +1039,17 @@ namespace couchbaseembed
         }
 
         StringBuffer fullXPath;
-        constructNewXPath(fullXPath, xpath);
+
+        if (!m_pathStack.empty() && m_pathStack.back().nodeType == CPNTSet && strncmp(xpath, "<set element>", 13) == 0)
+        {
+            m_pathStack.back().currentChildIndex++;
+            constructNewXPath(fullXPath, NULL);
+            m_pathStack.back().childrenProcessed++;
+        }
+        else
+        {
+            constructNewXPath(fullXPath, xpath);
+        }
 
         return m_oResultRow->queryProp(fullXPath.str());
     }
@@ -1041,9 +1088,9 @@ namespace couchbaseembed
                     outXPath.append("/");
                 }
                 outXPath.append(iter->nodeName);
-                if (iter->isDataset)
+                if (iter->nodeType == CPNTDataset || iter->nodeType == CPNTSet)
                 {
-                    outXPath.appendf("[%d]", iter->currentDatasetRecord);
+                    outXPath.appendf("[%d]", iter->currentChildIndex);
                 }
             }
         }

+ 12 - 20
plugins/couchbase/couchbaseembed.hpp

@@ -216,26 +216,28 @@ namespace couchbaseembed
         CouchbaseConnection(const CouchbaseConnection &);
     };
 
+    enum PathNodeType {CPNTScalar, CPNTDataset, CPNTSet};
+
     struct PathTracker
     {
         StringBuffer    nodeName;
-        bool            isDataset;
-        unsigned int    currentDatasetRecord;
-        unsigned int    childDatasetRowCount;
-        unsigned int    childDatasetRowsProcessed;
+        PathNodeType    nodeType;
+        unsigned int    currentChildIndex;
+        unsigned int    childCount;
+        unsigned int    childrenProcessed;
 
         // Simple constructor
         PathTracker()
         {}
 
         // Constructor given node name and dataset bool
-        PathTracker(const StringBuffer& _nodeName, bool _isDataset)
-            :   nodeName(_nodeName), isDataset(_isDataset), currentDatasetRecord(0), childDatasetRowCount(0), childDatasetRowsProcessed(0)
+        PathTracker(const StringBuffer& _nodeName, PathNodeType _nodeType)
+            :   nodeName(_nodeName), nodeType(_nodeType), currentChildIndex(0), childCount(0), childrenProcessed(0)
         {}
 
         // Copy constructor
         PathTracker(const PathTracker& other)
-            :   nodeName(other.nodeName), isDataset(other.isDataset), currentDatasetRecord(other.currentDatasetRecord), childDatasetRowCount(other.childDatasetRowCount), childDatasetRowsProcessed(other.childDatasetRowsProcessed)
+            :   nodeName(other.nodeName), nodeType(other.nodeType), currentChildIndex(other.currentChildIndex), childCount(other.childCount), childrenProcessed(other.childrenProcessed)
         {}
     };
 
@@ -258,22 +260,12 @@ namespace couchbaseembed
         virtual void getUTF8Result(const RtlFieldInfo *field, size32_t &chars, char * &result);
         virtual void getUnicodeResult(const RtlFieldInfo *field, size32_t &chars, UChar * &result);
         virtual void getDecimalResult(const RtlFieldInfo *field, Decimal &value);
-        virtual void processBeginSet(const RtlFieldInfo * field, bool &isAll)
-        {
-            UNSUPPORTED("Embedded Couchbase support error: processBeginSet() not supported");
-        }
-        virtual bool processNextSet(const RtlFieldInfo * field)
-        {
-            UNSUPPORTED("Embedded Couchbase support error: processNextSet() not supported");
-            return false;
-        }
+        virtual void processBeginSet(const RtlFieldInfo * field, bool &isAll);
+        virtual bool processNextSet(const RtlFieldInfo * field);
         virtual void processBeginDataset(const RtlFieldInfo * field);
         virtual void processBeginRow(const RtlFieldInfo * field);
         virtual bool processNextRow(const RtlFieldInfo * field);
-        virtual void processEndSet(const RtlFieldInfo * field)
-        {
-            UNSUPPORTED("Embedded Couchbase support error: processEndSet() not supported");
-        }
+        virtual void processEndSet(const RtlFieldInfo * field);
         virtual void processEndDataset(const RtlFieldInfo * field);
         virtual void processEndRow(const RtlFieldInfo * field);