瀏覽代碼

Merge pull request #10333 from dcamper/hpcc-18173-couchbase-memory-mgmt

HPCC-18173 Couchbase Plugin: Solve various memory leaks

Reviewed-By: Rodrigo Pastrana <rodrigo.pastrana@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 7 年之前
父節點
當前提交
69deb3f817
共有 2 個文件被更改,包括 68 次插入34 次删除
  1. 54 22
      plugins/couchbase/couchbaseembed.cpp
  2. 14 12
      plugins/couchbase/couchbaseembed.hpp

+ 54 - 22
plugins/couchbase/couchbaseembed.cpp

@@ -80,18 +80,17 @@ namespace couchbaseembed
     }
 
     CouchbaseRowStream::CouchbaseRowStream(IEngineRowAllocator* resultAllocator, Couchbase::Query * cbaseQuery)
-       :   m_CouchBaseQuery(cbaseQuery),
-           m_resultAllocator(resultAllocator)
+       :   m_resultAllocator(resultAllocator)
     {
         m_currentRow = 0;
         m_shouldRead = true;
 
         //iterating over result rows and copying them to stringarray
         //is there a way to independently step through original result rows?
-        for (auto cbrow : *m_CouchBaseQuery)
+        for (auto cbrow : *cbaseQuery)
             m_Rows.append(cbrow.json().to_string().c_str());
 
-        reportIfQueryFailure(m_CouchBaseQuery);
+        reportIfQueryFailure(cbaseQuery);
     }
 
     CouchbaseRowStream::~CouchbaseRowStream() {}
@@ -105,12 +104,12 @@ namespace couchbaseembed
             Owned<IPropertyTree> contentTree = createPTreeFromJSONString(json,ipt_caseInsensitive);
             if (contentTree)
             {
-                CouchbaseRowBuilder * cbRowBuilder = new CouchbaseRowBuilder(contentTree);
+                CouchbaseRowBuilder cbRowBuilder(contentTree);
                 RtlDynamicRowBuilder rowBuilder(m_resultAllocator);
                 const RtlTypeInfo *typeInfo = m_resultAllocator->queryOutputMeta()->queryTypeInfo();
                 assertex(typeInfo);
                 RtlFieldStrInfo dummyField("<row>", NULL, typeInfo);
-                size32_t len = typeInfo->build(rowBuilder, 0, &dummyField, *cbRowBuilder);
+                size32_t len = typeInfo->build(rowBuilder, 0, &dummyField, cbRowBuilder);
                 return rowBuilder.finalizeRowClear(len);
             }
             else
@@ -128,7 +127,7 @@ namespace couchbaseembed
    Couchbase::Query * CouchbaseConnection::query(Couchbase::QueryCommand * qcommand)
    {
        Couchbase::Status queryStatus;
-       Couchbase::Query * pQuery = new Couchbase::Query(*m_pCouchbaseClient, *qcommand, queryStatus);
+       Couchbase::Query * pQuery = new Couchbase::Query(*m_pCouchbaseClient, *qcommand, queryStatus); // will be owned by method caller
 
        if (!queryStatus)
            failx("Couldn't issue query: %s", queryStatus.description());
@@ -137,7 +136,7 @@ namespace couchbaseembed
            failx("Couldn't execute query, reason: %s\nBody is: ", pQuery->meta().body().data());
 
        if (pQuery->meta().status().errcode() != LCB_SUCCESS )//rows.length() == 0)
-           failx("Query execution error: %s", m_pQuery->meta().body().data());
+           failx("Query execution error: %s", pQuery->meta().body().data());
 
        return pQuery;
    }
@@ -364,8 +363,6 @@ namespace couchbaseembed
     CouchbaseEmbedFunctionContext::CouchbaseEmbedFunctionContext(const IContextLogger &_logctx, const char *options, unsigned _flags)
     : logctx(_logctx), m_NextRow(), m_nextParam(0), m_numParams(0), m_scriptFlags(_flags)
     {
-        cbQueryIterator = NULL;
-        m_pCouchbaseClient = nullptr;
         m_pQuery = nullptr;
         m_pQcmd = nullptr;
 
@@ -416,6 +413,21 @@ namespace couchbaseembed
         m_oCBConnection->connect();
     }
 
+    CouchbaseEmbedFunctionContext::~CouchbaseEmbedFunctionContext()
+    {
+        if (m_pQcmd)
+        {
+            delete m_pQcmd;
+            m_pQcmd = nullptr;
+        }
+
+        if (m_pQuery)
+        {
+            delete m_pQuery;
+            m_pQuery = nullptr;
+        }
+    }
+
     IPropertyTree * CouchbaseEmbedFunctionContext::nextResultRowTree()
     {
         for (auto cbrow : *m_pQuery)
@@ -449,15 +461,16 @@ namespace couchbaseembed
 
     const char * CouchbaseEmbedFunctionContext::nextResultScalar()
     {
-        auto resultrow = nextResultRowIterator();
-        if (resultrow)
+        m_resultrow.setown(nextResultRowIterator());
+
+        if (m_resultrow)
         {
-            resultrow->first();
-            if(resultrow->isValid() == true)
+            m_resultrow->first();
+            if(m_resultrow->isValid() == true)
             {
-                if (resultrow->query().hasChildren())
+                if (m_resultrow->query().hasChildren())
                     typeError("scalar", "");
-                return resultrow->query().queryProp("");
+                return m_resultrow->query().queryProp("");
             }
             else
                 failx("Could not fetch next result column.");
@@ -562,14 +575,14 @@ namespace couchbaseembed
     IRowStream * CouchbaseEmbedFunctionContext::getDatasetResult(IEngineRowAllocator * _resultAllocator)
     {
         Owned<CouchbaseRowStream> cbaseRowStream;
-        cbaseRowStream.set(new CouchbaseRowStream(_resultAllocator, m_pQuery));
+        cbaseRowStream.setown(new CouchbaseRowStream(_resultAllocator, m_pQuery));
         return cbaseRowStream.getLink();
     }
 
     byte * CouchbaseEmbedFunctionContext::getRowResult(IEngineRowAllocator * _resultAllocator)
     {
         Owned<CouchbaseRowStream> cbaseRowStream;
-        cbaseRowStream.set(new CouchbaseRowStream(_resultAllocator, m_pQuery));
+        cbaseRowStream.setown(new CouchbaseRowStream(_resultAllocator, m_pQuery));
         return (byte *)cbaseRowStream->nextRow();
     }
 
@@ -733,12 +746,28 @@ namespace couchbaseembed
     {
         if (script && *script)
         {
-            m_pQcmd = new Couchbase::QueryCommand(script);
+            // Incoming script is not necessarily null terminated. Note that the chars refers to utf8 characters and not bytes.
 
-            if ((m_scriptFlags & EFnoparams) == 0)
-                m_numParams = countParameterPlaceholders(script);
+            size32_t len = rtlUtf8Size(chars, script);
+
+            if (len > 0)
+            {
+                StringAttr queryScript;
+                queryScript.set(script, len);
+                const char * terminatedScript = queryScript.get(); // Now null terminated
+
+                if (m_pQcmd)
+                    delete m_pQcmd;
+
+                m_pQcmd = new Couchbase::QueryCommand(terminatedScript);
+
+                if ((m_scriptFlags & EFnoparams) == 0)
+                    m_numParams = countParameterPlaceholders(terminatedScript);
+                else
+                    m_numParams = 0;
+            }
             else
-                m_numParams = 0;
+                failx("Empty N1QL query detected");
         }
         else
             failx("Empty N1QL query detected");
@@ -755,6 +784,9 @@ namespace couchbaseembed
             m_oInputStream->executeAll(m_oCBConnection);
         else
         {
+            if (m_pQuery)
+                delete m_pQuery;
+
             m_pQuery = m_oCBConnection->query(m_pQcmd);
 
             reportIfQueryFailure(m_pQuery);

+ 14 - 12
plugins/couchbase/couchbaseembed.hpp

@@ -175,8 +175,6 @@ namespace couchbaseembed
         virtual const void* nextRow();
         virtual void stop();
     private:
-        Couchbase::Query *              m_CouchBaseQuery;   //!< pointer to couchbase query (holds results and metadata)
-
         Linked<IEngineRowAllocator>     m_resultAllocator;  //!< Pointer to allocator used when building result rows
         bool                            m_shouldRead;       //!< If true, we should continue trying to read more messages
         StringArray                     m_Rows;             //!< Local copy of result rows
@@ -191,11 +189,15 @@ namespace couchbaseembed
         {
             m_connectionString.setf("couchbase%s://%s:%d/%s%s", useSSL ? "s" : "", host, port, bucketname, connOptions);
             m_pCouchbaseClient = new Couchbase::Client(m_connectionString.str(), password);
-            m_pQuery = nullptr;
         }
 
-        inline ~CouchbaseConnection()
+        virtual ~CouchbaseConnection()
         {
+            if (m_pCouchbaseClient)
+            {
+                delete m_pCouchbaseClient;
+                m_pCouchbaseClient = nullptr;
+            }
         }
 
         inline void connect()
@@ -211,7 +213,6 @@ namespace couchbaseembed
         StringBuffer m_connectionString;
         Couchbase::Client * m_pCouchbaseClient;
         Couchbase::Status  m_connectionStatus;
-        Couchbase::Query  * m_pQuery;
 
         CouchbaseConnection(const CouchbaseConnection &);
     };
@@ -249,6 +250,7 @@ namespace couchbaseembed
             m_oResultRow.set(resultrow);
             if (!m_oResultRow)
                 failx("Missing result row data");
+            m_pathStack.reserve(10);
         }
 
         virtual bool getBooleanResult(const RtlFieldInfo *field);
@@ -361,14 +363,14 @@ namespace couchbaseembed
         {
             while (bindNext())
             {
-                auto m_pQuery = conn->query(m_pQcmd);
+                std::unique_ptr<Couchbase::Query> query(conn->query(m_pQcmd));
 
-                if (m_pQuery->meta().status().errcode() != LCB_SUCCESS )//rows.length() == 0)
-                    failx("Query execution error: %s", m_pQuery->meta().body().to_string().c_str());
+                if (query->meta().status().errcode() != LCB_SUCCESS )//rows.length() == 0)
+                    failx("Query execution error: %s", query->meta().body().to_string().c_str());
 
                 //consider parsing json result
-                if (strstr(m_pQuery->meta().body().data(), "\"status\": \"errors\""))
-                    failx("Err: %s", m_pQuery->meta().body().data());
+                if (strstr(query->meta().body().data(), "\"status\": \"errors\""))
+                    failx("Err: %s", query->meta().body().data());
             }
         }
 
@@ -380,6 +382,7 @@ namespace couchbaseembed
     {
        public:
            CouchbaseEmbedFunctionContext(const IContextLogger &_logctx, const char *options, unsigned _flags);
+           virtual ~CouchbaseEmbedFunctionContext();
            IPropertyTree * nextResultRowTree();
            IPropertyTreeIterator * nextResultRowIterator();
            const char * nextResultScalar();
@@ -443,14 +446,13 @@ namespace couchbaseembed
 
            const IContextLogger &logctx;
            Owned<CouchbaseConnection>    m_oCBConnection;
-           Couchbase::Client           * m_pCouchbaseClient;
            Couchbase::Query            * m_pQuery;
            Couchbase::QueryCommand     * m_pQcmd;
+           Owned<IPropertyTreeIterator>  m_resultrow;
 
            StringArray m_Rows;
            int m_NextRow;
            Owned<CouchbaseDatasetBinder> m_oInputStream;
-           Couchbase::Internal::RowIterator<Couchbase::QueryRow> * cbQueryIterator;
            TokenDeserializer m_tokenDeserializer;
            TokenSerializer m_tokenSerializer;
            unsigned m_nextParam;