dalitests.cpp 100 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. /*
  14. * Dali Quick Regression Suite: Tests Dali functionality on a programmatic way.
  15. *
  16. * Add as much as possible here to avoid having to run the Hthor/Thor regressions
  17. * all the time for Dali tests, since most of it can be tested quickly from here.
  18. */
  19. #ifdef _USE_CPPUNIT
  20. #include "mpbase.hpp"
  21. #include "mpcomm.hpp"
  22. #include "daclient.hpp"
  23. #include "dadfs.hpp"
  24. #include "dafdesc.hpp"
  25. #include "dasds.hpp"
  26. #include "danqs.hpp"
  27. #include "dautils.hpp"
  28. #include "unittests.hpp"
  29. //#define COMPAT
  30. // ======================================================================= Support Functions / Classes
  31. static __int64 subchangetotal;
  32. static unsigned subchangenum;
  33. static CriticalSection subchangesect;
  34. static IRemoteConnection *Rconn;
  35. static IDistributedFileDirectory & dir = queryDistributedFileDirectory();
  36. static IUserDescriptor *user = createUserDescriptor();
  37. static unsigned initCounter = 0; // counter for initialiser
  38. // Declared in dadfs.cpp *only* when CPPUNIT is active
  39. extern void removeLogical(const char *fname, IUserDescriptor *user);
  40. void init() {
  41. // Only initialise on first pass
  42. if (initCounter != 0)
  43. return;
  44. InitModuleObjects();
  45. user->set("user", "passwd");
  46. // Connect to local Dali
  47. SocketEndpoint ep;
  48. ep.set(".", 7070);
  49. SocketEndpointArray epa;
  50. epa.append(ep);
  51. Owned<IGroup> group = createIGroup(epa);
  52. initClientProcess(group, DCR_Other);
  53. initCounter++;
  54. }
  55. void destroy() {
  56. // Only destroy on last pass
  57. if (initCounter != 0)
  58. return;
  59. // Cleanup
  60. releaseAtoms();
  61. closedownClientProcess();
  62. setNodeCaching(false);
  63. initCounter--;
  64. }
  65. class CCSub : public CInterface, implements ISDSConnectionSubscription, implements ISDSSubscription
  66. {
  67. unsigned n;
  68. unsigned &count;
  69. public:
  70. IMPLEMENT_IINTERFACE;
  71. CCSub(unsigned _n,unsigned &_count)
  72. : count(_count)
  73. {
  74. n = _n;
  75. }
  76. virtual void notify()
  77. {
  78. CriticalBlock block(subchangesect);
  79. subchangetotal += n;
  80. subchangenum++;
  81. count++;
  82. }
  83. virtual void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
  84. {
  85. CriticalBlock block(subchangesect);
  86. subchangetotal += n;
  87. subchangenum++;
  88. subchangetotal += (unsigned)flags;
  89. subchangetotal += crc32(xpath,strlen(xpath),0);
  90. if (valueLen) {
  91. subchangetotal += crc32((const char *)valueData,valueLen,0);
  92. }
  93. count++;
  94. }
  95. };
  96. class CChange : public Thread
  97. {
  98. Owned<IRemoteConnection> conn;
  99. Owned<CCSub> sub;
  100. StringAttr path;
  101. SubscriptionId id[10];
  102. unsigned n;
  103. unsigned count;
  104. public:
  105. Semaphore stopsem;
  106. CChange(unsigned _n)
  107. {
  108. n = _n;
  109. StringBuffer s("/DAREGRESS/CONSUB");
  110. s.append(n+1);
  111. path.set(s.str());
  112. conn.setown(querySDS().connect(path, myProcessSession(), RTM_CREATE|RTM_DELETE_ON_DISCONNECT, 1000000));
  113. unsigned i;
  114. for (i=0;i<5;i++)
  115. id[i] = conn->subscribe(*new CCSub(n*1000+i,count));
  116. s.append("/testprop");
  117. for (;i<10;i++)
  118. id[i] = querySDS().subscribe(s.str(),*new CCSub(n*1000+i,count),false,true);
  119. count = 0;
  120. start();
  121. }
  122. virtual int run()
  123. {
  124. unsigned i;
  125. for (i = 0;i<10; i++) {
  126. conn->queryRoot()->setPropInt("testprop", (i*17+n*21)%100);
  127. conn->commit();
  128. for (unsigned j=0;j<1000;j++) {
  129. {
  130. CriticalBlock block(subchangesect);
  131. if (count>=(i+1)*10)
  132. break;
  133. }
  134. Sleep(10);
  135. }
  136. }
  137. stopsem.wait();
  138. for (i=0;i<10;i++)
  139. conn->unsubscribe(id[i]);
  140. return 0;
  141. }
  142. };
  143. interface IChecker
  144. {
  145. virtual void title(unsigned n,const char *s)=0;
  146. virtual void add(const char *s,__int64 v)=0;
  147. virtual void add(const char *s,const char* v)=0;
  148. virtual void add(unsigned n,const char *s,__int64 v)=0;
  149. virtual void add(unsigned n,const char *s,const char* v)=0;
  150. virtual void error(const char *txt)=0;
  151. };
  152. void checkFilePart(IChecker *checker,IDistributedFilePart *part,bool blocked)
  153. {
  154. StringBuffer tmp;
  155. checker->add("getPartIndex",part->getPartIndex());
  156. unsigned n = part->numCopies();
  157. checker->add("numCopies",part->numCopies());
  158. checker->add("maxCopies",n);
  159. RemoteFilename rfn;
  160. for (unsigned copy=0;copy<n;copy++) {
  161. INode *node = part->queryNode(copy);
  162. if (node)
  163. checker->add(copy,"queryNode",node->endpoint().getUrlStr(tmp.clear()).str());
  164. else
  165. checker->error("missing node");
  166. checker->add(copy,"getFilename",part->getFilename(rfn,copy).getRemotePath(tmp.clear()).str());
  167. }
  168. checker->add("getPartName",part->getPartName(tmp.clear()).str());
  169. #ifndef COMPAT
  170. checker->add("getPartDirectory",part->getPartDirectory(tmp.clear()).str());
  171. #endif
  172. checker->add("queryProperties()",toXML(&part->queryAttributes(),tmp.clear()).str());
  173. checker->add("isHost",part->isHost()?1:0);
  174. checker->add("getFileSize",part->getFileSize(false,false));
  175. CDateTime dt;
  176. if (part->getModifiedTime(false,false,dt))
  177. dt.getString(tmp.clear());
  178. else
  179. tmp.clear().append("nodatetime");
  180. checker->add("getModifiedTime",tmp.str());
  181. unsigned crc;
  182. if (part->getCrc(crc)&&!blocked)
  183. checker->add("getCrc",crc);
  184. else
  185. checker->add("getCrc","nocrc");
  186. }
  187. void checkFile(IChecker *checker,IDistributedFile *file)
  188. {
  189. StringBuffer tmp;
  190. checker->add("queryLogicalName",file->queryLogicalName());
  191. unsigned np = file->numParts();
  192. checker->add("numParts",np);
  193. checker->add("queryDefaultDir",file->queryDefaultDir());
  194. if (np>1)
  195. checker->add("queryPartMask",file->queryPartMask());
  196. checker->add("queryProperties()",toXML(&file->queryAttributes(),tmp.clear()).str());
  197. CDateTime dt;
  198. if (file->getModificationTime(dt))
  199. dt.getString(tmp.clear());
  200. else
  201. tmp.clear().append("nodatetime");
  202. // Owned<IFileDescriptor> desc = getFileDescriptor();
  203. // checkFileDescriptor(checker,desc);
  204. //virtual bool existsPhysicalPartFiles(unsigned short port) = 0; // returns true if physical patrs all exist (on primary OR secondary)
  205. //Owned<IPropertyTree> tree = getTreeCopy();
  206. //checker->add("queryProperties()",toXML(tree,tmp.clear()).str());
  207. checker->add("getFileSize",file->getFileSize(false,false));
  208. bool blocked;
  209. checker->add("isCompressed",file->isCompressed(&blocked)?1:0);
  210. checker->add("blocked",blocked?1:0);
  211. unsigned csum;
  212. if (file->getFileCheckSum(csum)&&!blocked)
  213. checker->add("getFileCheckSum",csum);
  214. else
  215. checker->add("getFileCheckSum","nochecksum");
  216. StringBuffer clustname;
  217. checker->add("queryClusterName(0)",file->getClusterName(0,clustname).str());
  218. for (unsigned i=0;i<np;i++) {
  219. Owned<IDistributedFilePart> part = file->getPart(i);
  220. if (part)
  221. checkFilePart(checker,part,blocked);
  222. }
  223. }
  224. void checkFiles(const char *fn)
  225. {
  226. class cChecker: implements IChecker
  227. {
  228. public:
  229. virtual void title(unsigned n,const char *s)
  230. {
  231. printf("Title[%d]='%s'\n",n,s);
  232. }
  233. virtual void add(const char *s,__int64 v)
  234. {
  235. printf("%s=%" I64F "d\n",s,v);
  236. }
  237. virtual void add(const char *s,const char* v)
  238. {
  239. printf("%s='%s'\n",s,v);
  240. }
  241. virtual void add(unsigned n,const char *s,__int64 v)
  242. {
  243. printf("%s[%d]=%" I64F "d\n",s,n,v);
  244. }
  245. virtual void add(unsigned n,const char *s,const char* v)
  246. {
  247. printf("%s[%d]='%s'\n",s,n,v);
  248. }
  249. virtual void error(const char *txt)
  250. {
  251. printf("ERROR '%s'\n",txt);
  252. }
  253. } checker;
  254. unsigned start = msTick();
  255. unsigned slowest = 0;
  256. StringAttr slowname;
  257. if (fn) {
  258. checker.title(1,fn);
  259. try {
  260. Owned<IDistributedFile> file=queryDistributedFileDirectory().lookup(fn,user);
  261. if (!file)
  262. printf("file '%s' not found\n",fn);
  263. else
  264. checkFile(&checker,file);
  265. }
  266. catch (IException *e) {
  267. StringBuffer str;
  268. e->errorMessage(str);
  269. e->Release();
  270. checker.error(str.str());
  271. }
  272. }
  273. else {
  274. Owned<IDistributedFileIterator> iter = queryDistributedFileDirectory().getIterator("*",false,user);
  275. unsigned i=0;
  276. unsigned ss = msTick();
  277. ForEach(*iter) {
  278. i++;
  279. StringBuffer lfname;
  280. iter->getName(lfname);
  281. checker.title(i,lfname.str());
  282. try {
  283. IDistributedFile &file=iter->query();
  284. checkFile(&checker,&file);
  285. unsigned t = (msTick()-ss);
  286. if (t>slowest) {
  287. slowest = t;
  288. slowname.set(lfname.str());
  289. }
  290. }
  291. catch (IException *e) {
  292. StringBuffer str;
  293. e->errorMessage(str);
  294. e->Release();
  295. checker.error(str.str());
  296. }
  297. ss = msTick();
  298. }
  299. }
  300. unsigned t = msTick()-start;
  301. printf("Complete in %ds\n",t/1000);
  302. if (!slowname.isEmpty())
  303. printf("Slowest %s = %dms\n",slowname.get(),slowest);
  304. };
  305. const char *filelist=
  306. "thor_data400::gong_delete_plus,"
  307. "thor_data400::in::npanxx,"
  308. "thor_data400::tpm_deduped,"
  309. "thor_data400::base::movers_ingest_ll,"
  310. "thor_hank::cemtemp::fldl,"
  311. "thor_data400::patch,"
  312. "thor_data400::in::flvehreg_01_prethor_upd200204_v3,"
  313. "thor_data400::in::flvehreg_01_prethor_upd20020625_v3_flag,"
  314. "thor_data400::in::flvehreg_01_prethor_upd20020715_v3,"
  315. "thor_data400::in::flvehreg_01_prethor_upd20020715_v3_v3_flag,"
  316. "thor_data400::in::flvehreg_01_prethor_upd20020816_v3,"
  317. "thor_data400::in::flvehreg_01_prethor_upd20020816_v3_flag,"
  318. "thor_data400::in::flvehreg_01_prethor_upd20020625_v3,"
  319. "thor_data400::in::fl_lic_prethor_200208v2,"
  320. "thor_data400::in::fl_lic_prethor_200209,"
  321. "thor_data400::in::fl_lic_prethor_200210,"
  322. "thor_data400::in::fl_lic_prethor_200210_reclean,"
  323. "thor_data400::in::fl_lic_upd_200301,"
  324. "thor_data400::in::fl_lic_upd_200302,"
  325. "thor_data400::in::oh_lic_200209,"
  326. "thor_data400::in::ohio_lic_upd_200210,"
  327. "thor_data400::prepped_for_keys,"
  328. "a0e65__w20060224-155748,"
  329. "common_did,"
  330. "test::ftest1,"
  331. "thor_data50::BASE::chunk,"
  332. "hthor::key::codes_v320040901,"
  333. "thor_data400::in::ucc_direct_ok_99999999_event_20060124,"
  334. "thor400::ks_work::distancedetails";
  335. #ifndef COMPAT
  336. void dispFDesc(IFileDescriptor *fdesc)
  337. {
  338. printf("======================================\n");
  339. Owned<IPropertyTree> pt = createPTree("File");
  340. fdesc->serializeTree(*pt);
  341. StringBuffer out;
  342. toXML(pt,out);
  343. printf("%s\n",out.str());
  344. Owned<IFileDescriptor> fdesc2 = deserializeFileDescriptorTree(pt);
  345. toXML(pt,out.clear());
  346. printf("%s\n",out.str());
  347. unsigned np = fdesc->numParts();
  348. unsigned ncl = fdesc->numClusters();
  349. printf("numclusters = %d, numparts=%d\n",ncl,np);
  350. for (unsigned pass=0;pass<1;pass++) {
  351. for (unsigned ip=0;ip<np;ip++) {
  352. IPartDescriptor *part = fdesc->queryPart(ip);
  353. unsigned nc = part->numCopies();
  354. for (unsigned ic=0;ic<nc;ic++) {
  355. StringBuffer tmp1;
  356. StringBuffer tmp2;
  357. StringBuffer tmp3;
  358. StringBuffer tmp4;
  359. RemoteFilename rfn;
  360. bool blocked;
  361. out.clear().appendf("%d,%d: '%s' '%s' '%s' '%s' '%s' '%s' %s%s%s",ip,ic,
  362. part->getDirectory(tmp1,ic).str(),
  363. part->getTail(tmp2).str(),
  364. part->getPath(tmp3,ic).str(),
  365. fdesc->getFilename(ip,ic,rfn).getRemotePath(tmp4).str(), // multi TBD
  366. fdesc->queryPartMask()?fdesc->queryPartMask():"",
  367. fdesc->queryDefaultDir()?fdesc->queryDefaultDir():"",
  368. fdesc->isGrouped()?"GROUPED ":"",
  369. fdesc->queryKind()?fdesc->queryKind():"",
  370. fdesc->isCompressed(&blocked)?(blocked?" BLOCKCOMPRESSED":" COMPRESSED"):""
  371. );
  372. printf("%s\n",out.str());
  373. if (1) {
  374. MemoryBuffer mb;
  375. part->serialize(mb);
  376. Owned<IPartDescriptor> copypart;
  377. copypart.setown(deserializePartFileDescriptor(mb));
  378. StringBuffer out2;
  379. out2.appendf("%d,%d: '%s' '%s' '%s' '%s' '%s' '%s' %s%s%s",ip,ic,
  380. copypart->getDirectory(tmp1.clear(),ic).str(),
  381. copypart->getTail(tmp2.clear()).str(),
  382. copypart->getPath(tmp3.clear(),ic).str(),
  383. copypart->getFilename(ic,rfn).getRemotePath(tmp4.clear()).str(), // multi TBD
  384. copypart->queryOwner().queryPartMask()?copypart->queryOwner().queryPartMask():"",
  385. copypart->queryOwner().queryDefaultDir()?copypart->queryOwner().queryDefaultDir():"",
  386. copypart->queryOwner().isGrouped()?"GROUPED ":"",
  387. copypart->queryOwner().queryKind()?copypart->queryOwner().queryKind():"",
  388. copypart->queryOwner().isCompressed(&blocked)?(blocked?" BLOCKCOMPRESSED":" COMPRESSED"):""
  389. );
  390. if (strcmp(out.str(),out2.str())!=0)
  391. printf("FAILED!\n%s\n%s\n",out.str(),out2.str());
  392. pt.setown(createPTree("File"));
  393. copypart->queryOwner().serializeTree(*pt);
  394. StringBuffer out;
  395. toXML(pt,out);
  396. // printf("%d,%d: \n%s\n",ip,ic,out.str());
  397. }
  398. }
  399. }
  400. }
  401. }
  402. #endif
  403. // ================================================================================== UNIT TESTS
  404. class CDaliTestsStress : public CppUnit::TestFixture
  405. {
  406. CPPUNIT_TEST_SUITE( CDaliTestsStress );
  407. CPPUNIT_TEST(testInit);
  408. CPPUNIT_TEST(testDFS);
  409. // CPPUNIT_TEST(testReadAllSDS); // Ignoring this test; See comments below
  410. CPPUNIT_TEST(testSDSRW);
  411. CPPUNIT_TEST(testSDSSubs);
  412. CPPUNIT_TEST(testSDSSubs2);
  413. CPPUNIT_TEST(testFiles);
  414. CPPUNIT_TEST(testGroups);
  415. CPPUNIT_TEST(testMultiCluster);
  416. #ifndef COMPAT
  417. CPPUNIT_TEST(testDF1);
  418. CPPUNIT_TEST(testDF2);
  419. CPPUNIT_TEST(testMisc);
  420. CPPUNIT_TEST(testDFile);
  421. #endif
  422. CPPUNIT_TEST(testDFSTrans);
  423. CPPUNIT_TEST(testDFSPromote);
  424. CPPUNIT_TEST(testDFSDel);
  425. CPPUNIT_TEST(testDFSRename);
  426. CPPUNIT_TEST(testDFSClearAdd);
  427. CPPUNIT_TEST(testDFSRename2);
  428. CPPUNIT_TEST(testDFSRenameThenDelete);
  429. CPPUNIT_TEST(testDFSRemoveSuperSub);
  430. // This test requires access to an external IP with dafilesrv running
  431. // CPPUNIT_TEST(testDFSRename3);
  432. CPPUNIT_TEST(testDFSAddFailReAdd);
  433. CPPUNIT_TEST(testDFSRetrySuperLock);
  434. CPPUNIT_TEST(testDFSHammer);
  435. CPPUNIT_TEST(testSDSNodeSubs);
  436. CPPUNIT_TEST_SUITE_END();
  437. #ifndef COMPAT
  438. #endif
  439. void testGrp(SocketEndpointArray &epa)
  440. {
  441. Owned<IGroup> grp = createIGroup(epa);
  442. StringBuffer s;
  443. grp->getText(s);
  444. printf("'%s'\n",s.str());
  445. Owned<IGroup> grp2 = createIGroup(s.str());
  446. if (grp->compare(grp2)!=GRidentical) {
  447. grp->getText(s.clear());
  448. printf("^FAILED! %s\n",s.str());
  449. }
  450. }
  451. unsigned fn(unsigned n, unsigned m, unsigned seed, unsigned depth, IPropertyTree *parent)
  452. {
  453. __int64 val = parent->getPropInt64("val",0);
  454. parent->setPropInt64("val",n+val);
  455. val = parent->getPropInt64("@val",0);
  456. parent->setPropInt64("@val",m+val);
  457. val = parent->getPropInt64(NULL,0);
  458. parent->setPropInt64(NULL,seed+val);
  459. if (Rconn&&((n+m+seed)%100==0))
  460. Rconn->commit();
  461. if (!seed)
  462. return m+n;
  463. if (n==m)
  464. return seed;
  465. if (depth>10)
  466. return seed+n+m;
  467. if (seed%7==n%7)
  468. return n;
  469. if (seed%7==m%7)
  470. return m;
  471. char name[64];
  472. unsigned v = seed;
  473. name[0] = 's';
  474. name[1] = 'u';
  475. name[2] = 'b';
  476. unsigned i = 3;
  477. while (v) {
  478. name[i++] = ('A'+v%26 );
  479. v /= 26;
  480. }
  481. name[i] = 0;
  482. IPropertyTree *child = parent->queryPropTree(name);
  483. if (!child)
  484. child = parent->addPropTree(name, createPTree(name));
  485. return fn(fn(n,seed,seed*17+11,depth+1,child),fn(seed,m,seed*11+17,depth+1,child),seed*19+7,depth+1,child);
  486. }
  487. unsigned fn2(unsigned n, unsigned m, unsigned seed, unsigned depth, StringBuffer &parentname)
  488. {
  489. if (!Rconn)
  490. return 0;
  491. if ((n+m+seed)%25==0) {
  492. Rconn->commit();
  493. Rconn->Release();
  494. Rconn = querySDS().connect("/DAREGRESS",myProcessSession(), 0, 1000000);
  495. ASSERT(Rconn && "Failed to connect to /DAREGRESS");
  496. }
  497. IPropertyTree *parent = parentname.length()?Rconn->queryRoot()->queryPropTree(parentname.str()):Rconn->queryRoot();
  498. ASSERT(parent && "Failed to connect to parent");
  499. __int64 val = parent->getPropInt64("val",0);
  500. parent->setPropInt64("val",n+val);
  501. val = parent->getPropInt64("@val",0);
  502. parent->setPropInt64("@val",m+val);
  503. val = parent->getPropInt64(NULL,0);
  504. parent->setPropInt64(NULL,seed+val);
  505. if (!seed)
  506. return m+n;
  507. if (n==m)
  508. return seed;
  509. if (depth>10)
  510. return seed+n+m;
  511. if (seed%7==n%7)
  512. return n;
  513. if (seed%7==m%7)
  514. return m;
  515. char name[64];
  516. unsigned v = seed;
  517. name[0] = 's';
  518. name[1] = 'u';
  519. name[2] = 'b';
  520. unsigned i = 3;
  521. while (v) {
  522. name[i++] = ('A'+v%26 );
  523. v /= 26;
  524. }
  525. name[i] = 0;
  526. unsigned l = parentname.length();
  527. if (parentname.length())
  528. parentname.append('/');
  529. parentname.append(name);
  530. IPropertyTree *child = parent->queryPropTree(name);
  531. if (!child)
  532. child = parent->addPropTree(name, createPTree(name));
  533. unsigned ret = fn2(fn2(n,seed,seed*17+11,depth+1,parentname),fn2(seed,m,seed*11+17,depth+1,parentname),seed*19+7,depth+1,parentname);
  534. parentname.setLength(l);
  535. return ret;
  536. }
  537. IFileDescriptor *createDescriptor(const char* dir, const char* name, unsigned parts, unsigned recSize, unsigned index=0)
  538. {
  539. Owned<IPropertyTree> pp = createPTree("Part");
  540. Owned<IFileDescriptor>fdesc = createFileDescriptor();
  541. fdesc->setDefaultDir(dir);
  542. StringBuffer s;
  543. SocketEndpoint ep;
  544. ep.setLocalHost(0);
  545. StringBuffer ip;
  546. ep.getIpText(ip);
  547. for (unsigned k=0;k<parts;k++) {
  548. s.clear().append(ip);
  549. Owned<INode> node = createINode(s.str());
  550. pp->setPropInt64("@size",recSize);
  551. s.clear().append(name);
  552. if (index)
  553. s.append(index);
  554. s.append("._").append(k+1).append("_of_").append(parts);
  555. fdesc->setPart(k,node,s.str(),pp);
  556. }
  557. fdesc->queryProperties().setPropInt("@recordSize",recSize);
  558. fdesc->setDefaultDir(dir);
  559. return fdesc.getClear();
  560. }
  561. void setupDFS(const char *scope, unsigned supersToDel=3, unsigned subsToCreate=4)
  562. {
  563. StringBuffer bufScope;
  564. bufScope.append("regress::").append(scope);
  565. StringBuffer bufDir;
  566. bufDir.append("regress/").append(scope);
  567. logctx.CTXLOG("Cleaning up '%s' scope", bufScope.str());
  568. for (unsigned i=1; i<=supersToDel; i++) {
  569. StringBuffer super = bufScope;
  570. super.append("::super").append(i);
  571. if (dir.exists(super.str(),user,false,true))
  572. ASSERT(dir.removeEntry(super.str(), user) && "Can't remove super-file");
  573. }
  574. logctx.CTXLOG("Creating 'regress::trans' subfiles(1,%d)", subsToCreate);
  575. for (unsigned i=1; i<=subsToCreate; i++) {
  576. StringBuffer name;
  577. name.append("sub").append(i);
  578. StringBuffer sub = bufScope;
  579. sub.append("::").append(name);
  580. // Remove first
  581. if (dir.exists(sub.str(),user,true,false))
  582. ASSERT(dir.removeEntry(sub.str(), user) && "Can't remove sub-file");
  583. try {
  584. // Create the sub file with an arbitrary format
  585. Owned<IFileDescriptor> subd = createDescriptor(bufDir.str(), name.str(), 1, 17);
  586. Owned<IPartDescriptor> partd = subd->getPart(0);
  587. RemoteFilename rfn;
  588. partd->getFilename(0, rfn);
  589. StringBuffer fname;
  590. rfn.getPath(fname);
  591. recursiveCreateDirectoryForFile(fname.str());
  592. OwnedIFile ifile = createIFile(fname.str());
  593. Owned<IFileIO> io;
  594. io.setown(ifile->open(IFOcreate));
  595. io->write(0, 17, "12345678901234567");
  596. io->close();
  597. Owned<IDistributedFile> dsub = dir.createNew(subd, sub.str()); // GH->JCS second parameter is wrong
  598. dsub->attach(sub.str(),user);
  599. } catch (IException *e) {
  600. StringBuffer msg;
  601. e->errorMessage(msg);
  602. logctx.CTXLOG("Caught exception while creating file in DFS: %s", msg.str());
  603. e->Release();
  604. ASSERT(0 && "Exception Caught in setupDFS - is the directory writeable by this user?");
  605. }
  606. // Make sure it got created
  607. ASSERT(dir.exists(sub.str(),user,true,false) && "Can't add physical files");
  608. }
  609. }
  610. void testReadBranch(const char *path)
  611. {
  612. PROGLOG("Connecting to %s",path);
  613. Owned<IRemoteConnection> conn = querySDS().connect(path, myProcessSession(), RTM_LOCK_READ, 10000);
  614. ASSERT(conn && "Could not connect");
  615. IPropertyTree *root = conn->queryRoot();
  616. Owned<IAttributeIterator> aiter = root->getAttributes();
  617. StringBuffer s;
  618. ForEach(*aiter)
  619. aiter->getValue(s.clear());
  620. aiter.clear();
  621. root->getProp(NULL,s.clear());
  622. Owned<IPropertyTreeIterator> iter = root->getElements("*");
  623. StringAttrArray children;
  624. UnsignedArray childidx;
  625. ForEach(*iter) {
  626. children.append(*new StringAttrItem(iter->query().queryName()));
  627. childidx.append(root->queryChildIndex(&iter->query()));
  628. }
  629. iter.clear();
  630. conn.clear();
  631. ForEachItemIn(i,children) {
  632. s.clear().append(path);
  633. if (path[strlen(path)-1]!='/')
  634. s.append('/');
  635. s.append(children.item(i).text).append('[').append(childidx.item(i)+1).append(']');
  636. testReadBranch(s.str());
  637. }
  638. }
  639. const IContextLogger &logctx;
  640. public:
  641. CDaliTestsStress() : logctx(queryDummyContextLogger()) {
  642. }
  643. ~CDaliTestsStress() {
  644. destroy();
  645. }
  646. void testInit()
  647. {
  648. init();
  649. }
  650. void testSDSRW()
  651. {
  652. Owned<IPropertyTree> ref = createPTree("DAREGRESS");
  653. fn(1,2,3,0,ref);
  654. StringBuffer refstr;
  655. toXML(ref,refstr,0,XML_SortTags|XML_Format);
  656. logctx.CTXLOG("Created reference size %d",refstr.length());
  657. Owned<IRemoteConnection> conn = querySDS().connect("/DAREGRESS",myProcessSession(), RTM_CREATE, 1000000);
  658. Rconn = conn;
  659. IPropertyTree *root = conn->queryRoot();
  660. fn(1,2,3,0,root);
  661. conn.clear();
  662. logctx.CTXLOG("Created test branch 1");
  663. conn.setown(querySDS().connect("/DAREGRESS",myProcessSession(), RTM_DELETE_ON_DISCONNECT, 1000000));
  664. root = conn->queryRoot();
  665. StringBuffer s;
  666. toXML(root,s,0,XML_SortTags|XML_Format);
  667. ASSERT(strcmp(s.str(),refstr.str())==0 && "Branch 1 does not match");
  668. conn.clear();
  669. conn.setown(querySDS().connect("/DAREGRESS",myProcessSession(), 0, 1000000));
  670. ASSERT(!conn && "RTM_DELETE_ON_DISCONNECT failed");
  671. Rconn = querySDS().connect("/DAREGRESS",myProcessSession(), RTM_CREATE, 1000000);
  672. StringBuffer pn;
  673. fn2(1,2,3,0,pn);
  674. ::Release(Rconn);
  675. logctx.CTXLOG("Created test branch 2");
  676. Rconn = NULL;
  677. conn.setown(querySDS().connect("/DAREGRESS",myProcessSession(), RTM_DELETE_ON_DISCONNECT, 1000000));
  678. root = conn->queryRoot();
  679. toXML(root,s.clear(),0,XML_SortTags|XML_Format);
  680. ASSERT(strcmp(s.str(),refstr.str())==0 && "Branch 2 does not match");
  681. conn.clear();
  682. conn.setown(querySDS().connect("/DAREGRESS",myProcessSession(), 0, 1000000));
  683. ASSERT(!conn && "RTM_DELETE_ON_DISCONNECT failed");
  684. }
  685. /*
  686. * This test is invasive, obsolete and the main source of
  687. * errors in the DFS code. It was created on a time where
  688. * the DFS API was spread open and methods could openly
  689. * fiddle with its internals without injury. Times have changed.
  690. *
  691. * TODO: Convert this test into a proper test of the DFS as
  692. * it currently stands, not work around its deficiencies.
  693. *
  694. * Unfortunately, to do that, some functionality has to be
  695. * re-worked (like creating groups, adding files to it,
  696. * creating physical temporary files, etc).
  697. */
  698. void testDFS()
  699. {
  700. const size32_t recsize = 17;
  701. StringBuffer s;
  702. unsigned i;
  703. unsigned n;
  704. unsigned t;
  705. queryNamedGroupStore().remove("daregress_group");
  706. dir.removeEntry("daregress::superfile1", user);
  707. SocketEndpointArray epa;
  708. for (n=0;n<400;n++) {
  709. s.clear().append("192.168.").append(n/256).append('.').append(n%256);
  710. SocketEndpoint ep(s.str());
  711. epa.append(ep);
  712. }
  713. Owned<IGroup> group = createIGroup(epa);
  714. queryNamedGroupStore().add("daregress_group",group,true);
  715. ASSERT(queryNamedGroupStore().find(group,s.clear()) && "Created logical group not found");
  716. ASSERT(stricmp(s.str(),"daregress_group")==0 && "Created logical group found with wrong name");
  717. group.setown(queryNamedGroupStore().lookup("daregress_group"));
  718. ASSERT(group && "named group lookup failed");
  719. logctx.CTXLOG("Named group created - 400 nodes");
  720. for (i=0;i<100;i++) {
  721. Owned<IPropertyTree> pp = createPTree("Part");
  722. Owned<IFileDescriptor>fdesc = createFileDescriptor();
  723. fdesc->setDefaultDir("thordata/regress");
  724. n = 9;
  725. for (unsigned k=0;k<400;k++) {
  726. s.clear().append("192.168.").append(n/256).append('.').append(n%256);
  727. Owned<INode> node = createINode(s.str());
  728. pp->setPropInt64("@size",(n*777+i)*recsize);
  729. s.clear().append("daregress_test").append(i).append("._").append(n+1).append("_of_400");
  730. fdesc->setPart(n,node,s.str(),pp);
  731. n = (n+9)%400;
  732. }
  733. fdesc->queryProperties().setPropInt("@recordSize",17);
  734. s.clear().append("daregress::test").append(i);
  735. removeLogical(s.str(), user);
  736. StringBuffer cname;
  737. Owned<IDistributedFile> dfile = dir.createNew(fdesc);
  738. ASSERT(stricmp(dfile->getClusterName(0,cname),"daregress_group")==0 && "Cluster name wrong");
  739. s.clear().append("daregress::test").append(i);
  740. dfile->attach(s.str(),user);
  741. }
  742. logctx.CTXLOG("DFile create done - 100 files");
  743. unsigned samples = 5;
  744. t = 33;
  745. for (i=0;i<100;i++) {
  746. s.clear().append("daregress::test").append(t);
  747. ASSERT(dir.exists(s.str(),user) && "Could not find sub-file");
  748. Owned<IDistributedFile> dfile = dir.lookup(s.str(), user);
  749. ASSERT(dfile && "Could not find sub-file");
  750. offset_t totsz = 0;
  751. n = 11;
  752. for (unsigned k=0;k<400;k++) {
  753. Owned<IDistributedFilePart> part = dfile->getPart(n);
  754. ASSERT(part && "part not found");
  755. s.clear().append("192.168.").append(n/256).append('.').append(n%256);
  756. Owned<INode> node = createINode(s.str());
  757. ASSERT(node->equals(part->queryNode()) && "part node mismatch");
  758. ASSERT(part->getFileSize(false,false)==(n*777+t)*recsize && "size node mismatch");
  759. s.clear().append("daregress_test").append(t).append("._").append(n+1).append("_of_400");
  760. /* ** TBD
  761. if (stricmp(s.str(),part->queryPartName())!=0)
  762. ERROR4("part name mismatch %d, %d '%s' '%s'",t,n,s.str(),part->queryPartName());
  763. */
  764. totsz += (n*777+t)*recsize;
  765. if ((samples>0)&&(i+n+t==k)) {
  766. samples--;
  767. RemoteFilename rfn;
  768. part->getFilename(rfn,samples%2);
  769. StringBuffer fn;
  770. rfn.getRemotePath(fn);
  771. logctx.CTXLOG("SAMPLE: %d,%d %s",t,n,fn.str());
  772. }
  773. n = (n+11)%400;
  774. }
  775. ASSERT(totsz==dfile->getFileSize(false,false) && "total size mismatch");
  776. t = (t+33)%100;
  777. }
  778. logctx.CTXLOG("DFile lookup done - 100 files");
  779. // check iteration
  780. __int64 crctot = 0;
  781. unsigned np = 0;
  782. unsigned totrows = 0;
  783. Owned<IDistributedFileIterator> fiter = dir.getIterator("daregress::*",false, user);
  784. Owned<IDistributedFilePartIterator> piter;
  785. ForEach(*fiter) {
  786. piter.setown(fiter->query().getIterator());
  787. ForEach(*piter) {
  788. RemoteFilename rfn;
  789. StringBuffer s;
  790. piter->query().getFilename(rfn,0);
  791. rfn.getRemotePath(s);
  792. piter->query().getFilename(rfn,1);
  793. rfn.getRemotePath(s);
  794. crctot += crc32(s.str(),s.length(),0);
  795. np++;
  796. totrows += (unsigned)(piter->query().getFileSize(false,false)/fiter->query().queryAttributes().getPropInt("@recordSize",-1));
  797. }
  798. }
  799. piter.clear();
  800. fiter.clear();
  801. logctx.CTXLOG("DFile iterate done - %d parts, %d rows, CRC sum %" I64F "d",np,totrows,crctot);
  802. Owned<IDistributedSuperFile> sfile;
  803. sfile.setown(dir.createSuperFile("daregress::superfile1",user,true,false));
  804. for (i = 0;i<100;i++) {
  805. s.clear().append("daregress::test").append(i);
  806. sfile->addSubFile(s.str());
  807. }
  808. sfile.clear();
  809. sfile.setown(dir.lookupSuperFile("daregress::superfile1", user));
  810. ASSERT(sfile && "Could not find added superfile");
  811. __int64 savcrc = crctot;
  812. crctot = 0;
  813. np = 0;
  814. totrows = 0;
  815. size32_t srs = (size32_t)sfile->queryAttributes().getPropInt("@recordSize",-1);
  816. ASSERT(srs==17 && "Superfile does not match subfile row size");
  817. piter.setown(sfile->getIterator());
  818. ForEach(*piter) {
  819. RemoteFilename rfn;
  820. StringBuffer s;
  821. piter->query().getFilename(rfn,0);
  822. rfn.getRemotePath(s);
  823. piter->query().getFilename(rfn,1);
  824. rfn.getRemotePath(s);
  825. crctot += crc32(s.str(),s.length(),0);
  826. np++;
  827. totrows += (unsigned)(piter->query().getFileSize(false,false)/srs);
  828. }
  829. piter.clear();
  830. logctx.CTXLOG("Superfile iterate done - %d parts, %d rows, CRC sum %" I64F "d",np,totrows,crctot);
  831. ASSERT(crctot==savcrc && "SuperFile does not match sub files");
  832. unsigned tr = (unsigned)(sfile->getFileSize(false,false)/srs);
  833. ASSERT(totrows==tr && "Superfile size does not match part sum");
  834. sfile->detach();
  835. sfile.clear();
  836. sfile.setown(dir.lookupSuperFile("daregress::superfile1",user));
  837. ASSERT(!sfile && "Superfile deletion failed");
  838. t = 37;
  839. for (i=0;i<100;i++) {
  840. s.clear().append("daregress::test").append(t);
  841. removeLogical(s.str(), user);
  842. t = (t+37)%100;
  843. }
  844. logctx.CTXLOG("DFile removal complete");
  845. t = 39;
  846. for (i=0;i<100;i++) {
  847. ASSERT(!dir.exists(s.str(),user) && "Found dir after deletion");
  848. Owned<IDistributedFile> dfile = dir.lookup(s.str(), user);
  849. ASSERT(!dfile && "Found file after deletion");
  850. t = (t+39)%100;
  851. }
  852. logctx.CTXLOG("DFile removal check complete");
  853. queryNamedGroupStore().remove("daregress_group");
  854. ASSERT(!queryNamedGroupStore().lookup("daregress_group") && "Named group not removed");
  855. }
  856. void testDFSTrans()
  857. {
  858. setupDFS("trans");
  859. Owned<IDistributedFileTransaction> transaction = createDistributedFileTransaction(user);
  860. // Auto-commit
  861. logctx.CTXLOG("Auto-commit test (inactive transaction)");
  862. Owned<IDistributedSuperFile> sfile1 = dir.createSuperFile("regress::trans::super1",user , false, false, transaction);
  863. sfile1->addSubFile("regress::trans::sub1", false, NULL, false, transaction);
  864. sfile1->addSubFile("regress::trans::sub2", false, NULL, false, transaction);
  865. sfile1.clear();
  866. sfile1.setown(dir.lookupSuperFile("regress::trans::super1", user, transaction));
  867. ASSERT(sfile1.get() && "non-transactional add super1 failed");
  868. ASSERT(sfile1->numSubFiles() == 2 && "auto-commit add sub failed, not all subs were added");
  869. ASSERT(strcmp(sfile1->querySubFile(0).queryLogicalName(), "regress::trans::sub1") == 0 && "auto-commit add sub failed, wrong name for sub1");
  870. ASSERT(strcmp(sfile1->querySubFile(1).queryLogicalName(), "regress::trans::sub2") == 0 && "auto-commit add sub failed, wrong name for sub2");
  871. sfile1.clear();
  872. // Rollback
  873. logctx.CTXLOG("Rollback test (active transaction)");
  874. transaction->start();
  875. Owned<IDistributedSuperFile> sfile2 = dir.createSuperFile("regress::trans::super2", user, false, false, transaction);
  876. sfile2->addSubFile("regress::trans::sub3", false, NULL, false, transaction);
  877. sfile2->addSubFile("regress::trans::sub4", false, NULL, false, transaction);
  878. transaction->rollback();
  879. ASSERT(sfile2->numSubFiles() == 0 && "transactional rollback failed, some subs were added");
  880. sfile2.clear();
  881. sfile2.setown(dir.lookupSuperFile("regress::trans::super2", user, transaction));
  882. ASSERT(!sfile2.get() && "transactional rollback super2 failed, it exists!");
  883. // Commit
  884. logctx.CTXLOG("Commit test (active transaction)");
  885. transaction->start();
  886. Owned<IDistributedSuperFile> sfile3 = dir.createSuperFile("regress::trans::super3", user, false, false, transaction);
  887. sfile3->addSubFile("regress::trans::sub3", false, NULL, false, transaction);
  888. sfile3->addSubFile("regress::trans::sub4", false, NULL, false, transaction);
  889. transaction->commit();
  890. sfile3.clear();
  891. sfile3.setown(dir.lookupSuperFile("regress::trans::super3", user, transaction));
  892. ASSERT(sfile3.get() && "transactional add super3 failed");
  893. ASSERT(sfile3->numSubFiles() == 2 && "transactional add sub failed, not all subs were added");
  894. ASSERT(strcmp(sfile3->querySubFile(0).queryLogicalName(), "regress::trans::sub3") == 0 && "transactional add sub failed, wrong name for sub3");
  895. ASSERT(strcmp(sfile3->querySubFile(1).queryLogicalName(), "regress::trans::sub4") == 0 && "transactional add sub failed, wrong name for sub4");
  896. sfile3.clear();
  897. }
  898. void testDFSPromote()
  899. {
  900. setupDFS("trans");
  901. unsigned timeout = 1000; // 1s
  902. /* Make the meta info of one of the subfiles mismatch the rest, as subfiles are promoted through
  903. * the super files, this should _not_ cause an issue, as no single super file will contain
  904. * mismatched subfiles.
  905. */
  906. Owned<IDistributedFile> sub1 = dir.lookup("regress::trans::sub1", user, false, false, false, NULL, timeout);
  907. assertex(sub1);
  908. sub1->lockProperties();
  909. sub1->queryAttributes().setPropBool("@local", true);
  910. sub1->unlockProperties();
  911. sub1.clear();
  912. Owned<IDistributedFileTransaction> transaction = createDistributedFileTransaction(user);
  913. // ===============================================================================
  914. // Don't change these parameters, or you'll have to change all ERROR tests below
  915. const char *sfnames[3] = {
  916. "regress::trans::super1", "regress::trans::super2", "regress::trans::super3"
  917. };
  918. bool delsub = false;
  919. bool createonlyone = true;
  920. // ===============================================================================
  921. StringArray outlinked;
  922. logctx.CTXLOG("Promote (1, -, -) - first iteration");
  923. dir.promoteSuperFiles(3, sfnames, "regress::trans::sub1", delsub, createonlyone, user, timeout, outlinked);
  924. {
  925. Owned<IDistributedSuperFile> sfile1 = dir.lookupSuperFile("regress::trans::super1", user, NULL, timeout);
  926. ASSERT(sfile1.get() && "promote failed, super1 doesn't exist");
  927. ASSERT(sfile1->numSubFiles() == 1 && "promote failed, super1 should have one subfile");
  928. ASSERT(strcmp(sfile1->querySubFile(0).queryLogicalName(), "regress::trans::sub1") == 0 && "promote failed, wrong name for sub1");
  929. Owned<IDistributedSuperFile> sfile2 = dir.lookupSuperFile("regress::trans::super2", user, NULL, timeout);
  930. ASSERT(!sfile2.get() && "promote failed, super2 does exist");
  931. ASSERT(outlinked.length() == 0 && "promote failed, outlinked expected empty");
  932. }
  933. logctx.CTXLOG("Promote (2, 1, -) - second iteration");
  934. dir.promoteSuperFiles(3, sfnames, "regress::trans::sub2", delsub, createonlyone, user, timeout, outlinked);
  935. {
  936. Owned<IDistributedSuperFile> sfile1 = dir.lookupSuperFile("regress::trans::super1", user, NULL, timeout);
  937. ASSERT(sfile1.get() && "promote failed, super1 doesn't exist");
  938. ASSERT(sfile1->numSubFiles() == 1 && "promote failed, super1 should have one subfile");
  939. ASSERT(strcmp(sfile1->querySubFile(0).queryLogicalName(), "regress::trans::sub2") == 0 && "promote failed, wrong name for sub2");
  940. Owned<IDistributedSuperFile> sfile2 = dir.lookupSuperFile("regress::trans::super2", user, NULL, timeout);
  941. ASSERT(sfile2.get() && "promote failed, super2 doesn't exist");
  942. ASSERT(sfile2->numSubFiles() == 1 && "promote failed, super2 should have one subfile");
  943. ASSERT(strcmp(sfile2->querySubFile(0).queryLogicalName(), "regress::trans::sub1") == 0 && "promote failed, wrong name for sub1");
  944. Owned<IDistributedSuperFile> sfile3 = dir.lookupSuperFile("regress::trans::super3", user, NULL, timeout);
  945. ASSERT(!sfile3.get() && "promote failed, super3 does exist");
  946. ASSERT(outlinked.length() == 0 && "promote failed, outlinked expected empty");
  947. }
  948. logctx.CTXLOG("Promote (3, 2, 1) - third iteration");
  949. dir.promoteSuperFiles(3, sfnames, "regress::trans::sub3", delsub, createonlyone, user, timeout, outlinked);
  950. {
  951. Owned<IDistributedSuperFile> sfile1 = dir.lookupSuperFile("regress::trans::super1", user, NULL, timeout);
  952. ASSERT(sfile1.get() &&* "promote failed, super1 doesn't exist");
  953. ASSERT(sfile1->numSubFiles() == 1 && "promote failed, super1 should have one subfile");
  954. ASSERT(strcmp(sfile1->querySubFile(0).queryLogicalName(), "regress::trans::sub3") == 0 && "promote failed, wrong name for sub3");
  955. Owned<IDistributedSuperFile> sfile2 = dir.lookupSuperFile("regress::trans::super2", user, NULL, timeout);
  956. ASSERT(sfile2.get() && "promote failed, super2 doesn't exist");
  957. ASSERT(sfile2->numSubFiles() == 1 && "promote failed, super2 should have one subfile");
  958. ASSERT(strcmp(sfile2->querySubFile(0).queryLogicalName(), "regress::trans::sub2") == 0 && "promote failed, wrong name for sub2");
  959. Owned<IDistributedSuperFile> sfile3 = dir.lookupSuperFile("regress::trans::super3", user, NULL, timeout);
  960. ASSERT(sfile3.get() && "promote failed, super3 doesn't exist");
  961. ASSERT(sfile3->numSubFiles() == 1 && "promote failed, super3 should have one subfile");
  962. ASSERT(strcmp(sfile3->querySubFile(0).queryLogicalName(), "regress::trans::sub1") == 0 && "promote failed, wrong name for sub1");
  963. ASSERT(outlinked.length() == 0 && "promote failed, outlinked expected empty");
  964. }
  965. logctx.CTXLOG("Promote (4, 3, 2) - fourth iteration, expect outlinked");
  966. dir.promoteSuperFiles(3, sfnames, "regress::trans::sub4", delsub, createonlyone, user, timeout, outlinked);
  967. {
  968. Owned<IDistributedSuperFile> sfile1 = dir.lookupSuperFile("regress::trans::super1", user, NULL, timeout);
  969. ASSERT(sfile1.get() && "promote failed, super1 doesn't exist");
  970. ASSERT(sfile1->numSubFiles() == 1 && "promote failed, super1 should have one subfile");
  971. ASSERT(strcmp(sfile1->querySubFile(0).queryLogicalName(), "regress::trans::sub4") == 0 && "promote failed, wrong name for sub4");
  972. Owned<IDistributedSuperFile> sfile2 = dir.lookupSuperFile("regress::trans::super2", user, NULL, timeout);
  973. ASSERT(sfile2.get() && "promote failed, super2 doesn't exist");
  974. ASSERT(sfile2->numSubFiles() == 1 && "promote failed, super2 should have one subfile");
  975. ASSERT(strcmp(sfile2->querySubFile(0).queryLogicalName(), "regress::trans::sub3") == 0 && "promote failed, wrong name for sub3");
  976. Owned<IDistributedSuperFile> sfile3 = dir.lookupSuperFile("regress::trans::super3", user, NULL, timeout);
  977. ASSERT(sfile3.get() && "promote failed, super3 doesn't exist");
  978. ASSERT(sfile3->numSubFiles() == 1 && "promote failed, super3 should have one subfile");
  979. ASSERT(strcmp(sfile3->querySubFile(0).queryLogicalName(), "regress::trans::sub2") == 0 && "promote failed, wrong name for sub2");
  980. ASSERT(outlinked.length() == 1 && "promote failed, outlinked expected only one item");
  981. ASSERT(strcmp(outlinked.popGet(), "regress::trans::sub1") == 0 && "promote failed, outlinked expected to be sub1");
  982. Owned<IDistributedFile> sub1 = dir.lookup("regress::trans::sub1", user, false, false, false, NULL, timeout);
  983. ASSERT(sub1.get() && "promote failed, sub1 was physically deleted");
  984. }
  985. logctx.CTXLOG("Promote ([2,3], 4, 3) - fifth iteration, two in-files");
  986. dir.promoteSuperFiles(3, sfnames, "regress::trans::sub2,regress::trans::sub3", delsub, createonlyone, user, timeout, outlinked);
  987. {
  988. Owned<IDistributedSuperFile> sfile1 = dir.lookupSuperFile("regress::trans::super1", user, NULL, timeout);
  989. ASSERT(sfile1.get() && "promote failed, super1 doesn't exist");
  990. ASSERT(sfile1->numSubFiles() == 2 && "promote failed, super1 should have two subfiles");
  991. ASSERT(strcmp(sfile1->querySubFile(0).queryLogicalName(), "regress::trans::sub2") == 0 && "promote failed, wrong name for sub1");
  992. ASSERT(strcmp(sfile1->querySubFile(1).queryLogicalName(), "regress::trans::sub3") == 0 && "promote failed, wrong name for sub2");
  993. Owned<IDistributedSuperFile> sfile2 = dir.lookupSuperFile("regress::trans::super2", user, NULL, timeout);
  994. ASSERT(sfile2.get() && "promote failed, super2 doesn't exist");
  995. ASSERT(sfile2->numSubFiles() == 1 && "promote failed, super2 should have one subfile");
  996. ASSERT(strcmp(sfile2->querySubFile(0).queryLogicalName(), "regress::trans::sub4") == 0 && "promote failed, wrong name for sub4");
  997. Owned<IDistributedSuperFile> sfile3 = dir.lookupSuperFile("regress::trans::super3", user, NULL, timeout);
  998. ASSERT(sfile3.get() && "promote failed, super3 doesn't exist");
  999. ASSERT(sfile3->numSubFiles() == 1 && "promote failed, super3 should have one subfile");
  1000. ASSERT(strcmp(sfile3->querySubFile(0).queryLogicalName(), "regress::trans::sub3") == 0 && "promote failed, wrong name for sub3");
  1001. ASSERT(outlinked.length() == 1 && "promote failed, outlinked expected only one item");
  1002. ASSERT(strcmp(outlinked.popGet(), "regress::trans::sub2") == 0 && "promote failed, outlinked expected to be sub2");
  1003. Owned<IDistributedFile> sub1 = dir.lookup("regress::trans::sub1", user, false, false, false, NULL, timeout);
  1004. ASSERT(sub1.get() && "promote failed, sub1 was physically deleted");
  1005. Owned<IDistributedFile> sub2 = dir.lookup("regress::trans::sub2", user, false, false, false, NULL, timeout);
  1006. ASSERT(sub2.get() && "promote failed, sub2 was physically deleted");
  1007. }
  1008. }
  1009. void testSDSSubs()
  1010. {
  1011. subchangenum = 0;
  1012. subchangetotal = 0;
  1013. IArrayOf<CChange> a;
  1014. for (unsigned i=0; i<10 ; i++)
  1015. a.append(*new CChange(i));
  1016. unsigned last = 0;
  1017. loop {
  1018. Sleep(1000);
  1019. {
  1020. CriticalBlock block(subchangesect);
  1021. if (subchangenum==last)
  1022. break;
  1023. last = subchangenum;
  1024. }
  1025. }
  1026. ForEachItemIn(i1, a)
  1027. a.item(i1).stopsem.signal();
  1028. ForEachItemIn(i2, a)
  1029. a.item(i2).join();
  1030. ASSERT(subchangenum==1000 && "Not all notifications received");
  1031. logctx.CTXLOG("%d subscription notifications, check sum = %" I64F "d",subchangenum,subchangetotal);
  1032. }
  1033. class CNodeSubCommitThread : public CInterface, implements IThreaded
  1034. {
  1035. StringAttr xpath;
  1036. bool finalDelete;
  1037. CThreaded threaded;
  1038. public:
  1039. IMPLEMENT_IINTERFACE;
  1040. CNodeSubCommitThread(const char *_xpath, bool _finalDelete) : threaded("CNodeSubCommitThread"), xpath(_xpath), finalDelete(_finalDelete)
  1041. {
  1042. }
  1043. virtual void main()
  1044. {
  1045. unsigned mode = RTM_LOCK_WRITE;
  1046. if (finalDelete)
  1047. mode |= RTM_DELETE_ON_DISCONNECT;
  1048. Owned<IRemoteConnection> conn = querySDS().connect(xpath, myProcessSession(), mode, 1000000);
  1049. assertex(conn);
  1050. for (unsigned i=0; i<5; i++)
  1051. {
  1052. VStringBuffer val("newval%d", i+1);
  1053. conn->queryRoot()->setProp(NULL, val.str());
  1054. conn->commit();
  1055. }
  1056. conn->queryRoot()->setProp("subnode", "newval");
  1057. conn->commit();
  1058. conn.clear(); // if finalDelete=true, deletes subscribed node in process, should get notificaiton
  1059. }
  1060. void start()
  1061. {
  1062. threaded.init(this);
  1063. }
  1064. void join()
  1065. {
  1066. threaded.join();
  1067. }
  1068. };
  1069. class CResults
  1070. {
  1071. StringArray results;
  1072. CRC32 crc;
  1073. public:
  1074. void add(const char *out)
  1075. {
  1076. PROGLOG("%s", out);
  1077. results.append(out);
  1078. }
  1079. unsigned getCRC()
  1080. {
  1081. results.sortAscii();
  1082. ForEachItemIn(r, results)
  1083. {
  1084. const char *result = results.item(r);
  1085. crc.tally(strlen(result), result);
  1086. }
  1087. PROGLOG("CRC = %x", crc.get());
  1088. results.kill();
  1089. return crc.get();
  1090. }
  1091. };
  1092. class CSubscriber : CSimpleInterface, implements ISDSNodeSubscription
  1093. {
  1094. StringAttr path;
  1095. CResults &results;
  1096. unsigned notifications, expectedNotifications;
  1097. Semaphore joinSem;
  1098. public:
  1099. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  1100. CSubscriber(const char *_path, CResults &_results, unsigned _expectedNotifications)
  1101. : path(_path), results(_results), expectedNotifications(_expectedNotifications)
  1102. {
  1103. notifications = 0;
  1104. if (0 == expectedNotifications)
  1105. joinSem.signal();
  1106. }
  1107. virtual void notify(SubscriptionId id, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
  1108. {
  1109. StringAttr value;
  1110. if (valueLen)
  1111. value.set((const char *)valueData, valueLen);
  1112. VStringBuffer res("Subscriber(%s): flags=%d, value=%s", path.get(), flags, 0==valueLen ? "(none)" : value.get());
  1113. results.add(res);
  1114. ++notifications;
  1115. if (notifications == expectedNotifications)
  1116. joinSem.signal();
  1117. }
  1118. void join()
  1119. {
  1120. if (joinSem.wait(5000))
  1121. {
  1122. MilliSleep(100); // wait a bit, see if get more than expected
  1123. if (notifications == expectedNotifications)
  1124. {
  1125. VStringBuffer out("Subscriber(%s): %d notifications received", path.get(), notifications);
  1126. results.add(out);
  1127. return;
  1128. }
  1129. }
  1130. VStringBuffer out("Expected %d notifications, received %d", expectedNotifications, notifications);
  1131. results.add(out);
  1132. }
  1133. };
  1134. void sdsNodeCommit(const char *test, unsigned from, unsigned to, bool finalDelete)
  1135. {
  1136. CIArrayOf<CNodeSubCommitThread> commitThreads;
  1137. for (unsigned i=from; i<=to; i++)
  1138. {
  1139. VStringBuffer path("/DAREGRESS/NodeSubTest/node%d", i);
  1140. CNodeSubCommitThread *commitThread = new CNodeSubCommitThread(path, finalDelete);
  1141. commitThreads.append(* commitThread);
  1142. }
  1143. ForEachItemIn(t, commitThreads)
  1144. commitThreads.item(t).start();
  1145. ForEachItemIn(t2, commitThreads)
  1146. commitThreads.item(t2).join();
  1147. }
  1148. void testSDSNodeSubs()
  1149. {
  1150. // setup
  1151. Owned<IRemoteConnection> conn = querySDS().connect("/DAREGRESS/NodeSubTest", myProcessSession(), RTM_CREATE, 1000000);
  1152. IPropertyTree *root = conn->queryRoot();
  1153. unsigned i, ai;
  1154. for (i=0; i<10; i++)
  1155. {
  1156. VStringBuffer name("node%d", i+1);
  1157. IPropertyTree *sub = root->setPropTree(name, createPTree());
  1158. for (ai=0; ai<2; ai++)
  1159. {
  1160. VStringBuffer name("@attr%d", i+1);
  1161. VStringBuffer val("val%d", i+1);
  1162. sub->setProp(name, val);
  1163. }
  1164. }
  1165. conn.clear();
  1166. CResults results;
  1167. {
  1168. const char *testPath = "/DAREGRESS/NodeSubTest/doesnotexist";
  1169. Owned<CSubscriber> subscriber = new CSubscriber(testPath, results, 0);
  1170. try
  1171. {
  1172. querySDS().subscribeExact(testPath, *subscriber, true);
  1173. throwUnexpected();
  1174. }
  1175. catch(IException *e)
  1176. {
  1177. if (SDSExcpt_SubscriptionNoMatch != e->errorCode())
  1178. throw;
  1179. results.add("Correctly failed to add subscriber to non-existent node.");
  1180. }
  1181. subscriber.clear();
  1182. }
  1183. {
  1184. const char *testPath = "/DAREGRESS/NodeSubTest/node1";
  1185. Owned<CSubscriber> subscriber = new CSubscriber(testPath, results, 2*5+1+1);
  1186. SubscriptionId id = querySDS().subscribeExact(testPath, *subscriber, false);
  1187. sdsNodeCommit(testPath, 1, 1, false);
  1188. sdsNodeCommit(testPath, 1, 1, true); // will delete 'node1'
  1189. subscriber->join();
  1190. querySDS().unsubscribeExact(id); // will actually be a NOP, as will be already unsubscribed when 'node1' deleted.
  1191. }
  1192. {
  1193. const char *testPath = "/DAREGRESS/NodeSubTest/node*";
  1194. Owned<CSubscriber> subscriber = new CSubscriber(testPath, results, 9*6);
  1195. SubscriptionId id = querySDS().subscribeExact(testPath, *subscriber, false);
  1196. sdsNodeCommit(testPath, 2, 10, false);
  1197. subscriber->join();
  1198. querySDS().unsubscribeExact(id);
  1199. }
  1200. {
  1201. UInt64Array subscriberIds;
  1202. IArrayOf<CSubscriber> subscribers;
  1203. for (i=2; i<=10; i++) // NB: from 2, as 'node1' deleted in previous tests
  1204. {
  1205. for (ai=0; ai<2; ai++)
  1206. {
  1207. VStringBuffer path("/DAREGRESS/NodeSubTest/node%d[@attr%d=\"val%d\"]", i, i, i);
  1208. Owned<CSubscriber> subscriber = new CSubscriber(path, results, 11);
  1209. SubscriptionId id = querySDS().subscribeExact(path, *subscriber, 0==ai);
  1210. subscribers.append(* subscriber.getClear());
  1211. subscriberIds.append(id);
  1212. }
  1213. }
  1214. const char *testPath = "/DAREGRESS/NodeSubTest/node*";
  1215. Owned<CSubscriber> subscriber = new CSubscriber(testPath, results, 9*5+9*(5+1));
  1216. SubscriptionId id = querySDS().subscribeExact(testPath, *subscriber, false);
  1217. sdsNodeCommit(testPath, 2, 10, false);
  1218. sdsNodeCommit(testPath, 2, 10, true);
  1219. subscriber->join();
  1220. querySDS().unsubscribeExact(id);
  1221. ForEachItemIn(s, subscriberIds)
  1222. {
  1223. subscribers.item(s).join();
  1224. querySDS().unsubscribeExact(subscriberIds.item(s));
  1225. }
  1226. }
  1227. ASSERT(0xa68e2324 == results.getCRC() && "SDS Node notifcation differences");
  1228. }
  1229. /*
  1230. * This test is silly and can take a very long time on clusters with
  1231. * a large file-system. But keeping it here for further reference.
  1232. * MORE: Maybe, this could be added to daliadmin or a thorough check
  1233. * on the filesystem, together with super-file checks et al.
  1234. void testReadAllSDS()
  1235. {
  1236. logctx.CTXLOG("Test SDS connecting to every branch");
  1237. testReadBranch("/");
  1238. logctx.CTXLOG("Connected to every branch");
  1239. }
  1240. */
  1241. void testMultiCluster()
  1242. {
  1243. Owned<IGroup> grp1 = createIGroup("192.168.51.1-5");
  1244. Owned<IGroup> grp2 = createIGroup("192.168.16.1-5");
  1245. Owned<IGroup> grp3 = createIGroup("192.168.53.1-5");
  1246. queryNamedGroupStore().add("testgrp1",grp1);
  1247. queryNamedGroupStore().add("testgrp2",grp2);
  1248. queryNamedGroupStore().add("testgrp3",grp3);
  1249. Owned<IFileDescriptor> fdesc = createFileDescriptor();
  1250. fdesc->setDefaultDir("/c$/thordata/test");
  1251. fdesc->setPartMask("testfile1._$P$_of_$N$");
  1252. fdesc->setNumParts(5);
  1253. ClusterPartDiskMapSpec mapping;
  1254. fdesc->addCluster(grp1,mapping);
  1255. fdesc->addCluster(grp2,mapping);
  1256. fdesc->addCluster(grp3,mapping);
  1257. removeLogical("test::testfile1", user);
  1258. Owned<IDistributedFile> file = queryDistributedFileDirectory().createNew(fdesc);
  1259. removeLogical("test::testfile1", user);
  1260. file->attach("test::testfile1",user);
  1261. StringBuffer name;
  1262. unsigned i;
  1263. for (i=0;i<file->numClusters();i++)
  1264. PROGLOG("cluster[%d] = %s",i,file->getClusterName(i,name.clear()).str());
  1265. file.clear();
  1266. file.setown(queryDistributedFileDirectory().lookup("test::testfile1",user));
  1267. for (i=0;i<file->numClusters();i++)
  1268. PROGLOG("cluster[%d] = %s",i,file->getClusterName(i,name.clear()).str());
  1269. file.clear();
  1270. file.setown(queryDistributedFileDirectory().lookup("test::testfile1@testgrp1",user));
  1271. for (i=0;i<file->numClusters();i++)
  1272. PROGLOG("cluster[%d] = %s",i,file->getClusterName(i,name.clear()).str());
  1273. file.clear();
  1274. removeLogical("test::testfile1@testgrp2", user);
  1275. file.setown(queryDistributedFileDirectory().lookup("test::testfile1",user));
  1276. for (i=0;i<file->numClusters();i++)
  1277. PROGLOG("cluster[%d] = %s",i,file->getClusterName(i,name.clear()).str());
  1278. }
  1279. void testFiles()
  1280. {
  1281. StringBuffer fn;
  1282. const char *s = filelist;
  1283. unsigned slowest = 0;
  1284. StringAttr slowname;
  1285. unsigned tot = 0;
  1286. unsigned n = 0;
  1287. while (*s) {
  1288. fn.clear();
  1289. while (*s==',')
  1290. s++;
  1291. while (*s&&(*s!=','))
  1292. fn.append(*(s++));
  1293. if (fn.length()) {
  1294. n++;
  1295. unsigned ss = msTick();
  1296. checkFiles(fn);
  1297. unsigned t = (msTick()-ss);
  1298. if (t>slowest) {
  1299. slowest = t;
  1300. slowname.set(fn);
  1301. }
  1302. tot += t;
  1303. }
  1304. }
  1305. printf("Complete in %ds avg %dms\n",tot/1000,tot/(n?n:1));
  1306. if (!slowname.isEmpty())
  1307. printf("Slowest %s = %dms\n",slowname.get(),slowest);
  1308. }
  1309. void testGroups()
  1310. {
  1311. SocketEndpointArray epa;
  1312. SocketEndpoint ep;
  1313. Owned<IGroup> grp;
  1314. testGrp(epa);
  1315. ep.set("10.150.10.80");
  1316. epa.append(ep);
  1317. testGrp(epa);
  1318. ep.set("10.150.10.81");
  1319. epa.append(ep);
  1320. testGrp(epa);
  1321. ep.set("10.150.10.82");
  1322. epa.append(ep);
  1323. testGrp(epa);
  1324. ep.set("10.150.10.83:111");
  1325. epa.append(ep);
  1326. testGrp(epa);
  1327. ep.set("10.150.10.84:111");
  1328. epa.append(ep);
  1329. testGrp(epa);
  1330. ep.set("10.150.10.84:111");
  1331. epa.append(ep);
  1332. testGrp(epa);
  1333. ep.set("10.150.10.84:111");
  1334. epa.append(ep);
  1335. testGrp(epa);
  1336. ep.set("10.150.10.84:111");
  1337. epa.append(ep);
  1338. testGrp(epa);
  1339. ep.set("10.150.10.85:111");
  1340. epa.append(ep);
  1341. testGrp(epa);
  1342. ep.set("10.150.10.86:111");
  1343. epa.append(ep);
  1344. testGrp(epa);
  1345. ep.set("10.150.10.87");
  1346. epa.append(ep);
  1347. testGrp(epa);
  1348. ep.set("10.150.10.87");
  1349. epa.append(ep);
  1350. testGrp(epa);
  1351. ep.set("10.150.10.88");
  1352. epa.append(ep);
  1353. testGrp(epa);
  1354. ep.set("10.150.11.88");
  1355. epa.append(ep);
  1356. testGrp(epa);
  1357. ep.set("10.173.10.88");
  1358. epa.append(ep);
  1359. testGrp(epa);
  1360. ep.set("10.173.10.88:22222");
  1361. epa.append(ep);
  1362. testGrp(epa);
  1363. ep.set("192.168.10.88");
  1364. epa.append(ep);
  1365. testGrp(epa);
  1366. }
  1367. #ifndef COMPAT
  1368. void testDF1()
  1369. {
  1370. const char * fname = "testing::propfile2";
  1371. Owned<IFileDescriptor> fdesc = createFileDescriptor();
  1372. Owned<IPropertyTree> pt = createPTree("Attr");
  1373. RemoteFilename rfn;
  1374. rfn.setRemotePath("//10.150.10.80/c$/thordata/test/part._1_of_3");
  1375. pt->setPropInt("@size",123);
  1376. fdesc->setPart(0,rfn,pt);
  1377. rfn.setRemotePath("//10.150.10.81/c$/thordata/test/part._2_of_3");
  1378. pt->setPropInt("@size",456);
  1379. fdesc->setPart(1,rfn,pt);
  1380. rfn.setRemotePath("//10.150.10.82/c$/thordata/test/part._3_of_3");
  1381. pt->setPropInt("@size",789);
  1382. fdesc->setPart(2,rfn,pt);
  1383. dispFDesc(fdesc);
  1384. try {
  1385. removeLogical(fname, user);
  1386. Owned<IDistributedFile> file = queryDistributedFileDirectory().createNew(fdesc);
  1387. {
  1388. DistributedFilePropertyLock lock(file);
  1389. lock.queryAttributes().setProp("@testing","1");
  1390. }
  1391. file->attach(fname,user);
  1392. } catch (IException *e) {
  1393. StringBuffer msg;
  1394. e->errorMessage(msg);
  1395. logctx.CTXLOG("Caught exception while setting property: %s", msg.str());
  1396. e->Release();
  1397. }
  1398. }
  1399. void testDF2() // 4*3 superfile
  1400. {
  1401. Owned<IFileDescriptor> fdesc = createFileDescriptor();
  1402. Owned<IPropertyTree> pt = createPTree("Attr");
  1403. RemoteFilename rfn;
  1404. rfn.setRemotePath("//10.150.10.80/c$/thordata/test/partone._1_of_3");
  1405. pt->setPropInt("@size",1231);
  1406. fdesc->setPart(0,rfn,pt);
  1407. rfn.setRemotePath("//10.150.10.80/c$/thordata/test/parttwo._1_of_3");
  1408. pt->setPropInt("@size",1232);
  1409. fdesc->setPart(1,rfn,pt);
  1410. rfn.setRemotePath("//10.150.10.80/c$/thordata/test/partthree._1_of_3");
  1411. pt->setPropInt("@size",1233);
  1412. fdesc->setPart(2,rfn,pt);
  1413. rfn.setRemotePath("//10.150.10.80/c$/thordata/test2/partfour._1_of_3");
  1414. pt->setPropInt("@size",1234);
  1415. fdesc->setPart(3,rfn,pt);
  1416. rfn.setRemotePath("//10.150.10.81/c$/thordata/test/partone._2_of_3");
  1417. pt->setPropInt("@size",4565);
  1418. fdesc->setPart(4,rfn,pt);
  1419. rfn.setRemotePath("//10.150.10.81/c$/thordata/test/parttwo._2_of_3");
  1420. pt->setPropInt("@size",4566);
  1421. fdesc->setPart(5,rfn,pt);
  1422. rfn.setRemotePath("//10.150.10.81/c$/thordata/test/partthree._2_of_3");
  1423. pt->setPropInt("@size",4567);
  1424. fdesc->setPart(6,rfn,pt);
  1425. rfn.setRemotePath("//10.150.10.81/c$/thordata/test2/partfour._2_of_3");
  1426. pt->setPropInt("@size",4568);
  1427. fdesc->setPart(7,rfn,pt);
  1428. rfn.setRemotePath("//10.150.10.82/c$/thordata/test/partone._3_of_3");
  1429. pt->setPropInt("@size",7899);
  1430. fdesc->setPart(8,rfn,pt);
  1431. rfn.setRemotePath("//10.150.10.82/c$/thordata/test/parttwo._3_of_3");
  1432. pt->setPropInt("@size",78910);
  1433. fdesc->setPart(9,rfn,pt);
  1434. rfn.setRemotePath("//10.150.10.82/c$/thordata/test/partthree._3_of_3");
  1435. pt->setPropInt("@size",78911);
  1436. fdesc->setPart(10,rfn,pt);
  1437. rfn.setRemotePath("//10.150.10.82/c$/thordata/test2/partfour._3_of_3");
  1438. pt->setPropInt("@size",78912);
  1439. fdesc->setPart(11,rfn,pt);
  1440. ClusterPartDiskMapSpec mspec;
  1441. mspec.interleave = 4;
  1442. fdesc->endCluster(mspec);
  1443. dispFDesc(fdesc);
  1444. }
  1445. void testMisc()
  1446. {
  1447. ClusterPartDiskMapSpec mspec;
  1448. Owned<IGroup> grp = createIGroup("10.150.10.1-3");
  1449. RemoteFilename rfn;
  1450. for (unsigned i=0;i<3;i++)
  1451. for (unsigned ic=0;ic<mspec.defaultCopies;ic++) {
  1452. constructPartFilename(grp,i+1,3,(i==1)?"test.txt":NULL,"test._$P$_of_$N$","/c$/thordata/test",ic,mspec,rfn);
  1453. StringBuffer tmp;
  1454. printf("%d,%d: %s\n",i,ic,rfn.getRemotePath(tmp).str());
  1455. }
  1456. }
  1457. void testDFile()
  1458. {
  1459. ClusterPartDiskMapSpec map;
  1460. { // 1: single part file old method
  1461. #define TN "1"
  1462. removeLogical("test::ftest" TN, user);
  1463. Owned<IFileDescriptor> fdesc = createFileDescriptor();
  1464. RemoteFilename rfn;
  1465. rfn.setRemotePath("//10.150.10.1/c$/thordata/test/ftest" TN "._1_of_1");
  1466. fdesc->setPart(0,rfn);
  1467. fdesc->endCluster(map);
  1468. Owned<IDistributedFile> file = queryDistributedFileDirectory().createNew(fdesc);
  1469. file->attach("test::ftest" TN,user);
  1470. #undef TN
  1471. }
  1472. { // 2: single part file new method
  1473. #define TN "2"
  1474. removeLogical("test::ftest" TN, user);
  1475. Owned<IFileDescriptor> fdesc = createFileDescriptor();
  1476. fdesc->setPartMask("ftest" TN "._$P$_of_$N$");
  1477. fdesc->setNumParts(1);
  1478. Owned<IGroup> grp = createIGroup("10.150.10.1");
  1479. fdesc->addCluster(grp,map);
  1480. Owned<IDistributedFile> file = queryDistributedFileDirectory().createNew(fdesc);
  1481. file->attach("test::ftest" TN,user);
  1482. #undef TN
  1483. }
  1484. Owned<IGroup> grp3 = createIGroup("10.150.10.1,10.150.10.2,10.150.10.3");
  1485. queryNamedGroupStore().add("__testgroup3__",grp3,true);
  1486. { // 3: three parts file old method
  1487. #define TN "3"
  1488. removeLogical("test::ftest" TN, user);
  1489. Owned<IFileDescriptor> fdesc = createFileDescriptor();
  1490. RemoteFilename rfn;
  1491. rfn.setRemotePath("//10.150.10.1/c$/thordata/test/ftest" TN "._1_of_3");
  1492. fdesc->setPart(0,rfn);
  1493. rfn.setRemotePath("//10.150.10.2/c$/thordata/test/ftest" TN "._2_of_3");
  1494. fdesc->setPart(1,rfn);
  1495. rfn.setRemotePath("//10.150.10.3/c$/thordata/test/ftest" TN "._3_of_3");
  1496. fdesc->setPart(2,rfn);
  1497. fdesc->endCluster(map);
  1498. Owned<IDistributedFile> file = queryDistributedFileDirectory().createNew(fdesc);
  1499. file->attach("test::ftest" TN,user);
  1500. #undef TN
  1501. }
  1502. { // 4: three part file new method
  1503. #define TN "4"
  1504. removeLogical("test::ftest" TN, user);
  1505. Owned<IFileDescriptor> fdesc = createFileDescriptor();
  1506. fdesc->setPartMask("ftest" TN "._$P$_of_$N$");
  1507. fdesc->setNumParts(3);
  1508. fdesc->addCluster(grp3,map);
  1509. Owned<IDistributedFile> file = queryDistributedFileDirectory().createNew(fdesc);
  1510. file->attach("test::ftest" TN,user);
  1511. #undef TN
  1512. }
  1513. }
  1514. #endif
  1515. void testDFSDel()
  1516. {
  1517. Owned<IDistributedFileTransaction> transaction = createDistributedFileTransaction(user); // disabled, auto-commit
  1518. setupDFS("del");
  1519. // Sub-file deletion
  1520. logctx.CTXLOG("Creating regress::del::super1 and attaching sub");
  1521. Owned<IDistributedSuperFile> sfile = dir.createSuperFile("regress::del::super1", user, false, false, transaction);
  1522. sfile->addSubFile("regress::del::sub1", false, NULL, false, transaction);
  1523. sfile->addSubFile("regress::del::sub4", false, NULL, false, transaction);
  1524. sfile.clear();
  1525. logctx.CTXLOG("Deleting regress::del::sub1, should fail");
  1526. try {
  1527. if (dir.removeEntry("regress::del::sub1", user, transaction)) {
  1528. ASSERT(0 && "Could remove sub, this will make the DFS inconsistent!");
  1529. return;
  1530. }
  1531. } catch (IException *e) {
  1532. // expecting an exception
  1533. e->Release();
  1534. }
  1535. logctx.CTXLOG("Removing regress::del::sub1 from super1, no del");
  1536. sfile.setown(transaction->lookupSuperFile("regress::del::super1"));
  1537. sfile->removeSubFile("regress::del::sub1", false, false, transaction);
  1538. ASSERT(sfile->numSubFiles() == 1 && "File sub1 was not removed from super1");
  1539. sfile.clear();
  1540. ASSERT(dir.exists("regress::del::sub1", user) && "File sub1 was removed from the file system");
  1541. logctx.CTXLOG("Removing regress::del::sub4 from super1, del");
  1542. sfile.setown(transaction->lookupSuperFile("regress::del::super1"));
  1543. sfile->removeSubFile("regress::del::sub4", true, false, transaction);
  1544. ASSERT(!sfile->numSubFiles() && "File sub4 was not removed from super1");
  1545. sfile.clear();
  1546. ASSERT(!dir.exists("regress::del::sub4", user) && "File sub4 was NOT removed from the file system");
  1547. // Logical Remove
  1548. logctx.CTXLOG("Deleting 'regress::del::super1, should work");
  1549. ASSERT(dir.removeEntry("regress::del::super1", user) && "Can't remove super1");
  1550. logctx.CTXLOG("Deleting 'regress::del::sub1 autoCommit, should work");
  1551. ASSERT(dir.removeEntry("regress::del::sub1", user) && "Can't remove sub1");
  1552. logctx.CTXLOG("Removing 'regress::del::sub2 - rollback");
  1553. transaction->start();
  1554. dir.removeEntry("regress::del::sub2", user, transaction);
  1555. transaction->rollback();
  1556. ASSERT(dir.exists("regress::del::sub2", user, true, false) && "Shouldn't have removed sub2 on rollback");
  1557. logctx.CTXLOG("Removing 'regress::del::sub2 - commit");
  1558. transaction->start();
  1559. dir.removeEntry("regress::del::sub2", user, transaction);
  1560. transaction->commit();
  1561. ASSERT(!dir.exists("regress::del::sub2", user, true, false) && "Should have removed sub2 on commit");
  1562. // Physical Remove
  1563. logctx.CTXLOG("Physically removing 'regress::del::sub3 - rollback");
  1564. transaction->start();
  1565. dir.removeEntry("regress::del::sub3", user, transaction);
  1566. transaction->rollback();
  1567. ASSERT(dir.exists("regress::del::sub3", user, true, false) && "Shouldn't have removed sub3 on rollback");
  1568. logctx.CTXLOG("Physically removing 'regress::del::sub3 - commit");
  1569. transaction->start();
  1570. dir.removeEntry("regress::del::sub3", user, transaction);
  1571. transaction->commit();
  1572. ASSERT(!dir.exists("regress::del::sub3", user, true, false) && "Should have removed sub3 on commit");
  1573. }
  1574. void testDFSRename()
  1575. {
  1576. Owned<IDistributedFileTransaction> transaction = createDistributedFileTransaction(user); // disabled, auto-commit
  1577. if (dir.exists("regress::rename::other1",user,false,false))
  1578. ASSERT(dir.removeEntry("regress::rename::other1", user) && "Can't remove 'regress::rename::other1'");
  1579. if (dir.exists("regress::rename::other2",user,false,false))
  1580. ASSERT(dir.removeEntry("regress::rename::other2", user) && "Can't remove 'regress::rename::other2'");
  1581. setupDFS("rename");
  1582. try {
  1583. logctx.CTXLOG("Renaming 'regress::rename::sub1 to 'sub2' with auto-commit, should fail");
  1584. dir.renamePhysical("regress::rename::sub1", "regress::rename::sub2", user, transaction);
  1585. ASSERT(0 && "Renamed to existing file should have failed!");
  1586. return;
  1587. } catch (IException *e) {
  1588. // Expecting exception
  1589. e->Release();
  1590. }
  1591. logctx.CTXLOG("Renaming 'regress::rename::sub1 to 'other1' with auto-commit");
  1592. dir.renamePhysical("regress::rename::sub1", "regress::rename::other1", user, transaction);
  1593. ASSERT(dir.exists("regress::rename::other1", user, true, false) && "Renamed to other failed");
  1594. logctx.CTXLOG("Renaming 'regress::rename::sub2 to 'other2' and rollback");
  1595. transaction->start();
  1596. dir.renamePhysical("regress::rename::sub2", "regress::rename::other2", user, transaction);
  1597. transaction->rollback();
  1598. ASSERT(!dir.exists("regress::rename::other2", user, true, false) && "Renamed to other2 when it shouldn't");
  1599. logctx.CTXLOG("Renaming 'regress::rename::sub2 to 'other2' and commit");
  1600. transaction->start();
  1601. dir.renamePhysical("regress::rename::sub2", "regress::rename::other2", user, transaction);
  1602. transaction->commit();
  1603. ASSERT(dir.exists("regress::rename::other2", user, true, false) && "Renamed to other failed");
  1604. try {
  1605. logctx.CTXLOG("Renaming 'regress::rename::sub3 to 'sub3' with auto-commit, should fail");
  1606. dir.renamePhysical("regress::rename::sub3", "regress::rename::sub3", user, transaction);
  1607. ASSERT(0 && "Renamed to same file should have failed!");
  1608. return;
  1609. } catch (IException *e) {
  1610. // Expecting exception
  1611. e->Release();
  1612. }
  1613. // To make sure renamed files are cleaned properly
  1614. printf("Renaming 'regress::rename::other2 to 'sub2' on auto-commit\n");
  1615. dir.renamePhysical("regress::rename::other2", "regress::rename::sub2", user, transaction);
  1616. ASSERT(dir.exists("regress::rename::sub2", user, true, false) && "Renamed from other2 failed");
  1617. }
  1618. void testDFSClearAdd()
  1619. {
  1620. setupDFS("clearadd");
  1621. Owned<IDistributedFileTransaction> transaction = createDistributedFileTransaction(user); // disabled, auto-commit
  1622. logctx.CTXLOG("Creating regress::clearadd::super1 and attaching sub1 & sub4");
  1623. Owned<IDistributedSuperFile> sfile = dir.createSuperFile("regress::clearadd::super1", user, false, false, transaction);
  1624. sfile->addSubFile("regress::clearadd::sub1", false, NULL, false, transaction);
  1625. sfile->addSubFile("regress::clearadd::sub4", false, NULL, false, transaction);
  1626. sfile.clear();
  1627. transaction.setown(createDistributedFileTransaction(user)); // disabled, auto-commit
  1628. transaction->start();
  1629. logctx.CTXLOG("Removing sub1 from super1, within transaction");
  1630. sfile.setown(transaction->lookupSuperFile("regress::clearadd::super1"));
  1631. sfile->removeSubFile("regress::clearadd::sub1", false, false, transaction);
  1632. sfile.clear();
  1633. logctx.CTXLOG("Adding sub1 back into to super1, within transaction");
  1634. sfile.setown(transaction->lookupSuperFile("regress::clearadd::super1"));
  1635. sfile->addSubFile("regress::clearadd::sub1", false, NULL, false, transaction);
  1636. sfile.clear();
  1637. try
  1638. {
  1639. transaction->commit();
  1640. }
  1641. catch (IException *e)
  1642. {
  1643. StringBuffer eStr;
  1644. e->errorMessage(eStr);
  1645. CPPUNIT_ASSERT_MESSAGE(eStr.str(), 0);
  1646. e->Release();
  1647. }
  1648. sfile.setown(dir.lookupSuperFile("regress::clearadd::super1", user));
  1649. ASSERT(NULL != sfile->querySubFileNamed("regress::clearadd::sub1") && "regress::clearadd::sub1, should be a subfile of super1");
  1650. // same but remove all (clear)
  1651. transaction.setown(createDistributedFileTransaction(user)); // disabled, auto-commit
  1652. transaction->start();
  1653. logctx.CTXLOG("Adding sub2 into to super1, within transaction");
  1654. sfile.setown(transaction->lookupSuperFile("regress::clearadd::super1"));
  1655. sfile->addSubFile("regress::clearadd::sub2", false, NULL, false, transaction);
  1656. sfile.clear();
  1657. logctx.CTXLOG("Removing all sub files from super1, within transaction");
  1658. sfile.setown(transaction->lookupSuperFile("regress::clearadd::super1"));
  1659. sfile->removeSubFile(NULL, false, false, transaction);
  1660. sfile.clear();
  1661. logctx.CTXLOG("Adding sub2 back into to super1, within transaction");
  1662. sfile.setown(transaction->lookupSuperFile("regress::clearadd::super1"));
  1663. sfile->addSubFile("regress::clearadd::sub2", false, NULL, false, transaction);
  1664. sfile.clear();
  1665. try
  1666. {
  1667. transaction->commit();
  1668. }
  1669. catch (IException *e)
  1670. {
  1671. StringBuffer eStr;
  1672. e->errorMessage(eStr);
  1673. CPPUNIT_ASSERT_MESSAGE(eStr.str(), 0);
  1674. e->Release();
  1675. }
  1676. sfile.setown(dir.lookupSuperFile("regress::clearadd::super1", user));
  1677. ASSERT(NULL != sfile->querySubFileNamed("regress::clearadd::sub2") && "regress::clearadd::sub2, should be a subfile of super1");
  1678. ASSERT(NULL == sfile->querySubFileNamed("regress::clearadd::sub1") && "regress::clearadd::sub1, should NOT be a subfile of super1");
  1679. ASSERT(NULL == sfile->querySubFileNamed("regress::clearadd::sub4") && "regress::clearadd::sub4, should NOT be a subfile of super1");
  1680. ASSERT(1 == sfile->numSubFiles() && "regress::clearadd::super1 should contain 1 subfile");
  1681. }
  1682. void testDFSAddFailReAdd()
  1683. {
  1684. setupDFS("addreadd");
  1685. Owned<IDistributedFileTransaction> transaction = createDistributedFileTransaction(user); // disabled, auto-commit
  1686. logctx.CTXLOG("Creating super1 and supet2, adding sub1 and sub2 to super1 and sub3 to super2");
  1687. Owned<IDistributedSuperFile> sfile = dir.createSuperFile("regress::addreadd::super1", user, false, false, transaction);
  1688. sfile->addSubFile("regress::addreadd::sub1", false, NULL, false, transaction);
  1689. sfile->addSubFile("regress::addreadd::sub2", false, NULL, false, transaction);
  1690. sfile.clear();
  1691. Owned<IDistributedSuperFile> sfile2 = dir.createSuperFile("regress::addreadd::super2", user, false, false, transaction);
  1692. sfile2->addSubFile("regress::addreadd::sub3", false, NULL, false, transaction);
  1693. sfile2.clear();
  1694. class CShortLock : implements IThreaded
  1695. {
  1696. StringAttr fileName;
  1697. unsigned secDelay;
  1698. CThreaded threaded;
  1699. public:
  1700. CShortLock(const char *_fileName, unsigned _secDelay) : fileName(_fileName), secDelay(_secDelay), threaded("CShortLock", this) { }
  1701. ~CShortLock()
  1702. {
  1703. threaded.join();
  1704. }
  1705. virtual void main()
  1706. {
  1707. Owned<IDistributedFile> file=queryDistributedFileDirectory().lookup(fileName, NULL);
  1708. if (!file)
  1709. {
  1710. PROGLOG("File %s not found", fileName.get());
  1711. return;
  1712. }
  1713. PROGLOG("Locked file: %s, sleeping (before unlock) for %d secs", fileName.get(), secDelay);
  1714. MilliSleep(secDelay * 1000);
  1715. PROGLOG("Unlocking file: %s", fileName.get());
  1716. }
  1717. void start() { threaded.start(); }
  1718. };
  1719. /* Tests transaction failing, due to lock and retrying after having partial success */
  1720. CShortLock sL("regress::addreadd::sub2", 30); // the 2nd subfile of super1
  1721. sL.start();
  1722. transaction.setown(createDistributedFileTransaction(user)); // disabled, auto-commit
  1723. logctx.CTXLOG("Starting transaction");
  1724. transaction->start();
  1725. logctx.CTXLOG("Adding contents of regress::addreadd::super1 to regress::addreadd::super2, within transaction");
  1726. sfile.setown(transaction->lookupSuperFile("regress::addreadd::super2"));
  1727. sfile->addSubFile("regress::addreadd::super1", false, NULL, true, transaction); // add contents of super1 to super2
  1728. sfile.setown(transaction->lookupSuperFile("regress::addreadd::super1"));
  1729. sfile->removeSubFile(NULL, false, false, transaction); // clears super1
  1730. sfile.clear();
  1731. try
  1732. {
  1733. transaction->commit();
  1734. }
  1735. catch (IException *e)
  1736. {
  1737. StringBuffer eStr;
  1738. e->errorMessage(eStr);
  1739. CPPUNIT_ASSERT_MESSAGE(eStr.str(), 0);
  1740. e->Release();
  1741. }
  1742. transaction.clear();
  1743. sfile.setown(dir.lookupSuperFile("regress::addreadd::super2", user));
  1744. ASSERT(3 == sfile->numSubFiles() && "regress::addreadd::super2 should contain 3 subfiles");
  1745. ASSERT(NULL != sfile->querySubFileNamed("regress::addreadd::sub1") && "regress::addreadd::sub1, should be a subfile of super2");
  1746. ASSERT(NULL != sfile->querySubFileNamed("regress::addreadd::sub2") && "regress::addreadd::sub2, should be a subfile of super2");
  1747. ASSERT(NULL != sfile->querySubFileNamed("regress::addreadd::sub3") && "regress::addreadd::sub3, should be a subfile of super2");
  1748. sfile.setown(dir.lookupSuperFile("regress::addreadd::super1", user));
  1749. ASSERT(0 == sfile->numSubFiles() && "regress::addreadd::super1 should contain 0 subfiles");
  1750. }
  1751. void testDFSRetrySuperLock()
  1752. {
  1753. setupDFS("retrysuperlock");
  1754. logctx.CTXLOG("Creating regress::retrysuperlock::super1 and regress::retrysuperlock::sub1");
  1755. Owned<IDistributedSuperFile> sfile = dir.createSuperFile("regress::retrysuperlock::super1", user, false, false);
  1756. sfile->addSubFile("regress::retrysuperlock::sub1", false, NULL, false);
  1757. sfile.clear();
  1758. class CShortLock : implements IThreaded
  1759. {
  1760. StringAttr fileName;
  1761. unsigned secDelay;
  1762. CThreaded threaded;
  1763. public:
  1764. CShortLock(const char *_fileName, unsigned _secDelay) : fileName(_fileName), secDelay(_secDelay), threaded("CShortLock", this) { }
  1765. ~CShortLock()
  1766. {
  1767. threaded.join();
  1768. }
  1769. virtual void main()
  1770. {
  1771. Owned<IDistributedFile> file=queryDistributedFileDirectory().lookup(fileName, NULL);
  1772. if (!file)
  1773. {
  1774. PROGLOG("File %s not found", fileName.get());
  1775. return;
  1776. }
  1777. PROGLOG("Locked file: %s, sleeping (before unlock) for %d secs", fileName.get(), secDelay);
  1778. MilliSleep(secDelay * 1000);
  1779. PROGLOG("Unlocking file: %s", fileName.get());
  1780. }
  1781. void start() { threaded.start(); }
  1782. };
  1783. /* Tests transaction failing, due to lock and retrying after having partial success */
  1784. CShortLock sL("regress::retrysuperlock::super1", 15);
  1785. sL.start();
  1786. sfile.setown(dir.lookupSuperFile("regress::retrysuperlock::super1", user));
  1787. if (sfile)
  1788. {
  1789. logctx.CTXLOG("Removing subfiles from regress::retrysuperlock::super1");
  1790. sfile->removeSubFile(NULL, false, false);
  1791. logctx.CTXLOG("SUCCEEDED");
  1792. }
  1793. // put it back, for next test
  1794. sfile->addSubFile("regress::retrysuperlock::sub1", false, NULL, false);
  1795. sfile.clear();
  1796. // try again, this time in transaction
  1797. Owned<IDistributedFileTransaction> transaction = createDistributedFileTransaction(user); // disabled, auto-commit
  1798. logctx.CTXLOG("Starting transaction");
  1799. transaction->start();
  1800. sfile.setown(transaction->lookupSuperFile("regress::retrysuperlock::super1"));
  1801. if (sfile)
  1802. {
  1803. logctx.CTXLOG("Removing subfiles from regress::retrysuperlock::super1 with transaction");
  1804. sfile->removeSubFile(NULL, false, false, transaction);
  1805. logctx.CTXLOG("SUCCEEDED");
  1806. }
  1807. sfile.clear();
  1808. logctx.CTXLOG("Committing transaction");
  1809. transaction->commit();
  1810. }
  1811. void testDFSRename2()
  1812. {
  1813. setupDFS("rename2");
  1814. /* Create a super and sub1 and sub4 in a auto-commit transaction
  1815. * Inside a transaction, do:
  1816. * a) rename sub2 to renamedsub2
  1817. * b) remove sub1
  1818. * c) add sub1
  1819. * d) add renamedsub2
  1820. * e) commit transaction
  1821. * f) renamed files existing and superfile contents
  1822. */
  1823. Owned<IDistributedFileTransaction> transaction = createDistributedFileTransaction(user); // disabled, auto-commit
  1824. logctx.CTXLOG("Creating regress::rename2::super1 and attaching sub1 & sub4");
  1825. Owned<IDistributedSuperFile> sfile = dir.createSuperFile("regress::rename2::super1", user, false, false, transaction);
  1826. sfile->addSubFile("regress::rename2::sub1", false, NULL, false, transaction);
  1827. sfile->addSubFile("regress::rename2::sub4", false, NULL, false, transaction);
  1828. sfile.clear();
  1829. if (dir.exists("regress::rename2::renamedsub2",user,false,false))
  1830. ASSERT(dir.removeEntry("regress::rename2::renamedsub2", user) && "Can't remove 'regress::rename2::renamedsub2'");
  1831. transaction.setown(createDistributedFileTransaction(user)); // disabled, auto-commit
  1832. logctx.CTXLOG("Starting transaction");
  1833. transaction->start();
  1834. logctx.CTXLOG("Renaming regress::rename2::sub2 TO regress::rename2::renamedsub2");
  1835. dir.renamePhysical("regress::rename2::sub2", "regress::rename2::renamedsub2", user, transaction);
  1836. logctx.CTXLOG("Removing regress::rename2::sub1 from regress::rename2::super1");
  1837. sfile.setown(transaction->lookupSuperFile("regress::rename2::super1"));
  1838. sfile->removeSubFile("regress::rename2::sub1", false, false, transaction);
  1839. sfile.clear();
  1840. logctx.CTXLOG("Adding renamedsub2 to super1");
  1841. sfile.setown(transaction->lookupSuperFile("regress::rename2::super1"));
  1842. sfile->addSubFile("regress::rename2::renamedsub2", false, NULL, false, transaction);
  1843. sfile.clear();
  1844. logctx.CTXLOG("Adding back sub1 to super1");
  1845. sfile.setown(transaction->lookupSuperFile("regress::rename2::super1"));
  1846. sfile->addSubFile("regress::rename2::sub1", false, NULL, false, transaction);
  1847. sfile.clear();
  1848. try
  1849. {
  1850. logctx.CTXLOG("Committing transaction");
  1851. transaction->commit();
  1852. }
  1853. catch (IException *e)
  1854. {
  1855. StringBuffer eStr;
  1856. e->errorMessage(eStr);
  1857. CPPUNIT_ASSERT_MESSAGE(eStr.str(), 0);
  1858. e->Release();
  1859. }
  1860. transaction.clear();
  1861. // validate..
  1862. ASSERT(dir.exists("regress::rename2::renamedsub2", user, true, false) && "regress::rename2::renamedsub2 should exist now transaction committed");
  1863. sfile.setown(dir.lookupSuperFile("regress::rename2::super1", user));
  1864. ASSERT(NULL != sfile->querySubFileNamed("regress::rename2::renamedsub2") && "regress::rename2::renamedsub2, should be a subfile of super1");
  1865. ASSERT(NULL != sfile->querySubFileNamed("regress::rename2::sub1") && "regress::rename2::sub1, should be a subfile of super1");
  1866. ASSERT(NULL == sfile->querySubFileNamed("regress::rename2::sub2") && "regress::rename2::sub2, should NOT be a subfile of super1");
  1867. ASSERT(NULL != sfile->querySubFileNamed("regress::rename2::sub4") && "regress::rename2::sub4, should be a subfile of super1");
  1868. ASSERT(3 == sfile->numSubFiles() && "regress::rename2::super1 should contain 4 subfiles");
  1869. }
  1870. void testDFSRenameThenDelete()
  1871. {
  1872. setupDFS("renamedelete");
  1873. if (dir.exists("regress::renamedelete::renamedsub2",user,false,false))
  1874. ASSERT(dir.removeEntry("regress::renamedelete::renamedsub2", user) && "Can't remove 'regress::renamedelete::renamedsub2'");
  1875. Owned<IDistributedFileTransaction> transaction = createDistributedFileTransaction(user); // disabled, auto-commit
  1876. logctx.CTXLOG("Starting transaction");
  1877. transaction->start();
  1878. logctx.CTXLOG("Renaming regress::renamedelete::sub2 TO regress::renamedelete::renamedsub2");
  1879. dir.renamePhysical("regress::renamedelete::sub2", "regress::renamedelete::renamedsub2", user, transaction);
  1880. logctx.CTXLOG("Removing regress::renamedelete::renamedsub2");
  1881. ASSERT(dir.removeEntry("regress::renamedelete::renamedsub2", user, transaction) && "Can't remove 'regress::rename2::renamedsub2'");
  1882. try
  1883. {
  1884. logctx.CTXLOG("Committing transaction");
  1885. transaction->commit();
  1886. }
  1887. catch (IException *e)
  1888. {
  1889. StringBuffer eStr;
  1890. e->errorMessage(eStr);
  1891. CPPUNIT_ASSERT_MESSAGE(eStr.str(), 0);
  1892. e->Release();
  1893. }
  1894. transaction.clear();
  1895. // validate..
  1896. ASSERT(!dir.exists("regress::renamedelete::sub2", user, true, false) && "regress::renamedelete::sub2 should NOT exist now transaction has been committed");
  1897. ASSERT(!dir.exists("regress::renamedelete::renamedsub2", user, true, false) && "regress::renamedelete::renamedsub2 should NOT exist now transaction has been committed");
  1898. }
  1899. // NB: This test requires access (via dafilesrv) to an external IP (10.239.222.21 used below, but could be any)
  1900. void testDFSRename3()
  1901. {
  1902. setupDFS("rename3");
  1903. Owned<IDistributedFileTransaction> transaction = createDistributedFileTransaction(user); // disabled, auto-commit
  1904. if (dir.exists("regress::tenwayfile",user))
  1905. ASSERT(dir.removeEntry("regress::tenwayfile", user) && "Can't remove");
  1906. Owned<IFileDescriptor> fdesc = createDescriptor("regress", "tenwayfile", 1, 17);
  1907. Owned<IGroup> grp1 = createIGroup("10.239.222.1");
  1908. ClusterPartDiskMapSpec mapping;
  1909. fdesc->setClusterGroup(0, grp1);
  1910. Linked<IPartDescriptor> part = fdesc->getPart(0);
  1911. RemoteFilename rfn;
  1912. part->getFilename(0, rfn);
  1913. StringBuffer path;
  1914. rfn.getPath(path);
  1915. recursiveCreateDirectoryForFile(path.str());
  1916. OwnedIFile ifile = createIFile(path.str());
  1917. Owned<IFileIO> io;
  1918. io.setown(ifile->open(IFOcreate));
  1919. io->write(0, 17, "12345678901234567");
  1920. io->close();
  1921. Owned<IDistributedFile> dsub = dir.createNew(fdesc);
  1922. dsub->attach("regress::tenwayfile", user);
  1923. dsub.clear();
  1924. fdesc.clear();
  1925. transaction.setown(createDistributedFileTransaction(user)); // disabled, auto-commit
  1926. transaction->start();
  1927. logctx.CTXLOG("Renaming regress::rename3::sub2 TO regress::tenwayfile@mythor");
  1928. dir.renamePhysical("regress::rename3::sub2", "regress::tenwayfile@mythor", user, transaction);
  1929. try
  1930. {
  1931. transaction->commit();
  1932. }
  1933. catch (IException *e)
  1934. {
  1935. StringBuffer eStr;
  1936. e->errorMessage(eStr);
  1937. CPPUNIT_ASSERT_MESSAGE(eStr.str(), 0);
  1938. e->Release();
  1939. }
  1940. transaction.setown(createDistributedFileTransaction(user));
  1941. transaction.setown(createDistributedFileTransaction(user)); // disabled, auto-commit
  1942. transaction->start();
  1943. logctx.CTXLOG("Renaming regress::tenwayfile TO regress::rename3::sub2");
  1944. dir.renamePhysical("regress::tenwayfile@mythor", "regress::rename3::sub2", user, transaction);
  1945. try
  1946. {
  1947. transaction->commit();
  1948. }
  1949. catch (IException *e)
  1950. {
  1951. StringBuffer eStr;
  1952. e->errorMessage(eStr);
  1953. CPPUNIT_ASSERT_MESSAGE(eStr.str(), 0);
  1954. e->Release();
  1955. }
  1956. }
  1957. void testDFSRemoveSuperSub()
  1958. {
  1959. setupDFS("removesupersub");
  1960. Owned<IDistributedFileTransaction> transaction = createDistributedFileTransaction(user);
  1961. logctx.CTXLOG("Creating regress::removesupersub::super1 and attaching sub1 & sub4");
  1962. Owned<IDistributedSuperFile> sfile = dir.createSuperFile("regress::removesupersub::super1", user, false, false, transaction);
  1963. sfile->addSubFile("regress::removesupersub::sub1", false, NULL, false, transaction);
  1964. sfile->addSubFile("regress::removesupersub::sub4", false, NULL, false, transaction);
  1965. sfile.clear();
  1966. transaction.setown(createDistributedFileTransaction(user)); // disabled, auto-commit
  1967. logctx.CTXLOG("Starting transaction");
  1968. transaction->start();
  1969. logctx.CTXLOG("Removing super removesupersub::super1 along with it's subfiles sub1 and sub4");
  1970. dir.removeSuperFile("regress::removesupersub::super1", true, user, transaction);
  1971. try
  1972. {
  1973. logctx.CTXLOG("Committing transaction");
  1974. transaction->commit();
  1975. }
  1976. catch (IException *e)
  1977. {
  1978. StringBuffer eStr;
  1979. e->errorMessage(eStr);
  1980. CPPUNIT_ASSERT_MESSAGE(eStr.str(), 0);
  1981. e->Release();
  1982. }
  1983. transaction.clear();
  1984. // validate..
  1985. ASSERT(!dir.exists("regress::removesupersub::super1", user, true, false) && "regress::removesupersub::super1 should NOT exist");
  1986. ASSERT(!dir.exists("regress::removesupersub::sub1", user, true, false) && "regress::removesupersub::sub1 should NOT exist");
  1987. ASSERT(!dir.exists("regress::removesupersub::sub4", user, true, false) && "regress::removesupersub::sub4 should NOT exist");
  1988. }
  1989. void testDFSHammer()
  1990. {
  1991. unsigned numFiles = 100;
  1992. unsigned numReads = 40000;
  1993. unsigned hammerThreads = 10;
  1994. setupDFS("hammer", 0, numFiles);
  1995. StringBuffer msg("Reading ");
  1996. msg.append(numFiles).append(" files").append(numReads).append(" times, on ").append(hammerThreads).append(" threads");
  1997. logctx.CTXLOG("%s", msg.str());
  1998. class CHammerFactory : public CSimpleInterface, implements IThreadFactory
  1999. {
  2000. public:
  2001. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  2002. virtual IPooledThread *createNew()
  2003. {
  2004. class CHammerThread : public CSimpleInterface, implements IPooledThread
  2005. {
  2006. StringAttr filename;
  2007. public:
  2008. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  2009. virtual void init(void *param)
  2010. {
  2011. filename.set((const char *)param);
  2012. }
  2013. virtual void main()
  2014. {
  2015. try
  2016. {
  2017. Owned<IPropertyTree> tree = queryDistributedFileDirectory().getFileTree(filename,user);
  2018. }
  2019. catch (IException *e)
  2020. {
  2021. PrintExceptionLog(e, NULL);
  2022. }
  2023. }
  2024. virtual bool stop() { return true; }
  2025. virtual bool canReuse() { return true; }
  2026. };
  2027. return new CHammerThread();
  2028. }
  2029. } poolFactory;
  2030. CTimeMon tm;
  2031. Owned<IThreadPool> pool = createThreadPool("TSDSTest", &poolFactory, NULL, hammerThreads, 2000);
  2032. while (numReads--)
  2033. {
  2034. StringBuffer filename("regress::hammer::sub");
  2035. unsigned fn = 1+(getRandom()%numFiles);
  2036. filename.append(fn);
  2037. PROGLOG("Hammer file: %s", filename.str());
  2038. pool->start((void *)filename.str());
  2039. }
  2040. pool->joinAll();
  2041. PROGLOG("Hammer test took: %d ms", tm.elapsed());
  2042. }
  2043. void testSDSSubs2()
  2044. {
  2045. class CSubscriber : public CSimpleInterfaceOf<ISDSSubscription>
  2046. {
  2047. StringAttr xpath;
  2048. bool sub;
  2049. StringBuffer &result;
  2050. SubscriptionId id;
  2051. public:
  2052. CSubscriber(StringBuffer &_result, const char *_xpath, bool _sub) : result(_result), xpath(_xpath), sub(_sub)
  2053. {
  2054. id = querySDS().subscribe(xpath, *this, sub, !sub);
  2055. PROGLOG("Subscribed to %s", xpath.get());
  2056. }
  2057. ~CSubscriber()
  2058. {
  2059. querySDS().unsubscribe(id);
  2060. }
  2061. virtual void notify(SubscriptionId id, const char *_xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
  2062. {
  2063. PROGLOG("CSubscriber notified path=%s for subscriber=%s, sub=%s", _xpath, xpath.get(), sub?"true":"false");
  2064. if (result.length())
  2065. result.append("|");
  2066. result.append(xpath);
  2067. if (!sub && valueLen)
  2068. result.append(",").append(valueLen, (const char *)valueData);
  2069. }
  2070. };
  2071. Owned<IRemoteConnection> conn = querySDS().connect("/DAREGRESS/TestSub2", myProcessSession(), RTM_CREATE, INFINITE);
  2072. Owned<IPropertyTree> tree = createPTreeFromXMLString("<a><b1><c/></b1><b2/><b3><d><e/></d></b3></a>");
  2073. IPropertyTree *root = conn->queryRoot();
  2074. root->setPropTree("a", tree.getClear());
  2075. conn->commit();
  2076. StringBuffer result;
  2077. Owned<ISDSSubscription> sub1 = new CSubscriber(result, "/DAREGRESS/TestSub2/a", true);
  2078. Owned<ISDSSubscription> sub2 = new CSubscriber(result, "/DAREGRESS/TestSub2/a/b1", false);
  2079. Owned<ISDSSubscription> sub3 = new CSubscriber(result, "/DAREGRESS/TestSub2/a/b2", false);
  2080. Owned<ISDSSubscription> sub4 = new CSubscriber(result, "/DAREGRESS/TestSub2/a/b1/c", false);
  2081. Owned<ISDSSubscription> sub5 = new CSubscriber(result, "/DAREGRESS/TestSub2/a/b3", true);
  2082. MilliSleep(1000);
  2083. StringArray expectedResults;
  2084. expectedResults.append("/DAREGRESS/TestSub2/a");
  2085. expectedResults.append("/DAREGRESS/TestSub2/a|/DAREGRESS/TestSub2/a/b1,testv");
  2086. expectedResults.append("/DAREGRESS/TestSub2/a|/DAREGRESS/TestSub2/a/b2,testv");
  2087. expectedResults.append("/DAREGRESS/TestSub2/a|/DAREGRESS/TestSub2/a/b1/c,testv");
  2088. expectedResults.append("/DAREGRESS/TestSub2/a|/DAREGRESS/TestSub2/a/b1,testv");
  2089. expectedResults.append("/DAREGRESS/TestSub2/a|/DAREGRESS/TestSub2/a/b2,testv");
  2090. expectedResults.append("/DAREGRESS/TestSub2/a");
  2091. expectedResults.append("/DAREGRESS/TestSub2/a|/DAREGRESS/TestSub2/a/b3");
  2092. expectedResults.append("/DAREGRESS/TestSub2/a|/DAREGRESS/TestSub2/a/b3");
  2093. StringArray props;
  2094. props.appendList("S:a,S:a/b1,S:a/b2,S:a/b1/c,S:a/b1/d,S:a/b2/e,S:a/b2/e/f,D:a/b3/d/e,D:a/b3/d", ",");
  2095. assertex(expectedResults.ordinality() == props.ordinality());
  2096. ForEachItemIn(p, props)
  2097. {
  2098. result.clear(); // filled by subscriber notifications
  2099. const char *cmd = props.item(p);
  2100. const char *propPath=cmd+2;
  2101. switch (*cmd)
  2102. {
  2103. case 'S':
  2104. {
  2105. PROGLOG("Changing %s", propPath);
  2106. root->setProp(propPath, "testv");
  2107. break;
  2108. }
  2109. case 'D':
  2110. {
  2111. PROGLOG("Deleting %s", propPath);
  2112. root->removeProp(propPath);
  2113. break;
  2114. }
  2115. default:
  2116. throwUnexpected();
  2117. }
  2118. conn->commit();
  2119. MilliSleep(100); // time for notifications to come through
  2120. PROGLOG("Checking results");
  2121. StringArray resultArray;
  2122. resultArray.appendList(result, "|");
  2123. result.clear();
  2124. resultArray.sortAscii();
  2125. ForEachItemIn(r, resultArray)
  2126. {
  2127. if (result.length())
  2128. result.append("|");
  2129. result.append(resultArray.item(r));
  2130. }
  2131. const char *expectedResult = expectedResults.item(p);
  2132. if (0 == strcmp(expectedResult, result))
  2133. PROGLOG("testSDSSubs2 [ %s ]: MATCH", cmd);
  2134. else
  2135. {
  2136. VStringBuffer errMsg("testSDSSubs2 [ %s ]: MISMATCH", cmd);
  2137. errMsg.newline().append("Expected: ").append(expectedResult);
  2138. errMsg.newline().append("Got: ").append(result);
  2139. PROGLOG("%s", errMsg.str());
  2140. CPPUNIT_ASSERT_MESSAGE(errMsg.str(), 0);
  2141. }
  2142. }
  2143. }
  2144. };
  2145. CPPUNIT_TEST_SUITE_REGISTRATION( CDaliTestsStress );
  2146. CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( CDaliTestsStress, "Dali" );
  2147. class CDaliUtils : public CppUnit::TestFixture
  2148. {
  2149. CPPUNIT_TEST_SUITE(CDaliUtils);
  2150. CPPUNIT_TEST(testDFSLfn);
  2151. CPPUNIT_TEST_SUITE_END();
  2152. public:
  2153. void testDFSLfn()
  2154. {
  2155. const char *lfns[] = { "~foreign::192.168.16.1::scope1::file1",
  2156. "~file::192.168.16.1::dir1::file1",
  2157. "~file::192.168.16.1::>some query or another",
  2158. "~file::192.168.16.1::wild?card1",
  2159. "~file::192.168.16.1::wild*card2",
  2160. "~file::192.168.16.1::^C^a^S^e^d",
  2161. "~file::192.168.16.1::file@cluster1",
  2162. "~prefix::{multi1*,multi2*}",
  2163. "{~foreign::192.168.16.1::multi1, ~foreign::192.168.16.2::multi2}",
  2164. // NB: CDfsLogicalFileName allows these with strict=false (the default)
  2165. ". :: scope1 :: file",
  2166. ":: scope1 :: file",
  2167. "~ scope1 :: scope2 :: file ",
  2168. ". :: scope1 :: file nine",
  2169. ". :: scope1 :: file ten ",
  2170. ". :: scope1 :: file",
  2171. NULL // terminator
  2172. };
  2173. const char *invalidLfns[] = {
  2174. ". :: sc~ope1::file",
  2175. ". :: ::file",
  2176. "~~scope1::file",
  2177. "~sc~ope1::file2",
  2178. ".:: scope1::file*",
  2179. NULL // terminator
  2180. };
  2181. PROGLOG("Checking valid logical filenames");
  2182. unsigned nlfn=0;
  2183. loop
  2184. {
  2185. const char *lfn = lfns[nlfn++];
  2186. if (NULL == lfn)
  2187. break;
  2188. PROGLOG("lfn = %s", lfn);
  2189. CDfsLogicalFileName dlfn;
  2190. try
  2191. {
  2192. dlfn.set(lfn);
  2193. }
  2194. catch (IException *e)
  2195. {
  2196. VStringBuffer err("Logical filename '%s' failed.", lfn);
  2197. EXCLOG(e, err.str());
  2198. CPPUNIT_FAIL(err.str());
  2199. e->Release();
  2200. }
  2201. }
  2202. PROGLOG("Checking invalid logical filenames");
  2203. nlfn = 0;
  2204. loop
  2205. {
  2206. const char *lfn = invalidLfns[nlfn++];
  2207. if (NULL == lfn)
  2208. break;
  2209. PROGLOG("lfn = %s", lfn);
  2210. CDfsLogicalFileName dlfn;
  2211. try
  2212. {
  2213. dlfn.set(lfn);
  2214. VStringBuffer err("Logical filename '%s' passed and should have failed.", lfn);
  2215. ERRLOG("%s", err.str());
  2216. CPPUNIT_FAIL(err.str());
  2217. }
  2218. catch (IException *e)
  2219. {
  2220. EXCLOG(e, "Expected error:");
  2221. e->Release();
  2222. }
  2223. }
  2224. }
  2225. };
  2226. CPPUNIT_TEST_SUITE_REGISTRATION( CDaliUtils );
  2227. CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( CDaliUtils, "DaliUtils" );
  2228. #endif // _USE_CPPUNIT