/*############################################################################## Copyright (C) 2011 HPCC Systems. All rights reserved. This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . ############################################################################## */ // XRefNodeManager.cpp: implementation of the CXRefNodeManager class. // ////////////////////////////////////////////////////////////////////// #include "jiface.hpp" #include "jstring.hpp" #include "jptree.hpp" #include "jmisc.hpp" #include "mpcomm.hpp" #include "platform.h" #include "jlib.hpp" #include "mpbase.hpp" #include "daclient.hpp" #include "dadiags.hpp" #include "danqs.hpp" #include "dadfs.hpp" #include "dasds.hpp" #include "dautils.hpp" #include "daft.hpp" #include "rmtfile.hpp" #include "XRefNodeManager.hpp" ////////////////////////////////////////////////////////////////////// // Construction/Destruction ////////////////////////////////////////////////////////////////////// IXRefNodeManager * CreateXRefNodeFactory() { return new CXRefNodeManager(); } IXRefNode * CXRefNodeManager::getXRefNode(const char* NodeName) { //DBGLOG("Node Name %s:",NodeName); StringBuffer xpath; xpath.appendf("Cluster[@name=\"%s\"]", NodeName); //if not exists, add DFU/XREF/ClusterName branch to SDS //not linked Owned conn = querySDS().connect("/DFU/XREF",myProcessSession(),RTM_CREATE_QUERY|RTM_NONE ,INFINITE); IPropertyTree* cluster_ptree = conn->queryRoot()->queryPropTree(xpath.str()); conn->commit(); if (cluster_ptree==0) { DBGLOG("Cluster[@name=%s] can't be found under /DFU/XREF", NodeName); return 0 ; } return new CXRefNode(NodeName,conn); } IXRefNode * CXRefNodeManager::CreateXRefNode(const char* NodeName) { Owned conn = querySDS().connect("/DFU/XREF",myProcessSession(),RTM_CREATE_QUERY|RTM_LOCK_WRITE ,INFINITE); IPropertyTree* xref_ptree = conn->queryRoot(); IPropertyTree* cluster_ptree = xref_ptree->addPropTree("Cluster", createPTree()); cluster_ptree->setProp("@name",NodeName); conn->commit(); conn->changeMode(RTM_NONE); return new CXRefNode(NodeName,conn); } CXRefNode::CXRefNode() { m_bChanged = false; } CXRefNode::~CXRefNode() { } CXRefNode::CXRefNode(const char* NodeName, IRemoteConnection *_conn) { //DBGLOG("CXRefNode::CXRefNode(const char* NodeName)"); m_bChanged = false; m_conn.set(_conn); StringBuffer xpath; xpath.appendf("Cluster[@name=\"%s\"]", NodeName); IPropertyTree* cluster_ptree = m_conn->queryRoot()->queryPropTree(xpath.str()); m_XRefTree.set(cluster_ptree); m_XRefTree->getProp("@name",m_origName); //DBGLOG("returning from CXRefNode::CXRefNode(const char* NodeName)"); } CXRefNode::CXRefNode(IPropertyTree* pTreeRoot) { m_bChanged = false; try { m_XRefTree.set(pTreeRoot); pTreeRoot->getProp("@name",m_origName); //load up our tree with the data.....if there is data MemoryBuffer buff; pTreeRoot->getPropBin("data",buff); if (buff.length()) { m_dataStr.append(buff.length(),buff.toByteArray()); } //lets check to ensure we have the correct children inplace(Orphan,lost,found) } catch(...) { ERRLOG("Error in creation of XRefNode..."); } } bool CXRefNode::useSasha() { if (!m_conn) return false; return m_conn->queryRoot()->getPropBool("@useSasha"); } IPropertyTree& CXRefNode::getDataTree() { if(m_XRefDataTree.get() == 0) m_XRefDataTree.setown(createPTreeFromXMLString(m_dataStr.str())); return *m_XRefDataTree.get(); } //IConstXRefNode StringBuffer & CXRefNode::getName(StringBuffer & str) { if(m_XRefTree.get()) m_XRefTree->getProp("@name",str); return str; } StringBuffer& CXRefNode::getStatus(StringBuffer & str) { if(m_XRefTree.get()) m_XRefTree->getProp("@status",str); return str; } StringBuffer & CXRefNode::getLastModified(StringBuffer & str) { if(m_XRefTree.get()) m_XRefTree->getProp("@modified",str); return str; } StringBuffer& CXRefNode::getXRefData(StringBuffer & str) { return toXML(&getDataTree(),str); } IXRefFilesNode* CXRefNode::getLostFiles() { if(!m_lost.get()) { IPropertyTree* lostBranch = m_XRefTree->queryPropTree("Lost"); if(lostBranch == 0) { lostBranch = m_XRefTree->addPropTree("Lost",createPTree()); commit(); } const char *rootdir = m_XRefTree.get()?m_XRefTree->queryProp("@rootdir"):NULL; StringBuffer tmpbuf; m_lost.setown(new CXRefFilesNode(*lostBranch,getName(tmpbuf).str(),rootdir)); } return m_lost.getLink(); } IXRefFilesNode* CXRefNode::getFoundFiles() { if(!m_found.get()) { IPropertyTree* foundBranch = m_XRefTree->queryPropTree("Found"); if(foundBranch == 0) { foundBranch = m_XRefTree->addPropTree("Found",createPTree()); commit(); } const char *rootdir = m_XRefTree.get()?m_XRefTree->queryProp("@rootdir"):NULL; StringBuffer tmpbuf; m_found.setown(new CXRefFilesNode(*foundBranch,getName(tmpbuf).str(),rootdir)); } return m_found.getLink(); } IXRefFilesNode* CXRefNode::getOrphanFiles() { if(!m_orphans.get()) { IPropertyTree* orphanBranch = m_XRefTree->queryPropTree("Orphans"); if(orphanBranch == 0) { orphanBranch = m_XRefTree->addPropTree("Orphans",createPTree()); commit(); } const char *rootdir = m_XRefTree.get()?m_XRefTree->queryProp("@rootdir"):NULL; StringBuffer tmpbuf; m_orphans.setown(new CXRefOrphanFilesNode(*orphanBranch,getName(tmpbuf).str(),rootdir)); } return m_orphans.getLink(); } StringBuffer &CXRefNode::serializeMessages(StringBuffer &buf) { if(!m_messages.get()) { IPropertyTree* messagesBranch = m_XRefTree->queryPropTree("Messages"); if(messagesBranch == 0) { messagesBranch = m_XRefTree->addPropTree("Messages",createPTree()); commit(); } StringBuffer tmpbuf; m_messages.set(messagesBranch); } buf.clear(); MemoryBuffer data; m_messages->getPropBin("data",data); if (data.length()) { buf.append(data.length(),data.toByteArray()); } return buf; } void CXRefNode::deserializeMessages(IPropertyTree& inTree) { if(!m_messages.get()) { IPropertyTree* messagesBranch = m_XRefTree->queryPropTree("Messages"); if(messagesBranch == 0) { messagesBranch = m_XRefTree->addPropTree("Messages",createPTree()); commit(); } StringBuffer tmpbuf; m_messages.set(messagesBranch); } StringBuffer datastr; toXML(&inTree,datastr); m_messages->setPropBin("data",datastr.length(),(void*)datastr.toCharArray()); } StringBuffer &CXRefNode::serializeDirectories(StringBuffer &buf) { if(!m_directories.get()) { IPropertyTree* directoriesBranch = m_XRefTree->queryPropTree("Directories"); if(directoriesBranch == 0) { directoriesBranch = m_XRefTree->addPropTree("Directories",createPTree()); commit(); } StringBuffer tmpbuf; m_directories.set(directoriesBranch); } buf.clear(); MemoryBuffer data; m_directories->getPropBin("data",data); if (data.length()) { buf.append(data.length(),data.toByteArray()); } return buf; } void CXRefNode::deserializeDirectories(IPropertyTree& inTree) { if(!m_directories.get()) { IPropertyTree* directoriesBranch = m_XRefTree->queryPropTree("Directories"); if(directoriesBranch == 0) { directoriesBranch = m_XRefTree->addPropTree("Directories",createPTree()); commit(); } StringBuffer tmpbuf; m_directories.set(directoriesBranch); } StringBuffer datastr; toXML(&inTree,datastr); m_directories->setPropBin("data",datastr.length(),(void*)datastr.toCharArray()); } static int strptrcmprev(char const ** l, char const ** r) { return -strcmp(*l, *r); } static bool deleteEmptyDir(IFile *dir) { // this is a bit odd - basically we already know no files but there may be empty sub-dirs Owned iter = dir->directoryFiles(NULL,false,true); IArrayOf subdirs; bool candelete = true; ForEach(*iter) { if (iter->isDir()) subdirs.append(iter->get()); else candelete = false; } if (!candelete) return false; try { ForEachItemIn(i,subdirs) { if (!deleteEmptyDir(&subdirs.item(i))) candelete = false; } } catch (IException *e) { EXCLOG(e,"deleteEmptyDir"); candelete = false; } if (!candelete) return false; static CriticalSection sect; CriticalBlock block(sect); // don't want to actually remove in parallel dir->remove(); return !dir->exists(); } static bool recursiveCheckEmptyScope(IPropertyTree &ct) { Owned iter = ct.getElements("*"); ForEach(*iter) { IPropertyTree &item = iter->query(); const char *n = item.queryName(); if (!n||(strcmp(n,queryDfsXmlBranchName(DXB_Scope))!=0)) return false; if (!recursiveCheckEmptyScope(item)) return false; } return true; } static void emptyScopes() { PROGLOG("Removing empty scopes"); Owned iter = queryDistributedFileDirectory().getScopeIterator(NULL,true,true); CDfsLogicalFileName dlfn; StringBuffer s; StringArray toremove; ForEach(*iter) { CDfsLogicalFileName dlfn; StringBuffer scope; scope.append(iter->query()); dlfn.set(scope.str(),"x"); dlfn.makeScopeQuery(s.clear(),true); Owned conn = querySDS().connect(s.str(),myProcessSession(),RTM_LOCK_READ, INFINITE); if (!conn) DBGLOG("Could not connect to '%s' using %s",iter->query(),s.str()); else { if (recursiveCheckEmptyScope(*conn->queryRoot())) { PROGLOG("Empty scope %s",iter->query()); toremove.append(iter->query()); } } } iter.clear(); ForEachItemIn(i,toremove) { PROGLOG("Removed scope %s",toremove.item(i)); queryDistributedFileDirectory().removeEmptyScope(toremove.item(i)); } } bool CXRefNode::removeEmptyDirectories(StringBuffer &errstr) { StringBuffer dataStr; serializeDirectories(dataStr); if (dataStr.length()==0) return true; Owned t = createPTreeFromXMLString(dataStr.str()); Owned iter = t->getElements("Directory"); const char *clustername = t->queryProp("Cluster"); if (!clustername||!*clustername) return false; Owned group = queryNamedGroupStore().lookup(clustername); if (!group) { ERRLOG("%s cluster not found",clustername); errstr.appendf("ERROR: %s cluster not found",clustername); return false; } StringArray dellist; PointerArray todelete; ForEach(*iter) { IPropertyTree &dir = iter->query(); if (dir.getPropInt64("Num")==0) { const char *dirname = dir.queryProp("Name"); if (dirname&&*dirname) { dellist.append(dirname); todelete.append(&dir); } } } dellist.sort(strptrcmprev); ForEachItemIn(di,dellist) { const char *dirname = dellist.item(di); class casyncfor: public CAsyncFor { IGroup *grp; StringAttr name; public: casyncfor(IGroup *_grp,const char *_name) : name(_name) { grp = _grp; } void Do(unsigned i) { RemoteFilename rfn; rfn.setPath(grp->queryNode(i).endpoint(),name); StringBuffer eps; try { Owned dir = createIFile(rfn); if (deleteEmptyDir(dir)) PROGLOG("Removed '%s'",dir->queryFilename()); else WARNLOG("Could not remove '%s'",dir->queryFilename()); } catch (IException *e) { EXCLOG(e,"Could not remove directory"); e->Release(); } } } afor(group,dirname); afor.For(group->ordinality(),10,false,true); } iter.clear(); ForEachItemInRev(i,todelete) t->removeTree((IPropertyTree *)todelete.item(i)); // probably should check succeeded above but next run will correct toXML(t,dataStr.clear()); m_directories->setPropBin("data",dataStr.length(),(void*)dataStr.toCharArray()); emptyScopes(); return true; } //IXRefNode void CXRefNode::setName(const char* str) { m_XRefTree->setProp("@name",str); if (m_origName.length() == 0) m_origName.append(str); } void CXRefNode::setStatus(const char* str) { m_XRefTree->setProp("@status",str); } StringBuffer& CXRefNode::getCluster(StringBuffer& Cluster) { Cluster.append(m_ClusterName.str()); return Cluster; } void CXRefNode::setCluster(const char* str) { m_ClusterName.clear(); m_ClusterName.append(str); } void CXRefNode::setLastModified(IJlibDateTime& dt ) { SCMStringBuffer datestr,timestr; dt.getDateString(datestr); dt.getTimeString(timestr); StringBuffer tmpstr(datestr.str()); tmpstr.append(" "); tmpstr.append(timestr.str()); m_XRefTree->setProp("@modified",tmpstr.str()); } void CXRefNode::BuildXRefData(IPropertyTree & pTree,const char* Cluster) { // DBGLOG("CXRefNode::BuildXRefData"); if(m_XRefTree.get() == 0) throw MakeStringException(-1, "No XRef Dali Tree available"); Owned lost = getLostFiles(); Owned found = getFoundFiles() ; Owned orphan = getOrphanFiles(); IPropertyTree* pSubTree = pTree.queryPropTree("Orphans"); pSubTree->setProp("Cluster",Cluster); orphan->Deserialize(*pSubTree); pSubTree = pTree.queryPropTree("Lost"); pSubTree->setProp("Cluster",Cluster); lost->Deserialize(*pSubTree); pSubTree = pTree.queryPropTree("Found"); pSubTree->setProp("Cluster",Cluster); found->Deserialize(*pSubTree); pSubTree = pTree.queryPropTree("Messages"); pSubTree->setProp("Cluster",Cluster); deserializeMessages(*pSubTree); pSubTree = pTree.queryPropTree("Directories"); pSubTree->setProp("Cluster",Cluster); deserializeDirectories(*pSubTree); Owned dt = createDateTimeNow(); setLastModified(*dt); setStatus("Generated"); commit(); } bool CXRefNode::IsChanged() { if ((m_orphans.get() && m_orphans->IsChanged() == true) || (m_lost.get() && m_lost->IsChanged() == true) || (m_found.get() && m_found->IsChanged() == true) || m_bChanged == true ) return true; return false; } void CXRefNode::SetChanged(bool bChanged) { m_bChanged = bChanged; } void CXRefNode::commit() { CriticalSection(commitMutex); if(m_conn == 0) return; Owned lost = getLostFiles(); Owned found = getFoundFiles() ; Owned orphan = getOrphanFiles(); lost->Commit(); found->Commit(); orphan->Commit(); m_conn->commit(); } void CXRefNode::progress(const char *text) { DBGLOG("PROGRESS: %s\n",text); setStatus(text); commit(); } void CXRefNode::error(const char *text) { DBGLOG("ERROR: %s\n",text); setStatus(text); commit(); }