dfuutil.cpp 53 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include <platform.h>
  14. #include "jexcept.hpp"
  15. #include "jptree.hpp"
  16. #include "jsocket.hpp"
  17. #include "jstring.hpp"
  18. #include "jmisc.hpp"
  19. #include "jregexp.hpp"
  20. #include "jprop.hpp"
  21. #include "daclient.hpp"
  22. #include "dadfs.hpp"
  23. #include "dasds.hpp"
  24. #include "dfuerror.hpp"
  25. #include "dautils.hpp"
  26. #include "dalienv.hpp"
  27. #include "rmtfile.hpp"
  28. #include "dfuutil.hpp"
  29. // savemap
  30. // superkey functions
  31. // (logical) directory functions
  32. #define PHYSICAL_COPY_POLL_TIME 10000 // for aborting
  33. #ifdef _DEBUG
  34. //#define LOG_PART_COPY
  35. #endif
  36. static bool physicalPartCopy(IFile *from,const char *tofile, Owned<IException> &exc, StringBuffer *tmpname)
  37. {
  38. StringBuffer tmpnamestr;
  39. if (!tmpname)
  40. tmpname = &tmpnamestr;
  41. tmpname->append(tofile).append("__");
  42. size32_t l = tmpname->length();
  43. genUUID(*tmpname,true); // true for windows
  44. StringAttr uuid(tmpname->str()+l);
  45. tmpname->append(".tmp");
  46. RemoteFilename tmpfn;
  47. tmpfn.setRemotePath(tmpname->str());
  48. //unsigned lastpc;
  49. #ifdef LOG_PART_COPY
  50. PROGLOG("start physicalPartCopy(%s,%s)",from->queryFilename(),tmpname->str());
  51. #endif
  52. try {
  53. recursiveCreateDirectoryForFile(tmpname->str());
  54. while(!asyncCopyFileSection(
  55. uuid,
  56. from,
  57. tmpfn,
  58. (offset_t)-1, // creates file
  59. 0,
  60. (offset_t)-1, // all file
  61. NULL,
  62. PHYSICAL_COPY_POLL_TIME)) {
  63. // Abort check TBD
  64. }
  65. }
  66. catch (IException *e) {
  67. EXCLOG(e,"SingleFileCopy: File copy error");
  68. if (exc)
  69. exc.setown(e);
  70. else
  71. e->Release();
  72. }
  73. Owned<IFile> f = createIFile(tmpfn);
  74. if (!exc.get()&&(tmpnamestr.length()!=0)) {
  75. try {
  76. #ifdef LOG_PART_COPY
  77. PROGLOG("physicalPartCopy rename(%s,%s)",tmpname->str(),pathTail(tofile));
  78. #endif
  79. f->rename(pathTail(tofile));
  80. }
  81. catch (IException *e) {
  82. EXCLOG(e,"SingleFileCopy: File rename error");
  83. if (exc)
  84. exc.setown(e);
  85. else
  86. e->Release();
  87. }
  88. }
  89. if (exc.get()) {
  90. try {
  91. f->remove();
  92. }
  93. catch (IException *e) {
  94. // ignore
  95. e->Release();
  96. }
  97. }
  98. #ifdef LOG_PART_COPY
  99. PROGLOG("done physicalPartCopy %s",(exc.get()==NULL)?"OK":"Failed");
  100. #endif
  101. return exc.get()==NULL;
  102. }
  103. static bool getFileInfo(RemoteFilename &fn, Owned<IFile> &f, offset_t &size,CDateTime &modtime)
  104. {
  105. f.setown(createIFile(fn));
  106. bool isdir = false;
  107. bool ret = f->getInfo(isdir,size,modtime);
  108. if (ret&&isdir) {
  109. StringBuffer fs;
  110. fn.getRemotePath(fs);
  111. throw MakeStringException(-1,"%s is a directory",fs.str());
  112. }
  113. return ret;
  114. }
  115. class CFileCloner
  116. {
  117. public:
  118. StringAttr nameprefix;
  119. Owned<INode> foreigndalinode;
  120. Linked<IUserDescriptor> userdesc;
  121. Linked<IUserDescriptor> foreignuserdesc;
  122. StringAttr srcCluster;
  123. StringAttr cluster1;
  124. StringAttr prefix;
  125. Owned<IGroup> grp1;
  126. ClusterPartDiskMapSpec spec1;
  127. StringAttr cluster2;
  128. Owned<IGroup> grp2;
  129. ClusterPartDiskMapSpec spec2;
  130. unsigned overwriteFlags;
  131. IDistributedFileDirectory *fdir;
  132. bool repeattlk;
  133. bool copyphysical;
  134. unsigned level;
  135. CFileCloner() : overwriteFlags(0) {}
  136. void physicalCopyFile(IFileDescriptor *srcdesc,IFileDescriptor *dstdesc)
  137. {
  138. CriticalSection crit;
  139. Semaphore sem(10); // parallel local
  140. StringArray tmpnames;
  141. StringArray dstnames;
  142. Owned<IException> exc;
  143. class casyncfor: public CAsyncFor
  144. {
  145. CriticalSection &crit;
  146. Semaphore &sem;
  147. IFileDescriptor *srcdesc;
  148. IFileDescriptor *dstdesc;
  149. StringArray &tmpnames;
  150. StringArray &dstnames;
  151. public:
  152. Owned<IException> &exc;
  153. casyncfor(CriticalSection &_crit,Semaphore &_sem,IFileDescriptor *_srcdesc,IFileDescriptor *_dstdesc,Owned<IException> &_exc, StringArray &_tmpnames, StringArray &_dstnames)
  154. : crit(_crit), sem(_sem), exc(_exc), tmpnames(_tmpnames), dstnames(_dstnames)
  155. {
  156. srcdesc = _srcdesc;
  157. dstdesc = _dstdesc;
  158. }
  159. void Do(unsigned pn)
  160. {
  161. CriticalBlock block(crit);
  162. RemoteFilename srcfn;
  163. Owned<IFile> srcf;
  164. IPartDescriptor &srcpart = *srcdesc->queryPart(pn);
  165. bool got = false;
  166. for (unsigned cpy = 0; cpy<srcpart.numCopies(); cpy++) {
  167. srcpart.getFilename(cpy,srcfn);
  168. try {
  169. CriticalUnblock unblock(crit);
  170. sem.wait();
  171. offset_t srcsize;
  172. CDateTime srcmodtime;
  173. got = getFileInfo(srcfn,srcf,srcsize,srcmodtime);
  174. sem.signal();
  175. }
  176. catch (IException *e) {
  177. EXCLOG(e,"copyLogicalFile open source part");
  178. if (exc.get())
  179. e->Release();
  180. else
  181. exc.setown(e);
  182. sem.signal();
  183. return;
  184. }
  185. if (got)
  186. break;
  187. }
  188. if (!got) {
  189. ERRLOG("copyLogicalFile: part %d missing any copies",pn+1);
  190. if (!exc.get())
  191. exc.setown(MakeStringException(-1,"copyLogicalFile: part %d missing any copies",pn+1));
  192. return;
  193. }
  194. RemoteFilename dstfn;
  195. dstdesc->queryPart(pn)->getFilename(0,dstfn);
  196. StringBuffer dstfs;
  197. dstfn.getRemotePath(dstfs);
  198. Owned<IException> ce;
  199. int ret;
  200. StringBuffer tmpname;
  201. {
  202. CriticalUnblock unblock(crit);
  203. ret = physicalPartCopy(srcf,dstfs.str(), ce, &tmpname);
  204. }
  205. if (!ret) {
  206. if (ce.get()&&!exc.get())
  207. exc.setown(ce.getClear());
  208. return;
  209. }
  210. dstnames.append(dstfs.str());
  211. tmpnames.append(tmpname.str());
  212. }
  213. } afor(crit,sem,srcdesc,dstdesc,exc,tmpnames,dstnames);
  214. afor.For(srcdesc->numParts(),100,false,true);
  215. if (!exc.get()) {
  216. class casyncfor2: public CAsyncFor
  217. {
  218. CriticalSection &crit;
  219. Semaphore &sem;
  220. StringArray &tmpnames;
  221. StringArray &dstnames;
  222. public:
  223. Owned<IException> &exc;
  224. casyncfor2(CriticalSection &_crit,Semaphore &_sem,StringArray &_tmpnames,StringArray &_dstnames,Owned<IException> &_exc)
  225. : crit(_crit), sem(_sem), tmpnames(_tmpnames), dstnames(_dstnames), exc(_exc)
  226. {
  227. }
  228. void Do(unsigned pn)
  229. {
  230. CriticalBlock block(crit);
  231. if (exc.get())
  232. return;
  233. RemoteFilename rfn;
  234. rfn.setRemotePath(tmpnames.item(pn));
  235. Owned<IFile> f = createIFile(rfn);
  236. try {
  237. CriticalUnblock unblock(crit);
  238. sem.wait();
  239. #ifdef LOG_PART_COPY
  240. PROGLOG("physicalCopyFile rename(%s,%s)",tmpnames.item(pn),dstnames.item(pn));
  241. #endif
  242. f->rename(pathTail(dstnames.item(pn)));
  243. sem.signal();
  244. }
  245. catch (IException *e) {
  246. EXCLOG(e,"physicalCopyFile: File rename error");
  247. if (!exc.get())
  248. exc.setown(e);
  249. else
  250. e->Release();
  251. sem.signal();
  252. return;
  253. }
  254. tmpnames.replace(dstnames.item(pn),pn); // so as will get cleared up
  255. }
  256. } afor2(crit,sem,tmpnames,dstnames,exc);
  257. afor2.For(srcdesc->numParts(),10,false,true);
  258. }
  259. if (exc.get()) {
  260. ForEachItemIn(i2,tmpnames) {
  261. RemoteFilename rfn;
  262. rfn.setRemotePath(tmpnames.item(i2));
  263. Owned<IFile> f = createIFile(rfn);
  264. try {
  265. f->remove();
  266. }
  267. catch (IException *e) {
  268. // ignore
  269. e->Release();
  270. }
  271. }
  272. throw exc.getClear();
  273. }
  274. }
  275. void physicalReplicateFile(IFileDescriptor *desc,const char *filename) // name already has prefix added
  276. {
  277. // now replicate
  278. CriticalSection crit;
  279. Semaphore sem(10); // parallel local
  280. class casyncfor2: public CAsyncFor
  281. {
  282. CriticalSection &crit;
  283. Semaphore &sem;
  284. IFileDescriptor *desc;
  285. const char * filename;
  286. public:
  287. Owned<IException> exc;
  288. casyncfor2(CriticalSection &_crit,Semaphore &_sem,IFileDescriptor *_desc, const char *_filename)
  289. : crit(_crit), sem(_sem)
  290. {
  291. desc = _desc;
  292. filename = _filename;
  293. }
  294. void Do(unsigned pn)
  295. {
  296. CriticalBlock block(crit);
  297. if (exc)
  298. return;
  299. // first find part that exists
  300. RemoteFilename srcfn;
  301. Owned<IFile> srcf;
  302. IPartDescriptor &part = *desc->queryPart(pn);
  303. unsigned nc = part.numCopies();
  304. unsigned cpy = 0;
  305. offset_t srcsize;
  306. CDateTime srcmodtime;
  307. for (; cpy<nc; cpy++) {
  308. part.getFilename(cpy,srcfn);
  309. bool got= false;
  310. try {
  311. CriticalUnblock unblock(crit);
  312. sem.wait();
  313. got = getFileInfo(srcfn,srcf,srcsize,srcmodtime);
  314. sem.signal();
  315. }
  316. catch (IException *e) {
  317. EXCLOG(e,"replicateLogicalFile open source");
  318. if (exc.get())
  319. e->Release();
  320. else
  321. exc.setown(e);
  322. sem.signal();
  323. }
  324. if (got)
  325. break;
  326. }
  327. if (cpy>=nc) {
  328. ERRLOG("replicateLogicalFile: %s part %d missing any copies",filename,pn+1);
  329. if (!exc.get())
  330. exc.setown(MakeStringException(-1,"replicateLogicalFile: %s part %d missing any copies",filename,pn+1));
  331. return;
  332. }
  333. for (unsigned dstcpy = 0; dstcpy<part.numCopies(); dstcpy++)
  334. if (dstcpy!=cpy) {
  335. RemoteFilename dstfn;
  336. part.getFilename(dstcpy,dstfn);
  337. Owned<IFile> dstf;
  338. bool got = false;
  339. try {
  340. CriticalUnblock unblock(crit);
  341. sem.wait();
  342. offset_t dstsize;
  343. CDateTime dstmodtime;
  344. got = getFileInfo(dstfn,dstf,dstsize,dstmodtime);
  345. // check size/date TBD
  346. sem.signal();
  347. if (got&&(srcsize==dstsize)&&srcmodtime.equals(dstmodtime))
  348. continue;
  349. }
  350. catch (IException *e) {
  351. EXCLOG(e,"cloneLogicalFile open dest");
  352. if (exc.get())
  353. e->Release();
  354. else
  355. exc.setown(e);
  356. sem.signal();
  357. return;
  358. }
  359. StringBuffer dstfs;
  360. dstfn.getRemotePath(dstfs);
  361. Owned<IException> ce;
  362. bool ok;
  363. {
  364. CriticalUnblock unblock(crit);
  365. ok = physicalPartCopy(srcf,dstfs.str(), ce, NULL);
  366. }
  367. if (!ok) {
  368. if (ce.get()&&!exc.get())
  369. exc.setown(ce.getClear());
  370. return;
  371. }
  372. }
  373. }
  374. } afor2(crit,sem,desc,filename);
  375. afor2.For(desc->numParts(),100,false,true);
  376. if (afor2.exc.get())
  377. throw afor2.exc.getClear();
  378. }
  379. void updateCloneFrom(const char *lfn, IPropertyTree &attrs, IFileDescriptor *srcfdesc, INode *srcdali, const char *srcCluster)
  380. {
  381. DBGLOG("updateCloneFrom %s", lfn);
  382. if (!srcdali || srcdali->endpoint().isNull())
  383. attrs.setProp("@cloneFromPeerCluster", srcCluster);
  384. else
  385. {
  386. StringBuffer s;
  387. attrs.setProp("@cloneFrom", srcdali->endpoint().getUrlStr(s).str());
  388. attrs.setProp("@cloneFromDir", srcfdesc->queryDefaultDir());
  389. if (srcCluster && *srcCluster) //where to copy from has been explicity set to a remote location, don't copy from local sources
  390. attrs.setProp("@cloneFromPeerCluster", "-");
  391. if (prefix.length())
  392. attrs.setProp("@cloneFromPrefix", prefix.get());
  393. while(attrs.removeProp("cloneFromGroup"));
  394. unsigned numClusters = srcfdesc->numClusters();
  395. for (unsigned clusterNum = 0; clusterNum < numClusters; clusterNum++)
  396. {
  397. StringBuffer sourceGroup;
  398. srcfdesc->getClusterGroupName(clusterNum, sourceGroup, NULL);
  399. if (srcCluster && *srcCluster && !streq(sourceGroup, srcCluster))
  400. continue;
  401. Owned<IPropertyTree> groupInfo = createPTree("cloneFromGroup");
  402. groupInfo->setProp("@groupName", sourceGroup);
  403. ClusterPartDiskMapSpec &spec = srcfdesc->queryPartDiskMapping(clusterNum);
  404. spec.toProp(groupInfo);
  405. attrs.addPropTree("cloneFromGroup", groupInfo.getClear());
  406. }
  407. }
  408. }
  409. void updateCloneFrom(IDistributedFile *dfile, IFileDescriptor *srcfdesc, INode *srcdali, const char *srcCluster)
  410. {
  411. DistributedFilePropertyLock lock(dfile);
  412. IPropertyTree &attrs = lock.queryAttributes();
  413. updateCloneFrom(dfile->queryLogicalName(), attrs, srcfdesc, srcdali, srcCluster);
  414. }
  415. void updateCloneFrom(const char *lfn, IFileDescriptor *dstfdesc, IFileDescriptor *srcfdesc, INode *srcdali, const char *srcCluster)
  416. {
  417. updateCloneFrom(lfn, dstfdesc->queryProperties(), srcfdesc, srcdali, srcCluster);
  418. }
  419. void cloneSubFile(IPropertyTree *ftree,const char *destfilename, INode *srcdali, const char *srcCluster) // name already has prefix added
  420. {
  421. DBGLOG("cloneSubFile %s", destfilename);
  422. Owned<IFileDescriptor> srcfdesc = deserializeFileDescriptorTree(ftree, NULL, 0);
  423. const char * kind = srcfdesc->queryProperties().queryProp("@kind");
  424. bool iskey = kind&&(strcmp(kind,"key")==0);
  425. Owned<IFileDescriptor> dstfdesc = createFileDescriptor(srcfdesc->getProperties());
  426. if (!nameprefix.isEmpty())
  427. dstfdesc->queryProperties().setProp("@roxiePrefix", nameprefix.get());
  428. if (!copyphysical)
  429. dstfdesc->queryProperties().setPropInt("@lazy", 1);
  430. // calculate dest dir
  431. CDfsLogicalFileName dstlfn;
  432. if (!dstlfn.setValidate(destfilename,true))
  433. throw MakeStringException(-1,"Logical name %s invalid",destfilename);
  434. ClusterPartDiskMapSpec spec = spec1;
  435. if (iskey&&repeattlk)
  436. spec.setRepeatedCopies(CPDMSRP_lastRepeated,false);
  437. StringBuffer dstpartmask;
  438. getPartMask(dstpartmask,destfilename,srcfdesc->numParts());
  439. dstfdesc->setPartMask(dstpartmask.str());
  440. unsigned np = srcfdesc->numParts();
  441. dstfdesc->setNumParts(srcfdesc->numParts());
  442. DFD_OS os = (getPathSepChar(srcfdesc->queryDefaultDir())=='\\')?DFD_OSwindows:DFD_OSunix;
  443. StringBuffer dir;
  444. StringBuffer dstdir;
  445. makePhysicalPartName(dstlfn.get(),0,0,dstdir,false,os,spec.defaultBaseDir.get());
  446. dstfdesc->setDefaultDir(dstdir.str());
  447. dstfdesc->addCluster(cluster1,grp1,spec);
  448. if (iskey&&!cluster2.isEmpty())
  449. dstfdesc->addCluster(cluster2,grp2,spec2);
  450. for (unsigned pn=0;pn<srcfdesc->numParts();pn++) {
  451. offset_t sz = srcfdesc->queryPart(pn)->queryProperties().getPropInt64("@size",-1);
  452. if (sz!=(offset_t)-1)
  453. dstfdesc->queryPart(pn)->queryProperties().setPropInt64("@size",sz);
  454. StringBuffer dates;
  455. if (srcfdesc->queryPart(pn)->queryProperties().getProp("@modified",dates))
  456. dstfdesc->queryPart(pn)->queryProperties().setProp("@modified",dates.str());
  457. }
  458. if (copyphysical) {
  459. DBGLOG("copyphysical dst=%s", destfilename);
  460. physicalCopyFile(srcfdesc,dstfdesc);
  461. physicalReplicateFile(dstfdesc,destfilename);
  462. }
  463. updateCloneFrom(destfilename, dstfdesc, srcfdesc, srcdali, srcCluster);
  464. Owned<IDistributedFile> dstfile = fdir->createNew(dstfdesc);
  465. dstfile->attach(destfilename,userdesc);
  466. }
  467. void addCluster(IPropertyTree *ftree,const char *destfilename)
  468. {
  469. CDfsLogicalFileName dstlfn;
  470. if (!dstlfn.setValidate(destfilename,true))
  471. throw MakeStringException(-1,"Logical name %s invalid",destfilename);
  472. Owned<IDistributedFile> dfile = fdir->lookup(dstlfn,userdesc,true);
  473. if (dfile) {
  474. ClusterPartDiskMapSpec spec = spec1;
  475. const char * kind = ftree->queryProp("Attr/@kind");
  476. bool iskey = kind&&(strcmp(kind,"key")==0);
  477. if (iskey&&repeattlk)
  478. spec.setRepeatedCopies(CPDMSRP_lastRepeated,false);
  479. dfile->addCluster(cluster1,spec);
  480. if (iskey&&!cluster2.isEmpty())
  481. dfile->addCluster(cluster2,spec2);
  482. if (copyphysical) {
  483. Owned<IFileDescriptor> fdesc=dfile->getFileDescriptor();
  484. physicalReplicateFile(fdesc,destfilename);
  485. }
  486. }
  487. }
  488. void extendSubFile(IPropertyTree *ftree,const char *destfilename)
  489. {
  490. CDfsLogicalFileName dstlfn;
  491. if (!dstlfn.setValidate(destfilename,true))
  492. throw MakeStringException(-1,"Logical name %s invalid",destfilename);
  493. Owned<IDistributedFile> dfile = fdir->lookup(dstlfn,userdesc,true);
  494. if (dfile) {
  495. ClusterPartDiskMapSpec spec = spec1;
  496. const char * kind = ftree->queryProp("Attr/@kind");
  497. bool iskey = kind&&(strcmp(kind,"key")==0);
  498. if (iskey&&repeattlk)
  499. spec.setRepeatedCopies(CPDMSRP_lastRepeated,false);
  500. dfile->addCluster(cluster1,spec);
  501. if (iskey&&!cluster2.isEmpty())
  502. dfile->addCluster(cluster2,spec2);
  503. if (copyphysical) {
  504. Owned<IFileDescriptor> fdesc=dfile->getFileDescriptor();
  505. physicalReplicateFile(fdesc,destfilename);
  506. }
  507. }
  508. }
  509. public:
  510. void init( const char *_cluster1,
  511. DFUclusterPartDiskMapping _clustmap,
  512. bool _repeattlk,
  513. const char *_cluster2,
  514. IUserDescriptor *_userdesc,
  515. const char *_foreigndali,
  516. IUserDescriptor *_foreignuserdesc,
  517. const char *_nameprefix,
  518. bool _overwrite,
  519. bool _copyphysical
  520. )
  521. {
  522. if (_userdesc)
  523. userdesc.set(_userdesc);
  524. if (_foreignuserdesc)
  525. foreignuserdesc.set(_foreignuserdesc);
  526. else if (_userdesc)
  527. foreignuserdesc.set(_userdesc);
  528. overwriteFlags = _overwrite ? DALI_UPDATEF_REPLACE_FILE : 0;
  529. copyphysical = _copyphysical;
  530. nameprefix.set(_nameprefix);
  531. repeattlk = _repeattlk;
  532. cluster1.set(_cluster1);
  533. level = 0;
  534. if (_foreigndali&&*_foreigndali)
  535. foreigndalinode.setown(createINode(_foreigndali,DALI_SERVER_PORT));
  536. fdir = &queryDistributedFileDirectory();
  537. switch(_clustmap) {
  538. case DFUcpdm_c_replicated_by_d:
  539. spec1.defaultCopies = DFD_DefaultCopies;
  540. break;
  541. case DFUcpdm_c_only:
  542. spec1.defaultCopies = DFD_NoCopies;
  543. break;
  544. case DFUcpdm_d_only:
  545. spec1.defaultCopies = DFD_NoCopies;
  546. spec1.startDrv = 1;
  547. break;
  548. case DFUcpdm_c_then_d:
  549. spec1.defaultCopies = DFD_NoCopies;
  550. spec1.flags = CPDMSF_wrapToNextDrv;
  551. break;
  552. }
  553. StringBuffer defdir1;
  554. GroupType groupType;
  555. grp1.setown(queryNamedGroupStore().lookup(_cluster1, defdir1, groupType));
  556. if (!grp1)
  557. throw MakeStringException(-1,"Cannot find cluster %s",_cluster1);
  558. if (defdir1.length())
  559. spec1.setDefaultBaseDir(defdir1.str());
  560. if (_cluster2&&*_cluster2) {
  561. spec2 = spec1;
  562. cluster2.set(_cluster2);
  563. StringBuffer defdir2;
  564. grp2.setown(queryNamedGroupStore().lookup(_cluster2, defdir2, groupType));
  565. if (!grp2)
  566. throw MakeStringException(-1,"Cannot find cluster %s",_cluster2);
  567. spec2.setRepeatedCopies(CPDMSRP_lastRepeated,true); // only TLK on cluster2
  568. if (defdir2.length())
  569. spec2.setDefaultBaseDir(defdir2.str());
  570. }
  571. }
  572. inline bool checkOverwrite(unsigned flag) { return (overwriteFlags & flag)!=0; }
  573. void cloneSuperFile(const char *filename, CDfsLogicalFileName &dlfn)
  574. {
  575. level++;
  576. Linked<INode> srcdali = foreigndalinode;
  577. CDfsLogicalFileName slfn;
  578. slfn.set(filename);
  579. if (slfn.isForeign()) { // trying to confuse me
  580. SocketEndpoint ep;
  581. slfn.getEp(ep);
  582. slfn.clearForeign();
  583. srcdali.setown(createINode(ep));
  584. }
  585. Owned<IPropertyTree> ftree = fdir->getFileTree(slfn.get(),foreignuserdesc,srcdali, FOREIGN_DALI_TIMEOUT, false);
  586. if (!ftree.get()) {
  587. StringBuffer s;
  588. throw MakeStringException(-1,"Source file %s could not be found in Dali %s",slfn.get(),srcdali?srcdali->endpoint().getUrlStr(s).str():"(local)");
  589. }
  590. const char *dstlfn = slfn.get();
  591. StringBuffer dstname;
  592. if (!nameprefix.isEmpty()) {
  593. dstname.append(nameprefix).append("::");
  594. const char *oldprefix = ftree->queryProp("Attr/@roxiePrefix");
  595. if (oldprefix&&*oldprefix) {
  596. size32_t l = strlen(oldprefix);
  597. if ((l+2<strlen(dstlfn))&&
  598. (memicmp(oldprefix,dstlfn,l)==0)&&
  599. (dstlfn[l]==':') && (dstlfn[l+1]==':'))
  600. dstlfn += l+2;
  601. }
  602. }
  603. dstlfn = dstname.append(dstlfn).str();
  604. dlfn.set(dstname.str());
  605. if (!srcdali.get()||queryCoven().inCoven(srcdali)) {
  606. // if dali is local and filenames same
  607. if (strcmp(slfn.get(),dlfn.get())==0) {
  608. if (strcmp(ftree->queryName(),queryDfsXmlBranchName(DXB_File))==0) {
  609. extendSubFile(ftree,dlfn.get());
  610. }
  611. else if (strcmp(ftree->queryName(),queryDfsXmlBranchName(DXB_SuperFile))==0) {
  612. Owned<IPropertyTreeIterator> piter = ftree->getElements("SubFile");
  613. ForEach(*piter) {
  614. CDfsLogicalFileName dlfnres;
  615. cloneSuperFile(piter->query().queryProp("@name"),dlfnres);
  616. }
  617. }
  618. else
  619. throw MakeStringException(-1,"Cannot clone %s to itself",dlfn.get());
  620. }
  621. level--;
  622. return;
  623. }
  624. // first see if target exists (and remove if does and overwrite specified)
  625. Owned<IDistributedFile> dfile = fdir->lookup(dlfn,userdesc,true);
  626. if (dfile) {
  627. if (!checkOverwrite(DALI_UPDATEF_REPLACE_FILE))
  628. throw MakeStringException(-1,"Destination file %s already exists",dlfn.get());
  629. dfile->detach();
  630. dfile.clear();
  631. }
  632. if (strcmp(ftree->queryName(),queryDfsXmlBranchName(DXB_File))==0) {
  633. cloneSubFile(ftree,dlfn.get(), srcdali, srcCluster);
  634. }
  635. else if (strcmp(ftree->queryName(),queryDfsXmlBranchName(DXB_SuperFile))==0) {
  636. StringArray subfiles;
  637. Owned<IPropertyTreeIterator> piter = ftree->getElements("SubFile");
  638. ForEach(*piter) {
  639. CDfsLogicalFileName dlfnres;
  640. cloneSuperFile(piter->query().queryProp("@name"),dlfnres);
  641. subfiles.append(dlfnres.get());
  642. }
  643. // now construct the superfile
  644. Owned<IDistributedSuperFile> sfile = fdir->createSuperFile(dlfn.get(),userdesc,true,false);
  645. if (!sfile)
  646. throw MakeStringException(-1,"SuperFile %s could not be created",dlfn.get());
  647. ForEachItemIn(i,subfiles) {
  648. sfile->addSubFile(subfiles.item(i));
  649. }
  650. if (!nameprefix.isEmpty()) {
  651. DistributedFilePropertyLock lock(sfile);
  652. lock.queryAttributes().setProp("@roxiePrefix",nameprefix.get());
  653. }
  654. }
  655. else {
  656. StringBuffer s;
  657. throw MakeStringException(-1,"Source file %s in Dali %s is not a file or superfile",filename,srcdali?srcdali->endpoint().getUrlStr(s).str():"(local)");
  658. }
  659. level--;
  660. }
  661. void cloneFile(const char *filename, const char *destfilename)
  662. {
  663. level++;
  664. Linked<INode> srcdali = foreigndalinode;
  665. CDfsLogicalFileName slfn;
  666. slfn.set(filename);
  667. if (slfn.isForeign()) { // trying to confuse me
  668. SocketEndpoint ep;
  669. slfn.getEp(ep);
  670. slfn.clearForeign();
  671. srcdali.setown(createINode(ep));
  672. }
  673. Owned<IPropertyTree> ftree = fdir->getFileTree(slfn.get(), foreignuserdesc, srcdali, FOREIGN_DALI_TIMEOUT, false);
  674. if (!ftree.get()) {
  675. StringBuffer s;
  676. throw MakeStringException(-1,"Source file %s could not be found in Dali %s",slfn.get(),srcdali?srcdali->endpoint().getUrlStr(s).str():"(local)");
  677. }
  678. IPropertyTree *attsrc = ftree->queryPropTree("Attr");
  679. if (!attsrc) {
  680. StringBuffer s;
  681. throw MakeStringException(-1,"Attributes for source file %s could not be found in Dali %s",slfn.get(),srcdali?srcdali->endpoint().getUrlStr(s).str():"(local)");
  682. }
  683. CDfsLogicalFileName dlfn;
  684. dlfn.set(destfilename);
  685. if (strcmp(ftree->queryName(),queryDfsXmlBranchName(DXB_File))!=0) {
  686. StringBuffer s;
  687. throw MakeStringException(-1,"Source file %s in Dali %s is not a simple file",filename,srcdali?srcdali->endpoint().getUrlStr(s).str():"(local)");
  688. }
  689. if (!srcdali.get()||queryCoven().inCoven(srcdali)) {
  690. // if dali is local and filenames same
  691. if (strcmp(slfn.get(),dlfn.get())==0) {
  692. extendSubFile(ftree,dlfn.get());
  693. level--;
  694. return;
  695. }
  696. }
  697. // first see if target exists (and remove if does and overwrite specified)
  698. Owned<IDistributedFile> dfile = fdir->lookup(dlfn,userdesc,true);
  699. if (dfile) {
  700. if (!checkOverwrite(DALI_UPDATEF_REPLACE_FILE))
  701. throw MakeStringException(-1,"Destination file %s already exists",dlfn.get());
  702. IPropertyTree &attloc = dfile->queryAttributes();
  703. if (dfile->numParts() == (unsigned)ftree->getPropInt("@numparts") &&
  704. attsrc->getPropInt("@eclCRC") == attloc.getPropInt("@eclCRC") &&
  705. attsrc->getPropInt("@totalCRC") == attloc.getPropInt64("@totalCRC"))
  706. {
  707. Owned<IFileDescriptor> dstfdesc=dfile->getFileDescriptor();
  708. Owned<IFileDescriptor> srcfdesc = deserializeFileDescriptorTree(ftree, NULL, 0);
  709. updateCloneFrom(filename, dstfdesc, srcfdesc, srcdali, srcCluster);
  710. return;
  711. }
  712. dfile->detach();
  713. dfile.clear();
  714. }
  715. cloneSubFile(ftree,dlfn.get(),srcdali, srcCluster);
  716. level--;
  717. }
  718. inline const char *getDaliEndPointStr(INode *daliNode, StringBuffer &s)
  719. {
  720. if (!daliNode)
  721. return "(local)";
  722. return daliNode->endpoint().getUrlStr(s).str();
  723. }
  724. inline bool checkHasCluster(IDistributedFile *dfile)
  725. {
  726. StringArray clusterNames;
  727. dfile->getClusterNames(clusterNames);
  728. return clusterNames.find(cluster1)!=NotFound;
  729. }
  730. inline bool checkIsKey(IPropertyTree *ftree)
  731. {
  732. const char * kind = ftree->queryProp("Attr/@kind");
  733. return kind && streq(kind, "key");
  734. }
  735. void addCluster(IDistributedFile *dfile, IPropertyTree *srcFtree)
  736. {
  737. if (dfile)
  738. {
  739. ClusterPartDiskMapSpec spec = spec1;
  740. if (checkIsKey(srcFtree) && repeattlk)
  741. spec.setRepeatedCopies(CPDMSRP_lastRepeated, false);
  742. DBGLOG("addCluster %s to file %s", cluster1.str(), dfile->queryLogicalName());
  743. dfile->addCluster(cluster1, spec);
  744. }
  745. }
  746. bool checkFileChanged(IDistributedFile *dfile, IPropertyTree *srcftree, IPropertyTree *srcAttrs)
  747. {
  748. IPropertyTree &dstAttrs = dfile->queryAttributes();
  749. return (dfile->numParts() != (unsigned) srcftree->getPropInt("@numparts") ||
  750. srcAttrs->getPropInt("@eclCRC") != dstAttrs.getPropInt("@eclCRC") ||
  751. srcAttrs->getPropInt("@totalCRC") != dstAttrs.getPropInt64("@totalCRC"));
  752. }
  753. inline bool checkValueChanged(const char *s1, const char *s2)
  754. {
  755. if (s1 && s2)
  756. return !streq(s1, s2);
  757. return ((s1 && *s1) || (s2 && *s2));
  758. }
  759. bool checkCloneFromChanged(IDistributedFile *dfile, IFileDescriptor *srcfdesc, INode *srcdali, const char *srcCluster)
  760. {
  761. if (!srcdali || srcdali->endpoint().isNull())
  762. {
  763. return checkValueChanged(dfile->queryAttributes().queryProp("@cloneFromPeerCluster"), srcCluster);
  764. }
  765. else
  766. {
  767. StringBuffer s;
  768. if (checkValueChanged(dfile->queryAttributes().queryProp("@cloneFrom"), srcdali->endpoint().getUrlStr(s).str()))
  769. return true;
  770. if (checkValueChanged(dfile->queryAttributes().queryProp("@cloneFromDir"), srcfdesc->queryDefaultDir()))
  771. return true;
  772. if (checkValueChanged(dfile->queryAttributes().queryProp("@cloneFromPeerCluster"), (srcCluster && *srcCluster) ? "-" : NULL))
  773. return true;
  774. if (checkValueChanged(dfile->queryAttributes().queryProp("@cloneFromPrefix"), prefix.get()))
  775. return true;
  776. unsigned groupCount = dfile->queryAttributes().getCount("cloneFromGroup");
  777. if (srcCluster && *srcCluster && groupCount!=1)
  778. return true;
  779. unsigned numSrcClusters = srcfdesc->numClusters();
  780. if (numSrcClusters != groupCount)
  781. return true;
  782. StringArray clusterNames;
  783. dfile->getClusterNames(clusterNames);
  784. for (unsigned clusterNum = 0; clusterNum < numSrcClusters; clusterNum++)
  785. {
  786. StringBuffer sourceGroup;
  787. srcfdesc->getClusterGroupName(clusterNum, sourceGroup, NULL);
  788. if (!clusterNames.contains(sourceGroup.str()))
  789. return true;
  790. }
  791. }
  792. return false;
  793. }
  794. void cloneRoxieFile(const char *filename, const char *destfilename)
  795. {
  796. Linked<INode> srcdali = foreigndalinode;
  797. CDfsLogicalFileName srcLFN;
  798. srcLFN.set(filename);
  799. if (srcLFN.isForeign())
  800. {
  801. SocketEndpoint ep;
  802. srcLFN.getEp(ep);
  803. srcLFN.clearForeign();
  804. srcdali.setown(createINode(ep));
  805. }
  806. StringBuffer s;
  807. Owned<IPropertyTree> ftree = fdir->getFileTree(srcLFN.get(), foreignuserdesc, srcdali, FOREIGN_DALI_TIMEOUT, false);
  808. if (!ftree.get())
  809. throw MakeStringException(-1,"Source file %s could not be found in Dali %s",srcLFN.get(), getDaliEndPointStr(srcdali, s));
  810. IPropertyTree *attsrc = ftree->queryPropTree("Attr");
  811. if (!attsrc)
  812. throw MakeStringException(-1,"Attributes for source file %s could not be found in Dali %s",srcLFN.get(), getDaliEndPointStr(srcdali, s));
  813. CDfsLogicalFileName dlfn;
  814. dlfn.set(destfilename);
  815. if (!streq(ftree->queryName(),queryDfsXmlBranchName(DXB_File)))
  816. throw MakeStringException(-1,"Source file %s in Dali %s is not a simple file",filename, getDaliEndPointStr(srcdali, s));
  817. if (!srcdali.get()||queryCoven().inCoven(srcdali))
  818. {
  819. // if dali is local and filenames same
  820. if (streq(srcLFN.get(), dlfn.get()))
  821. {
  822. extendSubFile(ftree,dlfn.get());
  823. return;
  824. }
  825. }
  826. //see if target already exists
  827. Owned<IDistributedFile> dfile = fdir->lookup(dlfn, userdesc, true);
  828. if (dfile)
  829. {
  830. if (!checkOverwrite(DALI_UPDATEF_SUBFILE_MASK))
  831. throw MakeStringException(-1, "Destination file %s already exists", dlfn.get());
  832. if (checkOverwrite(DALI_UPDATEF_REPLACE_FILE) && checkFileChanged(dfile, ftree, attsrc)) //complete overwrite
  833. {
  834. DBGLOG("replacing file %s", destfilename);
  835. dfile->detach();
  836. dfile.clear();
  837. }
  838. else
  839. {
  840. if (checkOverwrite(DALI_UPDATEF_APPEND_CLUSTER) && !checkHasCluster(dfile))
  841. addCluster(dfile, ftree);
  842. if (checkOverwrite(DALI_UPDATEF_CLONE_FROM))
  843. {
  844. Owned<IFileDescriptor> srcfdesc = deserializeFileDescriptorTree(ftree, NULL, 0);
  845. if (checkCloneFromChanged(dfile, srcfdesc, srcdali, srcCluster))
  846. updateCloneFrom(dfile, srcfdesc, srcdali, srcCluster);
  847. }
  848. return;
  849. }
  850. }
  851. cloneSubFile(ftree,dlfn.get(),srcdali, srcCluster);
  852. }
  853. };
  854. class CDFUhelper: implements IDFUhelper, public CInterface
  855. {
  856. public:
  857. IMPLEMENT_IINTERFACE
  858. void addSuper(const char *superfname, IUserDescriptor *user, unsigned numtoadd, const char **subfiles, const char *before, bool autocreatesuper)
  859. {
  860. if (!numtoadd)
  861. throwError(DFUERR_DNoSubfileToAddToSuperFile);
  862. Owned<IDistributedFileTransaction> transaction = createDistributedFileTransaction(user);
  863. transaction->start();
  864. Owned<IDistributedSuperFile> superfile = transaction->lookupSuperFile(superfname);
  865. if (!superfile)
  866. {
  867. if (!autocreatesuper)
  868. throwError1(DFUERR_DSuperFileNotFound, superfname);
  869. superfile.setown(queryDistributedFileDirectory().createSuperFile(superfname,user,true,false,transaction));
  870. }
  871. for (unsigned i=0;i<numtoadd;i++)
  872. {
  873. if (superfile->querySubFileNamed(subfiles[i]))
  874. throwError1(DFUERR_DSuperFileContainsSub, subfiles[i]);
  875. if (before&&*before)
  876. superfile->addSubFile(subfiles[i],true,(stricmp(before,"*")==0)?NULL:before,false,transaction);
  877. else
  878. superfile->addSubFile(subfiles[i],false,NULL,false,transaction);
  879. }
  880. superfile.clear();
  881. transaction->commit();
  882. }
  883. void removeSuper(const char *superfname, IUserDescriptor *user, unsigned numtodelete, const char **subfiles, bool delsub, bool removesuperfile)
  884. {
  885. Owned<IDistributedFileTransaction> transaction = createDistributedFileTransaction(user);
  886. // We need this here, since caching only happens with active transactions
  887. // MORE - abstract this with DFSAccess, or at least enable caching with a flag
  888. Owned<IDistributedSuperFile> superfile = transaction->lookupSuperFile(superfname);
  889. if (!superfile)
  890. throwError1(DFUERR_DSuperFileNotFound, superfname);
  891. StringAttrArray toremove;
  892. // Delete All
  893. if (numtodelete == 0) {
  894. if (delsub) {
  895. Owned<IDistributedFileIterator> iter=superfile->getSubFileIterator();
  896. ForEach(*iter) {
  897. StringBuffer name;
  898. iter->getName(name);
  899. toremove.append(*new StringAttrItem(name.str()));
  900. }
  901. }
  902. }
  903. // Delete Some / Wildcard
  904. // MORE - shouldn't we allow wildcard to delete all matching, without the need to specify numtodelete?
  905. else {
  906. for (unsigned i1=0;i1<numtodelete;i1++)
  907. if (strchr(subfiles[i1],'?')||strchr(subfiles[i1],'*')) {
  908. Owned<IDistributedFileIterator> iter=superfile->getSubFileIterator();
  909. ForEach(*iter) {
  910. StringBuffer name;
  911. iter->getName(name);
  912. if (WildMatch(name.str(),subfiles[i1]))
  913. toremove.append(*new StringAttrItem(name.str()));
  914. }
  915. }
  916. else
  917. if (superfile->querySubFileNamed(subfiles[i1]))
  918. toremove.append(*new StringAttrItem(subfiles[i1]));
  919. else
  920. throwError1(DFUERR_DSuperFileDoesntContainSub, subfiles[i1]);
  921. }
  922. if (removesuperfile && toremove.ordinality()!=superfile->numSubFiles())
  923. removesuperfile = false;
  924. // Do we have something to delete?
  925. if (toremove.ordinality() || removesuperfile) {
  926. transaction->start();
  927. ForEachItemIn(i2,toremove)
  928. superfile->removeSubFile(toremove.item(i2).text.get(),delsub,false,transaction);
  929. // Delete superfile if empty
  930. if (removesuperfile)
  931. queryDistributedFileDirectory().removeEntry(superfname, user, transaction);
  932. superfile.clear();
  933. transaction->commit();
  934. }
  935. }
  936. void listSubFiles(const char *superfname,StringAttrArray &out,IUserDescriptor *user)
  937. {
  938. Owned<IDistributedSuperFile> superfile = queryDistributedFileDirectory().lookupSuperFile(superfname,user);
  939. if (!superfile)
  940. throwError1(DFUERR_DSuperFileNotFound, superfname);
  941. Owned<IDistributedFileIterator> iter=superfile->getSubFileIterator();
  942. ForEach(*iter) {
  943. StringBuffer name;
  944. iter->getName(name);
  945. out.append(*new StringAttrItem(name.str()));
  946. }
  947. }
  948. StringBuffer &getFileXML(const char *lfn, StringBuffer &out, IUserDescriptor *user)
  949. {
  950. Owned<IDistributedFile> file = queryDistributedFileDirectory().lookup(lfn, user);
  951. if (!file) {
  952. INamedGroupStore &grpstore= queryNamedGroupStore();
  953. Owned<IGroup> grp = grpstore.lookup(lfn);
  954. if (grp) {
  955. out.append("<Group name=\"").append(lfn).append("\">\n");
  956. ForEachNodeInGroup(i, *grp) {
  957. StringBuffer ip;
  958. grp->getNode(i)->endpoint().getIpText(ip);
  959. out.append(" <Node ip=\"").append(ip).append("\">\n");
  960. }
  961. out.append("</Group>\n");
  962. return out;
  963. }
  964. else
  965. throwError1(DFUERR_DFileNotFound, lfn);
  966. }
  967. else
  968. {
  969. Owned<IPropertyTree> t = queryDistributedFileDirectory().getFileTree(lfn, user);
  970. toXML(t, out, true);
  971. }
  972. return out;
  973. }
  974. void addFileXML(const char *lfn, const StringBuffer &xml, IUserDescriptor *user)
  975. {
  976. Owned<IPropertyTree> t = createPTreeFromXMLString(xml);
  977. IDistributedFileDirectory &dfd = queryDistributedFileDirectory();
  978. if (dfd.exists(lfn, user))
  979. throw MakeStringException(-1, "Destination file '%s' already exists!", lfn);
  980. // Check if this XML is a superfile map
  981. Owned<IDistributedFile> file;
  982. const char * nodeName = t->queryName();
  983. if (0 == strcmp(nodeName, queryDfsXmlBranchName(DXB_SuperFile)))
  984. {
  985. // It seems XML is a super file
  986. file.setown(dfd.createNewSuperFile(t));
  987. }
  988. else if (0 == strcmp(nodeName, queryDfsXmlBranchName(DXB_File)))
  989. {
  990. // Logical file map
  991. Owned<IFileDescriptor> fdesc = deserializeFileDescriptorTree(t, &queryNamedGroupStore(), 0);
  992. file.setown(dfd.createNew(fdesc, true));
  993. }
  994. else
  995. throw MakeStringException(-1, "Unrecognised file XML root tag detected: '%s'", nodeName);
  996. file->validate();
  997. PROGLOG("Adding %s file %s.", file->querySuperFile()?"super":"logical", lfn);
  998. file->attach(lfn,user);
  999. }
  1000. void addFileRemote(const char *lfn,SocketEndpoint &srcdali,const char *srclfn,IUserDescriptor *user,IUserDescriptor *srcuser=NULL)
  1001. {
  1002. SocketEndpoint daliep = srcdali;
  1003. if (daliep.port==0)
  1004. daliep.port= DALI_SERVER_PORT;
  1005. Owned<INode> node = createINode(daliep);
  1006. Owned<IFileDescriptor> fdesc = queryDistributedFileDirectory().getFileDescriptor(srclfn,srcuser,node);
  1007. if (!fdesc) {
  1008. StringBuffer s;
  1009. throw MakeStringException(-1,"Source file %s could not be found in Dali %s",srclfn,daliep.getUrlStr(s).str());
  1010. }
  1011. Owned<IDistributedFile> file = queryDistributedFileDirectory().createNew(fdesc);
  1012. if (file)
  1013. file->attach(lfn,user);
  1014. }
  1015. void superForeignCopy(const char *lfn,SocketEndpoint &srcdali,const char *srclfn,IUserDescriptor *srcuser,IUserDescriptor *user, bool overwrite, IDfuFileCopier *copier)
  1016. {
  1017. SocketEndpoint daliep = srcdali;
  1018. CDfsLogicalFileName slfn;
  1019. slfn.set(srclfn);
  1020. if (slfn.isForeign()) // trying to confuse me
  1021. {
  1022. slfn.getEp(daliep);
  1023. slfn.clearForeign();
  1024. }
  1025. if (daliep.port==0)
  1026. daliep.port= DALI_SERVER_PORT;
  1027. Owned<INode> srcnode = createINode(daliep);
  1028. if (queryCoven().inCoven(srcnode))
  1029. {
  1030. // if dali is local and filenames same
  1031. CDfsLogicalFileName dlfn;
  1032. dlfn.set(lfn);
  1033. if (strcmp(slfn.get(),dlfn.get())==0)
  1034. {
  1035. PROGLOG("File copy of %s not done as file local",srclfn);
  1036. return;
  1037. }
  1038. }
  1039. Owned<IPropertyTree> ftree = queryDistributedFileDirectory().getFileTree(srclfn,srcuser,srcnode, FOREIGN_DALI_TIMEOUT, false);
  1040. if (!ftree.get())
  1041. {
  1042. StringBuffer s;
  1043. throw MakeStringException(-1,"Source file %s could not be found in Dali %s",srclfn,daliep.getUrlStr(s).str());
  1044. }
  1045. // first see if target exists (and remove if does and overwrite specified)
  1046. Owned<IDistributedFile> dfile = queryDistributedFileDirectory().lookup(lfn,user,true);
  1047. if (dfile)
  1048. {
  1049. if (!overwrite)
  1050. throw MakeStringException(-1,"Destination file %s already exists",lfn);
  1051. if (!dfile->querySuperFile())
  1052. {
  1053. if (ftree->hasProp("Attr/@fileCrc")&&ftree->getPropInt64("Attr/@size")&&
  1054. ((unsigned)ftree->getPropInt64("Attr/@fileCrc")==(unsigned)dfile->queryAttributes().getPropInt64("@fileCrc"))&&
  1055. (ftree->getPropInt64("Attr/@size")==dfile->getFileSize(false,false)))
  1056. {
  1057. PROGLOG("File copy of %s not done as file unchanged",srclfn);
  1058. return;
  1059. }
  1060. }
  1061. dfile->detach();
  1062. dfile.clear();
  1063. }
  1064. if (strcmp(ftree->queryName(),queryDfsXmlBranchName(DXB_File))==0)
  1065. {
  1066. assertex(copier);
  1067. if (!copier->copyFile(lfn,daliep,srclfn,srcuser,user))
  1068. throw MakeStringException(-1,"File %s could not be copied",lfn);
  1069. }
  1070. else if (strcmp(ftree->queryName(),queryDfsXmlBranchName(DXB_SuperFile))==0)
  1071. {
  1072. StringArray subfiles;
  1073. Owned<IPropertyTreeIterator> piter = ftree->getElements("SubFile");
  1074. ForEach(*piter)
  1075. {
  1076. const char *name = piter->query().queryProp("@name");
  1077. CDfsLogicalFileName dlfn;
  1078. dlfn.set(name);
  1079. superForeignCopy(dlfn.get(true),daliep,name,srcuser,user,overwrite,copier);
  1080. subfiles.append(dlfn.get(true));
  1081. }
  1082. if (!copier->wait()) // should throw exeption if failure
  1083. return;
  1084. // now construct the superfile
  1085. Owned<IDistributedSuperFile> sfile = queryDistributedFileDirectory().createSuperFile(lfn,user,true,false);
  1086. if (!sfile)
  1087. throw MakeStringException(-1,"SuperFile %s could not be created",lfn);
  1088. ForEachItemIn(i,subfiles)
  1089. sfile->addSubFile(subfiles.item(i));
  1090. }
  1091. else
  1092. {
  1093. StringBuffer s;
  1094. throw MakeStringException(-1,"Source file %s in Dali %s is not a file or superfile",srclfn,daliep.getUrlStr(s).str());
  1095. }
  1096. }
  1097. void createFileClone(const char *srcname, // src LFN (can be foreign)
  1098. const char *cluster1, // group name of roxie cluster
  1099. DFUclusterPartDiskMapping clustmap, // how the nodes are mapped
  1100. bool repeattlk, // repeat last part on all nodes if key
  1101. const char *cluster2, // servers cluster (for just tlk)
  1102. IUserDescriptor *userdesc, // user desc for local dali
  1103. const char *foreigndali, // can be omitted if srcname foreign or local
  1104. IUserDescriptor *foreignuserdesc, // user desc for foreign dali if needed
  1105. const char *nameprefix, // prefix for new name
  1106. bool overwrite, // delete destination if exists
  1107. bool dophysicalcopy // NB *not* using DFU server
  1108. )
  1109. {
  1110. CFileCloner cloner;
  1111. cloner.init(cluster1,clustmap,repeattlk,cluster2,userdesc,foreigndali,foreignuserdesc,nameprefix,overwrite,dophysicalcopy);
  1112. CDfsLogicalFileName dlfn;
  1113. cloner.cloneSuperFile(srcname,dlfn);
  1114. }
  1115. void createSingleFileClone(const char *srcname, // src LFN (can't be super)
  1116. const char *srcCluster, // optional specific cluster to copy data from
  1117. const char *dstname, // dst LFN
  1118. const char *cluster1, // group name of roxie cluster
  1119. const char *prefix,
  1120. DFUclusterPartDiskMapping clustmap, // how the nodes are mapped
  1121. bool repeattlk, // repeat last part on all nodes if key
  1122. const char *cluster2, // servers cluster (for just tlk)
  1123. IUserDescriptor *userdesc, // user desc for local dali
  1124. const char *foreigndali, // can be omitted if srcname foreign or local
  1125. IUserDescriptor *foreignuserdesc, // user desc for foreign dali if needed
  1126. bool overwrite, // delete destination if exists
  1127. bool dophysicalcopy // NB *not* using DFU server
  1128. )
  1129. {
  1130. DBGLOG("createSingleFileClone src=%s@%s, dst=%s@%s, prefix=%s, ow=%d, docopy=%d", srcname, srcCluster, dstname, cluster1, prefix, overwrite, dophysicalcopy);
  1131. CFileCloner cloner;
  1132. cloner.init(cluster1,clustmap,repeattlk,cluster2,userdesc,foreigndali,foreignuserdesc,NULL,overwrite,dophysicalcopy);
  1133. cloner.srcCluster.set(srcCluster);
  1134. cloner.prefix.set(prefix);
  1135. cloner.cloneFile(srcname,dstname);
  1136. }
  1137. virtual void cloneRoxieSubFile(const char *srcLFN, // src LFN (can't be super)
  1138. const char *srcCluster,
  1139. const char *dstLFN, // dst LFN
  1140. const char *dstCluster, // group name of roxie cluster
  1141. const char *prefix,
  1142. unsigned redundancy, // Number of "spare" copies of the data
  1143. unsigned channelsPerNode, // Overloaded and cyclic modes
  1144. int replicateOffset, // Used In cyclic mode only
  1145. const char *defReplicateFolder,
  1146. IUserDescriptor *userdesc, // user desc for local dali
  1147. const char *foreigndali, // can be omitted if srcname foreign or local
  1148. unsigned overwriteFlags // overwrite destination if exists
  1149. )
  1150. {
  1151. DBGLOG("cloneRoxieSubFile src=%s@%s, dst=%s@%s, prefix=%s, ow=%d, docopy=false", srcLFN, srcCluster, dstLFN, dstCluster, prefix, overwriteFlags);
  1152. CFileCloner cloner;
  1153. cloner.init(dstCluster, DFUcpdm_c_replicated_by_d, true, NULL, userdesc, foreigndali, NULL, NULL, false, false);
  1154. cloner.overwriteFlags = overwriteFlags;
  1155. cloner.spec1.setRoxie(redundancy, channelsPerNode, replicateOffset);
  1156. if (defReplicateFolder)
  1157. cloner.spec1.setDefaultReplicateDir(defReplicateFolder);
  1158. cloner.srcCluster.set(srcCluster);
  1159. cloner.prefix.set(prefix);
  1160. cloner.cloneRoxieFile(srcLFN, dstLFN);
  1161. }
  1162. void cloneFileRelationships(
  1163. const char *foreigndali, // where src relationships are retrieved from (can be NULL for local)
  1164. StringArray &srcfns, // file names on source
  1165. StringArray &dstfns, // corresponding filenames on dest (must exist)
  1166. IPropertyTree *relationships, // if not NULL, tree will have all relationships filled in
  1167. IUserDescriptor *user
  1168. )
  1169. {
  1170. // not a quick routine! (n^2)
  1171. unsigned n = srcfns.ordinality();
  1172. if (n!=dstfns.ordinality())
  1173. throw MakeStringException(-1,"cloneFileRelationships - src and destination arrays not same size");
  1174. // first find which of dstfns exist
  1175. MemoryAttr ma;
  1176. bool *ex = (bool *)ma.allocate(dstfns.ordinality()*sizeof(bool));
  1177. unsigned i;
  1178. for (i=0;i<n;i++)
  1179. ex[i] = queryDistributedFileDirectory().exists(dstfns.item(i),user);
  1180. for (i=0;i<n;i++) {
  1181. if (!ex[i])
  1182. continue;
  1183. for (unsigned j=0;j<n;j++) {
  1184. if (!ex[j])
  1185. continue;
  1186. Owned<IFileRelationshipIterator> iter = queryDistributedFileDirectory().lookupFileRelationships(srcfns.item(i),srcfns.item(j),NULL,NULL,NULL,NULL,NULL,foreigndali);
  1187. ForEach(*iter) {
  1188. try {
  1189. IFileRelationship &r = iter->query();
  1190. queryDistributedFileDirectory().addFileRelationship(dstfns.item(i),dstfns.item(j),r.queryPrimaryFields(),r.querySecondaryFields(),r.queryKind(),r.queryCardinality(),r.isPayload(),user,r.queryDescription());
  1191. if (relationships)
  1192. {
  1193. IPropertyTree *tree = iter->query().queryTree(); // relies on this being modifiable
  1194. if (tree) {
  1195. tree->Link();
  1196. tree = relationships->addPropTree("Relationship", tree);
  1197. tree->setProp("@primary", dstfns.item(i));
  1198. tree->setProp("@secondary", dstfns.item(j));
  1199. }
  1200. }
  1201. }
  1202. catch (IException *e)
  1203. {
  1204. EXCLOG(e,"cloneFileRelationships");
  1205. e->Release();
  1206. }
  1207. }
  1208. }
  1209. }
  1210. }
  1211. };
  1212. IDFUhelper *createIDFUhelper()
  1213. {
  1214. return new CDFUhelper;
  1215. }