123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599 |
- /*##############################################################################
- Copyright (C) 2011 HPCC Systems.
- All rights reserved. This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU Affero General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Affero General Public License for more details.
- You should have received a copy of the GNU Affero General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
- ############################################################################## */
- // XRefNodeManager.cpp: implementation of the CXRefNodeManager class.
- //
- //////////////////////////////////////////////////////////////////////
- #include "jiface.hpp"
- #include "jstring.hpp"
- #include "jptree.hpp"
- #include "jmisc.hpp"
- #include "mpcomm.hpp"
- #include "platform.h"
- #include "jlib.hpp"
- #include "mpbase.hpp"
- #include "daclient.hpp"
- #include "dadiags.hpp"
- #include "danqs.hpp"
- #include "dadfs.hpp"
- #include "dasds.hpp"
- #include "dautils.hpp"
- #include "daft.hpp"
- #include "rmtfile.hpp"
- #include "XRefNodeManager.hpp"
- //////////////////////////////////////////////////////////////////////
- // Construction/Destruction
- //////////////////////////////////////////////////////////////////////
- IXRefNodeManager * CreateXRefNodeFactory()
- {
- return new CXRefNodeManager();
- }
- IXRefNode * CXRefNodeManager::getXRefNode(const char* NodeName)
- {
- //DBGLOG("Node Name %s:",NodeName);
- StringBuffer xpath;
- xpath.appendf("Cluster[@name=\"%s\"]", NodeName);
- //if not exists, add DFU/XREF/ClusterName branch to SDS
- //not linked
- Owned<IRemoteConnection> conn = querySDS().connect("/DFU/XREF",myProcessSession(),RTM_CREATE_QUERY|RTM_NONE ,INFINITE);
- IPropertyTree* cluster_ptree = conn->queryRoot()->queryPropTree(xpath.str());
- conn->commit();
- if (cluster_ptree==0)
- {
- DBGLOG("Cluster[@name=%s] can't be found under /DFU/XREF", NodeName);
- return 0 ;
- }
- return new CXRefNode(NodeName,conn);
- }
- IXRefNode * CXRefNodeManager::CreateXRefNode(const char* NodeName)
- {
- Owned<IRemoteConnection> conn = querySDS().connect("/DFU/XREF",myProcessSession(),RTM_CREATE_QUERY|RTM_LOCK_WRITE ,INFINITE);
- IPropertyTree* xref_ptree = conn->queryRoot();
- IPropertyTree* cluster_ptree = xref_ptree->addPropTree("Cluster", createPTree());
- cluster_ptree->setProp("@name",NodeName);
- conn->commit();
- conn->changeMode(RTM_NONE);
- return new CXRefNode(NodeName,conn);
- }
- CXRefNode::CXRefNode()
- {
- m_bChanged = false;
- }
- CXRefNode::~CXRefNode()
- {
- }
- CXRefNode::CXRefNode(const char* NodeName, IRemoteConnection *_conn)
- {
- //DBGLOG("CXRefNode::CXRefNode(const char* NodeName)");
- m_bChanged = false;
- m_conn.set(_conn);
- StringBuffer xpath;
- xpath.appendf("Cluster[@name=\"%s\"]", NodeName);
- IPropertyTree* cluster_ptree = m_conn->queryRoot()->queryPropTree(xpath.str());
- m_XRefTree.set(cluster_ptree);
- m_XRefTree->getProp("@name",m_origName);
- //DBGLOG("returning from CXRefNode::CXRefNode(const char* NodeName)");
- }
- CXRefNode::CXRefNode(IPropertyTree* pTreeRoot)
- {
- m_bChanged = false;
- try
- {
- m_XRefTree.set(pTreeRoot);
- pTreeRoot->getProp("@name",m_origName);
- //load up our tree with the data.....if there is data
- MemoryBuffer buff;
- pTreeRoot->getPropBin("data",buff);
- if (buff.length())
- {
- m_dataStr.append(buff.length(),buff.toByteArray());
- }
- //lets check to ensure we have the correct children inplace(Orphan,lost,found)
- }
- catch(...)
- {
- ERRLOG("Error in creation of XRefNode...");
- }
- }
- bool CXRefNode::useSasha()
- {
- if (!m_conn)
- return false;
- return m_conn->queryRoot()->getPropBool("@useSasha");
- }
- IPropertyTree& CXRefNode::getDataTree()
- {
- if(m_XRefDataTree.get() == 0)
- m_XRefDataTree.setown(createPTreeFromXMLString(m_dataStr.str()));
- return *m_XRefDataTree.get();
- }
- //IConstXRefNode
- StringBuffer & CXRefNode::getName(StringBuffer & str)
- {
- if(m_XRefTree.get())
- m_XRefTree->getProp("@name",str);
- return str;
- }
- StringBuffer& CXRefNode::getStatus(StringBuffer & str)
- {
- if(m_XRefTree.get())
- m_XRefTree->getProp("@status",str);
- return str;
- }
- StringBuffer & CXRefNode::getLastModified(StringBuffer & str)
- {
- if(m_XRefTree.get())
- m_XRefTree->getProp("@modified",str);
- return str;
- }
- StringBuffer& CXRefNode::getXRefData(StringBuffer & str)
- {
- return toXML(&getDataTree(),str);
- }
- IXRefFilesNode* CXRefNode::getLostFiles()
- {
- if(!m_lost.get())
- {
- IPropertyTree* lostBranch = m_XRefTree->queryPropTree("Lost");
- if(lostBranch == 0)
- {
- lostBranch = m_XRefTree->addPropTree("Lost",createPTree());
- commit();
- }
- const char *rootdir = m_XRefTree.get()?m_XRefTree->queryProp("@rootdir"):NULL;
- StringBuffer tmpbuf;
- m_lost.setown(new CXRefFilesNode(*lostBranch,getName(tmpbuf).str(),rootdir));
- }
- return m_lost.getLink();
- }
- IXRefFilesNode* CXRefNode::getFoundFiles()
- {
- if(!m_found.get())
- {
- IPropertyTree* foundBranch = m_XRefTree->queryPropTree("Found");
- if(foundBranch == 0)
- {
- foundBranch = m_XRefTree->addPropTree("Found",createPTree());
- commit();
- }
- const char *rootdir = m_XRefTree.get()?m_XRefTree->queryProp("@rootdir"):NULL;
- StringBuffer tmpbuf;
- m_found.setown(new CXRefFilesNode(*foundBranch,getName(tmpbuf).str(),rootdir));
- }
- return m_found.getLink();
- }
- IXRefFilesNode* CXRefNode::getOrphanFiles()
- {
- if(!m_orphans.get())
- {
- IPropertyTree* orphanBranch = m_XRefTree->queryPropTree("Orphans");
- if(orphanBranch == 0)
- {
- orphanBranch = m_XRefTree->addPropTree("Orphans",createPTree());
- commit();
- }
- const char *rootdir = m_XRefTree.get()?m_XRefTree->queryProp("@rootdir"):NULL;
- StringBuffer tmpbuf;
- m_orphans.setown(new CXRefOrphanFilesNode(*orphanBranch,getName(tmpbuf).str(),rootdir));
- }
- return m_orphans.getLink();
- }
- StringBuffer &CXRefNode::serializeMessages(StringBuffer &buf)
- {
- if(!m_messages.get())
- {
- IPropertyTree* messagesBranch = m_XRefTree->queryPropTree("Messages");
- if(messagesBranch == 0)
- {
- messagesBranch = m_XRefTree->addPropTree("Messages",createPTree());
- commit();
- }
- StringBuffer tmpbuf;
- m_messages.set(messagesBranch);
- }
- buf.clear();
- MemoryBuffer data;
- m_messages->getPropBin("data",data);
- if (data.length())
- {
- buf.append(data.length(),data.toByteArray());
- }
- return buf;
- }
- void CXRefNode::deserializeMessages(IPropertyTree& inTree)
- {
- if(!m_messages.get())
- {
- IPropertyTree* messagesBranch = m_XRefTree->queryPropTree("Messages");
- if(messagesBranch == 0)
- {
- messagesBranch = m_XRefTree->addPropTree("Messages",createPTree());
- commit();
- }
- StringBuffer tmpbuf;
- m_messages.set(messagesBranch);
- }
- StringBuffer datastr;
- toXML(&inTree,datastr);
- m_messages->setPropBin("data",datastr.length(),(void*)datastr.toCharArray());
- }
- StringBuffer &CXRefNode::serializeDirectories(StringBuffer &buf)
- {
- if(!m_directories.get())
- {
- IPropertyTree* directoriesBranch = m_XRefTree->queryPropTree("Directories");
- if(directoriesBranch == 0)
- {
- directoriesBranch = m_XRefTree->addPropTree("Directories",createPTree());
- commit();
- }
- StringBuffer tmpbuf;
- m_directories.set(directoriesBranch);
- }
- buf.clear();
- MemoryBuffer data;
- m_directories->getPropBin("data",data);
- if (data.length())
- {
- buf.append(data.length(),data.toByteArray());
- }
- return buf;
- }
- void CXRefNode::deserializeDirectories(IPropertyTree& inTree)
- {
- if(!m_directories.get())
- {
- IPropertyTree* directoriesBranch = m_XRefTree->queryPropTree("Directories");
- if(directoriesBranch == 0)
- {
- directoriesBranch = m_XRefTree->addPropTree("Directories",createPTree());
- commit();
- }
- StringBuffer tmpbuf;
- m_directories.set(directoriesBranch);
- }
- StringBuffer datastr;
- toXML(&inTree,datastr);
- m_directories->setPropBin("data",datastr.length(),(void*)datastr.toCharArray());
- }
// Comparator giving reverse lexical order of two C strings (positive when *l
// sorts before *r). The operands are swapped rather than the result negated,
// because strcmp only guarantees the sign of its result and negating an
// extreme value would overflow.
static int strptrcmprev(char const ** l, char const ** r) { return strcmp(*r, *l); }
- static bool deleteEmptyDir(IFile *dir)
- {
- // this is a bit odd - basically we already know no files but there may be empty sub-dirs
- Owned<IDirectoryIterator> iter = dir->directoryFiles(NULL,false,true);
- IArrayOf<IFile> subdirs;
- bool candelete = true;
- ForEach(*iter) {
- if (iter->isDir())
- subdirs.append(iter->get());
- else
- candelete = false;
- }
- if (!candelete)
- return false;
- try {
- ForEachItemIn(i,subdirs) {
- if (!deleteEmptyDir(&subdirs.item(i)))
- candelete = false;
- }
- }
- catch (IException *e) {
- EXCLOG(e,"deleteEmptyDir");
- candelete = false;
- }
- if (!candelete)
- return false;
- static CriticalSection sect;
- CriticalBlock block(sect); // don't want to actually remove in parallel
- dir->remove();
- return !dir->exists();
- }
- static bool recursiveCheckEmptyScope(IPropertyTree &ct)
- {
- Owned<IPropertyTreeIterator> iter = ct.getElements("*");
- ForEach(*iter) {
- IPropertyTree &item = iter->query();
- const char *n = item.queryName();
- if (!n||(strcmp(n,queryDfsXmlBranchName(DXB_Scope))!=0))
- return false;
- if (!recursiveCheckEmptyScope(item))
- return false;
- }
- return true;
- }
- static void emptyScopes()
- {
- PROGLOG("Removing empty scopes");
- Owned<IDFScopeIterator> iter = queryDistributedFileDirectory().getScopeIterator(NULL,true,true);
- CDfsLogicalFileName dlfn;
- StringBuffer s;
- StringArray toremove;
- ForEach(*iter) {
- CDfsLogicalFileName dlfn;
- StringBuffer scope;
- scope.append(iter->query());
- dlfn.set(scope.str(),"x");
- dlfn.makeScopeQuery(s.clear(),true);
- Owned<IRemoteConnection> conn = querySDS().connect(s.str(),myProcessSession(),RTM_LOCK_READ, INFINITE);
- if (!conn)
- DBGLOG("Could not connect to '%s' using %s",iter->query(),s.str());
- else {
- if (recursiveCheckEmptyScope(*conn->queryRoot())) {
- PROGLOG("Empty scope %s",iter->query());
- toremove.append(iter->query());
- }
- }
- }
- iter.clear();
- ForEachItemIn(i,toremove) {
- PROGLOG("Removed scope %s",toremove.item(i));
- queryDistributedFileDirectory().removeEmptyScope(toremove.item(i));
- }
- }
- bool CXRefNode::removeEmptyDirectories(StringBuffer &errstr)
- {
- StringBuffer dataStr;
- serializeDirectories(dataStr);
- if (dataStr.length()==0)
- return true;
- Owned<IPropertyTree> t = createPTreeFromXMLString(dataStr.str());
- Owned<IPropertyTreeIterator> iter = t->getElements("Directory");
- const char *clustername = t->queryProp("Cluster");
- if (!clustername||!*clustername)
- return false;
- Owned<IGroup> group = queryNamedGroupStore().lookup(clustername);
- if (!group) {
- ERRLOG("%s cluster not found",clustername);
- errstr.appendf("ERROR: %s cluster not found",clustername);
- return false;
- }
- StringArray dellist;
- PointerArray todelete;
- ForEach(*iter) {
- IPropertyTree &dir = iter->query();
- if (dir.getPropInt64("Num")==0) {
- const char *dirname = dir.queryProp("Name");
- if (dirname&&*dirname) {
- dellist.append(dirname);
- todelete.append(&dir);
- }
- }
- }
- dellist.sort(strptrcmprev);
- ForEachItemIn(di,dellist) {
- const char *dirname = dellist.item(di);
- class casyncfor: public CAsyncFor
- {
- IGroup *grp;
- StringAttr name;
- public:
- casyncfor(IGroup *_grp,const char *_name)
- : name(_name)
- {
- grp = _grp;
- }
- void Do(unsigned i)
- {
- RemoteFilename rfn;
- rfn.setPath(grp->queryNode(i).endpoint(),name);
- StringBuffer eps;
- try
- {
- Owned<IFile> dir = createIFile(rfn);
- if (deleteEmptyDir(dir))
- PROGLOG("Removed '%s'",dir->queryFilename());
- else
- WARNLOG("Could not remove '%s'",dir->queryFilename());
- }
- catch (IException *e)
- {
- EXCLOG(e,"Could not remove directory");
- e->Release();
- }
- }
- } afor(group,dirname);
- afor.For(group->ordinality(),10,false,true);
- }
- iter.clear();
- ForEachItemInRev(i,todelete)
- t->removeTree((IPropertyTree *)todelete.item(i)); // probably should check succeeded above but next run will correct
- toXML(t,dataStr.clear());
- m_directories->setPropBin("data",dataStr.length(),(void*)dataStr.toCharArray());
- emptyScopes();
- return true;
- }
- //IXRefNode
- void CXRefNode::setName(const char* str)
- {
- m_XRefTree->setProp("@name",str);
- if (m_origName.length() == 0)
- m_origName.append(str);
- }
- void CXRefNode::setStatus(const char* str)
- {
- m_XRefTree->setProp("@status",str);
- }
- StringBuffer& CXRefNode::getCluster(StringBuffer& Cluster)
- {
- Cluster.append(m_ClusterName.str());
- return Cluster;
- }
- void CXRefNode::setCluster(const char* str)
- {
- m_ClusterName.clear();
- m_ClusterName.append(str);
- }
- void CXRefNode::setLastModified(IJlibDateTime& dt )
- {
- SCMStringBuffer datestr,timestr;
- dt.getDateString(datestr);
- dt.getTimeString(timestr);
- StringBuffer tmpstr(datestr.str());
- tmpstr.append(" ");
- tmpstr.append(timestr.str());
- m_XRefTree->setProp("@modified",tmpstr.str());
- }
- void CXRefNode::BuildXRefData(IPropertyTree & pTree,const char* Cluster)
- {
- // DBGLOG("CXRefNode::BuildXRefData");
- if(m_XRefTree.get() == 0)
- throw MakeStringException(-1, "No XRef Dali Tree available");
- Owned<IXRefFilesNode> lost = getLostFiles();
- Owned<IXRefFilesNode> found = getFoundFiles() ;
- Owned<IXRefFilesNode> orphan = getOrphanFiles();
- IPropertyTree* pSubTree = pTree.queryPropTree("Orphans");
- pSubTree->setProp("Cluster",Cluster);
- orphan->Deserialize(*pSubTree);
- pSubTree = pTree.queryPropTree("Lost");
- pSubTree->setProp("Cluster",Cluster);
- lost->Deserialize(*pSubTree);
- pSubTree = pTree.queryPropTree("Found");
- pSubTree->setProp("Cluster",Cluster);
- found->Deserialize(*pSubTree);
- pSubTree = pTree.queryPropTree("Messages");
- pSubTree->setProp("Cluster",Cluster);
- deserializeMessages(*pSubTree);
- pSubTree = pTree.queryPropTree("Directories");
- pSubTree->setProp("Cluster",Cluster);
- deserializeDirectories(*pSubTree);
- Owned<IJlibDateTime> dt = createDateTimeNow();
- setLastModified(*dt);
- setStatus("Generated");
- commit();
- }
- bool CXRefNode::IsChanged()
- {
- if ((m_orphans.get() && m_orphans->IsChanged() == true) ||
- (m_lost.get() && m_lost->IsChanged() == true) ||
- (m_found.get() && m_found->IsChanged() == true) ||
- m_bChanged == true )
- return true;
- return false;
- }
- void CXRefNode::SetChanged(bool bChanged)
- {
- m_bChanged = bChanged;
- }
- void CXRefNode::commit()
- {
-
- CriticalSection(commitMutex);
- if(m_conn == 0)
- return;
- Owned<IXRefFilesNode> lost = getLostFiles();
- Owned<IXRefFilesNode> found = getFoundFiles() ;
- Owned<IXRefFilesNode> orphan = getOrphanFiles();
- lost->Commit();
- found->Commit();
- orphan->Commit();
- m_conn->commit();
- }
- void CXRefNode::progress(const char *text)
- {
- DBGLOG("PROGRESS: %s\n",text);
- setStatus(text);
- commit();
- }
- void CXRefNode::error(const char *text)
- {
- DBGLOG("ERROR: %s\n",text);
- setStatus(text);
- commit();
- }
|