dautdfs.cpp 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
  1. #include "platform.h"
  2. #include "jlib.hpp"
  3. #include "jmisc.hpp"
  4. #include "dadfs.hpp"
  5. #include "dafdesc.hpp"
  6. #include "dautils.hpp"
  7. #include <cppunit/ui/text/TestRunner.h>
  8. #include <cppunit/extensions/HelperMacros.h>
  9. /*##############################################################################
  10. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  11. Licensed under the Apache License, Version 2.0 (the "License");
  12. you may not use this file except in compliance with the License.
  13. You may obtain a copy of the License at
  14. http://www.apache.org/licenses/LICENSE-2.0
  15. Unless required by applicable law or agreed to in writing, software
  16. distributed under the License is distributed on an "AS IS" BASIS,
  17. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  18. See the License for the specific language governing permissions and
  19. limitations under the License.
  20. ############################################################################## */
  21. #define DFSUTGROUP "dfsutgroup"
  22. #define DFSUTSCOPE "test::dfsut"
  23. #define DFSUTDIR "test/dfsut"
  24. class TTestDFS : public CPPUNIT_NS::TestFixture
  25. {
  26. CPPUNIT_TEST_SUITE(TTestDFS);
  27. CPPUNIT_TEST(testDFS);
  28. CPPUNIT_TEST_SUITE_END();
  29. INamedGroupStore *dfsgroup;
  30. IDistributedFileDirectory *dfsdir;
  31. void removeGroups()
  32. {
  33. dfsgroup->remove(DFSUTGROUP "1");
  34. dfsgroup->remove(DFSUTGROUP "7");
  35. dfsgroup->remove(DFSUTGROUP "400");
  36. dfsgroup->remove(DFSUTGROUP "400b");
  37. }
  38. void removeFiles()
  39. {
  40. StringArray superfiles;
  41. StringArray files;
  42. Owned<IDistributedFileIterator> iter = dfsdir->getIterator(DFSUTSCOPE "::*", true, UNKNOWN_USER, defaultNonPrivilegedUser);
  43. ForEach(*iter) {
  44. IDistributedFile &file = iter->query();
  45. if (file.querySuperFile())
  46. superfiles.appendUniq(file.queryLogicalName()); // should recurse owning but maybe later
  47. else
  48. files.appendUniq(file.queryLogicalName());
  49. }
  50. iter.clear();
  51. ForEachItemIn(i,superfiles) {
  52. dfsdir->removeEntry(superfiles.item(i), UNKNOWN_USER);
  53. }
  54. ForEachItemIn(j,files) {
  55. dfsdir->removeEntry(files.item(j), UNKNOWN_USER);
  56. }
  57. }
  58. public:
  59. void setUp()
  60. {
  61. dfsgroup = &queryNamedGroupStore();
  62. dfsdir = &queryDistributedFileDirectory();
  63. try {
  64. removeFiles();
  65. removeGroups();
  66. }
  67. catch (IException *e) {
  68. EXCLOG(e,"TTestDFS::setUp");
  69. }
  70. }
  71. void tearDown()
  72. {
  73. try {
  74. //removeFiles();
  75. //removeGroups();
  76. }
  77. catch (IException *e) {
  78. EXCLOG(e,"TTestDFS::tearDown");
  79. }
  80. }
  81. protected:
  82. void testDFS()
  83. {
  84. dfsgroup->add(DFSUTGROUP "1", { "192.168.1.1" }, true);
  85. std::vector<std::string> hosts;
  86. unsigned n;
  87. StringBuffer s;
  88. for (n=0;n<7;n++)
  89. {
  90. s.clear().append("192.168.2.").append(n+1);
  91. hosts.push_back(s.str());
  92. }
  93. dfsgroup->add(DFSUTGROUP "7", hosts, true);
  94. hosts.clear();
  95. for (n=0;n<400;n++)
  96. {
  97. s.clear().append("192.168.").append(n/256+3).append('.').append(n%256+1);
  98. hosts.push_back(s.str());
  99. }
  100. dfsgroup->add(DFSUTGROUP "400", hosts, true);
  101. hosts.clear();
  102. for (n=0;n<400;n++)
  103. {
  104. s.clear().append("192.168.").append(n/256+5).append('.').append(n%256+1);
  105. hosts.push_back(s.str());
  106. }
  107. dfsgroup->add(DFSUTGROUP "400b", hosts, true);
  108. hosts.clear();
  109. Owned<IGroup> grp = dfsgroup->lookup(DFSUTGROUP "400");
  110. CPPUNIT_ASSERT(grp.get()!=NULL);
  111. CPPUNIT_ASSERT(grp->ordinality()==400);
  112. for (n=0;n<grp->ordinality();n++)
  113. {
  114. s.clear().append("192.168.").append(n/256+3).append('.').append(n%256+1);
  115. SocketEndpoint ep(s.str());
  116. CPPUNIT_ASSERT(ep.equals(grp->queryNode(n).endpoint()));
  117. }
  118. SocketEndpointArray epa;
  119. for (n=0;n<grp->ordinality();n++)
  120. {
  121. s.clear().append("192.168.").append(n/256+5).append('.').append(n%256+1);
  122. SocketEndpoint ep(s.str());
  123. epa.append(ep);
  124. }
  125. grp.setown(createIGroup(epa));
  126. CPPUNIT_ASSERT(dfsgroup->find(grp, s.clear()));
  127. CPPUNIT_ASSERT(strcmp(DFSUTGROUP "400b",s.str())==0);
  128. epa.kill();
  129. grp.clear();
  130. for (n=0;n<7;n++)
  131. {
  132. s.clear().append("192.168.7.").append(n+1);
  133. hosts.push_back(s.str());
  134. }
  135. dfsgroup->add(DFSUTGROUP "7b", hosts, true, "/c$/altdir");
  136. hosts.clear();
  137. ClusterPartDiskMapSpec mapping;
  138. // now create a simple one part file
  139. Owned<IFileDescriptor> fdesc = createFileDescriptor();
  140. grp.setown(dfsgroup->lookup(DFSUTGROUP "1"));
  141. fdesc->setDefaultDir("/c$/thordata/" DFSUTDIR);
  142. fdesc->setPartMask("testfile1._$P$_of_$N$");
  143. fdesc->setNumParts(1);
  144. fdesc->addCluster(grp,mapping);
  145. Owned<IDistributedFile> file = queryDistributedFileDirectory().createNew(fdesc);
  146. file->attach(DFSUTSCOPE "::testfile1", UNKNOWN_USER);
  147. file.clear();
  148. file.setown(dfsdir->lookup(DFSUTSCOPE "::testfile1", UNKNOWN_USER, AccessMode::tbdRead, false, false, nullptr, true));
  149. CPPUNIT_ASSERT(file.get()!=NULL);
  150. CPPUNIT_ASSERT(file->numParts()==1);
  151. CPPUNIT_ASSERT(file->numCopies(0)==2);
  152. Owned<IDistributedFilePart> part=file->getPart(0);
  153. CPPUNIT_ASSERT(part.get()!=NULL);
  154. part->getPartName(s.clear());
  155. CPPUNIT_ASSERT(stricmp(s.str(),"testfile1._1_of_1")==0);
  156. SocketEndpoint ep;
  157. ep.set("192.168.1.1");
  158. Owned<INode> node;
  159. RemoteFilename rfn;
  160. unsigned cpi;
  161. StringBuffer t;
  162. for (cpi=0;cpi<2;cpi++) {
  163. node.setown(part->getNode(cpi));
  164. CPPUNIT_ASSERT(node.get()!=NULL);
  165. CPPUNIT_ASSERT(node->endpoint().equals(ep));
  166. part->getPartDirectory(s.clear(),cpi);
  167. t.clear().appendf("/%c$/thordata/test/dfsut",cpi?'d':'c');
  168. CPPUNIT_ASSERT(stricmp(s.str(),t.str())==0);
  169. t.insert(0,"//192.168.1.1");
  170. t.append("/testfile1._1_of_1");
  171. part->getFilename(rfn,cpi);
  172. rfn.getRemotePath(s.clear());
  173. CPPUNIT_ASSERT(stricmp(s.str(),t.str())==0);
  174. }
  175. // now create a simple multi part file with one extra part (e.g. tlk)
  176. fdesc.setown(createFileDescriptor());
  177. grp.setown(dfsgroup->lookup(DFSUTGROUP "7"));
  178. fdesc->setDefaultDir("/c$/thordata/" DFSUTDIR);
  179. fdesc->setPartMask("testfile2._$P$_of_$N$");
  180. fdesc->setNumParts(8);
  181. fdesc->addCluster(grp,mapping);
  182. file.setown(queryDistributedFileDirectory().createNew(fdesc));
  183. file->attach(DFSUTSCOPE "::testfile2", UNKNOWN_USER);
  184. file.clear();
  185. file.setown(dfsdir->lookup(DFSUTSCOPE "::testfile2", UNKNOWN_USER, AccessMode::tbdRead, false, false, nullptr, true));
  186. CPPUNIT_ASSERT(file.get()!=NULL);
  187. CPPUNIT_ASSERT(file->numParts()==8);
  188. unsigned pi;
  189. StringBuffer tmp;
  190. for (pi=0;pi<8;pi++) {
  191. tmp.clear().appendf("192.168.2.%d",pi%7+1);
  192. SocketEndpoint ep(tmp.str());
  193. tmp.clear().appendf("192.168.2.%d",((pi+1)%7)+1);
  194. SocketEndpoint rep(tmp.str());
  195. CPPUNIT_ASSERT(file->numCopies(pi)==2);
  196. part.setown(file->getPart(pi));
  197. CPPUNIT_ASSERT(part.get()!=NULL);
  198. part->getPartName(s.clear());
  199. t.clear().appendf("testfile2._%d_of_8",pi+1);
  200. CPPUNIT_ASSERT(stricmp(s.str(),t.str())==0);
  201. for (cpi=0;cpi<2;cpi++) {
  202. node.setown(part->getNode(cpi));
  203. CPPUNIT_ASSERT(node.get()!=NULL);
  204. CPPUNIT_ASSERT(node->endpoint().equals((cpi&1)?rep:ep));
  205. part->getPartDirectory(s.clear(),cpi);
  206. StringBuffer t;
  207. t.clear().appendf("/%c$/thordata/test/dfsut",cpi?'d':'c');
  208. CPPUNIT_ASSERT(stricmp(s.str(),t.str())==0);
  209. t.insert(0,(pi+(cpi&1))%7+1);
  210. t.insert(0,"//192.168.2.");
  211. t.appendf("/testfile2._%d_of_8",pi+1);
  212. part->getFilename(rfn,cpi);
  213. rfn.getRemotePath(s.clear());
  214. CPPUNIT_ASSERT(stricmp(s.str(),t.str())==0);
  215. }
  216. }
  217. // now create a multi part file with last part repeated on every node
  218. fdesc.setown(createFileDescriptor());
  219. grp.setown(dfsgroup->lookup(DFSUTGROUP "7"));
  220. fdesc->setDefaultDir("/c$/thordata/" DFSUTDIR);
  221. fdesc->setPartMask("testfile3._$P$_of_$N$");
  222. fdesc->setNumParts(8);
  223. mapping.setRepeatedCopies(7,false);
  224. fdesc->addCluster(grp,mapping);
  225. file.setown(queryDistributedFileDirectory().createNew(fdesc));
  226. file->attach(DFSUTSCOPE "::testfile3", UNKNOWN_USER);
  227. file.clear();
  228. file.setown(dfsdir->lookup(DFSUTSCOPE "::testfile3", UNKNOWN_USER, AccessMode::tbdRead, false, false, nullptr, true));
  229. CPPUNIT_ASSERT(file.get()!=NULL);
  230. CPPUNIT_ASSERT(file->numParts()==8);
  231. for (pi=0;pi<8;pi++) {
  232. unsigned nc = (pi==7)?14:2;
  233. CPPUNIT_ASSERT(file->numCopies(pi)==nc);
  234. part.setown(file->getPart(pi));
  235. CPPUNIT_ASSERT(part.get()!=NULL);
  236. part->getPartName(s.clear());
  237. t.clear().appendf("testfile3._%d_of_8",pi+1);
  238. CPPUNIT_ASSERT(stricmp(s.str(),t.str())==0);
  239. for (cpi=0;cpi<nc;cpi++) {
  240. bool isrep = cpi&1;
  241. tmp.clear().appendf("192.168.2.%d",(pi+(cpi/2))%7+1);
  242. SocketEndpoint ep(tmp.str());
  243. tmp.clear().appendf("192.168.2.%d",(pi+(cpi/2)+1)%7+1);
  244. SocketEndpoint rep(tmp.str());
  245. node.setown(part->getNode(cpi));
  246. CPPUNIT_ASSERT(node.get()!=NULL);
  247. CPPUNIT_ASSERT(node->endpoint().equals(isrep?rep:ep));
  248. part->getPartDirectory(s.clear(),cpi);
  249. StringBuffer t;
  250. t.clear().appendf("/%c$/thordata/test/dfsut",isrep?'d':'c');
  251. CPPUNIT_ASSERT(stricmp(s.str(),t.str())==0);
  252. StringBuffer eps;
  253. if (isrep)
  254. rep.getUrlStr(eps);
  255. else
  256. ep.getUrlStr(eps);
  257. t.insert(0,eps.str());
  258. t.insert(0,"//");
  259. t.appendf("/testfile3._%d_of_8",pi+1);
  260. part->getFilename(rfn,cpi);
  261. rfn.getRemotePath(s.clear());
  262. CPPUNIT_ASSERT(stricmp(s.str(),t.str())==0);
  263. }
  264. }
  265. // now create a multi part file with last part copied on every node
  266. // and also with that part repeated on a different cluster (e.g. like a roxie cluster)
  267. // with a different directory
  268. fdesc.setown(createFileDescriptor());
  269. grp.setown(dfsgroup->lookup(DFSUTGROUP "7"));
  270. fdesc->setDefaultDir("/c$/thordata/" DFSUTDIR);
  271. fdesc->setPartMask("testfile4._$P$_of_$N$");
  272. fdesc->setNumParts(8);
  273. mapping.setRepeatedCopies(7,false);
  274. fdesc->addCluster(grp,mapping);
  275. StringBuffer dir2;
  276. GroupType groupType;
  277. grp.setown(dfsgroup->lookup(DFSUTGROUP "7b", dir2, groupType));
  278. ClusterPartDiskMapSpec mapping2;
  279. mapping2.setDefaultBaseDir(dir2);
  280. mapping2.setRepeatedCopies(7,true);
  281. fdesc->addCluster(grp,mapping2);
  282. file.setown(queryDistributedFileDirectory().createNew(fdesc));
  283. file->attach(DFSUTSCOPE "::testfile4", UNKNOWN_USER);
  284. file.clear();
  285. file.setown(dfsdir->lookup(DFSUTSCOPE "::testfile4", UNKNOWN_USER, AccessMode::tbdRead, false, false, nullptr, true));
  286. CPPUNIT_ASSERT(file.get()!=NULL);
  287. CPPUNIT_ASSERT(file->numParts()==8);
  288. for (pi=0;pi<8;pi++) {
  289. unsigned nc = (pi==7)?28:2;
  290. CPPUNIT_ASSERT(file->numCopies(pi)==nc);
  291. part.setown(file->getPart(pi));
  292. CPPUNIT_ASSERT(part.get()!=NULL);
  293. part->getPartName(s.clear());
  294. t.clear().appendf("testfile4._%d_of_8",pi+1);
  295. CPPUNIT_ASSERT(stricmp(s.str(),t.str())==0);
  296. for (unsigned cpi=0;cpi<nc;cpi++) {
  297. unsigned cpin = cpi%14; // normalize for each cluster
  298. bool isrep = cpi&1;
  299. node.setown(part->getNode(cpi));
  300. CPPUNIT_ASSERT(node.get()!=NULL);
  301. SocketEndpoint cmp("192.168.2.0");
  302. bool isclustb = (node->endpoint().ipcompare(cmp)>0);
  303. tmp.clear().appendf("192.168.%d.%d",isclustb?7:2,(pi+(cpin/2))%7+1);
  304. SocketEndpoint ep(tmp.str());
  305. tmp.clear().appendf("192.168.%d.%d",isclustb?7:2,(pi+(cpin/2)+1)%7+1);
  306. SocketEndpoint rep(tmp.str());
  307. CPPUNIT_ASSERT(node->endpoint().equals(isrep?rep:ep));
  308. part->getPartDirectory(s.clear(),cpi);
  309. StringBuffer t;
  310. t.clear().appendf("/%c$/%s/test/dfsut",isrep?'d':'c',isclustb?"altdir":"thordata");
  311. CPPUNIT_ASSERT(stricmp(s.str(),t.str())==0);
  312. StringBuffer eps;
  313. if (isrep)
  314. rep.getUrlStr(eps);
  315. else
  316. ep.getUrlStr(eps);
  317. t.insert(0,eps.str());
  318. t.insert(0,"//");
  319. t.appendf("/testfile4._%d_of_8",pi+1);
  320. part->getFilename(rfn,cpi);
  321. rfn.getRemotePath(s.clear());
  322. CPPUNIT_ASSERT(stricmp(s.str(),t.str())==0);
  323. }
  324. }
  325. }
  326. };
  327. CPPUNIT_TEST_SUITE_REGISTRATION(TTestDFS);