Преглед на файлове

HPCC-8032 Roxie and hthor support for grouped hash aggregate

Support the new ,GROUPED attribute on hash aggregate activity. The same
change was made in roxie and hthor, and the new regression suite test now
passes.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman преди 12 години
родител
ревизия
655e83690c
променени са 7 файла, в които са добавени 61 реда и са изтрити 27 реда
  1. 1 1
      ecl/eclagent/eclgraph.cpp
  2. 26 9
      ecl/hthor/hthor.cpp
  3. 1 1
      ecl/hthor/hthor.hpp
  4. 2 1
      ecl/hthor/hthor.ipp
  5. 1 1
      roxie/ccd/ccdquery.cpp
  6. 29 13
      roxie/ccd/ccdserver.cpp
  7. 1 1
      roxie/ccd/ccdserver.hpp

+ 1 - 1
ecl/eclagent/eclgraph.cpp

@@ -67,7 +67,7 @@ static IHThorActivity * createActivity(IAgentContext & agent, unsigned activityI
     case TAKcountaggregate:
     case TAKcountaggregate:
         return createAggregateActivity(agent, activityId, subgraphId, (IHThorAggregateArg &)arg, kind);
         return createAggregateActivity(agent, activityId, subgraphId, (IHThorAggregateArg &)arg, kind);
     case TAKhashaggregate:
     case TAKhashaggregate:
-        return createHashAggregateActivity(agent, activityId, subgraphId, (IHThorHashAggregateArg &)arg, kind);
+        return createHashAggregateActivity(agent, activityId, subgraphId, (IHThorHashAggregateArg &)arg, kind, isGrouped);
     case TAKfirstn:
     case TAKfirstn:
         return createFirstNActivity(agent, activityId, subgraphId, (IHThorFirstNArg &)arg, kind);
         return createFirstNActivity(agent, activityId, subgraphId, (IHThorFirstNArg &)arg, kind);
     case TAKsample:
     case TAKsample:

+ 26 - 9
ecl/hthor/hthor.cpp

@@ -3111,8 +3111,10 @@ const void * CHThorAggregateActivity::nextInGroup()
 
 
 //=====================================================================================================
 //=====================================================================================================
 
 
-CHThorHashAggregateActivity::CHThorHashAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorHashAggregateArg &_arg, ThorActivityKind _kind) 
-: CHThorSimpleActivityBase(_agent, _activityId, _subgraphId, _arg, _kind), helper(_arg), aggregated(_arg, _arg)
+CHThorHashAggregateActivity::CHThorHashAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorHashAggregateArg &_arg, ThorActivityKind _kind, bool _isGroupedAggregate)
+: CHThorSimpleActivityBase(_agent, _activityId, _subgraphId, _arg, _kind), helper(_arg),
+  isGroupedAggregate(_isGroupedAggregate),
+  aggregated(_arg, _arg)
 {
 {
 }
 }
 
 
@@ -3121,7 +3123,6 @@ void CHThorHashAggregateActivity::ready()
     CHThorSimpleActivityBase::ready();
     CHThorSimpleActivityBase::ready();
     eof = false;
     eof = false;
     gathered = false;
     gathered = false;
-    aggregated.start(rowAllocator);
 }
 }
 
 
 void CHThorHashAggregateActivity::done()
 void CHThorHashAggregateActivity::done()
@@ -3138,16 +3139,27 @@ const void * CHThorHashAggregateActivity::nextInGroup()
 
 
     if (!gathered)
     if (!gathered)
     {
     {
+        bool eog = true;
+        aggregated.start(rowAllocator);
         loop
         loop
         {
         {
             OwnedConstHThorRow next(input->nextInGroup());
             OwnedConstHThorRow next(input->nextInGroup());
             if (!next)
             if (!next)
             {
             {
-                next.setown(input->nextInGroup());
-                if (!next)
-                    break;
+                if (isGroupedAggregate)
+                {
+                    if (eog)
+                        eof = true;
+                }
+                else
+                {
+                    next.setown(input->nextInGroup());
+                    if (!next)
+                        eof = true;
+                }
+                break;
             }
             }
-
+            eog = false;
             try
             try
             {
             {
                 aggregated.addRow(next);
                 aggregated.addRow(next);
@@ -3166,7 +3178,8 @@ const void * CHThorHashAggregateActivity::nextInGroup()
         processed++;
         processed++;
         return next->finalizeRowClear();
         return next->finalizeRowClear();
     }
     }
-    eof = true;
+    aggregated.reset();
+    gathered = false;
     return NULL;
     return NULL;
 }
 }
 
 
@@ -9701,7 +9714,11 @@ extern HTHOR_API IHThorActivity *createChildIfActivity(IAgentContext &_agent, un
     return new CHThorIfActivity(_agent, _activityId, _subgraphId, arg, kind);
     return new CHThorIfActivity(_agent, _activityId, _subgraphId, arg, kind);
 }
 }
 
 
-MAKEFACTORY(HashAggregate);
+extern HTHOR_API IHThorActivity *createHashAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorHashAggregateArg &arg, ThorActivityKind kind, bool _isGroupedAggregate)
+{
+    return new CHThorHashAggregateActivity(_agent, _activityId, _subgraphId, arg, kind, _isGroupedAggregate);
+}
+
 MAKEFACTORY(Null);
 MAKEFACTORY(Null);
 MAKEFACTORY(SideEffect);
 MAKEFACTORY(SideEffect);
 MAKEFACTORY(Action);
 MAKEFACTORY(Action);

+ 1 - 1
ecl/hthor/hthor.hpp

@@ -130,7 +130,7 @@ extern HTHOR_API IHThorActivity *createFetchActivity(IAgentContext &_agent, unsi
 extern HTHOR_API IHThorActivity *createKeyedJoinActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorKeyedJoinArg &arg, ThorActivityKind kind);
 extern HTHOR_API IHThorActivity *createKeyedJoinActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorKeyedJoinArg &arg, ThorActivityKind kind);
 extern HTHOR_API IHThorActivity *createIfActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIfArg &arg, ThorActivityKind kind);
 extern HTHOR_API IHThorActivity *createIfActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIfArg &arg, ThorActivityKind kind);
 extern HTHOR_API IHThorActivity *createChildIfActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIfArg &arg, ThorActivityKind kind);
 extern HTHOR_API IHThorActivity *createChildIfActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIfArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createHashAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorHashAggregateArg &arg, ThorActivityKind kind);
+extern HTHOR_API IHThorActivity *createHashAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorHashAggregateArg &arg, ThorActivityKind kind, bool _isGroupedHashAggregate);
 extern HTHOR_API IHThorActivity *createNullActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorNullArg &arg, ThorActivityKind kind);
 extern HTHOR_API IHThorActivity *createNullActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorNullArg &arg, ThorActivityKind kind);
 extern HTHOR_API IHThorActivity *createSideEffectActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorSideEffectArg &arg, ThorActivityKind kind);
 extern HTHOR_API IHThorActivity *createSideEffectActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorSideEffectArg &arg, ThorActivityKind kind);
 extern HTHOR_API IHThorActivity *createActionActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorActionArg &arg, ThorActivityKind kind);
 extern HTHOR_API IHThorActivity *createActionActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorActionArg &arg, ThorActivityKind kind);

+ 2 - 1
ecl/hthor/hthor.ipp

@@ -865,8 +865,9 @@ class CHThorHashAggregateActivity : public CHThorSimpleActivityBase
 
 
     bool eof;
     bool eof;
     bool gathered;
     bool gathered;
+    bool isGroupedAggregate;
 public:
 public:
-    CHThorHashAggregateActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorHashAggregateArg &_arg, ThorActivityKind _kind);
+    CHThorHashAggregateActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorHashAggregateArg &_arg, ThorActivityKind _kind, bool _isGroupedAggregate);
 
 
     virtual void ready();
     virtual void ready();
     virtual void done();
     virtual void done();

+ 1 - 1
roxie/ccd/ccdquery.cpp

@@ -324,7 +324,7 @@ protected:
         case TAKgroup:
         case TAKgroup:
             return createRoxieServerGroupActivityFactory(id, subgraphId, *this, helperFactory, kind);
             return createRoxieServerGroupActivityFactory(id, subgraphId, *this, helperFactory, kind);
         case TAKhashaggregate:
         case TAKhashaggregate:
-            return createRoxieServerHashAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerHashAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKif:
         case TAKif:
         case TAKchildif:
         case TAKchildif:
             return createRoxieServerIfActivityFactory(id, subgraphId, *this, helperFactory, kind, isGraphIndependent(node));
             return createRoxieServerIfActivityFactory(id, subgraphId, *this, helperFactory, kind, isGraphIndependent(node));

+ 29 - 13
roxie/ccd/ccdserver.cpp

@@ -10274,10 +10274,12 @@ class CRoxieServerHashAggregateActivity : public CRoxieServerActivity
 
 
     bool eof;
     bool eof;
     bool gathered;
     bool gathered;
-
+    bool isGroupedAggregate;
 public:
 public:
-    CRoxieServerHashAggregateActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
-        : CRoxieServerActivity(_factory, _probeManager), helper((IHThorHashAggregateArg &)basehelper), aggregated(helper, helper)
+    CRoxieServerHashAggregateActivity(const IRoxieServerActivityFactory *_factory, bool _isGroupedAggregate, IProbeManager *_probeManager)
+        : CRoxieServerActivity(_factory, _probeManager), helper((IHThorHashAggregateArg &)basehelper),
+          isGroupedAggregate(_isGroupedAggregate),
+          aggregated(helper, helper)
     {
     {
         eof = false;
         eof = false;
         gathered = false;
         gathered = false;
@@ -10288,7 +10290,6 @@ public:
         eof = false;
         eof = false;
         gathered = false;
         gathered = false;
         CRoxieServerActivity::start(parentExtractSize, parentExtract, paused);
         CRoxieServerActivity::start(parentExtractSize, parentExtract, paused);
-        aggregated.start(rowAllocator);
     }
     }
 
 
     virtual void reset()
     virtual void reset()
@@ -10307,16 +10308,27 @@ public:
 
 
         if (!gathered)
         if (!gathered)
         {
         {
+            aggregated.start(rowAllocator);
+            bool eog = true;
             loop
             loop
             {
             {
                 const void * next = input->nextInGroup();
                 const void * next = input->nextInGroup();
                 if (!next)
                 if (!next)
                 {
                 {
-                    next = input->nextInGroup();
-                    if (!next)
-                        break;
+                    if (isGroupedAggregate)
+                    {
+                        if (eog)
+                            eof = true;
+                    }
+                    else
+                    {
+                        next = input->nextInGroup();
+                        if (!next)
+                            eof = true;
+                    }
+                    break;
                 }
                 }
-
+                eog = false;
                 aggregated.addRow(next);
                 aggregated.addRow(next);
                 ReleaseRoxieRow(next);
                 ReleaseRoxieRow(next);
             }
             }
@@ -10329,7 +10341,8 @@ public:
             processed++;
             processed++;
             return next->finalizeRowClear();
             return next->finalizeRowClear();
         }
         }
-        eof = true;
+        aggregated.reset();
+        gathered = false;
         return NULL;
         return NULL;
     }
     }
 };
 };
@@ -10337,20 +10350,23 @@ public:
 class CRoxieServerHashAggregateActivityFactory : public CRoxieServerActivityFactory
 class CRoxieServerHashAggregateActivityFactory : public CRoxieServerActivityFactory
 {
 {
 public:
 public:
-    CRoxieServerHashAggregateActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind)
+    CRoxieServerHashAggregateActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode)
         : CRoxieServerActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind)
         : CRoxieServerActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind)
     {
     {
+        isGroupedAggregate = _graphNode.getPropBool("att[@name='grouped']/@value");
     }
     }
 
 
     virtual IRoxieServerActivity *createActivity(IProbeManager *_probeManager) const
     virtual IRoxieServerActivity *createActivity(IProbeManager *_probeManager) const
     {
     {
-        return new CRoxieServerHashAggregateActivity(this, _probeManager);
+        return new CRoxieServerHashAggregateActivity(this, isGroupedAggregate, _probeManager);
     }
     }
+protected:
+    bool isGroupedAggregate;
 };
 };
 
 
-IRoxieServerActivityFactory *createRoxieServerHashAggregateActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind)
+IRoxieServerActivityFactory *createRoxieServerHashAggregateActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode)
 {
 {
-    return new CRoxieServerHashAggregateActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind);
+    return new CRoxieServerHashAggregateActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind, _graphNode);
 }
 }
 
 
 //=================================================================================
 //=================================================================================

+ 1 - 1
roxie/ccd/ccdserver.hpp

@@ -453,7 +453,7 @@ extern IRoxieServerActivityFactory *createRoxieServerChooseSetsEnthActivityFacto
 extern IRoxieServerActivityFactory *createRoxieServerChooseSetsLastActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerChooseSetsLastActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerEnthActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerEnthActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerAggregateActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerAggregateActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerHashAggregateActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
+extern IRoxieServerActivityFactory *createRoxieServerHashAggregateActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
 extern IRoxieServerActivityFactory *createRoxieServerDegroupActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerDegroupActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerSpillReadActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerSpillReadActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerDiskWriteActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, bool _isRoot);
 extern IRoxieServerActivityFactory *createRoxieServerDiskWriteActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, bool _isRoot);