thactivityutil.cpp 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857
  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. #include "jio.hpp"
  15. #include "jsort.hpp"
  16. #include "jfile.hpp"
  17. #include "jlzw.hpp"
  18. #include "jset.hpp"
  19. #include "commonext.hpp"
  20. #include "dadfs.hpp"
  21. #include "thactivityutil.ipp"
  22. #include "backup.hpp"
  23. #include "slave.ipp"
  24. #include "thbuf.hpp"
  25. #include "thbufdef.hpp"
  26. #include "thcrc.hpp"
  27. #include "thexception.hpp"
  28. #include "thmfilemanager.hpp"
  29. #include "thormisc.hpp"
  30. #include "thorport.hpp"
  // Optional trace switch, disabled by default (not referenced in the code visible here).
  //#define TRACE_STARTSTOP_EXCEPTIONS

  #ifdef _DEBUG
  // Enable to log ThorLookaheadCache create/start/stop/eos events (debug builds only).
  //#define _FULL_TRACE
  #endif

  // Size limit: 64 * 1MB = 64MB.
  #define MAX_ROW_ARRAY_SIZE (0x100000 * 64)
  // Timeouts; the arithmetic suggests milliseconds (1 hour / 10 minutes) - TODO confirm units with users.
  #define TRANSFER_TIMEOUT (60 * 60 * 1000)
  #define JOIN_TIMEOUT (10 * 60 * 1000)
  38. #ifdef _MSC_VER
  39. #pragma warning(push)
  40. #pragma warning( disable : 4355 )
  41. #endif
  42. class ThorLookaheadCache: public IThorDataLink, public CSimpleInterface
  43. {
  44. rowcount_t count;
  45. Linked<IThorDataLink> in;
  46. Owned<ISmartRowBuffer> smartbuf;
  47. size32_t bufsize;
  48. CActivityBase &activity;
  49. bool allowspill, preserveLhsGrouping;
  50. ISmartBufferNotify *notify;
  51. bool running;
  52. bool stopped;
  53. rowcount_t required;
  54. Semaphore startsem;
  55. bool started;
  56. Owned<IException> startexception;
  57. Owned<IException> getexception;
  58. bool asyncstart;
  59. class Cthread: public Thread
  60. {
  61. ThorLookaheadCache &parent;
  62. public:
  63. Cthread(ThorLookaheadCache &_parent)
  64. : Thread("ThorLookaheadCache"), parent(_parent)
  65. {
  66. }
  67. int run()
  68. {
  69. return parent.run();
  70. }
  71. } thread;
  72. public:
  73. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  74. int run()
  75. {
  76. if (!started) {
  77. try {
  78. in->start();
  79. started = true;
  80. }
  81. catch(IException * e)
  82. {
  83. ActPrintLog(&activity, e, "ThorLookaheadCache starting input");
  84. startexception.setown(e);
  85. if (asyncstart)
  86. notify->onInputStarted(startexception);
  87. running = false;
  88. stopped = true;
  89. startsem.signal();
  90. return 0;
  91. }
  92. }
  93. try {
  94. StringBuffer temp;
  95. if (allowspill)
  96. GetTempName(temp,"lookahd",true);
  97. assertex(bufsize);
  98. if (allowspill)
  99. smartbuf.setown(createSmartBuffer(&activity, temp.toCharArray(), bufsize, queryRowInterfaces(in)));
  100. else
  101. smartbuf.setown(createSmartInMemoryBuffer(&activity, queryRowInterfaces(in), bufsize));
  102. if (notify)
  103. notify->onInputStarted(NULL);
  104. startsem.signal();
  105. Linked<IRowWriter> writer = smartbuf->queryWriter();
  106. if (preserveLhsGrouping)
  107. {
  108. while (required&&running)
  109. {
  110. OwnedConstThorRow row = in->nextRow();
  111. if (!row)
  112. {
  113. row.setown(in->nextRow());
  114. if (!row)
  115. break;
  116. else
  117. writer->putRow(NULL); // eog
  118. }
  119. ++count;
  120. writer->putRow(row.getClear());
  121. if (required!=RCUNBOUND)
  122. required--;
  123. }
  124. }
  125. else
  126. {
  127. while (required&&running)
  128. {
  129. OwnedConstThorRow row = in->ungroupedNextRow();
  130. if (!row)
  131. break;
  132. ++count;
  133. writer->putRow(row.getClear());
  134. if (required!=RCUNBOUND)
  135. required--;
  136. }
  137. }
  138. }
  139. catch(IException * e)
  140. {
  141. ActPrintLog(&activity, e, "ThorLookaheadCache get exception");
  142. getexception.setown(e);
  143. }
  144. if (notify)
  145. notify->onInputFinished(count);
  146. if (smartbuf)
  147. smartbuf->queryWriter()->flush();
  148. running = false;
  149. try {
  150. if (in)
  151. in->stop();
  152. }
  153. catch(IException * e)
  154. {
  155. ActPrintLog(&activity, e, "ThorLookaheadCache stop exception");
  156. if (!getexception.get())
  157. getexception.setown(e);
  158. }
  159. return 0;
  160. }
  161. ThorLookaheadCache(CActivityBase &_activity, IThorDataLink *_in,size32_t _bufsize,bool _allowspill,bool _preserveLhsGrouping, rowcount_t _required,ISmartBufferNotify *_notify, bool _instarted, IDiskUsage *_iDiskUsage)
  162. : thread(*this), activity(_activity), in(_in)
  163. {
  164. #ifdef _FULL_TRACE
  165. ActPrintLog(&activity, "ThorLookaheadCache create %x",(unsigned)(memsize_t)this);
  166. #endif
  167. asyncstart = false;
  168. allowspill = _allowspill;
  169. preserveLhsGrouping = _preserveLhsGrouping;
  170. assertex((unsigned)-1 != _bufsize); // no longer supported
  171. bufsize = _bufsize?_bufsize:(0x40000*3); // use .75 MB buffer if bufsize omitted
  172. notify = _notify;
  173. running = true;
  174. required = _required;
  175. count = 0;
  176. stopped = true;
  177. started = _instarted;
  178. }
  179. ~ThorLookaheadCache()
  180. {
  181. if (!thread.join(1000*60))
  182. ActPrintLogEx(&activity.queryContainer(), thorlog_all, MCuserWarning, "ThorLookaheadCache join timedout");
  183. }
  184. void start()
  185. {
  186. #ifdef _FULL_TRACE
  187. ActPrintLog(&activity, "ThorLookaheadCache start %x",(unsigned)(memsize_t)this);
  188. #endif
  189. stopped = false;
  190. asyncstart = notify&&notify->startAsync();
  191. thread.start();
  192. if (!asyncstart) {
  193. startsem.wait();
  194. if (startexception)
  195. throw startexception.getClear();
  196. }
  197. }
  198. void stop()
  199. {
  200. #ifdef _FULL_TRACE
  201. ActPrintLog(&activity, "ThorLookaheadCache stop %x",(unsigned)(memsize_t)this);
  202. #endif
  203. if (!stopped) {
  204. running = false;
  205. if (smartbuf)
  206. smartbuf->stop(); // just in case blocked
  207. thread.join();
  208. stopped = true;
  209. if (getexception)
  210. throw getexception.getClear();
  211. }
  212. }
  213. const void *nextRow()
  214. {
  215. OwnedConstThorRow row = smartbuf->nextRow();
  216. if (getexception)
  217. throw getexception.getClear();
  218. if (!row) {
  219. #ifdef _FULL_TRACE
  220. ActPrintLog(&activity, "ThorLookaheadCache eos %x",(unsigned)(memsize_t)this);
  221. #endif
  222. }
  223. return row.getClear();
  224. }
  225. bool isGrouped() { return false; }
  226. void getMetaInfo(ThorDataLinkMetaInfo &info)
  227. {
  228. memset(&info,0,sizeof(info));
  229. in->getMetaInfo(info);
  230. // more TBD
  231. }
  232. CActivityBase *queryFromActivity()
  233. {
  234. return in->queryFromActivity();
  235. }
  236. void dataLinkSerialize(MemoryBuffer &mb)
  237. {
  238. // no serialization information (yet)
  239. }
  240. unsigned __int64 queryTotalCycles() const { return in->queryTotalCycles(); }
  241. };
  242. #ifdef _MSC_VER
  243. #pragma warning(pop)
  244. #endif
  245. IThorDataLink *createDataLinkSmartBuffer(CActivityBase *activity, IThorDataLink *in, size32_t bufsize, bool allowspill, bool preserveLhsGrouping, rowcount_t maxcount, ISmartBufferNotify *notify, bool instarted, IDiskUsage *iDiskUsage)
  246. {
  247. return new ThorLookaheadCache(*activity, in,bufsize,allowspill,preserveLhsGrouping,maxcount,notify,instarted,iDiskUsage);
  248. }
  249. void CThorDataLink::initMetaInfo(ThorDataLinkMetaInfo &info)
  250. {
  251. memset(&info,0,sizeof(info));
  252. //info.rowsdone = xx;
  253. info.totalRowsMin = 0;
  254. info.totalRowsMax = -1; // rely on inputs to set
  255. info.spilled = (offset_t)-1;
  256. info.byteTotal = (offset_t)-1;
  257. info.rowsOutput = getDataLinkCount();
  258. // more
  259. }
  260. void CThorDataLink::calcMetaInfoSize(ThorDataLinkMetaInfo &info,IThorDataLink *link)
  261. {
  262. if (!info.unknownRowsOutput&&link&&((info.totalRowsMin<=0)||(info.totalRowsMax<0))) {
  263. ThorDataLinkMetaInfo prev;
  264. link->getMetaInfo(prev);
  265. if (info.totalRowsMin<=0) {
  266. if (!info.canReduceNumRows)
  267. info.totalRowsMin = prev.totalRowsMin;
  268. else
  269. info.totalRowsMin = 0;
  270. }
  271. if (info.totalRowsMax<0) {
  272. if (!info.canIncreaseNumRows) {
  273. info.totalRowsMax = prev.totalRowsMax;
  274. if (info.totalRowsMin>info.totalRowsMax)
  275. info.totalRowsMax = -1;
  276. }
  277. }
  278. if (((offset_t)-1 != prev.byteTotal) && info.totalRowsMin == info.totalRowsMax)
  279. info.byteTotal = prev.byteTotal;
  280. }
  281. else if (info.totalRowsMin<0)
  282. info.totalRowsMin = 0; // a good bet
  283. }
  284. void CThorDataLink::calcMetaInfoSize(ThorDataLinkMetaInfo &info,IThorDataLink **link,unsigned ninputs)
  285. {
  286. if (!link||(ninputs<=1)) {
  287. calcMetaInfoSize(info,link&&(ninputs==1)?link[0]:NULL);
  288. return ;
  289. }
  290. if (!info.unknownRowsOutput) {
  291. __int64 min=0;
  292. __int64 max=0;
  293. for (unsigned i=0;i<ninputs;i++ ) {
  294. if (link[i]) {
  295. ThorDataLinkMetaInfo prev;
  296. link[i]->getMetaInfo(prev);
  297. if (min>=0) {
  298. if (prev.totalRowsMin>=0)
  299. min += prev.totalRowsMin;
  300. else
  301. min = -1;
  302. }
  303. if (max>=0) {
  304. if (prev.totalRowsMax>=0)
  305. max += prev.totalRowsMax;
  306. else
  307. max = -1;
  308. }
  309. }
  310. }
  311. if (info.totalRowsMin<=0) {
  312. if (!info.canReduceNumRows)
  313. info.totalRowsMin = min;
  314. else
  315. info.totalRowsMin = 0;
  316. }
  317. if (info.totalRowsMax<0) {
  318. if (!info.canIncreaseNumRows) {
  319. info.totalRowsMax = max;
  320. if (info.totalRowsMin>info.totalRowsMax)
  321. info.totalRowsMax = -1;
  322. }
  323. }
  324. }
  325. else if (info.totalRowsMin<0)
  326. info.totalRowsMin = 0; // a good bet
  327. }
  328. void CThorDataLink::calcMetaInfoSize(ThorDataLinkMetaInfo &info, ThorDataLinkMetaInfo *infos,unsigned num)
  329. {
  330. if (!infos||(num<=1)) {
  331. if (1 == num)
  332. info = infos[0];
  333. return;
  334. }
  335. if (!info.unknownRowsOutput) {
  336. __int64 min=0;
  337. __int64 max=0;
  338. for (unsigned i=0;i<num;i++ ) {
  339. ThorDataLinkMetaInfo &prev = infos[i];
  340. if (min>=0) {
  341. if (prev.totalRowsMin>=0)
  342. min += prev.totalRowsMin;
  343. else
  344. min = -1;
  345. }
  346. if (max>=0) {
  347. if (prev.totalRowsMax>=0)
  348. max += prev.totalRowsMax;
  349. else
  350. max = -1;
  351. }
  352. }
  353. if (info.totalRowsMin<=0) {
  354. if (!info.canReduceNumRows)
  355. info.totalRowsMin = min;
  356. else
  357. info.totalRowsMin = 0;
  358. }
  359. if (info.totalRowsMax<0) {
  360. if (!info.canIncreaseNumRows) {
  361. info.totalRowsMax = max;
  362. if (info.totalRowsMin>info.totalRowsMax)
  363. info.totalRowsMax = -1;
  364. }
  365. }
  366. }
  367. else if (info.totalRowsMin<0)
  368. info.totalRowsMin = 0; // a good bet
  369. }
  370. static bool canStall(CActivityBase *act)
  371. {
  372. if (!act)
  373. return false;
  374. unsigned i=0;
  375. IThorDataLink *inp;
  376. while ((inp=((CSlaveActivity *)act)->queryInput(i++))!=NULL) {
  377. ThorDataLinkMetaInfo info;
  378. inp->getMetaInfo(info);
  379. if (info.canStall)
  380. return true;
  381. if (!info.isSource&&!info.buffersInput&&!info.canBufferInput)
  382. if (canStall((CSlaveActivity *)inp->queryFromActivity()))
  383. return true;
  384. }
  385. return false;
  386. }
  387. bool isSmartBufferSpillNeeded(CActivityBase *act)
  388. {
  389. // two part - forward and reverse checking
  390. // first reverse looking for stalling activities
  391. if (!canStall((CSlaveActivity *)act))
  392. return false;
  393. // now check
  394. return true;
  395. }
  396. bool checkSavedFileCRC(IFile * ifile, bool & timesDiffer, unsigned & storedCrc)
  397. {
  398. StringBuffer s(ifile->queryFilename());
  399. s.append(".crc");
  400. Owned<IFile> crcFile = createIFile(s.str());
  401. size32_t crcSz = (size32_t)crcFile->size();
  402. Owned<IFileIO> crcIO = crcFile->open(IFOread);
  403. bool performCrc = false;
  404. timesDiffer = false;
  405. if (crcIO)
  406. {
  407. Owned<IFileIOStream> crcStream = createIOStream(crcIO);
  408. if (sizeof(storedCrc) == crcSz) // backward compat. if = in size to just crc (no date stamps)
  409. {
  410. verifyex(crcSz == crcStream->read(crcSz, &storedCrc));
  411. performCrc = true;
  412. }
  413. else
  414. {
  415. size32_t sz;
  416. verifyex(sizeof(sz) == crcStream->read(sizeof(sz), &sz));
  417. void *mem = malloc(sz);
  418. MemoryBuffer mb;
  419. mb.setBuffer(sz, mem, true);
  420. verifyex(sz == crcStream->read(sz, mem));
  421. CDateTime storedCreateTime(mb);
  422. CDateTime storedModifiedTime(mb);
  423. CDateTime createTime, modifiedTime, accessedTime;
  424. ifile->getTime(&createTime, &modifiedTime, &accessedTime);
  425. if (!storedCreateTime.equals(createTime) || !storedModifiedTime.equals(modifiedTime))
  426. timesDiffer = true;
  427. else
  428. {
  429. mb.read(storedCrc);
  430. performCrc = true;
  431. }
  432. }
  433. }
  434. return performCrc;
  435. }
  436. static void _doReplicate(CActivityBase *activity, IPartDescriptor &partDesc, ICopyFileProgress *iProgress)
  437. {
  438. StringBuffer primaryName;
  439. getPartFilename(partDesc, 0, primaryName);;
  440. RemoteFilename rfn;
  441. IFileDescriptor &fileDesc = partDesc.queryOwner();
  442. unsigned copies = partDesc.numCopies();
  443. unsigned c=1;
  444. for (; c<copies; c++)
  445. {
  446. unsigned replicateCopy;
  447. unsigned clusterNum = partDesc.copyClusterNum(c, &replicateCopy);
  448. rfn.clear();
  449. partDesc.getFilename(c, rfn);
  450. StringBuffer dstName;
  451. rfn.getPath(dstName);
  452. assertex(dstName.length());
  453. if (replicateCopy>0 )
  454. {
  455. try
  456. {
  457. queryThor().queryBackup().backup(dstName.str(), primaryName.str());
  458. }
  459. catch (IException *e)
  460. {
  461. Owned<IThorException> re = MakeActivityWarning(activity, e, "Failed to create replicate file '%s'", dstName.str());
  462. e->Release();
  463. activity->fireException(re);
  464. }
  465. }
  466. else // another primary
  467. {
  468. ActPrintLog(activity, "Copying to primary %s", dstName.str());
  469. StringBuffer tmpName(dstName.str());
  470. tmpName.append(".tmp");
  471. OwnedIFile tmpIFile = createIFile(tmpName.str());
  472. OwnedIFile srcFile = createIFile(primaryName.str());
  473. CFIPScope fipScope(tmpName.str());
  474. try
  475. {
  476. try
  477. {
  478. ensureDirectoryForFile(dstName.str());
  479. ::copyFile(tmpIFile, srcFile, 0x100000, iProgress);
  480. }
  481. catch (IException *e)
  482. {
  483. IThorException *re = MakeActivityException(activity, e, "Failed to copy to tmp file '%s' from source file '%s'", tmpIFile->queryFilename(), srcFile->queryFilename());
  484. e->Release();
  485. throw re;
  486. }
  487. try
  488. {
  489. OwnedIFile dstIFile = createIFile(dstName.str());
  490. dstIFile->remove();
  491. tmpIFile->rename(pathTail(dstName.str()));
  492. }
  493. catch (IException *e)
  494. {
  495. IThorException *re = ThorWrapException(e, "Failed to rename '%s' to '%s'", tmpName.str(), dstName.str());
  496. e->Release();
  497. throw re;
  498. }
  499. }
  500. catch (IException *)
  501. {
  502. try { tmpIFile->remove(); }
  503. catch (IException *e) { ActPrintLog(&activity->queryContainer(), e, NULL); e->Release(); }
  504. throw;
  505. }
  506. }
  507. }
  508. }
  509. void cancelReplicates(CActivityBase *activity, IPartDescriptor &partDesc)
  510. {
  511. RemoteFilename rfn;
  512. IFileDescriptor &fileDesc = partDesc.queryOwner();
  513. unsigned copies = partDesc.numCopies();
  514. unsigned c=1;
  515. for (; c<copies; c++)
  516. {
  517. unsigned replicateCopy;
  518. unsigned clusterNum = partDesc.copyClusterNum(c, &replicateCopy);
  519. rfn.clear();
  520. partDesc.getFilename(c, rfn);
  521. StringBuffer dstName;
  522. rfn.getPath(dstName);
  523. assertex(dstName.length());
  524. if (replicateCopy>0)
  525. {
  526. try
  527. {
  528. queryThor().queryBackup().cancel(dstName.str());
  529. }
  530. catch (IException *e)
  531. {
  532. Owned<IThorException> re = MakeActivityException(activity, e, "Error cancelling backup '%s'", dstName.str());
  533. ActPrintLog(&activity->queryContainer(), e, NULL);
  534. e->Release();
  535. }
  536. }
  537. }
  538. }
  539. void doReplicate(CActivityBase *activity, IPartDescriptor &partDesc, ICopyFileProgress *iProgress)
  540. {
  541. try
  542. {
  543. _doReplicate(activity, partDesc, iProgress);
  544. }
  545. catch (IException *e)
  546. {
  547. Owned<IThorException> e2 = MakeActivityWarning(activity, e, "doReplicate");
  548. e->Release();
  549. activity->fireException(e2);
  550. }
  551. }
  552. class CWriteHandler : public CSimpleInterface, implements IFileIO
  553. {
  554. Linked<IFileIO> primaryio;
  555. Linked<IFile> primary;
  556. StringBuffer primaryName;
  557. ICopyFileProgress *iProgress;
  558. bool *aborted;
  559. CActivityBase &activity;
  560. IPartDescriptor &partDesc;
  561. bool remote, direct, renameToPrimary;
  562. CFIPScope fipScope;
  563. public:
  564. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  565. CWriteHandler(CActivityBase &_activity, IPartDescriptor &_partDesc, IFile *_primary, IFileIO *_primaryio, ICopyFileProgress *_iProgress, bool _direct, bool _renameToPrimary, bool *_aborted)
  566. : activity(_activity), partDesc(_partDesc), primary(_primary), primaryio(_primaryio), iProgress(_iProgress), direct(_direct), renameToPrimary(_renameToPrimary), aborted(_aborted), fipScope(primary->queryFilename())
  567. {
  568. RemoteFilename rfn;
  569. partDesc.getFilename(0, rfn);
  570. remote = !rfn.isLocal();
  571. rfn.getPath(primaryName);
  572. if (globals->getPropBool("@replicateAsync", true))
  573. cancelReplicates(&activity, partDesc);
  574. }
  575. ~CWriteHandler()
  576. {
  577. primaryio.clear(); // should close
  578. if (aborted && *aborted) return;
  579. if (renameToPrimary)
  580. {
  581. OwnedIFile tmpIFile;
  582. CFIPScope fipScope;
  583. if (remote)
  584. {
  585. StringBuffer tmpName(primaryName.str());
  586. tmpName.append(".tmp");
  587. tmpIFile.setown(createIFile(tmpName.str()));
  588. fipScope.set(tmpName.str());
  589. try
  590. {
  591. try
  592. {
  593. ensureDirectoryForFile(primaryName.str());
  594. ::copyFile(tmpIFile, primary, 0x100000, iProgress);
  595. }
  596. catch (IException *e)
  597. {
  598. IThorException *re = ThorWrapException(e, "Failed to copy local temp file '%s' to remote temp location '%s'", primary->queryFilename(), tmpIFile->queryFilename());
  599. e->Release();
  600. throw re;
  601. }
  602. }
  603. catch (IException *)
  604. {
  605. try { tmpIFile->remove(); }
  606. catch (IException *e) { ActPrintLog(&activity.queryContainer(), e, NULL); e->Release(); }
  607. }
  608. }
  609. else
  610. tmpIFile.setown(createIFile(primary->queryFilename()));
  611. try
  612. {
  613. try
  614. {
  615. OwnedIFile dstIFile = createIFile(primaryName.str());
  616. dstIFile->remove();
  617. tmpIFile->rename(pathTail(primaryName.str()));
  618. }
  619. catch (IException *e)
  620. {
  621. IThorException *re = ThorWrapException(e, "Failed to rename '%s' to '%s'", tmpIFile->queryFilename(), primaryName.str());
  622. e->Release();
  623. throw re;
  624. }
  625. }
  626. catch (IException *)
  627. {
  628. try { primary->remove(); }
  629. catch (IException *e) { ActPrintLog(&activity.queryContainer(), e, NULL); e->Release(); }
  630. throw;
  631. }
  632. primary->remove();
  633. fipScope.clear();
  634. }
  635. if (partDesc.numCopies()>1)
  636. _doReplicate(&activity, partDesc, iProgress);
  637. }
  638. // IFileIO impl.
  639. virtual size32_t read(offset_t pos, size32_t len, void * data) { return primaryio->read(pos, len, data); }
  640. virtual offset_t size() { return primaryio->size(); }
  641. virtual size32_t write(offset_t pos, size32_t len, const void * data) { return primaryio->write(pos, len, data); }
  642. virtual offset_t appendFile(IFile *file,offset_t pos=0,offset_t len=-1) { return primaryio->appendFile(file, pos, len); }
  643. virtual void setSize(offset_t size) { primaryio->setSize(size); }
  644. virtual void flush() { primaryio->flush(); }
  645. virtual void close() { primaryio->close(); }
  646. };
  647. IFileIO *createMultipleWrite(CActivityBase *activity, IPartDescriptor &partDesc, unsigned recordSize, bool &compress, bool extend, ICompressor *ecomp, ICopyFileProgress *iProgress, bool direct, bool renameToPrimary, bool *aborted, StringBuffer *_outLocationName)
  648. {
  649. StringBuffer outLocationNameI;
  650. StringBuffer &outLocationName = _outLocationName?*_outLocationName:outLocationNameI;
  651. RemoteFilename rfn;
  652. partDesc.getFilename(0, rfn);
  653. StringBuffer primaryName;
  654. rfn.getPath(primaryName);
  655. if (direct)
  656. {
  657. if (0 == outLocationName.length())
  658. outLocationName.append(primaryName.str());
  659. }
  660. else
  661. {
  662. // use temp name
  663. GetTempName(outLocationName, "partial");
  664. if (rfn.isLocal())
  665. { // ensure local tmp in same directory as target
  666. StringBuffer dir;
  667. splitDirTail(primaryName, dir);
  668. addPathSepChar(dir);
  669. dir.append(pathTail(outLocationName));
  670. outLocationName.swapWith(dir);
  671. }
  672. assertex(outLocationName.length());
  673. ensureDirectoryForFile(outLocationName.str());
  674. }
  675. OwnedIFile file = createIFile(outLocationName.str());
  676. Owned<IFileIO> fileio;
  677. if (compress)
  678. {
  679. fileio.setown(createCompressedFileWriter(file, recordSize, extend, true, ecomp));
  680. if (!fileio)
  681. {
  682. compress = false;
  683. Owned<IThorException> e = MakeActivityWarning(activity, TE_LargeBufferWarning, "Could not write file '%s' compressed", outLocationName.str());
  684. activity->fireException(e);
  685. fileio.setown(file->open(extend&&file->exists()?IFOwrite:IFOcreate));
  686. }
  687. }
  688. else
  689. fileio.setown(file->open(extend&&file->exists()?IFOwrite:IFOcreate));
  690. if (!fileio)
  691. throw MakeActivityException(activity, TE_FileCreationFailed, "Failed to create file for write (%s) error = %d", outLocationName.str(), GetLastError());
  692. ActPrintLog(activity, "Writing to file: %s", file->queryFilename());
  693. return new CWriteHandler(*activity, partDesc, file, fileio, iProgress, direct, renameToPrimary, aborted);
  694. }
  695. StringBuffer &locateFilePartPath(CActivityBase *activity, const char *logicalFilename, IPartDescriptor &partDesc, StringBuffer &filePath)
  696. {
  697. unsigned location;
  698. OwnedIFile ifile;
  699. if (globals->getPropBool("@autoCopyBackup", true)?ensurePrimary(activity, partDesc, ifile, location, filePath):getBestFilePart(activity, partDesc, ifile, location, filePath, activity))
  700. ActPrintLog(activity, "reading physical file '%s' (logical file = %s)", filePath.str(), logicalFilename);
  701. else
  702. {
  703. StringBuffer locations;
  704. IException *e = MakeActivityException(activity, TE_FileNotFound, "No physical file part for logical file %s, found at given locations: %s (Error = %d)", logicalFilename, getFilePartLocations(partDesc, locations).str(), GetLastError());
  705. ActPrintLog(&activity->queryContainer(), e, NULL);
  706. throw e;
  707. }
  708. return filePath;
  709. }
  710. IRowStream *createSequentialPartHandler(CPartHandler *partHandler, IArrayOf<IPartDescriptor> &partDescs, bool grouped)
  711. {
  712. class CSeqPartHandler : public CSimpleInterface, implements IRowStream
  713. {
  714. IArrayOf<IPartDescriptor> &partDescs;
  715. int part, parts;
  716. bool eof, grouped, someInGroup;
  717. Linked<CPartHandler> partHandler;
  718. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  719. public:
  720. CSeqPartHandler(CPartHandler *_partHandler, IArrayOf<IPartDescriptor> &_partDescs, bool _grouped)
  721. : partDescs(_partDescs), partHandler(_partHandler), grouped(_grouped)
  722. {
  723. part = 0;
  724. parts = partDescs.ordinality();
  725. someInGroup = false;
  726. if (0==parts)
  727. {
  728. eof = true;
  729. }
  730. else
  731. {
  732. eof = false;
  733. partHandler->setPart(&partDescs.item(0), 0);
  734. }
  735. }
  736. virtual void stop()
  737. {
  738. if (partHandler)
  739. {
  740. partHandler->stop();
  741. partHandler.clear();
  742. }
  743. }
  744. const void *nextRow()
  745. {
  746. if (eof)
  747. {
  748. return NULL;
  749. }
  750. loop
  751. {
  752. OwnedConstThorRow row = partHandler->nextRow();
  753. if (row)
  754. {
  755. someInGroup = true;
  756. return row.getClear();
  757. }
  758. if (grouped && someInGroup)
  759. {
  760. someInGroup = false;
  761. return NULL;
  762. }
  763. partHandler->stop();
  764. ++part;
  765. if (part >= parts)
  766. {
  767. partHandler.clear();
  768. eof = true;
  769. return NULL;
  770. }
  771. partHandler->setPart(&partDescs.item(part), part);
  772. }
  773. }
  774. };
  775. return new CSeqPartHandler(partHandler, partDescs, grouped);
  776. }
  777. // CThorRowAggregator impl.
  778. AggregateRowBuilder &CThorRowAggregator::addRow(const void * row)
  779. {
  780. return RowAggregator::addRow(row);
  781. }
  782. void CThorRowAggregator::mergeElement(const void * otherElement)
  783. {
  784. RowAggregator::mergeElement(otherElement);
  785. }