|
@@ -75,16 +75,6 @@ static IKeyIndex *openKeyFile(IDistributedFilePart & keyFile)
|
|
|
throw MakeStringException(1001, "Could not open key file at %s%s", url.str(), (numCopies > 1) ? " or any alternate location." : ".");
|
|
|
}
|
|
|
|
|
|
-void enterSingletonSuperfiles(Shared<IDistributedFile> & file)
|
|
|
-{
|
|
|
- IDistributedSuperFile * super = file->querySuperFile();
|
|
|
- while(super && (super->numSubFiles() == 1))
|
|
|
- {
|
|
|
- file.setown(super->getSubFile(0));
|
|
|
- super = file->querySuperFile();
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
static void setProgress(IPropertyTree &node, const char *name, const char *value)
|
|
|
{
|
|
|
StringBuffer attr("@");
|
|
@@ -128,6 +118,31 @@ protected:
|
|
|
IKeyManager * keyManager;
|
|
|
};
|
|
|
|
|
|
+
|
|
|
+//-------------------------------------------------------------------------------------------------------------
|
|
|
+
|
|
|
+ILocalOrDistributedFile *resolveLFNIndex(IAgentContext &agent, const char *logicalName, const char *errorTxt, bool optional, bool noteRead, bool write, bool isPrivilegedUser)
|
|
|
+{
|
|
|
+ Owned<ILocalOrDistributedFile> ldFile = agent.resolveLFN(logicalName, errorTxt, optional, noteRead, write, nullptr, isPrivilegedUser);
|
|
|
+ if (!ldFile)
|
|
|
+ return nullptr;
|
|
|
+ IDistributedFile *dFile = ldFile->queryDistributedFile();
|
|
|
+ if (dFile && !isFileKey(dFile))
|
|
|
+ throw MakeStringException(0, "Attempting to read flat file as an index: %s", logicalName);
|
|
|
+ return ldFile.getClear();
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+void enterSingletonSuperfiles(Shared<IDistributedFile> & file)
|
|
|
+{
|
|
|
+ IDistributedSuperFile * super = file->querySuperFile();
|
|
|
+ while(super && (super->numSubFiles() == 1))
|
|
|
+ {
|
|
|
+ file.setown(super->getSubFile(0));
|
|
|
+ super = file->querySuperFile();
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
//-------------------------------------------------------------------------------------------------------------
|
|
|
|
|
|
class CHThorNullAggregateActivity : public CHThorNullActivity
|
|
@@ -218,7 +233,7 @@ class CHThorIndexReadActivityBase : public CHThorActivityBase
|
|
|
{
|
|
|
|
|
|
public:
|
|
|
- CHThorIndexReadActivityBase(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadBaseArg &_arg, ThorActivityKind _kind, IDistributedFile * df, IPropertyTree *_node);
|
|
|
+ CHThorIndexReadActivityBase(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadBaseArg &_arg, ThorActivityKind _kind, IPropertyTree *_node);
|
|
|
~CHThorIndexReadActivityBase();
|
|
|
|
|
|
virtual void ready();
|
|
@@ -271,11 +286,12 @@ protected:
|
|
|
bool firstPart();
|
|
|
virtual bool nextPart();
|
|
|
virtual void initPart();
|
|
|
+ void resolveIndexFilename();
|
|
|
+ void killPart();
|
|
|
|
|
|
private:
|
|
|
bool firstMultiPart();
|
|
|
bool nextMultiPart();
|
|
|
- void killPart();
|
|
|
bool setCurrentPart(unsigned whichPart);
|
|
|
void clearTlk() { tlk.clear(); tlManager.clear(); }
|
|
|
void openTlk();
|
|
@@ -285,15 +301,13 @@ protected:
|
|
|
IHThorIndexReadBaseArg &helper;
|
|
|
IHThorSourceLimitTransformExtra * limitTransformExtra;
|
|
|
CachedOutputMetaData eclKeySize;
|
|
|
- size32_t keySize;
|
|
|
- void * activityRecordMetaBuff;
|
|
|
- size32_t activityRecordMetaSize;
|
|
|
+ size32_t keySize= 0;
|
|
|
|
|
|
// current part
|
|
|
Owned<IDistributedFilePart> curPart;
|
|
|
Owned<IKeyManager> klManager;
|
|
|
Owned<IKeyIndex> keyIndex;
|
|
|
- unsigned nextPartNumber;
|
|
|
+ unsigned nextPartNumber = 0;
|
|
|
|
|
|
//multi files
|
|
|
Owned<IDistributedFile> df;
|
|
@@ -302,8 +316,8 @@ protected:
|
|
|
|
|
|
//super files:
|
|
|
Owned<IDistributedFileIterator> superIterator;
|
|
|
- unsigned superIndex;
|
|
|
- unsigned superCount;
|
|
|
+ unsigned superIndex = 0;
|
|
|
+ unsigned superCount = 1;
|
|
|
StringBuffer superName;
|
|
|
|
|
|
TransformCallback callback;
|
|
@@ -311,19 +325,19 @@ protected:
|
|
|
//for preopening (when need counts for keyed skip limit):
|
|
|
Owned<IKeyIndexSet> keyIndexCache;
|
|
|
UnsignedArray superIndexCache;
|
|
|
- unsigned keyIndexCacheIdx;
|
|
|
+ unsigned keyIndexCacheIdx = 0;
|
|
|
|
|
|
unsigned seeks;
|
|
|
unsigned scans;
|
|
|
unsigned postFiltered;
|
|
|
- bool singlePart; // a single part index, not part of a super file - optimize so never reload the part.
|
|
|
- bool localSortKey;
|
|
|
+ bool singlePart = false; // a single part index, not part of a super file - optimize so never reload the part.
|
|
|
+ bool localSortKey = false;
|
|
|
+ bool initializedFileInfo = false;
|
|
|
|
|
|
//for layout translation
|
|
|
Owned<const IDynamicTransform> layoutTrans;
|
|
|
IConstPointerArrayOf<IDynamicTransform> layoutTransArray;
|
|
|
IPointerArrayOf<IOutputMetaData> actualLayouts;
|
|
|
- bool gotLayoutTrans;
|
|
|
RecordTranslationMode recordTranslationModeHint = RecordTranslationMode::Unspecified;
|
|
|
|
|
|
RecordTranslationMode getLayoutTranslationMode()
|
|
@@ -336,28 +350,10 @@ protected:
|
|
|
|
|
|
};
|
|
|
|
|
|
-CHThorIndexReadActivityBase::CHThorIndexReadActivityBase(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadBaseArg &_arg, ThorActivityKind _kind, IDistributedFile * _df, IPropertyTree *_node)
|
|
|
- : CHThorActivityBase(_agent, _activityId, _subgraphId, _arg, _kind), helper(_arg), df(LINK(_df)), activityRecordMetaBuff(NULL)
|
|
|
+CHThorIndexReadActivityBase::CHThorIndexReadActivityBase(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadBaseArg &_arg, ThorActivityKind _kind, IPropertyTree *_node)
|
|
|
+ : CHThorActivityBase(_agent, _activityId, _subgraphId, _arg, _kind), helper(_arg)
|
|
|
{
|
|
|
- singlePart = false;
|
|
|
- localSortKey = (df->queryAttributes().hasProp("@local"));
|
|
|
- IDistributedSuperFile *super = df->querySuperFile();
|
|
|
- superCount = 1;
|
|
|
- superIndex = 0;
|
|
|
nextPartNumber = 0;
|
|
|
- if (super)
|
|
|
- {
|
|
|
- superIterator.setown(super->getSubFileIterator(true));
|
|
|
- superCount = super->numSubFiles(true);
|
|
|
- if (helper.getFlags() & TIRsorted)
|
|
|
- throw MakeStringException(1000, "SORTED attribute is not supported when reading from superkey");
|
|
|
- superName.append(df->queryLogicalName());
|
|
|
- df.clear();
|
|
|
- }
|
|
|
- else if (df->numParts() == 1)
|
|
|
- {
|
|
|
- singlePart = true;
|
|
|
- }
|
|
|
|
|
|
eclKeySize.set(helper.queryDiskRecordSize());
|
|
|
|
|
@@ -366,7 +362,6 @@ CHThorIndexReadActivityBase::CHThorIndexReadActivityBase(IAgentContext &_agent,
|
|
|
scans = 0;
|
|
|
helper.setCallback(&callback);
|
|
|
limitTransformExtra = nullptr;
|
|
|
- gotLayoutTrans = false;
|
|
|
if (_node)
|
|
|
{
|
|
|
const char *recordTranslationModeHintText = _node->queryProp("hint[@name='layoutTranslation']/@value");
|
|
@@ -378,18 +373,58 @@ CHThorIndexReadActivityBase::CHThorIndexReadActivityBase(IAgentContext &_agent,
|
|
|
CHThorIndexReadActivityBase::~CHThorIndexReadActivityBase()
|
|
|
{
|
|
|
// ReleaseRoxieRow(recBuffer);
|
|
|
- rtlFree(activityRecordMetaBuff);
|
|
|
}
|
|
|
|
|
|
void CHThorIndexReadActivityBase::ready()
|
|
|
{
|
|
|
CHThorActivityBase::ready();
|
|
|
- if(!gotLayoutTrans)
|
|
|
+ if(!initializedFileInfo || (helper.getFlags() & TIRvarfilename))
|
|
|
{
|
|
|
+ resolveIndexFilename();
|
|
|
+ layoutTransArray.kill();
|
|
|
getLayoutTranslators();
|
|
|
- gotLayoutTrans = true;
|
|
|
+ initializedFileInfo = true;
|
|
|
}
|
|
|
- firstPart();
|
|
|
+}
|
|
|
+
|
|
|
+void CHThorIndexReadActivityBase::resolveIndexFilename()
|
|
|
+{
|
|
|
+ // A logical filename for the key should refer to a single physical file - either the TLK or a monolithic key
|
|
|
+ OwnedRoxieString lfn(helper.getFileName());
|
|
|
+ Owned<ILocalOrDistributedFile> ldFile = resolveLFNIndex(agent, lfn, "IndexRead", 0 != (helper.getFlags() & TIRoptional),true, false, defaultPrivilegedUser);
|
|
|
+ df.set(ldFile ? ldFile->queryDistributedFile() : NULL);
|
|
|
+ if (!df)
|
|
|
+ {
|
|
|
+ StringBuffer buff;
|
|
|
+ buff.append("Skipping OPT index read of nonexistent file ").append(lfn);
|
|
|
+ agent.addWuExceptionEx(buff.str(), WRN_SkipMissingOptIndex, SeverityInformation, MSGAUD_user, "hthor");
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ agent.logFileAccess(df, "HThor", "READ");
|
|
|
+ enterSingletonSuperfiles(df);
|
|
|
+
|
|
|
+ singlePart = false;
|
|
|
+ localSortKey = (df->queryAttributes().hasProp("@local"));
|
|
|
+ IDistributedSuperFile *super = df->querySuperFile();
|
|
|
+ superCount = 1;
|
|
|
+ superIndex = 0;
|
|
|
+ nextPartNumber = 0;
|
|
|
+ if (super)
|
|
|
+ {
|
|
|
+ superIterator.setown(super->getSubFileIterator(true));
|
|
|
+ superCount = super->numSubFiles(true);
|
|
|
+ if (helper.getFlags() & TIRsorted)
|
|
|
+ throw MakeStringException(1000, "SORTED attribute is not supported when reading from superkey");
|
|
|
+ superName.append(df->queryLogicalName());
|
|
|
+ df.clear();
|
|
|
+ }
|
|
|
+ else if (df->numParts() == 1)
|
|
|
+ {
|
|
|
+ singlePart = true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ killPart();
|
|
|
}
|
|
|
|
|
|
void CHThorIndexReadActivityBase::stop()
|
|
@@ -400,6 +435,8 @@ void CHThorIndexReadActivityBase::stop()
|
|
|
|
|
|
bool CHThorIndexReadActivityBase::doPreopenLimit(unsigned __int64 limit)
|
|
|
{
|
|
|
+ keyIndexCache.clear();
|
|
|
+ superIndexCache.kill();
|
|
|
if(!helper.canMatchAny())
|
|
|
return false;
|
|
|
keyIndexCache.setown(createKeyIndexSet());
|
|
@@ -505,7 +542,7 @@ const void * CHThorIndexReadActivityBase::createKeyedLimitOnFailRow()
|
|
|
bool CHThorIndexReadActivityBase::firstPart()
|
|
|
{
|
|
|
killPart();
|
|
|
- if (helper.canMatchAny())
|
|
|
+ if ((df || superIterator) && helper.canMatchAny())
|
|
|
{
|
|
|
if(keyIndexCache)
|
|
|
{
|
|
@@ -671,10 +708,12 @@ void CHThorIndexReadActivityBase::getLayoutTranslators()
|
|
|
layoutTransArray.append(layoutTrans.getClear());
|
|
|
} while(superIterator->next());
|
|
|
}
|
|
|
- else
|
|
|
+ else if (df)
|
|
|
{
|
|
|
layoutTrans.setown(getLayoutTranslator(df));
|
|
|
}
|
|
|
+ else
|
|
|
+ layoutTrans.clear();
|
|
|
}
|
|
|
|
|
|
const IDynamicTransform * CHThorIndexReadActivityBase::getLayoutTranslator(IDistributedFile * f)
|
|
@@ -749,7 +788,7 @@ class CHThorIndexReadActivity : public CHThorIndexReadActivityBase
|
|
|
{
|
|
|
|
|
|
public:
|
|
|
- CHThorIndexReadActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadArg &_arg, ThorActivityKind _kind, IDistributedFile * df, IPropertyTree *_node);
|
|
|
+ CHThorIndexReadActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadArg &_arg, ThorActivityKind _kind, IPropertyTree *_node);
|
|
|
~CHThorIndexReadActivity();
|
|
|
|
|
|
//interface IHThorInput
|
|
@@ -782,8 +821,8 @@ protected:
|
|
|
bool keyedLimitRowCreated;
|
|
|
};
|
|
|
|
|
|
-CHThorIndexReadActivity::CHThorIndexReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadArg &_arg, ThorActivityKind _kind, IDistributedFile * _df, IPropertyTree *_node)
|
|
|
- : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _df, _node), helper(_arg)
|
|
|
+CHThorIndexReadActivity::CHThorIndexReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadArg &_arg, ThorActivityKind _kind, IPropertyTree *_node)
|
|
|
+ : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _node), helper(_arg)
|
|
|
{
|
|
|
limitTransformExtra = &helper;
|
|
|
steppedExtra = helper.querySteppingExtra();
|
|
@@ -824,6 +863,7 @@ CHThorIndexReadActivity::~CHThorIndexReadActivity()
|
|
|
|
|
|
void CHThorIndexReadActivity::ready()
|
|
|
{
|
|
|
+ CHThorIndexReadActivityBase::ready();
|
|
|
keyedLimitReached = false;
|
|
|
keyedLimitRowCreated = false;
|
|
|
keyedLimit = helper.getKeyedLimit();
|
|
@@ -832,16 +872,13 @@ void CHThorIndexReadActivity::ready()
|
|
|
rowLimit = (unsigned __int64) -1;
|
|
|
stopAfter = helper.getChooseNLimit();
|
|
|
keyedProcessed = 0;
|
|
|
- if(!gotLayoutTrans)
|
|
|
- {
|
|
|
- getLayoutTranslators();
|
|
|
- gotLayoutTrans = true;
|
|
|
- }
|
|
|
if (seekGEOffset || localSortKey || ((keyedLimit != (unsigned __int64) -1) && ((helper.getFlags() & TIRcountkeyedlimit) != 0) && !singlePart))
|
|
|
keyedLimitReached = doPreopenLimit(keyedLimit);
|
|
|
- CHThorIndexReadActivityBase::ready();
|
|
|
if (steppedExtra)
|
|
|
steppingMeta.setExtra(steppedExtra);
|
|
|
+
|
|
|
+ firstPart();
|
|
|
+
|
|
|
if(klManager && (keyedLimit != (unsigned __int64) -1) && ((helper.getFlags() & TIRcountkeyedlimit) != 0) && singlePart && !seekGEOffset)
|
|
|
{
|
|
|
unsigned __int64 result = klManager->checkCount(keyedLimit);
|
|
@@ -854,6 +891,7 @@ bool CHThorIndexReadActivity::nextPart()
|
|
|
{
|
|
|
if(keyIndexCache && (seekGEOffset || localSortKey))
|
|
|
{
|
|
|
+ killPart();
|
|
|
klManager.setown(createKeyMerger(eclKeySize.queryRecordAccessor(true), keyIndexCache, seekGEOffset, NULL, helper.hasNewSegmentMonitors(), false));
|
|
|
keyIndexCache.clear();
|
|
|
initManager(klManager, false);
|
|
@@ -1076,34 +1114,9 @@ IInputSteppingMeta * CHThorIndexReadActivity::querySteppingMeta()
|
|
|
}
|
|
|
|
|
|
|
|
|
-ILocalOrDistributedFile *resolveLFNIndex(IAgentContext &agent, const char *logicalName, const char *errorTxt, bool optional, bool noteRead, bool write, bool isPrivilegedUser)
|
|
|
-{
|
|
|
- Owned<ILocalOrDistributedFile> ldFile = agent.resolveLFN(logicalName, errorTxt, optional, noteRead, write, nullptr, isPrivilegedUser);
|
|
|
- if (!ldFile)
|
|
|
- return nullptr;
|
|
|
- IDistributedFile *dFile = ldFile->queryDistributedFile();
|
|
|
- if (dFile && !isFileKey(dFile))
|
|
|
- throw MakeStringException(0, "Attempting to read flat file as an index: %s", logicalName);
|
|
|
- return ldFile.getClear();
|
|
|
-}
|
|
|
-
|
|
|
-
|
|
|
extern HTHOR_API IHThorActivity *createIndexReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadArg &arg, ThorActivityKind _kind, IPropertyTree *_node)
|
|
|
{
|
|
|
- // A logical filename for the key should refer to a single physical file - either the TLK or a monolithic key
|
|
|
- OwnedRoxieString lfn(arg.getFileName());
|
|
|
- Owned<ILocalOrDistributedFile> ldFile = resolveLFNIndex(_agent, lfn, "IndexRead", 0 != (arg.getFlags() & TIRoptional),true, false, defaultPrivilegedUser);
|
|
|
- Linked<IDistributedFile> dFile = ldFile ? ldFile->queryDistributedFile() : NULL;
|
|
|
- if (!dFile)
|
|
|
- {
|
|
|
- StringBuffer buff;
|
|
|
- buff.append("Skipping OPT index read of nonexistent file ").append(lfn);
|
|
|
- _agent.addWuExceptionEx(buff.str(), WRN_SkipMissingOptIndex, SeverityInformation, MSGAUD_user, "hthor");
|
|
|
- return new CHThorNullActivity(_agent, _activityId, _subgraphId, arg, _kind);
|
|
|
- }
|
|
|
- _agent.logFileAccess(dFile, "HThor", "READ");
|
|
|
- enterSingletonSuperfiles(dFile);
|
|
|
- return new CHThorIndexReadActivity(_agent, _activityId, _subgraphId, arg, _kind, dFile, _node);
|
|
|
+ return new CHThorIndexReadActivity(_agent, _activityId, _subgraphId, arg, _kind, _node);
|
|
|
}
|
|
|
|
|
|
//-------------------------------------------------------------------------------------------------------------
|
|
@@ -1113,7 +1126,7 @@ class CHThorIndexNormalizeActivity : public CHThorIndexReadActivityBase
|
|
|
{
|
|
|
|
|
|
public:
|
|
|
- CHThorIndexNormalizeActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexNormalizeArg &_arg, ThorActivityKind _kind, IDistributedFile * df, IPropertyTree *_node);
|
|
|
+ CHThorIndexNormalizeActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexNormalizeArg &_arg, ThorActivityKind _kind, IPropertyTree *_node);
|
|
|
~CHThorIndexNormalizeActivity();
|
|
|
|
|
|
virtual void ready();
|
|
@@ -1136,7 +1149,7 @@ protected:
|
|
|
};
|
|
|
|
|
|
|
|
|
-CHThorIndexNormalizeActivity::CHThorIndexNormalizeActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexNormalizeArg &_arg, ThorActivityKind _kind, IDistributedFile * _df, IPropertyTree *_node) : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _df, _node), helper(_arg), outBuilder(NULL)
|
|
|
+CHThorIndexNormalizeActivity::CHThorIndexNormalizeActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexNormalizeArg &_arg, ThorActivityKind _kind, IPropertyTree *_node) : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _node), helper(_arg), outBuilder(NULL)
|
|
|
{
|
|
|
limitTransformExtra = &helper;
|
|
|
keyedLimit = (unsigned __int64)-1;
|
|
@@ -1153,6 +1166,7 @@ CHThorIndexNormalizeActivity::~CHThorIndexNormalizeActivity()
|
|
|
|
|
|
void CHThorIndexNormalizeActivity::ready()
|
|
|
{
|
|
|
+ CHThorIndexReadActivityBase::ready();
|
|
|
keyedLimit = helper.getKeyedLimit();
|
|
|
skipLimitReached = false;
|
|
|
keyedProcessed = 0;
|
|
@@ -1161,8 +1175,9 @@ void CHThorIndexNormalizeActivity::ready()
|
|
|
rowLimit = (unsigned __int64) -1;
|
|
|
stopAfter = helper.getChooseNLimit();
|
|
|
expanding = false;
|
|
|
- CHThorIndexReadActivityBase::ready();
|
|
|
outBuilder.setAllocator(rowAllocator);
|
|
|
+
|
|
|
+ firstPart();
|
|
|
}
|
|
|
|
|
|
void CHThorIndexNormalizeActivity::stop()
|
|
@@ -1275,20 +1290,7 @@ const void * CHThorIndexNormalizeActivity::createNextRow()
|
|
|
|
|
|
extern HTHOR_API IHThorActivity *createIndexNormalizeActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexNormalizeArg &arg, ThorActivityKind _kind, IPropertyTree *_node)
|
|
|
{
|
|
|
- // A logical filename for the key should refer to a single physical file - either the TLK or a monolithic key
|
|
|
- OwnedRoxieString lfn(arg.getFileName());
|
|
|
- Owned<ILocalOrDistributedFile> ldFile = resolveLFNIndex(_agent, lfn, "IndexNormalize", 0 != (arg.getFlags() & TIRoptional),true,true,defaultPrivilegedUser);
|
|
|
- Linked<IDistributedFile> dFile = ldFile ? ldFile->queryDistributedFile() : NULL;
|
|
|
- if (!dFile)
|
|
|
- {
|
|
|
- StringBuffer buff;
|
|
|
- buff.append("Skipping OPT index normalize of nonexistent file ").append(lfn);
|
|
|
- _agent.addWuExceptionEx(buff.str(), WRN_SkipMissingOptIndex, SeverityInformation, MSGAUD_user, "hthor");
|
|
|
- return new CHThorNullActivity(_agent, _activityId, _subgraphId, arg, _kind);
|
|
|
- }
|
|
|
- _agent.logFileAccess(dFile, "HThor", "READ");
|
|
|
- enterSingletonSuperfiles(dFile);
|
|
|
- return new CHThorIndexNormalizeActivity(_agent, _activityId, _subgraphId, arg, _kind, dFile, _node);
|
|
|
+ return new CHThorIndexNormalizeActivity(_agent, _activityId, _subgraphId, arg, _kind, _node);
|
|
|
}
|
|
|
|
|
|
//-------------------------------------------------------------------------------------------------------------
|
|
@@ -1298,7 +1300,7 @@ class CHThorIndexAggregateActivity : public CHThorIndexReadActivityBase
|
|
|
{
|
|
|
|
|
|
public:
|
|
|
- CHThorIndexAggregateActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexAggregateArg &_arg, ThorActivityKind _kind, IDistributedFile * df, IPropertyTree *_node);
|
|
|
+ CHThorIndexAggregateActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexAggregateArg &_arg, ThorActivityKind _kind, IPropertyTree *_node);
|
|
|
~CHThorIndexAggregateActivity();
|
|
|
|
|
|
//interface IHThorInput
|
|
@@ -1318,8 +1320,8 @@ protected:
|
|
|
};
|
|
|
|
|
|
|
|
|
-CHThorIndexAggregateActivity::CHThorIndexAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexAggregateArg &_arg, ThorActivityKind _kind, IDistributedFile * _df, IPropertyTree *_node)
|
|
|
- : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _df, _node), helper(_arg), outBuilder(NULL)
|
|
|
+CHThorIndexAggregateActivity::CHThorIndexAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexAggregateArg &_arg, ThorActivityKind _kind, IPropertyTree *_node)
|
|
|
+ : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _node), helper(_arg), outBuilder(NULL)
|
|
|
{
|
|
|
}
|
|
|
|
|
@@ -1332,6 +1334,8 @@ void CHThorIndexAggregateActivity::ready()
|
|
|
CHThorIndexReadActivityBase::ready();
|
|
|
outBuilder.setAllocator(rowAllocator);
|
|
|
finished = false;
|
|
|
+
|
|
|
+ firstPart();
|
|
|
}
|
|
|
|
|
|
void CHThorIndexAggregateActivity::stop()
|
|
@@ -1392,20 +1396,7 @@ const void *CHThorIndexAggregateActivity::nextRow()
|
|
|
|
|
|
extern HTHOR_API IHThorActivity *createIndexAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexAggregateArg &arg, ThorActivityKind _kind, IPropertyTree *_node)
|
|
|
{
|
|
|
- // A logical filename for the key should refer to a single physical file - either the TLK or a monolithic key
|
|
|
- OwnedRoxieString lfn(arg.getFileName());
|
|
|
- Owned<ILocalOrDistributedFile> ldFile = resolveLFNIndex(_agent, lfn, "IndexAggregate", 0 != (arg.getFlags() & TIRoptional), true, false, defaultPrivilegedUser);
|
|
|
- Linked<IDistributedFile> dFile = ldFile ? ldFile->queryDistributedFile() : NULL;
|
|
|
- if (!dFile)
|
|
|
- {
|
|
|
- StringBuffer buff;
|
|
|
- buff.append("Skipping OPT index aggregate of nonexistent file ").append(lfn);
|
|
|
- _agent.addWuExceptionEx(buff.str(), WRN_SkipMissingOptIndex, SeverityInformation, MSGAUD_user, "hthor");
|
|
|
- return new CHThorNullAggregateActivity(_agent, _activityId, _subgraphId, arg, arg, _kind);
|
|
|
- }
|
|
|
- _agent.logFileAccess(dFile, "HThor", "READ");
|
|
|
- enterSingletonSuperfiles(dFile);
|
|
|
- return new CHThorIndexAggregateActivity(_agent, _activityId, _subgraphId, arg, _kind, dFile, _node);
|
|
|
+ return new CHThorIndexAggregateActivity(_agent, _activityId, _subgraphId, arg, _kind, _node);
|
|
|
}
|
|
|
|
|
|
//-------------------------------------------------------------------------------------------------------------
|
|
@@ -1418,7 +1409,7 @@ class CHThorIndexCountActivity : public CHThorIndexReadActivityBase
|
|
|
unsigned __int64 rowLimit = (unsigned __int64)-1;
|
|
|
|
|
|
public:
|
|
|
- CHThorIndexCountActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexCountArg &_arg, ThorActivityKind _kind, IDistributedFile * df, IPropertyTree *_node);
|
|
|
+ CHThorIndexCountActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexCountArg &_arg, ThorActivityKind _kind, IPropertyTree *_node);
|
|
|
|
|
|
//interface IHThorInput
|
|
|
virtual void ready();
|
|
@@ -1434,8 +1425,8 @@ protected:
|
|
|
};
|
|
|
|
|
|
|
|
|
-CHThorIndexCountActivity::CHThorIndexCountActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexCountArg &_arg, ThorActivityKind _kind, IDistributedFile * _df, IPropertyTree *_node)
|
|
|
- : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _df, _node), helper(_arg)
|
|
|
+CHThorIndexCountActivity::CHThorIndexCountActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexCountArg &_arg, ThorActivityKind _kind, IPropertyTree *_node)
|
|
|
+ : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _node), helper(_arg)
|
|
|
{
|
|
|
choosenLimit = (unsigned __int64)-1;
|
|
|
finished = false;
|
|
@@ -1453,11 +1444,13 @@ void CHThorIndexCountActivity::ready()
|
|
|
finished = false;
|
|
|
choosenLimit = helper.getChooseNLimit();
|
|
|
|
|
|
+ firstPart();
|
|
|
+
|
|
|
if ((keyedLimit != (unsigned __int64) -1) && ((helper.getFlags() & TIRcountkeyedlimit) != 0))
|
|
|
{
|
|
|
if (singlePart)
|
|
|
{
|
|
|
- if (klManager) // NB: opened by base::ready()
|
|
|
+ if (klManager) // NB: opened by firstPart()
|
|
|
{
|
|
|
unsigned __int64 result = klManager->checkCount(keyedLimit);
|
|
|
keyedLimitReached = (result > keyedLimit);
|
|
@@ -1548,20 +1541,7 @@ const void *CHThorIndexCountActivity::nextRow()
|
|
|
|
|
|
extern HTHOR_API IHThorActivity *createIndexCountActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexCountArg &arg, ThorActivityKind _kind, IPropertyTree *_node)
|
|
|
{
|
|
|
- // A logical filename for the key should refer to a single physical file - either the TLK or a monolithic key
|
|
|
- OwnedRoxieString lfn(arg.getFileName());
|
|
|
- Owned<ILocalOrDistributedFile> ldFile = resolveLFNIndex(_agent, lfn, "IndexCount", 0 != (arg.getFlags() & TIRoptional), true, false, defaultPrivilegedUser);
|
|
|
- Linked<IDistributedFile> dFile = ldFile ? ldFile->queryDistributedFile() : NULL;
|
|
|
- if (!dFile)
|
|
|
- {
|
|
|
- StringBuffer buff;
|
|
|
- buff.append("Skipping OPT index count of nonexistent file ").append(lfn);
|
|
|
- _agent.addWuExceptionEx(buff.str(), WRN_SkipMissingOptIndex, SeverityInformation, MSGAUD_user, "hthor");
|
|
|
- return new CHThorNullCountActivity(_agent, _activityId, _subgraphId, arg, _kind);
|
|
|
- }
|
|
|
- _agent.logFileAccess(dFile, "HThor", "READ");
|
|
|
- enterSingletonSuperfiles(dFile);
|
|
|
- return new CHThorIndexCountActivity(_agent, _activityId, _subgraphId, arg, _kind, dFile, _node);
|
|
|
+ return new CHThorIndexCountActivity(_agent, _activityId, _subgraphId, arg, _kind, _node);
|
|
|
}
|
|
|
|
|
|
//-------------------------------------------------------------------------------------------------------------
|
|
@@ -1570,7 +1550,7 @@ class CHThorIndexGroupAggregateActivity : public CHThorIndexReadActivityBase, im
|
|
|
{
|
|
|
|
|
|
public:
|
|
|
- CHThorIndexGroupAggregateActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexGroupAggregateArg &_arg, ThorActivityKind _kind, IDistributedFile * df, IPropertyTree *_node);
|
|
|
+ CHThorIndexGroupAggregateActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexGroupAggregateArg &_arg, ThorActivityKind _kind, IPropertyTree *_node);
|
|
|
IMPLEMENT_IINTERFACE
|
|
|
|
|
|
//interface IHThorInput
|
|
@@ -1591,7 +1571,7 @@ protected:
|
|
|
};
|
|
|
|
|
|
|
|
|
-CHThorIndexGroupAggregateActivity::CHThorIndexGroupAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexGroupAggregateArg &_arg, ThorActivityKind _kind, IDistributedFile * _df, IPropertyTree *_node) : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _df, _node), helper(_arg), aggregated(_arg, _arg)
|
|
|
+CHThorIndexGroupAggregateActivity::CHThorIndexGroupAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexGroupAggregateArg &_arg, ThorActivityKind _kind, IPropertyTree *_node) : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _node), helper(_arg), aggregated(_arg, _arg)
|
|
|
{
|
|
|
eof = false;
|
|
|
gathered = false;
|
|
@@ -1604,6 +1584,8 @@ void CHThorIndexGroupAggregateActivity::ready()
|
|
|
gathered = false;
|
|
|
aggregated.reset();
|
|
|
aggregated.start(rowAllocator, agent.queryCodeContext(), activityId);
|
|
|
+
|
|
|
+ firstPart();
|
|
|
}
|
|
|
|
|
|
void CHThorIndexGroupAggregateActivity::processRow(const void * next)
|
|
@@ -1659,20 +1641,7 @@ const void *CHThorIndexGroupAggregateActivity::nextRow()
|
|
|
|
|
|
extern HTHOR_API IHThorActivity *createIndexGroupAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexGroupAggregateArg &arg, ThorActivityKind _kind, IPropertyTree *_node)
|
|
|
{
|
|
|
- // A logical filename for the key should refer to a single physical file - either the TLK or a monolithic key
|
|
|
- OwnedRoxieString lfn(arg.getFileName());
|
|
|
- Owned<ILocalOrDistributedFile> ldFile = resolveLFNIndex(_agent, lfn, "IndexGroupAggregate", 0 != (arg.getFlags() & TIRoptional), true, false, defaultPrivilegedUser);
|
|
|
- Linked<IDistributedFile> dFile = ldFile ? ldFile->queryDistributedFile() : NULL;
|
|
|
- if (!dFile)
|
|
|
- {
|
|
|
- StringBuffer buff;
|
|
|
- buff.append("Skipping OPT index group aggregate of nonexistent file ").append(lfn);
|
|
|
- _agent.addWuExceptionEx(buff.str(), WRN_SkipMissingOptIndex, SeverityInformation, MSGAUD_user, "hthor");
|
|
|
- return new CHThorNullActivity(_agent, _activityId, _subgraphId, arg, _kind);
|
|
|
- }
|
|
|
- _agent.logFileAccess(dFile, "HThor", "READ");
|
|
|
- enterSingletonSuperfiles(dFile);
|
|
|
- return new CHThorIndexGroupAggregateActivity(_agent, _activityId, _subgraphId, arg, _kind, dFile, _node);
|
|
|
+ return new CHThorIndexGroupAggregateActivity(_agent, _activityId, _subgraphId, arg, _kind, _node);
|
|
|
}
|
|
|
|
|
|
//-------------------------------------------------------------------------------------------------------------
|