|
@@ -4072,160 +4072,3 @@ extern HTHOR_API IHThorActivity *createKeyedJoinActivity(IAgentContext &_agent,
|
|
|
{
|
|
|
return new CHThorKeyedJoinActivity(_agent, _activityId, _subgraphId, arg, _kind);
|
|
|
}
|
|
|
-
|
|
|
-//-------------------------------------------------------------------------------------------------------------
|
|
|
-
|
|
|
-class CHThorCountIndexActivity : public CHThorActivityBase
|
|
|
-{
|
|
|
-protected:
|
|
|
- IHThorCountIndexArg &helper;
|
|
|
- Owned<IKeyIndex> keyIndex;
|
|
|
- Owned<IKeyManager> klManager;
|
|
|
-
|
|
|
-public:
|
|
|
- IMPLEMENT_SINKACTIVITY;
|
|
|
-
|
|
|
- CHThorCountIndexActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorCountIndexArg &_arg, ThorActivityKind _kind, IDistributedFilePart *keyFile)
|
|
|
- : CHThorActivityBase(_agent, _activityId, _subgraphId, _arg, _kind), helper(_arg)
|
|
|
- {
|
|
|
- if (keyFile)
|
|
|
- {
|
|
|
- Owned<IDistributedFilePart> kf = keyFile;
|
|
|
- keyIndex.setown(openKeyFile(*kf));
|
|
|
- klManager.setown(createKeyManager(keyIndex, 0, NULL));
|
|
|
- helper.createSegmentMonitors(klManager);
|
|
|
- klManager->finishSegmentMonitors();
|
|
|
- klManager->reset();
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- virtual __int64 getCount()
|
|
|
- {
|
|
|
- if (!helper.canMatchAny())
|
|
|
- return 0;
|
|
|
- else if (helper.hasPostFilter())
|
|
|
- {
|
|
|
- __int64 ret = 0;
|
|
|
- loop
|
|
|
- {
|
|
|
- agent.reportProgress(NULL);
|
|
|
- if (!klManager->lookup(true))
|
|
|
- break;
|
|
|
- offset_t recptr;
|
|
|
- byte const * keyRow = klManager->queryKeyBuffer(recptr);
|
|
|
- KLBlobProviderAdapter adapter(klManager);
|
|
|
- if (helper.isValid(keyRow, recptr, &adapter))
|
|
|
- ret++;
|
|
|
- }
|
|
|
- return ret;
|
|
|
- }
|
|
|
- else
|
|
|
- return klManager->getCount();
|
|
|
- }
|
|
|
-
|
|
|
-};
|
|
|
-
|
|
|
-class CHThorMultiCountIndexBase : public CHThorCountIndexActivity
|
|
|
-{
|
|
|
-public:
|
|
|
- CHThorMultiCountIndexBase(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorCountIndexArg &_arg, ThorActivityKind _kind, IDistributedFilePart *_tlk)
|
|
|
- : CHThorCountIndexActivity(_agent, _activityId, _subgraphId, _arg, _kind, _tlk)
|
|
|
- {
|
|
|
- }
|
|
|
-
|
|
|
- virtual __int64 getCount()
|
|
|
- {
|
|
|
- __int64 ret = 0;
|
|
|
- while (subKey)
|
|
|
- {
|
|
|
- ret += subKey->getCount();
|
|
|
- nextSubKey();
|
|
|
- }
|
|
|
- return ret;
|
|
|
- }
|
|
|
-
|
|
|
-protected:
|
|
|
- Owned<CHThorCountIndexActivity> subKey;
|
|
|
-
|
|
|
- virtual void nextSubKey() = 0;
|
|
|
-};
|
|
|
-
|
|
|
-class CHThorMultiCountIndexActivity : public CHThorMultiCountIndexBase
|
|
|
-{
|
|
|
-public:
|
|
|
- CHThorMultiCountIndexActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorCountIndexArg &_arg, ThorActivityKind _kind, IDistributedFilePart *_tlk, IDistributedFile *_f)
|
|
|
- : CHThorMultiCountIndexBase(_agent, _activityId, _subgraphId, _arg, _kind, _tlk), subKeys(_f)
|
|
|
- {
|
|
|
- nextSubKey();
|
|
|
- }
|
|
|
-
|
|
|
-protected:
|
|
|
- Owned<IDistributedFile> subKeys;
|
|
|
-
|
|
|
- void nextSubKey()
|
|
|
- {
|
|
|
- subKey.clear();
|
|
|
- while (klManager->lookup(false))
|
|
|
- {
|
|
|
- if (klManager->queryFpos())
|
|
|
- {
|
|
|
- subKey.setown(new CHThorCountIndexActivity(agent, activityId, subgraphId, helper, kind, subKeys->getPart((unsigned)(klManager->queryFpos()-1))));
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-};
|
|
|
-
|
|
|
-class CHThorSuperCountIndexActivity : public CHThorMultiCountIndexBase
|
|
|
-{
|
|
|
-public:
|
|
|
- CHThorSuperCountIndexActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorCountIndexArg &_arg, ThorActivityKind _kind, IDistributedSuperFile *_super)
|
|
|
- : CHThorMultiCountIndexBase(_agent, _activityId, _subgraphId, _arg, _kind, NULL), superIterator(_super->getSubFileIterator(true))
|
|
|
- {
|
|
|
- superIterator->first();
|
|
|
- nextSubKey();
|
|
|
- }
|
|
|
-
|
|
|
-private:
|
|
|
- Owned<IDistributedFileIterator> superIterator;
|
|
|
-
|
|
|
- void nextSubKey()
|
|
|
- {
|
|
|
- subKey.clear();
|
|
|
- if (superIterator->isValid())
|
|
|
- {
|
|
|
- IDistributedFile &f = superIterator->query();
|
|
|
- unsigned numParts = f.numParts();
|
|
|
- if (numParts==1)
|
|
|
- subKey.setown(new CHThorCountIndexActivity(agent, activityId, subgraphId, helper, kind, f.getPart(0)));
|
|
|
- else
|
|
|
- subKey.setown(new CHThorMultiCountIndexActivity(agent, activityId, subgraphId, helper, kind, f.getPart(numParts-1), LINK(&f)));
|
|
|
- superIterator->next();
|
|
|
- }
|
|
|
- }
|
|
|
-};
|
|
|
-
|
|
|
-extern HTHOR_API IHThorActivity *createCountIndexActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorCountIndexArg &arg, ThorActivityKind _kind)
|
|
|
-{
|
|
|
- // A logical filename for the key should refer to a single physical file - either the TLK or a monolithic key
|
|
|
- arg.extractLookupFields();
|
|
|
- const char *lfn = arg.getIndexFileName();
|
|
|
- Owned<ILocalOrDistributedFile> ldFile = _agent.resolveLFN(lfn, "CountIndex");
|
|
|
- IDistributedFile * dFile = ldFile->queryDistributedFile();
|
|
|
-
|
|
|
- _agent.logFileAccess(dFile, "HThor", "READ");
|
|
|
- IDistributedSuperFile *super = dFile->querySuperFile();
|
|
|
- if (super)
|
|
|
- {
|
|
|
- return new CHThorSuperCountIndexActivity(_agent, _activityId, _subgraphId, arg, _kind, super);
|
|
|
- }
|
|
|
- else if (dFile->numParts() == 1)
|
|
|
- {
|
|
|
- return new CHThorCountIndexActivity(_agent, _activityId, _subgraphId, arg, _kind, dFile->getPart(0));
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- return new CHThorMultiCountIndexActivity(_agent, _activityId, _subgraphId, arg, _kind, dFile->getPart(ldFile->numParts()-1), LINK(dFile));
|
|
|
- }
|
|
|
-}
|