thactivitymaster.cpp 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "jprop.hpp"
  15. #include "jstring.hpp"
  16. #include "eclhelper_dyn.hpp"
  17. #include "hqlexpr.hpp"
  18. #include "commonext.hpp"
  19. #include "thgraphmaster.ipp"
  20. #include "thormisc.hpp"
  21. #include "thactivitymaster.ipp"
  22. #include "thexception.hpp"
  23. actmaster_decl CGraphElementBase *createMasterContainer(IPropertyTree &xgmml, CGraphBase &owner, CGraphBase *resultsGraph);
  24. MODULE_INIT(INIT_PRIORITY_STANDARD)
  25. {
  26. registerCreateFunc(createMasterContainer);
  27. return true;
  28. }
  29. #include "action/thaction.ipp"
  30. #include "aggregate/thaggregate.ipp"
  31. #include "apply/thapply.ipp"
  32. #include "catch/thcatch.ipp"
  33. #include "choosesets/thchoosesets.ipp"
  34. #include "countproject/thcountproject.ipp"
  35. #include "csvread/thcsvread.ipp"
  36. #include "diskread/thdiskread.ipp"
  37. #include "diskwrite/thdiskwrite.ipp"
  38. #include "distribution/thdistribution.ipp"
  39. #include "enth/thenth.ipp"
  40. #include "external/thexternal.ipp"
  41. #include "filter/thfilter.ipp"
  42. #include "firstn/thfirstn.ipp"
  43. #include "funnel/thfunnel.ipp"
  44. #include "hashdistrib/thhashdistrib.ipp"
  45. #include "indexread/thindexread.ipp"
  46. #include "indexwrite/thindexwrite.ipp"
  47. #include "iterate/thiterate.ipp"
  48. #include "join/thjoin.ipp"
  49. #include "keydiff/thkeydiff.ipp"
  50. #include "keyedjoin/thkeyedjoin.ipp"
  51. #include "keypatch/thkeypatch.ipp"
  52. #include "limit/thlimit.ipp"
  53. #include "lookupjoin/thlookupjoin.ipp"
  54. #include "msort/thmsort.ipp"
  55. #include "nullaction/thnullaction.ipp"
  56. #include "pipewrite/thpipewrite.ipp"
  57. #include "result/thresult.ipp"
  58. #include "rollup/throllup.ipp"
  59. #include "selectnth/thselectnth.ipp"
  60. #include "spill/thspill.ipp"
  61. #include "topn/thtopn.ipp"
  62. #include "when/thwhen.ipp"
  63. #include "wuidread/thwuidread.ipp"
  64. #include "wuidwrite/thwuidwrite.ipp"
  65. #include "xmlread/thxmlread.ipp"
  66. #include "xmlwrite/thxmlwrite.ipp"
  67. #include "merge/thmerge.ipp"
  68. #include "fetch/thfetch.ipp"
  69. #include "loop/thloop.ipp"
  70. CActivityBase *createGroupActivityMaster(CMasterGraphElement *container);
  71. class CGenericMasterGraphElement : public CMasterGraphElement
  72. {
  73. public:
  74. CGenericMasterGraphElement(CGraphBase &owner, IPropertyTree &xgmml) : CMasterGraphElement(owner, xgmml)
  75. {
  76. }
  77. virtual void serializeCreateContext(MemoryBuffer &mb)
  78. {
  79. // bit of hack, need to tell slave if wuidread converted to diskread (see master activity)
  80. CMasterGraphElement::serializeCreateContext(mb);
  81. if (kind == TAKworkunitread)
  82. {
  83. if (!activity)
  84. doCreateActivity();
  85. IHThorArg *helper = activity->queryHelper();
  86. IHThorDiskReadArg *diskHelper = QUERYINTERFACE(helper, IHThorDiskReadArg);
  87. mb.append(NULL != diskHelper); // flag to slaves that they should create diskread
  88. if (diskHelper)
  89. {
  90. OwnedRoxieString fileName(diskHelper->getFileName());
  91. mb.append(fileName);
  92. }
  93. }
  94. }
  95. virtual CActivityBase *factory(ThorActivityKind kind)
  96. {
  97. CActivityBase *ret = NULL;
  98. switch (kind)
  99. {
  100. case TAKfiltergroup:
  101. case TAKlocalresultread:
  102. case TAKif:
  103. case TAKchildif:
  104. case TAKcase:
  105. case TAKchildcase:
  106. case TAKdegroup:
  107. case TAKsplit:
  108. case TAKproject:
  109. case TAKprefetchproject:
  110. case TAKprefetchcountproject:
  111. case TAKxmlparse:
  112. case TAKchilditerator:
  113. case TAKlinkedrawiterator:
  114. case TAKcatch:
  115. case TAKsample:
  116. case TAKnormalize:
  117. case TAKnormalizechild:
  118. case TAKnormalizelinkedchild:
  119. case TAKinlinetable:
  120. case TAKpull:
  121. case TAKnull:
  122. case TAKpiperead:
  123. case TAKpipethrough:
  124. case TAKparse:
  125. case TAKchildaggregate:
  126. case TAKchildgroupaggregate:
  127. case TAKchildthroughnormalize:
  128. case TAKchildnormalize:
  129. case TAKapply:
  130. case TAKfunnel:
  131. case TAKcombine:
  132. case TAKregroup:
  133. case TAKsorted:
  134. case TAKnwayinput:
  135. case TAKnwayselect:
  136. case TAKnwaymerge:
  137. case TAKnwaymergejoin:
  138. case TAKnwayjoin:
  139. case TAKgraphloopresultread:
  140. case TAKstreamediterator:
  141. case TAKsoap_rowdataset:
  142. case TAKsoap_rowaction:
  143. case TAKsoap_datasetdataset:
  144. case TAKsoap_datasetaction:
  145. case TAKhttp_rowdataset:
  146. case TAKdistributed:
  147. case TAKtrace:
  148. case TAKemptyaction:
  149. ret = new CMasterActivity(this);
  150. break;
  151. case TAKskipcatch:
  152. case TAKcreaterowcatch:
  153. ret = createSkipCatchActivityMaster(this);
  154. break;
  155. case TAKdiskread:
  156. case TAKdisknormalize:
  157. case TAKspillread:
  158. ret = createDiskReadActivityMaster(this);
  159. break;
  160. case TAKdiskaggregate:
  161. ret = createDiskAggregateActivityMaster(this);
  162. break;
  163. case TAKdiskcount:
  164. ret = createDiskCountActivityMaster(this);
  165. break;
  166. case TAKdiskgroupaggregate:
  167. ret = createDiskGroupAggregateActivityMaster(this);
  168. break;
  169. case TAKindexread:
  170. ret = createIndexReadActivityMaster(this);
  171. break;
  172. case TAKindexcount:
  173. ret = createIndexCountActivityMaster(this);
  174. break;
  175. case TAKindexnormalize:
  176. ret = createIndexNormalizeActivityMaster(this);
  177. break;
  178. case TAKindexaggregate:
  179. ret = createIndexAggregateActivityMaster(this);
  180. break;
  181. case TAKindexgroupaggregate:
  182. case TAKindexgroupexists:
  183. case TAKindexgroupcount:
  184. ret = createIndexGroupAggregateActivityMaster(this);
  185. break;
  186. case TAKdiskwrite:
  187. case TAKspillwrite:
  188. ret = createDiskWriteActivityMaster(this);
  189. break;
  190. case TAKcsvwrite:
  191. ret = createCsvWriteActivityMaster(this);
  192. break;
  193. case TAKspill:
  194. ret = createSpillActivityMaster(this);
  195. break;
  196. case TAKdedup:
  197. case TAKrollup:
  198. case TAKrollupgroup:
  199. ret = createDedupRollupActivityMaster(this);
  200. break;
  201. case TAKfilter:
  202. case TAKfilterproject:
  203. ret = createFilterActivityMaster(this);
  204. break;
  205. case TAKnonempty:
  206. ret = createNonEmptyActivityMaster(this);
  207. break;
  208. case TAKsort:
  209. ret = createSortActivityMaster(this);
  210. break;
  211. case TAKgroup:
  212. ret = createGroupActivityMaster(this);
  213. break;
  214. case TAKprocess:
  215. case TAKiterate:
  216. ret = createIterateActivityMaster(this);
  217. break;
  218. case TAKthroughaggregate:
  219. ret = createThroughAggregateActivityMaster(this);
  220. break;
  221. case TAKaggregate:
  222. case TAKexistsaggregate:
  223. case TAKcountaggregate:
  224. ret = createAggregateActivityMaster(this);
  225. break;
  226. case TAKnwaydistribute:
  227. ret = createNWayDistributeActivityMaster(this);
  228. break;
  229. case TAKhashdistribute:
  230. case TAKpartition:
  231. ret = createHashDistributeActivityMaster(this);
  232. break;
  233. case TAKhashaggregate:
  234. ret = createHashAggregateActivityMaster(this);
  235. break;
  236. case TAKhashjoin:
  237. case TAKhashdenormalize:
  238. case TAKhashdenormalizegroup:
  239. ret= createHashJoinActivityMaster(this);
  240. break;
  241. case TAKkeyeddistribute:
  242. ret = createKeyedDistributeActivityMaster(this);
  243. break;
  244. case TAKhashdistributemerge:
  245. ret = createDistributeMergeActivityMaster(this);
  246. break;
  247. case TAKhashdedup:
  248. ret = createHashDedupMergeActivityMaster(this);
  249. break;
  250. case TAKfirstn:
  251. ret = createFirstNActivityMaster(this);
  252. break;
  253. case TAKjoin:
  254. case TAKselfjoin:
  255. case TAKselfjoinlight:
  256. case TAKdenormalize:
  257. case TAKdenormalizegroup:
  258. ret = createJoinActivityMaster(this);
  259. break;
  260. case TAKlookupjoin:
  261. case TAKalljoin:
  262. case TAKlookupdenormalize:
  263. case TAKlookupdenormalizegroup:
  264. case TAKsmartjoin:
  265. case TAKsmartdenormalize:
  266. case TAKsmartdenormalizegroup:
  267. case TAKalldenormalize:
  268. case TAKalldenormalizegroup:
  269. ret = createLookupJoinActivityMaster(this);
  270. break;
  271. case TAKkeyedjoin:
  272. case TAKkeyeddenormalize:
  273. case TAKkeyeddenormalizegroup:
  274. ret = createKeyedJoinActivityMaster(this);
  275. break;
  276. case TAKworkunitwrite:
  277. ret = createWorkUnitWriteActivityMaster(this);
  278. break;
  279. case TAKdictionaryworkunitwrite:
  280. ret = createDictionaryWorkunitWriteMaster(this);
  281. break;
  282. case TAKdictionaryresultwrite:
  283. if (!queryOwner().queryOwner() || queryOwner().isGlobal()) // don't need dictionary in master if in local child query
  284. ret = createDictionaryResultActivityMaster(this);
  285. else
  286. ret = new CMasterActivity(this);
  287. break;
  288. break;
  289. case TAKremoteresult:
  290. ret = createResultActivityMaster(this);
  291. break;
  292. case TAKselectn:
  293. ret = createSelectNthActivityMaster(this);
  294. break;
  295. case TAKenth:
  296. ret = createEnthActivityMaster(this);
  297. break;
  298. case TAKdistribution:
  299. ret = createDistributionActivityMaster(this);
  300. break;
  301. case TAKcountproject:
  302. ret = createCountProjectActivityMaster(this);
  303. break;
  304. case TAKchoosesets:
  305. ret = createChooseSetsActivityMaster(this);
  306. break;
  307. case TAKchoosesetsenth:
  308. case TAKchoosesetslast:
  309. ret = createChooseSetsPlusActivityMaster(this);
  310. break;
  311. case TAKpipewrite:
  312. ret = createPipeWriteActivityMaster(this);
  313. break;
  314. case TAKcsvread:
  315. ret = createCCsvReadActivityMaster(this);
  316. break;
  317. case TAKindexwrite:
  318. ret = createIndexWriteActivityMaster(this);
  319. break;
  320. case TAKfetch:
  321. ret = createFetchActivityMaster(this);
  322. break;
  323. case TAKcsvfetch:
  324. ret = createCsvFetchActivityMaster(this);
  325. break;
  326. case TAKxmlfetch:
  327. case TAKjsonfetch:
  328. ret = createXmlFetchActivityMaster(this);
  329. break;
  330. case TAKworkunitread:
  331. ret = createWorkUnitActivityMaster(this);
  332. break;
  333. case TAKsideeffect:
  334. ret = createNullActionActivityMaster(this);
  335. break;
  336. case TAKsimpleaction:
  337. ret = createActionActivityMaster(this);
  338. break;
  339. case TAKtopn:
  340. ret = createTopNActivityMaster(this);
  341. break;
  342. case TAKxmlread:
  343. case TAKjsonread:
  344. ret = createXmlReadActivityMaster(this);
  345. break;
  346. case TAKxmlwrite:
  347. case TAKjsonwrite:
  348. ret = createXmlWriteActivityMaster(this, kind);
  349. break;
  350. case TAKmerge:
  351. ret = createMergeActivityMaster(this);
  352. break;
  353. case TAKkeydiff:
  354. ret = createKeyDiffActivityMaster(this);
  355. break;
  356. case TAKkeypatch:
  357. ret = createKeyPatchActivityMaster(this);
  358. break;
  359. case TAKlimit:
  360. case TAKskiplimit:
  361. case TAKcreaterowlimit:
  362. ret = createLimitActivityMaster(this);
  363. break;
  364. case TAKlooprow:
  365. case TAKloopcount:
  366. case TAKloopdataset:
  367. ret = createLoopActivityMaster(this);
  368. break;
  369. case TAKgraphloop:
  370. case TAKparallelgraphloop:
  371. ret = createGraphLoopActivityMaster(this);
  372. break;
  373. case TAKlocalresultspill:
  374. case TAKlocalresultwrite:
  375. /* NB: create even if non-global child graph, because although the result itself
  376. * won't be used, codegen. graph initialization code, may reference the result on the master
  377. */
  378. ret = createLocalResultActivityMaster(this);
  379. break;
  380. case TAKgraphloopresultwrite:
  381. ret = createGraphLoopResultActivityMaster(this);
  382. break;
  383. case TAKchilddataset:
  384. UNIMPLEMENTED;
  385. case TAKifaction:
  386. throwUnexpected();
  387. case TAKwhen_dataset:
  388. ret = createWhenActivityMaster(this);
  389. break;
  390. case TAKexternalprocess:
  391. case TAKexternalsink:
  392. case TAKexternalsource:
  393. ret = createExternalActivityMaster(this);
  394. break;
  395. default:
  396. throw MakeActivityException(this, TE_UnsupportedActivityKind, "Unsupported activity kind: %s", activityKindStr(kind));
  397. }
  398. return ret;
  399. }
  400. };
  401. actmaster_decl CGraphElementBase *createMasterContainer(IPropertyTree &xgmml, CGraphBase &owner, CGraphBase *resultsGraph)
  402. {
  403. return new CGenericMasterGraphElement(owner, xgmml);
  404. }
  405. void updateActivityResult(IConstWorkUnit &workunit, unsigned helperFlags, unsigned sequence, const char *logicalFilename, unsigned __int64 recordCount)
  406. {
  407. Owned<IWorkUnit> wu = &workunit.lock();
  408. Owned<IWUResult> r;
  409. r.setown(updateWorkUnitResult(wu, logicalFilename, sequence));
  410. r->setResultTotalRowCount(recordCount);
  411. r->setResultStatus(ResultStatusCalculated);
  412. if (TDWresult & helperFlags)
  413. r->setResultFilename(logicalFilename);
  414. else
  415. r->setResultLogicalName(logicalFilename);
  416. r.clear();
  417. wu.clear();
  418. }
  419. void CSlavePartMapping::getParts(unsigned i, IArrayOf<IPartDescriptor> &parts)
  420. {
  421. if (local)
  422. i = 0;
  423. if (i>=maps.ordinality()) return;
  424. CSlaveMap &map = maps.item(i);
  425. ForEachItemIn(m, map)
  426. parts.append(*LINK(&map.item(m)));
  427. }
  428. void CSlavePartMapping::serializeNullMap(MemoryBuffer &mb)
  429. {
  430. mb.append((unsigned)0);
  431. }
  432. void CSlavePartMapping::serializeNullOffsetMap(MemoryBuffer &mb)
  433. {
  434. mb.append((unsigned)0);
  435. }
  436. void CSlavePartMapping::serializeMap(unsigned i, MemoryBuffer &mb, bool countPrefix)
  437. {
  438. if (local)
  439. i = 0;
  440. if (i >= maps.ordinality() || (0 == maps.item(i).ordinality()))
  441. {
  442. if (countPrefix)
  443. mb.append((unsigned)0);
  444. return;
  445. }
  446. CSlaveMap &map = maps.item(i);
  447. unsigned nPos = mb.length();
  448. unsigned n=0;
  449. if (countPrefix)
  450. mb.append(n);
  451. UnsignedArray parts;
  452. ForEachItemIn(m, map)
  453. parts.append(map.item(m).queryPartIndex());
  454. if (countPrefix)
  455. {
  456. n = parts.ordinality();
  457. mb.writeDirect(nPos, sizeof(n), &n);
  458. }
  459. fileDesc->serializeParts(mb, parts);
  460. }
  461. CSlavePartMapping::CSlavePartMapping(const char *_logicalName, IFileDescriptor &_fileDesc, IUserDescriptor *_userDesc, IGroup &localGroup, bool _local, bool index, IHash *hash, IDistributedSuperFile *super)
  462. : fileDesc(&_fileDesc), userDesc(_userDesc), local(_local)
  463. {
  464. unsigned maxWidth = local ? 1 : localGroup.ordinality();
  465. logicalName.set(_logicalName);
  466. fileWidth = fileDesc->numParts();
  467. if (super && fileWidth)
  468. {
  469. bool merge = index;
  470. unsigned _maxWidth = super->querySubFile(0,true).numParts();
  471. if (_maxWidth > 1)
  472. {
  473. if (index)
  474. {
  475. fileWidth -= super->numSubFiles(true); // tlk's
  476. if (merge)
  477. _maxWidth -= 1; // tlk
  478. }
  479. if (merge && _maxWidth < maxWidth)
  480. maxWidth = _maxWidth;
  481. }
  482. }
  483. else if (index && fileWidth>1)
  484. fileWidth -= 1;
  485. unsigned p;
  486. unsigned which = 0;
  487. if (fileWidth<=maxWidth || NULL!=hash)
  488. {
  489. if (fileWidth>maxWidth && 0 != fileWidth % maxWidth)
  490. throw MakeThorException(0, "Unimplemented - attempting to read distributed file (%s), on smaller cluster that is not a factor of original", logicalName.get());
  491. for (p=0; p<fileWidth; p++)
  492. {
  493. Owned<IPartDescriptor> partDesc = fileDesc->getPart(p);
  494. CSlaveMap *map;
  495. if (maps.isItem(which))
  496. map = &maps.item(which);
  497. else
  498. {
  499. map = new CSlaveMap();
  500. maps.append(*map);
  501. }
  502. map->append(*LINK(partDesc));
  503. partToNode.append(which);
  504. which++;
  505. if (which>=maxWidth) which = 0;
  506. }
  507. }
  508. else
  509. {
  510. unsigned tally = 0;
  511. for (p=0; p<fileWidth; p++)
  512. {
  513. Owned<IPartDescriptor> partDesc = fileDesc->getPart(p);
  514. CSlaveMap *map;
  515. if (maps.isItem(which))
  516. map = &maps.item(which);
  517. else
  518. {
  519. map = new CSlaveMap();
  520. maps.append(*map);
  521. }
  522. map->append(*LINK(partDesc));
  523. partToNode.append(which);
  524. tally += maxWidth;
  525. if (tally >= fileWidth)
  526. {
  527. tally -= fileWidth;
  528. which++;
  529. }
  530. }
  531. }
  532. }
  533. #include "../activities/fetch/thfetchcommon.hpp"
  534. void CSlavePartMapping::serializeFileOffsetMap(MemoryBuffer &mb)
  535. {
  536. mb.append(fileWidth);
  537. DelayedSizeMarker sizeMark(mb);
  538. ForEachItemIn(sm, maps)
  539. {
  540. CSlaveMap &map = maps.item(sm);
  541. ForEachItemIn(m, map)
  542. {
  543. IPartDescriptor &partDesc = map.item(m);
  544. IPropertyTree &props = partDesc.queryProperties();
  545. FPosTableEntry entry;
  546. entry.base = props.getPropInt64("@offset"); // should check
  547. entry.top = entry.base+props.getPropInt64("@size"); // was -1?
  548. entry.index = sm;
  549. mb.append(sizeof(FPosTableEntry), &entry);
  550. }
  551. }
  552. sizeMark.write();
  553. }
  554. CSlavePartMapping *getFileSlaveMaps(const char *logicalName, IFileDescriptor &fileDesc, IUserDescriptor *userDesc, IGroup &localGroup, bool local, bool index, IHash *hash, IDistributedSuperFile *super)
  555. {
  556. return new CSlavePartMapping(logicalName, fileDesc, userDesc, localGroup, local, index, hash, super);
  557. }
  558. WUFileKind getDiskOutputKind(unsigned flags)
  559. {
  560. if (TDXtemporary & flags)
  561. return WUFileTemporary;
  562. else if(TDXjobtemp & flags)
  563. return WUFileJobOwned;
  564. else if(TDWowned & flags)
  565. return WUFileOwned;
  566. else
  567. return WUFileStandard;
  568. }
  569. void checkSuperFileOwnership(IDistributedFile &file)
  570. {
  571. if (file.queryAttributes().hasProp("SuperOwner"))
  572. {
  573. StringBuffer owners;
  574. Owned<IPropertyTreeIterator> iter = file.queryAttributes().getElements("SuperOwner");
  575. if (iter->first())
  576. {
  577. for (;;)
  578. {
  579. iter->query().getProp(NULL, owners);
  580. if (!iter->next())
  581. break;
  582. owners.append(", ");
  583. }
  584. }
  585. throw MakeStringException(TE_MemberOfSuperFile, "Cannot write %s, as owned by superfile(s): %s", file.queryLogicalName(), owners.str());
  586. }
  587. }
  588. void checkFormatCrc(CActivityBase *activity, IDistributedFile *file, unsigned expectedFormatCrc, IOutputMetaData *expected,
  589. unsigned projectedFormatCrc, IOutputMetaData *projected, bool index)
  590. {
  591. IDistributedFile *f = file;
  592. IDistributedSuperFile *super = f->querySuperFile();
  593. Owned<IDistributedFileIterator> iter;
  594. if (super)
  595. {
  596. iter.setown(super->getSubFileIterator(true));
  597. verifyex(iter->first());
  598. f = &iter->query();
  599. }
  600. unsigned prevFormatCrc = 0;
  601. StringBuffer kindStr(activityKindStr(activity->queryContainer().getKind()));
  602. for (;;)
  603. {
  604. unsigned dfsCrc = 0;
  605. if (f->getFormatCrc(dfsCrc)) // can't validate if missing
  606. {
  607. if (prevFormatCrc && (prevFormatCrc != dfsCrc)) // NB: all subfiles must have same dfsCrc and will use same translators for now (see HPCC-21834)
  608. {
  609. StringBuffer fileStr;
  610. if (super)
  611. fileStr.append("Superfile: ").append(file->queryLogicalName()).append(", subfile: ");
  612. else fileStr.append("File: ");
  613. fileStr.append(f->queryLogicalName());
  614. Owned<IThorException> e = MakeActivityException(activity, TE_FormatCrcMismatch, "%s: Layout does not match published layout. %s", kindStr.str(), fileStr.str());
  615. if (index && !f->queryAttributes().hasProp("_record_layout")) // Cannot verify if _true_ crc mismatch if soft layout missing anymore
  616. LOG(MCwarning, thorJob, e);
  617. else
  618. {
  619. if (activity->queryContainer().queryJob().getWorkUnitValueBool("skipFileFormatCrcCheck", false))
  620. {
  621. // propagate as warning to workunit
  622. e->setAction(tea_warning);
  623. e->setSeverity(SeverityWarning);
  624. activity->fireException(e);
  625. }
  626. else
  627. throw e.getClear();
  628. }
  629. }
  630. }
  631. prevFormatCrc = dfsCrc;
  632. if (!super||!iter->next())
  633. break;
  634. f = &iter->query();
  635. }
  636. }
  // Intentionally empty: the body performs no work.
  void loadMasters()
  {
  }