XRefFilesNode.cpp 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452
  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. // XRefFilesNode.cpp: implementation of the CXRefFilesNode class.
  15. //
  16. //////////////////////////////////////////////////////////////////////
  17. #include "XRefFilesNode.hpp"
  18. #include "dautils.hpp"
  19. //////////////////////////////////////////////////////////////////////
  20. // Construction/Destruction
  21. //////////////////////////////////////////////////////////////////////
  22. CXRefFilesNode::CXRefFilesNode(IPropertyTree& baseNode,const char* cluster,const char *_rootdir)
  23. : m_baseTree(baseNode), rootdir(_rootdir)
  24. {
  25. baseNode.setProp("@Cluster",cluster);
  26. m_bChanged = false;
  27. prefixName.append(cluster);
  28. }
  29. bool CXRefFilesNode::IsChanged()
  30. {
  31. return m_bChanged;
  32. }
  33. void CXRefFilesNode::Commit()
  34. {
  35. if (m_bChanged)
  36. Deserialize(getDataTree());
  37. m_bChanged = false;
  38. }
  39. StringBuffer& CXRefFilesNode::Serialize(StringBuffer& outStr)
  40. {
  41. if (!m_bChanged && _data.length() > 0)
  42. {
  43. outStr.append(_data);
  44. return outStr;
  45. }
  46. _data.clear();
  47. MemoryBuffer buff;
  48. m_baseTree.getPropBin("data",buff);
  49. if (buff.length())
  50. {
  51. outStr.append(buff.length(),buff.toByteArray());
  52. _data.append(outStr);
  53. }
  54. return outStr;
  55. }
  56. void CXRefFilesNode::Deserialize(IPropertyTree& inTree)
  57. {
  58. CleanTree(inTree);
  59. StringBuffer datastr;
  60. toXML(&inTree,datastr);
  61. m_baseTree.setPropBin("data",datastr.length(),(void*)datastr.toCharArray());
  62. }
  63. IPropertyTree* CXRefFilesNode::FindNode(const char* NodeName)
  64. {
  65. StringBuffer xpath;
  66. xpath.clear().appendf("File/[Partmask=\"%s\"]", NodeName);
  67. StringBuffer tmpbuf;
  68. return getDataTree().getBranch(xpath.str());
  69. }
  70. IPropertyTree& CXRefFilesNode::getDataTree()
  71. {
  72. if (m_DataTree.get() == 0)
  73. {
  74. StringBuffer dataStr;
  75. Serialize(dataStr);
  76. m_DataTree.setown(createPTreeFromXMLString(dataStr.str()));
  77. }
  78. return *m_DataTree.get();
  79. }
  80. static bool checkPartsInCluster(const char *title,const char *clustername, IPropertyTree *subBranch, StringBuffer &errstr,bool exists)
  81. {
  82. Owned<IGroup> group = queryNamedGroupStore().lookup(clustername);
  83. if (!group) {
  84. ERRLOG("%s cluster not found",clustername);
  85. errstr.appendf("ERROR: %s cluster not found",clustername);
  86. return false;
  87. }
  88. Owned<IPropertyTreeIterator> partItr = subBranch->getElements("Part");
  89. unsigned i;
  90. StringBuffer xpath;
  91. unsigned n = group->ordinality();
  92. ForEach(*partItr) {
  93. IPropertyTree& part = partItr->query();
  94. unsigned pn = part.getPropInt("Num");
  95. for (int rep=0;rep<2;rep++) {
  96. i = 0;
  97. loop {
  98. i++;
  99. xpath.clear().appendf(rep?"RNode[%d]":"Node[%d]",i);
  100. if (!part.hasProp(xpath.str()))
  101. break;
  102. SocketEndpoint ep(part.queryProp(xpath.str()));
  103. ep.port = 0;
  104. rank_t gn = group->rank(ep);
  105. if (group->rank(ep)==RANK_NULL) {
  106. StringBuffer eps;
  107. ERRLOG("%s %s Part %d on %s is not in cluster %s",title,rep?"Replicate":"Primary",pn,ep.getUrlStr(eps).str(),clustername);
  108. errstr.appendf("ERROR: %s %s part %d on %s is not in cluster %s",title,rep?"Replicate":"Primary",pn,ep.getUrlStr(eps).str(),clustername);
  109. return false;
  110. }
  111. if (exists) {
  112. if ((pn-1+rep)%n==gn) {
  113. ERRLOG("Logical file for %s exists (part not orphaned?)",title);
  114. errstr.appendf("Logical file for %s exists (part not orphaned?)",title);
  115. return false;
  116. }
  117. }
  118. }
  119. }
  120. }
  121. return true;
  122. }
  123. bool CXRefFilesNode::RemovePhysical(const char *Partmask,IUserDescriptor* udesc, const char *clustername, StringBuffer &errstr)
  124. {
  125. size32_t startlen = errstr.length();
  126. IPropertyTree* subBranch = FindNode(Partmask);
  127. if (!subBranch) {
  128. ERRLOG("%s branch not found",Partmask);
  129. errstr.appendf("ERROR: %s branch not found",Partmask);
  130. return false;
  131. }
  132. // sanity check file doesn't (now) exist
  133. bool exists = false;
  134. StringBuffer lfn;
  135. if (LogicalNameFromMask(Partmask,lfn)) {
  136. if (queryDistributedFileDirectory().exists(lfn.str(),true))
  137. exists = true;
  138. }
  139. if (!checkPartsInCluster(Partmask,clustername,subBranch,errstr,exists))
  140. return false;
  141. RemoteFilenameArray files;
  142. int numparts = subBranch->getPropInt("Numparts");
  143. Owned<IPropertyTreeIterator> partItr = subBranch->getElements("Part");
  144. ForEach(*partItr)
  145. {
  146. IPropertyTree& part = partItr->query();
  147. StringBuffer remoteFile;
  148. expandMask(remoteFile, Partmask, part.getPropInt("Num")-1, numparts);
  149. /////////////////////////////////
  150. StringBuffer xpath;
  151. unsigned i = 0;
  152. loop {
  153. i++;
  154. xpath.clear().appendf("Node[%d]",i);
  155. if (!part.hasProp(xpath.str()))
  156. break;
  157. SocketEndpoint ip(part.queryProp(xpath.str()));
  158. RemoteFilename rmtFile;
  159. rmtFile.setPath(ip,remoteFile.str());
  160. files.append(rmtFile);
  161. }
  162. i = 0;
  163. loop {
  164. i++;
  165. xpath.clear().appendf("RNode[%d]",i);
  166. if (!part.hasProp(xpath.str()))
  167. break;
  168. SocketEndpoint ip(part.queryProp(xpath.str()));
  169. RemoteFilename rmtFile;
  170. StringBuffer replicateFile;
  171. if (setReplicateDir(remoteFile.str(),replicateFile))
  172. rmtFile.setPath(ip,replicateFile.str()); // old semantics
  173. else
  174. rmtFile.setPath(ip,remoteFile.str());
  175. files.append(rmtFile);
  176. }
  177. }
  178. CriticalSection crit;
  179. class casyncfor: public CAsyncFor
  180. {
  181. RemoteFilenameArray &files;
  182. StringBuffer &errstr;
  183. CriticalSection &crit;
  184. public:
  185. casyncfor(RemoteFilenameArray &_files, StringBuffer &_errstr, CriticalSection &_crit)
  186. : files(_files), errstr(_errstr), crit(_crit)
  187. {
  188. }
  189. void Do(unsigned idx)
  190. {
  191. try{
  192. Owned<IFile> _remoteFile = createIFile(files.item(idx));
  193. DBGLOG("Removing physical part at %s",_remoteFile->queryFilename());
  194. if (_remoteFile->exists()) {
  195. if (!_remoteFile->remove()) {
  196. StringBuffer errname;
  197. files.item(idx).getRemotePath(errname);
  198. ERRLOG("Could not delete file %s",errname.str());
  199. CriticalBlock block(crit);
  200. if (errstr.length())
  201. errstr.append('\n');
  202. errstr.appendf("ERROR: Could not delete file %s",errname.str());
  203. }
  204. }
  205. }
  206. catch(IException* e)
  207. {
  208. StringBuffer s(" deleting logical part ");
  209. files.item(idx).getRemotePath(s);
  210. EXCLOG(e,s.str());
  211. CriticalBlock block(crit);
  212. if (errstr.length())
  213. errstr.append('\n');
  214. errstr.append("ERROR: ");
  215. e->errorMessage(errstr);
  216. errstr.append(s);
  217. e->Release();
  218. }
  219. catch(...)
  220. {
  221. StringBuffer errname;
  222. files.item(idx).getRemotePath(errname);
  223. DBGLOG("Unknown Exception caught while deleting logical part %s",errname.str());
  224. CriticalBlock block(crit);
  225. if (errstr.length())
  226. errstr.append('\n');
  227. errstr.appendf("ERROR: Unknown Exception caught while deleting logical part %s",errname.str());
  228. }
  229. }
  230. } afor(files,errstr,crit);
  231. afor.For(files.ordinality(),10,false,true);
  232. if (!RemoveTreeNode(Partmask))
  233. {
  234. ERRLOG("Error Removing XRef Branch %s",Partmask);
  235. return false;
  236. }
  237. m_bChanged = true;
  238. return errstr.length()==startlen;
  239. }
  240. bool CXRefFilesNode::RemoveLogical(const char* LogicalName,IUserDescriptor* udesc, const char *clustername, StringBuffer &errstr)
  241. {
  242. StringBuffer xpath;
  243. xpath.clear().appendf("File/[Name=\"%s\"]", LogicalName);
  244. StringBuffer tmpbuf;
  245. IPropertyTree* pLogicalFileNode = getDataTree().getBranch(xpath.str());
  246. if (!pLogicalFileNode) {
  247. ERRLOG("Branch %s not found",xpath.str());
  248. errstr.appendf("Branch %s not found",xpath.str());
  249. return false;
  250. }
  251. if (!checkPartsInCluster(LogicalName,clustername,pLogicalFileNode,errstr,false))
  252. return false;
  253. if (queryDistributedFileDirectory().existsPhysical(LogicalName,udesc)) {
  254. ERRLOG("Logical file %s all parts exist (not lost?))",LogicalName);
  255. errstr.appendf("Logical file %s all parts exist (not lost?))",LogicalName);
  256. return false;
  257. }
  258. if (!getDataTree().removeTree(pLogicalFileNode)) {
  259. ERRLOG("Removing XRef Branch %s", xpath.str());
  260. errstr.appendf("Removing XRef Branch %s", xpath.str());
  261. return false;
  262. }
  263. m_bChanged = true;
  264. queryDistributedFileDirectory().removeEntry(LogicalName,udesc);
  265. return true;
  266. }
  267. bool CXRefFilesNode::AttachPhysical(const char *Partmask,IUserDescriptor* udesc, const char *clustername, StringBuffer &errstr)
  268. {
  269. IPropertyTree* subBranch = FindNode(Partmask);
  270. if (!subBranch) {
  271. ERRLOG("%s node not found",Partmask);
  272. errstr.appendf("ERROR: %s node not found",Partmask);
  273. return false;
  274. }
  275. if (!checkPartsInCluster(Partmask,clustername,subBranch,errstr,false))
  276. return false;
  277. StringBuffer logicalName;
  278. if (!LogicalNameFromMask(Partmask,logicalName)) {
  279. ERRLOG("%s - could not attach",Partmask);
  280. errstr.appendf("ERROR: %s - could not attach",Partmask);
  281. return false;
  282. }
  283. if (queryDistributedFileDirectory().exists(logicalName.toCharArray()))
  284. {
  285. ERRLOG("Logical File %s already Exists. Can not reattach to Dali",logicalName.str());
  286. errstr.appendf("Logical File %s already Exists. Can not reattach to Dali",logicalName.str());
  287. return false;
  288. }
  289. StringBuffer drive,path,tail,ext;
  290. splitFilename(Partmask, &drive, &path, &tail, &ext);
  291. //set directory info
  292. StringBuffer dir;
  293. dir.append(drive.str());
  294. dir.append(path.str());
  295. Owned<IFileDescriptor> fileDesc = createFileDescriptor();
  296. fileDesc->setDefaultDir(dir.str());
  297. //use the logical name as the title....
  298. fileDesc->setTraceName(logicalName.str());
  299. IPropertyTree & attr = fileDesc->queryProperties();
  300. //attr.setProp("@size",subBranch->queryProp("Size")); we don't know size (this value isn't right!)
  301. unsigned numparts = subBranch->getPropInt("Numparts");
  302. Owned<IPropertyTreeIterator> partItr = subBranch->getElements("Part");
  303. for (partItr->first(); partItr->isValid(); partItr->next())
  304. {
  305. IPropertyTree& part = partItr->query();
  306. //get the full file path
  307. StringBuffer remoteFilePath;
  308. expandMask(remoteFilePath, Partmask, part.getPropInt("Num")-1, numparts);
  309. StringBuffer _drive,_path,_tail,_ext,_filename;
  310. splitFilename(remoteFilePath.str(), &_drive, &_path, &_tail, &_ext);
  311. _filename.append(_tail.str());
  312. _filename.append(_ext.str());
  313. const char* _node = part.queryProp("Node[1]");
  314. if (!_node||!*_node)
  315. _node = part.queryProp("RNode[1]");
  316. if (!*_node||!*_node) {
  317. ERRLOG("%s - could not attach (missing part info)",Partmask);
  318. errstr.appendf("ERROR: %s - could not attach (missing part info)",Partmask);
  319. return false;
  320. }
  321. Owned<INode> node = createINode(_node);
  322. DBGLOG("Setting number %d for Node %s and name %s",part.getPropInt("Num")-1,_node,_filename.str());
  323. //Num is 0 based...
  324. fileDesc->setPart(part.getPropInt("Num")-1,node,_filename.str());
  325. }
  326. Owned<IDistributedFile> dFile = queryDistributedFileDirectory().createNew(fileDesc);
  327. dFile->attach(logicalName.toCharArray(),udesc);
  328. if (!RemoveTreeNode(Partmask)) {
  329. ERRLOG("Removing XRef Branch %s",Partmask);
  330. errstr.appendf("ERROR: Removing XRef Branch %s",Partmask);
  331. return false;
  332. }
  333. m_bChanged = true;
  334. return true;
  335. }
  336. void CXRefFilesNode::DirectoryFromMask(const char* Partmask,StringBuffer& directory)
  337. {
  338. if(*Partmask == 0)
  339. return;
  340. const char *in = Partmask;
  341. int counter = 0;
  342. while (*in)
  343. {
  344. if (*in == '.')
  345. break;
  346. directory.append(*in);
  347. }
  348. }
  349. bool CXRefFilesNode::LogicalNameFromMask(const char* fname,StringBuffer& logicalName)
  350. {
  351. CDfsLogicalFileName lfn;
  352. if (!lfn.setFromMask(fname,rootdir))
  353. return false;
  354. logicalName.append(lfn.get());
  355. return true;
  356. }
  357. bool CXRefFilesNode::RemoveTreeNode(const char* NodeName)
  358. {
  359. IPropertyTree* subBranch = FindNode(NodeName);
  360. if (!subBranch)
  361. return false;
  362. StringBuffer tmpbuf;
  363. return getDataTree().removeTree(subBranch);
  364. }
  365. bool CXRefFilesNode::RemoveRemoteFile(const char* fileName, const char* ipAddress)
  366. {
  367. SocketEndpoint ip;
  368. ip.set(ipAddress);
  369. RemoteFilename rmtFile;
  370. rmtFile.setPath(ip,fileName); // filename shhould be full windows or unix path
  371. Owned<IFile> _remoteFile = createIFile(rmtFile);
  372. if (_remoteFile->exists())
  373. return _remoteFile->remove();
  374. return false;
  375. }
  376. ////////////////////////////////////////////////////////////////////////////////////
  377. //
  378. //
  379. ////////////////////////////////////////////////////////////////////////////////////
  380. CXRefOrphanFilesNode::CXRefOrphanFilesNode(IPropertyTree& baseNode,const char* cluster,const char* rootdir)
  381. : CXRefFilesNode(baseNode,cluster,rootdir)
  382. {
  383. }
  384. void CXRefOrphanFilesNode::CleanTree(IPropertyTree& inTree)
  385. {
  386. Owned<IPropertyTreeIterator> Itr = inTree.getElements("*");
  387. Itr->first();
  388. int partcount = 0;
  389. while(Itr->isValid())
  390. {
  391. IPropertyTree& node = Itr->query();
  392. if(strcmp(node.queryName(),"Part") == 0)
  393. {
  394. partcount++;
  395. }
  396. else if(node.hasChildren())
  397. {
  398. CleanTree(node);
  399. }
  400. Itr->next();
  401. }
  402. if(partcount != 0)
  403. inTree.setPropInt("Partsfound",partcount);
  404. }