hthorkey.cpp 138 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "hthor.ipp"
  14. #include "rtlkey.hpp"
  15. #include "jhtree.hpp"
  16. #include "eclhelper.hpp"
  17. #include "jthread.hpp"
  18. #include "jqueue.tpp"
  19. #include "dasess.hpp"
  20. #include "thorxmlwrite.hpp"
  21. #include "thorstep.ipp"
  22. #include "roxiedebug.hpp"
  23. #include "thorcommon.hpp"
  24. #include "rtldynfield.hpp"
  25. #define MAX_FETCH_LOOKAHEAD 1000
  26. #define MAX_FILE_READ_FAIL_COUNT 3
  27. using roxiemem::IRowManager;
  28. using roxiemem::OwnedRoxieRow;
  29. using roxiemem::OwnedConstRoxieRow;
  30. using roxiemem::OwnedRoxieString;
  31. static IKeyIndex *openKeyFile(IDistributedFilePart & keyFile)
  32. {
  33. unsigned failcount = 0;
  34. unsigned numCopies = keyFile.numCopies();
  35. assertex(numCopies);
  36. Owned<IException> exc;
  37. for (unsigned copy=0; copy < numCopies && failcount < MAX_FILE_READ_FAIL_COUNT; copy++)
  38. {
  39. RemoteFilename rfn;
  40. try
  41. {
  42. OwnedIFile ifile = createIFile(keyFile.getFilename(rfn,copy));
  43. offset_t thissize = ifile->size();
  44. if (thissize != (offset_t)-1)
  45. {
  46. StringBuffer remotePath;
  47. rfn.getPath(remotePath);
  48. unsigned crc = 0;
  49. keyFile.getCrc(crc);
  50. return createKeyIndex(remotePath.str(), crc, false);
  51. }
  52. }
  53. catch (IException *E)
  54. {
  55. EXCLOG(E, "While opening index file");
  56. if (exc)
  57. E->Release();
  58. else
  59. exc.setown(E);
  60. failcount++;
  61. }
  62. }
  63. if (exc)
  64. throw exc.getClear();
  65. StringBuffer url;
  66. RemoteFilename rfn;
  67. keyFile.getFilename(rfn).getRemotePath(url);
  68. throw MakeStringException(1001, "Could not open key file at %s%s", url.str(), (numCopies > 1) ? " or any alternate location." : ".");
  69. }
  70. class TransformCallback : public CInterface, implements IThorIndexCallback
  71. {
  72. public:
  73. TransformCallback() { keyManager = NULL; };
  74. IMPLEMENT_IINTERFACE_O
  75. //IThorIndexCallback
  76. virtual const byte * lookupBlob(unsigned __int64 id) override
  77. {
  78. size32_t dummy;
  79. return (byte *) keyManager->loadBlob(id, dummy);
  80. }
  81. public:
  82. void setManager(IKeyManager * _manager)
  83. {
  84. finishedRow();
  85. keyManager = _manager;
  86. }
  87. void finishedRow()
  88. {
  89. if (keyManager)
  90. keyManager->releaseBlobs();
  91. }
  92. protected:
  93. IKeyManager * keyManager;
  94. };
  95. //-------------------------------------------------------------------------------------------------------------
  96. static ILocalOrDistributedFile *resolveLFNIndex(IAgentContext &agent, const char *logicalName, const char *errorTxt, bool optional, bool noteRead, AccessMode accessMode, bool isPrivilegedUser)
  97. {
  98. Owned<ILocalOrDistributedFile> ldFile = agent.resolveLFN(logicalName, errorTxt, optional, noteRead, accessMode, nullptr, isPrivilegedUser);
  99. if (!ldFile)
  100. return nullptr;
  101. IDistributedFile *dFile = ldFile->queryDistributedFile();
  102. if (dFile && !isFileKey(dFile))
  103. throw MakeStringException(0, "Attempting to read flat file as an index: %s", logicalName);
  104. return ldFile.getClear();
  105. }
  106. void enterSingletonSuperfiles(Shared<IDistributedFile> & file)
  107. {
  108. IDistributedSuperFile * super = file->querySuperFile();
  109. while(super && (super->numSubFiles() == 1))
  110. {
  111. file.setown(super->getSubFile(0));
  112. super = file->querySuperFile();
  113. }
  114. }
  115. //-------------------------------------------------------------------------------------------------------------
  116. class CHThorNullAggregateActivity : public CHThorNullActivity
  117. {
  118. public:
  119. CHThorNullAggregateActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorArg & _arg, IHThorCompoundAggregateExtra &_extra, ThorActivityKind _kind, EclGraph & _graph) : CHThorNullActivity(agent, _activityId, _subgraphId, _arg, _kind, _graph), helper(_extra) {}
  120. //interface IHThorInput
  121. virtual void ready();
  122. virtual const void *nextRow();
  123. virtual bool needsAllocator() const { return true; }
  124. protected:
  125. IHThorCompoundAggregateExtra &helper;
  126. bool finished;
  127. };
  128. void CHThorNullAggregateActivity::ready()
  129. {
  130. CHThorNullActivity::ready();
  131. finished = false;
  132. }
  133. const void *CHThorNullAggregateActivity::nextRow()
  134. {
  135. if (finished) return NULL;
  136. processed++;
  137. finished = true;
  138. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  139. try
  140. {
  141. size32_t newSize = helper.clearAggregate(rowBuilder);
  142. return rowBuilder.finalizeRowClear(newSize);
  143. }
  144. catch(IException * e)
  145. {
  146. throw makeWrappedException(e);
  147. }
  148. }
  149. //-------------------------------------------------------------------------------------------------------------
  150. class CHThorNullCountActivity : public CHThorNullActivity
  151. {
  152. public:
  153. CHThorNullCountActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorArg & _arg, ThorActivityKind _kind, EclGraph & _graph)
  154. : CHThorNullActivity(agent, _activityId, _subgraphId, _arg, _kind, _graph), finished(false) {}
  155. //interface IHThorInput
  156. virtual void ready();
  157. virtual const void *nextRow();
  158. virtual bool needsAllocator() const { return true; }
  159. protected:
  160. bool finished;
  161. };
  162. void CHThorNullCountActivity::ready()
  163. {
  164. CHThorNullActivity::ready();
  165. finished = false;
  166. }
  167. const void *CHThorNullCountActivity::nextRow()
  168. {
  169. if (finished) return NULL;
  170. processed++;
  171. finished = true;
  172. size32_t outSize = outputMeta.getFixedSize();
  173. void * ret = rowAllocator->createRow(); //meta: outputMeta
  174. if (outSize == 1)
  175. *(byte *)ret = 0;
  176. else
  177. *(unsigned __int64 *)ret = 0;
  178. return rowAllocator->finalizeRow(outSize, ret, outSize);
  179. }
  180. //-------------------------------------------------------------------------------------------------------------
  181. class CHThorIndexReadActivityBase : public CHThorActivityBase
  182. {
  183. public:
  184. CHThorIndexReadActivityBase(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadBaseArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node);
  185. ~CHThorIndexReadActivityBase();
  186. virtual void ready();
  187. virtual void stop();
  188. IHThorInput *queryOutput(unsigned index) { return this; }
  189. virtual bool needsAllocator() const { return true; }
  190. //interface IHThorInput
  191. virtual bool isGrouped() { return false; }
  192. virtual const char *getFileName() { return NULL; }
  193. virtual bool outputToFile(const char *) { return false; }
  194. virtual IOutputMetaData * queryOutputMeta() const { return outputMeta; }
  195. virtual void updateProgress(IStatisticGatherer &progress) const
  196. {
  197. CHThorActivityBase::updateProgress(progress);
  198. StatsActivityScope scope(progress, activityId);
  199. progress.addStatistic(StNumPostFiltered, queryPostFiltered());
  200. progress.addStatistic(StNumIndexSeeks, querySeeks());
  201. progress.addStatistic(StNumIndexScans, queryScans());
  202. progress.addStatistic(StNumIndexWildSeeks, queryWildSeeks());
  203. }
  204. virtual unsigned querySeeks() const
  205. {
  206. return seeks + (klManager ? klManager->querySeeks() : 0);
  207. }
  208. virtual unsigned queryScans() const
  209. {
  210. return scans + (klManager ? klManager->queryScans() : 0);
  211. }
  212. virtual unsigned queryWildSeeks() const
  213. {
  214. return wildseeks + (klManager ? klManager->queryWildSeeks() : 0);
  215. }
  216. virtual unsigned queryPostFiltered() const
  217. {
  218. return postFiltered;
  219. }
  220. virtual void fail(char const * msg)
  221. {
  222. throw MakeStringExceptionDirect(0, msg);
  223. }
  224. protected:
  225. bool doPreopenLimit(unsigned __int64 limit);
  226. bool doPreopenLimitFile(unsigned __int64 & count, unsigned __int64 limit);
  227. IKeyIndex * doPreopenLimitPart(unsigned __int64 & count, unsigned __int64 limit, unsigned part);
  228. const void * createKeyedLimitOnFailRow();
  229. void getLayoutTranslators();
  230. const IDynamicTransform * getLayoutTranslator(IDistributedFile * f);
  231. void verifyIndex(IKeyIndex * idx);
  232. void initManager(IKeyManager *manager, bool isTlk);
  233. bool firstPart();
  234. virtual bool nextPart();
  235. virtual void initPart();
  236. void resolveIndexFilename();
  237. void killPart();
  238. private:
  239. bool firstMultiPart();
  240. bool nextMultiPart();
  241. bool setCurrentPart(unsigned whichPart);
  242. void clearTlk() { tlk.clear(); tlManager.clear(); }
  243. void openTlk();
  244. bool doNextSuper();
  245. protected:
  246. IHThorIndexReadBaseArg &helper;
  247. IHThorSourceLimitTransformExtra * limitTransformExtra;
  248. CachedOutputMetaData eclKeySize;
  249. size32_t keySize= 0;
  250. // current part
  251. Owned<IDistributedFilePart> curPart;
  252. Owned<IKeyManager> klManager;
  253. Owned<IKeyIndex> keyIndex;
  254. unsigned nextPartNumber = 0;
  255. //multi files
  256. Owned<IDistributedFile> df;
  257. Owned<IKeyIndex> tlk;
  258. Owned<IKeyManager> tlManager;
  259. //super files:
  260. Owned<IDistributedFileIterator> superIterator;
  261. unsigned superIndex = 0;
  262. unsigned superCount = 1;
  263. StringBuffer superName;
  264. TransformCallback callback;
  265. //for preopening (when need counts for keyed skip limit):
  266. Owned<IKeyIndexSet> keyIndexCache;
  267. UnsignedArray superIndexCache;
  268. unsigned keyIndexCacheIdx = 0;
  269. unsigned seeks;
  270. unsigned scans;
  271. unsigned postFiltered;
  272. unsigned wildseeks;
  273. bool singlePart = false; // a single part index, not part of a super file - optimize so never reload the part.
  274. bool localSortKey = false;
  275. bool initializedFileInfo = false;
  276. //for layout translation
  277. Owned<const IDynamicTransform> layoutTrans;
  278. IConstPointerArrayOf<IDynamicTransform> layoutTransArray;
  279. IPointerArrayOf<IOutputMetaData> actualLayouts;
  280. RecordTranslationMode recordTranslationModeHint = RecordTranslationMode::Unspecified;
  281. RecordTranslationMode getLayoutTranslationMode()
  282. {
  283. if (recordTranslationModeHint != RecordTranslationMode::Unspecified)
  284. return recordTranslationModeHint;
  285. return agent.getLayoutTranslationMode();
  286. }
  287. };
  288. CHThorIndexReadActivityBase::CHThorIndexReadActivityBase(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadBaseArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
  289. : CHThorActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _graph), helper(_arg)
  290. {
  291. nextPartNumber = 0;
  292. eclKeySize.set(helper.queryDiskRecordSize());
  293. postFiltered = 0;
  294. seeks = 0;
  295. scans = 0;
  296. wildseeks = 0;
  297. helper.setCallback(&callback);
  298. limitTransformExtra = nullptr;
  299. if (_node)
  300. {
  301. const char *recordTranslationModeHintText = _node->queryProp("hint[@name='layouttranslation']/@value");
  302. if (recordTranslationModeHintText)
  303. recordTranslationModeHint = getTranslationMode(recordTranslationModeHintText, true);
  304. }
  305. }
  306. CHThorIndexReadActivityBase::~CHThorIndexReadActivityBase()
  307. {
  308. // ReleaseRoxieRow(recBuffer);
  309. }
  310. void CHThorIndexReadActivityBase::ready()
  311. {
  312. CHThorActivityBase::ready();
  313. if(!initializedFileInfo || (helper.getFlags() & TIRvarfilename))
  314. {
  315. resolveIndexFilename();
  316. layoutTransArray.kill();
  317. getLayoutTranslators();
  318. initializedFileInfo = true;
  319. }
  320. }
  321. void CHThorIndexReadActivityBase::resolveIndexFilename()
  322. {
  323. // A logical filename for the key should refer to a single physical file - either the TLK or a monolithic key
  324. OwnedRoxieString lfn(helper.getFileName());
  325. Owned<ILocalOrDistributedFile> ldFile = resolveLFNIndex(agent, lfn, "IndexRead", 0 != (helper.getFlags() & TIRoptional),true, AccessMode::tbdRead, defaultPrivilegedUser);
  326. df.set(ldFile ? ldFile->queryDistributedFile() : NULL);
  327. if (!df)
  328. {
  329. StringBuffer buff;
  330. buff.append("Skipping OPT index read of nonexistent file ").append(lfn);
  331. agent.addWuExceptionEx(buff.str(), WRN_SkipMissingOptIndex, SeverityInformation, MSGAUD_user, "hthor");
  332. }
  333. else
  334. {
  335. agent.logFileAccess(df, "HThor", "READ", graph);
  336. enterSingletonSuperfiles(df);
  337. singlePart = false;
  338. localSortKey = (df->queryAttributes().hasProp("@local"));
  339. IDistributedSuperFile *super = df->querySuperFile();
  340. superCount = 1;
  341. superIndex = 0;
  342. nextPartNumber = 0;
  343. if (super)
  344. {
  345. superIterator.setown(super->getSubFileIterator(true));
  346. superCount = super->numSubFiles(true);
  347. if (helper.getFlags() & TIRsorted)
  348. throw MakeStringException(1000, "SORTED attribute is not supported when reading from superkey");
  349. superName.append(df->queryLogicalName());
  350. df.clear();
  351. }
  352. else if (df->numParts() == 1)
  353. {
  354. singlePart = true;
  355. }
  356. }
  357. killPart();
  358. }
  359. void CHThorIndexReadActivityBase::stop()
  360. {
  361. killPart();
  362. CHThorActivityBase::stop();
  363. }
  364. bool CHThorIndexReadActivityBase::doPreopenLimit(unsigned __int64 limit)
  365. {
  366. keyIndexCache.clear();
  367. superIndexCache.kill();
  368. if(!helper.canMatchAny())
  369. return false;
  370. keyIndexCache.setown(createKeyIndexSet());
  371. unsigned __int64 count = 0;
  372. if(superIterator)
  373. {
  374. superIterator->first();
  375. superIndex = 0;
  376. do
  377. {
  378. df.set(&superIterator->query());
  379. if(doPreopenLimitFile(count, limit))
  380. return true;
  381. ++superIndex;
  382. } while(superIterator->next());
  383. return false;
  384. }
  385. else
  386. {
  387. return doPreopenLimitFile(count, limit);
  388. }
  389. }
  390. bool CHThorIndexReadActivityBase::doPreopenLimitFile(unsigned __int64 & count, unsigned __int64 limit)
  391. {
  392. unsigned num = df->numParts()-1;
  393. if(num)
  394. {
  395. if(localSortKey)
  396. {
  397. // MORE - partition support goes here
  398. Owned<IKeyIndex> tlk = openKeyFile(df->queryPart(num));
  399. verifyIndex(tlk);
  400. for(unsigned idx = 0; idx < num; ++idx)
  401. {
  402. keyIndexCache->addIndex(doPreopenLimitPart(count, limit, idx));
  403. if(superIterator)
  404. superIndexCache.append(superIndex);
  405. }
  406. }
  407. else
  408. {
  409. Owned<IKeyIndex> tlk = openKeyFile(df->queryPart(num));
  410. verifyIndex(tlk);
  411. Owned<IKeyManager> tlman = createLocalKeyManager(eclKeySize.queryRecordAccessor(true), tlk, NULL, helper.hasNewSegmentMonitors(), false);
  412. initManager(tlman, true);
  413. while(tlman->lookup(false) && (count<=limit))
  414. {
  415. unsigned slavePart = (unsigned)extractFpos(tlman);
  416. if (slavePart)
  417. {
  418. keyIndexCache->addIndex(doPreopenLimitPart(count, limit, slavePart-1));
  419. if(superIterator)
  420. superIndexCache.append(superIndex);
  421. }
  422. }
  423. if (count>limit)
  424. {
  425. if ( agent.queryCodeContext()->queryDebugContext())
  426. agent.queryCodeContext()->queryDebugContext()->checkBreakpoint(DebugStateLimit, NULL, static_cast<IActivityBase *>(this));
  427. }
  428. }
  429. }
  430. else
  431. {
  432. keyIndexCache->addIndex(doPreopenLimitPart(count, limit, 0));
  433. if(superIterator)
  434. superIndexCache.append(superIndex);
  435. }
  436. return (count>limit);
  437. }
  438. IKeyIndex * CHThorIndexReadActivityBase::doPreopenLimitPart(unsigned __int64 & result, unsigned __int64 limit, unsigned part)
  439. {
  440. Owned<IKeyIndex> kidx;
  441. kidx.setown(openKeyFile(df->queryPart(part)));
  442. if(df->numParts() == 1)
  443. verifyIndex(kidx);
  444. if (limit != (unsigned) -1)
  445. {
  446. Owned<IKeyManager> kman = createLocalKeyManager(eclKeySize.queryRecordAccessor(true), kidx, NULL, helper.hasNewSegmentMonitors(), false);
  447. initManager(kman, false);
  448. result += kman->checkCount(limit-result);
  449. }
  450. return kidx.getClear();
  451. }
  452. void CHThorIndexReadActivityBase::openTlk()
  453. {
  454. tlk.setown(openKeyFile(df->queryPart(df->numParts()-1)));
  455. }
  456. const void * CHThorIndexReadActivityBase::createKeyedLimitOnFailRow()
  457. {
  458. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  459. size32_t newSize = limitTransformExtra->transformOnKeyedLimitExceeded(rowBuilder);
  460. if (newSize)
  461. return rowBuilder.finalizeRowClear(newSize);
  462. return NULL;
  463. }
  464. bool CHThorIndexReadActivityBase::firstPart()
  465. {
  466. killPart();
  467. if ((df || superIterator) && helper.canMatchAny())
  468. {
  469. if(keyIndexCache)
  470. {
  471. keyIndexCacheIdx = 0;
  472. return nextPart();
  473. }
  474. if (singlePart)
  475. {
  476. //part is cached and not reloaded - for efficiency in subqueries.
  477. if (!keyIndex)
  478. return setCurrentPart(0);
  479. initPart();
  480. return true;
  481. }
  482. if (superIterator)
  483. {
  484. superIterator->first();
  485. superIndex = 0;
  486. return doNextSuper();
  487. }
  488. else
  489. return firstMultiPart();
  490. }
  491. return false;
  492. }
  493. bool CHThorIndexReadActivityBase::nextPart()
  494. {
  495. killPart();
  496. if(keyIndexCache)
  497. {
  498. if(keyIndexCacheIdx >= keyIndexCache->numParts())
  499. return false;
  500. keyIndex.set(keyIndexCache->queryPart(keyIndexCacheIdx));
  501. if(superIterator)
  502. {
  503. superIndex = superIndexCache.item(keyIndexCacheIdx);
  504. layoutTrans.set(layoutTransArray.item(superIndex));
  505. keySize = keyIndex->keySize();
  506. }
  507. ++keyIndexCacheIdx;
  508. initPart();
  509. return true;
  510. }
  511. if (singlePart)
  512. return false;
  513. if (nextMultiPart())
  514. return true;
  515. if (superIterator && superIterator->next())
  516. {
  517. ++superIndex;
  518. return doNextSuper();
  519. }
  520. return false;
  521. }
  522. void CHThorIndexReadActivityBase::initManager(IKeyManager *manager, bool isTlk)
  523. {
  524. if(layoutTrans && !isTlk)
  525. manager->setLayoutTranslator(layoutTrans);
  526. helper.createSegmentMonitors(manager);
  527. manager->finishSegmentMonitors();
  528. manager->reset();
  529. }
  530. void CHThorIndexReadActivityBase::initPart()
  531. {
  532. assertex(!keyIndex->isTopLevelKey());
  533. klManager.setown(createLocalKeyManager(eclKeySize.queryRecordAccessor(true), keyIndex, NULL, helper.hasNewSegmentMonitors(), false));
  534. initManager(klManager, false);
  535. callback.setManager(klManager);
  536. }
  537. void CHThorIndexReadActivityBase::killPart()
  538. {
  539. callback.setManager(NULL);
  540. if (klManager)
  541. {
  542. seeks += klManager->querySeeks();
  543. scans += klManager->queryScans();
  544. wildseeks += klManager->queryWildSeeks();
  545. klManager.clear();
  546. }
  547. }
  548. bool CHThorIndexReadActivityBase::setCurrentPart(unsigned whichPart)
  549. {
  550. keyIndex.setown(openKeyFile(df->queryPart(whichPart)));
  551. if(df->numParts() == 1)
  552. verifyIndex(keyIndex);
  553. initPart();
  554. return true;
  555. }
  556. bool CHThorIndexReadActivityBase::firstMultiPart()
  557. {
  558. if(!tlk)
  559. openTlk();
  560. verifyIndex(tlk);
  561. tlManager.setown(createLocalKeyManager(eclKeySize.queryRecordAccessor(true), tlk, NULL, helper.hasNewSegmentMonitors(), false));
  562. initManager(tlManager, true);
  563. nextPartNumber = 0;
  564. return nextMultiPart();
  565. }
  566. bool CHThorIndexReadActivityBase::nextMultiPart()
  567. {
  568. //tlManager may be null for a single part index within a superfile.
  569. if (tlManager)
  570. {
  571. if (localSortKey)
  572. {
  573. // MORE - partition key support should go here?
  574. if (nextPartNumber<(df->numParts()-1))
  575. return setCurrentPart(nextPartNumber++);
  576. }
  577. else
  578. {
  579. while (tlManager->lookup(false))
  580. {
  581. offset_t node = extractFpos(tlManager);
  582. if (node)
  583. return setCurrentPart((unsigned)node-1);
  584. }
  585. }
  586. }
  587. return false;
  588. }
  589. bool CHThorIndexReadActivityBase::doNextSuper()
  590. {
  591. do
  592. {
  593. clearTlk();
  594. df.set(&superIterator->query());
  595. unsigned numParts = df->numParts();
  596. if (numParts==1)
  597. return setCurrentPart(0);
  598. if (firstMultiPart())
  599. return true;
  600. ++superIndex;
  601. } while (superIterator->next());
  602. return false;
  603. }
  604. void CHThorIndexReadActivityBase::getLayoutTranslators()
  605. {
  606. if(superIterator)
  607. {
  608. superIterator->first();
  609. do
  610. {
  611. IDistributedFile & f = superIterator->query();
  612. layoutTrans.setown(getLayoutTranslator(&f));
  613. layoutTransArray.append(layoutTrans.getClear());
  614. } while(superIterator->next());
  615. }
  616. else if (df)
  617. {
  618. layoutTrans.setown(getLayoutTranslator(df));
  619. }
  620. else
  621. layoutTrans.clear();
  622. }
  623. const IDynamicTransform * CHThorIndexReadActivityBase::getLayoutTranslator(IDistributedFile * f)
  624. {
  625. IOutputMetaData * expectedFormat = helper.queryDiskRecordSize();
  626. Linked<IOutputMetaData> actualFormat = expectedFormat;
  627. switch (getLayoutTranslationMode())
  628. {
  629. case RecordTranslationMode::AlwaysECL:
  630. verifyFormatCrc(helper.getDiskFormatCrc(), f, (superIterator ? superName.str() : NULL) , true, false);
  631. break;
  632. case RecordTranslationMode::None:
  633. verifyFormatCrc(helper.getDiskFormatCrc(), f, (superIterator ? superName.str() : NULL) , true, true);
  634. break;
  635. default:
  636. if(!verifyFormatCrc(helper.getDiskFormatCrc(), f, (superIterator ? superName.str() : NULL) , true, false))
  637. {
  638. IPropertyTree &props = f->queryAttributes();
  639. actualFormat.setown(getDaliLayoutInfo(props));
  640. if (!actualFormat)
  641. throw MakeStringException(0, "Untranslatable key layout mismatch reading index %s - key layout information not found", f->queryLogicalName());
  642. //MORE: We could introduce a more efficient way of checking this that does not create a translator
  643. Owned<const IDynamicTransform> actualTranslator = createRecordTranslator(expectedFormat->queryRecordAccessor(true), actualFormat->queryRecordAccessor(true));
  644. DBGLOG("Record layout translator created for %s", f->queryLogicalName());
  645. actualTranslator->describe();
  646. if (actualTranslator->keyedTranslated())
  647. throw MakeStringException(0, "Untranslatable key layout mismatch reading index %s - keyed fields do not match", f->queryLogicalName());
  648. VStringBuffer msg("Record layout translation required for %s", f->queryLogicalName());
  649. agent.addWuExceptionEx(msg.str(), WRN_UseLayoutTranslation, SeverityInformation, MSGAUD_user, "hthor");
  650. actualLayouts.append(actualFormat.getLink()); // ensure adequate lifespan
  651. }
  652. break;
  653. }
  654. IOutputMetaData * projectedFormat = helper.queryProjectedDiskRecordSize();
  655. if (projectedFormat == actualFormat)
  656. return nullptr;
  657. Owned<const IDynamicTransform> payloadTranslator = createRecordTranslator(projectedFormat->queryRecordAccessor(true), actualFormat->queryRecordAccessor(true));
  658. if (!payloadTranslator->canTranslate())
  659. throw MakeStringException(0, "Untranslatable key layout mismatch reading index %s", f->queryLogicalName());
  660. if (payloadTranslator->needsTranslate())
  661. return payloadTranslator.getClear();
  662. return nullptr;
  663. }
  664. void CHThorIndexReadActivityBase::verifyIndex(IKeyIndex * idx)
  665. {
  666. if(superIterator)
  667. layoutTrans.set(layoutTransArray.item(superIndex));
  668. if (eclKeySize.isFixedSize())
  669. {
  670. if(layoutTrans)
  671. {
  672. if (!layoutTrans->canTranslate())
  673. throw MakeStringException(0, "Untranslatable key layout mismatch reading index %s", df->queryLogicalName());
  674. }
  675. else
  676. {
  677. keySize = idx->keySize();
  678. //The index rows always have the filepositions appended, but the ecl may not include a field
  679. unsigned fileposSize = idx->hasSpecialFileposition() && !hasTrailingFileposition(eclKeySize.queryTypeInfo()) ? sizeof(offset_t) : 0;
  680. if (keySize != eclKeySize.getFixedSize() + fileposSize)
  681. throw MakeStringException(0, "Key size mismatch reading index %s: index indicates size %u, ECL indicates size %u", df->queryLogicalName(), keySize, eclKeySize.getFixedSize() + fileposSize);
  682. }
  683. }
  684. }
  685. //-------------------------------------------------------------------------------------------------------------
  686. class CHThorIndexReadActivity : public CHThorIndexReadActivityBase
  687. {
  688. public:
  689. CHThorIndexReadActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node);
  690. ~CHThorIndexReadActivity();
  691. //interface IHThorInput
  692. virtual void ready();
  693. virtual const void *nextRow();
  694. virtual const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra);
  695. virtual IInputSteppingMeta * querySteppingMeta();
  696. protected:
  697. virtual bool nextPart();
  698. virtual void initPart();
  699. protected:
  700. IHThorIndexReadArg &helper;
  701. IHThorSteppedSourceExtra * steppedExtra;
  702. unsigned __int64 keyedProcessed;
  703. unsigned __int64 keyedLimit;
  704. unsigned __int64 rowLimit;
  705. unsigned __int64 stopAfter;
  706. ISteppingMeta * rawMeta;
  707. ISteppingMeta * projectedMeta;
  708. size32_t seekGEOffset;
  709. unsigned * seekSizes;
  710. CSteppingMeta steppingMeta;
  711. bool needTransform;
  712. bool keyedLimitReached;
  713. bool keyedLimitSkips;
  714. bool keyedLimitCreates;
  715. bool keyedLimitRowCreated;
  716. };
  717. CHThorIndexReadActivity::CHThorIndexReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
  718. : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _graph, _node), helper(_arg)
  719. {
  720. limitTransformExtra = &helper;
  721. steppedExtra = helper.querySteppingExtra();
  722. needTransform = helper.needTransform();
  723. keyedLimit = (unsigned __int64)-1;
  724. rowLimit = (unsigned __int64)-1;
  725. stopAfter = (unsigned __int64)-1;
  726. keyedLimitReached = false;
  727. keyedLimitSkips = ((helper.getFlags() & TIRkeyedlimitskips) != 0);
  728. keyedLimitCreates = ((helper.getFlags() & TIRkeyedlimitcreates) != 0);
  729. keyedLimitRowCreated = false;
  730. keyedProcessed = 0;
  731. rawMeta = helper.queryRawSteppingMeta();
  732. projectedMeta = helper.queryProjectedSteppingMeta();
  733. seekGEOffset = 0;
  734. seekSizes = 0;
  735. if (rawMeta)
  736. {
  737. //should check that no translation, also should check all keys in maxFields list can actually be keyed.
  738. const CFieldOffsetSize * fields = rawMeta->queryFields();
  739. unsigned maxFields = rawMeta->getNumFields();
  740. seekGEOffset = fields[0].offset;
  741. seekSizes = new unsigned[maxFields];
  742. seekSizes[0] = fields[0].size;
  743. for (unsigned i=1; i < maxFields; i++)
  744. seekSizes[i] = seekSizes[i-1] + fields[i].size;
  745. if (projectedMeta)
  746. steppingMeta.init(projectedMeta, false);
  747. else
  748. steppingMeta.init(rawMeta, false);
  749. }
  750. }
  751. CHThorIndexReadActivity::~CHThorIndexReadActivity()
  752. {
  753. delete [] seekSizes;
  754. }
  755. void CHThorIndexReadActivity::ready()
  756. {
  757. CHThorIndexReadActivityBase::ready();
  758. keyedLimitReached = false;
  759. keyedLimitRowCreated = false;
  760. keyedLimit = helper.getKeyedLimit();
  761. rowLimit = helper.getRowLimit();
  762. if (helper.getFlags() & TIRlimitskips)
  763. rowLimit = (unsigned __int64) -1;
  764. stopAfter = helper.getChooseNLimit();
  765. keyedProcessed = 0;
  766. if (seekGEOffset || localSortKey || ((keyedLimit != (unsigned __int64) -1) && ((helper.getFlags() & TIRcountkeyedlimit) != 0) && !singlePart))
  767. keyedLimitReached = doPreopenLimit(keyedLimit);
  768. if (steppedExtra)
  769. steppingMeta.setExtra(steppedExtra);
  770. firstPart();
  771. if(klManager && (keyedLimit != (unsigned __int64) -1) && ((helper.getFlags() & TIRcountkeyedlimit) != 0) && singlePart && !seekGEOffset)
  772. {
  773. unsigned __int64 result = klManager->checkCount(keyedLimit);
  774. keyedLimitReached = (result > keyedLimit);
  775. klManager->reset();
  776. }
  777. }
  778. bool CHThorIndexReadActivity::nextPart()
  779. {
  780. if(keyIndexCache && (seekGEOffset || localSortKey))
  781. {
  782. killPart();
  783. klManager.setown(createKeyMerger(eclKeySize.queryRecordAccessor(true), keyIndexCache, seekGEOffset, NULL, helper.hasNewSegmentMonitors(), false));
  784. keyIndexCache.clear();
  785. initManager(klManager, false);
  786. callback.setManager(klManager);
  787. return true;
  788. }
  789. else if (seekGEOffset || localSortKey)
  790. return false;
  791. else
  792. return CHThorIndexReadActivityBase::nextPart();
  793. }
  794. void CHThorIndexReadActivity::initPart()
  795. {
  796. CHThorIndexReadActivityBase::initPart();
  797. }
  798. const void *CHThorIndexReadActivity::nextRow()
  799. {
  800. if(keyedLimitReached)
  801. {
  802. if (keyedLimitSkips)
  803. return NULL;
  804. if (keyedLimitCreates)
  805. {
  806. if (!keyedLimitRowCreated)
  807. {
  808. keyedLimitRowCreated = true;
  809. const void * ret = createKeyedLimitOnFailRow();
  810. if (ret)
  811. processed++;
  812. return ret;
  813. }
  814. return NULL;
  815. }
  816. helper.onKeyedLimitExceeded(); // should throw exception
  817. }
  818. if((stopAfter && (processed-initialProcessed)==stopAfter) || !klManager)
  819. return NULL;
  820. for (;;)
  821. {
  822. agent.reportProgress(NULL);
  823. if (klManager->lookup(true))
  824. {
  825. keyedProcessed++;
  826. if ((keyedLimit != (unsigned __int64) -1) && keyedProcessed > keyedLimit)
  827. helper.onKeyedLimitExceeded();
  828. byte const * keyRow = klManager->queryKeyBuffer();
  829. if (likely(helper.canMatch(keyRow)))
  830. {
  831. if (needTransform)
  832. {
  833. try
  834. {
  835. size32_t recSize;
  836. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  837. recSize = helper.transform(rowBuilder, keyRow);
  838. callback.finishedRow();
  839. if (recSize)
  840. {
  841. processed++;
  842. if ((processed-initialProcessed) > rowLimit)
  843. {
  844. helper.onLimitExceeded();
  845. if ( agent.queryCodeContext()->queryDebugContext())
  846. agent.queryCodeContext()->queryDebugContext()->checkBreakpoint(DebugStateLimit, NULL, static_cast<IActivityBase *>(this));
  847. }
  848. return rowBuilder.finalizeRowClear(recSize);
  849. }
  850. else
  851. {
  852. postFiltered++;
  853. }
  854. }
  855. catch(IException * e)
  856. {
  857. throw makeWrappedException(e);
  858. }
  859. }
  860. else
  861. {
  862. callback.finishedRow(); // since filter might have accessed a blob
  863. processed++;
  864. if ((processed-initialProcessed) > rowLimit)
  865. {
  866. helper.onLimitExceeded();
  867. if ( agent.queryCodeContext()->queryDebugContext())
  868. agent.queryCodeContext()->queryDebugContext()->checkBreakpoint(DebugStateLimit, NULL, static_cast<IActivityBase *>(this));
  869. }
  870. try
  871. {
  872. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  873. size32_t finalSize = cloneRow(rowBuilder, keyRow, outputMeta);
  874. return rowBuilder.finalizeRowClear(finalSize);
  875. }
  876. catch(IException * e)
  877. {
  878. throw makeWrappedException(e);
  879. }
  880. }
  881. }
  882. else
  883. {
  884. callback.finishedRow(); // since filter might have accessed a blob
  885. postFiltered++;
  886. }
  887. }
  888. else if (!nextPart())
  889. return NULL;
  890. }
  891. }
  892. const void *CHThorIndexReadActivity::nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
  893. {
  894. // MORE - should set wasCompleteMatch
  895. if(keyedLimitReached && !keyedLimitSkips)
  896. helper.onKeyedLimitExceeded(); // should throw exception
  897. if(keyedLimitReached || (stopAfter && (processed-initialProcessed)==stopAfter) || !klManager)
  898. return NULL;
  899. const byte * rawSeek = (const byte *)seek + seekGEOffset;
  900. unsigned seekSize = seekSizes[numFields-1];
  901. if (projectedMeta)
  902. {
  903. byte *temp = (byte *) alloca(seekSize);
  904. RtlStaticRowBuilder tempBuilder(temp - seekGEOffset, seekSize + seekGEOffset);
  905. helper.mapOutputToInput(tempBuilder, seek, numFields); // NOTE - weird interface to mapOutputToInput means that it STARTS writing at seekGEOffset...
  906. rawSeek = (byte *)temp;
  907. }
  908. for (;;)
  909. {
  910. agent.reportProgress(NULL);
  911. if (klManager->lookupSkip(rawSeek, seekGEOffset, seekSize))
  912. {
  913. const byte * row = klManager->queryKeyBuffer();
  914. #ifdef _DEBUG
  915. if (memcmp(row + seekGEOffset, rawSeek, seekSize) < 0)
  916. assertex("smart seek failure");
  917. #endif
  918. keyedProcessed++;
  919. if ((keyedLimit != (unsigned __int64) -1) && keyedProcessed > keyedLimit)
  920. helper.onKeyedLimitExceeded();
  921. if (likely(helper.canMatch(row)))
  922. {
  923. if (needTransform)
  924. {
  925. try
  926. {
  927. size32_t recSize;
  928. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  929. recSize = helper.transform(rowBuilder, row);
  930. callback.finishedRow();
  931. if (recSize)
  932. {
  933. processed++;
  934. if ((processed-initialProcessed) > rowLimit)
  935. {
  936. helper.onLimitExceeded();
  937. if ( agent.queryCodeContext()->queryDebugContext())
  938. agent.queryCodeContext()->queryDebugContext()->checkBreakpoint(DebugStateLimit, NULL, static_cast<IActivityBase *>(this));
  939. }
  940. return rowBuilder.finalizeRowClear(recSize);
  941. }
  942. else
  943. {
  944. postFiltered++;
  945. }
  946. }
  947. catch(IException * e)
  948. {
  949. throw makeWrappedException(e);
  950. }
  951. }
  952. else
  953. {
  954. callback.finishedRow(); // since filter might have accessed a blob
  955. processed++;
  956. if ((processed-initialProcessed) > rowLimit)
  957. {
  958. helper.onLimitExceeded();
  959. if ( agent.queryCodeContext()->queryDebugContext())
  960. agent.queryCodeContext()->queryDebugContext()->checkBreakpoint(DebugStateLimit, NULL, static_cast<IActivityBase *>(this));
  961. }
  962. try
  963. {
  964. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  965. size32_t finalSize = cloneRow(rowBuilder, row, outputMeta);
  966. return rowBuilder.finalizeRowClear(finalSize);
  967. }
  968. catch(IException * e)
  969. {
  970. throw makeWrappedException(e);
  971. }
  972. }
  973. }
  974. else
  975. {
  976. callback.finishedRow(); // since filter might have accessed a blob
  977. postFiltered++;
  978. }
  979. }
  980. else if (!nextPart())
  981. return NULL;
  982. }
  983. }
  984. IInputSteppingMeta * CHThorIndexReadActivity::querySteppingMeta()
  985. {
  986. if (rawMeta)
  987. return &steppingMeta;
  988. return NULL;
  989. }
  990. extern HTHOR_API IHThorActivity *createIndexReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadArg &arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
  991. {
  992. return new CHThorIndexReadActivity(_agent, _activityId, _subgraphId, arg, _kind, _graph, _node);
  993. }
  994. //-------------------------------------------------------------------------------------------------------------
  995. class CHThorIndexNormalizeActivity : public CHThorIndexReadActivityBase
  996. {
  997. public:
  998. CHThorIndexNormalizeActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexNormalizeArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node);
  999. ~CHThorIndexNormalizeActivity();
  1000. virtual void ready();
  1001. virtual void stop();
  1002. virtual const void *nextRow();
  1003. virtual bool needsAllocator() const { return true; }
  1004. protected:
  1005. const void * createNextRow();
  1006. protected:
  1007. IHThorIndexNormalizeArg &helper;
  1008. unsigned __int64 rowLimit;
  1009. unsigned __int64 stopAfter;
  1010. RtlDynamicRowBuilder outBuilder;
  1011. unsigned __int64 keyedProcessed;
  1012. unsigned __int64 keyedLimit;
  1013. bool skipLimitReached;
  1014. bool expanding;
  1015. };
  1016. CHThorIndexNormalizeActivity::CHThorIndexNormalizeActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexNormalizeArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node) : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _graph, _node), helper(_arg), outBuilder(NULL)
  1017. {
  1018. limitTransformExtra = &helper;
  1019. keyedLimit = (unsigned __int64)-1;
  1020. skipLimitReached = false;
  1021. keyedProcessed = 0;
  1022. rowLimit = (unsigned __int64)-1;
  1023. stopAfter = (unsigned __int64)-1;
  1024. expanding = false;
  1025. }
  1026. CHThorIndexNormalizeActivity::~CHThorIndexNormalizeActivity()
  1027. {
  1028. }
  1029. void CHThorIndexNormalizeActivity::ready()
  1030. {
  1031. CHThorIndexReadActivityBase::ready();
  1032. keyedLimit = helper.getKeyedLimit();
  1033. skipLimitReached = false;
  1034. keyedProcessed = 0;
  1035. rowLimit = helper.getRowLimit();
  1036. if (helper.getFlags() & TIRlimitskips)
  1037. rowLimit = (unsigned __int64) -1;
  1038. stopAfter = helper.getChooseNLimit();
  1039. expanding = false;
  1040. outBuilder.setAllocator(rowAllocator);
  1041. firstPart();
  1042. }
  1043. void CHThorIndexNormalizeActivity::stop()
  1044. {
  1045. outBuilder.clear();
  1046. CHThorIndexReadActivityBase::stop();
  1047. }
  1048. const void *CHThorIndexNormalizeActivity::nextRow()
  1049. {
  1050. if ((stopAfter && (processed-initialProcessed)==stopAfter) || !klManager)
  1051. return NULL;
  1052. if (skipLimitReached || (stopAfter && (processed-initialProcessed)==stopAfter) || !klManager)
  1053. return NULL;
  1054. if ((keyedLimit != (unsigned __int64) -1) && (helper.getFlags() & TIRcountkeyedlimit) != 0)
  1055. {
  1056. unsigned __int64 result = klManager->checkCount(keyedLimit);
  1057. if (result > keyedLimit)
  1058. {
  1059. if((helper.getFlags() & TIRkeyedlimitskips) != 0)
  1060. skipLimitReached = true;
  1061. else if((helper.getFlags() & TIRkeyedlimitcreates) != 0)
  1062. {
  1063. skipLimitReached = true;
  1064. const void * ret = createKeyedLimitOnFailRow();
  1065. if (ret)
  1066. processed++;
  1067. return ret;
  1068. }
  1069. else
  1070. helper.onKeyedLimitExceeded(); // should throw exception
  1071. return NULL;
  1072. }
  1073. klManager->reset();
  1074. keyedLimit = (unsigned __int64) -1; // to avoid checking it again
  1075. }
  1076. assertex(!((keyedLimit != (unsigned __int64) -1) && ((helper.getFlags() & TIRkeyedlimitskips) != 0)));
  1077. for (;;)
  1078. {
  1079. for (;;)
  1080. {
  1081. if (expanding)
  1082. {
  1083. for (;;)
  1084. {
  1085. expanding = helper.next();
  1086. if (!expanding)
  1087. {
  1088. callback.finishedRow(); // next could filter
  1089. break;
  1090. }
  1091. const void * ret = createNextRow();
  1092. if (ret)
  1093. return ret;
  1094. }
  1095. }
  1096. while (!klManager->lookup(true))
  1097. {
  1098. keyedProcessed++;
  1099. if ((keyedLimit != (unsigned __int64) -1) && keyedProcessed > keyedLimit)
  1100. helper.onKeyedLimitExceeded();
  1101. if (!nextPart())
  1102. return NULL;
  1103. }
  1104. agent.reportProgress(NULL);
  1105. expanding = helper.first(klManager->queryKeyBuffer());
  1106. if (expanding)
  1107. {
  1108. const void * ret = createNextRow();
  1109. if (ret)
  1110. return ret;
  1111. }
  1112. else
  1113. callback.finishedRow(); // first could filter
  1114. }
  1115. }
  1116. }
  1117. const void * CHThorIndexNormalizeActivity::createNextRow()
  1118. {
  1119. try
  1120. {
  1121. outBuilder.ensureRow();
  1122. size32_t thisSize = helper.transform(outBuilder);
  1123. callback.finishedRow();
  1124. if (thisSize == 0)
  1125. {
  1126. return NULL;
  1127. }
  1128. OwnedConstRoxieRow ret = outBuilder.finalizeRowClear(thisSize);
  1129. if ((processed - initialProcessed) >=rowLimit)
  1130. {
  1131. helper.onLimitExceeded();
  1132. if ( agent.queryCodeContext()->queryDebugContext())
  1133. agent.queryCodeContext()->queryDebugContext()->checkBreakpoint(DebugStateLimit, NULL, static_cast<IActivityBase *>(this));
  1134. return NULL;
  1135. }
  1136. processed++;
  1137. return ret.getClear();
  1138. }
  1139. catch(IException * e)
  1140. {
  1141. throw makeWrappedException(e);
  1142. }
  1143. }
  1144. extern HTHOR_API IHThorActivity *createIndexNormalizeActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexNormalizeArg &arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
  1145. {
  1146. return new CHThorIndexNormalizeActivity(_agent, _activityId, _subgraphId, arg, _kind, _graph, _node);
  1147. }
  1148. //-------------------------------------------------------------------------------------------------------------
  1149. class CHThorIndexAggregateActivity : public CHThorIndexReadActivityBase
  1150. {
  1151. public:
  1152. CHThorIndexAggregateActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexAggregateArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node);
  1153. ~CHThorIndexAggregateActivity();
  1154. //interface IHThorInput
  1155. virtual void stop();
  1156. virtual void ready();
  1157. virtual const void *nextRow();
  1158. virtual bool needsAllocator() const { return true; }
  1159. protected:
  1160. void * createNextRow();
  1161. void gather();
  1162. protected:
  1163. IHThorIndexAggregateArg &helper;
  1164. RtlDynamicRowBuilder outBuilder;
  1165. bool finished;
  1166. };
  1167. CHThorIndexAggregateActivity::CHThorIndexAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexAggregateArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
  1168. : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _graph, _node), helper(_arg), outBuilder(NULL)
  1169. {
  1170. }
  1171. CHThorIndexAggregateActivity::~CHThorIndexAggregateActivity()
  1172. {
  1173. }
  1174. void CHThorIndexAggregateActivity::ready()
  1175. {
  1176. CHThorIndexReadActivityBase::ready();
  1177. outBuilder.setAllocator(rowAllocator);
  1178. finished = false;
  1179. firstPart();
  1180. }
  1181. void CHThorIndexAggregateActivity::stop()
  1182. {
  1183. outBuilder.clear();
  1184. CHThorIndexReadActivityBase::stop();
  1185. }
  1186. void CHThorIndexAggregateActivity::gather()
  1187. {
  1188. outBuilder.ensureRow();
  1189. try
  1190. {
  1191. helper.clearAggregate(outBuilder);
  1192. }
  1193. catch(IException * e)
  1194. {
  1195. throw makeWrappedException(e);
  1196. }
  1197. if(!klManager)
  1198. return;
  1199. for (;;)
  1200. {
  1201. while (!klManager->lookup(true))
  1202. {
  1203. if (!nextPart())
  1204. return;
  1205. }
  1206. agent.reportProgress(NULL);
  1207. try
  1208. {
  1209. helper.processRow(outBuilder, klManager->queryKeyBuffer());
  1210. }
  1211. catch(IException * e)
  1212. {
  1213. throw makeWrappedException(e);
  1214. }
  1215. callback.finishedRow();
  1216. }
  1217. }
  1218. const void *CHThorIndexAggregateActivity::nextRow()
  1219. {
  1220. if (finished) return NULL;
  1221. gather();
  1222. processed++;
  1223. finished = true;
  1224. size32_t size = outputMeta.getRecordSize(outBuilder.getSelf());
  1225. return outBuilder.finalizeRowClear(size);
  1226. }
  1227. extern HTHOR_API IHThorActivity *createIndexAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexAggregateArg &arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
  1228. {
  1229. return new CHThorIndexAggregateActivity(_agent, _activityId, _subgraphId, arg, _kind, _graph, _node);
  1230. }
  1231. //-------------------------------------------------------------------------------------------------------------
  1232. class CHThorIndexCountActivity : public CHThorIndexReadActivityBase
  1233. {
  1234. bool keyedLimitReached = false;
  1235. bool keyedLimitSkips = false;
  1236. unsigned __int64 keyedLimit = (unsigned __int64)-1;
  1237. unsigned __int64 rowLimit = (unsigned __int64)-1;
  1238. public:
  1239. CHThorIndexCountActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexCountArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node);
  1240. //interface IHThorInput
  1241. virtual void ready();
  1242. virtual const void *nextRow();
  1243. protected:
  1244. void * createNextRow();
  1245. protected:
  1246. IHThorIndexCountArg &helper;
  1247. unsigned __int64 choosenLimit;
  1248. bool finished;
  1249. };
  1250. CHThorIndexCountActivity::CHThorIndexCountActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexCountArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
  1251. : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _graph, _node), helper(_arg)
  1252. {
  1253. choosenLimit = (unsigned __int64)-1;
  1254. finished = false;
  1255. keyedLimitSkips = ((helper.getFlags() & TIRkeyedlimitskips) != 0);
  1256. }
  1257. void CHThorIndexCountActivity::ready()
  1258. {
  1259. CHThorIndexReadActivityBase::ready();
  1260. keyedLimitReached = false;
  1261. keyedLimit = helper.getKeyedLimit();
  1262. rowLimit = helper.getRowLimit();
  1263. finished = false;
  1264. choosenLimit = helper.getChooseNLimit();
  1265. firstPart();
  1266. if ((keyedLimit != (unsigned __int64) -1) && ((helper.getFlags() & TIRcountkeyedlimit) != 0))
  1267. {
  1268. if (singlePart)
  1269. {
  1270. if (klManager) // NB: opened by firstPart()
  1271. {
  1272. unsigned __int64 result = klManager->checkCount(keyedLimit);
  1273. keyedLimitReached = (result > keyedLimit);
  1274. klManager->reset();
  1275. }
  1276. }
  1277. else
  1278. keyedLimitReached = doPreopenLimit(keyedLimit);
  1279. }
  1280. }
  1281. const void *CHThorIndexCountActivity::nextRow()
  1282. {
  1283. if (finished) return NULL;
  1284. unsigned __int64 totalCount = 0;
  1285. if (keyedLimitReached)
  1286. {
  1287. if (!keyedLimitSkips)
  1288. helper.onKeyedLimitExceeded(); // should throw exception
  1289. }
  1290. else if (klManager)
  1291. {
  1292. unsigned __int64 keyedProcessed = 0;
  1293. unsigned __int64 rowsProcessed = 0;
  1294. bool limitSkipped = false;
  1295. for (;;)
  1296. {
  1297. if (helper.hasFilter())
  1298. {
  1299. for (;;)
  1300. {
  1301. agent.reportProgress(NULL);
  1302. if (!klManager->lookup(true))
  1303. break;
  1304. ++keyedProcessed;
  1305. if ((keyedLimit != (unsigned __int64) -1) && keyedProcessed > keyedLimit)
  1306. helper.onKeyedLimitExceeded();
  1307. totalCount += helper.numValid(klManager->queryKeyBuffer());
  1308. callback.finishedRow();
  1309. rowsProcessed++;
  1310. if (rowsProcessed > rowLimit)
  1311. {
  1312. if (0 != (helper.getFlags() & TIRlimitskips))
  1313. {
  1314. totalCount = 0;
  1315. limitSkipped = true;
  1316. break;
  1317. }
  1318. else
  1319. helper.onLimitExceeded();
  1320. }
  1321. if ((totalCount > choosenLimit))
  1322. break;
  1323. }
  1324. }
  1325. else
  1326. totalCount += klManager->getCount();
  1327. if (limitSkipped || (totalCount > choosenLimit) || !nextPart())
  1328. break;
  1329. }
  1330. }
  1331. finished = true;
  1332. processed++;
  1333. if (totalCount > choosenLimit)
  1334. totalCount = choosenLimit;
  1335. size32_t outSize = outputMeta.getFixedSize();
  1336. void * ret = rowAllocator->createRow(); //meta: outputMeta
  1337. if (outSize == 1)
  1338. {
  1339. assertex(choosenLimit == 1);
  1340. *(byte *)ret = (byte)totalCount;
  1341. }
  1342. else
  1343. {
  1344. assertex(outSize == sizeof(unsigned __int64));
  1345. *(unsigned __int64 *)ret = totalCount;
  1346. }
  1347. return ret = rowAllocator->finalizeRow(outSize, ret, outSize);
  1348. }
  1349. extern HTHOR_API IHThorActivity *createIndexCountActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexCountArg &arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
  1350. {
  1351. return new CHThorIndexCountActivity(_agent, _activityId, _subgraphId, arg, _kind, _graph, _node);
  1352. }
  1353. //-------------------------------------------------------------------------------------------------------------
  1354. class CHThorIndexGroupAggregateActivity : public CHThorIndexReadActivityBase, implements IHThorGroupAggregateCallback
  1355. {
  1356. public:
  1357. CHThorIndexGroupAggregateActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexGroupAggregateArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node);
  1358. IMPLEMENT_IINTERFACE
  1359. //interface IHThorInput
  1360. virtual void ready();
  1361. virtual const void *nextRow();
  1362. virtual bool needsAllocator() const { return true; }
  1363. virtual void processRow(const void * next);
  1364. protected:
  1365. void * createNextRow();
  1366. void gather();
  1367. protected:
  1368. IHThorIndexGroupAggregateArg &helper;
  1369. RowAggregator aggregated;
  1370. bool eof;
  1371. bool gathered;
  1372. };
  1373. CHThorIndexGroupAggregateActivity::CHThorIndexGroupAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexGroupAggregateArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node) : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _graph, _node), helper(_arg), aggregated(_arg, _arg)
  1374. {
  1375. eof = false;
  1376. gathered = false;
  1377. }
  1378. void CHThorIndexGroupAggregateActivity::ready()
  1379. {
  1380. CHThorIndexReadActivityBase::ready();
  1381. eof = false;
  1382. gathered = false;
  1383. aggregated.reset();
  1384. aggregated.start(rowAllocator, agent.queryCodeContext(), activityId);
  1385. firstPart();
  1386. }
  1387. void CHThorIndexGroupAggregateActivity::processRow(const void * next)
  1388. {
  1389. aggregated.addRow(next);
  1390. }
  1391. void CHThorIndexGroupAggregateActivity::gather()
  1392. {
  1393. gathered = true;
  1394. if(!klManager)
  1395. return;
  1396. for (;;)
  1397. {
  1398. while (!klManager->lookup(true))
  1399. {
  1400. if (!nextPart())
  1401. return;
  1402. }
  1403. agent.reportProgress(NULL);
  1404. try
  1405. {
  1406. helper.processRow(klManager->queryKeyBuffer(), this);
  1407. }
  1408. catch(IException * e)
  1409. {
  1410. throw makeWrappedException(e);
  1411. }
  1412. callback.finishedRow();
  1413. }
  1414. }
  1415. const void *CHThorIndexGroupAggregateActivity::nextRow()
  1416. {
  1417. if (eof)
  1418. return NULL;
  1419. if (!gathered)
  1420. gather();
  1421. Owned<AggregateRowBuilder> next = aggregated.nextResult();
  1422. if (next)
  1423. {
  1424. processed++;
  1425. return next->finalizeRowClear();
  1426. }
  1427. eof = true;
  1428. return NULL;
  1429. }
  1430. extern HTHOR_API IHThorActivity *createIndexGroupAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexGroupAggregateArg &arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
  1431. {
  1432. return new CHThorIndexGroupAggregateActivity(_agent, _activityId, _subgraphId, arg, _kind, _graph, _node);
  1433. }
  1434. //-------------------------------------------------------------------------------------------------------------
  1435. interface IThreadedExceptionHandler
  1436. {
  1437. virtual void noteException(IException *E) = 0;
  1438. };
  1439. template <class ROW, class OWNER>
  1440. class PartHandlerThread : public CInterface, implements IPooledThread
  1441. {
  1442. public:
  1443. typedef PartHandlerThread<ROW, OWNER> SELF;
  1444. IMPLEMENT_IINTERFACE;
  1445. PartHandlerThread() : owner(0)
  1446. {
  1447. }
  1448. virtual void init(void * _owner) override { owner = (OWNER *)_owner; }
  1449. virtual void threadmain() override
  1450. {
  1451. try
  1452. {
  1453. owner->openPart();
  1454. for (;;)
  1455. {
  1456. ROW * row = owner->getRow();
  1457. if (!row)
  1458. break;
  1459. owner->doRequest(row);
  1460. }
  1461. }
  1462. catch (IException *E)
  1463. {
  1464. owner->noteException(E);
  1465. }
  1466. }
  1467. virtual bool stop() override
  1468. {
  1469. owner->stopThread();
  1470. return true;
  1471. }
  1472. virtual bool canReuse() const override { return true; }
  1473. private:
  1474. OWNER * owner;
  1475. };
  1476. template <class ROW>
  1477. class ThreadedPartHandler : public CInterface
  1478. {
  1479. protected:
  1480. Linked<IThreadPool> threadPool;
  1481. PooledThreadHandle threadHandle = 0;
  1482. QueueOf<ROW, true> pending;
  1483. CriticalSection crit;
  1484. Semaphore limit;
  1485. bool started = false;
  1486. Owned<IDistributedFilePart> part;
  1487. IThreadedExceptionHandler *handler;
  1488. public:
  1489. typedef ThreadedPartHandler<ROW> SELF;
  1490. ThreadedPartHandler(IDistributedFilePart *_part, IThreadedExceptionHandler *_handler, IThreadPool * _threadPool)
  1491. : threadPool(_threadPool), limit(MAX_FETCH_LOOKAHEAD), part(_part), handler(_handler)
  1492. {
  1493. }
  1494. ~ThreadedPartHandler()
  1495. {
  1496. //is it the responsibility of the derived class to clean up the list on destruction --- can do nothing but assert here, since implementations different and VMTs gone by now
  1497. assertex(pending.ordinality() == 0);
  1498. }
  1499. void addRow(ROW * row)
  1500. {
  1501. limit.wait();
  1502. CriticalBlock procedure(crit);
  1503. pending.enqueue(row);
  1504. if (!started)
  1505. {
  1506. started = true;
  1507. start();
  1508. }
  1509. }
  1510. void stopThread()
  1511. {
  1512. }
  1513. void start()
  1514. {
  1515. threadHandle = threadPool->start(this);
  1516. }
  1517. void join()
  1518. {
  1519. threadPool->join(threadHandle);
  1520. started = false;
  1521. }
  1522. ROW * getRow()
  1523. {
  1524. CriticalBlock procedure(crit);
  1525. if(pending.ordinality())
  1526. {
  1527. limit.signal();
  1528. return pending.dequeue();
  1529. }
  1530. else
  1531. {
  1532. started = false; //because returning NULL will cause thread to terminate (has to be within this CriticalBlock to avoid race cond.)
  1533. return NULL;
  1534. }
  1535. }
  1536. void noteException(IException * e)
  1537. {
  1538. handler->noteException(e);
  1539. }
  1540. private:
  1541. friend class PartHandlerThread<ROW, SELF>;
  1542. virtual void doRequest(ROW * row) = 0; // Must be implemented by derived class
  1543. virtual void openPart() = 0; // Must be implemented by derived class
  1544. };
  1545. template <class ROW>
  1546. class PartHandlerThreadFactory : public CInterface, implements IThreadFactory
  1547. {
  1548. IMPLEMENT_IINTERFACE;
  1549. typedef ThreadedPartHandler<ROW> OWNER;
  1550. IPooledThread * createNew() { return new PartHandlerThread<ROW, OWNER>(); }
  1551. };
  1552. class FetchRequest : public CInterface
  1553. {
  1554. public:
  1555. const void * left;
  1556. offset_t pos;
  1557. offset_t seq;
  1558. FetchRequest(const void * _left, offset_t _pos, offset_t _seq) : left(_left), pos(_pos), seq(_seq) {}
  1559. ~FetchRequest() { ReleaseRoxieRow(left); }
  1560. };
  1561. class IFlatFetchHandlerCallback
  1562. {
  1563. public:
  1564. virtual void processFetch(FetchRequest const * fetch, offset_t pos, ISerialStream *rawStream) = 0;
  1565. };
  1566. class IXmlFetchHandlerCallback
  1567. {
  1568. public:
  1569. virtual void processFetched(FetchRequest const * fetch, IColumnProvider * lastMatch) = 0;
  1570. virtual IException * makeWrappedException(IException * e, char const * extra) const = 0;
  1571. };
  1572. // this class base for all three fetch activities and keyed join
  1573. class FetchPartHandlerBase
  1574. {
  1575. protected:
  1576. Owned<IFileIO> rawFile;
  1577. Owned<ISerialStream> rawStream;
  1578. offset_t base;
  1579. offset_t top;
  1580. bool blockcompressed;
  1581. MemoryAttr encryptionkey;
  1582. unsigned activityId;
  1583. CachedOutputMetaData const & outputMeta;
  1584. IEngineRowAllocator * rowAllocator;
  1585. ISourceRowPrefetcher * prefetcher;
  1586. public:
  1587. FetchPartHandlerBase(offset_t _base, offset_t _size, bool _blockcompressed, MemoryAttr &_encryptionkey, unsigned _activityId, CachedOutputMetaData const & _outputMeta, ISourceRowPrefetcher * _prefetcher, IEngineRowAllocator *_rowAllocator)
  1588. : blockcompressed(_blockcompressed),
  1589. encryptionkey(_encryptionkey),
  1590. activityId(_activityId),
  1591. outputMeta(_outputMeta),
  1592. rowAllocator(_rowAllocator),
  1593. prefetcher(_prefetcher)
  1594. {
  1595. base = _base;
  1596. top = _base + _size;
  1597. }
  1598. int compare(offset_t offset)
  1599. {
  1600. if (offset < base)
  1601. return -1;
  1602. else if (offset >= top)
  1603. return 1;
  1604. else
  1605. return 0;
  1606. }
  1607. offset_t translateFPos(offset_t rp)
  1608. {
  1609. if(isLocalFpos(rp))
  1610. return getLocalFposOffset(rp);
  1611. else
  1612. return rp-base;
  1613. }
  1614. virtual void openPart()
  1615. {
  1616. // MORE - cached file handles?
  1617. if(rawFile)
  1618. return;
  1619. IDistributedFilePart * part = queryPart();
  1620. unsigned numCopies = part->numCopies();
  1621. for (unsigned copy=0; copy < numCopies; copy++)
  1622. {
  1623. RemoteFilename rfn;
  1624. try
  1625. {
  1626. OwnedIFile ifile = createIFile(part->getFilename(rfn,copy));
  1627. offset_t thissize = ifile->size();
  1628. if (thissize != (offset_t)-1)
  1629. {
  1630. IPropertyTree & props = part->queryAttributes();
  1631. unsigned __int64 expectedSize;
  1632. Owned<IExpander> eexp;
  1633. if (encryptionkey.length()!=0) {
  1634. eexp.setown(createAESExpander256((size32_t)encryptionkey.length(),encryptionkey.get()));
  1635. blockcompressed = true;
  1636. }
  1637. if(blockcompressed)
  1638. expectedSize = props.getPropInt64("@compressedSize", -1);
  1639. else
  1640. expectedSize = props.getPropInt64("@size", -1);
  1641. if(thissize != expectedSize && expectedSize != (unsigned __int64)-1)
  1642. throw MakeStringException(0, "File size mismatch: file %s was supposed to be %" I64F "d bytes but appears to be %" I64F "d bytes", ifile->queryFilename(), expectedSize, thissize);
  1643. if(blockcompressed)
  1644. rawFile.setown(createCompressedFileReader(ifile,eexp));
  1645. else
  1646. rawFile.setown(ifile->open(IFOread));
  1647. break;
  1648. }
  1649. }
  1650. catch (IException *E)
  1651. {
  1652. EXCLOG(E, "Opening key part");
  1653. E->Release();
  1654. }
  1655. }
  1656. if(!rawFile)
  1657. {
  1658. RemoteFilename rfn;
  1659. StringBuffer rmtPath;
  1660. part->getFilename(rfn).getRemotePath(rmtPath);
  1661. throw MakeStringException(1001, "Could not open file part at %s%s", rmtPath.str(), (numCopies > 1) ? " or any alternate location." : ".");
  1662. }
  1663. rawStream.setown(createFileSerialStream(rawFile, 0, -1, 0));
  1664. }
  1665. virtual IDistributedFilePart * queryPart() = 0;
  1666. };
  1667. // this class base for all three fetch activities, but not keyed join
  1668. class SimpleFetchPartHandlerBase : public FetchPartHandlerBase, public ThreadedPartHandler<FetchRequest>
  1669. {
  1670. public:
  1671. SimpleFetchPartHandlerBase(IDistributedFilePart *_part, offset_t _base, offset_t _size, IThreadedExceptionHandler *_handler, IThreadPool * _threadPool, bool _blockcompressed, MemoryAttr &_encryptionkey, unsigned _activityId, CachedOutputMetaData const & _outputMeta, ISourceRowPrefetcher * _prefetcher, IEngineRowAllocator *_rowAllocator)
  1672. : FetchPartHandlerBase(_base, _size, _blockcompressed, _encryptionkey, _activityId, _outputMeta, _prefetcher, _rowAllocator),
  1673. ThreadedPartHandler<FetchRequest>(_part, _handler, _threadPool)
  1674. {
  1675. }
  1676. ~SimpleFetchPartHandlerBase()
  1677. {
  1678. while(FetchRequest * fetch = pending.dequeue())
  1679. fetch->Release();
  1680. }
  1681. IMPLEMENT_IINTERFACE;
  1682. virtual IDistributedFilePart * queryPart() { return part; }
  1683. private:
  1684. virtual void openPart() { FetchPartHandlerBase::openPart(); }
  1685. };
  1686. // this class used for flat and CSV fetch activities, but not XML fetch or keyed join
  1687. class FlatFetchPartHandler : public SimpleFetchPartHandlerBase
  1688. {
  1689. public:
  1690. FlatFetchPartHandler(IFlatFetchHandlerCallback & _owner, IDistributedFilePart * _part, offset_t _base, offset_t _size, IThreadedExceptionHandler *_handler, IThreadPool * _threadPool, bool _blockcompressed, MemoryAttr &_encryptionkey, unsigned _activityId, CachedOutputMetaData const & _outputMeta, ISourceRowPrefetcher * _prefetcher, IEngineRowAllocator *_rowAllocator)
  1691. : SimpleFetchPartHandlerBase(_part, _base, _size, _handler, _threadPool, _blockcompressed, _encryptionkey, _activityId, _outputMeta, _prefetcher, _rowAllocator),
  1692. owner(_owner)
  1693. {
  1694. }
  1695. virtual void doRequest(FetchRequest * _fetch)
  1696. {
  1697. Owned<FetchRequest> fetch(_fetch);
  1698. offset_t pos = translateFPos(fetch->pos);
  1699. if(pos >= rawFile->size())
  1700. throw MakeStringException(0, "Attempted to fetch at invalid filepos");
  1701. owner.processFetch(fetch, pos, rawStream);
  1702. }
  1703. private:
  1704. IFlatFetchHandlerCallback & owner;
  1705. };
  1706. class DistributedFileFetchHandlerBase : public CInterface, implements IInterface, implements IThreadedExceptionHandler
  1707. {
  1708. public:
  1709. IMPLEMENT_IINTERFACE;
  1710. DistributedFileFetchHandlerBase() {}
  1711. virtual ~DistributedFileFetchHandlerBase() {}
  1712. virtual void noteException(IException *E)
  1713. {
  1714. CriticalBlock procedure(exceptionCrit);
  1715. if (exception)
  1716. E->Release();
  1717. else
  1718. exception = E;
  1719. }
  1720. protected:
  1721. static offset_t getPartSize(IDistributedFilePart *part)
  1722. {
  1723. offset_t partsize = part->queryAttributes().getPropInt64("@size", -1);
  1724. if (partsize == (offset_t)-1)
  1725. {
  1726. MTIME_SECTION(queryActiveTimer(), "Fetch remote file size");
  1727. unsigned numCopies = part->numCopies();
  1728. for (unsigned copy=0; copy < numCopies; copy++)
  1729. {
  1730. RemoteFilename rfn;
  1731. try
  1732. {
  1733. OwnedIFile ifile = createIFile(part->getFilename(rfn,copy));
  1734. partsize = ifile->size();
  1735. if (partsize != (offset_t)-1)
  1736. {
  1737. // TODO: Create DistributedFilePropertyLock for parts
  1738. part->lockProperties();
  1739. part->queryAttributes().setPropInt64("@size", partsize);
  1740. part->unlockProperties();
  1741. break;
  1742. }
  1743. }
  1744. catch(IException *E)
  1745. {
  1746. EXCLOG(E, "Open remote file");
  1747. E->Release();
  1748. }
  1749. }
  1750. }
  1751. if (partsize == (offset_t)-1)
  1752. throw MakeStringException(0, "Unable to determine size of filepart");
  1753. return partsize;
  1754. }
  1755. protected:
  1756. CriticalSection exceptionCrit;
  1757. IException * exception;
  1758. };
  1759. template <class PARTHANDLER>
  1760. class IFetchHandlerFactory
  1761. {
  1762. public:
  1763. virtual PARTHANDLER * createFetchPartHandler(IDistributedFilePart * part, offset_t base, offset_t size, IThreadedExceptionHandler * handler, bool blockcompressed, MemoryAttr &encryptionkey, ISourceRowPrefetcher * prefetcher, IEngineRowAllocator *rowAllocator) = 0;
  1764. };
  1765. template <class PARTHANDLER, class LEFTPTR, class REQUEST>
  1766. class DistributedFileFetchHandler : public DistributedFileFetchHandlerBase
  1767. {
  1768. public:
  1769. typedef DistributedFileFetchHandler<PARTHANDLER, LEFTPTR, REQUEST> SELF;
  1770. DistributedFileFetchHandler(IDistributedFile * f, IFetchHandlerFactory<PARTHANDLER> & factory, MemoryAttr &encryptionkey, ISourceRowPrefetcher * prefetcher, IEngineRowAllocator *rowAllocator) : file(f)
  1771. {
  1772. numParts = f->numParts();
  1773. parts = new PARTHANDLER *[numParts];
  1774. Owned<IFileDescriptor> fdesc = f->getFileDescriptor();
  1775. bool blockcompressed = fdesc->isCompressed(); //assume new compression, old compression was never handled on fetch
  1776. offset_t base = 0;
  1777. unsigned idx;
  1778. for (idx = 0; idx < numParts; idx++)
  1779. {
  1780. IDistributedFilePart *part = f->getPart(idx);
  1781. offset_t size = getPartSize(part);
  1782. parts[idx] = factory.createFetchPartHandler(part, base, size, this, blockcompressed, encryptionkey, prefetcher, rowAllocator);
  1783. base += size;
  1784. }
  1785. exception = NULL;
  1786. }
  1787. ~DistributedFileFetchHandler()
  1788. {
  1789. unsigned idx;
  1790. for (idx = 0; idx < numParts; idx++)
  1791. {
  1792. delete parts[idx];
  1793. }
  1794. delete [] parts;
  1795. }
  1796. int compare(offset_t l, PARTHANDLER * r)
  1797. {
  1798. return r->compare(l);
  1799. }
  1800. void addRow(LEFTPTR left, offset_t rp, offset_t seq)
  1801. {
  1802. PARTHANDLER * part = binsearch(rp, parts, numParts, this);
  1803. if(!part)
  1804. throw MakeStringException(1002, "FETCH: file position %" I64F "d out of range", rp);
  1805. part->addRow(new REQUEST(left, rp, seq));
  1806. }
  1807. void stopThread()
  1808. {
  1809. unsigned idx;
  1810. for (idx = 0; idx < numParts; idx++)
  1811. {
  1812. parts[idx]->stopThread();
  1813. parts[idx]->join();
  1814. }
  1815. if (exception)
  1816. throw (exception);
  1817. }
  1818. private:
  1819. Linked<IDistributedFile> file;
  1820. unsigned numParts;
  1821. PARTHANDLER * * parts;
  1822. };
  1823. //-------------------------------------------------------------------------------------------------------------
  1824. class CHThorThreadedActivityBase : public CHThorActivityBase, implements IThreadedExceptionHandler
  1825. {
  1826. class InputHandler : extends Thread
  1827. {
  1828. CHThorThreadedActivityBase *parent;
  1829. public:
  1830. InputHandler(CHThorThreadedActivityBase *_parent) : parent(_parent)
  1831. {
  1832. }
  1833. virtual int run()
  1834. {
  1835. try
  1836. {
  1837. parent->fetchAll();
  1838. }
  1839. catch (IException *E)
  1840. {
  1841. parent->noteException(E);
  1842. }
  1843. catch (...)
  1844. {
  1845. parent->noteException(MakeStringException(0, "Unknown exception caught in Fetch::InputHandler"));
  1846. }
  1847. return 0;
  1848. }
  1849. };
  1850. public:
  1851. CHThorThreadedActivityBase (IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorArg &_arg, IHThorFetchContext &_fetch, ThorActivityKind _kind, EclGraph & _graph, IRecordSize *diskSize, IPropertyTree *_node)
  1852. : CHThorActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _graph), fetch(_fetch)
  1853. {
  1854. exception = NULL;
  1855. rowLimit = 0;
  1856. if (_node)
  1857. isCodeSigned = isActivityCodeSigned(*_node);
  1858. }
  1859. virtual ~CHThorThreadedActivityBase ()
  1860. {
  1861. }
  1862. virtual void waitForThreads()
  1863. {
  1864. aborting = true;
  1865. if (inputThread)
  1866. inputThread->join();
  1867. inputThread.clear();
  1868. threadPool.clear();
  1869. }
  1870. virtual void fetchAll() = 0;
  1871. virtual void ready()
  1872. {
  1873. CHThorActivityBase::ready();
  1874. started = false;
  1875. stopped = false;
  1876. aborting = false;
  1877. initializeThreadPool();
  1878. }
  1879. virtual void initializeThreadPool() = 0;
  1880. virtual void stop()
  1881. {
  1882. aborting = true;
  1883. stopThread();
  1884. if (inputThread)
  1885. inputThread->join();
  1886. while (!stopped)
  1887. {
  1888. const void * row = getRow();
  1889. ReleaseRoxieRow(row);
  1890. }
  1891. clearQueue();
  1892. waitForThreads();
  1893. avail.reinit(0);
  1894. CHThorActivityBase::stop();
  1895. }
  1896. virtual const void * getRow() = 0;
  1897. virtual void clearQueue() = 0;
  1898. IHThorInput *queryOutput(unsigned index) { return this; }
  1899. //interface IHThorInput
  1900. virtual bool isGrouped() { return false; }
  1901. virtual const char *getFileName() { return NULL; }
  1902. virtual bool outputToFile(const char *) { return false; }
  1903. virtual IOutputMetaData * queryOutputMeta() const { return CHThorActivityBase::outputMeta; }
  1904. protected:
  1905. Semaphore avail;
  1906. bool stopped;
  1907. bool started;
  1908. bool aborting;
  1909. IHThorFetchContext &fetch;
  1910. Owned<InputHandler> inputThread;
  1911. unsigned numParts;
  1912. unsigned __int64 rowLimit;
  1913. bool isCodeSigned = false;
  1914. Owned<IThreadPool> threadPool;
  1915. CriticalSection pendingCrit;
  1916. IException *exception;
  1917. public:
  1918. virtual void noteException(IException *E)
  1919. {
  1920. CriticalBlock procedure(pendingCrit);
  1921. if (exception)
  1922. E->Release();
  1923. else
  1924. exception = E;
  1925. avail.signal();
  1926. }
  1927. void stopThread()
  1928. {
  1929. avail.signal();
  1930. }
  1931. virtual const void *nextRow()
  1932. {
  1933. if (!started)
  1934. {
  1935. started = true;
  1936. start();
  1937. }
  1938. try
  1939. {
  1940. const void *ret = getRow();
  1941. if (ret)
  1942. {
  1943. processed++;
  1944. if ((processed-initialProcessed) > rowLimit)
  1945. {
  1946. onLimitExceeded();
  1947. if ( agent.queryCodeContext()->queryDebugContext())
  1948. agent.queryCodeContext()->queryDebugContext()->checkBreakpoint(DebugStateLimit, NULL, static_cast<IActivityBase *>(this));
  1949. }
  1950. }
  1951. return ret;
  1952. }
  1953. catch(...)
  1954. {
  1955. stopParts();
  1956. throw;
  1957. }
  1958. }
  1959. virtual void initParts(IDistributedFile * f) = 0;
  1960. virtual void stopParts() = 0;
  1961. virtual void onLimitExceeded() = 0;
  1962. virtual void start()
  1963. {
  1964. OwnedRoxieString lfn(fetch.getFileName());
  1965. if (lfn.get())
  1966. {
  1967. Owned<ILocalOrDistributedFile> ldFile = resolveLFNFlat(agent, lfn, "Fetch", 0 != (fetch.getFetchFlags() & FFdatafileoptional), isCodeSigned);
  1968. IDistributedFile * dFile = ldFile ? ldFile->queryDistributedFile() : NULL;
  1969. if(dFile)
  1970. {
  1971. verifyFetchFormatCrc(dFile);
  1972. agent.logFileAccess(dFile, "HThor", "READ", graph);
  1973. initParts(dFile);
  1974. }
  1975. else
  1976. {
  1977. StringBuffer buff;
  1978. buff.append("Skipping OPT fetch of nonexistent file ").append(lfn);
  1979. agent.addWuExceptionEx(buff.str(), WRN_SkipMissingOptFile, SeverityInformation, MSGAUD_user, "hthor");
  1980. }
  1981. }
  1982. inputThread.setown(new InputHandler(this));
  1983. inputThread->start();
  1984. }
  1985. protected:
  1986. virtual void verifyFetchFormatCrc(IDistributedFile * f) {} // do nothing here as (currently, and probably by design) not available for CSV and XML, so only implement for binary
  1987. };
  1988. class CHThorFetchActivityBase : public CHThorThreadedActivityBase, public IFetchHandlerFactory<SimpleFetchPartHandlerBase>
  1989. {
  1990. public:
  1991. CHThorFetchActivityBase(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorArg &_arg, IHThorFetchContext &_fetch, ThorActivityKind _kind, EclGraph & _graph, IRecordSize *diskSize, IPropertyTree *_node)
  1992. : CHThorThreadedActivityBase (_agent, _activityId, _subgraphId, _arg, _fetch, _kind, _graph, diskSize, _node)
  1993. {
  1994. pendingSeq = 0;
  1995. signalSeq = 0;
  1996. dequeuedSeq = 0;
  1997. if (_node)
  1998. {
  1999. const char *recordTranslationModeHintText = _node->queryProp("hint[@name='layouttranslation']/@value");
  2000. if (recordTranslationModeHintText)
  2001. recordTranslationModeHint = getTranslationMode(recordTranslationModeHintText, true);
  2002. }
  2003. }
  2004. ~CHThorFetchActivityBase()
  2005. {
  2006. clearQueue();
  2007. }
  2008. virtual void initializeThreadPool()
  2009. {
  2010. threadPool.setown(createThreadPool("hthor fetch activity thread pool", &threadFactory));
  2011. }
  2012. virtual void initParts(IDistributedFile * f)
  2013. {
  2014. size32_t kl;
  2015. void *k;
  2016. fetch.getFileEncryptKey(kl,k);
  2017. MemoryAttr encryptionkey;
  2018. encryptionkey.setOwn(kl,k);
  2019. parts.setown(new DistributedFileFetchHandler<SimpleFetchPartHandlerBase, const void *, FetchRequest>(f, *this, encryptionkey, prefetcher, rowAllocator));
  2020. }
  2021. virtual void stopParts()
  2022. {
  2023. if(parts)
  2024. parts->stopThread();
  2025. }
  2026. virtual void fetchAll()
  2027. {
  2028. if(parts)
  2029. {
  2030. for (;;)
  2031. {
  2032. if (aborting)
  2033. break;
  2034. const void *row = input->nextRow();
  2035. if (!row)
  2036. {
  2037. row = input->nextRow();
  2038. if (!row)
  2039. break;
  2040. }
  2041. offset_t rp = fetch.extractPosition(row);
  2042. offset_t seq = addRowPlaceholder();
  2043. parts->addRow(row, rp, seq);
  2044. }
  2045. parts->stopThread();
  2046. }
  2047. stopThread();
  2048. }
  2049. // to preserve order, we enqueue NULLs onto the queue and issue sequence numbers, and we only signal avail when rows in correct sequence are available
  2050. // pendingSeq gives the next sequence number to issue; signalSeq gives the next sequence number to signal for; and dequeuedSeq gives the number actually dequeued
  2051. offset_t addRowPlaceholder()
  2052. {
  2053. CriticalBlock procedure(pendingCrit);
  2054. pending.enqueue(NULL);
  2055. return pendingSeq++;
  2056. }
  2057. void setRow(const void *row, offset_t seq)
  2058. {
  2059. CriticalBlock procedure(pendingCrit);
  2060. //GH->? Why does this append allocated nulls instead of having a queue of const void??
  2061. pending.set((unsigned)(seq-dequeuedSeq), new const void*(row));
  2062. if(seq!=signalSeq)
  2063. return;
  2064. do
  2065. {
  2066. avail.signal();
  2067. ++signalSeq;
  2068. } while((signalSeq < pendingSeq) && (pending.query((unsigned)(signalSeq-dequeuedSeq)) != NULL));
  2069. }
  2070. const void * getRow()
  2071. {
  2072. while(!stopped)
  2073. {
  2074. avail.wait();
  2075. CriticalBlock procedure(pendingCrit);
  2076. if (exception)
  2077. {
  2078. IException *E = exception;
  2079. exception = NULL;
  2080. throw E;
  2081. }
  2082. if(pending.ordinality() == 0)
  2083. {
  2084. stopped = true;
  2085. break;
  2086. }
  2087. const void * * ptr = pending.dequeue();
  2088. ++dequeuedSeq;
  2089. const void * ret = *ptr;
  2090. delete ptr;
  2091. if(ret)
  2092. return ret;
  2093. }
  2094. return NULL;
  2095. }
  2096. virtual void clearQueue()
  2097. {
  2098. while(pending.ordinality())
  2099. {
  2100. const void * * ptr = pending.dequeue();
  2101. if(ptr)
  2102. {
  2103. ReleaseRoxieRow(*ptr);
  2104. delete ptr;
  2105. }
  2106. }
  2107. pendingSeq = 0;
  2108. signalSeq = 0;
  2109. dequeuedSeq = 0;
  2110. }
  2111. protected:
  2112. Owned<ISourceRowPrefetcher> prefetcher;
  2113. Owned<IOutputMetaData> actualDiskMeta;
  2114. Owned<const IDynamicTransform> translator;
  2115. private:
  2116. PartHandlerThreadFactory<FetchRequest> threadFactory;
  2117. Owned<DistributedFileFetchHandler<SimpleFetchPartHandlerBase, const void *, FetchRequest> > parts;
  2118. offset_t pendingSeq, signalSeq, dequeuedSeq;
  2119. QueueOf<const void *, true> pending;
  2120. RecordTranslationMode recordTranslationModeHint = RecordTranslationMode::Unspecified;
  2121. protected:
  2122. RecordTranslationMode getLayoutTranslationMode()
  2123. {
  2124. if (recordTranslationModeHint != RecordTranslationMode::Unspecified)
  2125. return recordTranslationModeHint;
  2126. return agent.getLayoutTranslationMode();
  2127. }
  2128. };
  2129. class CHThorFlatFetchActivity : public CHThorFetchActivityBase, public IFlatFetchHandlerCallback
  2130. {
  2131. public:
  2132. CHThorFlatFetchActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorFetchArg &_arg, IHThorFetchContext &_fetch, ThorActivityKind _kind, EclGraph & _graph, IRecordSize *diskSize, IPropertyTree *_node, MemoryAttr &encryptionkey)
  2133. : CHThorFetchActivityBase (_agent, _activityId, _subgraphId, _arg, _fetch, _kind, _graph, diskSize, _node), helper(_arg)
  2134. {}
  2135. ~CHThorFlatFetchActivity()
  2136. {
  2137. waitForThreads();
  2138. }
  2139. virtual void ready()
  2140. {
  2141. CHThorFetchActivityBase::ready();
  2142. rowLimit = helper.getRowLimit();
  2143. }
  2144. virtual void initParts(IDistributedFile * f) override
  2145. {
  2146. CHThorFetchActivityBase::initParts(f);
  2147. prefetcher.setown(actualDiskMeta->createDiskPrefetcher());
  2148. }
  2149. virtual bool needsAllocator() const { return true; }
  2150. virtual void processFetch(FetchRequest const * fetch, offset_t pos, ISerialStream *rawStream)
  2151. {
  2152. CThorContiguousRowBuffer prefetchSource;
  2153. prefetchSource.setStream(rawStream);
  2154. prefetchSource.reset(pos);
  2155. prefetcher->readAhead(prefetchSource);
  2156. const byte *rawBuffer = prefetchSource.queryRow();
  2157. MemoryBuffer buf;
  2158. if (translator)
  2159. {
  2160. MemoryBufferBuilder aBuilder(buf, 0);
  2161. FetchVirtualFieldCallback fieldCallback(fetch->pos);
  2162. translator->translate(aBuilder, fieldCallback, rawBuffer);
  2163. rawBuffer = aBuilder.getSelf();
  2164. }
  2165. CriticalBlock procedure(transformCrit);
  2166. size32_t thisSize;
  2167. try
  2168. {
  2169. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  2170. thisSize = helper.transform(rowBuilder, rawBuffer, fetch->left, fetch->pos);
  2171. if(thisSize)
  2172. {
  2173. setRow(rowBuilder.finalizeRowClear(thisSize), fetch->seq);
  2174. }
  2175. else
  2176. {
  2177. setRow(NULL, fetch->seq);
  2178. }
  2179. }
  2180. catch(IException * e)
  2181. {
  2182. throw makeWrappedException(e);
  2183. }
  2184. }
  2185. virtual void onLimitExceeded()
  2186. {
  2187. helper.onLimitExceeded();
  2188. }
  2189. virtual SimpleFetchPartHandlerBase * createFetchPartHandler(IDistributedFilePart * part, offset_t base, offset_t size, IThreadedExceptionHandler * handler, bool blockcompressed, MemoryAttr &encryptionkey, ISourceRowPrefetcher * prefetcher, IEngineRowAllocator *rowAllocator)
  2190. {
  2191. return new FlatFetchPartHandler(*this, part, base, size, handler, threadPool, blockcompressed, encryptionkey, activityId, outputMeta, prefetcher, rowAllocator);
  2192. }
  2193. protected:
  2194. virtual void verifyFetchFormatCrc(IDistributedFile * f)
  2195. {
  2196. actualDiskMeta.set(helper.queryDiskRecordSize());
  2197. translator.clear();
  2198. if (getLayoutTranslationMode()==RecordTranslationMode::None)
  2199. {
  2200. ::verifyFormatCrcSuper(helper.getDiskFormatCrc(), f, false, true);
  2201. }
  2202. else
  2203. {
  2204. bool crcMatched = ::verifyFormatCrcSuper(helper.getDiskFormatCrc(), f, false, false); // MORE - fetch requires all to match.
  2205. if (!crcMatched)
  2206. {
  2207. IPropertyTree &props = f->queryAttributes();
  2208. actualDiskMeta.setown(getDaliLayoutInfo(props));
  2209. if (actualDiskMeta)
  2210. {
  2211. translator.setown(createRecordTranslator(helper.queryProjectedDiskRecordSize()->queryRecordAccessor(true), actualDiskMeta->queryRecordAccessor(true)));
  2212. DBGLOG("Record layout translator created for %s", f->queryLogicalName());
  2213. translator->describe();
  2214. if (translator->canTranslate())
  2215. {
  2216. if (getLayoutTranslationMode()==RecordTranslationMode::None)
  2217. throw MakeStringException(0, "Translatable file layout mismatch reading file %s but translation disabled", f->queryLogicalName());
  2218. VStringBuffer msg("Record layout translation required for %s", f->queryLogicalName());
  2219. agent.addWuExceptionEx(msg.str(), WRN_UseLayoutTranslation, SeverityInformation, MSGAUD_user, "hthor");
  2220. }
  2221. else
  2222. throw MakeStringException(0, "Untranslatable file layout mismatch reading file %s", f->queryLogicalName());
  2223. }
  2224. else
  2225. throw MakeStringException(0, "Untranslatable file layout mismatch reading file %s - key layout information not found", f->queryLogicalName());
  2226. }
  2227. }
  2228. }
  2229. protected:
  2230. CriticalSection transformCrit;
  2231. IHThorFetchArg & helper;
  2232. };
  2233. extern HTHOR_API IHThorActivity *createFetchActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorFetchArg &arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
  2234. {
  2235. size32_t kl;
  2236. void *k;
  2237. arg.getFileEncryptKey(kl,k);
  2238. MemoryAttr encryptionkey;
  2239. encryptionkey.setOwn(kl,k);
  2240. return new CHThorFlatFetchActivity(_agent, _activityId, _subgraphId, arg, arg, _kind, _graph, arg.queryDiskRecordSize(), _node, encryptionkey);
  2241. }
  2242. //------------------------------------------------------------------------------------------
  2243. class CHThorCsvFetchActivity : public CHThorFetchActivityBase, public IFlatFetchHandlerCallback
  2244. {
  2245. public:
  2246. CHThorCsvFetchActivity (IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorCsvFetchArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
  2247. : CHThorFetchActivityBase(_agent, _activityId, _subgraphId, _arg, _arg, _kind, _graph, NULL, _node), helper(_arg)
  2248. {
  2249. //MORE: I have no idea what should be passed for recordSize in the line above, either something that reads a fixed size, or
  2250. //reads a record based on the csv information
  2251. ICsvParameters * csvInfo = _arg.queryCsvParameters();
  2252. OwnedRoxieString lfn(fetch.getFileName());
  2253. Owned<ILocalOrDistributedFile> ldFile = resolveLFNFlat(agent, lfn, "CsvFetch", 0 != (_arg.getFetchFlags() & FFdatafileoptional), isCodeSigned);
  2254. IDistributedFile * dFile = ldFile ? ldFile->queryDistributedFile() : NULL;
  2255. const char * quotes = NULL;
  2256. const char * separators = NULL;
  2257. const char * terminators = NULL;
  2258. const char * escapes = NULL;
  2259. if (dFile)
  2260. {
  2261. IPropertyTree & options = dFile->queryAttributes();
  2262. quotes = options.queryProp("@csvQuote");
  2263. separators = options.queryProp("@csvSeparate");
  2264. terminators = options.queryProp("@csvTerminate");
  2265. escapes = options.queryProp("@csvEscape");
  2266. agent.logFileAccess(dFile, "HThor", "READ", graph);
  2267. }
  2268. else
  2269. {
  2270. StringBuffer buff;
  2271. buff.append("Skipping OPT fetch of nonexistent file ").append(lfn);
  2272. agent.addWuExceptionEx(buff.str(), WRN_SkipMissingOptFile, SeverityInformation, MSGAUD_user, "hthor");
  2273. }
  2274. csvSplitter.init(_arg.getMaxColumns(), csvInfo, quotes, separators, terminators, escapes);
  2275. }
  2276. ~CHThorCsvFetchActivity()
  2277. {
  2278. waitForThreads();
  2279. }
  2280. virtual bool needsAllocator() const { return true; }
  2281. virtual void processFetch(FetchRequest const * fetch, offset_t pos, ISerialStream *rawStream)
  2282. {
  2283. rawStream->reset(pos);
  2284. CriticalBlock procedure(transformCrit);
  2285. size32_t maxRowSize = 10*1024*1024; // MORE - make configurable
  2286. unsigned thisLineLength = csvSplitter.splitLine(rawStream, maxRowSize);
  2287. if (!thisLineLength)
  2288. return;
  2289. try
  2290. {
  2291. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  2292. size32_t thisSize = helper.transform(rowBuilder, csvSplitter.queryLengths(), (const char * *)csvSplitter.queryData(), fetch->left, fetch->pos);
  2293. if (thisSize)
  2294. {
  2295. setRow(rowBuilder.finalizeRowClear(thisSize), fetch->seq);
  2296. }
  2297. else
  2298. {
  2299. setRow(NULL, fetch->seq);
  2300. }
  2301. }
  2302. catch(IException * e)
  2303. {
  2304. throw makeWrappedException(e);
  2305. }
  2306. }
  2307. virtual void ready()
  2308. {
  2309. CHThorFetchActivityBase::ready();
  2310. rowLimit = helper.getRowLimit();
  2311. }
  2312. virtual void onLimitExceeded()
  2313. {
  2314. helper.onLimitExceeded();
  2315. }
  2316. virtual SimpleFetchPartHandlerBase * createFetchPartHandler(IDistributedFilePart * part, offset_t base, offset_t size, IThreadedExceptionHandler * handler, bool blockcompressed, MemoryAttr &encryptionkey, ISourceRowPrefetcher * prefetcher, IEngineRowAllocator *rowAllocator)
  2317. {
  2318. return new FlatFetchPartHandler(*this, part, base, size, handler, threadPool, blockcompressed, encryptionkey, activityId, outputMeta, prefetcher, rowAllocator);
  2319. }
  2320. protected:
  2321. CSVSplitter csvSplitter;
  2322. CriticalSection transformCrit;
  2323. IHThorCsvFetchArg & helper;
  2324. };
  2325. extern HTHOR_API IHThorActivity *createCsvFetchActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorCsvFetchArg &arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
  2326. {
  2327. return new CHThorCsvFetchActivity(_agent, _activityId, _subgraphId, arg, _kind, _graph, _node);
  2328. }
  2329. //------------------------------------------------------------------------------------------
  2330. class XmlFetchPartHandler : public SimpleFetchPartHandlerBase, public IXMLSelect
  2331. {
  2332. public:
  2333. IMPLEMENT_IINTERFACE;
  2334. XmlFetchPartHandler(IXmlFetchHandlerCallback & _owner, IDistributedFilePart * _part, offset_t _base, offset_t _size, IThreadedExceptionHandler * _handler, unsigned _streamBufferSize, IThreadPool * _threadPool, bool _blockcompressed, MemoryAttr &_encryptionkey, unsigned _activityId, CachedOutputMetaData const & _outputMeta, bool _jsonFormat)
  2335. : SimpleFetchPartHandlerBase(_part, _base, _size, _handler, _threadPool, _blockcompressed, _encryptionkey, _activityId, _outputMeta, NULL, NULL),
  2336. owner(_owner),
  2337. streamBufferSize(_streamBufferSize),
  2338. jsonFormat(_jsonFormat)
  2339. {
  2340. }
  2341. virtual void doRequest(FetchRequest * _fetch)
  2342. {
  2343. Owned<FetchRequest> fetch(_fetch);
  2344. offset_t pos = translateFPos(fetch->pos);
  2345. rawStream->seek(pos, IFSbegin);
  2346. while(!lastMatch)
  2347. {
  2348. bool gotNext = false;
  2349. try
  2350. {
  2351. gotNext = parser->next();
  2352. }
  2353. catch(IException * e)
  2354. {
  2355. StringBuffer fname;
  2356. RemoteFilename rfn;
  2357. part->getFilename(rfn).getPath(fname);
  2358. throw owner.makeWrappedException(e, fname.str());
  2359. }
  2360. if(!gotNext)
  2361. {
  2362. StringBuffer fname;
  2363. RemoteFilename rfn;
  2364. part->getFilename(rfn).getPath(fname);
  2365. throw MakeStringException(0, "Fetch fpos at EOF of %s", fname.str());
  2366. }
  2367. }
  2368. owner.processFetched(fetch, lastMatch);
  2369. lastMatch.clear();
  2370. parser->reset();
  2371. }
  2372. virtual void openPart()
  2373. {
  2374. if(parser)
  2375. return;
  2376. FetchPartHandlerBase::openPart();
  2377. rawStream.setown(createBufferedIOStream(rawFile, streamBufferSize));
  2378. parser.setown(jsonFormat ? createJSONParse(*rawStream, "/", *this) : createXMLParse(*rawStream, "/", *this));
  2379. }
  2380. //iface IXMLSelect
  2381. void match(IColumnProvider & entry, offset_t startOffset, offset_t endOffset)
  2382. {
  2383. lastMatch.set(&entry);
  2384. }
  2385. protected:
  2386. IXmlFetchHandlerCallback & owner;
  2387. Owned<IFileIOStream> rawStream;
  2388. Owned<IXMLParse> parser;
  2389. Owned<IColumnProvider> lastMatch;
  2390. unsigned streamBufferSize;
  2391. bool jsonFormat;
  2392. };
  2393. class CHThorXmlFetchActivity : public CHThorFetchActivityBase, public IXmlFetchHandlerCallback
  2394. {
  2395. public:
  2396. CHThorXmlFetchActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorXmlFetchArg & _arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
  2397. : CHThorFetchActivityBase(_agent, _activityId, _subgraphId, _arg, _arg, _kind, _graph, NULL, _node), helper(_arg)
  2398. {
  2399. }
  2400. ~CHThorXmlFetchActivity()
  2401. {
  2402. waitForThreads();
  2403. }
  2404. virtual bool needsAllocator() const { return true; }
  2405. virtual void processFetched(FetchRequest const * fetch, IColumnProvider * lastMatch)
  2406. {
  2407. CriticalBlock procedure(transformCrit);
  2408. size32_t thisSize;
  2409. try
  2410. {
  2411. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  2412. thisSize = helper.transform(rowBuilder, lastMatch, fetch->left, fetch->pos);
  2413. if(thisSize)
  2414. {
  2415. setRow(rowBuilder.finalizeRowClear(thisSize), fetch->seq);
  2416. }
  2417. else
  2418. {
  2419. setRow(NULL, fetch->seq);
  2420. }
  2421. }
  2422. catch(IException * e)
  2423. {
  2424. throw makeWrappedException(e);
  2425. }
  2426. }
  2427. IException * makeWrappedException(IException * e) const { return CHThorActivityBase::makeWrappedException(e); }
  2428. virtual IException * makeWrappedException(IException * e, char const * extra) const { return CHThorActivityBase::makeWrappedException(e, extra); }
  2429. virtual void ready()
  2430. {
  2431. CHThorFetchActivityBase::ready();
  2432. rowLimit = helper.getRowLimit();
  2433. }
  2434. virtual void onLimitExceeded()
  2435. {
  2436. helper.onLimitExceeded();
  2437. }
  2438. virtual SimpleFetchPartHandlerBase * createFetchPartHandler(IDistributedFilePart * part, offset_t base, offset_t size, IThreadedExceptionHandler * handler, bool blockcompressed, MemoryAttr &encryptionkey, ISourceRowPrefetcher * prefetcher, IEngineRowAllocator *rowAllocator)
  2439. {
  2440. return new XmlFetchPartHandler(*this, part, base, size, handler, 4096, threadPool, blockcompressed, encryptionkey, activityId, outputMeta, kind==TAKjsonfetch); //MORE: need to put correct stream buffer size here, when Gavin provides it
  2441. }
  2442. protected:
  2443. CriticalSection transformCrit;
  2444. IHThorXmlFetchArg & helper;
  2445. };
  2446. extern HTHOR_API IHThorActivity *createXmlFetchActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorXmlFetchArg & arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
  2447. {
  2448. return new CHThorXmlFetchActivity(_agent, _activityId, _subgraphId, arg, _kind, _graph, _node);
  2449. }
  2450. //------------------------------------------------------------------------------------------
  2451. class CJoinGroup;
  2452. class MatchSet : public CInterface
  2453. {
  2454. public:
  2455. MatchSet(CJoinGroup * _jg) : jg(_jg)
  2456. {
  2457. }
  2458. ~MatchSet()
  2459. {
  2460. ForEachItemIn(idx, rows)
  2461. ReleaseRoxieRow(rows.item(idx));
  2462. }
  2463. void addRightMatch(void * right);
  2464. offset_t addRightPending();
  2465. void setPendingRightMatch(offset_t seq, void * right);
  2466. void incRightMatchCount();
  2467. unsigned count() const { return rows.ordinality(); }
  2468. CJoinGroup * queryJoinGroup() const { return jg; }
  2469. void * queryRow(unsigned idx) const { return rows.item(idx); }
  2470. private:
  2471. CJoinGroup * jg;
  2472. PointerArray rows;
  2473. };
  2474. interface IJoinProcessor
  2475. {
  2476. virtual CJoinGroup *createJoinGroup(const void *row) = 0;
  2477. virtual void readyManager(IKeyManager * manager, const void * row) = 0;
  2478. virtual void doneManager(IKeyManager * manager) = 0;
  2479. virtual bool addMatch(MatchSet * ms, IKeyManager * manager) = 0;
  2480. virtual void onComplete(CJoinGroup * jg) = 0;
  2481. virtual bool leftCanMatch(const void *_left) = 0;
  2482. virtual const IDynamicTransform * getLayoutTranslator(IDistributedFile * f) = 0;
  2483. virtual const RtlRecord &queryIndexRecord() = 0;
  2484. virtual void verifyIndex(IDistributedFile * f, IKeyIndex * idx, const IDynamicTransform * trans) = 0;
  2485. virtual bool hasNewSegmentMonitors() = 0;
  2486. };
  2487. class CJoinGroup : implements IInterface, public CInterface
  2488. {
  2489. public:
  2490. class MatchIterator
  2491. {
  2492. public:
  2493. // Single threaded by now
  2494. void const * queryRow() const { return owner.matchsets.item(ms).queryRow(idx); }
  2495. bool start()
  2496. {
  2497. idx = 0;
  2498. for(ms = 0; owner.matchsets.isItem(ms); ++ms)
  2499. if(owner.matchsets.item(ms).count())
  2500. return true;
  2501. return false;
  2502. }
  2503. bool next()
  2504. {
  2505. if(++idx < owner.matchsets.item(ms).count())
  2506. return true;
  2507. idx = 0;
  2508. while(owner.matchsets.isItem(++ms))
  2509. if(owner.matchsets.item(ms).count())
  2510. return true;
  2511. return false;
  2512. }
  2513. private:
  2514. friend class CJoinGroup;
  2515. MatchIterator(CJoinGroup const & _owner) : owner(_owner) {}
  2516. CJoinGroup const & owner;
  2517. unsigned ms;
  2518. unsigned idx;
  2519. } matches;
  2520. CJoinGroup *prev; // Doubly-linked list to allow us to keep track of ones that are still in use
  2521. CJoinGroup *next;
  2522. CJoinGroup() : matches(*this)
  2523. {
  2524. // Used for head object only
  2525. left = NULL;
  2526. prev = NULL;
  2527. next = NULL;
  2528. endMarkersPending = 0;
  2529. groupStart = NULL;
  2530. matchcount = 0;
  2531. }
  2532. IMPLEMENT_IINTERFACE;
  2533. CJoinGroup(const void *_left, IJoinProcessor *_join, CJoinGroup *_groupStart) : matches(*this),join(_join)
  2534. {
  2535. candidates = 0;
  2536. left = _left;
  2537. if (_groupStart)
  2538. {
  2539. groupStart = _groupStart;
  2540. ++_groupStart->endMarkersPending;
  2541. }
  2542. else
  2543. {
  2544. groupStart = this;
  2545. endMarkersPending = 1;
  2546. }
  2547. matchcount = 0;
  2548. }
  2549. ~CJoinGroup()
  2550. {
  2551. ReleaseRoxieRow(left);
  2552. join = nullptr; // not required, but clear to highlight any race conditions
  2553. }
  2554. MatchSet * getMatchSet()
  2555. {
  2556. CriticalBlock b(crit);
  2557. MatchSet * ms = new MatchSet(this);
  2558. matchsets.append(*ms);
  2559. return ms;
  2560. }
  2561. inline void notePending()
  2562. {
  2563. // assertex(!complete());
  2564. ++groupStart->endMarkersPending;
  2565. }
  2566. inline bool complete() const
  2567. {
  2568. return groupStart->endMarkersPending == 0;
  2569. }
  2570. inline bool inGroup(CJoinGroup *leader) const
  2571. {
  2572. return groupStart==leader;
  2573. }
  2574. inline void noteEnd()
  2575. {
  2576. assertex(!complete());
  2577. //Another completing group could cause this group to be processed once endMarkersPending is set to 0
  2578. //So link this object to ensure it is not disposed of while this function is executing
  2579. Linked<CJoinGroup> saveThis(this);
  2580. if (--groupStart->endMarkersPending == 0)
  2581. {
  2582. join->onComplete(groupStart);
  2583. }
  2584. }
  2585. inline unsigned noteCandidate()
  2586. {
  2587. CriticalBlock b(crit);
  2588. return ++candidates;
  2589. }
  2590. inline const void *queryLeft() const
  2591. {
  2592. return left;
  2593. }
  2594. inline unsigned rowsSeen() const
  2595. {
  2596. CriticalBlock b(crit);
  2597. return matchcount;
  2598. }
  2599. inline unsigned candidateCount() const
  2600. {
  2601. CriticalBlock b(crit);
  2602. return candidates;
  2603. }
  2604. protected:
  2605. friend class MatchSet;
  2606. friend class MatchIterator;
  2607. const void *left;
  2608. unsigned matchcount;
  2609. CIArrayOf<MatchSet> matchsets;
  2610. std::atomic<unsigned> endMarkersPending;
  2611. IJoinProcessor *join = nullptr;
  2612. mutable CriticalSection crit;
  2613. CJoinGroup *groupStart;
  2614. unsigned candidates;
  2615. };
  2616. void MatchSet::addRightMatch(void * right)
  2617. {
  2618. assertex(!jg->complete());
  2619. CriticalBlock b(jg->crit);
  2620. rows.append(right);
  2621. jg->matchcount++;
  2622. }
  2623. offset_t MatchSet::addRightPending()
  2624. {
  2625. assertex(!jg->complete());
  2626. CriticalBlock b(jg->crit);
  2627. offset_t seq = rows.ordinality();
  2628. rows.append(NULL);
  2629. return seq;
  2630. }
  2631. void MatchSet::setPendingRightMatch(offset_t seq, void * right)
  2632. {
  2633. assertex(!jg->complete());
  2634. CriticalBlock b(jg->crit);
  2635. rows.replace(right, (aindex_t)seq);
  2636. jg->matchcount++;
  2637. }
  2638. void MatchSet::incRightMatchCount()
  2639. {
  2640. assertex(!jg->complete());
  2641. CriticalBlock b(jg->crit);
  2642. jg->matchcount++;
  2643. }
  2644. class JoinGroupPool : public CInterface
  2645. {
  2646. CJoinGroup *groupStart;
  2647. public:
  2648. CJoinGroup head;
  2649. CriticalSection crit;
  2650. bool preserveGroups;
  2651. JoinGroupPool(bool _preserveGroups)
  2652. {
  2653. head.next = &head;
  2654. head.prev = &head;
  2655. preserveGroups = _preserveGroups;
  2656. groupStart = NULL;
  2657. }
  2658. ~JoinGroupPool()
  2659. {
  2660. CJoinGroup *finger = head.next;
  2661. while (finger != &head)
  2662. {
  2663. CJoinGroup *next = finger->next;
  2664. finger->Release();
  2665. finger = next;
  2666. }
  2667. }
  2668. CJoinGroup *createJoinGroup(const void *row, IJoinProcessor *join)
  2669. {
  2670. CJoinGroup *jg = new CJoinGroup(row, join, groupStart);
  2671. if (preserveGroups && !groupStart)
  2672. {
  2673. jg->notePending(); // Make sure we wait for the group end
  2674. groupStart = jg;
  2675. }
  2676. CriticalBlock c(crit);
  2677. jg->next = &head;
  2678. jg->prev = head.prev;
  2679. head.prev->next = jg;
  2680. head.prev = jg;
  2681. return jg;
  2682. }
  2683. void endGroup()
  2684. {
  2685. if (groupStart)
  2686. groupStart->noteEnd();
  2687. groupStart = NULL;
  2688. }
  2689. void releaseJoinGroup(CJoinGroup *goer)
  2690. {
  2691. CriticalBlock c(crit);
  2692. goer->next->prev = goer->prev;
  2693. goer->prev->next = goer->next;
  2694. goer->Release(); // MORE - could put onto another list to reuse....
  2695. }
  2696. };
  2697. //=============================================================================================
  2698. class DistributedKeyLookupHandler;
  2699. class KeyedLookupPartHandler : extends ThreadedPartHandler<MatchSet>, implements IInterface
  2700. {
  2701. IJoinProcessor &owner;
  2702. Owned<IKeyManager> manager;
  2703. IAgentContext &agent;
  2704. DistributedKeyLookupHandler * tlk;
  2705. public:
  2706. IMPLEMENT_IINTERFACE;
  2707. KeyedLookupPartHandler(IJoinProcessor &_owner, IDistributedFilePart *_part, DistributedKeyLookupHandler * _tlk, unsigned _subno, IThreadPool * _threadPool, IAgentContext &_agent);
  2708. ~KeyedLookupPartHandler()
  2709. {
  2710. while(pending.dequeue())
  2711. ; //do nothing but dequeue as don't own MatchSets
  2712. }
  2713. private:
  2714. virtual void doRequest(MatchSet * ms)
  2715. {
  2716. agent.reportProgress(NULL);
  2717. CJoinGroup * jg = ms->queryJoinGroup();
  2718. owner.readyManager(manager, jg->queryLeft());
  2719. while(manager->lookup(true))
  2720. {
  2721. if(owner.addMatch(ms, manager))
  2722. break;
  2723. }
  2724. jg->noteEnd();
  2725. owner.doneManager(manager);
  2726. }
  2727. virtual void openPart();
  2728. };
// Interface through which the keyed join activity hands left rows to an index lookup
// implementation (see DistributedKeyLookupHandler and MonolithicKeyLookupHandler).
interface IKeyLookupHandler : extends IInterface
{
    // Submits one left row for lookup.
    virtual void addRow(const void *row) = 0;
    // Called by the activity once no more rows will be added; an implementation that uses
    // worker threads stops and joins them here.
    virtual void stopThread() = 0;
};
  2734. class DistributedKeyLookupHandler : public CInterface, implements IThreadedExceptionHandler, implements IKeyLookupHandler
  2735. {
  2736. bool opened;
  2737. IArrayOf<IKeyManager> managers;
  2738. Owned<const IDynamicTransform> trans;
  2739. UnsignedArray keyNumParts;
  2740. IArrayOf<KeyedLookupPartHandler> parts;
  2741. IArrayOf<IDistributedFile> keyFiles;
  2742. IArrayOf<IDistributedFilePart> tlks;
  2743. IJoinProcessor &owner;
  2744. CriticalSection exceptionCrit;
  2745. IException *exception;
  2746. Linked<IDistributedFile> file;
  2747. PartHandlerThreadFactory<MatchSet> threadFactory;
  2748. Owned<IThreadPool> threadPool;
  2749. IntArray subSizes;
  2750. IAgentContext &agent;
  2751. void addFile(IDistributedFile &f)
  2752. {
  2753. if((f.numParts() == 1) || (f.queryAttributes().hasProp("@local")))
  2754. throw MakeStringException(0, "Superfile %s contained mixed monolithic/local/noroot and regular distributed keys --- not supported", file->queryLogicalName());
  2755. subSizes.append(parts.length());
  2756. unsigned numParts = f.numParts()-1;
  2757. for (unsigned idx = 0; idx < numParts; idx++)
  2758. {
  2759. IDistributedFilePart *part = f.getPart(idx);
  2760. parts.append(*new KeyedLookupPartHandler(owner, part, this, tlks.ordinality(), threadPool, agent));
  2761. }
  2762. keyFiles.append(OLINK(f));
  2763. tlks.append(*f.getPart(numParts));
  2764. keyNumParts.append(numParts);
  2765. }
  2766. public:
  2767. IMPLEMENT_IINTERFACE;
  2768. DistributedKeyLookupHandler(IDistributedFile *f, IJoinProcessor &_owner, IAgentContext &_agent)
  2769. : owner(_owner), file(f), agent(_agent)
  2770. {
  2771. threadPool.setown(createThreadPool("hthor keyed join lookup thread pool", &threadFactory));
  2772. IDistributedSuperFile *super = f->querySuperFile();
  2773. if (super)
  2774. {
  2775. Owned<IDistributedFileIterator> it = super->getSubFileIterator(true);
  2776. ForEach(*it)
  2777. addFile(it->query());
  2778. }
  2779. else
  2780. addFile(*f);
  2781. opened = false;
  2782. exception = NULL;
  2783. }
  2784. ~DistributedKeyLookupHandler()
  2785. {
  2786. threadPool.clear();
  2787. }
  2788. void addRow(const void *row)
  2789. {
  2790. if (owner.leftCanMatch(row))
  2791. {
  2792. if(!opened)
  2793. openTLK();
  2794. CJoinGroup *jg = owner.createJoinGroup(row);
  2795. ForEachItemIn(subno, managers)
  2796. {
  2797. agent.reportProgress(NULL);
  2798. unsigned subStart = subSizes.item(subno);
  2799. IKeyManager & manager = managers.item(subno);
  2800. owner.readyManager(&manager, row);
  2801. while(manager.lookup(false))
  2802. {
  2803. unsigned recptr = (unsigned)extractFpos(&manager);
  2804. if (recptr)
  2805. {
  2806. jg->notePending();
  2807. parts.item(recptr+subStart-1).addRow(jg->getMatchSet());
  2808. }
  2809. }
  2810. owner.doneManager(&manager);
  2811. }
  2812. jg->noteEnd();
  2813. }
  2814. else
  2815. {
  2816. CJoinGroup *jg = owner.createJoinGroup(row);
  2817. jg->noteEnd();
  2818. }
  2819. }
  2820. void openTLK()
  2821. {
  2822. ForEachItemIn(idx, tlks)
  2823. {
  2824. IDistributedFile & f = keyFiles.item(idx);
  2825. IDistributedFilePart &tlk = tlks.item(idx);
  2826. Owned<IKeyIndex> index = openKeyFile(tlk);
  2827. //Owned<IRecordLayoutTranslator>
  2828. trans.setown(owner.getLayoutTranslator(&f));
  2829. owner.verifyIndex(&f, index, trans);
  2830. Owned<IKeyManager> manager = createLocalKeyManager(owner.queryIndexRecord(), index, NULL, owner.hasNewSegmentMonitors(), false);
  2831. managers.append(*manager.getLink());
  2832. }
  2833. opened = true;
  2834. }
  2835. void stopThread()
  2836. {
  2837. ForEachItemIn(idx, parts)
  2838. {
  2839. parts.item(idx).stopThread();
  2840. parts.item(idx).join();
  2841. }
  2842. if (exception)
  2843. throw exception;
  2844. }
  2845. virtual void noteException(IException *E)
  2846. {
  2847. CriticalBlock procedure(exceptionCrit);
  2848. if (exception)
  2849. E->Release();
  2850. else
  2851. exception = E;
  2852. }
  2853. const IDynamicTransform * queryRecordLayoutTranslator() const { return trans; }
  2854. };
  2855. KeyedLookupPartHandler::KeyedLookupPartHandler(IJoinProcessor &_owner, IDistributedFilePart *_part, DistributedKeyLookupHandler * _tlk, unsigned _subno, IThreadPool * _threadPool, IAgentContext &_agent)
  2856. : ThreadedPartHandler<MatchSet>(_part, _tlk, _threadPool), owner(_owner), agent(_agent), tlk(_tlk)
  2857. {
  2858. }
  2859. void KeyedLookupPartHandler::openPart()
  2860. {
  2861. if(manager)
  2862. return;
  2863. Owned<IKeyIndex> index = openKeyFile(*part);
  2864. manager.setown(createLocalKeyManager(owner.queryIndexRecord(), index, NULL, owner.hasNewSegmentMonitors(), false));
  2865. const IDynamicTransform * trans = tlk->queryRecordLayoutTranslator();
  2866. if(trans && !index->isTopLevelKey())
  2867. manager->setLayoutTranslator(trans);
  2868. }
  2869. class MonolithicKeyLookupHandler : public CInterface, implements IKeyLookupHandler
  2870. {
  2871. IArrayOf<IKeyManager> managers;
  2872. Linked<IDistributedFile> file;
  2873. IDistributedSuperFile * super;
  2874. IArrayOf<IDistributedFile> keyFiles;
  2875. IJoinProcessor &owner;
  2876. IAgentContext &agent;
  2877. bool opened;
  2878. public:
  2879. IMPLEMENT_IINTERFACE;
  2880. MonolithicKeyLookupHandler(IDistributedFile *f, IJoinProcessor &_owner, IAgentContext &_agent)
  2881. : file(f), owner(_owner), agent(_agent), opened(false)
  2882. {
  2883. super = f->querySuperFile();
  2884. if (super)
  2885. {
  2886. Owned<IDistributedFileIterator> it = super->getSubFileIterator(true);
  2887. ForEach(*it)
  2888. addFile(it->query());
  2889. }
  2890. else
  2891. addFile(*f);
  2892. }
  2893. void addFile(IDistributedFile &f)
  2894. {
  2895. if((f.numParts() != 1) && (!f.queryAttributes().hasProp("@local")))
  2896. throw MakeStringException(0, "Superfile %s contained mixed monolithic/local/noroot and regular distributed keys --- not supported", file->queryLogicalName());
  2897. keyFiles.append(OLINK(f));
  2898. }
  2899. void addRow(const void *row)
  2900. {
  2901. if (owner.leftCanMatch(row))
  2902. {
  2903. if(!opened)
  2904. openKey();
  2905. CJoinGroup *jg = owner.createJoinGroup(row);
  2906. ForEachItemIn(idx, managers)
  2907. {
  2908. agent.reportProgress(NULL);
  2909. IKeyManager & manager = managers.item(idx);
  2910. owner.readyManager(&manager, row);
  2911. while(manager.lookup(true))
  2912. {
  2913. if(owner.addMatch(jg->getMatchSet(), &manager))
  2914. break;
  2915. }
  2916. owner.doneManager(&manager);
  2917. }
  2918. jg->noteEnd();
  2919. }
  2920. else
  2921. {
  2922. CJoinGroup *jg = owner.createJoinGroup(row);
  2923. jg->noteEnd();
  2924. }
  2925. }
  2926. void openKey()
  2927. {
  2928. ForEachItemIn(idx, keyFiles)
  2929. {
  2930. IDistributedFile & f = keyFiles.item(idx);
  2931. Owned<const IDynamicTransform> trans = owner.getLayoutTranslator(&f);
  2932. Owned<IKeyManager> manager;
  2933. if(f.numParts() == 1)
  2934. {
  2935. Owned<IKeyIndex> index = openKeyFile(f.queryPart(0));
  2936. owner.verifyIndex(&f, index, trans);
  2937. manager.setown(createLocalKeyManager(owner.queryIndexRecord(), index, NULL, owner.hasNewSegmentMonitors(), false));
  2938. }
  2939. else
  2940. {
  2941. unsigned num = f.numParts()-1;
  2942. Owned<IKeyIndexSet> parts = createKeyIndexSet();
  2943. Owned<IKeyIndex> index;
  2944. for(unsigned i=0; i<num; ++i)
  2945. {
  2946. index.setown(openKeyFile(f.queryPart(i)));
  2947. parts->addIndex(index.getLink());
  2948. }
  2949. owner.verifyIndex(&f, index, trans);
  2950. manager.setown(createKeyMerger(owner.queryIndexRecord(), parts, 0, nullptr, owner.hasNewSegmentMonitors(), false));
  2951. }
  2952. if(trans)
  2953. manager->setLayoutTranslator(trans);
  2954. managers.append(*manager.getLink());
  2955. }
  2956. opened = true;
  2957. }
  2958. void stopThread()
  2959. {
  2960. }
  2961. };
  2962. //------------------------------------------------------------------------------------------
  2963. class KeyedJoinFetchRequest : public CInterface
  2964. {
  2965. public:
  2966. MatchSet * ms;
  2967. offset_t pos;
  2968. offset_t seq;
  2969. KeyedJoinFetchRequest(MatchSet * _ms, offset_t _pos, offset_t _seq) : ms(_ms), pos(_pos), seq(_seq) {}
  2970. };
// Callback implemented by the owner of the fetch part handlers; KeyedJoinFetchPartHandler
// calls it from doRequest() once the file position has been translated and range-checked.
class IKeyedJoinFetchHandlerCallback
{
public:
    // fetch     - the request being serviced
    // pos       - the translated file position to read from
    // rawStream - the stream to read the row from
    virtual void processFetch(KeyedJoinFetchRequest const * fetch, offset_t pos, ISerialStream *rawStream) = 0;
};
  2976. class KeyedJoinFetchPartHandler : public FetchPartHandlerBase, public ThreadedPartHandler<KeyedJoinFetchRequest>
  2977. {
  2978. public:
  2979. KeyedJoinFetchPartHandler(IKeyedJoinFetchHandlerCallback & _owner, IDistributedFilePart *_part, offset_t _base, offset_t _size, IThreadedExceptionHandler *_handler, IThreadPool * _threadPool, bool _blockcompressed, MemoryAttr &_encryptionkey, unsigned _activityId, CachedOutputMetaData const & _outputMeta, ISourceRowPrefetcher * _prefetcher, IEngineRowAllocator *_rowAllocator)
  2980. : FetchPartHandlerBase(_base, _size, _blockcompressed, _encryptionkey, _activityId, _outputMeta, _prefetcher, _rowAllocator),
  2981. ThreadedPartHandler<KeyedJoinFetchRequest>(_part, _handler, _threadPool),
  2982. owner(_owner)
  2983. {
  2984. }
  2985. virtual ~KeyedJoinFetchPartHandler()
  2986. {
  2987. while(KeyedJoinFetchRequest * fetch = pending.dequeue())
  2988. fetch->Release();
  2989. }
  2990. IMPLEMENT_IINTERFACE;
  2991. virtual IDistributedFilePart * queryPart() { return part; }
  2992. private:
  2993. virtual void openPart()
  2994. {
  2995. FetchPartHandlerBase::openPart();
  2996. }
  2997. virtual void doRequest(KeyedJoinFetchRequest * _fetch)
  2998. {
  2999. Owned<KeyedJoinFetchRequest> fetch(_fetch);
  3000. offset_t pos = translateFPos(fetch->pos);
  3001. if(pos >= rawFile->size())
  3002. throw MakeStringException(0, "Attempted to fetch at invalid filepos");
  3003. owner.processFetch(fetch, pos, rawStream);
  3004. }
  3005. private:
  3006. IKeyedJoinFetchHandlerCallback & owner;
  3007. };
  3008. class CHThorKeyedJoinActivity : public CHThorThreadedActivityBase, implements IJoinProcessor, public IKeyedJoinFetchHandlerCallback, public IFetchHandlerFactory<KeyedJoinFetchPartHandler>
  3009. {
  3010. PartHandlerThreadFactory<FetchRequest> threadFactory;
  3011. Owned<DistributedFileFetchHandler<KeyedJoinFetchPartHandler, MatchSet *, KeyedJoinFetchRequest> > parts;
  3012. IHThorKeyedJoinArg &helper;
  3013. Owned<IKeyLookupHandler> lookup;
  3014. Owned<IEngineRowAllocator> defaultRightAllocator;
  3015. OwnedConstRoxieRow defaultRight;
  3016. bool leftOuter;
  3017. bool exclude;
  3018. bool extractJoinFields;
  3019. bool limitFail;
  3020. bool limitOnFail;
  3021. bool needsDiskRead;
  3022. unsigned atMost;
  3023. unsigned abortLimit;
  3024. unsigned keepLimit;
  3025. bool preserveOrder;
  3026. bool preserveGroups;
  3027. Owned<JoinGroupPool> pool;
  3028. QueueOf<const void, true> pending;
  3029. CriticalSection statsCrit, imatchCrit, fmatchCrit;
  3030. RelaxedAtomic<unsigned> prefiltered;
  3031. RelaxedAtomic<unsigned> postfiltered;
  3032. RelaxedAtomic<unsigned> skips;
  3033. unsigned seeks;
  3034. unsigned scans;
  3035. unsigned wildseeks;
  3036. OwnedRowArray extractedRows;
  3037. Owned <ILocalOrDistributedFile> ldFile;
  3038. IDistributedFile * dFile;
  3039. IDistributedSuperFile * super;
  3040. CachedOutputMetaData eclKeySize;
  3041. Owned<ISourceRowPrefetcher> prefetcher;
  3042. IPointerArrayOf<IOutputMetaData> actualLayouts; // all the index layouts are saved in here to ensure their lifetime is adequate
  3043. Owned<IOutputMetaData> actualDiskMeta; // only one disk layout is permitted
  3044. Owned<const IDynamicTransform> translator;
  3045. RecordTranslationMode recordTranslationModeHint = RecordTranslationMode::Unspecified;
  3046. bool isCodeSigned = false;
  3047. public:
  3048. CHThorKeyedJoinActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorKeyedJoinArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
  3049. : CHThorThreadedActivityBase(_agent, _activityId, _subgraphId, _arg, _arg, _kind, _graph, _arg.queryDiskRecordSize(), _node), helper(_arg)
  3050. {
  3051. prefiltered = 0;
  3052. postfiltered = 0;
  3053. skips = 0;
  3054. seeks = 0;
  3055. scans = 0;
  3056. eclKeySize.set(helper.queryIndexRecordSize());
  3057. if (_node)
  3058. {
  3059. const char *recordTranslationModeHintText = _node->queryProp("hint[@name='layouttranslation']/@value");
  3060. if (recordTranslationModeHintText)
  3061. recordTranslationModeHint = getTranslationMode(recordTranslationModeHintText, true);
  3062. isCodeSigned = isActivityCodeSigned(*_node);
  3063. }
  3064. }
  3065. ~CHThorKeyedJoinActivity()
  3066. {
  3067. clearQueue();
  3068. waitForThreads();
  3069. }
  3070. virtual bool needsAllocator() const { return true; }
  3071. virtual bool hasNewSegmentMonitors() { return helper.hasNewSegmentMonitors(); }
  3072. virtual void ready()
  3073. {
  3074. CHThorThreadedActivityBase::ready();
  3075. preserveOrder = ((helper.getJoinFlags() & JFreorderable) == 0);
  3076. preserveGroups = helper.queryOutputMeta()->isGrouped();
  3077. needsDiskRead = helper.diskAccessRequired();
  3078. extractJoinFields = ((helper.getJoinFlags() & JFextractjoinfields) != 0);
  3079. atMost = helper.getJoinLimit();
  3080. if (atMost == 0) atMost = (unsigned)-1;
  3081. abortLimit = helper.getMatchAbortLimit();
  3082. if (abortLimit == 0) abortLimit = (unsigned)-1;
  3083. leftOuter = ((helper.getJoinFlags() & JFleftouter) != 0);
  3084. exclude = ((helper.getJoinFlags() & JFexclude) != 0);
  3085. keepLimit = helper.getKeepLimit();
  3086. if (keepLimit == 0) keepLimit = (unsigned)-1;
  3087. rowLimit = helper.getRowLimit();
  3088. pool.setown(new JoinGroupPool(preserveGroups));
  3089. limitOnFail = ((helper.getJoinFlags() & JFonfail) != 0);
  3090. limitFail = !limitOnFail && ((helper.getJoinFlags() & JFmatchAbortLimitSkips) == 0);
  3091. if(leftOuter || limitOnFail)
  3092. {
  3093. if (!defaultRight)
  3094. {
  3095. RtlDynamicRowBuilder rowBuilder(queryRightRowAllocator());
  3096. size32_t thisSize = helper.createDefaultRight(rowBuilder);
  3097. defaultRight.setown(rowBuilder.finalizeRowClear(thisSize));
  3098. }
  3099. }
  3100. }
  3101. virtual void stop()
  3102. {
  3103. ldFile.clear();
  3104. CHThorThreadedActivityBase::stop();
  3105. }
  3106. virtual void initializeThreadPool()
  3107. {
  3108. threadPool.setown(createThreadPool("hthor keyed join fetch thread pool", &threadFactory));
  3109. }
  3110. virtual void initParts(IDistributedFile * f)
  3111. {
  3112. size32_t kl;
  3113. void *k;
  3114. fetch.getFileEncryptKey(kl,k);
  3115. MemoryAttr encryptionkey;
  3116. encryptionkey.setOwn(kl,k);
  3117. Owned<IEngineRowAllocator> inputRowAllocator;
  3118. if (needsDiskRead)
  3119. {
  3120. inputRowAllocator.setown(agent.queryCodeContext()->getRowAllocator(helper.queryDiskRecordSize(), activityId));
  3121. parts.setown(new DistributedFileFetchHandler<KeyedJoinFetchPartHandler, MatchSet *, KeyedJoinFetchRequest>(f, *this, encryptionkey, prefetcher, inputRowAllocator));
  3122. prefetcher.setown(actualDiskMeta->createDiskPrefetcher());
  3123. }
  3124. }
  3125. virtual void stopParts()
  3126. {
  3127. if(parts)
  3128. parts->stopThread();
  3129. }
  3130. virtual bool isGrouped() { return preserveGroups; }
  3131. virtual void waitForThreads()
  3132. {
  3133. aborting = true;
  3134. if (inputThread)
  3135. inputThread->join();
  3136. lookup.clear();
  3137. threadPool.clear();
  3138. }
  3139. virtual void clearQueue()
  3140. {
  3141. while (pending.ordinality())
  3142. ReleaseRoxieRow(pending.dequeue());
  3143. }
  3144. void addRow(const void *row)
  3145. {
  3146. CriticalBlock procedure(pendingCrit);
  3147. pending.enqueue(row);
  3148. avail.signal();
  3149. }
  3150. const void * getRow()
  3151. {
  3152. if (stopped)
  3153. return NULL;
  3154. avail.wait();
  3155. CriticalBlock procedure(pendingCrit);
  3156. if (exception)
  3157. {
  3158. IException *E = exception;
  3159. exception = NULL;
  3160. throw E;
  3161. }
  3162. if (pending.ordinality())
  3163. return pending.dequeue();
  3164. else
  3165. {
  3166. stopped = true;
  3167. return NULL;
  3168. }
  3169. }
  3170. virtual void fetchAll()
  3171. {
  3172. bool eogSeen = false; // arguably true makes more sense
  3173. for (;;)
  3174. {
  3175. if (aborting)
  3176. break;
  3177. const void *row = input->nextRow();
  3178. if (!row)
  3179. {
  3180. if (eogSeen)
  3181. break;
  3182. else
  3183. eogSeen = true;
  3184. pool->endGroup();
  3185. }
  3186. else
  3187. {
  3188. eogSeen = false;
  3189. if(lookup)
  3190. {
  3191. lookup->addRow(row);
  3192. }
  3193. else
  3194. {
  3195. CJoinGroup *jg = createJoinGroup(row);
  3196. jg->noteEnd();
  3197. }
  3198. }
  3199. }
  3200. if(lookup)
  3201. lookup->stopThread();
  3202. if (parts)
  3203. parts->stopThread();
  3204. stopThread();
  3205. }
  3206. virtual KeyedJoinFetchPartHandler * createFetchPartHandler(IDistributedFilePart * part, offset_t base, offset_t size, IThreadedExceptionHandler * handler, bool blockcompressed, MemoryAttr &encryptionkey, ISourceRowPrefetcher * prefetcher, IEngineRowAllocator *rowAllocator)
  3207. {
  3208. return new KeyedJoinFetchPartHandler(*this, part, base, size, handler, threadPool, blockcompressed, encryptionkey, activityId, outputMeta, prefetcher, rowAllocator);
  3209. }
  3210. virtual void processFetch(KeyedJoinFetchRequest const * fetch, offset_t pos, ISerialStream *rawStream)
  3211. {
  3212. CThorContiguousRowBuffer prefetchSource;
  3213. prefetchSource.setStream(rawStream);
  3214. prefetchSource.reset(pos);
  3215. prefetcher->readAhead(prefetchSource);
  3216. const byte *row = prefetchSource.queryRow();
  3217. MemoryBuffer buf;
  3218. if (translator)
  3219. {
  3220. MemoryBufferBuilder aBuilder(buf, 0);
  3221. FetchVirtualFieldCallback fieldCallback(pos);
  3222. translator->translate(aBuilder, fieldCallback, row);
  3223. row = aBuilder.getSelf();
  3224. }
  3225. if(match(fetch->ms, row))
  3226. {
  3227. if(exclude)
  3228. {
  3229. fetch->ms->incRightMatchCount();
  3230. }
  3231. else
  3232. {
  3233. RtlDynamicRowBuilder extractBuilder(queryRightRowAllocator());
  3234. size32_t size = helper.extractJoinFields(extractBuilder, row, NULL);
  3235. void * ret = (void *) extractBuilder.finalizeRowClear(size);
  3236. fetch->ms->setPendingRightMatch(fetch->seq, ret);
  3237. }
  3238. }
  3239. fetch->ms->queryJoinGroup()->noteEnd();
  3240. }
  3241. bool match(MatchSet * ms, const void * right)
  3242. {
  3243. CriticalBlock proc(fmatchCrit);
  3244. bool ret = helper.fetchMatch(ms->queryJoinGroup()->queryLeft(), right);
  3245. if (!ret)
  3246. ++postfiltered;
  3247. return ret;
  3248. }
  3249. virtual bool leftCanMatch(const void * _left)
  3250. {
  3251. bool ret = helper.leftCanMatch(_left);
  3252. if (!ret)
  3253. ++prefiltered;
  3254. return ret;
  3255. }
  3256. virtual CJoinGroup *createJoinGroup(const void *row)
  3257. {
  3258. // NOTE - single threaded
  3259. return pool->createJoinGroup(row, this);
  3260. }
  3261. virtual void onComplete(CJoinGroup *jg)
  3262. {
  3263. CriticalBlock c(pool->crit);
  3264. if (preserveOrder)
  3265. {
  3266. CJoinGroup *finger = pool->head.next;
  3267. if(preserveGroups)
  3268. {
  3269. unsigned joinGroupSize = 0;
  3270. Linked<CJoinGroup> firstInGroup = finger;
  3271. while(finger != &pool->head)
  3272. {
  3273. CJoinGroup *next = finger->next;
  3274. if(finger->complete())
  3275. joinGroupSize += doJoinGroup(finger);
  3276. else
  3277. break;
  3278. finger = next;
  3279. if(!finger->inGroup(firstInGroup))
  3280. {
  3281. if(joinGroupSize)
  3282. addRow(NULL);
  3283. joinGroupSize = 0;
  3284. firstInGroup.set(finger);
  3285. }
  3286. }
  3287. assertex(finger == firstInGroup.get());
  3288. }
  3289. else
  3290. {
  3291. while(finger != &pool->head)
  3292. {
  3293. CJoinGroup *next = finger->next;
  3294. if(finger->complete())
  3295. doJoinGroup(finger);
  3296. else
  3297. break;
  3298. finger = next;
  3299. }
  3300. }
  3301. }
  3302. else if (preserveGroups)
  3303. {
  3304. Linked<CJoinGroup> head = jg; // Must avoid releasing head until the end, or while loop can overrun if head is reused
  3305. assertex(jg->inGroup(jg));
  3306. CJoinGroup *finger = jg;
  3307. unsigned joinGroupSize = 0;
  3308. while (finger->inGroup(jg))
  3309. {
  3310. CJoinGroup *next = finger->next;
  3311. joinGroupSize += doJoinGroup(finger);
  3312. finger = next;
  3313. }
  3314. if (joinGroupSize)
  3315. addRow(NULL);
  3316. }
  3317. else
  3318. doJoinGroup(jg);
  3319. }
  3320. void failLimit(const void * left)
  3321. {
  3322. helper.onMatchAbortLimitExceeded();
  3323. CommonXmlWriter xmlwrite(0);
  3324. if (input && input->queryOutputMeta() && input->queryOutputMeta()->hasXML())
  3325. {
  3326. input->queryOutputMeta()->toXML((byte *) left, xmlwrite);
  3327. }
  3328. throw MakeStringException(0, "More than %d match candidates in keyed join for row %s", abortLimit, xmlwrite.str());
  3329. }
  3330. unsigned doJoinGroup(CJoinGroup *jg)
  3331. {
  3332. unsigned matched = jg->rowsSeen();
  3333. unsigned added = 0;
  3334. const void *left = jg->queryLeft();
  3335. if (jg->candidateCount() > abortLimit)
  3336. {
  3337. if(limitFail)
  3338. failLimit(left);
  3339. if(limitOnFail)
  3340. {
  3341. Owned<IException> except;
  3342. try
  3343. {
  3344. failLimit(left);
  3345. }
  3346. catch(IException * e)
  3347. {
  3348. except.setown(e);
  3349. }
  3350. assertex(except);
  3351. size32_t transformedSize;
  3352. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  3353. try
  3354. {
  3355. transformedSize = helper.onFailTransform(rowBuilder, left, defaultRight, 0, except);
  3356. }
  3357. catch(IException * e)
  3358. {
  3359. throw makeWrappedException(e);
  3360. }
  3361. if(transformedSize)
  3362. {
  3363. const void * shrunk = rowBuilder.finalizeRowClear(transformedSize);
  3364. addRow(shrunk);
  3365. added++;
  3366. }
  3367. else
  3368. {
  3369. ++skips;
  3370. }
  3371. }
  3372. else
  3373. return 0;
  3374. }
  3375. else if(!matched || jg->candidateCount() > atMost)
  3376. {
  3377. if(leftOuter)
  3378. {
  3379. switch(kind)
  3380. {
  3381. case TAKkeyedjoin:
  3382. case TAKkeyeddenormalizegroup:
  3383. {
  3384. size32_t transformedSize = 0;
  3385. try
  3386. {
  3387. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  3388. if (kind == TAKkeyedjoin)
  3389. transformedSize = helper.transform(rowBuilder, left, defaultRight, (__uint64)0, (unsigned)0);
  3390. else if (kind == TAKkeyeddenormalizegroup)
  3391. transformedSize = helper.transform(rowBuilder, left, defaultRight, 0, (const void * *)NULL);
  3392. if (transformedSize)
  3393. {
  3394. const void * shrunk = rowBuilder.finalizeRowClear(transformedSize);
  3395. addRow(shrunk);
  3396. added++;
  3397. }
  3398. else
  3399. {
  3400. ++skips;
  3401. }
  3402. }
  3403. catch(IException * e)
  3404. {
  3405. throw makeWrappedException(e);
  3406. }
  3407. break;
  3408. }
  3409. case TAKkeyeddenormalize:
  3410. {
  3411. LinkRoxieRow(left);
  3412. addRow((void *) left );
  3413. added++;
  3414. break;
  3415. }
  3416. default:
  3417. throwUnexpected();
  3418. }
  3419. }
  3420. }
  3421. else if(!exclude)
  3422. {
  3423. switch(kind)
  3424. {
  3425. case TAKkeyedjoin:
  3426. {
  3427. if(jg->matches.start())
  3428. {
  3429. unsigned counter = 0;
  3430. do
  3431. {
  3432. try
  3433. {
  3434. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  3435. void const * row = jg->matches.queryRow();
  3436. if(!row) continue;
  3437. offset_t fpos = 0;
  3438. size32_t transformedSize;
  3439. transformedSize = helper.transform(rowBuilder, left, row, fpos, ++counter);
  3440. if (transformedSize)
  3441. {
  3442. const void * shrunk = rowBuilder.finalizeRowClear(transformedSize);
  3443. addRow(shrunk);
  3444. added++;
  3445. if (added==keepLimit)
  3446. break;
  3447. }
  3448. else
  3449. {
  3450. ++skips;
  3451. }
  3452. }
  3453. catch(IException * e)
  3454. {
  3455. throw makeWrappedException(e);
  3456. }
  3457. } while(jg->matches.next());
  3458. }
  3459. break;
  3460. }
  3461. case TAKkeyeddenormalize:
  3462. {
  3463. OwnedConstRoxieRow newLeft;
  3464. newLeft.set(left);
  3465. unsigned rowSize = 0;
  3466. unsigned count = 0;
  3467. unsigned rightAdded = 0;
  3468. if(jg->matches.start())
  3469. {
  3470. do
  3471. {
  3472. void const * row = jg->matches.queryRow();
  3473. if(!row) continue;
  3474. ++count;
  3475. offset_t fpos = 0;
  3476. size32_t transformedSize;
  3477. try
  3478. {
  3479. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  3480. transformedSize = helper.transform(rowBuilder, newLeft, row, fpos, count);
  3481. if (transformedSize)
  3482. {
  3483. newLeft.setown(rowBuilder.finalizeRowClear(transformedSize));
  3484. rowSize = transformedSize;
  3485. rightAdded++;
  3486. if (rightAdded==keepLimit)
  3487. break;
  3488. }
  3489. else
  3490. {
  3491. ++skips;
  3492. }
  3493. }
  3494. catch(IException * e)
  3495. {
  3496. throw makeWrappedException(e);
  3497. }
  3498. } while(jg->matches.next());
  3499. }
  3500. if (rowSize)
  3501. {
  3502. addRow(newLeft.getClear());
  3503. ReleaseRoxieRow(newLeft);
  3504. added++;
  3505. }
  3506. break;
  3507. }
  3508. case TAKkeyeddenormalizegroup:
  3509. {
  3510. extractedRows.clear();
  3511. unsigned count = 0;
  3512. if(jg->matches.start())
  3513. do
  3514. {
  3515. const void * row = jg->matches.queryRow();
  3516. if(!row) continue;
  3517. if(++count > keepLimit)
  3518. break;
  3519. LinkRoxieRow(row);
  3520. extractedRows.append(row);
  3521. } while(jg->matches.next());
  3522. size32_t transformedSize;
  3523. try
  3524. {
  3525. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  3526. transformedSize = helper.transform(rowBuilder, left, extractedRows.item(0), extractedRows.ordinality(), (const void * *)extractedRows.getArray());
  3527. extractedRows.clear();
  3528. if (transformedSize)
  3529. {
  3530. const void * shrunk = rowBuilder.finalizeRowClear(transformedSize);
  3531. addRow(shrunk);
  3532. added++;
  3533. }
  3534. else
  3535. {
  3536. ++skips;
  3537. }
  3538. }
  3539. catch(IException * e)
  3540. {
  3541. throw makeWrappedException(e);
  3542. }
  3543. break;
  3544. }
  3545. default:
  3546. throwUnexpected();
  3547. }
  3548. }
  3549. pool->releaseJoinGroup(jg); // releases link to gotten row
  3550. return added;
  3551. }
  3552. static bool useMonolithic(IDistributedFile & f)
  3553. {
  3554. return ((f.numParts() == 1) || (f.queryAttributes().hasProp("@local")));
  3555. }
  3556. virtual void start()
  3557. {
  3558. OwnedRoxieString lfn(helper.getIndexFileName());
  3559. Owned<ILocalOrDistributedFile> ldFile = resolveLFNIndex(agent, lfn, "KeyedJoin", 0 != (helper.getJoinFlags() & JFindexoptional), true, AccessMode::tbdRead, isCodeSigned);
  3560. dFile = ldFile ? ldFile->queryDistributedFile() : NULL;
  3561. if (dFile)
  3562. {
  3563. Owned<IDistributedFile> odFile;
  3564. odFile.setown(dFile);
  3565. LINK(odFile);
  3566. enterSingletonSuperfiles(odFile);
  3567. bool mono;
  3568. super = dFile->querySuperFile();
  3569. if(super)
  3570. {
  3571. if(super->numSubFiles()==0)
  3572. throw MakeStringException(0, "Superkey %s empty", super->queryLogicalName());
  3573. mono = useMonolithic(super->querySubFile(0));
  3574. }
  3575. else
  3576. {
  3577. mono = useMonolithic(*dFile);
  3578. }
  3579. if (mono)
  3580. lookup.setown(new MonolithicKeyLookupHandler(dFile, *this, agent));
  3581. else
  3582. lookup.setown(new DistributedKeyLookupHandler(dFile, *this, agent));
  3583. agent.logFileAccess(dFile, "HThor", "READ", graph);
  3584. }
  3585. else
  3586. {
  3587. StringBuffer buff;
  3588. buff.append("Skipping OPT keyed join against nonexistent file ").append(lfn);
  3589. agent.addWuExceptionEx(buff.str(), WRN_SkipMissingOptFile, SeverityInformation, MSGAUD_user, "hthor");
  3590. }
  3591. CHThorThreadedActivityBase::start();
  3592. }
  3593. virtual void readyManager(IKeyManager * manager, const void * row)
  3594. {
  3595. helper.createSegmentMonitors(manager, row);
  3596. manager->finishSegmentMonitors();
  3597. manager->reset();
  3598. manager->resetCounts();
  3599. }
  3600. virtual void doneManager(IKeyManager * manager)
  3601. {
  3602. manager->releaseSegmentMonitors();
  3603. CriticalBlock b(statsCrit);
  3604. seeks += manager->querySeeks();
  3605. scans += manager->queryScans();
  3606. wildseeks += manager->queryWildSeeks();
  3607. }
  3608. virtual bool addMatch(MatchSet * ms, IKeyManager * manager)
  3609. {
  3610. CJoinGroup * jg = ms->queryJoinGroup();
  3611. unsigned candTotal = jg->noteCandidate();
  3612. if (candTotal > atMost || candTotal > abortLimit)
  3613. {
  3614. if ( agent.queryCodeContext()->queryDebugContext())
  3615. agent.queryCodeContext()->queryDebugContext()->checkBreakpoint(DebugStateLimit, NULL, static_cast<IActivityBase *>(this));
  3616. return true;
  3617. }
  3618. KLBlobProviderAdapter adapter(manager);
  3619. byte const * rhs = manager->queryKeyBuffer();
  3620. if(indexReadMatch(jg->queryLeft(), rhs, &adapter))
  3621. {
  3622. if(needsDiskRead)
  3623. {
  3624. size_t fposOffset = manager->queryRowSize() - sizeof(offset_t);
  3625. offset_t fpos = rtlReadBigUInt8(rhs + fposOffset);
  3626. jg->notePending();
  3627. offset_t seq = ms->addRightPending();
  3628. parts->addRow(ms, fpos, seq);
  3629. }
  3630. else
  3631. {
  3632. if(exclude)
  3633. ms->incRightMatchCount();
  3634. else
  3635. {
  3636. RtlDynamicRowBuilder rowBuilder(queryRightRowAllocator());
  3637. size32_t size = helper.extractJoinFields(rowBuilder, rhs, &adapter);
  3638. void * ret = (void *)rowBuilder.finalizeRowClear(size);
  3639. ms->addRightMatch(ret);
  3640. }
  3641. }
  3642. }
  3643. else
  3644. {
  3645. ++postfiltered;
  3646. }
  3647. return false;
  3648. }
  3649. bool indexReadMatch(const void * indexRow, const void * inputRow, IBlobProvider * blobs)
  3650. {
  3651. CriticalBlock proc(imatchCrit);
  3652. return helper.indexReadMatch(indexRow, inputRow, blobs);
  3653. }
  3654. IEngineRowAllocator * queryRightRowAllocator()
  3655. {
  3656. if (!defaultRightAllocator)
  3657. defaultRightAllocator.setown(agent.queryCodeContext()->getRowAllocator(helper.queryJoinFieldsRecordSize(), activityId));
  3658. return defaultRightAllocator;
  3659. }
  3660. virtual void onLimitExceeded()
  3661. {
  3662. helper.onLimitExceeded();
  3663. }
  3664. virtual void updateProgress(IStatisticGatherer &progress) const
  3665. {
  3666. CHThorThreadedActivityBase::updateProgress(progress);
  3667. StatsActivityScope scope(progress, activityId);
  3668. progress.addStatistic(StNumPreFiltered, prefiltered);
  3669. progress.addStatistic(StNumPostFiltered, postfiltered);
  3670. progress.addStatistic(StNumIndexSkips, skips);
  3671. progress.addStatistic(StNumIndexSeeks, seeks);
  3672. progress.addStatistic(StNumIndexScans, scans);
  3673. progress.addStatistic(StNumIndexWildSeeks, wildseeks);
  3674. }
  3675. protected:
  3676. RecordTranslationMode getLayoutTranslationMode()
  3677. {
  3678. if (recordTranslationModeHint != RecordTranslationMode::Unspecified)
  3679. return recordTranslationModeHint;
  3680. return agent.getLayoutTranslationMode();
  3681. }
  3682. virtual const IDynamicTransform * getLayoutTranslator(IDistributedFile * f) override
  3683. {
  3684. if(getLayoutTranslationMode() == RecordTranslationMode::AlwaysECL)
  3685. {
  3686. verifyFormatCrc(helper.getIndexFormatCrc(), f, super ? super->queryLogicalName() : NULL, true, false); // Traces if mismatch
  3687. return NULL;
  3688. }
  3689. if(getLayoutTranslationMode() == RecordTranslationMode::None)
  3690. {
  3691. verifyFormatCrc(helper.getIndexFormatCrc(), f, super ? super->queryLogicalName() : NULL, true, true);
  3692. return NULL;
  3693. }
  3694. if(verifyFormatCrc(helper.getIndexFormatCrc(), f, super ? super->queryLogicalName() : NULL, true, false))
  3695. {
  3696. return NULL;
  3697. }
  3698. IPropertyTree &props = f->queryAttributes();
  3699. Owned<IOutputMetaData> actualFormat = getDaliLayoutInfo(props);
  3700. if (actualFormat)
  3701. {
  3702. actualLayouts.append(actualFormat.getLink()); // ensure adequate lifespan
  3703. Owned<const IDynamicTransform> payloadTranslator = createRecordTranslator(helper.queryProjectedIndexRecordSize()->queryRecordAccessor(true), actualFormat->queryRecordAccessor(true));
  3704. DBGLOG("Record layout translator created for %s", f->queryLogicalName());
  3705. payloadTranslator->describe();
  3706. if (!payloadTranslator->canTranslate())
  3707. throw MakeStringException(0, "Untranslatable key layout mismatch reading index %s", f->queryLogicalName());
  3708. if (payloadTranslator->keyedTranslated())
  3709. throw MakeStringException(0, "Untranslatable key layout mismatch reading index %s - keyed fields do not match", f->queryLogicalName());
  3710. if (getLayoutTranslationMode()==RecordTranslationMode::None)
  3711. throw MakeStringException(0, "Translatable file layout mismatch reading file %s but translation disabled", f->queryLogicalName());
  3712. VStringBuffer msg("Record layout translation required for %s", f->queryLogicalName());
  3713. agent.addWuExceptionEx(msg.str(), WRN_UseLayoutTranslation, SeverityInformation, MSGAUD_user, "hthor");
  3714. return payloadTranslator.getClear();
  3715. }
  3716. throw MakeStringException(0, "Untranslatable key layout mismatch reading index %s - key layout information not found", f->queryLogicalName());
  3717. }
  3718. virtual void verifyIndex(IDistributedFile * f, IKeyIndex * idx, const IDynamicTransform * trans)
  3719. {
  3720. if (eclKeySize.isFixedSize())
  3721. {
  3722. if(trans)
  3723. {
  3724. if (!trans->canTranslate())
  3725. throw MakeStringException(0, "Untranslatable key layout mismatch reading index %s", f->queryLogicalName());
  3726. }
  3727. else
  3728. {
  3729. unsigned fileposSize = idx->hasSpecialFileposition() && !hasTrailingFileposition(eclKeySize.queryTypeInfo()) ? sizeof(offset_t) : 0;
  3730. if(idx->keySize() != eclKeySize.getFixedSize() + fileposSize)
  3731. throw MakeStringException(1002, "Key size mismatch on key %s: key file indicates record size should be %u, but ECL declaration was %u", f->queryLogicalName(), idx->keySize(), eclKeySize.getFixedSize() + fileposSize);
  3732. }
  3733. }
  3734. }
  3735. virtual void verifyFetchFormatCrc(IDistributedFile * f)
  3736. {
  3737. actualDiskMeta.set(helper.queryDiskRecordSize());
  3738. translator.clear();
  3739. if (getLayoutTranslationMode()==RecordTranslationMode::None)
  3740. {
  3741. ::verifyFormatCrcSuper(helper.getDiskFormatCrc(), f, false, true);
  3742. }
  3743. else
  3744. {
  3745. bool crcMatched = ::verifyFormatCrcSuper(helper.getDiskFormatCrc(), f, false, false); // MORE - fetch requires all to match.
  3746. if (!crcMatched)
  3747. {
  3748. IPropertyTree &props = f->queryAttributes();
  3749. actualDiskMeta.setown(getDaliLayoutInfo(props));
  3750. if (actualDiskMeta)
  3751. {
  3752. translator.setown(createRecordTranslator(helper.queryProjectedDiskRecordSize()->queryRecordAccessor(true), actualDiskMeta->queryRecordAccessor(true)));
  3753. if (translator->canTranslate())
  3754. {
  3755. if (getLayoutTranslationMode()==RecordTranslationMode::None)
  3756. throw MakeStringException(0, "Translatable file layout mismatch reading file %s but translation disabled", f->queryLogicalName());
  3757. VStringBuffer msg("Record layout translation required for %s", f->queryLogicalName());
  3758. agent.addWuExceptionEx(msg.str(), WRN_UseLayoutTranslation, SeverityInformation, MSGAUD_user, "hthor");
  3759. }
  3760. else
  3761. throw MakeStringException(0, "Untranslatable file layout mismatch reading file %s", f->queryLogicalName());
  3762. }
  3763. else
  3764. throw MakeStringException(0, "Untranslatable file layout mismatch reading file %s - key layout information not found", f->queryLogicalName());
  3765. }
  3766. }
  3767. }
  3768. virtual const RtlRecord &queryIndexRecord()
  3769. {
  3770. return eclKeySize.queryRecordAccessor(true);
  3771. }
  3772. virtual void fail(char const * msg)
  3773. {
  3774. throw MakeStringExceptionDirect(0, msg);
  3775. }
  3776. };
  3777. extern HTHOR_API IHThorActivity *createKeyedJoinActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorKeyedJoinArg &arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
  3778. {
  3779. return new CHThorKeyedJoinActivity(_agent, _activityId, _subgraphId, arg, _kind, _graph, _node);
  3780. }