thormisc.hpp 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef _THORMISC_
  14. #define _THORMISC_
  15. #include "jiface.hpp"
  16. #include "jthread.hpp"
  17. #include "jexcept.hpp"
  18. #include "jarray.hpp"
  19. #include "jfile.hpp"
  20. #include "jprop.hpp"
  21. #include "jutil.hpp"
  22. #include "jlog.hpp"
  23. #include "mpcomm.hpp"
  24. #include "workunit.hpp"
  25. #include "eclhelper.hpp"
  26. #include "thexception.hpp"
  27. #include "thorcommon.hpp"
  28. #include "thor.hpp"
  29. #ifdef _WIN32
  30. #ifdef GRAPH_EXPORTS
  31. #define graph_decl __declspec(dllexport)
  32. #else
  33. #define graph_decl __declspec(dllimport)
  34. #endif
  35. #else
  36. #define graph_decl
  37. #endif
  38. /// Thor options, that can be hints, workunit options, or global settings
  39. #define THOROPT_COMPRESS_SPILLS "compressInternalSpills" // Compress internal spills, e.g. spills created by lookahead or sort gathering (default = true)
  40. #define THOROPT_HDIST_SPILL "hdistSpill" // Allow distribute receiver to spill to disk, rather than blocking (default = true)
  41. #define THOROPT_HDIST_WRITE_POOL_SIZE "hdistSendPoolSize" // Distribute send thread pool size (default = 16)
  42. #define THOROPT_HDIST_BUCKET_SIZE "hd_out_buffer_size" // Distribute target bucket send size (default = 1MB)
  43. #define THOROPT_HDIST_BUFFER_SIZE "hd_in_buffer_size" // Distribute send buffer size (for all targets) (default = 32MB)
  44. #define THOROPT_HDIST_CANDIDATELIMIT "hdCandidateLimit" // Limits # of buckets to push to the writers when send buffer is full (default = is 50% largest)
  45. #define THOROPT_HDIST_TARGETWRITELIMIT "hdTargetLimit" // Limit # of writer threads working on a single target (default = unbound, but picks round-robin)
  46. #define THOROPT_SPLITTER_SPILL "splitterSpill" // Force splitters to spill or not, default is to adhere to helper setting (default = -1)
  47. #define THOROPT_LOOP_MAX_EMPTY "loopMaxEmpty" // Max # of iterations that LOOP can cycle through with 0 results before errors (default = 1000)
  48. #define THOROPT_SMALLSORT "smallSortThreshold" // Use minisort approach, if estimate size of data to sort is below this setting (default = 0)
  49. #define THOROPT_PARALLEL_FUNNEL "parallelFunnel" // Use parallel funnel impl. if !ordered (default = true)
  50. #define THOROPT_SORT_MAX_DEVIANCE "sort_max_deviance" // Max (byte) variance allowed during sort partitioning (default = 10Mb)
  51. #define THOROPT_OUTPUT_FLUSH_THRESHOLD "output_flush_threshold" // When above limit, workunit result is flushed (committed to Dali) (default = -1 [off])
  52. #define THOROPT_OUTPUTLIMIT "outputLimit" // OUTPUT Mb limit (default = 10)
  53. #define THOROPT_PARALLEL_MATCH "parallel_match" // Use multi-threaded join helper (retains sort order without unsorted_output) (default = false)
  54. #define THOROPT_UNSORTED_OUTPUT "unsorted_output" // Allow Join results to be reodered, implies parallel match (default = false)
  55. #define THOROPT_JOINHELPER_THREADS "joinHelperThreads" // Number of threads to use in threaded variety of join helper
  56. #define THOROPT_LKJOIN_LOCALFAILOVER "lkjoin_localfailover" // Force SMART to failover to distributed local lookup join (for testing only) (default = false)
  57. #define THOROPT_LKJOIN_HASHJOINFAILOVER "lkjoin_hashjoinfailover" // Force SMART to failover to hash join (for testing only) (default = false)
  58. #define INITIAL_SELFJOIN_MATCH_WARNING_LEVEL 20000 // max of row matches before selfjoin emits warning
  59. #define THOR_SEM_RETRY_TIMEOUT 2
  60. #define THOR_TRACE_LEVEL 5
  61. enum ThorExceptionAction { tea_null, tea_warning, tea_abort, tea_shutdown };
  62. enum RegistryCode { rc_register, rc_deregister };
  63. #define createThorRow(size) malloc(size)
  64. #define destroyThorRow(ptr) free(ptr)
  65. #define reallocThorRow(ptr, size) realloc(ptr, size)
  66. class BooleanOnOff
  67. {
  68. bool &tf;
  69. public:
  70. inline BooleanOnOff(bool &_tf) : tf(_tf) { tf = true; }
  71. inline ~BooleanOnOff() { tf = false; }
  72. };
  73. class CReplyCancelHandler
  74. {
  75. ICommunicator *comm;
  76. mptag_t mpTag;
  77. bool cancelled;
  78. SpinLock lock;
  79. void clear()
  80. {
  81. mpTag = TAG_NULL;
  82. comm = NULL;
  83. }
  84. void clearLock()
  85. {
  86. SpinBlock b(lock);
  87. clear();
  88. }
  89. public:
  90. CReplyCancelHandler()
  91. {
  92. reset();
  93. }
  94. void reset()
  95. {
  96. clear();
  97. cancelled = false;
  98. }
  99. void cancel(rank_t rank)
  100. {
  101. ICommunicator *_comm = NULL;
  102. mptag_t _mpTag = TAG_NULL;
  103. {
  104. SpinBlock b(lock);
  105. if (cancelled)
  106. return;
  107. cancelled = true;
  108. if (TAG_NULL == mpTag)
  109. return;
  110. // stash in case other thread waiting finishing send.
  111. _comm = comm;
  112. _mpTag = mpTag;
  113. }
  114. _comm->cancel(rank, _mpTag);
  115. }
  116. bool recv(ICommunicator &_comm, CMessageBuffer &mb, rank_t rank, const mptag_t &_mpTag, rank_t *sender=NULL, unsigned timeout=MP_WAIT_FOREVER)
  117. {
  118. bool ret=false;
  119. {
  120. SpinBlock b(lock);
  121. if (cancelled)
  122. return false;
  123. comm = &_comm;
  124. mpTag = _mpTag; // receiving
  125. }
  126. try
  127. {
  128. ret = _comm.recv(mb, rank, _mpTag, sender, timeout);
  129. }
  130. catch (IException *)
  131. {
  132. clearLock();
  133. throw;
  134. }
  135. clearLock();
  136. return ret;
  137. }
  138. };
  139. class graph_decl CTimeoutTrigger : public CInterface, implements IThreaded
  140. {
  141. bool running;
  142. Semaphore todo;
  143. CriticalSection crit;
  144. unsigned timeout;
  145. StringAttr description;
  146. CThreaded threaded;
  147. protected:
  148. Owned<IException> exception;
  149. public:
  150. CTimeoutTrigger(unsigned _timeout, const char *_description) : timeout(_timeout), description(_description), threaded("TimeoutTrigger")
  151. {
  152. running = (timeout!=0);
  153. threaded.init(this);
  154. }
  155. virtual ~CTimeoutTrigger()
  156. {
  157. stop();
  158. threaded.join();
  159. }
  160. void main()
  161. {
  162. while (running)
  163. {
  164. todo.wait(1000);
  165. CriticalBlock block(crit);
  166. if (exception.get())
  167. {
  168. { CriticalUnblock b(crit);
  169. if (todo.wait(timeout*1000))
  170. { // if signalled during timeout period, wait full timeout
  171. if (running)
  172. todo.wait(timeout*1000);
  173. }
  174. }
  175. if (!running) break;
  176. if (exception.get())
  177. if (action())
  178. break;
  179. }
  180. }
  181. }
  182. void stop() { running = false; todo.signal(); }
  183. void inform(IException *e)
  184. {
  185. LOG(MCdebugProgress, unknownJob, "INFORM [%s]", description.get());
  186. CriticalBlock block(crit);
  187. if (exception.get())
  188. e->Release();
  189. else
  190. {
  191. exception.setown(e);
  192. todo.signal();
  193. }
  194. }
  195. IException *clear()
  196. {
  197. CriticalBlock block(crit);
  198. IException *e = exception.getClear();
  199. if (e)
  200. LOG(MCdebugProgress, unknownJob, "CLEARING TIMEOUT [%s]", description.get());
  201. todo.signal();
  202. return e;
  203. }
  204. virtual bool action() = 0;
  205. };
  206. // simple class which takes ownership of the underlying file and deletes it on destruction
  207. class graph_decl CFileOwner : public CSimpleInterface, implements IInterface
  208. {
  209. OwnedIFile iFile;
  210. public:
  211. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  212. CFileOwner(IFile *_iFile) : iFile(_iFile)
  213. {
  214. }
  215. ~CFileOwner()
  216. {
  217. iFile->remove();
  218. }
  219. IFile &queryIFile() const { return *iFile; }
  220. };
  221. // stream wrapper, that takes ownership of a CFileOwner
  222. class graph_decl CStreamFileOwner : public CSimpleInterface, implements IExtRowStream
  223. {
  224. Linked<CFileOwner> fileOwner;
  225. IExtRowStream *stream;
  226. public:
  227. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  228. CStreamFileOwner(CFileOwner *_fileOwner, IExtRowStream *_stream) : fileOwner(_fileOwner)
  229. {
  230. stream = LINK(_stream);
  231. }
  232. ~CStreamFileOwner()
  233. {
  234. stream->Release();
  235. }
  236. // IExtRowStream
  237. virtual const void *nextRow() { return stream->nextRow(); }
  238. virtual void stop() { stream->stop(); }
  239. virtual offset_t getOffset() { return stream->getOffset(); }
  240. virtual void stop(CRC32 *crcout=NULL) { stream->stop(); }
  241. virtual const void *prefetchRow(size32_t *sz=NULL) { return stream->prefetchRow(sz); }
  242. virtual void prefetchDone() { stream->prefetchDone(); }
  243. virtual void reinit(offset_t offset, offset_t len, unsigned __int64 maxRows)
  244. {
  245. stream->reinit(offset, len, maxRows);
  246. }
  247. };
  248. #define DEFAULT_QUERYSO_LIMIT 10
  249. class graph_decl CFifoFileCache : public CSimpleInterface
  250. {
  251. unsigned limit;
  252. StringArray files;
  253. void deleteFile(IFile &ifile);
  254. public:
  255. void init(const char *cacheDir, unsigned _limit, const char *pattern);
  256. void add(const char *filename);
  257. bool isAvailable(const char *filename);
  258. };
  259. interface IBarrierException : extends IException {};
  260. extern graph_decl IBarrierException *createBarrierAbortException();
  261. interface IThorException : extends IException
  262. {
  263. virtual ThorExceptionAction queryAction() = 0;
  264. virtual ThorActivityKind queryActivityKind() = 0;
  265. virtual activity_id queryActivityId() = 0;
  266. virtual graph_id queryGraphId() = 0;
  267. virtual const char *queryJobId() = 0;
  268. virtual void getAssert(StringAttr &file, unsigned &line, unsigned &column) = 0;
  269. virtual const char *queryOrigin() = 0;
  270. virtual WUExceptionSeverity querySeverity() = 0;
  271. virtual const char *queryMessage() = 0;
  272. virtual bool queryNotified() const = 0;
  273. virtual MemoryBuffer &queryData() = 0;
  274. virtual void setNotified() = 0;
  275. virtual void setAction(ThorExceptionAction _action) = 0;
  276. virtual void setActivityKind(ThorActivityKind _kind) = 0;
  277. virtual void setActivityId(activity_id id) = 0;
  278. virtual void setGraphId(graph_id id) = 0;
  279. virtual void setJobId(const char *jobId) = 0;
  280. virtual void setAudience(MessageAudience audience) = 0;
  281. virtual void setSlave(unsigned slave) = 0;
  282. virtual void setMessage(const char *msg) = 0;
  283. virtual void setAssert(const char *file, unsigned line, unsigned column) = 0;
  284. virtual void setOrigin(const char *origin) = 0;
  285. virtual void setSeverity(WUExceptionSeverity severity) = 0;
  286. };
  287. class CGraphElementBase;
  288. class CActivityBase;
  289. class CGraphBase;
  290. interface IRemoteConnection;
  291. enum ActLogEnum { thorlog_null=0,thorlog_ecl=1,thorlog_all=2 };
  292. extern graph_decl StringBuffer &ActPrintLogArgsPrep(StringBuffer &res, const CGraphElementBase *container, const ActLogEnum flags, const char *format, va_list args);
  293. extern graph_decl void ActPrintLogEx(const CGraphElementBase *container, const ActLogEnum flags, const LogMsgCategory &logCat, const char *format, ...) __attribute__((format(printf, 4, 5)));
  294. extern graph_decl void ActPrintLogArgs(const CGraphElementBase *container, const ActLogEnum flags, const LogMsgCategory &logCat, const char *format, va_list args);
  295. extern graph_decl void ActPrintLogArgs(const CGraphElementBase *container, IException *e, const ActLogEnum flags, const LogMsgCategory &logCat, const char *format, va_list args);
  296. extern graph_decl void ActPrintLog(const CActivityBase *activity, const char *format, ...) __attribute__((format(printf, 2, 3)));
  297. extern graph_decl void ActPrintLog(const CActivityBase *activity, IException *e, const char *format, ...) __attribute__((format(printf, 3, 4)));
  298. extern graph_decl void ActPrintLog(const CActivityBase *activity, IException *e);
  299. inline void ActPrintLog(const CGraphElementBase *container, const char *format, ...) __attribute__((format(printf, 2, 3)));
  300. inline void ActPrintLog(const CGraphElementBase *container, const char *format, ...)
  301. {
  302. va_list args;
  303. va_start(args, format);
  304. ActPrintLogArgs(container, thorlog_ecl, MCdebugProgress, format, args);
  305. va_end(args);
  306. }
  307. inline void ActPrintLogEx(const CGraphElementBase *container, IException *e, const ActLogEnum flags, const LogMsgCategory &logCat, const char *format, ...) __attribute__((format(printf, 5, 6)));
  308. inline void ActPrintLogEx(const CGraphElementBase *container, IException *e, const ActLogEnum flags, const LogMsgCategory &logCat, const char *format, ...)
  309. {
  310. va_list args;
  311. va_start(args, format);
  312. ActPrintLogArgs(container, e, flags, logCat, format, args);
  313. va_end(args);
  314. }
  315. inline void ActPrintLog(const CGraphElementBase *container, IException *e, const char *format, ...) __attribute__((format(printf, 3, 4)));
  316. inline void ActPrintLog(const CGraphElementBase *container, IException *e, const char *format, ...)
  317. {
  318. va_list args;
  319. va_start(args, format);
  320. ActPrintLogArgs(container, e, thorlog_null, MCexception(e, MSGCLS_error), format, args);
  321. va_end(args);
  322. }
  323. inline void ActPrintLog(const CGraphElementBase *container, IException *e)
  324. {
  325. ActPrintLogEx(container, e, thorlog_null, MCexception(e, MSGCLS_error), "%s", "");
  326. }
  327. extern graph_decl void GraphPrintLogArgsPrep(StringBuffer &res, CGraphBase *graph, const ActLogEnum flags, const LogMsgCategory &logCat, const char *format, va_list args);
  328. extern graph_decl void GraphPrintLogArgs(CGraphBase *graph, IException *e, const ActLogEnum flags, const LogMsgCategory &logCat, const char *format, va_list args);
  329. extern graph_decl void GraphPrintLogArgs(CGraphBase *graph, const ActLogEnum flags, const LogMsgCategory &logCat, const char *format, va_list args);
  330. extern graph_decl void GraphPrintLog(CGraphBase *graph, IException *e, const char *format, ...) __attribute__((format(printf, 3, 4)));
  331. inline void GraphPrintLogEx(CGraphBase *graph, ActLogEnum flags, const LogMsgCategory &logCat, const char *format, ...) __attribute__((format(printf, 4, 5)));
  332. inline void GraphPrintLogEx(CGraphBase *graph, ActLogEnum flags, const LogMsgCategory &logCat, const char *format, ...)
  333. {
  334. va_list args;
  335. va_start(args, format);
  336. GraphPrintLogArgs(graph, flags, logCat, format, args);
  337. va_end(args);
  338. }
  339. inline void GraphPrintLogEx(CGraphBase *graph, IException *e, ActLogEnum flags, const LogMsgCategory &logCat, const char *format, ...) __attribute__((format(printf, 5, 6)));
  340. inline void GraphPrintLogEx(CGraphBase *graph, IException *e, ActLogEnum flags, const LogMsgCategory &logCat, const char *format, ...)
  341. {
  342. va_list args;
  343. va_start(args, format);
  344. GraphPrintLogArgs(graph, e, flags, logCat, format, args);
  345. va_end(args);
  346. }
  347. inline void GraphPrintLog(CGraphBase *graph, const char *format, ...) __attribute__((format(printf, 2, 3)));
  348. inline void GraphPrintLog(CGraphBase *graph, const char *format, ...)
  349. {
  350. va_list args;
  351. va_start(args, format);
  352. GraphPrintLogArgs(graph, thorlog_null, MCdebugProgress, format, args);
  353. va_end(args);
  354. }
  355. extern graph_decl IThorException *MakeActivityException(CActivityBase *activity, int code, const char *_format, ...) __attribute__((format(printf, 3, 4)));
  356. extern graph_decl IThorException *MakeActivityException(CActivityBase *activity, IException *e, const char *xtra, ...) __attribute__((format(printf, 3, 4)));
  357. extern graph_decl IThorException *MakeActivityException(CActivityBase *activity, IException *e);
  358. extern graph_decl IThorException *MakeActivityWarning(CActivityBase *activity, int code, const char *_format, ...) __attribute__((format(printf, 3, 4)));
  359. extern graph_decl IThorException *MakeActivityWarning(CActivityBase *activity, IException *e, const char *format, ...) __attribute__((format(printf, 3, 4)));
  360. extern graph_decl IThorException *MakeActivityException(CGraphElementBase *activity, int code, const char *_format, ...) __attribute__((format(printf, 3, 4)));
  361. extern graph_decl IThorException *MakeActivityException(CGraphElementBase *activity, IException *e, const char *xtra, ...) __attribute__((format(printf, 3, 4)));
  362. extern graph_decl IThorException *MakeActivityException(CGraphElementBase *activity, IException *e);
  363. extern graph_decl IThorException *MakeActivityWarning(CGraphElementBase *activity, int code, const char *_format, ...) __attribute__((format(printf, 3, 4)));
  364. extern graph_decl IThorException *MakeActivityWarning(CGraphElementBase *activity, IException *e, const char *format, ...) __attribute__((format(printf, 3, 4)));
  365. extern graph_decl IThorException *MakeGraphException(CGraphBase *graph, int code, const char *format, ...);
  366. extern graph_decl IThorException *MakeThorException(int code, const char *format, ...) __attribute__((format(printf, 2, 3)));
  367. extern graph_decl IThorException *MakeThorException(IException *e);
  368. extern graph_decl IThorException *MakeThorAudienceException(LogMsgAudience audience, int code, const char *format, ...) __attribute__((format(printf, 3, 4)));
  369. extern graph_decl IThorException *MakeThorOperatorException(int code, const char *format, ...) __attribute__((format(printf, 2, 3)));
  370. extern graph_decl IThorException *MakeThorFatal(IException *e, int code, const char *format, ...) __attribute__((format(printf, 3, 4)));
  371. extern graph_decl IThorException *ThorWrapException(IException *e, const char *msg, ...) __attribute__((format(printf, 2, 3)));
  372. extern graph_decl void setExceptionActivityInfo(CGraphElementBase &container, IThorException *e);
  373. extern graph_decl void GetTempName(StringBuffer &name, const char *prefix=NULL,bool altdisk=false);
  374. extern graph_decl void SetTempDir(const char *name, const char *tempPrefix, bool clear);
  375. extern graph_decl void ClearDir(const char *dir);
  376. extern graph_decl void ClearTempDirs();
  377. extern graph_decl const char *queryTempDir(bool altdisk=false);
  378. extern graph_decl void loadCmdProp(IPropertyTree *tree, const char *cmdProp);
  379. extern graph_decl void ensureDirectoryForFile(const char *fName);
  380. extern graph_decl void reportExceptionToWorkunit(IConstWorkUnit &workunit,IException *e, WUExceptionSeverity severity=ExceptionSeverityWarning);
  381. extern graph_decl IPropertyTree *globals;
  382. extern graph_decl mptag_t masterSlaveMpTag;
  383. enum SlaveMsgTypes { smt_errorMsg=1, smt_initGraphReq, smt_initActDataReq, smt_dataReq, smt_getPhysicalName, smt_getFileOffset, smt_actMsg, smt_getresult };
  384. // Logging
  385. extern graph_decl const LogMsgJobInfo thorJob;
  386. extern graph_decl memsize_t queryLargeMemSize();
  387. extern graph_decl StringBuffer &getCompoundQueryName(StringBuffer &compoundName, const char *queryName, unsigned version);
  388. extern graph_decl void setClusterGroup(IGroup *group);
  389. extern graph_decl bool clusterInitialized();
  390. extern graph_decl ICommunicator &queryClusterComm();
  391. extern graph_decl IGroup &queryClusterGroup();
  392. extern graph_decl IGroup &querySlaveGroup();
  393. extern graph_decl IGroup &queryDfsGroup();
  394. extern graph_decl unsigned queryClusterWidth();
  395. extern graph_decl unsigned queryClusterNode();
  396. extern graph_decl mptag_t allocateClusterMPTag(); // should probably move into so used by master only
  397. extern graph_decl void freeClusterMPTag(mptag_t tag); // ""
  398. extern graph_decl IThorException *deserializeThorException(MemoryBuffer &in);
  399. void graph_decl serializeThorException(IException *e, MemoryBuffer &out);
  400. class CActivityBase;
  401. interface IPartDescriptor;
  402. extern graph_decl bool getBestFilePart(CActivityBase *activity, IPartDescriptor &partDesc, OwnedIFile & ifile, unsigned &location, StringBuffer &path, IExceptionHandler *eHandler = NULL);
  403. extern graph_decl StringBuffer &getFilePartLocations(IPartDescriptor &partDesc, StringBuffer &locations);
  404. extern graph_decl StringBuffer &getPartFilename(IPartDescriptor &partDesc, unsigned copy, StringBuffer &filePath, bool localMount=false);
  405. extern graph_decl IOutputMetaData *createFixedSizeMetaData(size32_t sz);
  406. interface IRowServer : extends IInterface
  407. {
  408. virtual void stop() = 0;
  409. };
  410. extern graph_decl IRowStream *createRowStreamFromNode(CActivityBase &activity, unsigned node, ICommunicator &comm, mptag_t mpTag, const bool &abortSoon);
  411. extern graph_decl IRowServer *createRowServer(CActivityBase *activity, IRowStream *seq, ICommunicator &comm, mptag_t mpTag);
  412. extern graph_decl IRowStream *createUngroupStream(IRowStream *input);
  413. interface IRowInterfaces;
  414. extern graph_decl void sendInChunks(ICommunicator &comm, rank_t dst, mptag_t mpTag, IRowStream *input, IRowInterfaces *rowIf);
  415. extern graph_decl void logDiskSpace();
  416. #endif