Pārlūkot izejas kodu

Merge pull request #5609 from ghalliday/issue10135

HPCC-10135 Restructure TAKxxx

Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 11 gadi atpakaļ
vecāks
revīzija
7c85c730b2

+ 2 - 1
common/thorhelper/commonext.cpp

@@ -203,9 +203,10 @@ MODULE_INIT(INIT_PRIORITY_STANDARD)
     kindArray[TAKsmartjoin] = "smartjoin";
     kindArray[TAKsmartjoin] = "smartjoin";
     kindArray[TAKsmartdenormalize] = "smartdenormalize";
     kindArray[TAKsmartdenormalize] = "smartdenormalize";
     kindArray[TAKsmartdenormalizegroup] = "smartdenormalizegroup";
     kindArray[TAKsmartdenormalizegroup] = "smartdenormalizegroup";
+    kindArray[TAKselfdenormalize] = "selfdenormalize";
+    kindArray[TAKselfdenormalizegroup] = "selfdenormalizegroup";
 
 
 //Non standard
 //Non standard
-    kindArray[TAKcountdisk] = "countdisk";
     kindArray[TAKsubgraph] = "subgraph";
     kindArray[TAKsubgraph] = "subgraph";
 
 
     return true;
     return true;

+ 2 - 1
common/thorhelper/thorcommon.cpp

@@ -660,7 +660,6 @@ extern const char * getActivityText(ThorActivityKind kind)
     case TAKcsvfetch:               return "Csv Fetch";
     case TAKcsvfetch:               return "Csv Fetch";
     case TAKxmlwrite:               return "Xml Write";
     case TAKxmlwrite:               return "Xml Write";
     case TAKparse:                  return "Parse";
     case TAKparse:                  return "Parse";
-    case TAKcountdisk:              return "Count Disk";
     case TAKsideeffect:             return "Simple Action";
     case TAKsideeffect:             return "Simple Action";
     case TAKtopn:                   return "Top N";
     case TAKtopn:                   return "Top N";
     case TAKmerge:                  return "Merge";
     case TAKmerge:                  return "Merge";
@@ -775,6 +774,8 @@ extern const char * getActivityText(ThorActivityKind kind)
     case TAKsmartjoin:              return "Smart Join";
     case TAKsmartjoin:              return "Smart Join";
     case TAKsmartdenormalize:       return "Smart Denormalize";
     case TAKsmartdenormalize:       return "Smart Denormalize";
     case TAKsmartdenormalizegroup:  return "Smart Denormalize Group";
     case TAKsmartdenormalizegroup:  return "Smart Denormalize Group";
+    case TAKselfdenormalize:       return "Self Denormalize";
+    case TAKselfdenormalizegroup:  return "Self Denormalize Group";
     }
     }
     throwUnexpected();
     throwUnexpected();
 }
 }

+ 0 - 2
ecl/eclagent/eclgraph.cpp

@@ -184,8 +184,6 @@ static IHThorActivity * createActivity(IAgentContext & agent, unsigned activityI
         return createXmlFetchActivity(agent, activityId, subgraphId, (IHThorXmlFetchArg &)arg, kind);
         return createXmlFetchActivity(agent, activityId, subgraphId, (IHThorXmlFetchArg &)arg, kind);
     case TAKmerge: 
     case TAKmerge: 
         return createMergeActivity(agent, activityId, subgraphId, (IHThorMergeArg &)arg, kind);
         return createMergeActivity(agent, activityId, subgraphId, (IHThorMergeArg &)arg, kind);
-    case TAKcountdisk: 
-        return NULL; // NOTE - activity gets created when needed
     case TAKhttp_rowdataset:
     case TAKhttp_rowdataset:
         return createHttpRowCallActivity(agent, activityId, subgraphId, (IHThorHttpCallArg &)arg, kind);
         return createHttpRowCallActivity(agent, activityId, subgraphId, (IHThorHttpCallArg &)arg, kind);
     case TAKsoap_rowdataset:
     case TAKsoap_rowdataset:

+ 0 - 2
roxie/ccd/ccdquery.cpp

@@ -449,8 +449,6 @@ protected:
                 RemoteActivityId remoteId(id, hashValue);
                 RemoteActivityId remoteId(id, hashValue);
                 return createRoxieServerIndexGroupAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, remoteId, node);
                 return createRoxieServerIndexGroupAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, remoteId, node);
             }
             }
-        case TAKcountdisk:
-            return createRoxieServerDiskCountActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKhashdedup:
         case TAKhashdedup:
             return createRoxieServerHashDedupActivityFactory(id, subgraphId, *this, helperFactory, kind);
             return createRoxieServerHashDedupActivityFactory(id, subgraphId, *this, helperFactory, kind);
         case TAKhashdenormalize:
         case TAKhashdenormalize:

+ 0 - 149
roxie/ccd/ccdserver.cpp

@@ -23156,155 +23156,6 @@ IRoxieServerActivityFactory *createRoxieServerIndexNormalizeActivityFactory(unsi
 
 
 //=================================================================================
 //=================================================================================
 
 
-class CRoxieServerCountDiskActivity : public CRoxieServerActivity, implements IRoxieServerErrorHandler
-{
-    unsigned __int64 answer;
-
-public:
-    CRoxieServerCountDiskActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned __int64 _answer)
-        : CRoxieServerActivity(_factory, _probeManager), 
-          answer(_answer)
-    {
-    }
-
-    virtual const void *nextInGroup()
-    {
-        throwUnexpected();
-    }
-
-    virtual void setInput(unsigned idx, IRoxieInput *_in)
-    {
-        throw MakeStringException(ROXIE_SET_INPUT, "Internal error: setInput() called for source activity");
-    }
-
-    virtual __int64 evaluate() 
-    {
-        return answer;
-    }
-
-    virtual void onLimitExceeded(bool isKeyed) 
-    {
-        if (traceLevel > 4)
-            DBGLOG("activityid = %d  isKeyed = %d  line = %d", activityId, isKeyed, __LINE__);
-        throwUnexpected();
-    }
-
-    virtual const void *createLimitFailRow(bool isKeyed)
-    {
-        throwUnexpected();
-    }
-
-};
-
-class CRoxieServerVariableCountDiskActivity : public CRoxieServerActivity, implements IRoxieServerErrorHandler
-{
-
-public:
-    CRoxieServerVariableCountDiskActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
-        : CRoxieServerActivity(_factory, _probeManager)
-    {
-    }
-
-    virtual const void *nextInGroup()
-    {
-        throwUnexpected();
-    }
-
-    virtual void setInput(unsigned idx, IRoxieInput *_in)
-    {
-        throw MakeStringException(ROXIE_SET_INPUT, "Internal error: setInput() called for source activity");
-    }
-
-    virtual __int64 evaluate() 
-    {
-        IHThorCountFileArg &helper = (IHThorCountFileArg &) basehelper;
-        bool isOpt = (helper.getFlags() & TDRoptional) != 0;
-        unsigned recsize = helper.queryRecordSize()->getFixedSize();
-        assertex(recsize);
-        OwnedRoxieString fname(helper.getFileName());
-        Owned<const IResolvedFile> varFileInfo = resolveLFN(fname, isOpt);
-        return varFileInfo->getFileSize() / recsize; 
-    }
-
-    virtual void onLimitExceeded(bool isKeyed) 
-    {
-        if (traceLevel > 4)
-            DBGLOG("activityid = %d  isKeyed = %d  line = %d", activityId, isKeyed, __LINE__);
-        throwUnexpected();
-    }
-    virtual const void *createLimitFailRow(bool isKeyed)
-    {
-        throwUnexpected();
-    }
-};
-
-class CRoxieServerCountDiskActivityFactory : public CRoxieServerActivityFactory
-{
-public:
-    unsigned __int64 answer;
-    bool variableFileName;
-    Owned<const IResolvedFile> datafile;
-
-    CRoxieServerCountDiskActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode)
-        : CRoxieServerActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind)
-    {
-        Owned<IHThorCountFileArg> helper = (IHThorCountFileArg *) helperFactory();
-        variableFileName = allFilesDynamic || _queryFactory.isDynamic() ||  ((helper->getFlags() & (TDXvarfilename|TDXdynamicfilename)) != 0);
-        assertex(helper->queryRecordSize()->isFixedSize());
-        if (!variableFileName)
-        {
-            unsigned recsize = helper->queryRecordSize()->getFixedSize();
-            assertex(recsize);
-            OwnedRoxieString fileName(helper->getFileName());
-            bool isOpt = (helper->getFlags() & TDRoptional) != 0;
-            datafile.setown(queryFactory.queryPackage().lookupFileName(fileName, isOpt, true, true, queryFactory.queryWorkUnit()));
-            offset_t filesize = datafile ? datafile->getFileSize() : 0;
-            if (filesize % recsize != 0)
-                throw MakeStringException(ROXIE_MISMATCH, "Record size mismatch for file %s - %"I64F"d is not a multiple of fixed record size %d", fileName.get(), filesize, recsize);
-            answer = filesize / recsize; 
-        }
-        else
-            answer = 0;
-    }
-
-    virtual void getXrefInfo(IPropertyTree &reply, const IRoxieContextLogger &logctx) const
-    {
-        if (datafile)
-            addXrefFileInfo(reply, datafile);
-    }
-
-    virtual IRoxieServerActivity *createFunction(IHThorArg &arg, IProbeManager *_probeManager) const
-    {
-        arg.Release();
-        if (variableFileName)
-            return new CRoxieServerVariableCountDiskActivity(this, _probeManager);
-        else
-            return new CRoxieServerCountDiskActivity(this, _probeManager, answer);
-    }
-
-    virtual void setInput(unsigned idx, unsigned source, unsigned sourceidx)
-    {
-        throw MakeStringException(ROXIE_SET_INPUT, "Internal error: setInput() should not be called for CountDisk activity");
-    }
-
-    virtual bool isFunction() const
-    {
-        return true;
-    }
-
-    virtual IRoxieServerActivity *createActivity(IProbeManager *_probeManager) const
-    {
-        return NULL;
-    }
-};
-
-IRoxieServerActivityFactory *createRoxieServerDiskCountActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode)
-{
-    return new CRoxieServerCountDiskActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind, _graphNode);
-}
-
-//=================================================================================
-
 class CRoxieServerFetchActivity : public CRoxieServerActivity, implements IRecordPullerCallback, implements IRoxieServerErrorHandler
 class CRoxieServerFetchActivity : public CRoxieServerActivity, implements IRecordPullerCallback, implements IRoxieServerErrorHandler
 {
 {
     IHThorFetchBaseArg &helper;
     IHThorFetchBaseArg &helper;

+ 45 - 33
rtl/include/eclhelper.hpp

@@ -39,8 +39,8 @@ if the supplied pointer was not from the roxiemem heap. Usually an OwnedRoxieStr
 
 
 //Should be incremented whenever the virtuals in the context or a helper are changed, so
 //Should be incremented whenever the virtuals in the context or a helper are changed, so
 //that a work unit can't be rerun.  Try as hard as possible to retain compatibility.
 //that a work unit can't be rerun.  Try as hard as possible to retain compatibility.
-#define ACTIVITY_INTERFACE_VERSION      151
-#define MIN_ACTIVITY_INTERFACE_VERSION  151             //minimum value that is compatible with current interface - without using selectInterface
+#define ACTIVITY_INTERFACE_VERSION      152
+#define MIN_ACTIVITY_INTERFACE_VERSION  152             //minimum value that is compatible with current interface - without using selectInterface
 
 
 typedef unsigned char byte;
 typedef unsigned char byte;
 
 
@@ -506,7 +506,6 @@ protected:
 
 
 interface IXmlToRowTransformer;
 interface IXmlToRowTransformer;
 interface ICsvToRowTransformer;
 interface ICsvToRowTransformer;
-interface IHThorCountFileArg;
 interface IThorDiskCallback;
 interface IThorDiskCallback;
 interface IThorIndexCallback;
 interface IThorIndexCallback;
 interface IIndexReadContext;                    // this is misnamed!
 interface IIndexReadContext;                    // this is misnamed!
@@ -736,11 +735,6 @@ enum ThorActivityKind
     TAKfirstn,
     TAKfirstn,
     TAKsample,
     TAKsample,
     TAKdegroup,
     TAKdegroup,
-    TAKjoin,
-    TAKhashjoin,
-    TAKlookupjoin,
-    TAKselfjoin,
-    TAKkeyedjoin,
     TAKgroup,
     TAKgroup,
     TAKworkunitwrite,
     TAKworkunitwrite,
     TAKfunnel,
     TAKfunnel,
@@ -750,7 +744,6 @@ enum ThorActivityKind
     TAKnormalize,
     TAKnormalize,
     TAKremoteresult,
     TAKremoteresult,
     TAKpull,
     TAKpull,
-    TAKdenormalize,
     TAKnormalizechild,
     TAKnormalizechild,
     TAKchilddataset,
     TAKchilddataset,
     TAKselectn,
     TAKselectn,
@@ -768,7 +761,6 @@ enum ThorActivityKind
     TAKchoosesetsenth,
     TAKchoosesetsenth,
     TAKchoosesetslast,
     TAKchoosesetslast,
     TAKfetch,
     TAKfetch,
-    TAKhashdenormalize,
     TAKworkunitread,
     TAKworkunitread,
     TAKthroughaggregate,
     TAKthroughaggregate,
     TAKspill,
     TAKspill,
@@ -783,15 +775,12 @@ enum ThorActivityKind
     TAKxmlfetch,
     TAKxmlfetch,
     TAKxmlparse,
     TAKxmlparse,
     TAKkeyeddistribute,
     TAKkeyeddistribute,
-    TAKjoinlight,           // lightweight, local, presorted join.
-    TAKalljoin,
     TAKsoap_rowdataset,     // a source activity
     TAKsoap_rowdataset,     // a source activity
     TAKsoap_rowaction,      // source and sink activity
     TAKsoap_rowaction,      // source and sink activity
     TAKsoap_datasetdataset,     // a through activity
     TAKsoap_datasetdataset,     // a through activity
     TAKsoap_datasetaction,      // sink activity
     TAKsoap_datasetaction,      // sink activity
     TAKkeydiff,
     TAKkeydiff,
     TAKkeypatch,
     TAKkeypatch,
-    TAKkeyeddenormalize,
     TAKsequential,
     TAKsequential,
     TAKparallel,
     TAKparallel,
     TAKchilditerator,
     TAKchilditerator,
@@ -802,21 +791,24 @@ enum ThorActivityKind
     TAKlocalgraph,
     TAKlocalgraph,
     TAKifaction,
     TAKifaction,
     TAKemptyaction,
     TAKemptyaction,
-    TAKdiskread,                    // records one at a time. (filter+project)
+    TAKdiskread,                // records one at a time. (filter+project)
     TAKdisknormalize,           // same, but normalize a child dataset (filter+project)
     TAKdisknormalize,           // same, but normalize a child dataset (filter+project)
     TAKdiskaggregate,           // non-grouped aggregate of dataset, or normalized dataset (filter/project input)
     TAKdiskaggregate,           // non-grouped aggregate of dataset, or normalized dataset (filter/project input)
     TAKdiskcount,               // non-grouped count of dataset (not child), (may filter input)
     TAKdiskcount,               // non-grouped count of dataset (not child), (may filter input)
     TAKdiskgroupaggregate,      // grouped aggregate on dataset (filter) (may work on project of input)
     TAKdiskgroupaggregate,      // grouped aggregate on dataset (filter) (may work on project of input)
+    TAKdiskexists,              // non-grouped count of dataset (not child), (may filter input)
     TAKindexread,
     TAKindexread,
     TAKindexnormalize,
     TAKindexnormalize,
     TAKindexaggregate,
     TAKindexaggregate,
     TAKindexcount,
     TAKindexcount,
     TAKindexgroupaggregate,
     TAKindexgroupaggregate,
+    TAKindexexists,
     TAKchildread,
     TAKchildread,
     TAKchildnormalize,
     TAKchildnormalize,
     TAKchildaggregate,
     TAKchildaggregate,
     TAKchildcount,
     TAKchildcount,
     TAKchildgroupaggregate,
     TAKchildgroupaggregate,
+    TAKchildexists,
     TAKskiplimit,
     TAKskiplimit,
     TAKchildthroughnormalize,
     TAKchildthroughnormalize,
     TAKcsvread,
     TAKcsvread,
@@ -827,13 +819,6 @@ enum ThorActivityKind
     TAKregroup,
     TAKregroup,
     TAKrollupgroup,
     TAKrollupgroup,
     TAKcombinegroup,
     TAKcombinegroup,
-    TAKlookupdenormalize,
-    TAKalldenormalize,
-    TAKdenormalizegroup,
-    TAKhashdenormalizegroup,
-    TAKlookupdenormalizegroup,
-    TAKkeyeddenormalizegroup,
-    TAKalldenormalizegroup,
     TAKlocalresultspill,
     TAKlocalresultspill,
     TAKsimpleaction,
     TAKsimpleaction,
     TAKloopcount,
     TAKloopcount,
@@ -871,9 +856,6 @@ enum ThorActivityKind
     TAKlinkedrawiterator,
     TAKlinkedrawiterator,
     TAKnormalizelinkedchild,
     TAKnormalizelinkedchild,
     TAKfilterproject,
     TAKfilterproject,
-    TAKdiskexists,              // non-grouped count of dataset (not child), (may filter input)
-    TAKindexexists,
-    TAKchildexists,
     TAKcatch,
     TAKcatch,
     TAKskipcatch,
     TAKskipcatch,
     TAKcreaterowcatch,
     TAKcreaterowcatch,
@@ -885,23 +867,60 @@ enum ThorActivityKind
     TAKindexgroupexists,
     TAKindexgroupexists,
     TAKindexgroupcount,
     TAKindexgroupcount,
     TAKhashdistributemerge,
     TAKhashdistributemerge,
-    TAKselfjoinlight,
     TAKhttp_rowdataset,     // a source activity
     TAKhttp_rowdataset,     // a source activity
     TAKinlinetable,
     TAKinlinetable,
-    TAKcountdisk,
     TAKstreamediterator,
     TAKstreamediterator,
     TAKexternalsource,
     TAKexternalsource,
     TAKexternalsink,
     TAKexternalsink,
     TAKexternalprocess,
     TAKexternalprocess,
     TAKdictionaryworkunitwrite,
     TAKdictionaryworkunitwrite,
     TAKdictionaryresultwrite,
     TAKdictionaryresultwrite,
+    //Joins
+    TAKjoin,
+    TAKhashjoin,
+    TAKlookupjoin,
+    TAKselfjoin,
+    TAKkeyedjoin,
+    TAKalljoin,
     TAKsmartjoin,
     TAKsmartjoin,
+    TAKunknownjoin1, // place holders to make it easy to insert new join kinds
+    TAKunknownjoin2,
+    TAKunknownjoin3,
+    TAKjoinlight,           // lightweight, local, presorted join.
+    TAKselfjoinlight,
+    TAKlastjoin,
+    //Denormalize
+    TAKdenormalize,
+    TAKhashdenormalize,
+    TAKlookupdenormalize,
+    TAKselfdenormalize,
+    TAKkeyeddenormalize,
+    TAKalldenormalize,
     TAKsmartdenormalize,
     TAKsmartdenormalize,
+    TAKunknowndenormalize1,
+    TAKunknowndenormalize2,
+    TAKunknowndenormalize3,
+    TAKlastdenormalize,
+    //DenormalizeGroup
+    TAKdenormalizegroup,
+    TAKhashdenormalizegroup,
+    TAKlookupdenormalizegroup,
+    TAKselfdenormalizegroup,
+    TAKkeyeddenormalizegroup,
+    TAKalldenormalizegroup,
     TAKsmartdenormalizegroup,
     TAKsmartdenormalizegroup,
+    TAKunknowndenormalizegroup1,
+    TAKunknowndenormalizegroup2,
+    TAKunknowndenormalizegroup3,
+    TAKlastdenormalizegroup,
 
 
     TAKlast
     TAKlast
 };
 };
 
 
+inline bool isSimpleJoin(ThorActivityKind kind) { return (kind >= TAKjoin) && (kind <= TAKlastjoin); }
+inline bool isDenormalizeJoin(ThorActivityKind kind) { return (kind >= TAKdenormalize) && (kind <= TAKlastdenormalize); }
+inline bool isDenormalizeGroupJoin(ThorActivityKind kind) { return (kind >= TAKdenormalizegroup) && (kind <= TAKlastdenormalizegroup); }
+
 enum ActivityInterfaceEnum
 enum ActivityInterfaceEnum
 {
 {
     TAInone,
     TAInone,
@@ -1794,13 +1813,6 @@ struct IHThorKeyedDistributeArg : public IHThorArg
 };
 };
 
 
 
 
-struct IHThorCountFileArg : public IHThorArg
-{
-    virtual const char * getFileName() = 0;
-    virtual IOutputMetaData * queryRecordSize() = 0;
-    virtual unsigned getFlags() = 0;
-};
-
 struct IHThorFetchBaseArg : public IHThorArg
 struct IHThorFetchBaseArg : public IHThorArg
 {
 {
     virtual unsigned __int64 getRowLimit() { return (unsigned __int64) -1; }
     virtual unsigned __int64 getRowLimit() { return (unsigned __int64) -1; }

+ 0 - 19
rtl/include/eclhelper_base.hpp

@@ -1806,25 +1806,6 @@ class CThorKeyedDistributeArg : public CThorArg, implements IHThorKeyedDistribut
 };
 };
 
 
 
 
-class CThorCountFileArg : public CThorArg, implements IHThorCountFileArg
-{
-    virtual void Link() const { RtlCInterface::Link(); }
-    virtual bool Release() const { return RtlCInterface::Release(); }
-    virtual void onCreate(ICodeContext * _ctx, IHThorArg *, MemoryBuffer * in) { ctx = _ctx; }
-
-    virtual IInterface * selectInterface(ActivityInterfaceEnum which)
-    {
-        switch (which)
-        {
-        case TAIarg:
-        case TAIcountfilearg_1:
-            return static_cast<IHThorCountFileArg *>(this);
-        }
-        return NULL;
-    }
-    virtual unsigned getFlags() { return 0; }
-};
-
 class CThorFetchArg : public CThorArg, implements IHThorFetchArg
 class CThorFetchArg : public CThorArg, implements IHThorFetchArg
 {
 {
     virtual void Link() const { RtlCInterface::Link(); }
     virtual void Link() const { RtlCInterface::Link(); }

+ 0 - 1
thorlcr/graph/thgraph.cpp

@@ -969,7 +969,6 @@ bool isGlobalActivity(CGraphElementBase &container)
             if (!container.queryLocal())
             if (!container.queryLocal())
                 return true;
                 return true;
 // always local
 // always local
-        case TAKcountdisk:
         case TAKfilter:
         case TAKfilter:
         case TAKfilterproject:
         case TAKfilterproject:
         case TAKsplit:
         case TAKsplit:

+ 0 - 1
thorlcr/master/thactivitymaster.cpp

@@ -106,7 +106,6 @@ public:
         CActivityBase *ret = NULL;
         CActivityBase *ret = NULL;
         switch (kind)
         switch (kind)
         {
         {
-            case TAKcountdisk:
             case TAKfiltergroup:
             case TAKfiltergroup:
             case TAKlocalresultread:
             case TAKlocalresultread:
             case TAKchildif:
             case TAKchildif:

+ 0 - 2
thorlcr/slave/slave.cpp

@@ -685,8 +685,6 @@ public:
             case TAKsoap_datasetaction:
             case TAKsoap_datasetaction:
                 ret = createSoapDatasetActionSlave(this);
                 ret = createSoapDatasetActionSlave(this);
                 break;
                 break;
-            case TAKcountdisk:
-                return new CSlaveActivity(this); 
             case TAKkeydiff:
             case TAKkeydiff:
                 ret = createKeyDiffSlave(this);
                 ret = createKeyDiffSlave(this);
                 break;
                 break;