XRefNodeManager.cpp 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599
  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. // XRefNodeManager.cpp: implementation of the CXRefNodeManager class.
  15. //
  16. //////////////////////////////////////////////////////////////////////
  17. #include "jiface.hpp"
  18. #include "jstring.hpp"
  19. #include "jptree.hpp"
  20. #include "jmisc.hpp"
  21. #include "mpcomm.hpp"
  22. #include "platform.h"
  23. #include "jlib.hpp"
  24. #include "mpbase.hpp"
  25. #include "daclient.hpp"
  26. #include "dadiags.hpp"
  27. #include "danqs.hpp"
  28. #include "dadfs.hpp"
  29. #include "dasds.hpp"
  30. #include "dautils.hpp"
  31. #include "daft.hpp"
  32. #include "rmtfile.hpp"
  33. #include "XRefNodeManager.hpp"
  34. //////////////////////////////////////////////////////////////////////
  35. // Construction/Destruction
  36. //////////////////////////////////////////////////////////////////////
  37. IXRefNodeManager * CreateXRefNodeFactory()
  38. {
  39. return new CXRefNodeManager();
  40. }
  41. IXRefNode * CXRefNodeManager::getXRefNode(const char* NodeName)
  42. {
  43. //DBGLOG("Node Name %s:",NodeName);
  44. StringBuffer xpath;
  45. xpath.appendf("Cluster[@name=\"%s\"]", NodeName);
  46. //if not exists, add DFU/XREF/ClusterName branch to SDS
  47. //not linked
  48. Owned<IRemoteConnection> conn = querySDS().connect("/DFU/XREF",myProcessSession(),RTM_CREATE_QUERY|RTM_NONE ,INFINITE);
  49. IPropertyTree* cluster_ptree = conn->queryRoot()->queryPropTree(xpath.str());
  50. conn->commit();
  51. if (cluster_ptree==0)
  52. {
  53. DBGLOG("Cluster[@name=%s] can't be found under /DFU/XREF", NodeName);
  54. return 0 ;
  55. }
  56. return new CXRefNode(NodeName,conn);
  57. }
  58. IXRefNode * CXRefNodeManager::CreateXRefNode(const char* NodeName)
  59. {
  60. Owned<IRemoteConnection> conn = querySDS().connect("/DFU/XREF",myProcessSession(),RTM_CREATE_QUERY|RTM_LOCK_WRITE ,INFINITE);
  61. IPropertyTree* xref_ptree = conn->queryRoot();
  62. IPropertyTree* cluster_ptree = xref_ptree->addPropTree("Cluster", createPTree());
  63. cluster_ptree->setProp("@name",NodeName);
  64. conn->commit();
  65. conn->changeMode(RTM_NONE);
  66. return new CXRefNode(NodeName,conn);
  67. }
  68. CXRefNode::CXRefNode()
  69. {
  70. m_bChanged = false;
  71. }
  72. CXRefNode::~CXRefNode()
  73. {
  74. }
  75. CXRefNode::CXRefNode(const char* NodeName, IRemoteConnection *_conn)
  76. {
  77. //DBGLOG("CXRefNode::CXRefNode(const char* NodeName)");
  78. m_bChanged = false;
  79. m_conn.set(_conn);
  80. StringBuffer xpath;
  81. xpath.appendf("Cluster[@name=\"%s\"]", NodeName);
  82. IPropertyTree* cluster_ptree = m_conn->queryRoot()->queryPropTree(xpath.str());
  83. m_XRefTree.set(cluster_ptree);
  84. m_XRefTree->getProp("@name",m_origName);
  85. //DBGLOG("returning from CXRefNode::CXRefNode(const char* NodeName)");
  86. }
  87. CXRefNode::CXRefNode(IPropertyTree* pTreeRoot)
  88. {
  89. m_bChanged = false;
  90. try
  91. {
  92. m_XRefTree.set(pTreeRoot);
  93. pTreeRoot->getProp("@name",m_origName);
  94. //load up our tree with the data.....if there is data
  95. MemoryBuffer buff;
  96. pTreeRoot->getPropBin("data",buff);
  97. if (buff.length())
  98. {
  99. m_dataStr.append(buff.length(),buff.toByteArray());
  100. }
  101. //lets check to ensure we have the correct children inplace(Orphan,lost,found)
  102. }
  103. catch(...)
  104. {
  105. ERRLOG("Error in creation of XRefNode...");
  106. }
  107. }
  108. bool CXRefNode::useSasha()
  109. {
  110. if (!m_conn)
  111. return false;
  112. return m_conn->queryRoot()->getPropBool("@useSasha");
  113. }
  114. IPropertyTree& CXRefNode::getDataTree()
  115. {
  116. if(m_XRefDataTree.get() == 0)
  117. m_XRefDataTree.setown(createPTreeFromXMLString(m_dataStr.str()));
  118. return *m_XRefDataTree.get();
  119. }
  120. //IConstXRefNode
  121. StringBuffer & CXRefNode::getName(StringBuffer & str)
  122. {
  123. if(m_XRefTree.get())
  124. m_XRefTree->getProp("@name",str);
  125. return str;
  126. }
  127. StringBuffer& CXRefNode::getStatus(StringBuffer & str)
  128. {
  129. if(m_XRefTree.get())
  130. m_XRefTree->getProp("@status",str);
  131. return str;
  132. }
  133. StringBuffer & CXRefNode::getLastModified(StringBuffer & str)
  134. {
  135. if(m_XRefTree.get())
  136. m_XRefTree->getProp("@modified",str);
  137. return str;
  138. }
  139. StringBuffer& CXRefNode::getXRefData(StringBuffer & str)
  140. {
  141. return toXML(&getDataTree(),str);
  142. }
  143. IXRefFilesNode* CXRefNode::getLostFiles()
  144. {
  145. if(!m_lost.get())
  146. {
  147. IPropertyTree* lostBranch = m_XRefTree->queryPropTree("Lost");
  148. if(lostBranch == 0)
  149. {
  150. lostBranch = m_XRefTree->addPropTree("Lost",createPTree());
  151. commit();
  152. }
  153. const char *rootdir = m_XRefTree.get()?m_XRefTree->queryProp("@rootdir"):NULL;
  154. StringBuffer tmpbuf;
  155. m_lost.setown(new CXRefFilesNode(*lostBranch,getName(tmpbuf).str(),rootdir));
  156. }
  157. return m_lost.getLink();
  158. }
  159. IXRefFilesNode* CXRefNode::getFoundFiles()
  160. {
  161. if(!m_found.get())
  162. {
  163. IPropertyTree* foundBranch = m_XRefTree->queryPropTree("Found");
  164. if(foundBranch == 0)
  165. {
  166. foundBranch = m_XRefTree->addPropTree("Found",createPTree());
  167. commit();
  168. }
  169. const char *rootdir = m_XRefTree.get()?m_XRefTree->queryProp("@rootdir"):NULL;
  170. StringBuffer tmpbuf;
  171. m_found.setown(new CXRefFilesNode(*foundBranch,getName(tmpbuf).str(),rootdir));
  172. }
  173. return m_found.getLink();
  174. }
  175. IXRefFilesNode* CXRefNode::getOrphanFiles()
  176. {
  177. if(!m_orphans.get())
  178. {
  179. IPropertyTree* orphanBranch = m_XRefTree->queryPropTree("Orphans");
  180. if(orphanBranch == 0)
  181. {
  182. orphanBranch = m_XRefTree->addPropTree("Orphans",createPTree());
  183. commit();
  184. }
  185. const char *rootdir = m_XRefTree.get()?m_XRefTree->queryProp("@rootdir"):NULL;
  186. StringBuffer tmpbuf;
  187. m_orphans.setown(new CXRefOrphanFilesNode(*orphanBranch,getName(tmpbuf).str(),rootdir));
  188. }
  189. return m_orphans.getLink();
  190. }
  191. StringBuffer &CXRefNode::serializeMessages(StringBuffer &buf)
  192. {
  193. if(!m_messages.get())
  194. {
  195. IPropertyTree* messagesBranch = m_XRefTree->queryPropTree("Messages");
  196. if(messagesBranch == 0)
  197. {
  198. messagesBranch = m_XRefTree->addPropTree("Messages",createPTree());
  199. commit();
  200. }
  201. StringBuffer tmpbuf;
  202. m_messages.set(messagesBranch);
  203. }
  204. buf.clear();
  205. MemoryBuffer data;
  206. m_messages->getPropBin("data",data);
  207. if (data.length())
  208. {
  209. buf.append(data.length(),data.toByteArray());
  210. }
  211. return buf;
  212. }
  213. void CXRefNode::deserializeMessages(IPropertyTree& inTree)
  214. {
  215. if(!m_messages.get())
  216. {
  217. IPropertyTree* messagesBranch = m_XRefTree->queryPropTree("Messages");
  218. if(messagesBranch == 0)
  219. {
  220. messagesBranch = m_XRefTree->addPropTree("Messages",createPTree());
  221. commit();
  222. }
  223. StringBuffer tmpbuf;
  224. m_messages.set(messagesBranch);
  225. }
  226. StringBuffer datastr;
  227. toXML(&inTree,datastr);
  228. m_messages->setPropBin("data",datastr.length(),(void*)datastr.toCharArray());
  229. }
  230. StringBuffer &CXRefNode::serializeDirectories(StringBuffer &buf)
  231. {
  232. if(!m_directories.get())
  233. {
  234. IPropertyTree* directoriesBranch = m_XRefTree->queryPropTree("Directories");
  235. if(directoriesBranch == 0)
  236. {
  237. directoriesBranch = m_XRefTree->addPropTree("Directories",createPTree());
  238. commit();
  239. }
  240. StringBuffer tmpbuf;
  241. m_directories.set(directoriesBranch);
  242. }
  243. buf.clear();
  244. MemoryBuffer data;
  245. m_directories->getPropBin("data",data);
  246. if (data.length())
  247. {
  248. buf.append(data.length(),data.toByteArray());
  249. }
  250. return buf;
  251. }
  252. void CXRefNode::deserializeDirectories(IPropertyTree& inTree)
  253. {
  254. if(!m_directories.get())
  255. {
  256. IPropertyTree* directoriesBranch = m_XRefTree->queryPropTree("Directories");
  257. if(directoriesBranch == 0)
  258. {
  259. directoriesBranch = m_XRefTree->addPropTree("Directories",createPTree());
  260. commit();
  261. }
  262. StringBuffer tmpbuf;
  263. m_directories.set(directoriesBranch);
  264. }
  265. StringBuffer datastr;
  266. toXML(&inTree,datastr);
  267. m_directories->setPropBin("data",datastr.length(),(void*)datastr.toCharArray());
  268. }
  // Comparator yielding descending string order: positive when *l sorts before
  // *r, negative when it sorts after, zero when equal.
  // The operands are swapped rather than negating strcmp's result, because
  // negating INT_MIN would overflow.
  static int strptrcmprev(char const ** l, char const ** r)
  {
      return strcmp(*r, *l);
  }
  270. static bool deleteEmptyDir(IFile *dir)
  271. {
  272. // this is a bit odd - basically we already know no files but there may be empty sub-dirs
  273. Owned<IDirectoryIterator> iter = dir->directoryFiles(NULL,false,true);
  274. IArrayOf<IFile> subdirs;
  275. bool candelete = true;
  276. ForEach(*iter) {
  277. if (iter->isDir())
  278. subdirs.append(iter->get());
  279. else
  280. candelete = false;
  281. }
  282. if (!candelete)
  283. return false;
  284. try {
  285. ForEachItemIn(i,subdirs) {
  286. if (!deleteEmptyDir(&subdirs.item(i)))
  287. candelete = false;
  288. }
  289. }
  290. catch (IException *e) {
  291. EXCLOG(e,"deleteEmptyDir");
  292. candelete = false;
  293. }
  294. if (!candelete)
  295. return false;
  296. static CriticalSection sect;
  297. CriticalBlock block(sect); // don't want to actually remove in parallel
  298. dir->remove();
  299. return !dir->exists();
  300. }
  301. static bool recursiveCheckEmptyScope(IPropertyTree &ct)
  302. {
  303. Owned<IPropertyTreeIterator> iter = ct.getElements("*");
  304. ForEach(*iter) {
  305. IPropertyTree &item = iter->query();
  306. const char *n = item.queryName();
  307. if (!n||(strcmp(n,queryDfsXmlBranchName(DXB_Scope))!=0))
  308. return false;
  309. if (!recursiveCheckEmptyScope(item))
  310. return false;
  311. }
  312. return true;
  313. }
  314. static void emptyScopes()
  315. {
  316. PROGLOG("Removing empty scopes");
  317. Owned<IDFScopeIterator> iter = queryDistributedFileDirectory().getScopeIterator(NULL,true,true);
  318. CDfsLogicalFileName dlfn;
  319. StringBuffer s;
  320. StringArray toremove;
  321. ForEach(*iter) {
  322. CDfsLogicalFileName dlfn;
  323. StringBuffer scope;
  324. scope.append(iter->query());
  325. dlfn.set(scope.str(),"x");
  326. dlfn.makeScopeQuery(s.clear(),true);
  327. Owned<IRemoteConnection> conn = querySDS().connect(s.str(),myProcessSession(),RTM_LOCK_READ, INFINITE);
  328. if (!conn)
  329. DBGLOG("Could not connect to '%s' using %s",iter->query(),s.str());
  330. else {
  331. if (recursiveCheckEmptyScope(*conn->queryRoot())) {
  332. PROGLOG("Empty scope %s",iter->query());
  333. toremove.append(iter->query());
  334. }
  335. }
  336. }
  337. iter.clear();
  338. ForEachItemIn(i,toremove) {
  339. PROGLOG("Removed scope %s",toremove.item(i));
  340. queryDistributedFileDirectory().removeEmptyScope(toremove.item(i));
  341. }
  342. }
  343. bool CXRefNode::removeEmptyDirectories(StringBuffer &errstr)
  344. {
  345. StringBuffer dataStr;
  346. serializeDirectories(dataStr);
  347. if (dataStr.length()==0)
  348. return true;
  349. Owned<IPropertyTree> t = createPTreeFromXMLString(dataStr.str());
  350. Owned<IPropertyTreeIterator> iter = t->getElements("Directory");
  351. const char *clustername = t->queryProp("Cluster");
  352. if (!clustername||!*clustername)
  353. return false;
  354. Owned<IGroup> group = queryNamedGroupStore().lookup(clustername);
  355. if (!group) {
  356. ERRLOG("%s cluster not found",clustername);
  357. errstr.appendf("ERROR: %s cluster not found",clustername);
  358. return false;
  359. }
  360. StringArray dellist;
  361. PointerArray todelete;
  362. ForEach(*iter) {
  363. IPropertyTree &dir = iter->query();
  364. if (dir.getPropInt64("Num")==0) {
  365. const char *dirname = dir.queryProp("Name");
  366. if (dirname&&*dirname) {
  367. dellist.append(dirname);
  368. todelete.append(&dir);
  369. }
  370. }
  371. }
  372. dellist.sort(strptrcmprev);
  373. ForEachItemIn(di,dellist) {
  374. const char *dirname = dellist.item(di);
  375. class casyncfor: public CAsyncFor
  376. {
  377. IGroup *grp;
  378. StringAttr name;
  379. public:
  380. casyncfor(IGroup *_grp,const char *_name)
  381. : name(_name)
  382. {
  383. grp = _grp;
  384. }
  385. void Do(unsigned i)
  386. {
  387. RemoteFilename rfn;
  388. rfn.setPath(grp->queryNode(i).endpoint(),name);
  389. StringBuffer eps;
  390. try
  391. {
  392. Owned<IFile> dir = createIFile(rfn);
  393. if (deleteEmptyDir(dir))
  394. PROGLOG("Removed '%s'",dir->queryFilename());
  395. else
  396. WARNLOG("Could not remove '%s'",dir->queryFilename());
  397. }
  398. catch (IException *e)
  399. {
  400. EXCLOG(e,"Could not remove directory");
  401. e->Release();
  402. }
  403. }
  404. } afor(group,dirname);
  405. afor.For(group->ordinality(),10,false,true);
  406. }
  407. iter.clear();
  408. ForEachItemInRev(i,todelete)
  409. t->removeTree((IPropertyTree *)todelete.item(i)); // probably should check succeeded above but next run will correct
  410. toXML(t,dataStr.clear());
  411. m_directories->setPropBin("data",dataStr.length(),(void*)dataStr.toCharArray());
  412. emptyScopes();
  413. return true;
  414. }
  415. //IXRefNode
  416. void CXRefNode::setName(const char* str)
  417. {
  418. m_XRefTree->setProp("@name",str);
  419. if (m_origName.length() == 0)
  420. m_origName.append(str);
  421. }
  422. void CXRefNode::setStatus(const char* str)
  423. {
  424. m_XRefTree->setProp("@status",str);
  425. }
  426. StringBuffer& CXRefNode::getCluster(StringBuffer& Cluster)
  427. {
  428. Cluster.append(m_ClusterName.str());
  429. return Cluster;
  430. }
  431. void CXRefNode::setCluster(const char* str)
  432. {
  433. m_ClusterName.clear();
  434. m_ClusterName.append(str);
  435. }
  436. void CXRefNode::setLastModified(IJlibDateTime& dt )
  437. {
  438. SCMStringBuffer datestr,timestr;
  439. dt.getDateString(datestr);
  440. dt.getTimeString(timestr);
  441. StringBuffer tmpstr(datestr.str());
  442. tmpstr.append(" ");
  443. tmpstr.append(timestr.str());
  444. m_XRefTree->setProp("@modified",tmpstr.str());
  445. }
  446. void CXRefNode::BuildXRefData(IPropertyTree & pTree,const char* Cluster)
  447. {
  448. // DBGLOG("CXRefNode::BuildXRefData");
  449. if(m_XRefTree.get() == 0)
  450. throw MakeStringException(-1, "No XRef Dali Tree available");
  451. Owned<IXRefFilesNode> lost = getLostFiles();
  452. Owned<IXRefFilesNode> found = getFoundFiles() ;
  453. Owned<IXRefFilesNode> orphan = getOrphanFiles();
  454. IPropertyTree* pSubTree = pTree.queryPropTree("Orphans");
  455. pSubTree->setProp("Cluster",Cluster);
  456. orphan->Deserialize(*pSubTree);
  457. pSubTree = pTree.queryPropTree("Lost");
  458. pSubTree->setProp("Cluster",Cluster);
  459. lost->Deserialize(*pSubTree);
  460. pSubTree = pTree.queryPropTree("Found");
  461. pSubTree->setProp("Cluster",Cluster);
  462. found->Deserialize(*pSubTree);
  463. pSubTree = pTree.queryPropTree("Messages");
  464. pSubTree->setProp("Cluster",Cluster);
  465. deserializeMessages(*pSubTree);
  466. pSubTree = pTree.queryPropTree("Directories");
  467. pSubTree->setProp("Cluster",Cluster);
  468. deserializeDirectories(*pSubTree);
  469. Owned<IJlibDateTime> dt = createDateTimeNow();
  470. setLastModified(*dt);
  471. setStatus("Generated");
  472. commit();
  473. }
  474. bool CXRefNode::IsChanged()
  475. {
  476. if ((m_orphans.get() && m_orphans->IsChanged() == true) ||
  477. (m_lost.get() && m_lost->IsChanged() == true) ||
  478. (m_found.get() && m_found->IsChanged() == true) ||
  479. m_bChanged == true )
  480. return true;
  481. return false;
  482. }
  483. void CXRefNode::SetChanged(bool bChanged)
  484. {
  485. m_bChanged = bChanged;
  486. }
  487. void CXRefNode::commit()
  488. {
  489. CriticalSection(commitMutex);
  490. if(m_conn == 0)
  491. return;
  492. Owned<IXRefFilesNode> lost = getLostFiles();
  493. Owned<IXRefFilesNode> found = getFoundFiles() ;
  494. Owned<IXRefFilesNode> orphan = getOrphanFiles();
  495. lost->Commit();
  496. found->Commit();
  497. orphan->Commit();
  498. m_conn->commit();
  499. }
  500. void CXRefNode::progress(const char *text)
  501. {
  502. DBGLOG("PROGRESS: %s\n",text);
  503. setStatus(text);
  504. commit();
  505. }
  506. void CXRefNode::error(const char *text)
  507. {
  508. DBGLOG("ERROR: %s\n",text);
  509. setStatus(text);
  510. commit();
  511. }