瀏覽代碼

HPCC-13794 Cassandra plugin should use paging

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 年之前
父節點
當前提交
052ac0d8a0

+ 32 - 8
plugins/cassandra/cassandraembed.cpp

@@ -340,11 +340,14 @@ class CassandraStatementInfo : public CInterface
 {
 public:
     IMPLEMENT_IINTERFACE;
-    CassandraStatementInfo(CassandraSession *_session, CassandraPrepared *_prepared, unsigned _numBindings, CassBatchType _batchMode)
+    CassandraStatementInfo(CassandraSession *_session, CassandraPrepared *_prepared, unsigned _numBindings, CassBatchType _batchMode, unsigned pageSize)
     : session(_session), prepared(_prepared), numBindings(_numBindings), batchMode(_batchMode)
     {
         assertex(prepared && *prepared);
         statement.setown(new CassandraStatement(cass_prepared_bind(*prepared)));
+        if (pageSize)
+            cass_statement_set_paging_size(*statement, pageSize);
+
     }
     ~CassandraStatementInfo()
     {
@@ -358,9 +361,27 @@ public:
     }
     bool next()
     {
-        if (!iterator)
-            return false;
-        return cass_iterator_next(*iterator);
+        loop
+        {
+            if (!iterator)
+            {
+                if (result)
+                    iterator.setown(new CassandraIterator(cass_iterator_from_result(*result)));
+                else
+                    return false;
+            }
+            if (cass_iterator_next(*iterator))
+                return true;
+            iterator.clear();
+            if (!cass_result_has_more_pages(*result))
+            {
+                result.clear();
+                break;
+            }
+            cass_statement_set_paging_state(*statement, *result);
+            result.setown(new CassandraFutureResult(cass_session_execute(*session, *statement)));
+        }
+        return false;
     }
     void startStream()
     {
@@ -389,8 +410,6 @@ public:
         else
         {
             result.setown(new CassandraFutureResult(cass_session_execute(*session, *statement)));
-            if (rowCount() > 0)
-                iterator.setown(new CassandraIterator(cass_iterator_from_result(*result)));
         }
     }
     inline size_t rowCount() const
@@ -1143,7 +1162,7 @@ class CassandraEmbedFunctionContext : public CInterfaceOf<IEmbedFunctionContext>
 {
 public:
     CassandraEmbedFunctionContext(const IContextLogger &_logctx, unsigned _flags, const char *options)
-      : logctx(_logctx), flags(_flags), nextParam(0), numParams(0), batchMode((CassBatchType) -1)
+      : logctx(_logctx), flags(_flags), nextParam(0), numParams(0), batchMode((CassBatchType) -1), pageSize(0)
     {
         cluster.setown(new CassandraCluster(cass_cluster_new()));
         const char *contact_points = "localhost";
@@ -1177,6 +1196,10 @@ public:
                     else if (stricmp(val, "COUNTER")==0)
                         batchMode = CASS_BATCH_TYPE_COUNTER;
                 }
+                else if (stricmp(optName, "pageSize")==0)
+                {
+                    pageSize=getUnsignedOption(val, "pageSize");
+                }
                 else if (stricmp(optName, "port")==0)
                 {
                     unsigned port = getUnsignedOption(val, "port");
@@ -1742,7 +1765,7 @@ public:
                 numParams = countBindings(script);
             else
                 numParams = 0;
-            stmtInfo.setown(new CassandraStatementInfo(session, prepared, numParams, batchMode));
+            stmtInfo.setown(new CassandraStatementInfo(session, prepared, numParams, batchMode, pageSize));
         }
     }
     virtual void callFunction()
@@ -1900,6 +1923,7 @@ protected:
     unsigned flags;
     unsigned nextParam;
     unsigned numParams;
+    unsigned pageSize;
     CassBatchType batchMode;
     StringAttr queryString;
 

+ 12 - 0
testing/regress/ecl/cassandra-simple.ecl

@@ -240,7 +240,16 @@ integer testCassandraCount() := EMBED(cassandra : user('rchapman'),keyspace('tes
   SELECT COUNT(*) from tbl1;
 ENDEMBED;
 
+dataset(childrec) testCassandraCountPaged0() := EMBED(cassandra : user('rchapman'),keyspace('test'),pageSize(0))
+  SELECT name, value, boolval, r8, r4,d,ddd,u1,u2,a,set1,list1,map1 from tbl1;
+ENDEMBED;
 
+dataset(childrec) testCassandraCountPaged101() := EMBED(cassandra : user('rchapman'),keyspace('test'),pageSize(101))
+  SELECT name, value, boolval, r8, r4,d,ddd,u1,u2,a,set1,list1,map1 from tbl1;
+ENDEMBED;
+dataset(childrec) testCassandraCountPaged100000() := EMBED(cassandra : user('rchapman'),keyspace('test'),pageSize(100000))
+  SELECT name, value, boolval, r8, r4,d,ddd,u1,u2,a,set1,list1,map1 from tbl1;
+ENDEMBED;
 // Execute the tests
 
 sequential (
@@ -268,5 +277,8 @@ sequential (
   OUTPUT(testCassandraDSParam(PROJECT(init, extractName(LEFT)))),
   testCassandraBulk,
   OUTPUT(testCassandraCount()),
+  OUTPUT(COUNT(testCassandraCountPaged0())),
+  OUTPUT(COUNT(testCassandraCountPaged101())),
+  OUTPUT(COUNT(testCassandraCountPaged100000())),
   OUTPUT('Done');
 );

+ 10 - 1
testing/regress/ecl/key/cassandra-simple.xml

@@ -55,5 +55,14 @@
  <Row><Result_17>25001</Result_17></Row>
 </Dataset>
 <Dataset name='Result 18'>
- <Row><Result_18>Done</Result_18></Row>
+ <Row><Result_18>25001</Result_18></Row>
+</Dataset>
+<Dataset name='Result 19'>
+ <Row><Result_19>25001</Result_19></Row>
+</Dataset>
+<Dataset name='Result 20'>
+ <Row><Result_20>25001</Result_20></Row>
+</Dataset>
+<Dataset name='Result 21'>
+ <Row><Result_21>Done</Result_21></Row>
 </Dataset>