thactivityutil.cpp 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jio.hpp"
  14. #include "jsort.hpp"
  15. #include "jfile.hpp"
  16. #include "jlzw.hpp"
  17. #include "jset.hpp"
  18. #include "commonext.hpp"
  19. #include "dadfs.hpp"
  20. #include "thactivityutil.ipp"
  21. #include "backup.hpp"
  22. #include "slave.ipp"
  23. #include "thbuf.hpp"
  24. #include "thbufdef.hpp"
  25. #include "thexception.hpp"
  26. #include "thmfilemanager.hpp"
  27. #include "thormisc.hpp"
  28. #include "thorport.hpp"
  // Optional diagnostics, disabled by default.
  //#define TRACE_STARTSTOP_EXCEPTIONS

  // Uncomment the define below to compile in the _FULL_TRACE logging (only takes effect in _DEBUG builds).
  #ifdef _DEBUG
  //#define _FULL_TRACE
  #endif

  // 64 blocks of 1MB (0x100000 bytes) each.
  #define MAX_ROW_ARRAY_SIZE (64*0x100000) // 64MB
  // NOTE(review): the two timeouts below look like millisecond values (1 hour / 10 minutes) - confirm against their users.
  #define TRANSFER_TIMEOUT (1000*60*60)
  #define JOIN_TIMEOUT (1000*60*10)
  36. class CRowStreamLookAhead : public CSimpleInterfaceOf<IStartableEngineRowStream>
  37. {
  38. rowcount_t count;
  39. Linked<IEngineRowStream> inputStream;
  40. IThorRowInterfaces *rowIf;
  41. Owned<ISmartRowBuffer> smartbuf;
  42. size32_t bufsize;
  43. CSlaveActivity &activity;
  44. bool allowspill, preserveGrouping;
  45. ILookAheadStopNotify *notify;
  46. std::atomic<bool> running{false};
  47. bool started = false;
  48. rowcount_t required;
  49. Semaphore startSem;
  50. Owned<IException> getexception;
  51. class CThread: public Thread
  52. {
  53. CRowStreamLookAhead &parent;
  54. public:
  55. CThread(CRowStreamLookAhead &_parent)
  56. : Thread("CRowStreamLookAhead"), parent(_parent)
  57. {
  58. }
  59. int run()
  60. {
  61. return parent.run();
  62. }
  63. } thread;
  64. public:
  65. void doNotify()
  66. {
  67. if (notify)
  68. notify->onInputFinished(count);
  69. if (smartbuf)
  70. smartbuf->queryWriter()->flush();
  71. }
  72. int run()
  73. {
  74. try
  75. {
  76. StringBuffer temp;
  77. if (allowspill)
  78. GetTempFilePath(temp,"lookahd");
  79. assertex(bufsize);
  80. if (allowspill)
  81. smartbuf.setown(createSmartBuffer(&activity, temp.str(), bufsize, rowIf));
  82. else
  83. smartbuf.setown(createSmartInMemoryBuffer(&activity, rowIf, bufsize));
  84. startSem.signal();
  85. IRowWriter *writer = smartbuf->queryWriter();
  86. rowcount_t requiredLeft = required;
  87. if (preserveGrouping)
  88. {
  89. while (requiredLeft&&running)
  90. {
  91. OwnedConstThorRow row = inputStream->nextRow();
  92. if (!row)
  93. {
  94. row.setown(inputStream->nextRow());
  95. if (!row)
  96. break;
  97. else
  98. writer->putRow(NULL); // eog
  99. }
  100. ++count;
  101. writer->putRow(row.getClear());
  102. if (requiredLeft!=RCUNBOUND)
  103. requiredLeft--;
  104. }
  105. }
  106. else
  107. {
  108. while (requiredLeft&&running)
  109. {
  110. OwnedConstThorRow row = inputStream->ungroupedNextRow();
  111. if (!row)
  112. break;
  113. ++count;
  114. writer->putRow(row.getClear());
  115. if (requiredLeft!=RCUNBOUND)
  116. requiredLeft--;
  117. }
  118. }
  119. }
  120. catch(IException * e)
  121. {
  122. startSem.signal();
  123. ActPrintLog(&activity, e, "CRowStreamLookAhead get exception");
  124. getexception.setown(e);
  125. }
  126. // notify and flush async, as these can block, but we do not want to block in->stop()
  127. // especially if this is a spilling read ahead, where use case scenarios include not wanting to
  128. // block the upstream input.
  129. // An example is a firstn which if stop() it not called, it may block
  130. // other nodes from pulling because it is blocked upstream on full buffers (which can be discarded
  131. // on stop()), and those in turn are blocking other arms of the graph.
  132. class CNotifyThread : implements IThreaded
  133. {
  134. CThreaded threaded;
  135. CRowStreamLookAhead &owner;
  136. public:
  137. CNotifyThread(CRowStreamLookAhead &_owner) : threaded("Lookahead-CNotifyThread"), owner(_owner)
  138. {
  139. threaded.init(this);
  140. }
  141. ~CNotifyThread()
  142. {
  143. for (;;)
  144. {
  145. if (threaded.join(60000))
  146. break;
  147. PROGLOG("Still waiting on lookahead CNotifyThread thread to complete");
  148. }
  149. }
  150. // IThreaded impl.
  151. virtual void threadmain() override
  152. {
  153. owner.doNotify();
  154. }
  155. } notifyThread(*this);
  156. running = false;
  157. try
  158. {
  159. if (inputStream)
  160. inputStream->stop();
  161. }
  162. catch(IException * e)
  163. {
  164. ActPrintLog(&activity, e, "CRowStreamLookAhead stop exception");
  165. if (!getexception.get())
  166. getexception.setown(e);
  167. }
  168. // NB: Will wait on CNotifyThread to finish before returning
  169. return 0;
  170. }
  171. CRowStreamLookAhead(CSlaveActivity &_activity, IEngineRowStream *_inputStream, IThorRowInterfaces *_rowIf, size32_t _bufsize, bool _allowspill, bool _preserveGrouping, rowcount_t _required, ILookAheadStopNotify *_notify)
  172. : thread(*this), activity(_activity), inputStream(_inputStream), rowIf(_rowIf)
  173. {
  174. #ifdef _FULL_TRACE
  175. ActPrintLog(&activity, "CRowStreamLookAhead create %x",(unsigned)(memsize_t)this);
  176. #endif
  177. allowspill = _allowspill;
  178. preserveGrouping = _preserveGrouping;
  179. assertex((unsigned)-1 != _bufsize); // no longer supported
  180. bufsize = _bufsize?_bufsize:(0x40000*3); // use .75 MB buffer if bufsize omitted
  181. notify = _notify;
  182. running = true;
  183. required = _required;
  184. count = 0;
  185. }
  186. ~CRowStreamLookAhead()
  187. {
  188. if (!thread.join(1000*60))
  189. ActPrintLogEx(&activity.queryContainer(), thorlog_all, MCuserWarning, "CRowStreamLookAhead join timedout");
  190. }
  191. // IEngineRowStream
  192. virtual const void *nextRow() override
  193. {
  194. OwnedConstThorRow row = smartbuf->nextRow();
  195. if (getexception)
  196. throw getexception.getClear();
  197. if (!row)
  198. {
  199. #ifdef _FULL_TRACE
  200. ActPrintLog(&activity, "CRowStreamLookAhead eos %x",(unsigned)(memsize_t)this);
  201. #endif
  202. }
  203. return row.getClear();
  204. }
  205. // IStartableEngineRowStream
  206. virtual void start() override
  207. {
  208. #ifdef _FULL_TRACE
  209. ActPrintLog(&activity, "CRowStreamLookAhead start %x",(unsigned)(memsize_t)this);
  210. #endif
  211. started = true;
  212. running = true;
  213. thread.start();
  214. startSem.wait();
  215. }
  216. // IEngineRowStream
  217. virtual void resetEOF() override { throwUnexpected(); }
  218. // IRowStream
  219. virtual void stop() override
  220. {
  221. #ifdef _FULL_TRACE
  222. ActPrintLog(&activity, "CRowStreamLookAhead stop %x",(unsigned)(memsize_t)this);
  223. #endif
  224. if (!started) // never started
  225. {
  226. // still want to chain stop()'s even if never started
  227. if (inputStream)
  228. inputStream->stop();
  229. }
  230. else
  231. {
  232. running = false;
  233. if (smartbuf)
  234. smartbuf->stop(); // just in case blocked
  235. thread.join();
  236. started = false;
  237. if (getexception)
  238. throw getexception.getClear();
  239. }
  240. }
  241. };
  242. IStartableEngineRowStream *createRowStreamLookAhead(CSlaveActivity *activity, IEngineRowStream *inputStream, IThorRowInterfaces *rowIf, size32_t bufsize, bool allowspill, bool preserveGrouping, rowcount_t maxcount, ILookAheadStopNotify *notify)
  243. {
  244. return new CRowStreamLookAhead(*activity, inputStream, rowIf, bufsize, allowspill, preserveGrouping, maxcount, notify);
  245. }
  246. void initMetaInfo(ThorDataLinkMetaInfo &info)
  247. {
  248. info = {}; // Reset to default values.
  249. }
  250. void calcMetaInfoSize(ThorDataLinkMetaInfo &info, IThorDataLink *link)
  251. {
  252. if (!info.unknownRowsOutput&&link&&((info.totalRowsMin<=0)||(info.totalRowsMax<0)))
  253. {
  254. ThorDataLinkMetaInfo prev;
  255. link->getMetaInfo(prev);
  256. if (info.totalRowsMin<=0)
  257. {
  258. if (!info.canReduceNumRows)
  259. info.totalRowsMin = prev.totalRowsMin;
  260. else
  261. info.totalRowsMin = 0;
  262. }
  263. if (info.totalRowsMax<0)
  264. {
  265. if (!info.canIncreaseNumRows)
  266. {
  267. info.totalRowsMax = prev.totalRowsMax;
  268. if (info.totalRowsMin>info.totalRowsMax)
  269. info.totalRowsMax = -1;
  270. }
  271. }
  272. if (((offset_t)-1 != prev.byteTotal) && !info.unknownRowsOutput && !info.canReduceNumRows && !info.canIncreaseNumRows)
  273. info.byteTotal = prev.byteTotal;
  274. }
  275. else if (info.totalRowsMin<0)
  276. info.totalRowsMin = 0; // a good bet
  277. }
  278. void calcMetaInfoSize(ThorDataLinkMetaInfo &info, const CThorInputArray &inputs)
  279. {
  280. //IThorDataLink **link,unsigned ninputs;
  281. if (0 == inputs.ordinality())
  282. {
  283. calcMetaInfoSize(info, nullptr);
  284. return;
  285. }
  286. else if (1 == inputs.ordinality())
  287. {
  288. calcMetaInfoSize(info, inputs.item(0).itdl);
  289. return;
  290. }
  291. if (!info.unknownRowsOutput)
  292. {
  293. __int64 min=0;
  294. __int64 max=0;
  295. for (unsigned i=0;i<inputs.ordinality();i++ )
  296. {
  297. CThorInput &input = inputs.item(i);
  298. if (input.itdl)
  299. {
  300. ThorDataLinkMetaInfo prev;
  301. input.itdl->getMetaInfo(prev);
  302. if (min>=0)
  303. {
  304. if (prev.totalRowsMin>=0)
  305. min += prev.totalRowsMin;
  306. else
  307. min = -1;
  308. }
  309. if (max>=0)
  310. {
  311. if (prev.totalRowsMax>=0)
  312. max += prev.totalRowsMax;
  313. else
  314. max = -1;
  315. }
  316. }
  317. }
  318. if (info.totalRowsMin<=0)
  319. {
  320. if (!info.canReduceNumRows)
  321. info.totalRowsMin = min;
  322. else
  323. info.totalRowsMin = 0;
  324. }
  325. if (info.totalRowsMax<0)
  326. {
  327. if (!info.canIncreaseNumRows)
  328. {
  329. info.totalRowsMax = max;
  330. if (info.totalRowsMin>info.totalRowsMax)
  331. info.totalRowsMax = -1;
  332. }
  333. }
  334. }
  335. else if (info.totalRowsMin<0)
  336. info.totalRowsMin = 0; // a good bet
  337. }
  338. void calcMetaInfoSize(ThorDataLinkMetaInfo &info, const ThorDataLinkMetaInfo *infos, unsigned num)
  339. {
  340. if (!infos||(num<=1))
  341. {
  342. if (1 == num)
  343. info = infos[0];
  344. else
  345. {
  346. info.fastThrough = true;
  347. info.totalRowsMin = info.totalRowsMax = 0;
  348. }
  349. return;
  350. }
  351. if (!info.unknownRowsOutput)
  352. {
  353. __int64 min=0;
  354. __int64 max=0;
  355. for (unsigned i=0; i<num; i++)
  356. {
  357. const ThorDataLinkMetaInfo &currentInfo = infos[i];
  358. if (min>=0)
  359. {
  360. if (currentInfo.totalRowsMin>=0)
  361. min += currentInfo.totalRowsMin;
  362. else
  363. min = -1;
  364. }
  365. if (max>=0)
  366. {
  367. if (currentInfo.totalRowsMax>=0)
  368. max += currentInfo.totalRowsMax;
  369. else
  370. max = -1;
  371. }
  372. if (0 == i)
  373. info.fastThrough = currentInfo.fastThrough;
  374. else if (info.fastThrough && !currentInfo.fastThrough) // i.e. if was true and this one is false, set return fastThrough to false
  375. info.fastThrough = false;
  376. }
  377. if (info.totalRowsMin<=0)
  378. {
  379. if (!info.canReduceNumRows)
  380. info.totalRowsMin = min;
  381. else
  382. info.totalRowsMin = 0;
  383. }
  384. if (info.totalRowsMax<0)
  385. {
  386. if (!info.canIncreaseNumRows)
  387. {
  388. info.totalRowsMax = max;
  389. if (info.totalRowsMin>info.totalRowsMax)
  390. info.totalRowsMax = -1;
  391. }
  392. }
  393. }
  394. else if (info.totalRowsMin<0)
  395. info.totalRowsMin = 0; // a good bet
  396. }
  397. bool checkSavedFileCRC(IFile * ifile, bool & timesDiffer, unsigned & storedCrc)
  398. {
  399. StringBuffer s(ifile->queryFilename());
  400. s.append(".crc");
  401. Owned<IFile> crcFile = createIFile(s.str());
  402. size32_t crcSz = (size32_t)crcFile->size();
  403. Owned<IFileIO> crcIO = crcFile->open(IFOread);
  404. bool performCrc = false;
  405. timesDiffer = false;
  406. if (crcIO)
  407. {
  408. Owned<IFileIOStream> crcStream = createIOStream(crcIO);
  409. if (sizeof(storedCrc) == crcSz) // backward compat. if = in size to just crc (no date stamps)
  410. {
  411. verifyex(crcSz == crcStream->read(crcSz, &storedCrc));
  412. performCrc = true;
  413. }
  414. else
  415. {
  416. size32_t sz;
  417. verifyex(sizeof(sz) == crcStream->read(sizeof(sz), &sz));
  418. void *mem = malloc(sz);
  419. MemoryBuffer mb;
  420. mb.setBuffer(sz, mem, true);
  421. verifyex(sz == crcStream->read(sz, mem));
  422. CDateTime storedCreateTime(mb);
  423. CDateTime storedModifiedTime(mb);
  424. CDateTime createTime, modifiedTime, accessedTime;
  425. ifile->getTime(&createTime, &modifiedTime, &accessedTime);
  426. if (!storedCreateTime.equals(createTime) || !storedModifiedTime.equals(modifiedTime))
  427. timesDiffer = true;
  428. else
  429. {
  430. mb.read(storedCrc);
  431. performCrc = true;
  432. }
  433. }
  434. }
  435. return performCrc;
  436. }
  437. static void _doReplicate(CActivityBase *activity, IPartDescriptor &partDesc, ICopyFileProgress *iProgress)
  438. {
  439. StringBuffer primaryName;
  440. getPartFilename(partDesc, 0, primaryName);;
  441. RemoteFilename rfn;
  442. unsigned copies = partDesc.numCopies();
  443. unsigned c=1;
  444. for (; c<copies; c++)
  445. {
  446. unsigned replicateCopy;
  447. partDesc.copyClusterNum(c, &replicateCopy);
  448. rfn.clear();
  449. partDesc.getFilename(c, rfn);
  450. StringBuffer dstName;
  451. rfn.getPath(dstName);
  452. assertex(dstName.length());
  453. if (replicateCopy>0 )
  454. {
  455. try
  456. {
  457. queryThor().queryBackup().backup(dstName.str(), primaryName.str());
  458. }
  459. catch (IException *e)
  460. {
  461. Owned<IThorException> re = MakeActivityWarning(activity, e, "Failed to create replicate file '%s'", dstName.str());
  462. e->Release();
  463. activity->fireException(re);
  464. }
  465. }
  466. else // another primary
  467. {
  468. ActPrintLog(activity, "Copying to primary %s", dstName.str());
  469. StringBuffer tmpName(dstName.str());
  470. tmpName.append(".tmp");
  471. OwnedIFile tmpIFile = createIFile(tmpName.str());
  472. OwnedIFile srcFile = createIFile(primaryName.str());
  473. CFIPScope fipScope(tmpName.str());
  474. try
  475. {
  476. try
  477. {
  478. ensureDirectoryForFile(dstName.str());
  479. ::copyFile(tmpIFile, srcFile, 0x100000, iProgress);
  480. }
  481. catch (IException *e)
  482. {
  483. IThorException *re = MakeActivityException(activity, e, "Failed to copy to tmp file '%s' from source file '%s'", tmpIFile->queryFilename(), srcFile->queryFilename());
  484. e->Release();
  485. throw re;
  486. }
  487. try
  488. {
  489. OwnedIFile dstIFile = createIFile(dstName.str());
  490. dstIFile->remove();
  491. tmpIFile->rename(pathTail(dstName.str()));
  492. }
  493. catch (IException *e)
  494. {
  495. IThorException *re = ThorWrapException(e, "Failed to rename '%s' to '%s'", tmpName.str(), dstName.str());
  496. e->Release();
  497. throw re;
  498. }
  499. }
  500. catch (IException *)
  501. {
  502. try { tmpIFile->remove(); }
  503. catch (IException *e) { ActPrintLog(&activity->queryContainer(), e); e->Release(); }
  504. throw;
  505. }
  506. }
  507. }
  508. }
  509. void cancelReplicates(CActivityBase *activity, IPartDescriptor &partDesc)
  510. {
  511. RemoteFilename rfn;
  512. unsigned copies = partDesc.numCopies();
  513. unsigned c=1;
  514. for (; c<copies; c++)
  515. {
  516. unsigned replicateCopy;
  517. partDesc.copyClusterNum(c, &replicateCopy);
  518. rfn.clear();
  519. partDesc.getFilename(c, rfn);
  520. StringBuffer dstName;
  521. rfn.getPath(dstName);
  522. assertex(dstName.length());
  523. if (replicateCopy>0)
  524. {
  525. try
  526. {
  527. queryThor().queryBackup().cancel(dstName.str());
  528. }
  529. catch (IException *e)
  530. {
  531. Owned<IThorException> re = MakeActivityException(activity, e, "Error cancelling backup '%s'", dstName.str());
  532. ActPrintLog(&activity->queryContainer(), e);
  533. e->Release();
  534. }
  535. }
  536. }
  537. }
  538. void doReplicate(CActivityBase *activity, IPartDescriptor &partDesc, ICopyFileProgress *iProgress)
  539. {
  540. try
  541. {
  542. _doReplicate(activity, partDesc, iProgress);
  543. }
  544. catch (IException *e)
  545. {
  546. Owned<IThorException> e2 = MakeActivityWarning(activity, e, "doReplicate");
  547. e->Release();
  548. activity->fireException(e2);
  549. }
  550. }
  551. class CWriteHandler : implements IFileIO, public CInterface
  552. {
  553. Linked<IFileIO> primaryio;
  554. Linked<IFile> primary;
  555. StringBuffer primaryName;
  556. ICopyFileProgress *iProgress;
  557. bool *aborted;
  558. CActivityBase &activity;
  559. IPartDescriptor &partDesc;
  560. bool remote;
  561. CFIPScope fipScope;
  562. unsigned twFlags;
  563. bool closed = false;
  564. void checkAndHandleClose()
  565. {
  566. if (closed)
  567. return;
  568. closed = true;
  569. primaryio->close();
  570. primaryio.clear();
  571. if (aborted && *aborted)
  572. {
  573. primary->remove(); // i.e. never completed, so remove partial (temp) primary
  574. return;
  575. }
  576. if (twFlags & TW_RenameToPrimary)
  577. {
  578. OwnedIFile tmpIFile;
  579. CFIPScope fipScope;
  580. if (remote && !(twFlags & TW_External))
  581. {
  582. StringBuffer tmpName(primaryName.str());
  583. tmpName.append(".tmp");
  584. tmpIFile.setown(createIFile(tmpName.str()));
  585. fipScope.set(tmpName.str());
  586. try
  587. {
  588. try
  589. {
  590. ensureDirectoryForFile(primaryName.str());
  591. ::copyFile(tmpIFile, primary, 0x100000, iProgress);
  592. }
  593. catch (IException *e)
  594. {
  595. IThorException *re = ThorWrapException(e, "Failed to copy local temp file '%s' to remote temp location '%s'", primary->queryFilename(), tmpIFile->queryFilename());
  596. e->Release();
  597. throw re;
  598. }
  599. }
  600. catch (IException *)
  601. {
  602. try { tmpIFile->remove(); }
  603. catch (IException *e) { ActPrintLog(&activity.queryContainer(), e); e->Release(); }
  604. }
  605. }
  606. else
  607. tmpIFile.setown(createIFile(primary->queryFilename()));
  608. try
  609. {
  610. try
  611. {
  612. OwnedIFile dstIFile = createIFile(primaryName.str());
  613. dstIFile->remove();
  614. tmpIFile->rename(pathTail(primaryName.str()));
  615. }
  616. catch (IException *e)
  617. {
  618. IThorException *re = ThorWrapException(e, "Failed to rename '%s' to '%s'", tmpIFile->queryFilename(), primaryName.str());
  619. e->Release();
  620. throw re;
  621. }
  622. }
  623. catch (IException *)
  624. {
  625. try { primary->remove(); }
  626. catch (IException *e) { ActPrintLog(&activity.queryContainer(), e); e->Release(); }
  627. throw;
  628. }
  629. primary->remove();
  630. fipScope.clear();
  631. }
  632. if (partDesc.numCopies()>1)
  633. _doReplicate(&activity, partDesc, iProgress);
  634. }
  635. public:
  636. IMPLEMENT_IINTERFACE_USING(CInterface);
  637. CWriteHandler(CActivityBase &_activity, IPartDescriptor &_partDesc, IFile *_primary, IFileIO *_primaryio, ICopyFileProgress *_iProgress, unsigned _twFlags, bool *_aborted)
  638. : activity(_activity), partDesc(_partDesc), primary(_primary), primaryio(_primaryio), iProgress(_iProgress), twFlags(_twFlags), aborted(_aborted), fipScope(primary->queryFilename())
  639. {
  640. RemoteFilename rfn;
  641. partDesc.getFilename(0, rfn);
  642. remote = !rfn.isLocal();
  643. rfn.getPath(primaryName);
  644. if (globals->getPropBool("@replicateAsync", true))
  645. cancelReplicates(&activity, partDesc);
  646. }
  647. virtual void beforeDispose() override
  648. {
  649. // Can't throw in destructor...
  650. // Note that if we do throw the CWriteHandler object is liable to be leaked...
  651. try
  652. {
  653. checkAndHandleClose();
  654. }
  655. catch (IException *e)
  656. {
  657. EXCLOG(e, "CWriteHandler::beforeDispose");
  658. e->Release();
  659. }
  660. }
  661. // IFileIO impl.
  662. virtual size32_t read(offset_t pos, size32_t len, void * data) { return primaryio->read(pos, len, data); }
  663. virtual offset_t size() { return primaryio->size(); }
  664. virtual size32_t write(offset_t pos, size32_t len, const void * data) { return primaryio->write(pos, len, data); }
  665. virtual offset_t appendFile(IFile *file,offset_t pos=0,offset_t len=-1) { return primaryio->appendFile(file, pos, len); }
  666. virtual unsigned __int64 getStatistic(StatisticKind kind) { return primaryio->getStatistic(kind); }
  667. virtual void setSize(offset_t size) { primaryio->setSize(size); }
  668. virtual void flush() { primaryio->flush(); }
  669. virtual void close()
  670. {
  671. checkAndHandleClose();
  672. }
  673. };
  674. IFileIO *createMultipleWrite(CActivityBase *activity, IPartDescriptor &partDesc, unsigned recordSize, unsigned twFlags, bool &compress, ICompressor *ecomp, ICopyFileProgress *iProgress, bool *aborted, StringBuffer *_outLocationName)
  675. {
  676. StringBuffer outLocationNameI;
  677. StringBuffer &outLocationName = _outLocationName?*_outLocationName:outLocationNameI;
  678. RemoteFilename rfn;
  679. partDesc.getFilename(0, rfn);
  680. StringBuffer primaryName;
  681. rfn.getPath(primaryName);
  682. if (isUrl(primaryName))
  683. {
  684. twFlags &= ~TW_RenameToPrimary;
  685. twFlags |= TW_Direct;
  686. }
  687. if (twFlags & TW_Direct)
  688. {
  689. if (0 == outLocationName.length())
  690. outLocationName.append(primaryName.str());
  691. }
  692. else
  693. {
  694. // use temp name
  695. if (rfn.isLocal() || (twFlags & TW_External))
  696. {
  697. // ensure local tmp in same directory (and plane) as target
  698. StringBuffer dir;
  699. splitDirTail(primaryName, dir);
  700. addPathSepChar(dir);
  701. GetTempFileName(dir, "partial");
  702. outLocationName.swapWith(dir);
  703. }
  704. else
  705. GetTempFilePath(outLocationName, "partial");
  706. assertex(outLocationName.length());
  707. ensureDirectoryForFile(outLocationName.str());
  708. }
  709. OwnedIFile file = createIFile(outLocationName.str());
  710. Owned<IFileIO> fileio;
  711. if (compress)
  712. {
  713. unsigned compMethod = COMPRESS_METHOD_LZ4;
  714. // rowdif used if recordSize > 0, else fallback to compMethod
  715. if (!ecomp)
  716. {
  717. if (twFlags & TW_Temporary)
  718. {
  719. // if temp file then can use newer compressor
  720. StringBuffer compType;
  721. activity->getOpt(THOROPT_COMPRESS_SPILL_TYPE, compType);
  722. compMethod = getCompMethod(compType);
  723. }
  724. // force
  725. if (activity->getOptBool(THOROPT_COMP_FORCELZW, false))
  726. {
  727. recordSize = 0; // by default if fixed length (recordSize set), row diff compression is used. This forces compMethod.
  728. compMethod = COMPRESS_METHOD_LZW;
  729. }
  730. else if (activity->getOptBool(THOROPT_COMP_FORCEFLZ, false))
  731. compMethod = COMPRESS_METHOD_FASTLZ;
  732. else if (activity->getOptBool(THOROPT_COMP_FORCELZ4, false))
  733. compMethod = COMPRESS_METHOD_LZ4;
  734. else if (activity->getOptBool(THOROPT_COMP_FORCELZ4HC, false))
  735. compMethod = COMPRESS_METHOD_LZ4HC;
  736. }
  737. fileio.setown(createCompressedFileWriter(file, recordSize, 0 != (twFlags & TW_Extend), true, ecomp, compMethod));
  738. if (!fileio)
  739. {
  740. compress = false;
  741. Owned<IThorException> e = MakeActivityWarning(activity, TE_LargeBufferWarning, "Could not write file '%s' compressed", outLocationName.str());
  742. activity->fireException(e);
  743. fileio.setown(file->open((twFlags & TW_Extend)&&file->exists()?IFOwrite:IFOcreate));
  744. }
  745. }
  746. else
  747. fileio.setown(file->open((twFlags & TW_Extend)&&file->exists()?IFOwrite:IFOcreate));
  748. if (!fileio)
  749. throw MakeActivityException(activity, TE_FileCreationFailed, "Failed to create file for write (%s) error = %d", outLocationName.str(), GetLastError());
  750. StringBuffer compStr;
  751. if (compress)
  752. {
  753. ICompressedFileIO *icompfio = QUERYINTERFACE(fileio.get(), ICompressedFileIO);
  754. if (icompfio)
  755. {
  756. unsigned compMeth2 = icompfio->method();
  757. if (COMPRESS_METHOD_FASTLZ == compMeth2)
  758. compStr.append("flz");
  759. else if (COMPRESS_METHOD_LZ4 == compMeth2)
  760. compStr.append("lz4");
  761. else if (COMPRESS_METHOD_LZW == compMeth2)
  762. compStr.append("lzw");
  763. else if (COMPRESS_METHOD_ROWDIF == compMeth2)
  764. compStr.append("rdiff");
  765. else
  766. compStr.append("unknown");
  767. }
  768. else
  769. compStr.append("unknown");
  770. }
  771. else
  772. compStr.append("false");
  773. ActPrintLog(activity, "Writing to file: %s, compress=%s", file->queryFilename(), compStr.str());
  774. return new CWriteHandler(*activity, partDesc, file, fileio, iProgress, twFlags, aborted);
  775. }
  776. StringBuffer &locateFilePartPath(CActivityBase *activity, const char *logicalFilename, IPartDescriptor &partDesc, StringBuffer &filePath)
  777. {
  778. unsigned location;
  779. OwnedIFile ifile;
  780. if (globals->getPropBool("@autoCopyBackup", true)?ensurePrimary(activity, partDesc, ifile, location, filePath):getBestFilePart(activity, partDesc, ifile, location, filePath, activity))
  781. ActPrintLog(activity, "reading physical file '%s' (logical file = %s)", filePath.str(), logicalFilename);
  782. else
  783. {
  784. StringBuffer locations;
  785. IException *e = MakeActivityException(activity, TE_FileNotFound, "No physical file part for logical file %s, found at given locations: %s (Error = %d)", logicalFilename, getFilePartLocations(partDesc, locations).str(), GetLastError());
  786. ActPrintLog(&activity->queryContainer(), e);
  787. throw e;
  788. }
  789. return filePath;
  790. }
  791. IRowStream *createSequentialPartHandler(CPartHandler *partHandler, IArrayOf<IPartDescriptor> &partDescs, bool grouped)
  792. {
  793. class CSeqPartHandler : implements IRowStream, public CSimpleInterface
  794. {
  795. IArrayOf<IPartDescriptor> &partDescs;
  796. int part, parts;
  797. bool eof, grouped, someInGroup;
  798. Linked<CPartHandler> partHandler;
  799. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  800. public:
  801. CSeqPartHandler(CPartHandler *_partHandler, IArrayOf<IPartDescriptor> &_partDescs, bool _grouped)
  802. : partDescs(_partDescs), partHandler(_partHandler), grouped(_grouped)
  803. {
  804. part = 0;
  805. parts = partDescs.ordinality();
  806. someInGroup = false;
  807. if (0==parts)
  808. {
  809. eof = true;
  810. }
  811. else
  812. {
  813. eof = false;
  814. partHandler->setPart(&partDescs.item(0));
  815. }
  816. }
  817. virtual void stop()
  818. {
  819. if (partHandler)
  820. {
  821. partHandler->stop();
  822. partHandler.clear();
  823. }
  824. }
  825. const void *nextRow()
  826. {
  827. if (eof)
  828. {
  829. return NULL;
  830. }
  831. for (;;)
  832. {
  833. OwnedConstThorRow row = partHandler->nextRow();
  834. if (row)
  835. {
  836. someInGroup = true;
  837. return row.getClear();
  838. }
  839. if (grouped && someInGroup)
  840. {
  841. someInGroup = false;
  842. return NULL;
  843. }
  844. ++part;
  845. if (part >= parts)
  846. {
  847. partHandler->stop();
  848. partHandler.clear();
  849. eof = true;
  850. return NULL;
  851. }
  852. partHandler->setPart(&partDescs.item(part));
  853. }
  854. }
  855. };
  856. return new CSeqPartHandler(partHandler, partDescs, grouped);
  857. }