// dfurepl.cpp
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include <platform.h>
  14. #include "thirdparty.h"
  15. #include <stdio.h>
  16. #include <limits.h>
  17. #include "jexcept.hpp"
  18. #include "jptree.hpp"
  19. #include "jsocket.hpp"
  20. #include "jstring.hpp"
  21. #include "jmisc.hpp"
  22. #include "jprop.hpp"
  23. #include "jio.hpp"
  24. #include "dadfs.hpp"
  25. #include "danqs.hpp"
  26. #include "dautils.hpp"
  27. #include "dasds.hpp"
  28. #include "dalienv.hpp"
  29. #include "mpcomm.hpp"
  30. #include "mplog.hpp"
  31. #include "rmtfile.hpp"
  32. #include "dfuerror.hpp"
  33. #include "dfurepl.hpp"
  34. #define LOGPFX "REPLICATE: "
  35. #define SDS_TIMEOUT (60*60*1000) // 1hr
  36. #define MAX_REPLICATING_THREADS 20
  37. struct ReplicatePartItem;
  38. struct ReplicateFileItem;
  39. class CReplicateServer;
  // State of one physical copy of a file part while it is being replicated.
  enum ReplicatePartCopyState
  {
      RPCS_exists,          // the copy's file was found; it can serve as a copy source
      RPCS_missing,         // the copy's file was not found; it still has to be copied
      RPCS_tempcopied,      // the temp file has been completely written; awaiting commit()
      RPCS_failed,          // the existence check threw, or no source copy was available
      RPCS_failedtempcopy   // copying into the temp file threw; commit() removes the temp file
  };
  48. struct ReplicatePartCopyItem: extends CInterface
  49. {
  50. Owned<IFile> file;
  51. RemoteFilename tmpname;
  52. ReplicatePartCopyState state;
  53. ReplicatePartCopyItem *from;
  54. ReplicatePartItem &parent;
  55. ReplicatePartCopyItem(ReplicatePartItem &parent,IFile *_file,unsigned pn);
  56. bool startCopy(ReplicatePartCopyItem &_from) // returns true if not done in one
  57. {
  58. from = &_from;
  59. return !doneCopy(0);
  60. }
  61. bool doneCopy(unsigned timeout);
  62. void commit(bool rollback)
  63. {
  64. if (state==RPCS_failedtempcopy)
  65. rollback = true;
  66. else if (state!=RPCS_tempcopied)
  67. return;
  68. Owned<IFile> tmpfile;
  69. try { // failures don't affect other parts
  70. tmpfile.setown(createIFile(tmpname));
  71. if (!rollback&&tmpfile.get()) {
  72. tmpfile->rename(pathTail(file->queryFilename()));
  73. return;
  74. }
  75. }
  76. catch (IException *e) {
  77. StringBuffer s(LOGPFX "Error renaming temp file ");
  78. tmpname.getRemotePath(s);
  79. s.append(" to ").append(pathTail(file->queryFilename()));
  80. EXCLOG(e,s.str());
  81. e->Release();
  82. }
  83. if (tmpfile) { // failed/rollback
  84. try { // failures don't affect other parts
  85. tmpfile->remove();
  86. }
  87. catch (IException *e) {
  88. // suppress delete errors on rollback (already reported elsewhere)
  89. e->Release();
  90. }
  91. }
  92. }
  93. };
  94. struct ReplicatePartItem: extends CInterface
  95. {
  96. CIArrayOf<ReplicatePartCopyItem> copies;
  97. ReplicateFileItem &parent;
  98. ReplicatePartItem(ReplicateFileItem &parent,IDistributedFilePart &part);
  99. unsigned startReplicate()
  100. {
  101. unsigned ret = 0;
  102. UnsignedArray src; // bit OTT when probably only 2 but you never know
  103. UnsignedArray dst;
  104. ForEachItemIn(i1,copies) {
  105. switch(copies.item(i1).state) {
  106. case RPCS_exists:
  107. src.append(i1);
  108. break;
  109. case RPCS_missing:
  110. dst.append(i1);
  111. break;
  112. }
  113. }
  114. unsigned i3=0;
  115. ForEachItemIn(i2,dst) {
  116. ReplicatePartCopyItem &dstcopy = copies.item(dst.item(i2));
  117. if (i3>=src.ordinality()) {
  118. if (i3==0) {
  119. OERRLOG(LOGPFX "Cannot find copy for %s",dstcopy.file->queryFilename());
  120. dstcopy.state = RPCS_failed;
  121. }
  122. else
  123. i3 = 0;
  124. }
  125. if (dstcopy.startCopy(copies.item(src.item(i3))))
  126. ret++;
  127. }
  128. return ret;
  129. }
  130. void waitReplicate()
  131. {
  132. for (;;) {
  133. bool alldone=true;
  134. ForEachItemIn(i1,copies) {
  135. if (!copies.item(i1).doneCopy(1000*60))
  136. alldone = false;
  137. }
  138. if (alldone)
  139. break;
  140. Sleep(1000); // actually not really needed as doneCopy will delay (but JIC)
  141. }
  142. }
  143. void commit(bool rollback)
  144. {
  145. ForEachItemIn(i1,copies) {
  146. copies.item(i1).commit(rollback);
  147. }
  148. }
  149. };
  150. struct ReplicateFileItem: extends CInterface
  151. {
  152. CDfsLogicalFileName dlfn;
  153. Linked<IUserDescriptor> userdesc;
  154. Owned<IRemoteConnection> conn;
  155. CReplicateServer &parent;
  156. CDateTime filedt; // used to validate same file
  157. ReplicateFileItem(CReplicateServer &_parent);
  158. StringAttr uuid;
  159. CIArrayOf<ReplicatePartItem> parts;
  160. Semaphore sem;
  161. CriticalSection sect;
  162. bool stopping;
  163. class cThread: public Thread
  164. {
  165. ReplicateFileItem &parent;
  166. public:
  167. cThread(ReplicateFileItem &_parent)
  168. : Thread("ReplicateFileThread"), parent(_parent)
  169. {
  170. }
  171. int run()
  172. {
  173. try {
  174. parent.run();
  175. }
  176. catch (IException *e) {
  177. EXCLOG(e,LOGPFX "Replicate thread error(1)");
  178. e->Release();
  179. }
  180. try {
  181. parent.done();
  182. }
  183. catch (IException *e) {
  184. EXCLOG(e,LOGPFX "Replicate thread error(2)");
  185. e->Release();
  186. }
  187. return 1;
  188. }
  189. } thread;
  190. ReplicateFileItem(CReplicateServer &_parent,CDfsLogicalFileName _dlfn,IUserDescriptor *_userdesc,CDateTime _filedt)
  191. : thread(*this), parent(_parent), dlfn(_dlfn), userdesc(_userdesc), filedt(_filedt)
  192. {
  193. stopping = true; // set false once started
  194. }
  195. ~ReplicateFileItem()
  196. {
  197. stop();
  198. }
  199. void start()
  200. {
  201. CriticalBlock block(sect);
  202. stopping = false;
  203. thread.start();
  204. }
  205. void stop()
  206. {
  207. {
  208. CriticalBlock block(sect);
  209. if (stopping)
  210. return;
  211. stopping = true;
  212. sem.signal();
  213. }
  214. thread.join();
  215. }
  216. void done();
  217. void run()
  218. {
  219. StringBuffer tmp;
  220. const char *lfn = dlfn.get();
  221. if (dlfn.isExternal()) {
  222. OERRLOG(LOGPFX "Cannot replicate external file %s",lfn);
  223. return;
  224. }
  225. if (dlfn.isForeign()) {
  226. OERRLOG(LOGPFX "Cannot replicate foreign file %s",lfn);
  227. return;
  228. }
  229. Owned<IDistributedFile> dfile = queryDistributedFileDirectory().lookup(dlfn,userdesc,AccessMode::tbdRead,false,false,nullptr,defaultPrivilegedUser);
  230. if (!dfile) {
  231. UWARNLOG(LOGPFX "Cannot find file %s, perhaps deleted",lfn);
  232. return;
  233. }
  234. CDateTime dt;
  235. dfile->getModificationTime(dt); // check not modified while queued
  236. if (!filedt.equals(dt)) {
  237. dt.getString(filedt.getString(tmp.clear()).append(','));
  238. UWARNLOG(LOGPFX "File %s changed (%s), ignoring replicate",lfn,tmp.str());
  239. return;
  240. }
  241. // see if already replicating
  242. Owned<IRemoteConnection> pconn = querySDS().connect("DFU/Replicating", myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_TIMEOUT);
  243. if (!pconn.get()) {
  244. OERRLOG(LOGPFX "Connect to DFU/Replicating %s failed",lfn);
  245. return;
  246. }
  247. StringBuffer xpath;
  248. xpath.appendf("File[@name=\"%s\"]",lfn);
  249. if (pconn->queryRoot()->hasProp(xpath.str())) {
  250. // read date
  251. xpath.append("/@filedt");
  252. pconn->queryRoot()->getProp(xpath.str(),tmp.clear());
  253. dt.setDateString(tmp.str());
  254. if (filedt.equals(dt)) {
  255. UWARNLOG(LOGPFX "Already replicating %s, ignoring",lfn);
  256. return;
  257. }
  258. else
  259. UWARNLOG(LOGPFX "Already replicating %s, contining",lfn);
  260. }
  261. // now as long as SDS doesn't lock children this should be OK
  262. conn.setown(querySDS().connect("DFU/Replicating/File", myProcessSession(), RTM_CREATE_ADD | RTM_LOCK_READ, 5*60*1000));
  263. if (!conn.get()) {
  264. OERRLOG(LOGPFX "Create of DFU/Replicating/File %s failed",lfn);
  265. return;
  266. }
  267. genUUID(tmp.clear(),true); // true for windows
  268. uuid.set(tmp.str());
  269. IPropertyTree &root = *conn->queryRoot();
  270. root.setProp("@name",lfn);
  271. StringBuffer node;
  272. queryMyNode()->endpoint().getIpText(node);
  273. root.setProp("@node",node.str());
  274. root.setPropInt("@mpport",queryMyNode()->endpoint().port);
  275. dt.setNow();
  276. root.setProp("@started",dt.getString(tmp.clear()).str());
  277. root.setProp("@filedt",filedt.getString(tmp.clear()).str());
  278. root.setProp("@uuid",uuid.get());
  279. root.setPropInt64("@session",myProcessSession());
  280. conn->commit();
  281. pconn.clear(); // don't need parent now
  282. Owned<IDistributedFilePartIterator> piter = dfile->getIterator();
  283. ForEach(*piter) {
  284. parts.append(*new ReplicatePartItem(*this,piter->query()));
  285. }
  286. // now free file and get to work
  287. dfile.clear();
  288. PROGLOG(LOGPFX "Starting replicate of %s",lfn);
  289. unsigned numtoreplicate = 0;
  290. ForEachItemIn(i1,parts) {
  291. numtoreplicate += parts.item(i1).startReplicate();
  292. }
  293. if (numtoreplicate==0) {
  294. PROGLOG(LOGPFX "Nothing to do for %s",lfn);
  295. return;
  296. }
  297. PROGLOG(LOGPFX "Started replicate of %s (%d parts left to copy)",lfn,numtoreplicate);
  298. ForEachItemIn(i2,parts) {
  299. parts.item(i2).waitReplicate();
  300. }
  301. bool abort = true;
  302. try {
  303. dfile.setown(queryDistributedFileDirectory().lookup(dlfn,userdesc,AccessMode::tbdRead,false,false,nullptr,defaultPrivilegedUser));
  304. if (dfile) {
  305. CDateTime newfiledt;
  306. dfile->getModificationTime(newfiledt);
  307. if (filedt.equals(newfiledt))
  308. abort = false;
  309. else {
  310. newfiledt.getString(filedt.getString(tmp.clear()).append(','));
  311. UWARNLOG(LOGPFX "File %s changed (%s)",lfn,tmp.str());
  312. }
  313. }
  314. else
  315. UWARNLOG(LOGPFX "Cannot find file %s, perhaps deleted",lfn);
  316. }
  317. catch (IException *e) { // actually don't expect (unless maybe dali down or something)
  318. EXCLOG(e,LOGPFX "Replicate error(3)");
  319. e->Release();
  320. }
  321. PROGLOG(LOGPFX "%s parts copy for %s, start temp file %s",abort?"Aborting":"Finalizing",lfn,abort?"delete":"rename");
  322. ForEachItemIn(i3,parts) {
  323. parts.item(i3).commit(abort);
  324. }
  325. PROGLOG(LOGPFX "%s replicate of %s",abort?"Aborted":"Finished",lfn);
  326. }
  327. };
  328. ReplicatePartItem::ReplicatePartItem(ReplicateFileItem &_parent,IDistributedFilePart &part)
  329. : parent(_parent)
  330. {
  331. for (unsigned c=0;c<part.numCopies();c++) {
  332. RemoteFilename rfn;
  333. part.getFilename(rfn,c);
  334. try {
  335. IFile *file = createIFile(rfn);
  336. copies.append(*new ReplicatePartCopyItem(*this,file,part.getPartIndex()+1));
  337. }
  338. catch (IException *e) { // not sure if possible but in case
  339. StringBuffer s(LOGPFX "Error replicating part ");
  340. rfn.getRemotePath(s);
  341. EXCLOG(e,s.str());
  342. e->Release();
  343. }
  344. }
  345. }
  346. ReplicatePartCopyItem::ReplicatePartCopyItem(ReplicatePartItem &_parent,IFile *_file,unsigned pn)
  347. : parent(_parent),file(_file)
  348. {
  349. from = NULL;
  350. try {
  351. if (file->exists())
  352. state = RPCS_exists;
  353. else {
  354. state = RPCS_missing;
  355. StringBuffer dst;
  356. splitDirTail(file->queryFilename(),dst);
  357. addPathSepChar(dst).append("R_").append(parent.parent.uuid).append('_').append(pn).append(".tmp");
  358. tmpname.setRemotePath(dst.str());
  359. }
  360. }
  361. catch (IException *e) {
  362. state = RPCS_failed;
  363. StringBuffer s(LOGPFX "calling exists ");
  364. s.append(file->queryFilename());
  365. EXCLOG(e,s.str());
  366. e->Release();
  367. }
  368. }
  369. bool ReplicatePartCopyItem::doneCopy(unsigned timeout)
  370. {
  371. if (state==RPCS_missing) {
  372. try {
  373. if (asyncCopyFileSection(
  374. parent.parent.uuid,
  375. from->file,
  376. tmpname,
  377. (offset_t)-1, // creates file
  378. 0,
  379. (offset_t)-1, // all file
  380. NULL, // no progress needed (yet)
  381. timeout,
  382. CFflush_rdwr))
  383. state = RPCS_tempcopied; // done
  384. }
  385. catch (IException *e) {
  386. EXCLOG(e,LOGPFX "Replicate part copy error");
  387. e->Release();
  388. state = RPCS_failedtempcopy; // tmpfile will be deleted later
  389. }
  390. }
  391. return state!=RPCS_missing;
  392. }
  393. class CReplicateServer: public CInterface, implements IThreaded, implements IReplicateServer
  394. {
  395. CriticalSection runningsect;
  396. Semaphore runningsem;
  397. CIArrayOf<ReplicateFileItem> running;
  398. Owned<IQueueChannel> qchannel;
  399. bool stopping;
  400. Owned<CThreaded> thread;
  401. StringAttr qname;
  402. public:
  403. IMPLEMENT_IINTERFACE;
  404. CReplicateServer(const char *_qname)
  405. : qname(_qname)
  406. {
  407. runningsem.signal(MAX_REPLICATING_THREADS);
  408. }
  409. void replicateFile(const char *lfn,IUserDescriptor *userdesc, CDateTime &filedt)
  410. {
  411. CDfsLogicalFileName dlfn;
  412. dlfn.set(lfn);
  413. while (!runningsem.wait(1000*60))
  414. PROGLOG(LOGPFX "too many replications active, waiting");
  415. CriticalBlock block(runningsect);
  416. ReplicateFileItem &fitem = *new ReplicateFileItem(*this,dlfn,userdesc,filedt);
  417. running.append(fitem);
  418. fitem.start();
  419. }
  420. void done(ReplicateFileItem *item)
  421. {
  422. CriticalBlock block(runningsect);
  423. if (running.zap(*item))
  424. runningsem.signal();
  425. }
  426. void runServer()
  427. {
  428. stopping = false;
  429. thread.setown(new CThreaded("ReplicateServerThread"));
  430. thread->init(this);
  431. }
  432. virtual void threadmain() override
  433. {
  434. Owned<INamedQueueConnection> qconn = createNamedQueueConnection(0);
  435. qchannel.setown(qconn->open(qname));
  436. MemoryBuffer mb;
  437. while (!stopping) {
  438. try {
  439. PROGLOG(LOGPFX "Waiting on queue %s", qname.get());
  440. qchannel->get(mb.clear());
  441. byte fn;
  442. if (stopping||(mb.length()==0))
  443. fn = DRQ_STOP;
  444. else
  445. mb.read(fn);
  446. switch (fn) {
  447. case DRQ_STOP:
  448. stopping = true;
  449. break;
  450. case DRQ_REPLICATE: {
  451. StringAttr lfn;
  452. mb.read(lfn);
  453. Owned<IUserDescriptor> userdesc = createUserDescriptor();
  454. userdesc->deserialize(mb);
  455. CDateTime filedt;
  456. filedt.deserialize(mb);
  457. replicateFile(lfn,userdesc,filedt);
  458. }
  459. break;
  460. }
  461. }
  462. catch (IException *e) {
  463. EXCLOG(e,LOGPFX "Server thread(1)"); // exit DFU server? (e.g. Dali down)
  464. return;
  465. }
  466. }
  467. qchannel.clear();
  468. }
  469. void stopServer()
  470. {
  471. if (!stopping&&qchannel) {
  472. stopping = true;
  473. qchannel->cancelGet();
  474. }
  475. if (thread)
  476. thread->join();
  477. }
  478. };
  479. void ReplicateFileItem::done()
  480. {
  481. parent.done(this);
  482. }
  483. IReplicateServer *createReplicateServer(const char *qname)
  484. {
  485. return new CReplicateServer(qname);
  486. }