thfetchslave.cpp 28 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "limits.h"
  15. #include "slave.ipp"
  16. #include "thorport.hpp"
  17. #include "jio.hpp"
  18. #include "jlzw.hpp"
  19. #include "jsort.hpp"
  20. #include "jdebug.hpp"
  21. #include "jhtree.hpp"
  22. #include "rtlcommon.hpp"
  23. #include "thsortu.hpp"
  24. #include "thactivityutil.ipp"
  25. #include "thormisc.hpp"
  26. #include "thbufdef.hpp"
  27. #include "thexception.hpp"
  28. #include "thmfilemanager.hpp"
  29. #include "csvsplitter.hpp"
  30. #include "thorxmlread.hpp"
  31. #include "../activities/fetch/thfetchcommon.hpp"
  32. #include "../hashdistrib/thhashdistribslave.ipp"
  33. #include "thfetchslave.ipp"
// NOTE(review): NUMSLAVEPORTS is not referenced anywhere in the visible part of
// this file; confirm whether another translation unit relies on it before removing.
#define NUMSLAVEPORTS 2
  35. struct FPosTableEntryIFileIO : public FPosTableEntry
  36. {
  37. ~FPosTableEntryIFileIO()
  38. {
  39. ::Release(file);
  40. }
  41. IFileIO *file = nullptr;
  42. };
  43. class CFetchStream : public IRowStream, implements IStopInput, implements IFetchStream, public CSimpleInterface
  44. {
  45. Owned<IRowStream> keyIn;
  46. IFetchHandler *iFetchHandler;
  47. bool inputStopped;
  48. Linked<IExpander> eexp;
  49. FPosTableEntryIFileIO *fPosMultiPartTable;
  50. unsigned files, offsetCount;
  51. CriticalSection stopsect;
  52. CPartDescriptorArray parts;
  53. FPosTableEntry *offsetTable;
  54. static int partLookup(const void *_key, const void *e)
  55. {
  56. FPosTableEntryIFileIO &entry = *(FPosTableEntryIFileIO *)e;
  57. offset_t keyFpos = *(offset_t *)_key;
  58. if (keyFpos < entry.base)
  59. return -1;
  60. else if (keyFpos >= entry.top)
  61. return 1;
  62. else
  63. return 0;
  64. }
  65. protected:
  66. IHashDistributor *distributor;
  67. bool abortSoon;
  68. mptag_t tag;
  69. Owned<IRowStream> keyOutStream;
  70. CActivityBase &owner;
  71. Linked<IThorRowInterfaces> keyRowIf, fetchRowIf;
  72. StringAttr logicalFilename;
  73. class CFPosHandler : implements IHash, public CSimpleInterface
  74. {
  75. IFetchHandler &iFetchHandler;
  76. unsigned count;
  77. FPosTableEntry *offsetTable;
  78. static int slaveLookup(const void *_key, const void *e)
  79. {
  80. offset_t key = *(offset_t *)_key;
  81. FPosTableEntry &entry = *(FPosTableEntry *)e;
  82. if (key < entry.base)
  83. return -1;
  84. else if (key >= entry.top)
  85. return 1;
  86. else
  87. return 0;
  88. }
  89. public:
  90. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  91. CFPosHandler(IFetchHandler &_iFetchHandler, unsigned _count, FPosTableEntry *_offsetTable)
  92. : iFetchHandler(_iFetchHandler), count(_count), offsetTable(_offsetTable)
  93. {
  94. }
  95. virtual unsigned hash(const void *data)
  96. {
  97. if (1 == count)
  98. return offsetTable[0].index;
  99. offset_t fpos = iFetchHandler.extractFpos(data);
  100. if (isLocalFpos(fpos))
  101. return getLocalFposPart(fpos);
  102. const void *result = bsearch(&fpos, offsetTable, count, sizeof(FPosTableEntry), slaveLookup);
  103. if (!result)
  104. throw MakeThorException(TE_FetchOutOfRange, "FETCH: Offset not found in offset table; fpos=%" I64F "d", fpos);
  105. return ((FPosTableEntry *)result)->index;
  106. }
  107. } *fposHash;
  108. public:
  109. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  110. CFetchStream(CActivityBase &_owner, IThorRowInterfaces *_keyRowIf, IThorRowInterfaces *_fetchRowIf, bool &_abortSoon, const char *_logicalFilename, CPartDescriptorArray &_parts, unsigned _offsetCount, size32_t offsetMapSz, const void *offsetMap, IFetchHandler *_iFetchHandler, mptag_t _tag, IExpander *_eexp)
  111. : owner(_owner), keyRowIf(_keyRowIf), fetchRowIf(_fetchRowIf), abortSoon(_abortSoon), logicalFilename(_logicalFilename),
  112. iFetchHandler(_iFetchHandler), offsetCount(_offsetCount), tag(_tag), eexp(_eexp)
  113. {
  114. distributor = NULL;
  115. fposHash = NULL;
  116. inputStopped = false;
  117. fPosMultiPartTable = NULL;
  118. ForEachItemIn(f, _parts)
  119. parts.append(*LINK(&_parts.item(f)));
  120. assertex(offsetMapSz == sizeof(FPosTableEntry) * offsetCount);
  121. offsetTable = new FPosTableEntry[offsetCount];
  122. memcpy_iflen(offsetTable, offsetMap, offsetMapSz);
  123. if (!REJECTLOG(MCthorDetailedDebugInfo))
  124. {
  125. for (unsigned c=0; c<offsetCount; c++)
  126. {
  127. FPosTableEntry &e = offsetTable[c];
  128. ActPrintLog(&owner, thorDetailedLogLevel, "Table[%d] : base=%" I64F "d, top=%" I64F "d, slave=%d", c, e.base, e.top, e.index);
  129. }
  130. }
  131. files = parts.ordinality();
  132. if (files)
  133. {
  134. fPosMultiPartTable = new FPosTableEntryIFileIO[files];
  135. unsigned f;
  136. FPosTableEntryIFileIO *e;
  137. for (f=0, e=&fPosMultiPartTable[0]; f<files; f++, e++)
  138. {
  139. IPartDescriptor &part = parts.item(f);
  140. e->base = part.queryProperties().getPropInt64("@offset");
  141. e->top = e->base + part.queryProperties().getPropInt64("@size");
  142. e->index = f;
  143. e->file = queryThor().queryFileCache().lookupIFileIO(owner, logicalFilename, part, nullptr, diskReadPartStatistics); // NB: freed by FPosTableEntryIFileIO dtor
  144. }
  145. }
  146. }
  147. ~CFetchStream()
  148. {
  149. if (fPosMultiPartTable)
  150. delete [] fPosMultiPartTable;
  151. ::Release(fposHash);
  152. ::Release(distributor);
  153. delete [] offsetTable;
  154. }
  155. // IFetchStream
  156. virtual void start(IRowStream *_keyIn) override
  157. {
  158. fposHash = new CFPosHandler(*iFetchHandler, offsetCount, offsetTable);
  159. keyIn.set(_keyIn);
  160. distributor = createHashDistributor(&owner, owner.queryContainer().queryJobChannel().queryJobComm(), tag, false, false, this, "FetchStream");
  161. keyOutStream.setown(distributor->connect(keyRowIf, keyIn, fposHash, NULL, NULL));
  162. }
  163. virtual IRowStream *queryOutput() override { return this; }
  164. virtual IFileIO *getPartIO(unsigned part) override { assertex(part<files); return LINK(fPosMultiPartTable[part].file); }
  165. virtual StringBuffer &getPartName(unsigned part, StringBuffer &out) override { return getPartFilename(parts.item(part), 0, out, true); }
  166. virtual void abort() override
  167. {
  168. if (distributor)
  169. distributor->abort();
  170. }
  171. // IStopInput
  172. virtual void stopInput()
  173. {
  174. CriticalBlock block(stopsect); // can be called async by distribute
  175. if (!inputStopped)
  176. {
  177. inputStopped = true;
  178. keyIn->stop();
  179. }
  180. }
  181. virtual void stop()
  182. {
  183. if (keyOutStream)
  184. {
  185. keyOutStream->stop();
  186. keyOutStream.clear();
  187. }
  188. if (distributor)
  189. {
  190. distributor->disconnect(true);
  191. distributor->join();
  192. }
  193. stopInput();
  194. }
  195. const void *nextRow()
  196. {
  197. if (abortSoon)
  198. return NULL;
  199. for (;;)
  200. {
  201. OwnedConstThorRow keyRec = keyOutStream->nextRow(); // is this right?
  202. if (!keyRec)
  203. break;
  204. offset_t fpos = iFetchHandler->extractFpos(keyRec);
  205. switch (files)
  206. {
  207. case 0:
  208. assertex(false);
  209. case 1:
  210. {
  211. unsigned __int64 localFpos;
  212. if (isLocalFpos(fpos))
  213. localFpos = getLocalFposOffset(fpos);
  214. else
  215. localFpos = fpos-fPosMultiPartTable[0].base;
  216. RtlDynamicRowBuilder row(fetchRowIf->queryRowAllocator());
  217. size32_t sz = iFetchHandler->fetch(row, keyRec, 0, localFpos, fpos);
  218. if (sz)
  219. return row.finalizeRowClear(sz);
  220. break;
  221. }
  222. default:
  223. {
  224. // which of multiple parts this slave is dealing with.
  225. FPosTableEntryIFileIO *result = (FPosTableEntryIFileIO *)bsearch(&fpos, fPosMultiPartTable, files, sizeof(FPosTableEntryIFileIO), partLookup);
  226. unsigned __int64 localFpos;
  227. if (isLocalFpos(fpos))
  228. localFpos = getLocalFposOffset(fpos);
  229. else
  230. localFpos = fpos-result->base;
  231. RtlDynamicRowBuilder row(fetchRowIf->queryRowAllocator());
  232. size32_t sz = iFetchHandler->fetch(row, keyRec, result->index, localFpos, fpos);
  233. if (sz)
  234. return row.finalizeRowClear(sz);
  235. break;
  236. }
  237. }
  238. }
  239. return NULL;
  240. }
  241. virtual void getFileStats(CRuntimeStatisticCollection & stats) override
  242. {
  243. for (unsigned f=0; f<files; f++)
  244. {
  245. IFileIO *file = fPosMultiPartTable[f].file;
  246. mergeStats(stats, file);
  247. }
  248. }
  249. virtual void getSubFileStats(std::vector<OwnedPtr<CRuntimeStatisticCollection>> & subFileStats) override
  250. {
  251. if (subFileStats.size()>0)
  252. {
  253. ISuperFileDescriptor *super = parts.item(0).queryOwner().querySuperFileDescriptor();
  254. dbgassertex(super);
  255. for (unsigned f=0; f<files; f++)
  256. {
  257. IPartDescriptor &part = parts.item(f);
  258. unsigned subfile, lnum;
  259. if(super->mapSubPart(part.queryPartIndex(), subfile, lnum))
  260. {
  261. IFileIO *file = fPosMultiPartTable[f].file;
  262. mergeStats(*subFileStats[subfile], file);
  263. }
  264. }
  265. }
  266. }
  267. };
  268. IFetchStream *createFetchStream(CSlaveActivity &owner, IThorRowInterfaces *keyRowIf, IThorRowInterfaces *fetchRowIf, bool &abortSoon, const char *logicalFilename, CPartDescriptorArray &parts, unsigned offsetCount, size32_t offsetMapSz, const void *offsetMap, IFetchHandler *iFetchHandler, mptag_t tag, IExpander *eexp)
  269. {
  270. return new CFetchStream(owner, keyRowIf, fetchRowIf, abortSoon, logicalFilename, parts, offsetCount, offsetMapSz, offsetMap, iFetchHandler, tag, eexp);
  271. }
  272. class CFetchSlaveBase : public CSlaveActivity, implements IFetchHandler
  273. {
  274. typedef CSlaveActivity PARENT;
  275. IRowStream *fetchStreamOut = nullptr;
  276. rowcount_t limit = 0;
  277. unsigned offsetCount = 0;
  278. unsigned offsetMapSz = 0;
  279. MemoryBuffer offsetMapBytes;
  280. Owned<IExpander> eexp;
  281. Owned<IEngineRowAllocator> keyRowAllocator;
  282. std::vector<OwnedPtr<CRuntimeStatisticCollection>> subFileStats;
  283. protected:
  284. Owned<IThorRowInterfaces> fetchDiskRowIf;
  285. IFetchStream *fetchStream = nullptr;
  286. IHThorFetchBaseArg *fetchBaseHelper;
  287. unsigned files = 0;
  288. CPartDescriptorArray parts;
  289. IRowStream *keyIn = nullptr;
  290. bool indexRowExtractNeeded = false;
  291. mptag_t mptag = TAG_NULL;
  292. IPointerArrayOf<ISourceRowPrefetcher> prefetchers;
  293. IConstPointerArrayOf<ITranslator> translators;
  294. bool initialized = false;
  295. public:
  296. IMPLEMENT_IINTERFACE_USING(CSlaveActivity);
  297. CFetchSlaveBase(CGraphElementBase *_container) : CSlaveActivity(_container, diskReadActivityStatistics)
  298. {
  299. fetchBaseHelper = (IHThorFetchBaseArg *)queryHelper();
  300. reInit = 0 != (fetchBaseHelper->getFetchFlags() & (FFvarfilename|FFdynamicfilename));
  301. appendOutputLinked(this);
  302. }
  303. ~CFetchSlaveBase()
  304. {
  305. ::Release(keyIn);
  306. ::Release(fetchStream);
  307. }
  308. virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
  309. {
  310. if (initialized)
  311. {
  312. parts.kill();
  313. offsetMapBytes.clear();
  314. prefetchers.kill();
  315. translators.kill();
  316. eexp.clear();
  317. }
  318. else
  319. initialized = true;
  320. unsigned numParts;
  321. data.read(numParts);
  322. offsetCount = 0;
  323. offsetMapSz = 0;
  324. if (numParts)
  325. {
  326. parts.ensureCapacity(numParts);
  327. deserializePartFileDescriptors(data, parts);
  328. }
  329. data.read(offsetCount);
  330. if (offsetCount)
  331. {
  332. data.read(offsetMapSz);
  333. offsetMapBytes.append(offsetMapSz, data.readDirect(offsetMapSz));
  334. }
  335. if (!container.queryLocalOrGrouped())
  336. mptag = container.queryJobChannel().deserializeMPTag(data);
  337. files = parts.ordinality();
  338. if (files)
  339. {
  340. unsigned expectedFormatCrc = fetchBaseHelper->getDiskFormatCrc();
  341. unsigned projectedFormatCrc = fetchBaseHelper->getProjectedFormatCrc();
  342. IOutputMetaData *projectedFormat = fetchBaseHelper->queryProjectedDiskRecordSize();
  343. RecordTranslationMode translationMode = getTranslationMode(*this);
  344. OwnedRoxieString fileName = fetchBaseHelper->getFileName();
  345. getLayoutTranslations(translators, fileName, parts, translationMode, expectedFormatCrc, fetchBaseHelper->queryDiskRecordSize(), projectedFormatCrc, projectedFormat);
  346. ForEachItemIn(p, parts)
  347. {
  348. const ITranslator *translator = translators.item(p);
  349. if (translator)
  350. {
  351. Owned<ISourceRowPrefetcher> prefetcher = translator->queryActualFormat().createDiskPrefetcher();
  352. prefetchers.append(prefetcher.getClear());
  353. }
  354. }
  355. ISuperFileDescriptor *super = parts.item(0).queryOwner().querySuperFileDescriptor();
  356. if (super)
  357. for (unsigned i=0; i<files; i++)
  358. subFileStats.push_back(new CRuntimeStatisticCollection(diskReadPartStatistics));
  359. }
  360. unsigned encryptedKeyLen;
  361. void *encryptedKey;
  362. fetchBaseHelper->getFileEncryptKey(encryptedKeyLen,encryptedKey);
  363. if (0 != encryptedKeyLen)
  364. {
  365. bool dfsEncrypted = files?parts.item(0).queryOwner().queryProperties().getPropBool("@encrypted"):false;
  366. if (dfsEncrypted) // otherwise ignore (warning issued by master)
  367. eexp.setown(createAESExpander256(encryptedKeyLen, encryptedKey));
  368. memset(encryptedKey, 0, encryptedKeyLen);
  369. free(encryptedKey);
  370. }
  371. fetchDiskRowIf.setown(createRowInterfaces(fetchBaseHelper->queryDiskRecordSize()));
  372. }
  373. virtual void initializeFileParts()
  374. {
  375. }
  376. // IThorDataLink impl.
  377. virtual void start() override
  378. {
  379. ActivityTimer s(slaveTimerStats, timeActivities);
  380. PARENT::start();
  381. if (!keyRowAllocator && fetchBaseHelper->extractAllJoinFields())
  382. {
  383. IOutputMetaData *keyRowMeta = QUERYINTERFACE(fetchBaseHelper->queryExtractedSize(), IOutputMetaData);
  384. assertex(keyRowMeta);
  385. keyRowAllocator.setown(getRowAllocator(keyRowMeta));
  386. }
  387. limit = (rowcount_t)fetchBaseHelper->getRowLimit(); // MORE - if no filtering going on could keyspan to get count
  388. // NB: indexRowExtractNeeded is a member variable, because referenced by callback IFetchHandler::extractFpos()
  389. indexRowExtractNeeded = fetchBaseHelper->transformNeedsRhs();
  390. class CKeyFieldExtractBase : implements IRowStream, public CSimpleInterface
  391. {
  392. protected:
  393. CFetchSlaveBase *activity;
  394. IRowStream &in;
  395. unsigned maxInSize;
  396. IHThorFetchBaseArg &fetchBaseHelper;
  397. public:
  398. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  399. CKeyFieldExtractBase(CFetchSlaveBase *_activity, IRowStream &_in, IHThorFetchBaseArg &_fetchBaseHelper) : activity(_activity), in(_in), fetchBaseHelper(_fetchBaseHelper)
  400. {
  401. }
  402. virtual ~CKeyFieldExtractBase() {}
  403. // virtual const void *nextRow() = 0; in IRowStream
  404. virtual void stop() { in.stop(); }
  405. };
  406. Owned<IThorRowInterfaces> keyInIf;
  407. if (indexRowExtractNeeded)
  408. {
  409. Linked<IOutputMetaData> keyInMeta;
  410. class CKeyFieldExtract : public CKeyFieldExtractBase
  411. {
  412. public:
  413. CKeyFieldExtract(CFetchSlaveBase *activity, IRowStream &in, IHThorFetchBaseArg &fetchBaseHelper)
  414. : CKeyFieldExtractBase(activity, in, fetchBaseHelper)
  415. {
  416. }
  417. virtual ~CKeyFieldExtract() {}
  418. const void *nextRow()
  419. {
  420. OwnedConstThorRow inRow = in.ungroupedNextRow();
  421. if (inRow)
  422. {
  423. RtlDynamicRowBuilder row(activity->keyRowAllocator);
  424. size32_t sz = fetchBaseHelper.extractJoinFields(row, inRow);
  425. return row.finalizeRowClear(sz);
  426. }
  427. return NULL;
  428. }
  429. };
  430. if (fetchBaseHelper->extractAllJoinFields())
  431. {
  432. keyIn = LINK(inputStream);
  433. keyInMeta.set(input->queryFromActivity()->queryRowMetaData());
  434. }
  435. else
  436. {
  437. keyIn = new CKeyFieldExtract(this, *inputStream, *fetchBaseHelper);
  438. keyInMeta.set(QUERYINTERFACE(fetchBaseHelper->queryExtractedSize(), IOutputMetaData));
  439. }
  440. keyInIf.setown(createRowInterfaces(keyInMeta));
  441. }
  442. else
  443. {
  444. class CKeyFPosExtract : public CKeyFieldExtractBase
  445. {
  446. Linked<IThorRowInterfaces> rowif;
  447. public:
  448. CKeyFPosExtract(IThorRowInterfaces *_rowif, CFetchSlaveBase *activity, IRowStream &in, IHThorFetchBaseArg &fetchBaseHelper)
  449. : CKeyFieldExtractBase(activity, in, fetchBaseHelper), rowif(_rowif)
  450. {
  451. }
  452. virtual ~CKeyFPosExtract() {}
  453. const void *nextRow()
  454. {
  455. OwnedConstThorRow inRow(in.ungroupedNextRow());
  456. if (inRow)
  457. {
  458. OwnedConstThorRow row;
  459. unsigned __int64 fpos = fetchBaseHelper.extractPosition(inRow.get());
  460. row.deserialize(rowif, sizeof(fpos), &fpos);
  461. return row.getClear();
  462. }
  463. return NULL;
  464. }
  465. };
  466. Owned<IOutputMetaData> fmeta = createFixedSizeMetaData(sizeof(offset_t)); // should be provided by Gavin?
  467. keyInIf.setown(createRowInterfaces(fmeta));
  468. keyIn = new CKeyFPosExtract(keyInIf, this, *inputStream, *fetchBaseHelper);
  469. }
  470. Owned<IThorRowInterfaces> rowIf = createRowInterfaces(queryRowMetaData());
  471. OwnedRoxieString fileName = fetchBaseHelper->getFileName();
  472. fetchStream = createFetchStream(*this, keyInIf, rowIf, abortSoon, fileName, parts, offsetCount, offsetMapSz, offsetMapBytes.toByteArray(), this, mptag, eexp);
  473. fetchStreamOut = fetchStream->queryOutput();
  474. fetchStream->start(keyIn);
  475. initializeFileParts();
  476. }
  477. virtual void stop() override
  478. {
  479. if (hasStarted())
  480. fetchStreamOut->stop();
  481. dataLinkStop();
  482. }
  483. virtual void abort()
  484. {
  485. if (fetchStream)
  486. fetchStream->abort();
  487. }
  488. CATCH_NEXTROW()
  489. {
  490. ActivityTimer t(slaveTimerStats, timeActivities);
  491. if (abortSoon)
  492. return NULL;
  493. OwnedConstThorRow row = fetchStreamOut->nextRow();
  494. if (row)
  495. {
  496. // JCSMORE - not used afaik, and not implemented correctly, i.e. not global, should use a global limit act in thor at least.
  497. if (getDataLinkCount() >= limit)
  498. onLimitExceeded();
  499. dataLinkIncrement();
  500. return row.getClear();
  501. }
  502. return NULL;
  503. }
  504. virtual bool isGrouped() const override { return false; }
  505. virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
  506. {
  507. initMetaInfo(info);
  508. info.canStall = true;
  509. info.unknownRowsOutput = true;
  510. }
  511. // IFetchHandler
  512. virtual offset_t extractFpos(const void *key)
  513. {
  514. if (indexRowExtractNeeded)
  515. return fetchBaseHelper->extractPosition(key);
  516. else
  517. {
  518. offset_t fpos;
  519. memcpy(&fpos, key, sizeof(fpos));
  520. return fpos;
  521. }
  522. }
  523. virtual void serializeStats(MemoryBuffer &mb) override
  524. {
  525. if (fetchStream)
  526. {
  527. fetchStream->getFileStats(stats);
  528. fetchStream->getSubFileStats(subFileStats);
  529. }
  530. PARENT::serializeStats(mb);
  531. for (auto &stats: subFileStats)
  532. stats->serialize(mb);
  533. }
  534. virtual void onLimitExceeded() = 0;
  535. };
  536. class CFetchSlaveActivity : public CFetchSlaveBase
  537. {
  538. public:
  539. CFetchSlaveActivity(CGraphElementBase *container) : CFetchSlaveBase(container) { }
  540. virtual size32_t fetch(ARowBuilder & rowBuilder, const void *keyRow, unsigned filePartIndex, unsigned __int64 localFpos, unsigned __int64 fpos)
  541. {
  542. Owned<IFileIO> partIO = fetchStream->getPartIO(filePartIndex);
  543. Owned<ISerialStream> stream = createFileSerialStream(partIO, localFpos);
  544. RtlDynamicRowBuilder fetchedRowBuilder(fetchDiskRowIf->queryRowAllocator());
  545. const ITranslator *translator = translators.item(filePartIndex);
  546. size32_t fetchedLen;
  547. if (translator)
  548. {
  549. CThorContiguousRowBuffer prefetchBuffer;
  550. ISourceRowPrefetcher *prefetcher = prefetchers.item(filePartIndex);
  551. dbgassertex(prefetcher);
  552. prefetchBuffer.setStream(stream);
  553. prefetcher->readAhead(prefetchBuffer);
  554. const byte * row = prefetchBuffer.queryRow();
  555. LocalVirtualFieldCallback fieldCallback("<MORE>", fpos, localFpos);
  556. fetchedLen = translator->queryTranslator().translate(fetchedRowBuilder, fieldCallback, row);
  557. prefetchBuffer.finishedRow();
  558. }
  559. else
  560. {
  561. CThorStreamDeserializerSource ds(stream);
  562. fetchedLen = fetchDiskRowIf->queryRowDeserializer()->deserialize(fetchedRowBuilder, ds);
  563. }
  564. OwnedConstThorRow diskFetchRow = fetchedRowBuilder.finalizeRowClear(fetchedLen);
  565. return ((IHThorFetchArg *)fetchBaseHelper)->transform(rowBuilder, diskFetchRow, keyRow, fpos);
  566. }
  567. virtual void onLimitExceeded()
  568. {
  569. ((IHThorFetchArg *)fetchBaseHelper)->onLimitExceeded();
  570. }
  571. };
  572. class CCsvFetchSlaveActivity : public CFetchSlaveBase
  573. {
  574. CSVSplitter csvSplitter;
  575. public:
  576. CCsvFetchSlaveActivity(CGraphElementBase *container) : CFetchSlaveBase(container) { }
  577. virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData)
  578. {
  579. CFetchSlaveBase::init(data, slaveData);
  580. IHThorCsvFetchArg *helper = (IHThorCsvFetchArg *)fetchBaseHelper;
  581. ICsvParameters *csvInfo = helper->queryCsvParameters();
  582. assertex(!csvInfo->queryEBCDIC());
  583. Owned<IPropertyTree> lFProps = createPTree(data);
  584. const char * quotes = lFProps->hasProp("@csvQuote")?lFProps->queryProp("@csvQuote"):NULL;
  585. const char * separators = lFProps->hasProp("@csvSeparate")?lFProps->queryProp("@csvSeparate"):NULL;
  586. const char * terminators = lFProps->hasProp("@csvTerminate")?lFProps->queryProp("@csvTerminate"):NULL;
  587. const char * escapes = lFProps->hasProp("@csvEscape")?lFProps->queryProp("@csvEscape"):NULL;
  588. csvSplitter.init(helper->getMaxColumns(), csvInfo, quotes, separators, terminators, escapes);
  589. }
  590. virtual size32_t fetch(ARowBuilder & rowBuilder, const void *keyRow, unsigned filePartIndex, unsigned __int64 localFpos, unsigned __int64 fpos)
  591. {
  592. Owned<IFileIO> partIO = fetchStream->getPartIO(filePartIndex);
  593. Owned<ISerialStream> inputStream = createFileSerialStream(partIO, localFpos);
  594. if (inputStream->eos())
  595. return 0;
  596. size32_t maxRowSize = 10*1024*1024; // MORE - make configurable
  597. csvSplitter.splitLine(inputStream, maxRowSize);
  598. return ((IHThorCsvFetchArg *)fetchBaseHelper)->transform(rowBuilder, csvSplitter.queryLengths(), (const char * *)csvSplitter.queryData(), keyRow, localFpos);
  599. }
  600. virtual void onLimitExceeded()
  601. {
  602. ((IHThorCsvFetchArg *)fetchBaseHelper)->onLimitExceeded();
  603. }
  604. };
  605. class CXmlFetchSlaveActivity : public CFetchSlaveBase
  606. {
  607. Owned<IXMLParse> *parsers;
  608. Owned<IColumnProvider> *lastMatches;
  609. Owned<IFileIOStream> *streams;
  610. Owned<IColumnProvider> *lastMatch;
  611. class CXMLSelect : implements IXMLSelect, public CSimpleInterface
  612. {
  613. CXmlFetchSlaveActivity &owner;
  614. public:
  615. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  616. CXMLSelect(CXmlFetchSlaveActivity &_owner) : owner(_owner) { }
  617. //IXMLSelect impl.
  618. void match(IColumnProvider & entry, offset_t startOffset, offset_t endOffset)
  619. {
  620. owner.lastMatch->set(&entry);
  621. }
  622. } *xmlSelect;
  623. public:
  624. CXmlFetchSlaveActivity(CGraphElementBase *container) : CFetchSlaveBase(container)
  625. {
  626. parsers = NULL;
  627. lastMatches = NULL;
  628. lastMatch = NULL;
  629. streams = NULL;
  630. xmlSelect = new CXMLSelect(*this);
  631. }
  632. ~CXmlFetchSlaveActivity()
  633. {
  634. delete [] lastMatches;
  635. delete [] parsers;
  636. delete [] streams;
  637. ::Release(xmlSelect);
  638. }
  639. virtual void initializeFileParts()
  640. {
  641. CFetchSlaveBase::initializeFileParts();
  642. unsigned f;
  643. streams = new Owned<IFileIOStream>[files];
  644. parsers = new Owned<IXMLParse>[files];
  645. lastMatches = new Owned<IColumnProvider>[files];
  646. for (f=0; f<files; f++)
  647. {
  648. Owned<IFileIO> partIO = fetchStream->getPartIO(f);
  649. streams[f].setown(createBufferedIOStream(partIO));
  650. // NB: the index is based on path iteration matches, so on lookup the elements start at positioned stream
  651. // i.e. getXmlIteratorPath not used (or supplied) here.
  652. if (container.getKind()==TAKjsonfetch)
  653. parsers[f].setown(createJSONParse(*streams[f], "/", *xmlSelect, ptr_none, ((IHThorXmlFetchArg *)fetchBaseHelper)->requiresContents()));
  654. else
  655. parsers[f].setown(createXMLParse(*streams[f], "/", *xmlSelect, ptr_none, ((IHThorXmlFetchArg *)fetchBaseHelper)->requiresContents()));
  656. }
  657. }
  658. virtual size32_t fetch(ARowBuilder & rowBuilder, const void *keyRow, unsigned filePartIndex, unsigned __int64 localFpos, unsigned __int64 fpos)
  659. {
  660. streams[filePartIndex]->seek(localFpos, IFSbegin);
  661. IXMLParse *parser = parsers[filePartIndex].get();
  662. lastMatch = &lastMatches[filePartIndex];
  663. while (!lastMatch->get())
  664. {
  665. if (!parser->next())
  666. {
  667. StringBuffer tmpStr;
  668. throw MakeActivityException(this, 0, "%s", fetchStream->getPartName(filePartIndex, tmpStr).str());
  669. }
  670. }
  671. size32_t retSz = ((IHThorXmlFetchArg *)fetchBaseHelper)->transform(rowBuilder, lastMatch->get(), keyRow, fpos);
  672. lastMatch->clear();
  673. parser->reset();
  674. return retSz;
  675. }
  676. virtual void onLimitExceeded()
  677. {
  678. ((IHThorXmlFetchArg *)fetchBaseHelper)->onLimitExceeded();
  679. }
  680. friend class CXMLSelect;
  681. };
  682. CActivityBase *createFetchSlave(CGraphElementBase *container)
  683. {
  684. return new CFetchSlaveActivity(container);
  685. }
  686. CActivityBase *createCsvFetchSlave(CGraphElementBase *container)
  687. {
  688. return new CCsvFetchSlaveActivity(container);
  689. }
  690. CActivityBase *createXmlFetchSlave(CGraphElementBase *container)
  691. {
  692. return new CXmlFetchSlaveActivity(container);
  693. }