Prechádzať zdrojové kódy

HPCC-13799 Cassandra plugin should delay/overlap waiting for results

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 rokov pred
rodič
commit
c9608eecb9
1 zmenil súbory, kde vykonal 32 pridanie a 9 odobranie
  1. 32 9
      plugins/cassandra/cassandraembed.cpp

+ 32 - 9
plugins/cassandra/cassandraembed.cpp

@@ -139,7 +139,7 @@ public:
     {
         return future;
     }
-    void wait(const char *why)
+    void wait(const char *why) const
     {
         cass_future_wait(future);
         CassError rc = cass_future_error_code(future);
@@ -152,11 +152,38 @@ public:
             rtlFail(0, err.str());
         }
     }
-private:
+protected:
     CassandraFuture(const CassandraFuture &);
     CassFuture *future;
 };
 
+class CassandraFutureResult : public CassandraFuture
+{
+public:
+    CassandraFutureResult(CassFuture *_future) : CassandraFuture(_future)
+    {
+        result = NULL;
+    }
+    ~CassandraFutureResult()
+    {
+        if (result)
+            cass_result_free(result);
+    }
+    inline operator const CassResult *() const
+    {
+        if (!result)
+        {
+            wait("FutureResult");
+            result = cass_future_get_result(future);
+        }
+        return result;
+    }
+private:
+    CassandraFutureResult(const CassandraFutureResult &);
+    mutable const CassResult *result;
+
+};
+
 class CassandraSession : public CInterface
 {
 public:
@@ -347,9 +374,7 @@ public:
     {
         if (batch)
         {
-            CassandraFuture future(cass_session_execute_batch(*session, *batch));
-            future.wait("execute");
-            result.setown(new CassandraResult(cass_future_get_result(future)));
+            result.setown(new CassandraFutureResult (cass_session_execute_batch(*session, *batch)));
             assertex (rowCount() == 0);
         }
     }
@@ -363,9 +388,7 @@ public:
         }
         else
         {
-            CassandraFuture future(cass_session_execute(*session, *statement));
-            future.wait("execute");
-            result.setown(new CassandraResult(cass_future_get_result(future)));
+            result.setown(new CassandraFutureResult(cass_session_execute(*session, *statement)));
             if (rowCount() > 0)
                 iterator.setown(new CassandraIterator(cass_iterator_from_result(*result)));
         }
@@ -393,7 +416,7 @@ protected:
     Linked<CassandraPrepared> prepared;
     Owned<CassandraBatch> batch;
     Owned<CassandraStatement> statement;
-    Owned<CassandraResult> result;
+    Owned<CassandraFutureResult> result;
     Owned<CassandraIterator> iterator;
     unsigned numBindings;
     CassBatchType(batchMode);