|
@@ -8160,6 +8160,46 @@ void CHThorDiskReadBaseActivity::stop()
|
|
|
CHThorActivityBase::stop();
|
|
|
}
|
|
|
|
|
|
+#define TE_FileTypeMismatch 10138 // NB: duplicated from thorlcr/shared/thexception.hpp, but be moved to common header
|
|
|
+void CHThorDiskReadBaseActivity::checkFileType(IDistributedFile *file)
|
|
|
+{
|
|
|
+ if (rt_csv == readType)
|
|
|
+ return; // CSV read is permitted to read any type
|
|
|
+ if (!agent.queryWorkUnit()->getDebugValueInt(OPT_VALIDATE_FILE_TYPE, true))
|
|
|
+ return;
|
|
|
+ bool warningOnly = false;
|
|
|
+ const char *expectedType = nullptr;
|
|
|
+ switch (readType)
|
|
|
+ {
|
|
|
+ case rt_binary:
|
|
|
+ if (fixedDiskRecordSize) // we allow fixed width reads of other formats
|
|
|
+ return;
|
|
|
+ expectedType = "flat";
|
|
|
+ break;
|
|
|
+ case rt_xml:
|
|
|
+ expectedType = "xml";
|
|
|
+ warningOnly = true;
|
|
|
+ break;
|
|
|
+ case rt_json:
|
|
|
+ expectedType = "json";
|
|
|
+ warningOnly = true;
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ throwUnexpected();
|
|
|
+ }
|
|
|
+ const char *kind = queryFileKind(file);
|
|
|
+ if (isEmptyString(kind)) // file has no published kind, can't validate
|
|
|
+ return;
|
|
|
+ if (!strieq(kind, expectedType))
|
|
|
+ {
|
|
|
+ Owned<IException> e = makeStringExceptionV(TE_FileTypeMismatch, "File format mismatch reading file: '%s'. Expected type '%s', but file is type '%s'", file->queryLogicalName(), expectedType, kind);
|
|
|
+ if (!warningOnly)
|
|
|
+ throw e.getClear();
|
|
|
+ StringBuffer tmp;
|
|
|
+ agent.addWuException(e->errorMessage(tmp), e->errorCode(), SeverityWarning, "eclagent");
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
void CHThorDiskReadBaseActivity::resolve()
|
|
|
{
|
|
|
OwnedRoxieString fileName(helper.getFileName());
|
|
@@ -8190,6 +8230,8 @@ void CHThorDiskReadBaseActivity::resolve()
|
|
|
IDistributedFile *dFile = ldFile->queryDistributedFile();
|
|
|
if (dFile) //only makes sense for distributed (non local) files
|
|
|
{
|
|
|
+ checkFileType(dFile); // throws an exception if file types mismatch
|
|
|
+
|
|
|
persistent = dFile->queryAttributes().getPropBool("@persistent");
|
|
|
dfsParts.setown(dFile->getIterator());
|
|
|
if (helper.getFlags() & TDRfilenamecallback)
|
|
@@ -9352,7 +9394,7 @@ void CHThorCsvReadActivity::checkOpenNext()
|
|
|
|
|
|
CHThorXmlReadActivity::CHThorXmlReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorXmlReadArg &_arg, ThorActivityKind _kind, IPropertyTree *_node) : CHThorDiskReadBaseActivity(_agent, _activityId, _subgraphId, _arg, _kind, _node), helper(_arg)
|
|
|
{
|
|
|
- readType = rt_xml;
|
|
|
+ readType = (kind==TAKjsonread) ? rt_json : rt_xml;
|
|
|
}
|
|
|
|
|
|
void CHThorXmlReadActivity::ready()
|