dalitests.cpp 84 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  /*
   * Dali Quick Regression Suite: tests Dali functionality in a programmatic way.
   *
   * Add as much as possible here to avoid having to run the Hthor/Thor regressions
   * all the time for Dali tests, since most of it can be tested quickly from here.
   */
  19. #ifdef _USE_CPPUNIT
  20. #include "mpbase.hpp"
  21. #include "mpcomm.hpp"
  22. #include "daclient.hpp"
  23. #include "dadfs.hpp"
  24. #include "dafdesc.hpp"
  25. #include "dasds.hpp"
  26. #include "danqs.hpp"
  27. #include "dautils.hpp"
  28. #include "unittests.hpp"
  29. //#define COMPAT
  30. // ======================================================================= Support Functions / Classes
  31. static __int64 subchangetotal;
  32. static unsigned subchangenum;
  33. static CriticalSection subchangesect;
  34. static IRemoteConnection *Rconn;
  35. static IDistributedFileDirectory & dir = queryDistributedFileDirectory();
  36. static IUserDescriptor *user = createUserDescriptor();
  37. static unsigned initCounter = 0; // counter for initialiser
  38. // Declared in dadfs.cpp *only* when CPPUNIT is active
  39. extern void removeLogical(const char *fname, IUserDescriptor *user);
  40. void init() {
  41. // Only initialise on first pass
  42. if (initCounter != 0)
  43. return;
  44. InitModuleObjects();
  45. user->set("user", "passwd");
  46. // Connect to local Dali
  47. SocketEndpoint ep;
  48. ep.set(".", 7070);
  49. SocketEndpointArray epa;
  50. epa.append(ep);
  51. Owned<IGroup> group = createIGroup(epa);
  52. initClientProcess(group, DCR_Other);
  53. initCounter++;
  54. }
  55. void destroy() {
  56. // Only destroy on last pass
  57. if (initCounter != 0)
  58. return;
  59. // Cleanup
  60. releaseAtoms();
  61. closedownClientProcess();
  62. setNodeCaching(false);
  63. initCounter--;
  64. }
  65. class CCSub : public CInterface, implements ISDSConnectionSubscription, implements ISDSSubscription
  66. {
  67. unsigned n;
  68. unsigned &count;
  69. public:
  70. IMPLEMENT_IINTERFACE;
  71. CCSub(unsigned _n,unsigned &_count)
  72. : count(_count)
  73. {
  74. n = _n;
  75. }
  76. virtual void notify()
  77. {
  78. CriticalBlock block(subchangesect);
  79. subchangetotal += n;
  80. subchangenum++;
  81. count++;
  82. }
  83. virtual void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
  84. {
  85. CriticalBlock block(subchangesect);
  86. subchangetotal += n;
  87. subchangenum++;
  88. subchangetotal += (unsigned)flags;
  89. subchangetotal += crc32(xpath,strlen(xpath),0);
  90. if (valueLen) {
  91. subchangetotal += crc32((const char *)valueData,valueLen,0);
  92. }
  93. count++;
  94. }
  95. };
  96. class CChange : public Thread
  97. {
  98. Owned<IRemoteConnection> conn;
  99. Owned<CCSub> sub;
  100. StringAttr path;
  101. SubscriptionId id[10];
  102. unsigned n;
  103. unsigned count;
  104. public:
  105. Semaphore stopsem;
  106. CChange(unsigned _n)
  107. {
  108. n = _n;
  109. StringBuffer s("/DAREGRESS/CONSUB");
  110. s.append(n+1);
  111. path.set(s.str());
  112. conn.setown(querySDS().connect(path, myProcessSession(), RTM_CREATE|RTM_DELETE_ON_DISCONNECT, 1000000));
  113. unsigned i;
  114. for (i=0;i<5;i++)
  115. id[i] = conn->subscribe(*new CCSub(n*1000+i,count));
  116. s.append("/testprop");
  117. for (;i<10;i++)
  118. id[i] = querySDS().subscribe(s.str(),*new CCSub(n*1000+i,count),false,true);
  119. count = 0;
  120. start();
  121. }
  122. virtual int run()
  123. {
  124. unsigned i;
  125. for (i = 0;i<10; i++) {
  126. conn->queryRoot()->setPropInt("testprop", (i*17+n*21)%100);
  127. conn->commit();
  128. for (unsigned j=0;j<1000;j++) {
  129. {
  130. CriticalBlock block(subchangesect);
  131. if (count>=(i+1)*10)
  132. break;
  133. }
  134. Sleep(10);
  135. }
  136. }
  137. stopsem.wait();
  138. for (i=0;i<10;i++)
  139. conn->unsubscribe(id[i]);
  140. return 0;
  141. }
  142. };
  143. interface IChecker
  144. {
  145. virtual void title(unsigned n,const char *s)=0;
  146. virtual void add(const char *s,__int64 v)=0;
  147. virtual void add(const char *s,const char* v)=0;
  148. virtual void add(unsigned n,const char *s,__int64 v)=0;
  149. virtual void add(unsigned n,const char *s,const char* v)=0;
  150. virtual void error(const char *txt)=0;
  151. };
  152. void checkFilePart(IChecker *checker,IDistributedFilePart *part,bool blocked)
  153. {
  154. StringBuffer tmp;
  155. checker->add("getPartIndex",part->getPartIndex());
  156. unsigned n = part->numCopies();
  157. checker->add("numCopies",part->numCopies());
  158. checker->add("maxCopies",n);
  159. RemoteFilename rfn;
  160. for (unsigned copy=0;copy<n;copy++) {
  161. INode *node = part->queryNode(copy);
  162. if (node)
  163. checker->add(copy,"queryNode",node->endpoint().getUrlStr(tmp.clear()).str());
  164. else
  165. checker->error("missing node");
  166. checker->add(copy,"getFilename",part->getFilename(rfn,copy).getRemotePath(tmp.clear()).str());
  167. }
  168. checker->add("getPartName",part->getPartName(tmp.clear()).str());
  169. #ifndef COMPAT
  170. checker->add("getPartDirectory",part->getPartDirectory(tmp.clear()).str());
  171. #endif
  172. checker->add("queryProperties()",toXML(&part->queryAttributes(),tmp.clear()).str());
  173. checker->add("isHost",part->isHost()?1:0);
  174. checker->add("getFileSize",part->getFileSize(false,false));
  175. CDateTime dt;
  176. if (part->getModifiedTime(false,false,dt))
  177. dt.getString(tmp.clear());
  178. else
  179. tmp.clear().append("nodatetime");
  180. checker->add("getModifiedTime",tmp.str());
  181. unsigned crc;
  182. if (part->getCrc(crc)&&!blocked)
  183. checker->add("getCrc",crc);
  184. else
  185. checker->add("getCrc","nocrc");
  186. }
  187. void checkFile(IChecker *checker,IDistributedFile *file)
  188. {
  189. StringBuffer tmp;
  190. checker->add("queryLogicalName",file->queryLogicalName());
  191. unsigned np = file->numParts();
  192. checker->add("numParts",np);
  193. checker->add("queryDefaultDir",file->queryDefaultDir());
  194. if (np>1)
  195. checker->add("queryPartMask",file->queryPartMask());
  196. checker->add("queryProperties()",toXML(&file->queryAttributes(),tmp.clear()).str());
  197. CDateTime dt;
  198. if (file->getModificationTime(dt))
  199. dt.getString(tmp.clear());
  200. else
  201. tmp.clear().append("nodatetime");
  202. // Owned<IFileDescriptor> desc = getFileDescriptor();
  203. // checkFileDescriptor(checker,desc);
  204. //virtual bool existsPhysicalPartFiles(unsigned short port) = 0; // returns true if physical patrs all exist (on primary OR secondary)
  205. //Owned<IPropertyTree> tree = getTreeCopy();
  206. //checker->add("queryProperties()",toXML(tree,tmp.clear()).str());
  207. checker->add("getFileSize",file->getFileSize(false,false));
  208. bool blocked;
  209. checker->add("isCompressed",file->isCompressed(&blocked)?1:0);
  210. checker->add("blocked",blocked?1:0);
  211. unsigned csum;
  212. if (file->getFileCheckSum(csum)&&!blocked)
  213. checker->add("getFileCheckSum",csum);
  214. else
  215. checker->add("getFileCheckSum","nochecksum");
  216. checker->add("isSubFile",file->isSubFile()?1:0);
  217. StringBuffer clustname;
  218. checker->add("queryClusterName(0)",file->getClusterName(0,clustname).str());
  219. for (unsigned i=0;i<np;i++) {
  220. Owned<IDistributedFilePart> part = file->getPart(i);
  221. if (part)
  222. checkFilePart(checker,part,blocked);
  223. }
  224. }
  225. void checkFiles(const char *fn)
  226. {
  227. class cChecker: implements IChecker
  228. {
  229. public:
  230. virtual void title(unsigned n,const char *s)
  231. {
  232. printf("Title[%d]='%s'\n",n,s);
  233. }
  234. virtual void add(const char *s,__int64 v)
  235. {
  236. printf("%s=%"I64F"d\n",s,v);
  237. }
  238. virtual void add(const char *s,const char* v)
  239. {
  240. printf("%s='%s'\n",s,v);
  241. }
  242. virtual void add(unsigned n,const char *s,__int64 v)
  243. {
  244. printf("%s[%d]=%"I64F"d\n",s,n,v);
  245. }
  246. virtual void add(unsigned n,const char *s,const char* v)
  247. {
  248. printf("%s[%d]='%s'\n",s,n,v);
  249. }
  250. virtual void error(const char *txt)
  251. {
  252. printf("ERROR '%s'\n",txt);
  253. }
  254. } checker;
  255. unsigned start = msTick();
  256. unsigned slowest = 0;
  257. StringAttr slowname;
  258. if (fn) {
  259. checker.title(1,fn);
  260. try {
  261. Owned<IDistributedFile> file=queryDistributedFileDirectory().lookup(fn,user);
  262. if (!file)
  263. printf("file '%s' not found\n",fn);
  264. else
  265. checkFile(&checker,file);
  266. }
  267. catch (IException *e) {
  268. StringBuffer str;
  269. e->errorMessage(str);
  270. e->Release();
  271. checker.error(str.str());
  272. }
  273. }
  274. else {
  275. Owned<IDistributedFileIterator> iter = queryDistributedFileDirectory().getIterator("*",false,user);
  276. unsigned i=0;
  277. unsigned ss = msTick();
  278. ForEach(*iter) {
  279. i++;
  280. StringBuffer lfname;
  281. iter->getName(lfname);
  282. checker.title(i,lfname.str());
  283. try {
  284. IDistributedFile &file=iter->query();
  285. checkFile(&checker,&file);
  286. unsigned t = (msTick()-ss);
  287. if (t>slowest) {
  288. slowest = t;
  289. slowname.set(lfname.str());
  290. }
  291. }
  292. catch (IException *e) {
  293. StringBuffer str;
  294. e->errorMessage(str);
  295. e->Release();
  296. checker.error(str.str());
  297. }
  298. ss = msTick();
  299. }
  300. }
  301. unsigned t = msTick()-start;
  302. printf("Complete in %ds\n",t/1000);
  303. if (!slowname.isEmpty())
  304. printf("Slowest %s = %dms\n",slowname.get(),slowest);
  305. };
  // Comma-separated list of logical file names (no trailing comma).
  const char *filelist =
      "thor_data400::gong_delete_plus,"
      "thor_data400::in::npanxx,"
      "thor_data400::tpm_deduped,"
      "thor_data400::base::movers_ingest_ll,"
      "thor_hank::cemtemp::fldl,"
      "thor_data400::patch,"
      "thor_data400::in::flvehreg_01_prethor_upd200204_v3,"
      "thor_data400::in::flvehreg_01_prethor_upd20020625_v3_flag,"
      "thor_data400::in::flvehreg_01_prethor_upd20020715_v3,"
      "thor_data400::in::flvehreg_01_prethor_upd20020715_v3_v3_flag,"
      "thor_data400::in::flvehreg_01_prethor_upd20020816_v3,"
      "thor_data400::in::flvehreg_01_prethor_upd20020816_v3_flag,"
      "thor_data400::in::flvehreg_01_prethor_upd20020625_v3,"
      "thor_data400::in::fl_lic_prethor_200208v2,"
      "thor_data400::in::fl_lic_prethor_200209,"
      "thor_data400::in::fl_lic_prethor_200210,"
      "thor_data400::in::fl_lic_prethor_200210_reclean,"
      "thor_data400::in::fl_lic_upd_200301,"
      "thor_data400::in::fl_lic_upd_200302,"
      "thor_data400::in::oh_lic_200209,"
      "thor_data400::in::ohio_lic_upd_200210,"
      "thor_data400::prepped_for_keys,"
      "a0e65__w20060224-155748,"
      "common_did,"
      "test::ftest1,"
      "thor_data50::BASE::chunk,"
      "hthor::key::codes_v320040901,"
      "thor_data400::in::ucc_direct_ok_99999999_event_20060124,"
      "thor400::ks_work::distancedetails";
  336. #ifndef COMPAT
  337. void dispFDesc(IFileDescriptor *fdesc)
  338. {
  339. printf("======================================\n");
  340. Owned<IPropertyTree> pt = createPTree("File");
  341. fdesc->serializeTree(*pt);
  342. StringBuffer out;
  343. toXML(pt,out);
  344. printf("%s\n",out.str());
  345. Owned<IFileDescriptor> fdesc2 = deserializeFileDescriptorTree(pt);
  346. toXML(pt,out.clear());
  347. printf("%s\n",out.str());
  348. unsigned np = fdesc->numParts();
  349. unsigned ncl = fdesc->numClusters();
  350. printf("numclusters = %d, numparts=%d\n",ncl,np);
  351. for (unsigned pass=0;pass<1;pass++) {
  352. for (unsigned ip=0;ip<np;ip++) {
  353. IPartDescriptor *part = fdesc->queryPart(ip);
  354. unsigned nc = part->numCopies();
  355. for (unsigned ic=0;ic<nc;ic++) {
  356. StringBuffer tmp1;
  357. StringBuffer tmp2;
  358. StringBuffer tmp3;
  359. StringBuffer tmp4;
  360. RemoteFilename rfn;
  361. bool blocked;
  362. out.clear().appendf("%d,%d: '%s' '%s' '%s' '%s' '%s' '%s' %s%s%s",ip,ic,
  363. part->getDirectory(tmp1,ic).str(),
  364. part->getTail(tmp2).str(),
  365. part->getPath(tmp3,ic).str(),
  366. fdesc->getFilename(ip,ic,rfn).getRemotePath(tmp4).str(), // multi TBD
  367. fdesc->queryPartMask()?fdesc->queryPartMask():"",
  368. fdesc->queryDefaultDir()?fdesc->queryDefaultDir():"",
  369. fdesc->isGrouped()?"GROUPED ":"",
  370. fdesc->queryKind()?fdesc->queryKind():"",
  371. fdesc->isCompressed(&blocked)?(blocked?" BLOCKCOMPRESSED":" COMPRESSED"):""
  372. );
  373. printf("%s\n",out.str());
  374. if (1) {
  375. MemoryBuffer mb;
  376. part->serialize(mb);
  377. Owned<IPartDescriptor> copypart;
  378. copypart.setown(deserializePartFileDescriptor(mb));
  379. StringBuffer out2;
  380. out2.appendf("%d,%d: '%s' '%s' '%s' '%s' '%s' '%s' %s%s%s",ip,ic,
  381. copypart->getDirectory(tmp1.clear(),ic).str(),
  382. copypart->getTail(tmp2.clear()).str(),
  383. copypart->getPath(tmp3.clear(),ic).str(),
  384. copypart->getFilename(ic,rfn).getRemotePath(tmp4.clear()).str(), // multi TBD
  385. copypart->queryOwner().queryPartMask()?copypart->queryOwner().queryPartMask():"",
  386. copypart->queryOwner().queryDefaultDir()?copypart->queryOwner().queryDefaultDir():"",
  387. copypart->queryOwner().isGrouped()?"GROUPED ":"",
  388. copypart->queryOwner().queryKind()?copypart->queryOwner().queryKind():"",
  389. copypart->queryOwner().isCompressed(&blocked)?(blocked?" BLOCKCOMPRESSED":" COMPRESSED"):""
  390. );
  391. if (strcmp(out.str(),out2.str())!=0)
  392. printf("FAILED!\n%s\n%s\n",out.str(),out2.str());
  393. pt.setown(createPTree("File"));
  394. copypart->queryOwner().serializeTree(*pt);
  395. StringBuffer out;
  396. toXML(pt,out);
  397. // printf("%d,%d: \n%s\n",ip,ic,out.str());
  398. }
  399. }
  400. }
  401. }
  402. }
  403. #endif
  404. // ================================================================================== UNIT TESTS
  405. class DaliTests : public CppUnit::TestFixture
  406. {
  407. CPPUNIT_TEST_SUITE( DaliTests );
  408. CPPUNIT_TEST(testDFS);
  409. // CPPUNIT_TEST(testReadAllSDS); // Ignoring this test; See comments below
  410. CPPUNIT_TEST(testSDSRW);
  411. CPPUNIT_TEST(testSDSSubs);
  412. CPPUNIT_TEST(testFiles);
  413. CPPUNIT_TEST(testGroups);
  414. CPPUNIT_TEST(testMultiCluster);
  415. #ifndef COMPAT
  416. CPPUNIT_TEST(testDF1);
  417. CPPUNIT_TEST(testDF2);
  418. CPPUNIT_TEST(testMisc);
  419. CPPUNIT_TEST(testDFile);
  420. #endif
  421. CPPUNIT_TEST(testDFSTrans);
  422. CPPUNIT_TEST(testDFSPromote);
  423. CPPUNIT_TEST(testDFSDel);
  424. CPPUNIT_TEST(testDFSRename);
  425. CPPUNIT_TEST(testDFSClearAdd);
  426. CPPUNIT_TEST(testDFSRename2);
  427. CPPUNIT_TEST(testDFSRenameThenDelete);
  428. CPPUNIT_TEST(testDFSRemoveSuperSub);
  429. // This test requires access to an external IP with dafilesrv running
  430. // CPPUNIT_TEST(testDFSRename3);
  431. CPPUNIT_TEST(testDFSAddFailReAdd);
  432. CPPUNIT_TEST(testDFSRetrySuperLock);
  433. CPPUNIT_TEST(testDFSHammer);
  434. CPPUNIT_TEST_SUITE_END();
  435. #ifndef COMPAT
  436. #endif
  437. void testGrp(SocketEndpointArray &epa)
  438. {
  439. Owned<IGroup> grp = createIGroup(epa);
  440. StringBuffer s;
  441. grp->getText(s);
  442. printf("'%s'\n",s.str());
  443. Owned<IGroup> grp2 = createIGroup(s.str());
  444. if (grp->compare(grp2)!=GRidentical) {
  445. grp->getText(s.clear());
  446. printf("^FAILED! %s\n",s.str());
  447. }
  448. }
  449. unsigned fn(unsigned n, unsigned m, unsigned seed, unsigned depth, IPropertyTree *parent)
  450. {
  451. __int64 val = parent->getPropInt64("val",0);
  452. parent->setPropInt64("val",n+val);
  453. val = parent->getPropInt64("@val",0);
  454. parent->setPropInt64("@val",m+val);
  455. val = parent->getPropInt64(NULL,0);
  456. parent->setPropInt64(NULL,seed+val);
  457. if (Rconn&&((n+m+seed)%100==0))
  458. Rconn->commit();
  459. if (!seed)
  460. return m+n;
  461. if (n==m)
  462. return seed;
  463. if (depth>10)
  464. return seed+n+m;
  465. if (seed%7==n%7)
  466. return n;
  467. if (seed%7==m%7)
  468. return m;
  469. char name[64];
  470. unsigned v = seed;
  471. name[0] = 's';
  472. name[1] = 'u';
  473. name[2] = 'b';
  474. unsigned i = 3;
  475. while (v) {
  476. name[i++] = ('A'+v%26 );
  477. v /= 26;
  478. }
  479. name[i] = 0;
  480. IPropertyTree *child = parent->queryPropTree(name);
  481. if (!child)
  482. child = parent->addPropTree(name, createPTree(name));
  483. return fn(fn(n,seed,seed*17+11,depth+1,child),fn(seed,m,seed*11+17,depth+1,child),seed*19+7,depth+1,child);
  484. }
  485. unsigned fn2(unsigned n, unsigned m, unsigned seed, unsigned depth, StringBuffer &parentname)
  486. {
  487. if (!Rconn)
  488. return 0;
  489. if ((n+m+seed)%25==0) {
  490. Rconn->commit();
  491. Rconn->Release();
  492. Rconn = querySDS().connect("/DAREGRESS",myProcessSession(), 0, 1000000);
  493. ASSERT(Rconn && "Failed to connect to /DAREGRESS");
  494. }
  495. IPropertyTree *parent = parentname.length()?Rconn->queryRoot()->queryPropTree(parentname.str()):Rconn->queryRoot();
  496. ASSERT(parent && "Failed to connect to parent");
  497. __int64 val = parent->getPropInt64("val",0);
  498. parent->setPropInt64("val",n+val);
  499. val = parent->getPropInt64("@val",0);
  500. parent->setPropInt64("@val",m+val);
  501. val = parent->getPropInt64(NULL,0);
  502. parent->setPropInt64(NULL,seed+val);
  503. if (!seed)
  504. return m+n;
  505. if (n==m)
  506. return seed;
  507. if (depth>10)
  508. return seed+n+m;
  509. if (seed%7==n%7)
  510. return n;
  511. if (seed%7==m%7)
  512. return m;
  513. char name[64];
  514. unsigned v = seed;
  515. name[0] = 's';
  516. name[1] = 'u';
  517. name[2] = 'b';
  518. unsigned i = 3;
  519. while (v) {
  520. name[i++] = ('A'+v%26 );
  521. v /= 26;
  522. }
  523. name[i] = 0;
  524. unsigned l = parentname.length();
  525. if (parentname.length())
  526. parentname.append('/');
  527. parentname.append(name);
  528. IPropertyTree *child = parent->queryPropTree(name);
  529. if (!child)
  530. child = parent->addPropTree(name, createPTree(name));
  531. unsigned ret = fn2(fn2(n,seed,seed*17+11,depth+1,parentname),fn2(seed,m,seed*11+17,depth+1,parentname),seed*19+7,depth+1,parentname);
  532. parentname.setLength(l);
  533. return ret;
  534. }
  535. IFileDescriptor *createDescriptor(const char* dir, const char* name, unsigned parts, unsigned recSize, unsigned index=0)
  536. {
  537. Owned<IPropertyTree> pp = createPTree("Part");
  538. Owned<IFileDescriptor>fdesc = createFileDescriptor();
  539. fdesc->setDefaultDir(dir);
  540. StringBuffer s;
  541. SocketEndpoint ep;
  542. ep.setLocalHost(0);
  543. StringBuffer ip;
  544. ep.getIpText(ip);
  545. for (unsigned k=0;k<parts;k++) {
  546. s.clear().append(ip);
  547. Owned<INode> node = createINode(s.str());
  548. pp->setPropInt64("@size",recSize);
  549. s.clear().append(name);
  550. if (index)
  551. s.append(index);
  552. s.append("._").append(k+1).append("_of_").append(parts);
  553. fdesc->setPart(k,node,s.str(),pp);
  554. }
  555. fdesc->queryProperties().setPropInt("@recordSize",recSize);
  556. fdesc->setDefaultDir(dir);
  557. return fdesc.getClear();
  558. }
  559. void setupDFS(const char *scope, unsigned supersToDel=3, unsigned subsToCreate=4)
  560. {
  561. StringBuffer bufScope;
  562. bufScope.append("regress::").append(scope);
  563. StringBuffer bufDir;
  564. bufDir.append("regress/").append(scope);
  565. logctx.CTXLOG("Cleaning up '%s' scope", bufScope.str());
  566. for (unsigned i=1; i<=supersToDel; i++) {
  567. StringBuffer super = bufScope;
  568. super.append("::super").append(i);
  569. if (dir.exists(super.str(),user,false,true))
  570. ASSERT(dir.removeEntry(super.str(), user) && "Can't remove super-file");
  571. }
  572. logctx.CTXLOG("Creating 'regress::trans' subfiles(1,%d)", subsToCreate);
  573. for (unsigned i=1; i<=subsToCreate; i++) {
  574. StringBuffer name;
  575. name.append("sub").append(i);
  576. StringBuffer sub = bufScope;
  577. sub.append("::").append(name);
  578. // Remove first
  579. if (dir.exists(sub.str(),user,true,false))
  580. ASSERT(dir.removeEntry(sub.str(), user) && "Can't remove sub-file");
  581. try {
  582. // Create the sub file with an arbitrary format
  583. Owned<IFileDescriptor> subd = createDescriptor(bufDir.str(), name.str(), 1, 17);
  584. Owned<IPartDescriptor> partd = subd->getPart(0);
  585. RemoteFilename rfn;
  586. partd->getFilename(0, rfn);
  587. StringBuffer fname;
  588. rfn.getPath(fname);
  589. recursiveCreateDirectoryForFile(fname.str());
  590. OwnedIFile ifile = createIFile(fname.str());
  591. Owned<IFileIO> io;
  592. io.setown(ifile->open(IFOcreate));
  593. io->write(0, 17, "12345678901234567");
  594. io->close();
  595. Owned<IDistributedFile> dsub = dir.createNew(subd, sub.str());
  596. dsub->attach(sub.str(),user);
  597. } catch (IException *e) {
  598. StringBuffer msg;
  599. e->errorMessage(msg);
  600. logctx.CTXLOG("Caught exception while creating file in DFS: %s", msg.str());
  601. e->Release();
  602. ASSERT(0 && "Exception Caught in setupDFS - is the directory writeable by this user?");
  603. }
  604. // Make sure it got created
  605. ASSERT(dir.exists(sub.str(),user,true,false) && "Can't add physical files");
  606. }
  607. }
  608. void testReadBranch(const char *path)
  609. {
  610. PROGLOG("Connecting to %s",path);
  611. Owned<IRemoteConnection> conn = querySDS().connect(path, myProcessSession(), RTM_LOCK_READ, 10000);
  612. ASSERT(conn && "Could not connect");
  613. IPropertyTree *root = conn->queryRoot();
  614. Owned<IAttributeIterator> aiter = root->getAttributes();
  615. StringBuffer s;
  616. ForEach(*aiter)
  617. aiter->getValue(s.clear());
  618. aiter.clear();
  619. root->getProp(NULL,s.clear());
  620. Owned<IPropertyTreeIterator> iter = root->getElements("*");
  621. StringAttrArray children;
  622. UnsignedArray childidx;
  623. ForEach(*iter) {
  624. children.append(*new StringAttrItem(iter->query().queryName()));
  625. childidx.append(root->queryChildIndex(&iter->query()));
  626. }
  627. iter.clear();
  628. conn.clear();
  629. ForEachItemIn(i,children) {
  630. s.clear().append(path);
  631. if (path[strlen(path)-1]!='/')
  632. s.append('/');
  633. s.append(children.item(i).text).append('[').append(childidx.item(i)+1).append(']');
  634. testReadBranch(s.str());
  635. }
  636. }
  637. const IContextLogger &logctx;
  638. public:
  639. DaliTests() : logctx(queryDummyContextLogger()) {
  640. init();
  641. }
  642. ~DaliTests() {
  643. destroy();
  644. }
  645. void testSDSRW()
  646. {
  647. Owned<IPropertyTree> ref = createPTree("DAREGRESS");
  648. fn(1,2,3,0,ref);
  649. StringBuffer refstr;
  650. toXML(ref,refstr,0,XML_SortTags|XML_Format);
  651. logctx.CTXLOG("Created reference size %d",refstr.length());
  652. Owned<IRemoteConnection> conn = querySDS().connect("/DAREGRESS",myProcessSession(), RTM_CREATE, 1000000);
  653. Rconn = conn;
  654. IPropertyTree *root = conn->queryRoot();
  655. fn(1,2,3,0,root);
  656. conn.clear();
  657. logctx.CTXLOG("Created test branch 1");
  658. conn.setown(querySDS().connect("/DAREGRESS",myProcessSession(), RTM_DELETE_ON_DISCONNECT, 1000000));
  659. root = conn->queryRoot();
  660. StringBuffer s;
  661. toXML(root,s,0,XML_SortTags|XML_Format);
  662. ASSERT(strcmp(s.str(),refstr.str())==0 && "Branch 1 does not match");
  663. conn.clear();
  664. conn.setown(querySDS().connect("/DAREGRESS",myProcessSession(), 0, 1000000));
  665. ASSERT(!conn && "RTM_DELETE_ON_DISCONNECT failed");
  666. Rconn = querySDS().connect("/DAREGRESS",myProcessSession(), RTM_CREATE, 1000000);
  667. StringBuffer pn;
  668. fn2(1,2,3,0,pn);
  669. ::Release(Rconn);
  670. logctx.CTXLOG("Created test branch 2");
  671. Rconn = NULL;
  672. conn.setown(querySDS().connect("/DAREGRESS",myProcessSession(), RTM_DELETE_ON_DISCONNECT, 1000000));
  673. root = conn->queryRoot();
  674. toXML(root,s.clear(),0,XML_SortTags|XML_Format);
  675. ASSERT(strcmp(s.str(),refstr.str())==0 && "Branch 2 does not match");
  676. conn.clear();
  677. conn.setown(querySDS().connect("/DAREGRESS",myProcessSession(), 0, 1000000));
  678. ASSERT(!conn && "RTM_DELETE_ON_DISCONNECT failed");
  679. }
  /*
   * This test is invasive, obsolete and the main source of
   * errors in the DFS code. It was created at a time when
   * the DFS API was wide open and methods could freely
   * fiddle with its internals without harm. Times have changed.
   *
   * TODO: Convert this test into a proper test of the DFS as
   * it currently stands, rather than working around its deficiencies.
   *
   * Unfortunately, to do that, some functionality has to be
   * re-worked (like creating groups, adding files to them,
   * creating physical temporary files, etc).
   */
  693. void testDFS()
  694. {
  695. const size32_t recsize = 17;
  696. StringBuffer s;
  697. unsigned i;
  698. unsigned n;
  699. unsigned t;
  700. queryNamedGroupStore().remove("daregress_group");
  701. dir.removeEntry("daregress::superfile1", user);
  702. SocketEndpointArray epa;
  703. for (n=0;n<400;n++) {
  704. s.clear().append("192.168.").append(n/256).append('.').append(n%256);
  705. SocketEndpoint ep(s.str());
  706. epa.append(ep);
  707. }
  708. Owned<IGroup> group = createIGroup(epa);
  709. queryNamedGroupStore().add("daregress_group",group,true);
  710. ASSERT(queryNamedGroupStore().find(group,s.clear()) && "Created logical group not found");
  711. ASSERT(stricmp(s.str(),"daregress_group")==0 && "Created logical group found with wrong name");
  712. group.setown(queryNamedGroupStore().lookup("daregress_group"));
  713. ASSERT(group && "named group lookup failed");
  714. logctx.CTXLOG("Named group created - 400 nodes");
  715. for (i=0;i<100;i++) {
  716. Owned<IPropertyTree> pp = createPTree("Part");
  717. Owned<IFileDescriptor>fdesc = createFileDescriptor();
  718. fdesc->setDefaultDir("thordata/regress");
  719. n = 9;
  720. for (unsigned k=0;k<400;k++) {
  721. s.clear().append("192.168.").append(n/256).append('.').append(n%256);
  722. Owned<INode> node = createINode(s.str());
  723. pp->setPropInt64("@size",(n*777+i)*recsize);
  724. s.clear().append("daregress_test").append(i).append("._").append(n+1).append("_of_400");
  725. fdesc->setPart(n,node,s.str(),pp);
  726. n = (n+9)%400;
  727. }
  728. fdesc->queryProperties().setPropInt("@recordSize",17);
  729. s.clear().append("daregress::test").append(i);
  730. removeLogical(s.str(), user);
  731. StringBuffer cname;
  732. Owned<IDistributedFile> dfile = dir.createNew(fdesc);
  733. ASSERT(stricmp(dfile->getClusterName(0,cname),"daregress_group")==0 && "Cluster name wrong");
  734. s.clear().append("daregress::test").append(i);
  735. dfile->attach(s.str(),user);
  736. }
  737. logctx.CTXLOG("DFile create done - 100 files");
  738. unsigned samples = 5;
  739. t = 33;
  740. for (i=0;i<100;i++) {
  741. s.clear().append("daregress::test").append(t);
  742. ASSERT(dir.exists(s.str(),user) && "Could not find sub-file");
  743. Owned<IDistributedFile> dfile = dir.lookup(s.str(), user);
  744. ASSERT(dfile && "Could not find sub-file");
  745. offset_t totsz = 0;
  746. n = 11;
  747. for (unsigned k=0;k<400;k++) {
  748. Owned<IDistributedFilePart> part = dfile->getPart(n);
  749. ASSERT(part && "part not found");
  750. s.clear().append("192.168.").append(n/256).append('.').append(n%256);
  751. Owned<INode> node = createINode(s.str());
  752. ASSERT(node->equals(part->queryNode()) && "part node mismatch");
  753. ASSERT(part->getFileSize(false,false)==(n*777+t)*recsize && "size node mismatch");
  754. s.clear().append("daregress_test").append(t).append("._").append(n+1).append("_of_400");
  755. /* ** TBD
  756. if (stricmp(s.str(),part->queryPartName())!=0)
  757. ERROR4("part name mismatch %d, %d '%s' '%s'",t,n,s.str(),part->queryPartName());
  758. */
  759. totsz += (n*777+t)*recsize;
  760. if ((samples>0)&&(i+n+t==k)) {
  761. samples--;
  762. RemoteFilename rfn;
  763. part->getFilename(rfn,samples%2);
  764. StringBuffer fn;
  765. rfn.getRemotePath(fn);
  766. logctx.CTXLOG("SAMPLE: %d,%d %s",t,n,fn.str());
  767. }
  768. n = (n+11)%400;
  769. }
  770. ASSERT(totsz==dfile->getFileSize(false,false) && "total size mismatch");
  771. t = (t+33)%100;
  772. }
  773. logctx.CTXLOG("DFile lookup done - 100 files");
  774. // check iteration
  775. __int64 crctot = 0;
  776. unsigned np = 0;
  777. unsigned totrows = 0;
  778. Owned<IDistributedFileIterator> fiter = dir.getIterator("daregress::*",false, user);
  779. Owned<IDistributedFilePartIterator> piter;
  780. ForEach(*fiter) {
  781. piter.setown(fiter->query().getIterator());
  782. ForEach(*piter) {
  783. RemoteFilename rfn;
  784. StringBuffer s;
  785. piter->query().getFilename(rfn,0);
  786. rfn.getRemotePath(s);
  787. piter->query().getFilename(rfn,1);
  788. rfn.getRemotePath(s);
  789. crctot += crc32(s.str(),s.length(),0);
  790. np++;
  791. totrows += (unsigned)(piter->query().getFileSize(false,false)/fiter->query().queryAttributes().getPropInt("@recordSize",-1));
  792. }
  793. }
  794. piter.clear();
  795. fiter.clear();
  796. logctx.CTXLOG("DFile iterate done - %d parts, %d rows, CRC sum %"I64F"d",np,totrows,crctot);
  797. Owned<IDistributedSuperFile> sfile;
  798. sfile.setown(dir.createSuperFile("daregress::superfile1",user,true,false));
  799. for (i = 0;i<100;i++) {
  800. s.clear().append("daregress::test").append(i);
  801. sfile->addSubFile(s.str());
  802. }
  803. sfile.clear();
  804. sfile.setown(dir.lookupSuperFile("daregress::superfile1", user));
  805. ASSERT(sfile && "Could not find added superfile");
  806. __int64 savcrc = crctot;
  807. crctot = 0;
  808. np = 0;
  809. totrows = 0;
  810. size32_t srs = (size32_t)sfile->queryAttributes().getPropInt("@recordSize",-1);
  811. ASSERT(srs==17 && "Superfile does not match subfile row size");
  812. piter.setown(sfile->getIterator());
  813. ForEach(*piter) {
  814. RemoteFilename rfn;
  815. StringBuffer s;
  816. piter->query().getFilename(rfn,0);
  817. rfn.getRemotePath(s);
  818. piter->query().getFilename(rfn,1);
  819. rfn.getRemotePath(s);
  820. crctot += crc32(s.str(),s.length(),0);
  821. np++;
  822. totrows += (unsigned)(piter->query().getFileSize(false,false)/srs);
  823. }
  824. piter.clear();
  825. logctx.CTXLOG("Superfile iterate done - %d parts, %d rows, CRC sum %"I64F"d",np,totrows,crctot);
  826. ASSERT(crctot==savcrc && "SuperFile does not match sub files");
  827. unsigned tr = (unsigned)(sfile->getFileSize(false,false)/srs);
  828. ASSERT(totrows==tr && "Superfile size does not match part sum");
  829. sfile->detach();
  830. sfile.clear();
  831. sfile.setown(dir.lookupSuperFile("daregress::superfile1",user));
  832. ASSERT(!sfile && "Superfile deletion failed");
  833. t = 37;
  834. for (i=0;i<100;i++) {
  835. s.clear().append("daregress::test").append(t);
  836. removeLogical(s.str(), user);
  837. t = (t+37)%100;
  838. }
  839. logctx.CTXLOG("DFile removal complete");
  840. t = 39;
  841. for (i=0;i<100;i++) {
  842. ASSERT(!dir.exists(s.str(),user) && "Found dir after deletion");
  843. Owned<IDistributedFile> dfile = dir.lookup(s.str(), user);
  844. ASSERT(!dfile && "Found file after deletion");
  845. t = (t+39)%100;
  846. }
  847. logctx.CTXLOG("DFile removal check complete");
  848. queryNamedGroupStore().remove("daregress_group");
  849. ASSERT(!queryNamedGroupStore().lookup("daregress_group") && "Named group not removed");
  850. }
  851. void testDFSTrans()
  852. {
  853. setupDFS("trans");
  854. Owned<IDistributedFileTransaction> transaction = createDistributedFileTransaction(user);
  855. // Auto-commit
  856. logctx.CTXLOG("Auto-commit test (inactive transaction)");
  857. Owned<IDistributedSuperFile> sfile1 = dir.createSuperFile("regress::trans::super1",user , false, false, transaction);
  858. sfile1->addSubFile("regress::trans::sub1", false, NULL, false, transaction);
  859. sfile1->addSubFile("regress::trans::sub2", false, NULL, false, transaction);
  860. sfile1.clear();
  861. sfile1.setown(dir.lookupSuperFile("regress::trans::super1", user, transaction));
  862. ASSERT(sfile1.get() && "non-transactional add super1 failed");
  863. ASSERT(sfile1->numSubFiles() == 2 && "auto-commit add sub failed, not all subs were added");
  864. ASSERT(strcmp(sfile1->querySubFile(0).queryLogicalName(), "regress::trans::sub1") == 0 && "auto-commit add sub failed, wrong name for sub1");
  865. ASSERT(strcmp(sfile1->querySubFile(1).queryLogicalName(), "regress::trans::sub2") == 0 && "auto-commit add sub failed, wrong name for sub2");
  866. sfile1.clear();
  867. // Rollback
  868. logctx.CTXLOG("Rollback test (active transaction)");
  869. transaction->start();
  870. Owned<IDistributedSuperFile> sfile2 = dir.createSuperFile("regress::trans::super2", user, false, false, transaction);
  871. sfile2->addSubFile("regress::trans::sub3", false, NULL, false, transaction);
  872. sfile2->addSubFile("regress::trans::sub4", false, NULL, false, transaction);
  873. transaction->rollback();
  874. ASSERT(sfile2->numSubFiles() == 0 && "transactional rollback failed, some subs were added");
  875. sfile2.clear();
  876. sfile2.setown(dir.lookupSuperFile("regress::trans::super2", user, transaction));
  877. ASSERT(!sfile2.get() && "transactional rollback super2 failed, it exists!");
  878. // Commit
  879. logctx.CTXLOG("Commit test (active transaction)");
  880. transaction->start();
  881. Owned<IDistributedSuperFile> sfile3 = dir.createSuperFile("regress::trans::super3", user, false, false, transaction);
  882. sfile3->addSubFile("regress::trans::sub3", false, NULL, false, transaction);
  883. sfile3->addSubFile("regress::trans::sub4", false, NULL, false, transaction);
  884. transaction->commit();
  885. sfile3.clear();
  886. sfile3.setown(dir.lookupSuperFile("regress::trans::super3", user, transaction));
  887. ASSERT(sfile3.get() && "transactional add super3 failed");
  888. ASSERT(sfile3->numSubFiles() == 2 && "transactional add sub failed, not all subs were added");
  889. ASSERT(strcmp(sfile3->querySubFile(0).queryLogicalName(), "regress::trans::sub3") == 0 && "transactional add sub failed, wrong name for sub3");
  890. ASSERT(strcmp(sfile3->querySubFile(1).queryLogicalName(), "regress::trans::sub4") == 0 && "transactional add sub failed, wrong name for sub4");
  891. sfile3.clear();
  892. }
  893. void testDFSPromote()
  894. {
  895. setupDFS("trans");
  896. Owned<IDistributedFileTransaction> transaction = createDistributedFileTransaction(user);
  897. // ===============================================================================
  898. // Don't change these parameters, or you'll have to change all ERROR tests below
  899. const char *sfnames[3] = {
  900. "regress::trans::super1", "regress::trans::super2", "regress::trans::super3"
  901. };
  902. bool delsub = false;
  903. bool createonlyone = true;
  904. unsigned timeout = 1000; // 1s
  905. // ===============================================================================
  906. StringArray outlinked;
  907. logctx.CTXLOG("Promote (1, -, -) - first iteration");
  908. dir.promoteSuperFiles(3, sfnames, "regress::trans::sub1", delsub, createonlyone, user, timeout, outlinked);
  909. {
  910. Owned<IDistributedSuperFile> sfile1 = dir.lookupSuperFile("regress::trans::super1", user, NULL, timeout);
  911. ASSERT(sfile1.get() && "promote failed, super1 doesn't exist");
  912. ASSERT(sfile1->numSubFiles() == 1 && "promote failed, super1 should have one subfile");
  913. ASSERT(strcmp(sfile1->querySubFile(0).queryLogicalName(), "regress::trans::sub1") == 0 && "promote failed, wrong name for sub1");
  914. Owned<IDistributedSuperFile> sfile2 = dir.lookupSuperFile("regress::trans::super2", user, NULL, timeout);
  915. ASSERT(!sfile2.get() && "promote failed, super2 does exist");
  916. ASSERT(outlinked.length() == 0 && "promote failed, outlinked expected empty");
  917. }
  918. logctx.CTXLOG("Promote (2, 1, -) - second iteration");
  919. dir.promoteSuperFiles(3, sfnames, "regress::trans::sub2", delsub, createonlyone, user, timeout, outlinked);
  920. {
  921. Owned<IDistributedSuperFile> sfile1 = dir.lookupSuperFile("regress::trans::super1", user, NULL, timeout);
  922. ASSERT(sfile1.get() && "promote failed, super1 doesn't exist");
  923. ASSERT(sfile1->numSubFiles() == 1 && "promote failed, super1 should have one subfile");
  924. ASSERT(strcmp(sfile1->querySubFile(0).queryLogicalName(), "regress::trans::sub2") == 0 && "promote failed, wrong name for sub2");
  925. Owned<IDistributedSuperFile> sfile2 = dir.lookupSuperFile("regress::trans::super2", user, NULL, timeout);
  926. ASSERT(sfile2.get() && "promote failed, super2 doesn't exist");
  927. ASSERT(sfile2->numSubFiles() == 1 && "promote failed, super2 should have one subfile");
  928. ASSERT(strcmp(sfile2->querySubFile(0).queryLogicalName(), "regress::trans::sub1") == 0 && "promote failed, wrong name for sub1");
  929. Owned<IDistributedSuperFile> sfile3 = dir.lookupSuperFile("regress::trans::super3", user, NULL, timeout);
  930. ASSERT(!sfile3.get() && "promote failed, super3 does exist");
  931. ASSERT(outlinked.length() == 0 && "promote failed, outlinked expected empty");
  932. }
  933. logctx.CTXLOG("Promote (3, 2, 1) - third iteration");
  934. dir.promoteSuperFiles(3, sfnames, "regress::trans::sub3", delsub, createonlyone, user, timeout, outlinked);
  935. {
  936. Owned<IDistributedSuperFile> sfile1 = dir.lookupSuperFile("regress::trans::super1", user, NULL, timeout);
  937. ASSERT(sfile1.get() &&* "promote failed, super1 doesn't exist");
  938. ASSERT(sfile1->numSubFiles() == 1 && "promote failed, super1 should have one subfile");
  939. ASSERT(strcmp(sfile1->querySubFile(0).queryLogicalName(), "regress::trans::sub3") == 0 && "promote failed, wrong name for sub3");
  940. Owned<IDistributedSuperFile> sfile2 = dir.lookupSuperFile("regress::trans::super2", user, NULL, timeout);
  941. ASSERT(sfile2.get() && "promote failed, super2 doesn't exist");
  942. ASSERT(sfile2->numSubFiles() == 1 && "promote failed, super2 should have one subfile");
  943. ASSERT(strcmp(sfile2->querySubFile(0).queryLogicalName(), "regress::trans::sub2") == 0 && "promote failed, wrong name for sub2");
  944. Owned<IDistributedSuperFile> sfile3 = dir.lookupSuperFile("regress::trans::super3", user, NULL, timeout);
  945. ASSERT(sfile3.get() && "promote failed, super3 doesn't exist");
  946. ASSERT(sfile3->numSubFiles() == 1 && "promote failed, super3 should have one subfile");
  947. ASSERT(strcmp(sfile3->querySubFile(0).queryLogicalName(), "regress::trans::sub1") == 0 && "promote failed, wrong name for sub1");
  948. ASSERT(outlinked.length() == 0 && "promote failed, outlinked expected empty");
  949. }
  950. logctx.CTXLOG("Promote (4, 3, 2) - fourth iteration, expect outlinked");
  951. dir.promoteSuperFiles(3, sfnames, "regress::trans::sub4", delsub, createonlyone, user, timeout, outlinked);
  952. {
  953. Owned<IDistributedSuperFile> sfile1 = dir.lookupSuperFile("regress::trans::super1", user, NULL, timeout);
  954. ASSERT(sfile1.get() && "promote failed, super1 doesn't exist");
  955. ASSERT(sfile1->numSubFiles() == 1 && "promote failed, super1 should have one subfile");
  956. ASSERT(strcmp(sfile1->querySubFile(0).queryLogicalName(), "regress::trans::sub4") == 0 && "promote failed, wrong name for sub4");
  957. Owned<IDistributedSuperFile> sfile2 = dir.lookupSuperFile("regress::trans::super2", user, NULL, timeout);
  958. ASSERT(sfile2.get() && "promote failed, super2 doesn't exist");
  959. ASSERT(sfile2->numSubFiles() == 1 && "promote failed, super2 should have one subfile");
  960. ASSERT(strcmp(sfile2->querySubFile(0).queryLogicalName(), "regress::trans::sub3") == 0 && "promote failed, wrong name for sub3");
  961. Owned<IDistributedSuperFile> sfile3 = dir.lookupSuperFile("regress::trans::super3", user, NULL, timeout);
  962. ASSERT(sfile3.get() && "promote failed, super3 doesn't exist");
  963. ASSERT(sfile3->numSubFiles() == 1 && "promote failed, super3 should have one subfile");
  964. ASSERT(strcmp(sfile3->querySubFile(0).queryLogicalName(), "regress::trans::sub2") == 0 && "promote failed, wrong name for sub2");
  965. ASSERT(outlinked.length() == 1 && "promote failed, outlinked expected only one item");
  966. ASSERT(strcmp(outlinked.popGet(), "regress::trans::sub1") == 0 && "promote failed, outlinked expected to be sub1");
  967. Owned<IDistributedFile> sub1 = dir.lookup("regress::trans::sub1", user, false, false, NULL, timeout);
  968. ASSERT(sub1.get() && "promote failed, sub1 was physically deleted");
  969. }
  970. logctx.CTXLOG("Promote ([1,2], 4, 3) - fifth iteration, two in-files");
  971. dir.promoteSuperFiles(3, sfnames, "regress::trans::sub1,regress::trans::sub2", delsub, createonlyone, user, timeout, outlinked);
  972. {
  973. Owned<IDistributedSuperFile> sfile1 = dir.lookupSuperFile("regress::trans::super1", user, NULL, timeout);
  974. ASSERT(sfile1.get() && "promote failed, super1 doesn't exist");
  975. ASSERT(sfile1->numSubFiles() == 2 && "promote failed, super1 should have two subfiles");
  976. ASSERT(strcmp(sfile1->querySubFile(0).queryLogicalName(), "regress::trans::sub1") == 0 && "promote failed, wrong name for sub1");
  977. ASSERT(strcmp(sfile1->querySubFile(1).queryLogicalName(), "regress::trans::sub2") == 0 && "promote failed, wrong name for sub2");
  978. Owned<IDistributedSuperFile> sfile2 = dir.lookupSuperFile("regress::trans::super2", user, NULL, timeout);
  979. ASSERT(sfile2.get() && "promote failed, super2 doesn't exist");
  980. ASSERT(sfile2->numSubFiles() == 1 && "promote failed, super2 should have one subfile");
  981. ASSERT(strcmp(sfile2->querySubFile(0).queryLogicalName(), "regress::trans::sub4") == 0 && "promote failed, wrong name for sub4");
  982. Owned<IDistributedSuperFile> sfile3 = dir.lookupSuperFile("regress::trans::super3", user, NULL, timeout);
  983. ASSERT(sfile3.get() && "promote failed, super3 doesn't exist");
  984. ASSERT(sfile3->numSubFiles() == 1 && "promote failed, super3 should have one subfile");
  985. ASSERT(strcmp(sfile3->querySubFile(0).queryLogicalName(), "regress::trans::sub3") == 0 && "promote failed, wrong name for sub3");
  986. ASSERT(outlinked.length() == 1 && "promote failed, outlinked expected only one item");
  987. ASSERT(strcmp(outlinked.popGet(), "regress::trans::sub2") == 0 && "promote failed, outlinked expected to be sub2");
  988. Owned<IDistributedFile> sub1 = dir.lookup("regress::trans::sub1", user, false, false, NULL, timeout);
  989. ASSERT(sub1.get() && "promote failed, sub1 was physically deleted");
  990. Owned<IDistributedFile> sub2 = dir.lookup("regress::trans::sub2", user, false, false, NULL, timeout);
  991. ASSERT(sub2.get() && "promote failed, sub2 was physically deleted");
  992. }
  993. }
  994. void testSDSSubs()
  995. {
  996. subchangenum = 0;
  997. subchangetotal = 0;
  998. IArrayOf<CChange> a;
  999. for (unsigned i=0; i<10 ; i++)
  1000. a.append(*new CChange(i));
  1001. unsigned last = 0;
  1002. loop {
  1003. Sleep(1000);
  1004. {
  1005. CriticalBlock block(subchangesect);
  1006. if (subchangenum==last)
  1007. break;
  1008. last = subchangenum;
  1009. }
  1010. }
  1011. ForEachItemIn(i1, a)
  1012. a.item(i1).stopsem.signal();
  1013. ForEachItemIn(i2, a)
  1014. a.item(i2).join();
  1015. ASSERT(subchangenum==1000 && "Not all notifications received");
  1016. logctx.CTXLOG("%d subscription notifications, check sum = %"I64F"d",subchangenum,subchangetotal);
  1017. }
  1018. /*
  * This test is of limited value and can take a very long time on clusters
  * with a large file system, but it is kept here for future reference.
  * MORE: it could perhaps be added to daliadmin, or to a thorough check of
  * the file system, together with the super-file checks etc.
  1023. void testReadAllSDS()
  1024. {
  1025. logctx.CTXLOG("Test SDS connecting to every branch");
  1026. testReadBranch("/");
  1027. logctx.CTXLOG("Connected to every branch");
  1028. }
  1029. */
  1030. void testMultiCluster()
  1031. {
  1032. Owned<IGroup> grp1 = createIGroup("192.168.51.1-5");
  1033. Owned<IGroup> grp2 = createIGroup("192.168.16.1-5");
  1034. Owned<IGroup> grp3 = createIGroup("192.168.53.1-5");
  1035. queryNamedGroupStore().add("testgrp1",grp1);
  1036. queryNamedGroupStore().add("testgrp2",grp2);
  1037. queryNamedGroupStore().add("testgrp3",grp3);
  1038. Owned<IFileDescriptor> fdesc = createFileDescriptor();
  1039. fdesc->setDefaultDir("/c$/thordata/test");
  1040. fdesc->setPartMask("testfile1._$P$_of_$N$");
  1041. fdesc->setNumParts(5);
  1042. ClusterPartDiskMapSpec mapping;
  1043. fdesc->addCluster(grp1,mapping);
  1044. fdesc->addCluster(grp2,mapping);
  1045. fdesc->addCluster(grp3,mapping);
  1046. removeLogical("test::testfile1", user);
  1047. Owned<IDistributedFile> file = queryDistributedFileDirectory().createNew(fdesc);
  1048. removeLogical("test::testfile1", user);
  1049. file->attach("test::testfile1",user);
  1050. StringBuffer name;
  1051. unsigned i;
  1052. for (i=0;i<file->numClusters();i++)
  1053. PROGLOG("cluster[%d] = %s",i,file->getClusterName(i,name.clear()).str());
  1054. file.clear();
  1055. file.setown(queryDistributedFileDirectory().lookup("test::testfile1",user));
  1056. for (i=0;i<file->numClusters();i++)
  1057. PROGLOG("cluster[%d] = %s",i,file->getClusterName(i,name.clear()).str());
  1058. file.clear();
  1059. file.setown(queryDistributedFileDirectory().lookup("test::testfile1@testgrp1",user));
  1060. for (i=0;i<file->numClusters();i++)
  1061. PROGLOG("cluster[%d] = %s",i,file->getClusterName(i,name.clear()).str());
  1062. file.clear();
  1063. removeLogical("test::testfile1@testgrp2", user);
  1064. file.setown(queryDistributedFileDirectory().lookup("test::testfile1",user));
  1065. for (i=0;i<file->numClusters();i++)
  1066. PROGLOG("cluster[%d] = %s",i,file->getClusterName(i,name.clear()).str());
  1067. }
  1068. void testFiles()
  1069. {
  1070. StringBuffer fn;
  1071. const char *s = filelist;
  1072. unsigned slowest = 0;
  1073. StringAttr slowname;
  1074. unsigned tot = 0;
  1075. unsigned n = 0;
  1076. while (*s) {
  1077. fn.clear();
  1078. while (*s==',')
  1079. s++;
  1080. while (*s&&(*s!=','))
  1081. fn.append(*(s++));
  1082. if (fn.length()) {
  1083. n++;
  1084. unsigned ss = msTick();
  1085. checkFiles(fn);
  1086. unsigned t = (msTick()-ss);
  1087. if (t>slowest) {
  1088. slowest = t;
  1089. slowname.set(fn);
  1090. }
  1091. tot += t;
  1092. }
  1093. }
  1094. printf("Complete in %ds avg %dms\n",tot/1000,tot/(n?n:1));
  1095. if (!slowname.isEmpty())
  1096. printf("Slowest %s = %dms\n",slowname.get(),slowest);
  1097. }
  1098. void testGroups()
  1099. {
  1100. SocketEndpointArray epa;
  1101. SocketEndpoint ep;
  1102. Owned<IGroup> grp;
  1103. testGrp(epa);
  1104. ep.set("10.150.10.80");
  1105. epa.append(ep);
  1106. testGrp(epa);
  1107. ep.set("10.150.10.81");
  1108. epa.append(ep);
  1109. testGrp(epa);
  1110. ep.set("10.150.10.82");
  1111. epa.append(ep);
  1112. testGrp(epa);
  1113. ep.set("10.150.10.83:111");
  1114. epa.append(ep);
  1115. testGrp(epa);
  1116. ep.set("10.150.10.84:111");
  1117. epa.append(ep);
  1118. testGrp(epa);
  1119. ep.set("10.150.10.84:111");
  1120. epa.append(ep);
  1121. testGrp(epa);
  1122. ep.set("10.150.10.84:111");
  1123. epa.append(ep);
  1124. testGrp(epa);
  1125. ep.set("10.150.10.84:111");
  1126. epa.append(ep);
  1127. testGrp(epa);
  1128. ep.set("10.150.10.85:111");
  1129. epa.append(ep);
  1130. testGrp(epa);
  1131. ep.set("10.150.10.86:111");
  1132. epa.append(ep);
  1133. testGrp(epa);
  1134. ep.set("10.150.10.87");
  1135. epa.append(ep);
  1136. testGrp(epa);
  1137. ep.set("10.150.10.87");
  1138. epa.append(ep);
  1139. testGrp(epa);
  1140. ep.set("10.150.10.88");
  1141. epa.append(ep);
  1142. testGrp(epa);
  1143. ep.set("10.150.11.88");
  1144. epa.append(ep);
  1145. testGrp(epa);
  1146. ep.set("10.173.10.88");
  1147. epa.append(ep);
  1148. testGrp(epa);
  1149. ep.set("10.173.10.88:22222");
  1150. epa.append(ep);
  1151. testGrp(epa);
  1152. ep.set("192.168.10.88");
  1153. epa.append(ep);
  1154. testGrp(epa);
  1155. }
  1156. #ifndef COMPAT
  1157. void testDF1()
  1158. {
  1159. const char * fname = "testing::propfile2";
  1160. Owned<IFileDescriptor> fdesc = createFileDescriptor();
  1161. Owned<IPropertyTree> pt = createPTree("Attr");
  1162. RemoteFilename rfn;
  1163. rfn.setRemotePath("//10.150.10.80/c$/thordata/test/part._1_of_3");
  1164. pt->setPropInt("@size",123);
  1165. fdesc->setPart(0,rfn,pt);
  1166. rfn.setRemotePath("//10.150.10.81/c$/thordata/test/part._2_of_3");
  1167. pt->setPropInt("@size",456);
  1168. fdesc->setPart(1,rfn,pt);
  1169. rfn.setRemotePath("//10.150.10.82/c$/thordata/test/part._3_of_3");
  1170. pt->setPropInt("@size",789);
  1171. fdesc->setPart(2,rfn,pt);
  1172. dispFDesc(fdesc);
  1173. try {
  1174. removeLogical(fname, user);
  1175. Owned<IDistributedFile> file = queryDistributedFileDirectory().createNew(fdesc);
  1176. {
  1177. DistributedFilePropertyLock lock(file);
  1178. lock.queryAttributes().setProp("@testing","1");
  1179. }
  1180. file->attach(fname,user);
  1181. } catch (IException *e) {
  1182. StringBuffer msg;
  1183. e->errorMessage(msg);
  1184. logctx.CTXLOG("Caught exception while setting property: %s", msg.str());
  1185. e->Release();
  1186. }
  1187. }
  1188. void testDF2() // 4*3 superfile
  1189. {
  1190. Owned<IFileDescriptor> fdesc = createFileDescriptor();
  1191. Owned<IPropertyTree> pt = createPTree("Attr");
  1192. RemoteFilename rfn;
  1193. rfn.setRemotePath("//10.150.10.80/c$/thordata/test/partone._1_of_3");
  1194. pt->setPropInt("@size",1231);
  1195. fdesc->setPart(0,rfn,pt);
  1196. rfn.setRemotePath("//10.150.10.80/c$/thordata/test/parttwo._1_of_3");
  1197. pt->setPropInt("@size",1232);
  1198. fdesc->setPart(1,rfn,pt);
  1199. rfn.setRemotePath("//10.150.10.80/c$/thordata/test/partthree._1_of_3");
  1200. pt->setPropInt("@size",1233);
  1201. fdesc->setPart(2,rfn,pt);
  1202. rfn.setRemotePath("//10.150.10.80/c$/thordata/test2/partfour._1_of_3");
  1203. pt->setPropInt("@size",1234);
  1204. fdesc->setPart(3,rfn,pt);
  1205. rfn.setRemotePath("//10.150.10.81/c$/thordata/test/partone._2_of_3");
  1206. pt->setPropInt("@size",4565);
  1207. fdesc->setPart(4,rfn,pt);
  1208. rfn.setRemotePath("//10.150.10.81/c$/thordata/test/parttwo._2_of_3");
  1209. pt->setPropInt("@size",4566);
  1210. fdesc->setPart(5,rfn,pt);
  1211. rfn.setRemotePath("//10.150.10.81/c$/thordata/test/partthree._2_of_3");
  1212. pt->setPropInt("@size",4567);
  1213. fdesc->setPart(6,rfn,pt);
  1214. rfn.setRemotePath("//10.150.10.81/c$/thordata/test2/partfour._2_of_3");
  1215. pt->setPropInt("@size",4568);
  1216. fdesc->setPart(7,rfn,pt);
  1217. rfn.setRemotePath("//10.150.10.82/c$/thordata/test/partone._3_of_3");
  1218. pt->setPropInt("@size",7899);
  1219. fdesc->setPart(8,rfn,pt);
  1220. rfn.setRemotePath("//10.150.10.82/c$/thordata/test/parttwo._3_of_3");
  1221. pt->setPropInt("@size",78910);
  1222. fdesc->setPart(9,rfn,pt);
  1223. rfn.setRemotePath("//10.150.10.82/c$/thordata/test/partthree._3_of_3");
  1224. pt->setPropInt("@size",78911);
  1225. fdesc->setPart(10,rfn,pt);
  1226. rfn.setRemotePath("//10.150.10.82/c$/thordata/test2/partfour._3_of_3");
  1227. pt->setPropInt("@size",78912);
  1228. fdesc->setPart(11,rfn,pt);
  1229. ClusterPartDiskMapSpec mspec;
  1230. mspec.interleave = 4;
  1231. fdesc->endCluster(mspec);
  1232. dispFDesc(fdesc);
  1233. }
  1234. void testMisc()
  1235. {
  1236. ClusterPartDiskMapSpec mspec;
  1237. Owned<IGroup> grp = createIGroup("10.150.10.1-3");
  1238. RemoteFilename rfn;
  1239. for (unsigned i=0;i<3;i++)
  1240. for (unsigned ic=0;ic<mspec.defaultCopies;ic++) {
  1241. constructPartFilename(grp,i+1,3,(i==1)?"test.txt":NULL,"test._$P$_of_$N$","/c$/thordata/test",ic,mspec,rfn);
  1242. StringBuffer tmp;
  1243. printf("%d,%d: %s\n",i,ic,rfn.getRemotePath(tmp).str());
  1244. }
  1245. }
  1246. void testDFile()
  1247. {
  1248. ClusterPartDiskMapSpec map;
  1249. { // 1: single part file old method
  1250. #define TN "1"
  1251. removeLogical("test::ftest"TN, user);
  1252. Owned<IFileDescriptor> fdesc = createFileDescriptor();
  1253. RemoteFilename rfn;
  1254. rfn.setRemotePath("//10.150.10.1/c$/thordata/test/ftest"TN"._1_of_1");
  1255. fdesc->setPart(0,rfn);
  1256. fdesc->endCluster(map);
  1257. Owned<IDistributedFile> file = queryDistributedFileDirectory().createNew(fdesc);
  1258. file->attach("test::ftest"TN,user);
  1259. #undef TN
  1260. }
  1261. { // 2: single part file new method
  1262. #define TN "2"
  1263. removeLogical("test::ftest"TN, user);
  1264. Owned<IFileDescriptor> fdesc = createFileDescriptor();
  1265. fdesc->setPartMask("ftest"TN"._$P$_of_$N$");
  1266. fdesc->setNumParts(1);
  1267. Owned<IGroup> grp = createIGroup("10.150.10.1");
  1268. fdesc->addCluster(grp,map);
  1269. Owned<IDistributedFile> file = queryDistributedFileDirectory().createNew(fdesc);
  1270. file->attach("test::ftest"TN,user);
  1271. #undef TN
  1272. }
  1273. Owned<IGroup> grp3 = createIGroup("10.150.10.1,10.150.10.2,10.150.10.3");
  1274. queryNamedGroupStore().add("__testgroup3__",grp3,true);
  1275. { // 3: three parts file old method
  1276. #define TN "3"
  1277. removeLogical("test::ftest"TN, user);
  1278. Owned<IFileDescriptor> fdesc = createFileDescriptor();
  1279. RemoteFilename rfn;
  1280. rfn.setRemotePath("//10.150.10.1/c$/thordata/test/ftest"TN"._1_of_3");
  1281. fdesc->setPart(0,rfn);
  1282. rfn.setRemotePath("//10.150.10.2/c$/thordata/test/ftest"TN"._2_of_3");
  1283. fdesc->setPart(1,rfn);
  1284. rfn.setRemotePath("//10.150.10.3/c$/thordata/test/ftest"TN"._3_of_3");
  1285. fdesc->setPart(2,rfn);
  1286. fdesc->endCluster(map);
  1287. Owned<IDistributedFile> file = queryDistributedFileDirectory().createNew(fdesc);
  1288. file->attach("test::ftest"TN,user);
  1289. #undef TN
  1290. }
  1291. { // 4: three part file new method
  1292. #define TN "4"
  1293. removeLogical("test::ftest"TN, user);
  1294. Owned<IFileDescriptor> fdesc = createFileDescriptor();
  1295. fdesc->setPartMask("ftest"TN"._$P$_of_$N$");
  1296. fdesc->setNumParts(3);
  1297. fdesc->addCluster(grp3,map);
  1298. Owned<IDistributedFile> file = queryDistributedFileDirectory().createNew(fdesc);
  1299. file->attach("test::ftest"TN,user);
  1300. #undef TN
  1301. }
  1302. }
  1303. #endif
  1304. void testDFSDel()
  1305. {
  1306. Owned<IDistributedFileTransaction> transaction = createDistributedFileTransaction(user); // disabled, auto-commit
  1307. setupDFS("del");
  1308. // Sub-file deletion
  1309. logctx.CTXLOG("Creating regress::del::super1 and attaching sub");
  1310. Owned<IDistributedSuperFile> sfile = dir.createSuperFile("regress::del::super1", user, false, false, transaction);
  1311. sfile->addSubFile("regress::del::sub1", false, NULL, false, transaction);
  1312. sfile->addSubFile("regress::del::sub4", false, NULL, false, transaction);
  1313. sfile.clear();
  1314. logctx.CTXLOG("Deleting regress::del::sub1, should fail");
  1315. try {
  1316. if (dir.removeEntry("regress::del::sub1", user, transaction)) {
  1317. ASSERT(0 && "Could remove sub, this will make the DFS inconsistent!");
  1318. return;
  1319. }
  1320. } catch (IException *e) {
  1321. // expecting an exception
  1322. e->Release();
  1323. }
  1324. logctx.CTXLOG("Removing regress::del::sub1 from super1, no del");
  1325. sfile.setown(transaction->lookupSuperFile("regress::del::super1"));
  1326. sfile->removeSubFile("regress::del::sub1", false, false, transaction);
  1327. ASSERT(sfile->numSubFiles() == 1 && "File sub1 was not removed from super1");
  1328. sfile.clear();
  1329. ASSERT(dir.exists("regress::del::sub1", user) && "File sub1 was removed from the file system");
  1330. logctx.CTXLOG("Removing regress::del::sub4 from super1, del");
  1331. sfile.setown(transaction->lookupSuperFile("regress::del::super1"));
  1332. sfile->removeSubFile("regress::del::sub4", true, false, transaction);
  1333. ASSERT(!sfile->numSubFiles() && "File sub4 was not removed from super1");
  1334. sfile.clear();
  1335. ASSERT(!dir.exists("regress::del::sub4", user) && "File sub4 was NOT removed from the file system");
  1336. // Logical Remove
  1337. logctx.CTXLOG("Deleting 'regress::del::super1, should work");
  1338. ASSERT(dir.removeEntry("regress::del::super1", user) && "Can't remove super1");
  1339. logctx.CTXLOG("Deleting 'regress::del::sub1 autoCommit, should work");
  1340. ASSERT(dir.removeEntry("regress::del::sub1", user) && "Can't remove sub1");
  1341. logctx.CTXLOG("Removing 'regress::del::sub2 - rollback");
  1342. transaction->start();
  1343. dir.removeEntry("regress::del::sub2", user, transaction);
  1344. transaction->rollback();
  1345. ASSERT(dir.exists("regress::del::sub2", user, true, false) && "Shouldn't have removed sub2 on rollback");
  1346. logctx.CTXLOG("Removing 'regress::del::sub2 - commit");
  1347. transaction->start();
  1348. dir.removeEntry("regress::del::sub2", user, transaction);
  1349. transaction->commit();
  1350. ASSERT(!dir.exists("regress::del::sub2", user, true, false) && "Should have removed sub2 on commit");
  1351. // Physical Remove
  1352. logctx.CTXLOG("Physically removing 'regress::del::sub3 - rollback");
  1353. transaction->start();
  1354. dir.removeEntry("regress::del::sub3", user, transaction);
  1355. transaction->rollback();
  1356. ASSERT(dir.exists("regress::del::sub3", user, true, false) && "Shouldn't have removed sub3 on rollback");
  1357. logctx.CTXLOG("Physically removing 'regress::del::sub3 - commit");
  1358. transaction->start();
  1359. dir.removeEntry("regress::del::sub3", user, transaction);
  1360. transaction->commit();
  1361. ASSERT(!dir.exists("regress::del::sub3", user, true, false) && "Should have removed sub3 on commit");
  1362. }
  1363. void testDFSRename()
  1364. {
  1365. Owned<IDistributedFileTransaction> transaction = createDistributedFileTransaction(user); // disabled, auto-commit
  1366. if (dir.exists("regress::rename::other1",user,false,false))
  1367. ASSERT(dir.removeEntry("regress::rename::other1", user) && "Can't remove 'regress::rename::other1'");
  1368. if (dir.exists("regress::rename::other2",user,false,false))
  1369. ASSERT(dir.removeEntry("regress::rename::other2", user) && "Can't remove 'regress::rename::other2'");
  1370. setupDFS("rename");
  1371. try {
  1372. logctx.CTXLOG("Renaming 'regress::rename::sub1 to 'sub2' with auto-commit, should fail");
  1373. dir.renamePhysical("regress::rename::sub1", "regress::rename::sub2", user, transaction);
  1374. ASSERT(0 && "Renamed to existing file should have failed!");
  1375. return;
  1376. } catch (IException *e) {
  1377. // Expecting exception
  1378. e->Release();
  1379. }
  1380. logctx.CTXLOG("Renaming 'regress::rename::sub1 to 'other1' with auto-commit");
  1381. dir.renamePhysical("regress::rename::sub1", "regress::rename::other1", user, transaction);
  1382. ASSERT(dir.exists("regress::rename::other1", user, true, false) && "Renamed to other failed");
  1383. logctx.CTXLOG("Renaming 'regress::rename::sub2 to 'other2' and rollback");
  1384. transaction->start();
  1385. dir.renamePhysical("regress::rename::sub2", "regress::rename::other2", user, transaction);
  1386. transaction->rollback();
  1387. ASSERT(!dir.exists("regress::rename::other2", user, true, false) && "Renamed to other2 when it shouldn't");
  1388. logctx.CTXLOG("Renaming 'regress::rename::sub2 to 'other2' and commit");
  1389. transaction->start();
  1390. dir.renamePhysical("regress::rename::sub2", "regress::rename::other2", user, transaction);
  1391. transaction->commit();
  1392. ASSERT(dir.exists("regress::rename::other2", user, true, false) && "Renamed to other failed");
  1393. try {
  1394. logctx.CTXLOG("Renaming 'regress::rename::sub3 to 'sub3' with auto-commit, should fail");
  1395. dir.renamePhysical("regress::rename::sub3", "regress::rename::sub3", user, transaction);
  1396. ASSERT(0 && "Renamed to same file should have failed!");
  1397. return;
  1398. } catch (IException *e) {
  1399. // Expecting exception
  1400. e->Release();
  1401. }
  1402. // To make sure renamed files are cleaned properly
  1403. printf("Renaming 'regress::rename::other2 to 'sub2' on auto-commit\n");
  1404. dir.renamePhysical("regress::rename::other2", "regress::rename::sub2", user, transaction);
  1405. ASSERT(dir.exists("regress::rename::sub2", user, true, false) && "Renamed from other2 failed");
  1406. }
  1407. void testDFSClearAdd()
  1408. {
  1409. setupDFS("clearadd");
  1410. Owned<IDistributedFileTransaction> transaction = createDistributedFileTransaction(user); // disabled, auto-commit
  1411. logctx.CTXLOG("Creating regress::clearadd::super1 and attaching sub1 & sub4");
  1412. Owned<IDistributedSuperFile> sfile = dir.createSuperFile("regress::clearadd::super1", user, false, false, transaction);
  1413. sfile->addSubFile("regress::clearadd::sub1", false, NULL, false, transaction);
  1414. sfile->addSubFile("regress::clearadd::sub4", false, NULL, false, transaction);
  1415. sfile.clear();
  1416. transaction.setown(createDistributedFileTransaction(user)); // disabled, auto-commit
  1417. transaction->start();
  1418. logctx.CTXLOG("Removing sub1 from super1, within transaction");
  1419. sfile.setown(transaction->lookupSuperFile("regress::clearadd::super1"));
  1420. sfile->removeSubFile("regress::clearadd::sub1", false, false, transaction);
  1421. sfile.clear();
  1422. logctx.CTXLOG("Adding sub1 back into to super1, within transaction");
  1423. sfile.setown(transaction->lookupSuperFile("regress::clearadd::super1"));
  1424. sfile->addSubFile("regress::clearadd::sub1", false, NULL, false, transaction);
  1425. sfile.clear();
  1426. try
  1427. {
  1428. transaction->commit();
  1429. }
  1430. catch (IException *e)
  1431. {
  1432. StringBuffer eStr;
  1433. e->errorMessage(eStr);
  1434. CPPUNIT_ASSERT_MESSAGE(eStr.str(), 0);
  1435. e->Release();
  1436. }
  1437. sfile.setown(dir.lookupSuperFile("regress::clearadd::super1", user));
  1438. ASSERT(NULL != sfile->querySubFileNamed("regress::clearadd::sub1") && "regress::clearadd::sub1, should be a subfile of super1");
  1439. // same but remove all (clear)
  1440. transaction.setown(createDistributedFileTransaction(user)); // disabled, auto-commit
  1441. transaction->start();
  1442. logctx.CTXLOG("Adding sub2 into to super1, within transaction");
  1443. sfile.setown(transaction->lookupSuperFile("regress::clearadd::super1"));
  1444. sfile->addSubFile("regress::clearadd::sub2", false, NULL, false, transaction);
  1445. sfile.clear();
  1446. logctx.CTXLOG("Removing all sub files from super1, within transaction");
  1447. sfile.setown(transaction->lookupSuperFile("regress::clearadd::super1"));
  1448. sfile->removeSubFile(NULL, false, false, transaction);
  1449. sfile.clear();
  1450. logctx.CTXLOG("Adding sub2 back into to super1, within transaction");
  1451. sfile.setown(transaction->lookupSuperFile("regress::clearadd::super1"));
  1452. sfile->addSubFile("regress::clearadd::sub2", false, NULL, false, transaction);
  1453. sfile.clear();
  1454. try
  1455. {
  1456. transaction->commit();
  1457. }
  1458. catch (IException *e)
  1459. {
  1460. StringBuffer eStr;
  1461. e->errorMessage(eStr);
  1462. CPPUNIT_ASSERT_MESSAGE(eStr.str(), 0);
  1463. e->Release();
  1464. }
  1465. sfile.setown(dir.lookupSuperFile("regress::clearadd::super1", user));
  1466. ASSERT(NULL != sfile->querySubFileNamed("regress::clearadd::sub2") && "regress::clearadd::sub2, should be a subfile of super1");
  1467. ASSERT(NULL == sfile->querySubFileNamed("regress::clearadd::sub1") && "regress::clearadd::sub1, should NOT be a subfile of super1");
  1468. ASSERT(NULL == sfile->querySubFileNamed("regress::clearadd::sub4") && "regress::clearadd::sub4, should NOT be a subfile of super1");
  1469. ASSERT(1 == sfile->numSubFiles() && "regress::clearadd::super1 should contain 1 subfile");
  1470. }
  1471. void testDFSAddFailReAdd()
  1472. {
  1473. setupDFS("addreadd");
  1474. Owned<IDistributedFileTransaction> transaction = createDistributedFileTransaction(user); // disabled, auto-commit
  1475. logctx.CTXLOG("Creating super1 and supet2, adding sub1 and sub2 to super1 and sub3 to super2");
  1476. Owned<IDistributedSuperFile> sfile = dir.createSuperFile("regress::addreadd::super1", user, false, false, transaction);
  1477. sfile->addSubFile("regress::addreadd::sub1", false, NULL, false, transaction);
  1478. sfile->addSubFile("regress::addreadd::sub2", false, NULL, false, transaction);
  1479. sfile.clear();
  1480. Owned<IDistributedSuperFile> sfile2 = dir.createSuperFile("regress::addreadd::super2", user, false, false, transaction);
  1481. sfile2->addSubFile("regress::addreadd::sub3", false, NULL, false, transaction);
  1482. sfile2.clear();
  1483. class CShortLock : implements IThreaded
  1484. {
  1485. StringAttr fileName;
  1486. unsigned secDelay;
  1487. CThreaded threaded;
  1488. public:
  1489. CShortLock(const char *_fileName, unsigned _secDelay) : fileName(_fileName), secDelay(_secDelay), threaded("CShortLock", this) { }
  1490. virtual void main()
  1491. {
  1492. Owned<IDistributedFile> file=queryDistributedFileDirectory().lookup(fileName, NULL);
  1493. if (!file)
  1494. {
  1495. PROGLOG("File %s not found", fileName.get());
  1496. return;
  1497. }
  1498. PROGLOG("Locked file: %s, sleeping (before unlock) for %d secs", fileName.get(), secDelay);
  1499. MilliSleep(secDelay * 1000);
  1500. PROGLOG("Unlocking file: %s", fileName.get());
  1501. }
  1502. void start() { threaded.start(); }
  1503. };
  1504. /* Tests transaction failing, due to lock and retrying after having partial success */
  1505. CShortLock sL("regress::addreadd::sub2", 30); // the 2nd subfile of super1
  1506. sL.start();
  1507. transaction.setown(createDistributedFileTransaction(user)); // disabled, auto-commit
  1508. logctx.CTXLOG("Starting transaction");
  1509. transaction->start();
  1510. logctx.CTXLOG("Adding contents of regress::addreadd::super1 to regress::addreadd::super2, within transaction");
  1511. sfile.setown(transaction->lookupSuperFile("regress::addreadd::super2"));
  1512. sfile->addSubFile("regress::addreadd::super1", false, NULL, true, transaction); // add contents of super1 to super2
  1513. sfile.setown(transaction->lookupSuperFile("regress::addreadd::super1"));
  1514. sfile->removeSubFile(NULL, false, false, transaction); // clears super1
  1515. sfile.clear();
  1516. try
  1517. {
  1518. transaction->commit();
  1519. }
  1520. catch (IException *e)
  1521. {
  1522. StringBuffer eStr;
  1523. e->errorMessage(eStr);
  1524. CPPUNIT_ASSERT_MESSAGE(eStr.str(), 0);
  1525. e->Release();
  1526. }
  1527. transaction.clear();
  1528. sfile.setown(dir.lookupSuperFile("regress::addreadd::super2", user));
  1529. ASSERT(3 == sfile->numSubFiles() && "regress::addreadd::super2 should contain 3 subfiles");
  1530. ASSERT(NULL != sfile->querySubFileNamed("regress::addreadd::sub1") && "regress::addreadd::sub1, should be a subfile of super2");
  1531. ASSERT(NULL != sfile->querySubFileNamed("regress::addreadd::sub2") && "regress::addreadd::sub2, should be a subfile of super2");
  1532. ASSERT(NULL != sfile->querySubFileNamed("regress::addreadd::sub3") && "regress::addreadd::sub3, should be a subfile of super2");
  1533. sfile.setown(dir.lookupSuperFile("regress::addreadd::super1", user));
  1534. ASSERT(0 == sfile->numSubFiles() && "regress::addreadd::super1 should contain 0 subfiles");
  1535. }
  1536. void testDFSRetrySuperLock()
  1537. {
  1538. setupDFS("retrysuperlock");
  1539. logctx.CTXLOG("Creating regress::retrysuperlock::super1 and regress::retrysuperlock::sub1");
  1540. Owned<IDistributedSuperFile> sfile = dir.createSuperFile("regress::retrysuperlock::super1", user, false, false);
  1541. sfile->addSubFile("regress::retrysuperlock::sub1", false, NULL, false);
  1542. sfile.clear();
  1543. class CShortLock : implements IThreaded
  1544. {
  1545. StringAttr fileName;
  1546. unsigned secDelay;
  1547. CThreaded threaded;
  1548. public:
  1549. CShortLock(const char *_fileName, unsigned _secDelay) : fileName(_fileName), secDelay(_secDelay), threaded("CShortLock", this) { }
  1550. ~CShortLock()
  1551. {
  1552. threaded.join();
  1553. }
  1554. virtual void main()
  1555. {
  1556. Owned<IDistributedFile> file=queryDistributedFileDirectory().lookup(fileName, NULL);
  1557. if (!file)
  1558. {
  1559. PROGLOG("File %s not found", fileName.get());
  1560. return;
  1561. }
  1562. PROGLOG("Locked file: %s, sleeping (before unlock) for %d secs", fileName.get(), secDelay);
  1563. MilliSleep(secDelay * 1000);
  1564. PROGLOG("Unlocking file: %s", fileName.get());
  1565. }
  1566. void start() { threaded.start(); }
  1567. };
  1568. /* Tests transaction failing, due to lock and retrying after having partial success */
  1569. CShortLock sL("regress::retrysuperlock::super1", 15);
  1570. sL.start();
  1571. sfile.setown(dir.lookupSuperFile("regress::retrysuperlock::super1", user));
  1572. if (sfile)
  1573. {
  1574. logctx.CTXLOG("Removing subfiles from regress::retrysuperlock::super1");
  1575. sfile->removeSubFile(NULL, false, false);
  1576. logctx.CTXLOG("SUCCEEDED");
  1577. }
  1578. // put it back, for next test
  1579. sfile->addSubFile("regress::retrysuperlock::sub1", false, NULL, false);
  1580. sfile.clear();
  1581. // try again, this time in transaction
  1582. Owned<IDistributedFileTransaction> transaction = createDistributedFileTransaction(user); // disabled, auto-commit
  1583. logctx.CTXLOG("Starting transaction");
  1584. transaction->start();
  1585. sfile.setown(transaction->lookupSuperFile("regress::retrysuperlock::super1"));
  1586. if (sfile)
  1587. {
  1588. logctx.CTXLOG("Removing subfiles from regress::retrysuperlock::super1 with transaction");
  1589. sfile->removeSubFile(NULL, false, false, transaction);
  1590. logctx.CTXLOG("SUCCEEDED");
  1591. }
  1592. sfile.clear();
  1593. logctx.CTXLOG("Committing transaction");
  1594. transaction->commit();
  1595. }
  1596. void testDFSRename2()
  1597. {
  1598. setupDFS("rename2");
  1599. /* Create a super and sub1 and sub4 in a auto-commit transaction
  1600. * Inside a transaction, do:
  1601. * a) rename sub2 to renamedsub2
  1602. * b) remove sub1
  1603. * c) add sub1
  1604. * d) add renamedsub2
  1605. * e) commit transaction
  1606. * f) renamed files existing and superfile contents
  1607. */
  1608. Owned<IDistributedFileTransaction> transaction = createDistributedFileTransaction(user); // disabled, auto-commit
  1609. logctx.CTXLOG("Creating regress::rename2::super1 and attaching sub1 & sub4");
  1610. Owned<IDistributedSuperFile> sfile = dir.createSuperFile("regress::rename2::super1", user, false, false, transaction);
  1611. sfile->addSubFile("regress::rename2::sub1", false, NULL, false, transaction);
  1612. sfile->addSubFile("regress::rename2::sub4", false, NULL, false, transaction);
  1613. sfile.clear();
  1614. if (dir.exists("regress::rename2::renamedsub2",user,false,false))
  1615. ASSERT(dir.removeEntry("regress::rename2::renamedsub2", user) && "Can't remove 'regress::rename2::renamedsub2'");
  1616. transaction.setown(createDistributedFileTransaction(user)); // disabled, auto-commit
  1617. logctx.CTXLOG("Starting transaction");
  1618. transaction->start();
  1619. logctx.CTXLOG("Renaming regress::rename2::sub2 TO regress::rename2::renamedsub2");
  1620. dir.renamePhysical("regress::rename2::sub2", "regress::rename2::renamedsub2", user, transaction);
  1621. logctx.CTXLOG("Removing regress::rename2::sub1 from regress::rename2::super1");
  1622. sfile.setown(transaction->lookupSuperFile("regress::rename2::super1"));
  1623. sfile->removeSubFile("regress::rename2::sub1", false, false, transaction);
  1624. sfile.clear();
  1625. logctx.CTXLOG("Adding renamedsub2 to super1");
  1626. sfile.setown(transaction->lookupSuperFile("regress::rename2::super1"));
  1627. sfile->addSubFile("regress::rename2::renamedsub2", false, NULL, false, transaction);
  1628. sfile.clear();
  1629. logctx.CTXLOG("Adding back sub1 to super1");
  1630. sfile.setown(transaction->lookupSuperFile("regress::rename2::super1"));
  1631. sfile->addSubFile("regress::rename2::sub1", false, NULL, false, transaction);
  1632. sfile.clear();
  1633. try
  1634. {
  1635. logctx.CTXLOG("Committing transaction");
  1636. transaction->commit();
  1637. }
  1638. catch (IException *e)
  1639. {
  1640. StringBuffer eStr;
  1641. e->errorMessage(eStr);
  1642. CPPUNIT_ASSERT_MESSAGE(eStr.str(), 0);
  1643. e->Release();
  1644. }
  1645. transaction.clear();
  1646. // validate..
  1647. ASSERT(dir.exists("regress::rename2::renamedsub2", user, true, false) && "regress::rename2::renamedsub2 should exist now transaction committed");
  1648. sfile.setown(dir.lookupSuperFile("regress::rename2::super1", user));
  1649. ASSERT(NULL != sfile->querySubFileNamed("regress::rename2::renamedsub2") && "regress::rename2::renamedsub2, should be a subfile of super1");
  1650. ASSERT(NULL != sfile->querySubFileNamed("regress::rename2::sub1") && "regress::rename2::sub1, should be a subfile of super1");
  1651. ASSERT(NULL == sfile->querySubFileNamed("regress::rename2::sub2") && "regress::rename2::sub2, should NOT be a subfile of super1");
  1652. ASSERT(NULL != sfile->querySubFileNamed("regress::rename2::sub4") && "regress::rename2::sub4, should be a subfile of super1");
  1653. ASSERT(3 == sfile->numSubFiles() && "regress::rename2::super1 should contain 4 subfiles");
  1654. }
  1655. void testDFSRenameThenDelete()
  1656. {
  1657. setupDFS("renamedelete");
  1658. if (dir.exists("regress::renamedelete::renamedsub2",user,false,false))
  1659. ASSERT(dir.removeEntry("regress::renamedelete::renamedsub2", user) && "Can't remove 'regress::renamedelete::renamedsub2'");
  1660. Owned<IDistributedFileTransaction> transaction = createDistributedFileTransaction(user); // disabled, auto-commit
  1661. logctx.CTXLOG("Starting transaction");
  1662. transaction->start();
  1663. logctx.CTXLOG("Renaming regress::renamedelete::sub2 TO regress::renamedelete::renamedsub2");
  1664. dir.renamePhysical("regress::renamedelete::sub2", "regress::renamedelete::renamedsub2", user, transaction);
  1665. logctx.CTXLOG("Removing regress::renamedelete::renamedsub2");
  1666. ASSERT(dir.removeEntry("regress::renamedelete::renamedsub2", user, transaction) && "Can't remove 'regress::rename2::renamedsub2'");
  1667. try
  1668. {
  1669. logctx.CTXLOG("Committing transaction");
  1670. transaction->commit();
  1671. }
  1672. catch (IException *e)
  1673. {
  1674. StringBuffer eStr;
  1675. e->errorMessage(eStr);
  1676. CPPUNIT_ASSERT_MESSAGE(eStr.str(), 0);
  1677. e->Release();
  1678. }
  1679. transaction.clear();
  1680. // validate..
  1681. ASSERT(!dir.exists("regress::renamedelete::sub2", user, true, false) && "regress::renamedelete::sub2 should NOT exist now transaction has been committed");
  1682. ASSERT(!dir.exists("regress::renamedelete::renamedsub2", user, true, false) && "regress::renamedelete::renamedsub2 should NOT exist now transaction has been committed");
  1683. }
  1684. // NB: This test requires access (via dafilesrv) to an external IP (10.239.222.21 used below, but could be any)
  1685. void testDFSRename3()
  1686. {
  1687. setupDFS("rename3");
  1688. Owned<IDistributedFileTransaction> transaction = createDistributedFileTransaction(user); // disabled, auto-commit
  1689. if (dir.exists("regress::tenwayfile",user))
  1690. ASSERT(dir.removeEntry("regress::tenwayfile", user) && "Can't remove");
  1691. Owned<IFileDescriptor> fdesc = createDescriptor("regress", "tenwayfile", 1, 17);
  1692. Owned<IGroup> grp1 = createIGroup("10.239.222.1");
  1693. ClusterPartDiskMapSpec mapping;
  1694. fdesc->setClusterGroup(0, grp1);
  1695. Linked<IPartDescriptor> part = fdesc->getPart(0);
  1696. RemoteFilename rfn;
  1697. part->getFilename(0, rfn);
  1698. StringBuffer path;
  1699. rfn.getPath(path);
  1700. recursiveCreateDirectoryForFile(path.str());
  1701. OwnedIFile ifile = createIFile(path.str());
  1702. Owned<IFileIO> io;
  1703. io.setown(ifile->open(IFOcreate));
  1704. io->write(0, 17, "12345678901234567");
  1705. io->close();
  1706. Owned<IDistributedFile> dsub = dir.createNew(fdesc);
  1707. dsub->attach("regress::tenwayfile", user);
  1708. dsub.clear();
  1709. fdesc.clear();
  1710. transaction.setown(createDistributedFileTransaction(user)); // disabled, auto-commit
  1711. transaction->start();
  1712. logctx.CTXLOG("Renaming regress::rename3::sub2 TO regress::tenwayfile@mythor");
  1713. dir.renamePhysical("regress::rename3::sub2", "regress::tenwayfile@mythor", user, transaction);
  1714. try
  1715. {
  1716. transaction->commit();
  1717. }
  1718. catch (IException *e)
  1719. {
  1720. StringBuffer eStr;
  1721. e->errorMessage(eStr);
  1722. CPPUNIT_ASSERT_MESSAGE(eStr.str(), 0);
  1723. e->Release();
  1724. }
  1725. transaction.setown(createDistributedFileTransaction(user));
  1726. transaction.setown(createDistributedFileTransaction(user)); // disabled, auto-commit
  1727. transaction->start();
  1728. logctx.CTXLOG("Renaming regress::tenwayfile TO regress::rename3::sub2");
  1729. dir.renamePhysical("regress::tenwayfile@mythor", "regress::rename3::sub2", user, transaction);
  1730. try
  1731. {
  1732. transaction->commit();
  1733. }
  1734. catch (IException *e)
  1735. {
  1736. StringBuffer eStr;
  1737. e->errorMessage(eStr);
  1738. CPPUNIT_ASSERT_MESSAGE(eStr.str(), 0);
  1739. e->Release();
  1740. }
  1741. }
  1742. void testDFSRemoveSuperSub()
  1743. {
  1744. setupDFS("removesupersub");
  1745. Owned<IDistributedFileTransaction> transaction = createDistributedFileTransaction(user);
  1746. logctx.CTXLOG("Creating regress::removesupersub::super1 and attaching sub1 & sub4");
  1747. Owned<IDistributedSuperFile> sfile = dir.createSuperFile("regress::removesupersub::super1", user, false, false, transaction);
  1748. sfile->addSubFile("regress::removesupersub::sub1", false, NULL, false, transaction);
  1749. sfile->addSubFile("regress::removesupersub::sub4", false, NULL, false, transaction);
  1750. sfile.clear();
  1751. transaction.setown(createDistributedFileTransaction(user)); // disabled, auto-commit
  1752. logctx.CTXLOG("Starting transaction");
  1753. transaction->start();
  1754. logctx.CTXLOG("Removing super removesupersub::super1 along with it's subfiles sub1 and sub4");
  1755. dir.removeSuperFile("regress::removesupersub::super1", true, user, transaction);
  1756. try
  1757. {
  1758. logctx.CTXLOG("Committing transaction");
  1759. transaction->commit();
  1760. }
  1761. catch (IException *e)
  1762. {
  1763. StringBuffer eStr;
  1764. e->errorMessage(eStr);
  1765. CPPUNIT_ASSERT_MESSAGE(eStr.str(), 0);
  1766. e->Release();
  1767. }
  1768. transaction.clear();
  1769. // validate..
  1770. ASSERT(!dir.exists("regress::removesupersub::super1", user, true, false) && "regress::removesupersub::super1 should NOT exist");
  1771. ASSERT(!dir.exists("regress::removesupersub::sub1", user, true, false) && "regress::removesupersub::sub1 should NOT exist");
  1772. ASSERT(!dir.exists("regress::removesupersub::sub4", user, true, false) && "regress::removesupersub::sub4 should NOT exist");
  1773. }
  1774. void testDFSHammer()
  1775. {
  1776. unsigned numFiles = 100;
  1777. unsigned numReads = 40000;
  1778. unsigned hammerThreads = 10;
  1779. setupDFS("hammer", 0, numFiles);
  1780. StringBuffer msg("Reading ");
  1781. msg.append(numFiles).append(" files").append(numReads).append(" times, on ").append(hammerThreads).append(" threads");
  1782. logctx.CTXLOG("%s", msg.str());
  1783. class CHammerFactory : public CSimpleInterface, implements IThreadFactory
  1784. {
  1785. public:
  1786. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  1787. virtual IPooledThread *createNew()
  1788. {
  1789. class CHammerThread : public CSimpleInterface, implements IPooledThread
  1790. {
  1791. StringAttr filename;
  1792. public:
  1793. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  1794. virtual void init(void *param)
  1795. {
  1796. filename.set((const char *)param);
  1797. }
  1798. virtual void main()
  1799. {
  1800. try
  1801. {
  1802. Owned<IPropertyTree> tree = queryDistributedFileDirectory().getFileTree(filename,user);
  1803. }
  1804. catch (IException *e)
  1805. {
  1806. PrintExceptionLog(e, NULL);
  1807. }
  1808. }
  1809. virtual bool stop() { return true; }
  1810. virtual bool canReuse() { return true; }
  1811. };
  1812. return new CHammerThread();
  1813. }
  1814. } poolFactory;
  1815. CTimeMon tm;
  1816. Owned<IThreadPool> pool = createThreadPool("TSDSTest", &poolFactory, NULL, hammerThreads, 2000);
  1817. while (numReads--)
  1818. {
  1819. StringBuffer filename("regress::hammer::sub");
  1820. unsigned fn = 1+(getRandom()%numFiles);
  1821. filename.append(fn);
  1822. PROGLOG("Hammer file: %s", filename.str());
  1823. pool->start((void *)filename.str());
  1824. }
  1825. pool->joinAll();
  1826. PROGLOG("Hammer test took: %d ms", tm.elapsed());
  1827. }
  1828. };
  1829. CPPUNIT_TEST_SUITE_REGISTRATION( DaliTests );
  1830. CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( DaliTests, "Dali" );
  1831. #endif // _USE_CPPUNIT