|
@@ -215,7 +215,7 @@ class CHThorIndexReadActivityBase : public CHThorActivityBase
|
|
|
{
|
|
|
|
|
|
public:
|
|
|
- CHThorIndexReadActivityBase(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadBaseArg &_arg, ThorActivityKind _kind, IDistributedFile * df);
|
|
|
+ CHThorIndexReadActivityBase(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadBaseArg &_arg, ThorActivityKind _kind, IDistributedFile * df, IPropertyTree *_node);
|
|
|
~CHThorIndexReadActivityBase();
|
|
|
|
|
|
virtual void ready();
|
|
@@ -321,9 +321,19 @@ protected:
|
|
|
IConstPointerArrayOf<IDynamicTransform> layoutTransArray;
|
|
|
IPointerArrayOf<IOutputMetaData> actualLayouts;
|
|
|
bool gotLayoutTrans;
|
|
|
+ RecordTranslationMode recordTranslationModeHint = RecordTranslationMode::Unspecified;
|
|
|
+
|
|
|
+ RecordTranslationMode getLayoutTranslationMode()
|
|
|
+ {
|
|
|
+ if (recordTranslationModeHint != RecordTranslationMode::Unspecified)
|
|
|
+ return recordTranslationModeHint;
|
|
|
+ return agent.getLayoutTranslationMode();
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
};
|
|
|
|
|
|
-CHThorIndexReadActivityBase::CHThorIndexReadActivityBase(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadBaseArg &_arg, ThorActivityKind _kind, IDistributedFile * _df)
|
|
|
+CHThorIndexReadActivityBase::CHThorIndexReadActivityBase(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadBaseArg &_arg, ThorActivityKind _kind, IDistributedFile * _df, IPropertyTree *_node)
|
|
|
: CHThorActivityBase(_agent, _activityId, _subgraphId, _arg, _kind), helper(_arg), df(LINK(_df)), activityRecordMetaBuff(NULL)
|
|
|
{
|
|
|
singlePart = false;
|
|
@@ -354,6 +364,12 @@ CHThorIndexReadActivityBase::CHThorIndexReadActivityBase(IAgentContext &_agent,
|
|
|
helper.setCallback(&callback);
|
|
|
limitTransformExtra = nullptr;
|
|
|
gotLayoutTrans = false;
|
|
|
+ if (_node)
|
|
|
+ {
|
|
|
+ const char *recordTranslationModeHintText = _node->queryProp("hint[@name='layoutTranslation']/@value");
|
|
|
+ if (recordTranslationModeHintText)
|
|
|
+ recordTranslationModeHint = getTranslationMode(recordTranslationModeHintText);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
CHThorIndexReadActivityBase::~CHThorIndexReadActivityBase()
|
|
@@ -674,10 +690,10 @@ void CHThorIndexReadActivityBase::getLayoutTranslators()
|
|
|
|
|
|
const IDynamicTransform * CHThorIndexReadActivityBase::getLayoutTranslator(IDistributedFile * f)
|
|
|
{
|
|
|
- if(agent.getLayoutTranslationMode() == RecordTranslationMode::AlwaysECL)
|
|
|
+ if(getLayoutTranslationMode() == RecordTranslationMode::AlwaysECL)
|
|
|
return NULL;
|
|
|
|
|
|
- if(agent.getLayoutTranslationMode() == RecordTranslationMode::None)
|
|
|
+ if(getLayoutTranslationMode() == RecordTranslationMode::None)
|
|
|
{
|
|
|
verifyFormatCrc(helper.getDiskFormatCrc(), f, (superIterator ? superName.str() : NULL) , true, true);
|
|
|
return NULL;
|
|
@@ -729,7 +745,7 @@ class CHThorIndexReadActivity : public CHThorIndexReadActivityBase
|
|
|
{
|
|
|
|
|
|
public:
|
|
|
- CHThorIndexReadActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadArg &_arg, ThorActivityKind _kind, IDistributedFile * df);
|
|
|
+ CHThorIndexReadActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadArg &_arg, ThorActivityKind _kind, IDistributedFile * df, IPropertyTree *_node);
|
|
|
~CHThorIndexReadActivity();
|
|
|
|
|
|
//interface IHThorInput
|
|
@@ -762,8 +778,8 @@ protected:
|
|
|
bool keyedLimitRowCreated;
|
|
|
};
|
|
|
|
|
|
-CHThorIndexReadActivity::CHThorIndexReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadArg &_arg, ThorActivityKind _kind, IDistributedFile * _df)
|
|
|
- : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _df), helper(_arg)
|
|
|
+CHThorIndexReadActivity::CHThorIndexReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadArg &_arg, ThorActivityKind _kind, IDistributedFile * _df, IPropertyTree *_node)
|
|
|
+ : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _df, _node), helper(_arg)
|
|
|
{
|
|
|
limitTransformExtra = &helper;
|
|
|
steppedExtra = helper.querySteppingExtra();
|
|
@@ -1065,7 +1081,7 @@ ILocalOrDistributedFile *resolveLFNIndex(IAgentContext &agent, const char *logic
|
|
|
}
|
|
|
|
|
|
|
|
|
-extern HTHOR_API IHThorActivity *createIndexReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadArg &arg, ThorActivityKind _kind)
|
|
|
+extern HTHOR_API IHThorActivity *createIndexReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadArg &arg, ThorActivityKind _kind, IPropertyTree *_node)
|
|
|
{
|
|
|
// A logical filename for the key should refer to a single physical file - either the TLK or a monolithic key
|
|
|
OwnedRoxieString lfn(arg.getFileName());
|
|
@@ -1081,7 +1097,7 @@ extern HTHOR_API IHThorActivity *createIndexReadActivity(IAgentContext &_agent,
|
|
|
}
|
|
|
_agent.logFileAccess(dFile, "HThor", "READ");
|
|
|
enterSingletonSuperfiles(dFile);
|
|
|
- return new CHThorIndexReadActivity(_agent, _activityId, _subgraphId, arg, _kind, dFile);
|
|
|
+ return new CHThorIndexReadActivity(_agent, _activityId, _subgraphId, arg, _kind, dFile, _node);
|
|
|
}
|
|
|
|
|
|
//-------------------------------------------------------------------------------------------------------------
|
|
@@ -1091,7 +1107,7 @@ class CHThorIndexNormalizeActivity : public CHThorIndexReadActivityBase
|
|
|
{
|
|
|
|
|
|
public:
|
|
|
- CHThorIndexNormalizeActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexNormalizeArg &_arg, ThorActivityKind _kind, IDistributedFile * df);
|
|
|
+ CHThorIndexNormalizeActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexNormalizeArg &_arg, ThorActivityKind _kind, IDistributedFile * df, IPropertyTree *_node);
|
|
|
~CHThorIndexNormalizeActivity();
|
|
|
|
|
|
virtual void ready();
|
|
@@ -1114,7 +1130,7 @@ protected:
|
|
|
};
|
|
|
|
|
|
|
|
|
-CHThorIndexNormalizeActivity::CHThorIndexNormalizeActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexNormalizeArg &_arg, ThorActivityKind _kind, IDistributedFile * _df) : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _df), helper(_arg), outBuilder(NULL)
|
|
|
+CHThorIndexNormalizeActivity::CHThorIndexNormalizeActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexNormalizeArg &_arg, ThorActivityKind _kind, IDistributedFile * _df, IPropertyTree *_node) : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _df, _node), helper(_arg), outBuilder(NULL)
|
|
|
{
|
|
|
limitTransformExtra = &helper;
|
|
|
keyedLimit = (unsigned __int64)-1;
|
|
@@ -1248,7 +1264,7 @@ const void * CHThorIndexNormalizeActivity::createNextRow()
|
|
|
|
|
|
}
|
|
|
|
|
|
-extern HTHOR_API IHThorActivity *createIndexNormalizeActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexNormalizeArg &arg, ThorActivityKind _kind)
|
|
|
+extern HTHOR_API IHThorActivity *createIndexNormalizeActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexNormalizeArg &arg, ThorActivityKind _kind, IPropertyTree *_node)
|
|
|
{
|
|
|
// A logical filename for the key should refer to a single physical file - either the TLK or a monolithic key
|
|
|
OwnedRoxieString lfn(arg.getFileName());
|
|
@@ -1264,7 +1280,7 @@ extern HTHOR_API IHThorActivity *createIndexNormalizeActivity(IAgentContext &_ag
|
|
|
}
|
|
|
_agent.logFileAccess(dFile, "HThor", "READ");
|
|
|
enterSingletonSuperfiles(dFile);
|
|
|
- return new CHThorIndexNormalizeActivity(_agent, _activityId, _subgraphId, arg, _kind, dFile);
|
|
|
+ return new CHThorIndexNormalizeActivity(_agent, _activityId, _subgraphId, arg, _kind, dFile, _node);
|
|
|
}
|
|
|
|
|
|
//-------------------------------------------------------------------------------------------------------------
|
|
@@ -1274,7 +1290,7 @@ class CHThorIndexAggregateActivity : public CHThorIndexReadActivityBase
|
|
|
{
|
|
|
|
|
|
public:
|
|
|
- CHThorIndexAggregateActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexAggregateArg &_arg, ThorActivityKind _kind, IDistributedFile * df);
|
|
|
+ CHThorIndexAggregateActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexAggregateArg &_arg, ThorActivityKind _kind, IDistributedFile * df, IPropertyTree *_node);
|
|
|
~CHThorIndexAggregateActivity();
|
|
|
|
|
|
//interface IHThorInput
|
|
@@ -1294,8 +1310,8 @@ protected:
|
|
|
};
|
|
|
|
|
|
|
|
|
-CHThorIndexAggregateActivity::CHThorIndexAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexAggregateArg &_arg, ThorActivityKind _kind, IDistributedFile * _df)
|
|
|
- : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _df), helper(_arg), outBuilder(NULL)
|
|
|
+CHThorIndexAggregateActivity::CHThorIndexAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexAggregateArg &_arg, ThorActivityKind _kind, IDistributedFile * _df, IPropertyTree *_node)
|
|
|
+ : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _df, _node), helper(_arg), outBuilder(NULL)
|
|
|
{
|
|
|
}
|
|
|
|
|
@@ -1366,7 +1382,7 @@ const void *CHThorIndexAggregateActivity::nextRow()
|
|
|
}
|
|
|
|
|
|
|
|
|
-extern HTHOR_API IHThorActivity *createIndexAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexAggregateArg &arg, ThorActivityKind _kind)
|
|
|
+extern HTHOR_API IHThorActivity *createIndexAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexAggregateArg &arg, ThorActivityKind _kind, IPropertyTree *_node)
|
|
|
{
|
|
|
// A logical filename for the key should refer to a single physical file - either the TLK or a monolithic key
|
|
|
OwnedRoxieString lfn(arg.getFileName());
|
|
@@ -1382,7 +1398,7 @@ extern HTHOR_API IHThorActivity *createIndexAggregateActivity(IAgentContext &_ag
|
|
|
}
|
|
|
_agent.logFileAccess(dFile, "HThor", "READ");
|
|
|
enterSingletonSuperfiles(dFile);
|
|
|
- return new CHThorIndexAggregateActivity(_agent, _activityId, _subgraphId, arg, _kind, dFile);
|
|
|
+ return new CHThorIndexAggregateActivity(_agent, _activityId, _subgraphId, arg, _kind, dFile, _node);
|
|
|
}
|
|
|
|
|
|
//-------------------------------------------------------------------------------------------------------------
|
|
@@ -1391,7 +1407,7 @@ class CHThorIndexCountActivity : public CHThorIndexReadActivityBase
|
|
|
{
|
|
|
|
|
|
public:
|
|
|
- CHThorIndexCountActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexCountArg &_arg, ThorActivityKind _kind, IDistributedFile * df);
|
|
|
+ CHThorIndexCountActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexCountArg &_arg, ThorActivityKind _kind, IDistributedFile * df, IPropertyTree *_node);
|
|
|
|
|
|
//interface IHThorInput
|
|
|
virtual void ready();
|
|
@@ -1407,8 +1423,8 @@ protected:
|
|
|
};
|
|
|
|
|
|
|
|
|
-CHThorIndexCountActivity::CHThorIndexCountActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexCountArg &_arg, ThorActivityKind _kind, IDistributedFile * _df)
|
|
|
- : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _df), helper(_arg)
|
|
|
+CHThorIndexCountActivity::CHThorIndexCountActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexCountArg &_arg, ThorActivityKind _kind, IDistributedFile * _df, IPropertyTree *_node)
|
|
|
+ : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _df, _node), helper(_arg)
|
|
|
{
|
|
|
choosenLimit = (unsigned __int64)-1;
|
|
|
finished = false;
|
|
@@ -1472,7 +1488,7 @@ const void *CHThorIndexCountActivity::nextRow()
|
|
|
}
|
|
|
|
|
|
|
|
|
-extern HTHOR_API IHThorActivity *createIndexCountActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexCountArg &arg, ThorActivityKind _kind)
|
|
|
+extern HTHOR_API IHThorActivity *createIndexCountActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexCountArg &arg, ThorActivityKind _kind, IPropertyTree *_node)
|
|
|
{
|
|
|
// A logical filename for the key should refer to a single physical file - either the TLK or a monolithic key
|
|
|
OwnedRoxieString lfn(arg.getFileName());
|
|
@@ -1488,7 +1504,7 @@ extern HTHOR_API IHThorActivity *createIndexCountActivity(IAgentContext &_agent,
|
|
|
}
|
|
|
_agent.logFileAccess(dFile, "HThor", "READ");
|
|
|
enterSingletonSuperfiles(dFile);
|
|
|
- return new CHThorIndexCountActivity(_agent, _activityId, _subgraphId, arg, _kind, dFile);
|
|
|
+ return new CHThorIndexCountActivity(_agent, _activityId, _subgraphId, arg, _kind, dFile, _node);
|
|
|
}
|
|
|
|
|
|
//-------------------------------------------------------------------------------------------------------------
|
|
@@ -1497,7 +1513,7 @@ class CHThorIndexGroupAggregateActivity : public CHThorIndexReadActivityBase, im
|
|
|
{
|
|
|
|
|
|
public:
|
|
|
- CHThorIndexGroupAggregateActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexGroupAggregateArg &_arg, ThorActivityKind _kind, IDistributedFile * df);
|
|
|
+ CHThorIndexGroupAggregateActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexGroupAggregateArg &_arg, ThorActivityKind _kind, IDistributedFile * df, IPropertyTree *_node);
|
|
|
IMPLEMENT_IINTERFACE
|
|
|
|
|
|
//interface IHThorInput
|
|
@@ -1518,7 +1534,7 @@ protected:
|
|
|
};
|
|
|
|
|
|
|
|
|
-CHThorIndexGroupAggregateActivity::CHThorIndexGroupAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexGroupAggregateArg &_arg, ThorActivityKind _kind, IDistributedFile * _df) : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _df), helper(_arg), aggregated(_arg, _arg)
|
|
|
+CHThorIndexGroupAggregateActivity::CHThorIndexGroupAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexGroupAggregateArg &_arg, ThorActivityKind _kind, IDistributedFile * _df, IPropertyTree *_node) : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _df, _node), helper(_arg), aggregated(_arg, _arg)
|
|
|
{
|
|
|
eof = false;
|
|
|
gathered = false;
|
|
@@ -1584,7 +1600,7 @@ const void *CHThorIndexGroupAggregateActivity::nextRow()
|
|
|
}
|
|
|
|
|
|
|
|
|
-extern HTHOR_API IHThorActivity *createIndexGroupAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexGroupAggregateArg &arg, ThorActivityKind _kind)
|
|
|
+extern HTHOR_API IHThorActivity *createIndexGroupAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexGroupAggregateArg &arg, ThorActivityKind _kind, IPropertyTree *_node)
|
|
|
{
|
|
|
// A logical filename for the key should refer to a single physical file - either the TLK or a monolithic key
|
|
|
OwnedRoxieString lfn(arg.getFileName());
|
|
@@ -1600,7 +1616,7 @@ extern HTHOR_API IHThorActivity *createIndexGroupAggregateActivity(IAgentContext
|
|
|
}
|
|
|
_agent.logFileAccess(dFile, "HThor", "READ");
|
|
|
enterSingletonSuperfiles(dFile);
|
|
|
- return new CHThorIndexGroupAggregateActivity(_agent, _activityId, _subgraphId, arg, _kind, dFile);
|
|
|
+ return new CHThorIndexGroupAggregateActivity(_agent, _activityId, _subgraphId, arg, _kind, dFile, _node);
|
|
|
}
|
|
|
|
|
|
//-------------------------------------------------------------------------------------------------------------
|
|
@@ -2228,12 +2244,18 @@ protected:
|
|
|
class CHThorFetchActivityBase : public CHThorThreadedActivityBase, public IFetchHandlerFactory<SimpleFetchPartHandlerBase>
|
|
|
{
|
|
|
public:
|
|
|
- CHThorFetchActivityBase(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorArg &_arg, IHThorFetchContext &_fetch, ThorActivityKind _kind, IRecordSize *diskSize)
|
|
|
+ CHThorFetchActivityBase(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorArg &_arg, IHThorFetchContext &_fetch, ThorActivityKind _kind, IRecordSize *diskSize, IPropertyTree *_node)
|
|
|
: CHThorThreadedActivityBase (_agent, _activityId, _subgraphId, _arg, _fetch, _kind, diskSize)
|
|
|
{
|
|
|
pendingSeq = 0;
|
|
|
signalSeq = 0;
|
|
|
dequeuedSeq = 0;
|
|
|
+ if (_node)
|
|
|
+ {
|
|
|
+ const char *recordTranslationModeHintText = _node->queryProp("hint[@name='layoutTranslation']/@value");
|
|
|
+ if (recordTranslationModeHintText)
|
|
|
+ recordTranslationModeHint = getTranslationMode(recordTranslationModeHintText);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
~CHThorFetchActivityBase()
|
|
@@ -2361,13 +2383,22 @@ private:
|
|
|
Owned<DistributedFileFetchHandler<SimpleFetchPartHandlerBase, const void *, FetchRequest> > parts;
|
|
|
offset_t pendingSeq, signalSeq, dequeuedSeq;
|
|
|
QueueOf<const void *, true> pending;
|
|
|
+ RecordTranslationMode recordTranslationModeHint = RecordTranslationMode::Unspecified;
|
|
|
+
|
|
|
+protected:
|
|
|
+ RecordTranslationMode getLayoutTranslationMode()
|
|
|
+ {
|
|
|
+ if (recordTranslationModeHint != RecordTranslationMode::Unspecified)
|
|
|
+ return recordTranslationModeHint;
|
|
|
+ return agent.getLayoutTranslationMode();
|
|
|
+ }
|
|
|
};
|
|
|
|
|
|
class CHThorFlatFetchActivity : public CHThorFetchActivityBase, public IFlatFetchHandlerCallback
|
|
|
{
|
|
|
public:
|
|
|
- CHThorFlatFetchActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorFetchArg &_arg, IHThorFetchContext &_fetch, ThorActivityKind _kind, IRecordSize *diskSize, MemoryAttr &encryptionkey)
|
|
|
- : CHThorFetchActivityBase (_agent, _activityId, _subgraphId, _arg, _fetch, _kind, diskSize), helper(_arg)
|
|
|
+ CHThorFlatFetchActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorFetchArg &_arg, IHThorFetchContext &_fetch, ThorActivityKind _kind, IRecordSize *diskSize, IPropertyTree *_node, MemoryAttr &encryptionkey)
|
|
|
+ : CHThorFetchActivityBase (_agent, _activityId, _subgraphId, _arg, _fetch, _kind, diskSize, _node), helper(_arg)
|
|
|
{}
|
|
|
|
|
|
~CHThorFlatFetchActivity()
|
|
@@ -2442,7 +2473,7 @@ protected:
|
|
|
{
|
|
|
actualDiskMeta.set(helper.queryDiskRecordSize());
|
|
|
translator.clear();
|
|
|
- if (agent.getLayoutTranslationMode()==RecordTranslationMode::None)
|
|
|
+ if (getLayoutTranslationMode()==RecordTranslationMode::None)
|
|
|
{
|
|
|
::verifyFormatCrcSuper(helper.getDiskFormatCrc(), f, false, true);
|
|
|
}
|
|
@@ -2458,7 +2489,7 @@ protected:
|
|
|
translator.setown(createRecordTranslator(helper.queryProjectedDiskRecordSize()->queryRecordAccessor(true), actualDiskMeta->queryRecordAccessor(true)));
|
|
|
if (translator->canTranslate())
|
|
|
{
|
|
|
- if (agent.getLayoutTranslationMode()==RecordTranslationMode::None)
|
|
|
+ if (getLayoutTranslationMode()==RecordTranslationMode::None)
|
|
|
throw MakeStringException(0, "Translatable file layout mismatch reading file %s but translation disabled", f->queryLogicalName());
|
|
|
#ifdef _DEBUG
|
|
|
translator->describe();
|
|
@@ -2478,14 +2509,14 @@ protected:
|
|
|
IHThorFetchArg & helper;
|
|
|
};
|
|
|
|
|
|
-extern HTHOR_API IHThorActivity *createFetchActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorFetchArg &arg, ThorActivityKind _kind)
|
|
|
+extern HTHOR_API IHThorActivity *createFetchActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorFetchArg &arg, ThorActivityKind _kind, IPropertyTree *_node)
|
|
|
{
|
|
|
size32_t kl;
|
|
|
void *k;
|
|
|
arg.getFileEncryptKey(kl,k);
|
|
|
MemoryAttr encryptionkey;
|
|
|
encryptionkey.setOwn(kl,k);
|
|
|
- return new CHThorFlatFetchActivity(_agent, _activityId, _subgraphId, arg, arg, _kind, arg.queryDiskRecordSize(),encryptionkey);
|
|
|
+ return new CHThorFlatFetchActivity(_agent, _activityId, _subgraphId, arg, arg, _kind, arg.queryDiskRecordSize(), _node, encryptionkey);
|
|
|
}
|
|
|
|
|
|
//------------------------------------------------------------------------------------------
|
|
@@ -2493,8 +2524,8 @@ extern HTHOR_API IHThorActivity *createFetchActivity(IAgentContext &_agent, unsi
|
|
|
class CHThorCsvFetchActivity : public CHThorFetchActivityBase, public IFlatFetchHandlerCallback
|
|
|
{
|
|
|
public:
|
|
|
- CHThorCsvFetchActivity (IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorCsvFetchArg &_arg, ThorActivityKind _kind)
|
|
|
- : CHThorFetchActivityBase(_agent, _activityId, _subgraphId, _arg, _arg, _kind, NULL), helper(_arg)
|
|
|
+ CHThorCsvFetchActivity (IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorCsvFetchArg &_arg, ThorActivityKind _kind, IPropertyTree *_node)
|
|
|
+ : CHThorFetchActivityBase(_agent, _activityId, _subgraphId, _arg, _arg, _kind, NULL, _node), helper(_arg)
|
|
|
{
|
|
|
//MORE: I have no idea what should be passed for recordSize in the line above, either something that reads a fixed size, or
|
|
|
//reads a record based on the csv information
|
|
@@ -2583,9 +2614,9 @@ protected:
|
|
|
IHThorCsvFetchArg & helper;
|
|
|
};
|
|
|
|
|
|
-extern HTHOR_API IHThorActivity *createCsvFetchActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorCsvFetchArg &arg, ThorActivityKind _kind)
|
|
|
+extern HTHOR_API IHThorActivity *createCsvFetchActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorCsvFetchArg &arg, ThorActivityKind _kind, IPropertyTree *_node)
|
|
|
{
|
|
|
- return new CHThorCsvFetchActivity(_agent, _activityId, _subgraphId, arg, _kind);
|
|
|
+ return new CHThorCsvFetchActivity(_agent, _activityId, _subgraphId, arg, _kind, _node);
|
|
|
}
|
|
|
|
|
|
//------------------------------------------------------------------------------------------
|
|
@@ -2662,8 +2693,8 @@ protected:
|
|
|
class CHThorXmlFetchActivity : public CHThorFetchActivityBase, public IXmlFetchHandlerCallback
|
|
|
{
|
|
|
public:
|
|
|
- CHThorXmlFetchActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorXmlFetchArg & _arg, ThorActivityKind _kind)
|
|
|
- : CHThorFetchActivityBase(_agent, _activityId, _subgraphId, _arg, _arg, _kind, NULL), helper(_arg)
|
|
|
+ CHThorXmlFetchActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorXmlFetchArg & _arg, ThorActivityKind _kind, IPropertyTree *_node)
|
|
|
+ : CHThorFetchActivityBase(_agent, _activityId, _subgraphId, _arg, _arg, _kind, NULL, _node), helper(_arg)
|
|
|
{
|
|
|
}
|
|
|
|
|
@@ -2722,9 +2753,9 @@ protected:
|
|
|
IHThorXmlFetchArg & helper;
|
|
|
};
|
|
|
|
|
|
-extern HTHOR_API IHThorActivity *createXmlFetchActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorXmlFetchArg & arg, ThorActivityKind _kind)
|
|
|
+extern HTHOR_API IHThorActivity *createXmlFetchActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorXmlFetchArg & arg, ThorActivityKind _kind, IPropertyTree *_node)
|
|
|
{
|
|
|
- return new CHThorXmlFetchActivity(_agent, _activityId, _subgraphId, arg, _kind);
|
|
|
+ return new CHThorXmlFetchActivity(_agent, _activityId, _subgraphId, arg, _kind, _node);
|
|
|
}
|
|
|
|
|
|
//------------------------------------------------------------------------------------------
|
|
@@ -3395,8 +3426,9 @@ class CHThorKeyedJoinActivity : public CHThorThreadedActivityBase, implements I
|
|
|
IPointerArrayOf<IOutputMetaData> actualLayouts; // all the index layouts are saved in here to ensure their lifetime is adequate
|
|
|
Owned<IOutputMetaData> actualDiskMeta; // only one disk layout is permitted
|
|
|
Owned<const IDynamicTransform> translator;
|
|
|
+ RecordTranslationMode recordTranslationModeHint = RecordTranslationMode::Unspecified;
|
|
|
public:
|
|
|
- CHThorKeyedJoinActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorKeyedJoinArg &_arg, ThorActivityKind _kind)
|
|
|
+ CHThorKeyedJoinActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorKeyedJoinArg &_arg, ThorActivityKind _kind, IPropertyTree *_node)
|
|
|
: CHThorThreadedActivityBase(_agent, _activityId, _subgraphId, _arg, _arg, _kind, _arg.queryDiskRecordSize()), helper(_arg)
|
|
|
{
|
|
|
atomic_set(&prefiltered, 0);
|
|
@@ -3405,6 +3437,12 @@ public:
|
|
|
seeks = 0;
|
|
|
scans = 0;
|
|
|
eclKeySize.set(helper.queryIndexRecordSize());
|
|
|
+ if (_node)
|
|
|
+ {
|
|
|
+ const char *recordTranslationModeHintText = _node->queryProp("hint[@name='layoutTranslation']/@value");
|
|
|
+ if (recordTranslationModeHintText)
|
|
|
+ recordTranslationModeHint = getTranslationMode(recordTranslationModeHintText);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
~CHThorKeyedJoinActivity()
|
|
@@ -4055,14 +4093,21 @@ public:
|
|
|
}
|
|
|
|
|
|
protected:
|
|
|
+ RecordTranslationMode getLayoutTranslationMode()
|
|
|
+ {
|
|
|
+ if (recordTranslationModeHint != RecordTranslationMode::Unspecified)
|
|
|
+ return recordTranslationModeHint;
|
|
|
+ return agent.getLayoutTranslationMode();
|
|
|
+ }
|
|
|
+
|
|
|
virtual const IDynamicTransform * getLayoutTranslator(IDistributedFile * f) override
|
|
|
{
|
|
|
- if(agent.getLayoutTranslationMode() == RecordTranslationMode::AlwaysECL)
|
|
|
+ if(getLayoutTranslationMode() == RecordTranslationMode::AlwaysECL)
|
|
|
{
|
|
|
return NULL;
|
|
|
}
|
|
|
|
|
|
- if(agent.getLayoutTranslationMode() == RecordTranslationMode::None)
|
|
|
+ if(getLayoutTranslationMode() == RecordTranslationMode::None)
|
|
|
{
|
|
|
verifyFormatCrc(helper.getIndexFormatCrc(), f, super ? super->queryLogicalName() : NULL, true, true);
|
|
|
return NULL;
|
|
@@ -4110,7 +4155,7 @@ protected:
|
|
|
{
|
|
|
actualDiskMeta.set(helper.queryDiskRecordSize());
|
|
|
translator.clear();
|
|
|
- if (agent.getLayoutTranslationMode()==RecordTranslationMode::None)
|
|
|
+ if (getLayoutTranslationMode()==RecordTranslationMode::None)
|
|
|
{
|
|
|
::verifyFormatCrcSuper(helper.getDiskFormatCrc(), f, false, true);
|
|
|
}
|
|
@@ -4126,7 +4171,7 @@ protected:
|
|
|
translator.setown(createRecordTranslator(helper.queryProjectedDiskRecordSize()->queryRecordAccessor(true), actualDiskMeta->queryRecordAccessor(true)));
|
|
|
if (translator->canTranslate())
|
|
|
{
|
|
|
- if (agent.getLayoutTranslationMode()==RecordTranslationMode::None)
|
|
|
+ if (getLayoutTranslationMode()==RecordTranslationMode::None)
|
|
|
throw MakeStringException(0, "Translatable file layout mismatch reading file %s but translation disabled", f->queryLogicalName());
|
|
|
}
|
|
|
else
|
|
@@ -4149,7 +4194,7 @@ protected:
|
|
|
}
|
|
|
};
|
|
|
|
|
|
-extern HTHOR_API IHThorActivity *createKeyedJoinActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorKeyedJoinArg &arg, ThorActivityKind _kind)
|
|
|
+extern HTHOR_API IHThorActivity *createKeyedJoinActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorKeyedJoinArg &arg, ThorActivityKind _kind, IPropertyTree *_node)
|
|
|
{
|
|
|
- return new CHThorKeyedJoinActivity(_agent, _activityId, _subgraphId, arg, _kind);
|
|
|
+ return new CHThorKeyedJoinActivity(_agent, _activityId, _subgraphId, arg, _kind, _node);
|
|
|
}
|