XRefFilesNode.cpp 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  // XRefFilesNode.cpp: implementation of the CXRefFilesNode class.
  14. //
  15. //////////////////////////////////////////////////////////////////////
  16. #include "XRefFilesNode.hpp"
  17. #include "jlzw.hpp"
  18. #include "dautils.hpp"
  19. //////////////////////////////////////////////////////////////////////
  20. // Construction/Destruction
  21. //////////////////////////////////////////////////////////////////////
  22. CXRefFilesNode::CXRefFilesNode(IPropertyTree& baseNode,const char* cluster,const char *_rootdir)
  23. : m_baseTree(baseNode), rootdir(_rootdir)
  24. {
  25. baseNode.setProp("@Cluster",cluster);
  26. m_bChanged = false;
  27. prefixName.append(cluster);
  28. }
  29. bool CXRefFilesNode::IsChanged()
  30. {
  31. return m_bChanged;
  32. }
  33. void CXRefFilesNode::Commit()
  34. {
  35. if (m_bChanged)
  36. Deserialize(getDataTree());
  37. m_bChanged = false;
  38. }
  39. StringBuffer& CXRefFilesNode::Serialize(StringBuffer& outStr)
  40. {
  41. if (!m_bChanged && _data.length() > 0)
  42. {
  43. outStr.append(_data);
  44. return outStr;
  45. }
  46. _data.clear();
  47. MemoryBuffer buff;
  48. m_baseTree.getPropBin("data",buff);
  49. if (buff.length())
  50. {
  51. outStr.append(buff.length(),buff.toByteArray());
  52. _data.append(outStr);
  53. }
  54. return outStr;
  55. }
  56. void CXRefFilesNode::Deserialize(IPropertyTree& inTree)
  57. {
  58. CleanTree(inTree);
  59. StringBuffer datastr;
  60. toXML(&inTree,datastr);
  61. m_baseTree.setPropBin("data",datastr.length(),(void*)datastr.str());
  62. }
  63. IPropertyTree* CXRefFilesNode::FindNode(const char* NodeName)
  64. {
  65. StringBuffer xpath;
  66. xpath.clear().appendf("File/[Partmask=\"%s\"]", NodeName);
  67. StringBuffer tmpbuf;
  68. return getDataTree().getBranch(xpath.str());
  69. }
  70. IPropertyTree& CXRefFilesNode::getDataTree()
  71. {
  72. if (m_DataTree.get() == 0)
  73. {
  74. StringBuffer dataStr;
  75. Serialize(dataStr);
  76. m_DataTree.setown(createPTreeFromXMLString(dataStr.str()));
  77. }
  78. return *m_DataTree.get();
  79. }
  80. static bool checkPartsInCluster(const char *title,const char *clustername, IPropertyTree *subBranch, StringBuffer &errstr,bool exists)
  81. {
  82. Owned<IGroup> group = queryNamedGroupStore().lookup(clustername);
  83. if (!group) {
  84. ERRLOG("%s cluster not found",clustername);
  85. errstr.appendf("ERROR: %s cluster not found",clustername);
  86. return false;
  87. }
  88. Owned<IPropertyTreeIterator> partItr = subBranch->getElements("Part");
  89. unsigned i;
  90. StringBuffer xpath;
  91. unsigned n = group->ordinality();
  92. ForEach(*partItr) {
  93. IPropertyTree& part = partItr->query();
  94. unsigned pn = part.getPropInt("Num");
  95. for (int rep=0;rep<2;rep++) {
  96. i = 0;
  97. for (;;) {
  98. i++;
  99. xpath.clear().appendf(rep?"RNode[%d]":"Node[%d]",i);
  100. if (!part.hasProp(xpath.str()))
  101. break;
  102. SocketEndpoint ep(part.queryProp(xpath.str()));
  103. ep.port = 0;
  104. rank_t gn = group->rank(ep);
  105. if (group->rank(ep)==RANK_NULL) {
  106. StringBuffer eps;
  107. ERRLOG("%s %s Part %d on %s is not in cluster %s",title,rep?"Replicate":"Primary",pn,ep.getUrlStr(eps).str(),clustername);
  108. errstr.appendf("ERROR: %s %s part %d on %s is not in cluster %s",title,rep?"Replicate":"Primary",pn,ep.getUrlStr(eps).str(),clustername);
  109. return false;
  110. }
  111. if (exists) {
  112. if ((pn-1+rep)%n==gn) {
  113. ERRLOG("Logical file for %s exists (part not orphaned?)",title);
  114. errstr.appendf("Logical file for %s exists (part not orphaned?)",title);
  115. return false;
  116. }
  117. }
  118. }
  119. }
  120. }
  121. return true;
  122. }
  123. bool CXRefFilesNode::RemovePhysical(const char *Partmask,IUserDescriptor* udesc, const char *clustername, StringBuffer &errstr)
  124. {
  125. size32_t startlen = errstr.length();
  126. IPropertyTree* subBranch = FindNode(Partmask);
  127. if (!subBranch) {
  128. ERRLOG("%s branch not found",Partmask);
  129. errstr.appendf("ERROR: %s branch not found",Partmask);
  130. return false;
  131. }
  132. // sanity check file doesn't (now) exist
  133. bool exists = false;
  134. StringBuffer lfn;
  135. if (LogicalNameFromMask(Partmask,lfn)) {
  136. if (queryDistributedFileDirectory().exists(lfn.str(),udesc,true))
  137. exists = true;
  138. }
  139. if (!checkPartsInCluster(Partmask,clustername,subBranch,errstr,exists))
  140. return false;
  141. RemoteFilenameArray files;
  142. int numparts = subBranch->getPropInt("Numparts");
  143. Owned<IPropertyTreeIterator> partItr = subBranch->getElements("Part");
  144. ForEach(*partItr)
  145. {
  146. IPropertyTree& part = partItr->query();
  147. StringBuffer remoteFile;
  148. expandMask(remoteFile, Partmask, part.getPropInt("Num")-1, numparts);
  149. /////////////////////////////////
  150. StringBuffer xpath;
  151. unsigned i = 0;
  152. for (;;) {
  153. i++;
  154. xpath.clear().appendf("Node[%d]",i);
  155. if (!part.hasProp(xpath.str()))
  156. break;
  157. SocketEndpoint ip(part.queryProp(xpath.str()));
  158. RemoteFilename rmtFile;
  159. rmtFile.setPath(ip,remoteFile.str());
  160. files.append(rmtFile);
  161. }
  162. i = 0;
  163. for (;;) {
  164. i++;
  165. xpath.clear().appendf("RNode[%d]",i);
  166. if (!part.hasProp(xpath.str()))
  167. break;
  168. SocketEndpoint ip(part.queryProp(xpath.str()));
  169. RemoteFilename rmtFile;
  170. StringBuffer replicateFile;
  171. if (setReplicateDir(remoteFile.str(),replicateFile))
  172. rmtFile.setPath(ip,replicateFile.str()); // old semantics
  173. else
  174. rmtFile.setPath(ip,remoteFile.str());
  175. files.append(rmtFile);
  176. }
  177. }
  178. CriticalSection crit;
  179. class casyncfor: public CAsyncFor
  180. {
  181. RemoteFilenameArray &files;
  182. StringBuffer &errstr;
  183. CriticalSection &crit;
  184. public:
  185. casyncfor(RemoteFilenameArray &_files, StringBuffer &_errstr, CriticalSection &_crit)
  186. : files(_files), errstr(_errstr), crit(_crit)
  187. {
  188. }
  189. void Do(unsigned idx)
  190. {
  191. try{
  192. Owned<IFile> _remoteFile = createIFile(files.item(idx));
  193. DBGLOG("Removing physical part at %s",_remoteFile->queryFilename());
  194. if (_remoteFile->exists()) {
  195. if (!_remoteFile->remove()) {
  196. StringBuffer errname;
  197. files.item(idx).getRemotePath(errname);
  198. ERRLOG("Could not delete file %s",errname.str());
  199. CriticalBlock block(crit);
  200. if (errstr.length())
  201. errstr.append('\n');
  202. errstr.appendf("ERROR: Could not delete file %s",errname.str());
  203. }
  204. }
  205. }
  206. catch(IException* e)
  207. {
  208. StringBuffer s(" deleting logical part ");
  209. files.item(idx).getRemotePath(s);
  210. EXCLOG(e,s.str());
  211. CriticalBlock block(crit);
  212. if (errstr.length())
  213. errstr.append('\n');
  214. errstr.append("ERROR: ");
  215. e->errorMessage(errstr);
  216. errstr.append(s);
  217. e->Release();
  218. }
  219. catch(...)
  220. {
  221. StringBuffer errname;
  222. files.item(idx).getRemotePath(errname);
  223. DBGLOG("Unknown Exception caught while deleting logical part %s",errname.str());
  224. CriticalBlock block(crit);
  225. if (errstr.length())
  226. errstr.append('\n');
  227. errstr.appendf("ERROR: Unknown Exception caught while deleting logical part %s",errname.str());
  228. }
  229. }
  230. } afor(files,errstr,crit);
  231. afor.For(files.ordinality(),10,false,true);
  232. if (!RemoveTreeNode(Partmask))
  233. {
  234. ERRLOG("Error Removing XRef Branch %s",Partmask);
  235. return false;
  236. }
  237. m_bChanged = true;
  238. return errstr.length()==startlen;
  239. }
  240. bool CXRefFilesNode::RemoveLogical(const char* LogicalName,IUserDescriptor* udesc, const char *clustername, StringBuffer &errstr)
  241. {
  242. StringBuffer xpath;
  243. xpath.clear().appendf("File/[Name=\"%s\"]", LogicalName);
  244. StringBuffer tmpbuf;
  245. IPropertyTree* pLogicalFileNode = getDataTree().getBranch(xpath.str());
  246. if (!pLogicalFileNode) {
  247. ERRLOG("Branch %s not found",xpath.str());
  248. errstr.appendf("Branch %s not found",xpath.str());
  249. return false;
  250. }
  251. if (!checkPartsInCluster(LogicalName,clustername,pLogicalFileNode,errstr,false))
  252. return false;
  253. if (queryDistributedFileDirectory().existsPhysical(LogicalName,udesc)) {
  254. ERRLOG("Logical file %s all parts exist (not lost?))",LogicalName);
  255. errstr.appendf("Logical file %s all parts exist (not lost?))",LogicalName);
  256. return false;
  257. }
  258. if (!getDataTree().removeTree(pLogicalFileNode)) {
  259. ERRLOG("Removing XRef Branch %s", xpath.str());
  260. errstr.appendf("Removing XRef Branch %s", xpath.str());
  261. return false;
  262. }
  263. m_bChanged = true;
  264. queryDistributedFileDirectory().removeEntry(LogicalName,udesc);
  265. return true;
  266. }
  267. bool CXRefFilesNode::AttachPhysical(const char *Partmask,IUserDescriptor* udesc, const char *clustername, StringBuffer &errstr)
  268. {
  269. IPropertyTree* subBranch = FindNode(Partmask);
  270. if (!subBranch) {
  271. ERRLOG("%s node not found",Partmask);
  272. errstr.appendf("ERROR: %s node not found",Partmask);
  273. return false;
  274. }
  275. if (!checkPartsInCluster(Partmask,clustername,subBranch,errstr,false))
  276. return false;
  277. StringBuffer logicalName;
  278. if (!LogicalNameFromMask(Partmask,logicalName)) {
  279. ERRLOG("%s - could not attach",Partmask);
  280. errstr.appendf("ERROR: %s - could not attach",Partmask);
  281. return false;
  282. }
  283. if (queryDistributedFileDirectory().exists(logicalName.str(),udesc))
  284. {
  285. ERRLOG("Logical File %s already Exists. Can not reattach to Dali",logicalName.str());
  286. errstr.appendf("Logical File %s already Exists. Can not reattach to Dali",logicalName.str());
  287. return false;
  288. }
  289. StringBuffer drive,path,tail,ext;
  290. splitFilename(Partmask, &drive, &path, &tail, &ext);
  291. //set directory info
  292. StringBuffer dir;
  293. dir.append(drive.str());
  294. dir.append(path.str());
  295. Owned<IFileDescriptor> fileDesc = createFileDescriptor();
  296. fileDesc->setDefaultDir(dir.str());
  297. //use the logical name as the title....
  298. fileDesc->setTraceName(logicalName.str());
  299. IPropertyTree & attr = fileDesc->queryProperties();
  300. //attr.setProp("@size",subBranch->queryProp("Size")); we don't know size (this value isn't right!)
  301. unsigned numparts = subBranch->getPropInt("Numparts");
  302. bool isCompressed = false;
  303. bool first = true;
  304. offset_t totalSize = 0;
  305. Owned<IPropertyTreeIterator> partItr = subBranch->getElements("Part");
  306. for (partItr->first(); partItr->isValid(); partItr->next())
  307. {
  308. IPropertyTree& part = partItr->query();
  309. //get the full file path
  310. StringBuffer remoteFilePath;
  311. expandMask(remoteFilePath, Partmask, part.getPropInt("Num")-1, numparts);
  312. StringBuffer _drive,_path,_tail,_ext,_filename;
  313. splitFilename(remoteFilePath.str(), &_drive, &_path, &_tail, &_ext);
  314. _filename.append(_tail.str());
  315. _filename.append(_ext.str());
  316. const char* _node = part.queryProp("Node[1]");
  317. if (!_node||!*_node)
  318. _node = part.queryProp("RNode[1]");
  319. if (!*_node||!*_node) {
  320. ERRLOG("%s - could not attach (missing part info)",Partmask);
  321. errstr.appendf("ERROR: %s - could not attach (missing part info)",Partmask);
  322. return false;
  323. }
  324. Owned<INode> node = createINode(_node);
  325. DBGLOG("Setting number %d for Node %s and name %s",part.getPropInt("Num")-1,_node,_filename.str());
  326. //Num is 0 based...
  327. unsigned partNo = part.getPropInt("Num")-1;
  328. RemoteFilename rfn;
  329. rfn.setPath(node->endpoint(), remoteFilePath);
  330. Owned<IFile> iFile = createIFile(rfn);
  331. offset_t physicalSize = iFile->size();
  332. bool partCompressed = isCompressedFile(iFile);
  333. if (first)
  334. {
  335. first = false;
  336. isCompressed = partCompressed;
  337. }
  338. else if (isCompressed != partCompressed)
  339. {
  340. VStringBuffer err("%s - could not attach (mixed compressed/non-compressed physical parts detected)", Partmask);
  341. ERRLOG("%s", err.str());
  342. errstr.append(err.str());
  343. return false;
  344. }
  345. Owned<IPropertyTree> partProps = createPTree("Part");
  346. if (isCompressed)
  347. partProps->setPropInt64("@compressedSize", physicalSize);
  348. else
  349. partProps->setPropInt64("@size", physicalSize);
  350. totalSize += physicalSize;
  351. fileDesc->setPart(partNo, node,_filename.str(), partProps);
  352. }
  353. IPropertyTree &props = fileDesc->queryProperties();
  354. if (isCompressed)
  355. {
  356. props.setPropBool("@blockCompressed", true);
  357. props.setPropInt64("@compressedSize", totalSize);
  358. }
  359. else
  360. props.setPropInt64("@size", totalSize);
  361. Owned<IDistributedFile> dFile = queryDistributedFileDirectory().createNew(fileDesc);
  362. dFile->attach(logicalName.str(),udesc);
  363. if (!RemoveTreeNode(Partmask)) {
  364. ERRLOG("Removing XRef Branch %s",Partmask);
  365. errstr.appendf("ERROR: Removing XRef Branch %s",Partmask);
  366. return false;
  367. }
  368. m_bChanged = true;
  369. return true;
  370. }
  371. void CXRefFilesNode::DirectoryFromMask(const char* Partmask,StringBuffer& directory)
  372. {
  373. if(*Partmask == 0)
  374. return;
  375. const char *in = Partmask;
  376. int counter = 0;
  377. while (*in)
  378. {
  379. if (*in == '.')
  380. break;
  381. directory.append(*in);
  382. }
  383. }
  384. bool CXRefFilesNode::LogicalNameFromMask(const char* fname,StringBuffer& logicalName)
  385. {
  386. CDfsLogicalFileName lfn;
  387. if (!lfn.setFromMask(fname,rootdir))
  388. return false;
  389. logicalName.append(lfn.get());
  390. return true;
  391. }
  392. bool CXRefFilesNode::RemoveTreeNode(const char* NodeName)
  393. {
  394. IPropertyTree* subBranch = FindNode(NodeName);
  395. if (!subBranch)
  396. return false;
  397. StringBuffer tmpbuf;
  398. return getDataTree().removeTree(subBranch);
  399. }
  400. bool CXRefFilesNode::RemoveRemoteFile(const char* fileName, const char* ipAddress)
  401. {
  402. SocketEndpoint ip;
  403. ip.set(ipAddress);
  404. RemoteFilename rmtFile;
  405. rmtFile.setPath(ip,fileName); // filename shhould be full windows or unix path
  406. Owned<IFile> _remoteFile = createIFile(rmtFile);
  407. if (_remoteFile->exists())
  408. return _remoteFile->remove();
  409. return false;
  410. }
  411. ////////////////////////////////////////////////////////////////////////////////////
  412. //
  413. //
  414. ////////////////////////////////////////////////////////////////////////////////////
  415. CXRefOrphanFilesNode::CXRefOrphanFilesNode(IPropertyTree& baseNode,const char* cluster,const char* rootdir)
  416. : CXRefFilesNode(baseNode,cluster,rootdir)
  417. {
  418. }
  419. void CXRefOrphanFilesNode::CleanTree(IPropertyTree& inTree)
  420. {
  421. Owned<IPropertyTreeIterator> Itr = inTree.getElements("*");
  422. Itr->first();
  423. int partcount = 0;
  424. while(Itr->isValid())
  425. {
  426. IPropertyTree& node = Itr->query();
  427. if(strcmp(node.queryName(),"Part") == 0)
  428. {
  429. partcount++;
  430. }
  431. else if(node.hasChildren())
  432. {
  433. CleanTree(node);
  434. }
  435. Itr->next();
  436. }
  437. if(partcount != 0)
  438. inTree.setPropInt("Partsfound",partcount);
  439. }