123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- // XRefFilesNode1.cpp: implementation of the CXRefFilesNode class.
- //
- //////////////////////////////////////////////////////////////////////
- #include "XRefFilesNode.hpp"
- #include "jlzw.hpp"
- #include "dautils.hpp"
- //////////////////////////////////////////////////////////////////////
- // Construction/Destruction
- //////////////////////////////////////////////////////////////////////
- CXRefFilesNode::CXRefFilesNode(IPropertyTree& baseNode,const char* cluster,const char *_rootdir)
- : m_baseTree(baseNode), rootdir(_rootdir)
- {
- baseNode.setProp("@Cluster",cluster);
- m_bChanged = false;
- prefixName.append(cluster);
- }
- bool CXRefFilesNode::IsChanged()
- {
- return m_bChanged;
- }
- void CXRefFilesNode::Commit()
- {
- if (m_bChanged)
- Deserialize(getDataTree());
- m_bChanged = false;
- }
- StringBuffer& CXRefFilesNode::Serialize(StringBuffer& outStr)
- {
- if (!m_bChanged && _data.length() > 0)
- {
- outStr.append(_data);
- return outStr;
- }
- _data.clear();
- MemoryBuffer buff;
- m_baseTree.getPropBin("data",buff);
- if (buff.length())
- {
- outStr.append(buff.length(),buff.toByteArray());
- _data.append(outStr);
- }
- return outStr;
- }
- void CXRefFilesNode::Deserialize(IPropertyTree& inTree)
- {
- CleanTree(inTree);
- StringBuffer datastr;
- toXML(&inTree,datastr);
- m_baseTree.setPropBin("data",datastr.length(),(void*)datastr.str());
- }
- IPropertyTree* CXRefFilesNode::FindNode(const char* NodeName)
- {
- StringBuffer xpath;
- xpath.clear().appendf("File/[Partmask=\"%s\"]", NodeName);
- StringBuffer tmpbuf;
- return getDataTree().getBranch(xpath.str());
- }
-
- IPropertyTree& CXRefFilesNode::getDataTree()
- {
- if (m_DataTree.get() == 0)
- {
- StringBuffer dataStr;
- Serialize(dataStr);
- m_DataTree.setown(createPTreeFromXMLString(dataStr.str()));
- }
- return *m_DataTree.get();
- }
- static bool checkPartsInCluster(const char *title,const char *clustername, IPropertyTree *subBranch, StringBuffer &errstr,bool exists)
- {
- Owned<IGroup> group = queryNamedGroupStore().lookup(clustername);
- if (!group) {
- ERRLOG("%s cluster not found",clustername);
- errstr.appendf("ERROR: %s cluster not found",clustername);
- return false;
- }
- Owned<IPropertyTreeIterator> partItr = subBranch->getElements("Part");
- unsigned i;
- StringBuffer xpath;
- unsigned n = group->ordinality();
- ForEach(*partItr) {
- IPropertyTree& part = partItr->query();
- unsigned pn = part.getPropInt("Num");
- for (int rep=0;rep<2;rep++) {
- i = 0;
- for (;;) {
- i++;
- xpath.clear().appendf(rep?"RNode[%d]":"Node[%d]",i);
- if (!part.hasProp(xpath.str()))
- break;
- SocketEndpoint ep(part.queryProp(xpath.str()));
- ep.port = 0;
- rank_t gn = group->rank(ep);
- if (group->rank(ep)==RANK_NULL) {
- StringBuffer eps;
- ERRLOG("%s %s Part %d on %s is not in cluster %s",title,rep?"Replicate":"Primary",pn,ep.getUrlStr(eps).str(),clustername);
- errstr.appendf("ERROR: %s %s part %d on %s is not in cluster %s",title,rep?"Replicate":"Primary",pn,ep.getUrlStr(eps).str(),clustername);
- return false;
- }
- if (exists) {
- if ((pn-1+rep)%n==gn) {
- ERRLOG("Logical file for %s exists (part not orphaned?)",title);
- errstr.appendf("Logical file for %s exists (part not orphaned?)",title);
- return false;
- }
- }
- }
- }
- }
- return true;
- }
- bool CXRefFilesNode::RemovePhysical(const char *Partmask,IUserDescriptor* udesc, const char *clustername, StringBuffer &errstr)
- {
- size32_t startlen = errstr.length();
- IPropertyTree* subBranch = FindNode(Partmask);
- if (!subBranch) {
- ERRLOG("%s branch not found",Partmask);
- errstr.appendf("ERROR: %s branch not found",Partmask);
- return false;
- }
- // sanity check file doesn't (now) exist
- bool exists = false;
- StringBuffer lfn;
- if (LogicalNameFromMask(Partmask,lfn)) {
- if (queryDistributedFileDirectory().exists(lfn.str(),udesc,true))
- exists = true;
- }
- if (!checkPartsInCluster(Partmask,clustername,subBranch,errstr,exists))
- return false;
- RemoteFilenameArray files;
- int numparts = subBranch->getPropInt("Numparts");
- Owned<IPropertyTreeIterator> partItr = subBranch->getElements("Part");
- ForEach(*partItr)
- {
- IPropertyTree& part = partItr->query();
- StringBuffer remoteFile;
- expandMask(remoteFile, Partmask, part.getPropInt("Num")-1, numparts);
- /////////////////////////////////
- StringBuffer xpath;
- unsigned i = 0;
- for (;;) {
- i++;
- xpath.clear().appendf("Node[%d]",i);
- if (!part.hasProp(xpath.str()))
- break;
- SocketEndpoint ip(part.queryProp(xpath.str()));
- RemoteFilename rmtFile;
- rmtFile.setPath(ip,remoteFile.str());
- files.append(rmtFile);
- }
- i = 0;
- for (;;) {
- i++;
- xpath.clear().appendf("RNode[%d]",i);
- if (!part.hasProp(xpath.str()))
- break;
- SocketEndpoint ip(part.queryProp(xpath.str()));
- RemoteFilename rmtFile;
- StringBuffer replicateFile;
- if (setReplicateDir(remoteFile.str(),replicateFile))
- rmtFile.setPath(ip,replicateFile.str()); // old semantics
- else
- rmtFile.setPath(ip,remoteFile.str());
- files.append(rmtFile);
- }
- }
-
- CriticalSection crit;
- class casyncfor: public CAsyncFor
- {
- RemoteFilenameArray &files;
- StringBuffer &errstr;
- CriticalSection &crit;
- public:
- casyncfor(RemoteFilenameArray &_files, StringBuffer &_errstr, CriticalSection &_crit)
- : files(_files), errstr(_errstr), crit(_crit)
- {
- }
- void Do(unsigned idx)
- {
- try{
- Owned<IFile> _remoteFile = createIFile(files.item(idx));
- DBGLOG("Removing physical part at %s",_remoteFile->queryFilename());
- if (_remoteFile->exists()) {
- if (!_remoteFile->remove()) {
- StringBuffer errname;
- files.item(idx).getRemotePath(errname);
- ERRLOG("Could not delete file %s",errname.str());
- CriticalBlock block(crit);
- if (errstr.length())
- errstr.append('\n');
- errstr.appendf("ERROR: Could not delete file %s",errname.str());
- }
- }
- }
- catch(IException* e)
- {
- StringBuffer s(" deleting logical part ");
- files.item(idx).getRemotePath(s);
- EXCLOG(e,s.str());
- CriticalBlock block(crit);
- if (errstr.length())
- errstr.append('\n');
- errstr.append("ERROR: ");
- e->errorMessage(errstr);
- errstr.append(s);
- e->Release();
- }
- catch(...)
- {
- StringBuffer errname;
- files.item(idx).getRemotePath(errname);
- DBGLOG("Unknown Exception caught while deleting logical part %s",errname.str());
- CriticalBlock block(crit);
- if (errstr.length())
- errstr.append('\n');
- errstr.appendf("ERROR: Unknown Exception caught while deleting logical part %s",errname.str());
- }
- }
- } afor(files,errstr,crit);
- afor.For(files.ordinality(),10,false,true);
- if (!RemoveTreeNode(Partmask))
- {
- ERRLOG("Error Removing XRef Branch %s",Partmask);
- return false;
- }
- m_bChanged = true;
- return errstr.length()==startlen;
- }
- bool CXRefFilesNode::RemoveLogical(const char* LogicalName,IUserDescriptor* udesc, const char *clustername, StringBuffer &errstr)
- {
- StringBuffer xpath;
- xpath.clear().appendf("File/[Name=\"%s\"]", LogicalName);
- StringBuffer tmpbuf;
-
- IPropertyTree* pLogicalFileNode = getDataTree().getBranch(xpath.str());
- if (!pLogicalFileNode) {
- ERRLOG("Branch %s not found",xpath.str());
- errstr.appendf("Branch %s not found",xpath.str());
- return false;
- }
- if (!checkPartsInCluster(LogicalName,clustername,pLogicalFileNode,errstr,false))
- return false;
- if (queryDistributedFileDirectory().existsPhysical(LogicalName,udesc)) {
- ERRLOG("Logical file %s all parts exist (not lost?))",LogicalName);
- errstr.appendf("Logical file %s all parts exist (not lost?))",LogicalName);
- return false;
- }
- if (!getDataTree().removeTree(pLogicalFileNode)) {
- ERRLOG("Removing XRef Branch %s", xpath.str());
- errstr.appendf("Removing XRef Branch %s", xpath.str());
- return false;
- }
- m_bChanged = true;
- queryDistributedFileDirectory().removeEntry(LogicalName,udesc);
- return true;
- }
- bool CXRefFilesNode::AttachPhysical(const char *Partmask,IUserDescriptor* udesc, const char *clustername, StringBuffer &errstr)
- {
- IPropertyTree* subBranch = FindNode(Partmask);
- if (!subBranch) {
- ERRLOG("%s node not found",Partmask);
- errstr.appendf("ERROR: %s node not found",Partmask);
- return false;
- }
- if (!checkPartsInCluster(Partmask,clustername,subBranch,errstr,false))
- return false;
- StringBuffer logicalName;
- if (!LogicalNameFromMask(Partmask,logicalName)) {
- ERRLOG("%s - could not attach",Partmask);
- errstr.appendf("ERROR: %s - could not attach",Partmask);
- return false;
- }
- if (queryDistributedFileDirectory().exists(logicalName.str(),udesc))
- {
- ERRLOG("Logical File %s already Exists. Can not reattach to Dali",logicalName.str());
- errstr.appendf("Logical File %s already Exists. Can not reattach to Dali",logicalName.str());
- return false;
- }
- StringBuffer drive,path,tail,ext;
- splitFilename(Partmask, &drive, &path, &tail, &ext);
- //set directory info
- StringBuffer dir;
- dir.append(drive.str());
- dir.append(path.str());
- Owned<IFileDescriptor> fileDesc = createFileDescriptor();
- fileDesc->setDefaultDir(dir.str());
- //use the logical name as the title....
- fileDesc->setTraceName(logicalName.str());
- IPropertyTree & attr = fileDesc->queryProperties();
- //attr.setProp("@size",subBranch->queryProp("Size")); we don't know size (this value isn't right!)
- unsigned numparts = subBranch->getPropInt("Numparts");
- bool isCompressed = false;
- bool first = true;
- offset_t totalSize = 0;
- Owned<IPropertyTreeIterator> partItr = subBranch->getElements("Part");
- for (partItr->first(); partItr->isValid(); partItr->next())
- {
- IPropertyTree& part = partItr->query();
- //get the full file path
- StringBuffer remoteFilePath;
- expandMask(remoteFilePath, Partmask, part.getPropInt("Num")-1, numparts);
- StringBuffer _drive,_path,_tail,_ext,_filename;
- splitFilename(remoteFilePath.str(), &_drive, &_path, &_tail, &_ext);
- _filename.append(_tail.str());
- _filename.append(_ext.str());
- const char* _node = part.queryProp("Node[1]");
- if (!_node||!*_node)
- _node = part.queryProp("RNode[1]");
- if (!*_node||!*_node) {
- ERRLOG("%s - could not attach (missing part info)",Partmask);
- errstr.appendf("ERROR: %s - could not attach (missing part info)",Partmask);
- return false;
- }
- Owned<INode> node = createINode(_node);
- DBGLOG("Setting number %d for Node %s and name %s",part.getPropInt("Num")-1,_node,_filename.str());
- //Num is 0 based...
- unsigned partNo = part.getPropInt("Num")-1;
- RemoteFilename rfn;
- rfn.setPath(node->endpoint(), remoteFilePath);
- Owned<IFile> iFile = createIFile(rfn);
- offset_t physicalSize = iFile->size();
- bool partCompressed = isCompressedFile(iFile);
- if (first)
- {
- first = false;
- isCompressed = partCompressed;
- }
- else if (isCompressed != partCompressed)
- {
- VStringBuffer err("%s - could not attach (mixed compressed/non-compressed physical parts detected)", Partmask);
- ERRLOG("%s", err.str());
- errstr.append(err.str());
- return false;
- }
- Owned<IPropertyTree> partProps = createPTree("Part");
- if (isCompressed)
- partProps->setPropInt64("@compressedSize", physicalSize);
- else
- partProps->setPropInt64("@size", physicalSize);
- totalSize += physicalSize;
- fileDesc->setPart(partNo, node,_filename.str(), partProps);
- }
- IPropertyTree &props = fileDesc->queryProperties();
- if (isCompressed)
- {
- props.setPropBool("@blockCompressed", true);
- props.setPropInt64("@compressedSize", totalSize);
- }
- else
- props.setPropInt64("@size", totalSize);
- Owned<IDistributedFile> dFile = queryDistributedFileDirectory().createNew(fileDesc);
- dFile->attach(logicalName.str(),udesc);
- if (!RemoveTreeNode(Partmask)) {
- ERRLOG("Removing XRef Branch %s",Partmask);
- errstr.appendf("ERROR: Removing XRef Branch %s",Partmask);
- return false;
- }
- m_bChanged = true;
- return true;
- }
- void CXRefFilesNode::DirectoryFromMask(const char* Partmask,StringBuffer& directory)
- {
- if(*Partmask == 0)
- return;
- const char *in = Partmask;
- int counter = 0;
- while (*in)
- {
- if (*in == '.')
- break;
- directory.append(*in);
- }
- }
- bool CXRefFilesNode::LogicalNameFromMask(const char* fname,StringBuffer& logicalName)
- {
- CDfsLogicalFileName lfn;
- if (!lfn.setFromMask(fname,rootdir))
- return false;
- logicalName.append(lfn.get());
- return true;
- }
- bool CXRefFilesNode::RemoveTreeNode(const char* NodeName)
- {
- IPropertyTree* subBranch = FindNode(NodeName);
- if (!subBranch)
- return false;
- StringBuffer tmpbuf;
- return getDataTree().removeTree(subBranch);
- }
- bool CXRefFilesNode::RemoveRemoteFile(const char* fileName, const char* ipAddress)
- {
- SocketEndpoint ip;
- ip.set(ipAddress);
- RemoteFilename rmtFile;
- rmtFile.setPath(ip,fileName); // filename shhould be full windows or unix path
-
- Owned<IFile> _remoteFile = createIFile(rmtFile);
- if (_remoteFile->exists())
- return _remoteFile->remove();
- return false;
- }
- ////////////////////////////////////////////////////////////////////////////////////
- //
- //
- ////////////////////////////////////////////////////////////////////////////////////
- CXRefOrphanFilesNode::CXRefOrphanFilesNode(IPropertyTree& baseNode,const char* cluster,const char* rootdir)
- : CXRefFilesNode(baseNode,cluster,rootdir)
- {
- }
- void CXRefOrphanFilesNode::CleanTree(IPropertyTree& inTree)
- {
- Owned<IPropertyTreeIterator> Itr = inTree.getElements("*");
- Itr->first();
- int partcount = 0;
- while(Itr->isValid())
- {
- IPropertyTree& node = Itr->query();
- if(strcmp(node.queryName(),"Part") == 0)
- {
- partcount++;
- }
- else if(node.hasChildren())
- {
- CleanTree(node);
- }
- Itr->next();
- }
- if(partcount != 0)
- inTree.setPropInt("Partsfound",partcount);
- }
|