thgraphslave.hpp 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef _THGRAPHSLAVE_HPP_
  14. #define _THGRAPHSLAVE_HPP_
  15. #ifdef GRAPHSLAVE_EXPORTS
  16. #define graphslave_decl DECL_EXPORT
  17. #else
  18. #define graphslave_decl DECL_IMPORT
  19. #endif
  20. #include "platform.h"
  21. #include "slave.hpp"
  22. #include "thormisc.hpp"
  23. #include "thorcommon.hpp"
  24. #include "thgraph.hpp"
  25. #include "jdebug.hpp"
  26. #include "traceslave.hpp"
  27. #include "thorstrand.hpp"
  28. interface IStartableEngineRowStream : extends IEngineRowStream
  29. {
  30. virtual void start() = 0;
  31. };
  32. class COutputTiming
  33. {
  34. public:
  35. ActivityTimeAccumulator slaveTimerStats;
  36. COutputTiming() { }
  37. void resetTiming() { slaveTimerStats.reset(); }
  38. ActivityTimeAccumulator &getTotalCyclesRef() { return slaveTimerStats; }
  39. unsigned __int64 queryTotalCycles() const { return slaveTimerStats.totalCycles; }
  40. unsigned __int64 queryEndCycles() const { return slaveTimerStats.endCycles; }
  41. unsigned __int64 queryBlockedCycles() const { return slaveTimerStats.blockedCycles; }
  42. };
  43. class CEdgeProgress
  44. {
  45. CActivityBase &owner;
  46. rowcount_t count = 0, icount = 0;
  47. unsigned outputId = 0;
  48. public:
  49. explicit CEdgeProgress(CActivityBase *_owner) : owner(*_owner) { }
  50. explicit CEdgeProgress(CActivityBase *_owner, unsigned _outputId) : owner(*_owner), outputId(_outputId) { }
  51. inline void dataLinkStart()
  52. {
  53. owner.ActPrintLog("ITDL starting for output %d", outputId);
  54. if (hasStarted())
  55. {
  56. if (!hasStopped())
  57. throw MakeActivityException(&owner, 0, "Starting without being stopped 1st");
  58. else
  59. throw MakeActivityException(&owner, 0, "Started and stopped states both set");
  60. }
  61. icount = 0;
  62. count = (count & THORDATALINK_COUNT_MASK) | THORDATALINK_STARTED;
  63. }
  64. inline void dataLinkStop()
  65. {
  66. if (hasStarted())
  67. count = (count & THORDATALINK_COUNT_MASK) | THORDATALINK_STOPPED;
  68. owner.ActPrintLog("ITDL output %d stopped, count was %" RCPF "d", outputId, getDataLinkCount());
  69. }
  70. inline void dataLinkIncrement() { dataLinkIncrement(1); }
  71. inline void dataLinkIncrement(rowcount_t v)
  72. {
  73. #ifdef _TESTING
  74. assertex(hasStarted());
  75. #ifdef OUTPUT_RECORDSIZE
  76. if (count==THORDATALINK_STARTED)
  77. {
  78. size32_t rsz = parent.queryRowMetaData(this)->getMinRecordSize();
  79. parent.ActPrintLog("Record size %s= %d", parent.queryRowMetaData(this)->isVariableSize()?"(min) ":"",rsz);
  80. }
  81. #endif
  82. #endif
  83. icount += v;
  84. count += v;
  85. }
  86. inline bool hasStarted() const { return (count & THORDATALINK_STARTED) ? true : false; }
  87. inline bool hasStopped() const { return (count & THORDATALINK_STOPPED) ? true : false; }
  88. inline void dataLinkSerialize(MemoryBuffer &mb) const { mb.append(count); }
  89. inline rowcount_t getDataLinkGlobalCount() { return (count & THORDATALINK_COUNT_MASK); }
  90. inline rowcount_t getDataLinkCount() const { return icount; }
  91. inline rowcount_t getCount() const { return count; }
  92. };
  93. class CThorInput : public CSimpleInterfaceOf<IInterface>
  94. {
  95. Linked<IEngineRowStream> stream;
  96. Linked<IStartableEngineRowStream> lookAhead;
  97. void _startLookAhead()
  98. {
  99. assertex(nullptr != lookAhead);
  100. lookAhead->start();
  101. lookAheadActive = true;
  102. }
  103. public:
  104. unsigned sourceIdx = 0;
  105. Linked<IThorDataLink> itdl;
  106. Linked<IThorDebug> tracingStream;
  107. Linked<IStrandJunction> junction;
  108. bool stopped = false;
  109. bool started = false;
  110. bool persistentLookAhead = false;
  111. bool lookAheadActive = false;
  112. explicit CThorInput() { }
  113. void set(IThorDataLink *_itdl, unsigned idx) { itdl.set(_itdl); sourceIdx = idx; }
  114. void reset()
  115. {
  116. started = stopped = false;
  117. resetJunction(junction);
  118. }
  119. bool isStopped() const { return stopped; }
  120. bool isStarted() const { return started; }
  121. bool isLookAheadActive() const { return lookAheadActive; }
  122. IEngineRowStream *queryStream() const
  123. {
  124. if (lookAhead && lookAheadActive)
  125. return lookAhead;
  126. else
  127. return stream;
  128. }
  129. void setStream(IEngineRowStream *_stream) { stream.setown(_stream); }
  130. bool hasLookAhead() const { return nullptr != lookAhead; }
  131. void setLookAhead(IStartableEngineRowStream *_lookAhead, bool persistent)
  132. {
  133. dbgassertex(!persistentLookAhead); // If persistent, must only be called once
  134. /* NB: if persistent, must be installed before starting input, e.g. during setInputStream wiring.
  135. * if not persistent, must be installed after input started, e.g. in start() after startInput(x).
  136. */
  137. dbgassertex((persistent && !isStarted()) || (!persistent && isStarted()));
  138. lookAhead.setown(_lookAhead); // if pre-existing lookAhead, this will replace.
  139. persistentLookAhead = persistent;
  140. }
  141. void startLookAhead()
  142. {
  143. dbgassertex(!persistentLookAhead);
  144. dbgassertex(isStarted());
  145. _startLookAhead();
  146. }
  147. void start()
  148. {
  149. itdl->start();
  150. startJunction(junction);
  151. if (persistentLookAhead)
  152. _startLookAhead();
  153. stopped = false;
  154. started = true;
  155. }
  156. void stop()
  157. {
  158. // NB: lookAhead can be installed but not used
  159. if (lookAheadActive)
  160. {
  161. lookAhead->stop();
  162. lookAheadActive = false;
  163. }
  164. else if (stream)
  165. stream->stop();
  166. stopped = true;
  167. }
  168. bool isFastThrough() const;
  169. };
  170. typedef IArrayOf<CThorInput> CThorInputArray;
  171. class CSlaveGraphElement;
  172. class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgress, public COutputTiming, implements IThorDataLink, implements IEngineRowStream, implements IThorSlaveActivity
  173. {
  174. mutable MemoryBuffer *data;
  175. mutable CriticalSection crit;
  176. protected:
  177. CThorInputArray inputs;
  178. IPointerArrayOf<IThorDataLink> outputs;
  179. IPointerArrayOf<IEngineRowStream> outputStreams;
  180. IThorDataLink *input = nullptr;
  181. bool inputStopped = false;
  182. unsigned inputSourceIdx = 0;
  183. IEngineRowStream *inputStream = nullptr;
  184. MemoryBuffer startCtx;
  185. bool optStableInput = true; // is the input forced to ordered?
  186. bool optUnstableInput = false; // is the input forced to unordered?
  187. bool optUnordered = false; // is the output specified as unordered?
  188. protected:
  189. unsigned __int64 queryLocalCycles() const;
  190. bool ensureStartFTLookAhead(unsigned index);
  191. bool isInputFastThrough(unsigned index) const;
  192. bool hasLookAhead(unsigned index) const;
  193. void setLookAhead(unsigned index, IStartableEngineRowStream *lookAhead, bool persistent);
  194. void startLookAhead(unsigned index);
  195. bool isLookAheadActive(unsigned index) const;
  196. public:
  197. IMPLEMENT_IINTERFACE_USING(CActivityBase)
  198. CSlaveActivity(CGraphElementBase *container, const StatisticsMapping &statsMapping = basicActivityStatistics);
  199. ~CSlaveActivity();
  200. void setRequireInitData(bool tf)
  201. {
  202. // If not required, sets sentActInitdata to true, to prevent it being requested at graph initialization time.
  203. container.sentActInitData->set(0, !tf);
  204. }
  205. virtual void clearConnections();
  206. virtual void releaseIOs();
  207. virtual MemoryBuffer &queryInitializationData(unsigned slave) const;
  208. virtual MemoryBuffer &getInitializationData(unsigned slave, MemoryBuffer &mb) const;
  209. virtual void setInput(unsigned index, CActivityBase *inputActivity, unsigned inputOutIdx) override;
  210. virtual void connectInputStreams(bool consumerOrdered);
  211. IThorDataLink *queryOutput(unsigned index) const;
  212. IThorDataLink *queryInput(unsigned index) const;
  213. IEngineRowStream *queryInputStream(unsigned index) const;
  214. IStrandJunction *queryInputJunction(unsigned index) const;
  215. IEngineRowStream *queryOutputStream(unsigned index) const;
  216. inline bool queryInputStarted(unsigned input) const { return inputs.item(input).isStarted(); }
  217. inline bool queryInputStopped(unsigned input) const { return inputs.item(input).isStopped(); }
  218. unsigned queryInputOutputIndex(unsigned inputIndex) const { return inputs.item(inputIndex).sourceIdx; }
  219. unsigned queryNumInputs() const { return inputs.ordinality(); }
  220. void appendOutput(IThorDataLink *itdl);
  221. void appendOutputLinked(IThorDataLink *itdl);
  222. void startInput(unsigned index, const char *extra=NULL);
  223. void startAllInputs();
  224. void stopInput(unsigned index, const char *extra=NULL);
  225. void stopAllInputs();
  226. virtual void serializeStats(MemoryBuffer &mb);
  227. void debugRequest(unsigned edgeIdx, MemoryBuffer &msg);
  228. bool canStall() const;
  229. bool isFastThrough() const;
  230. // IThorDataLink
  231. virtual CSlaveActivity *queryFromActivity() override { return this; }
  232. virtual IStrandJunction *getOutputStreams(CActivityBase &_ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, const CThorStrandOptions * consumerOptions, bool consumerOrdered, IOrderedCallbackCollection * orderedCallbacks) override;
  233. virtual void setOutputStream(unsigned index, IEngineRowStream *stream) override;
  234. virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override { }
  235. virtual bool isGrouped() const override;
  236. virtual IOutputMetaData * queryOutputMeta() const;
  237. virtual void dataLinkSerialize(MemoryBuffer &mb) const override;
  238. virtual rowcount_t getProgressCount() const override;
  239. virtual bool isInputOrdered(bool consumerOrdered) const override
  240. {
  241. if (optStableInput)
  242. return true;
  243. if (optUnstableInput)
  244. return false;
  245. if (optUnordered)
  246. return false;
  247. return consumerOrdered;
  248. }
  249. virtual unsigned __int64 queryTotalCycles() const { return COutputTiming::queryTotalCycles(); }
  250. virtual unsigned __int64 queryBlockedCycles() const { return COutputTiming::queryBlockedCycles();}
  251. virtual unsigned __int64 queryEndCycles() const { return COutputTiming::queryEndCycles(); }
  252. virtual void debugRequest(MemoryBuffer &msg) override;
  253. // IThorDataLink
  254. virtual void start() override;
  255. // IEngineRowStream
  256. virtual const void *nextRow() override { throwUnexpected(); }
  257. virtual void stop() override;
  258. virtual void resetEOF() override { throwUnexpected(); }
  259. // IThorSlaveActivity
  260. virtual void init(MemoryBuffer &in, MemoryBuffer &out) { }
  261. virtual void setInputStream(unsigned index, CThorInput &input, bool consumerOrdered) override;
  262. virtual void processDone(MemoryBuffer &mb) override { };
  263. virtual void reset() override;
  264. };
  265. class graphslave_decl CSlaveLateStartActivity : public CSlaveActivity
  266. {
  267. bool prefiltered = false;
  268. Owned<CThorInput> nullInput;
  269. protected:
  270. void lateStart(bool any);
  271. public:
  272. CSlaveLateStartActivity(CGraphElementBase *container, const StatisticsMapping &statsMapping) : CSlaveActivity(container, statsMapping)
  273. {
  274. }
  275. virtual void start() override;
  276. virtual void stop() override;
  277. virtual void reset() override;
  278. };
  279. graphslave_decl IEngineRowStream *connectSingleStream(CActivityBase &activity, IThorDataLink *input, unsigned idx, Owned<IStrandJunction> &junction, bool consumerOrdered);
  280. graphslave_decl IEngineRowStream *connectSingleStream(CActivityBase &activity, IThorDataLink *input, unsigned idx, bool consumerOrdered);
  // Catch clause that forwards a caught IException to parent->processAndThrowOwnedException().
  // NOTE(review): nothing in this header expands this macro, and it dereferences `parent`
  // with `->` whereas CThorStrandProcessor::parent below is a reference - confirm whether
  // it is still used anywhere.
  #define STRAND_CATCH_NEXTROWX_CATCH \
      catch (IException *_e) \
      { \
          parent->processAndThrowOwnedException(_e); \
      }

  // Declares a nextRow() override that wraps nextRowNoCatch() in a try block, followed by
  // the opening of the always-inline nextRowNoCatch() declaration; the user supplies its body.
  // NOTE(review): this expands CATCH_NEXTROWX_CATCH, not the STRAND_ variant defined above;
  // that macro is presumably provided by one of the included headers - verify.
  #define STRAND_CATCH_NEXTROW() \
      virtual const void *nextRow() override \
      { \
          try \
          { \
              return nextRowNoCatch(); \
          } \
          CATCH_NEXTROWX_CATCH \
      } \
      inline const void *nextRowNoCatch() __attribute__((always_inline))
  296. class CThorStrandedActivity;
  297. class graphslave_decl CThorStrandProcessor : public CInterfaceOf<IEngineRowStream>, public COutputTiming
  298. {
  299. protected:
  300. CThorStrandedActivity &parent;
  301. IEngineRowStream *inputStream;
  302. rowcount_t numProcessedLastGroup = 0;
  303. const bool timeActivities;
  304. bool stopped = false;
  305. unsigned outputId; // if activity had >1 , this identifies (for tracing purposes) which output this strand belongs to.
  306. Linked<IHThorArg> baseHelper;
  307. rowcount_t rowsProcessed;
  308. protected:
  309. inline IHThorArg *queryHelper() const { return baseHelper; }
  310. public:
  311. explicit CThorStrandProcessor(CThorStrandedActivity &_parent, IEngineRowStream *_inputStream, unsigned _outputId);
  312. __declspec(noreturn) void processAndThrowOwnedException(IException *_e) __attribute__((noreturn));
  313. rowcount_t getCount() const { return rowsProcessed; }
  314. virtual void start()
  315. {
  316. rowsProcessed = 0;
  317. numProcessedLastGroup = 0;
  318. resetTiming();
  319. }
  320. virtual void reset()
  321. {
  322. rowsProcessed = 0;
  323. stopped = false;
  324. }
  325. // IRowStream
  326. virtual void stop() override;
  327. // IEngineRowStream
  328. virtual void resetEOF() override
  329. {
  330. inputStream->resetEOF();
  331. }
  332. };
  333. class graphslave_decl CThorStrandedActivity : public CSlaveActivity
  334. {
  335. protected:
  336. CThorStrandOptions strandOptions;
  337. IArrayOf<CThorStrandProcessor> strands;
  338. Owned<IStrandBranch> branch;
  339. Owned<IStrandJunction> splitter;
  340. Owned<IStrandJunction> sourceJunction; // A junction applied to the output of a source activity
  341. std::atomic<unsigned> active;
  342. protected:
  343. void onStartStrands();
  344. public:
  345. CThorStrandedActivity(CGraphElementBase *container, const StatisticsMapping &statsMapping = basicActivityStatistics)
  346. : CSlaveActivity(container, statsMapping), strandOptions(*container), active(0)
  347. {
  348. }
  349. void strandedStop();
  350. virtual void start() override;
  351. virtual void reset() override;
  352. virtual CThorStrandProcessor *createStrandProcessor(IEngineRowStream *instream) = 0;
  353. //MORE: Possibly this class should be split into two for sinks and non sinks...
  354. virtual CThorStrandProcessor *createStrandSourceProcessor(bool inputOrdered) = 0;
  355. inline unsigned numStrands() const { return strands.ordinality(); }
  356. // IThorDataLink
  357. virtual IStrandJunction *getOutputStreams(CActivityBase &_ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, const CThorStrandOptions * consumerOptions, bool consumerOrdered, IOrderedCallbackCollection * orderedCallbacks) override;
  358. virtual unsigned __int64 queryTotalCycles() const override;
  359. virtual void dataLinkSerialize(MemoryBuffer &mb) const override;
  360. virtual rowcount_t getProgressCount() const override;
  361. };
  362. class graphslave_decl CSlaveGraphElement : public CGraphElementBase
  363. {
  364. public:
  365. CSlaveGraphElement(CGraphBase &owner, IPropertyTree &xgmml, CGraphBase *resultsGraph) : CGraphElementBase(owner, xgmml, resultsGraph)
  366. {
  367. }
  368. };
  369. class CJobSlave;
  370. class graphslave_decl CSlaveGraph : public CGraphBase
  371. {
  372. CJobSlave *jobS;
  373. Semaphore getDoneSem;
  374. CriticalSection progressCrit;
  375. bool doneInit = false;
  376. std::atomic_bool progressActive;
  377. public:
  378. CSlaveGraph(CJobChannel &jobChannel);
  379. ~CSlaveGraph() { }
  380. void connect();
  381. void init(MemoryBuffer &mb);
  382. bool recvActivityInitData(size32_t parentExtractSz, const byte *parentExtract);
  383. void setExecuteReplyTag(mptag_t _executeReplyTag) { executeReplyTag = _executeReplyTag; }
  384. void initWithActData(MemoryBuffer &in, MemoryBuffer &out);
  385. void getDone(MemoryBuffer &doneInfoMb);
  386. void serializeDone(MemoryBuffer &mb);
  387. IThorResult *getGlobalResult(CActivityBase &activity, IThorRowInterfaces *rowIf, activity_id ownerId, unsigned id);
  388. virtual void executeSubGraph(size32_t parentExtractSz, const byte *parentExtract) override;
  389. virtual bool serializeStats(MemoryBuffer &mb);
  390. virtual bool preStart(size32_t parentExtractSz, const byte *parentExtract) override;
  391. virtual void start() override;
  392. virtual void abort(IException *e) override;
  393. virtual void reset() override;
  394. virtual void done() override;
  395. virtual cost_type getDiskAccessCost() override { UNIMPLEMENTED; }
  396. virtual IThorGraphResults *createThorGraphResults(unsigned num);
  397. // IExceptionHandler
  398. virtual bool fireException(IException *e)
  399. {
  400. IThorException *te = QUERYINTERFACE(e, IThorException);
  401. StringBuffer s;
  402. if (!te || !te->queryGraphId())
  403. {
  404. Owned<IThorException> e2 = MakeGraphException(this, e);
  405. e2->setAudience(e->errorAudience());
  406. return CGraphBase::fireException(e2);
  407. }
  408. else
  409. return CGraphBase::fireException(e);
  410. }
  411. };
  412. interface ISlaveWatchdog;
  413. class graphslave_decl CJobSlave : public CJobBase
  414. {
  415. typedef CJobBase PARENT;
  416. ISlaveWatchdog *watchdog;
  417. Owned<IPropertyTree> workUnitInfo;
  418. size32_t oldNodeCacheMem;
  419. unsigned channelMemoryMB;
  420. unsigned actInitWaitTimeMins = DEFAULT_MAX_ACTINITWAITTIME_MINS;
  421. public:
  422. CJobSlave(ISlaveWatchdog *_watchdog, IPropertyTree *workUnitInfo, const char *graphName, ILoadedDllEntry *querySo, mptag_t _slavemptag);
  423. virtual CJobChannel *addChannel(IMPServer *mpServer) override;
  424. virtual void startJob() override;
  425. virtual void endJob() override;
  426. const char *queryFindString() const { return key.get(); } // for string HT
  427. unsigned queryActInitWaitTimeMins() const { return actInitWaitTimeMins; }
  428. virtual IGraphTempHandler *createTempHandler(bool errorOnMissing);
  429. ISlaveWatchdog *queryProgressHandler() { return watchdog; }
  430. void reportGraphEnd(graph_id gid);
  431. virtual mptag_t deserializeMPTag(MemoryBuffer &mb);
  432. virtual __int64 getWorkUnitValueInt(const char *prop, __int64 defVal) const;
  433. virtual StringBuffer &getWorkUnitValue(const char *prop, StringBuffer &str) const;
  434. virtual bool getWorkUnitValueBool(const char *prop, bool defVal) const;
  435. virtual IThorAllocator *getThorAllocator(unsigned channel);
  436. virtual void debugRequest(MemoryBuffer &msg, const char *request) const;
  437. // IExceptionHandler
  438. virtual bool fireException(IException *e)
  439. {
  440. return queryJobChannel(0).fireException(e);
  441. }
  442. // IThreadFactory
  443. IPooledThread *createNew();
  444. };
  445. class graphslave_decl CJobSlaveChannel : public CJobChannel
  446. {
  447. CriticalSection graphRunCrit;
  448. public:
  449. CJobSlaveChannel(CJobBase &job, IMPServer *mpServer, unsigned channel);
  450. virtual IBarrier *createBarrier(mptag_t tag);
  451. virtual CGraphBase *createGraph()
  452. {
  453. return new CSlaveGraph(*this);
  454. }
  455. // IGraphCallback
  456. virtual void runSubgraph(CGraphBase &graph, size32_t parentExtractSz, const byte *parentExtract);
  457. // IExceptionHandler
  458. virtual bool fireException(IException *e)
  459. {
  460. CMessageBuffer msg;
  461. msg.append((int)smt_errorMsg);
  462. msg.append(queryMyRank()-1);
  463. IThorException *te = QUERYINTERFACE(e, IThorException);
  464. bool userOrigin = false;
  465. if (te)
  466. {
  467. te->setJobId(queryJob().queryKey());
  468. te->setSlave(queryMyRank());
  469. if (!te->queryOrigin())
  470. {
  471. VStringBuffer msg("SLAVE #%d", queryMyRank());
  472. te->setOrigin(msg);
  473. }
  474. else if (0 == stricmp("user", te->queryOrigin()))
  475. userOrigin = true;
  476. }
  477. serializeThorException(e, msg);
  478. if (userOrigin)
  479. {
  480. // wait for reply
  481. if (!queryJobComm().sendRecv(msg, 0, queryJob().querySlaveMpTag(), LONGTIMEOUT))
  482. EXCLOG(e, "Failed to sendrecv to master");
  483. }
  484. else
  485. {
  486. if (!queryJobComm().send(msg, 0, queryJob().querySlaveMpTag(), LONGTIMEOUT))
  487. EXCLOG(e, "Failed to send to master");
  488. }
  489. return true;
  490. }
  491. };
  492. interface IActivityReplicatedFile : extends IReplicatedFile
  493. {
  494. virtual IFile *open(CActivityBase &activity) = 0;
  495. };
  496. interface IPartDescriptor;
  497. extern graphslave_decl bool ensurePrimary(CActivityBase *activity, IPartDescriptor &partDesc, OwnedIFile & ifile, unsigned &location, StringBuffer &path);
  498. extern graphslave_decl IActivityReplicatedFile *createEnsurePrimaryPartFile(const char *logicalFilename, IPartDescriptor *partDesc);
  499. extern graphslave_decl IThorFileCache *createFileCache(unsigned limit);
  500. extern graphslave_decl bool canStall(IThorDataLink *input);
  501. extern graphslave_decl IDelayedFile *createDelayedFile(IFileIO *iFileIO);
  502. #endif