datest.cpp 112 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include <string>
  14. #include <array>
  15. #include "platform.h"
  16. #include "jlib.hpp"
  17. #include "jexcept.hpp"
  18. #include "jmisc.hpp"
  19. #include "mpbase.hpp"
  20. #include "mpcomm.hpp"
  21. #include "daclient.hpp"
  22. #include "dadfs.hpp"
  23. #include "dafdesc.hpp"
  24. #include "dasds.hpp"
  25. #include "danqs.hpp"
  26. #include "dautils.hpp"
  27. #include "dasess.hpp"
  28. #include "mplog.hpp"
  29. #include "rmtclient.hpp"
  30. #include "rtlformat.hpp"
  31. #include "jptree.hpp"
  32. #include "wsdfuaccess.hpp"
  33. using namespace wsdfuaccess;
  34. using namespace dafsstream;
  35. #define DEFAULT_TEST "RANDTEST"
  36. static const char *whichTest = DEFAULT_TEST;
  37. static StringArray testParams;
  38. static unsigned nIter = 1;
  39. #define TEST(X) (0==stricmp(whichTest, X))
  40. //#define TEST_REMOTEFILE
  41. //#define TEST_REMOTEFILE2
  42. //#define TEST_REMOTEFILE3
  43. //#define TEST_COPYFILE
  44. //#define TEST_DEADLOCK
  45. //#define TEST_THREADS
  46. #define MDELAY 100
  47. static void addTestFile(const char *name,unsigned n)
  48. {
  49. queryDistributedFileDirectory().removeEntry(name,UNKNOWN_USER);
  50. SocketEndpointArray epa;
  51. for (unsigned i=0;i<n;i++) {
  52. StringBuffer ips("192.168.0.");
  53. ips.append((byte)i+1);
  54. SocketEndpoint ep(ips.str());
  55. epa.append(ep);
  56. }
  57. Owned<IGroup> group = createIGroup(epa);
  58. Owned<IPropertyTree> fileInfo = createPTree();
  59. Owned<IFileDescriptor> fileDesc = createFileDescriptor();
  60. StringBuffer dir;
  61. getLFNDirectoryUsingDefaultBaseDir(dir, name, DFD_OSdefault);
  62. StringBuffer partmask;
  63. getPartMask(partmask,name,n);
  64. StringBuffer path;
  65. for (unsigned m=0; m<n; m++) {
  66. RemoteFilename rfn;
  67. constructPartFilename(group,m+1,n,NULL,partmask.str(),dir.str(),false,1,rfn);
  68. rfn.getLocalPath(path.clear());
  69. Owned<IPropertyTree> pp = createPTree("Part");
  70. pp->setPropInt64("@size",1234*(m+1));
  71. fileDesc->setPart(m,&group->queryNode(m), path.str(), pp);
  72. }
  73. Owned<IDistributedFile> dfile = queryDistributedFileDirectory().createNew(fileDesc);
  74. {
  75. DistributedFilePropertyLock lock(dfile);
  76. IPropertyTree &t = lock.queryAttributes();
  77. t.setProp("@owned","nigel");
  78. t.setPropInt("@recordSize",1);
  79. t.setProp("ECL","TESTECL();");
  80. }
  81. dfile->attach(name,UNKNOWN_USER);
  82. }
  83. #define TEST_SUPER_FILE "nhtest::super"
  84. #define TEST_SUB_FILE "nhtest::sub"
  85. void Test_SuperFile()
  86. {
  87. // create subfile
  88. // first remove if exists
  89. unsigned i;
  90. unsigned n;
  91. Owned<IDistributedFileTransaction> transaction = createDistributedFileTransaction(UNKNOWN_USER);
  92. Owned<IDistributedSuperFile> sfile;
  93. Owned<IDistributedFilePartIterator> piter;
  94. Owned<IDistributedFileIterator> iter;
  95. queryDistributedFileDirectory().removeEntry(TEST_SUPER_FILE"4",UNKNOWN_USER);
  96. queryDistributedFileDirectory().removeEntry(TEST_SUPER_FILE"3",UNKNOWN_USER);
  97. queryDistributedFileDirectory().removeEntry(TEST_SUPER_FILE"2",UNKNOWN_USER);
  98. #if 1
  99. sfile.setown(queryDistributedFileDirectory().lookupSuperFile(TEST_SUPER_FILE"1",UNKNOWN_USER));
  100. if (sfile) {
  101. sfile->removeSubFile(NULL,true);
  102. sfile.clear();
  103. queryDistributedFileDirectory().removeEntry(TEST_SUPER_FILE"1",UNKNOWN_USER);
  104. }
  105. sfile.setown(queryDistributedFileDirectory().createSuperFile(TEST_SUPER_FILE"1",UNKNOWN_USER,true));
  106. for (i = 0;i<3;i++) {
  107. StringBuffer name(TEST_SUB_FILE);
  108. name.append(i+1);
  109. addTestFile(name.str(),i+2);
  110. sfile->addSubFile(name);
  111. }
  112. sfile.clear();
  113. #endif
  114. sfile.setown(queryDistributedFileDirectory().lookupSuperFile(TEST_SUPER_FILE"1",UNKNOWN_USER));
  115. printf("NumSubFiles = %d\n",sfile->numSubFiles());
  116. #if 1
  117. i=0;
  118. iter.setown(sfile->getSubFileIterator());
  119. ForEach(*iter) {
  120. StringBuffer name;
  121. iter->getName(name);
  122. printf(" %d: %s\n",i+1,name.str());
  123. IDistributedFile *f = &iter->query();
  124. assertex(stricmp(f->queryLogicalName(),name.str())==0);
  125. assertex(&sfile->querySubFile(i)==f);
  126. assertex(sfile->querySubFileNamed(name.str())==f);
  127. i++;
  128. }
  129. iter.clear();
  130. #endif
  131. piter.setown(sfile->getIterator());
  132. n = sfile->numParts();
  133. printf("NumSubParts = %d\n",n);
  134. i = 0;
  135. ForEach(*piter) {
  136. IDistributedFilePart *part = &piter->query();
  137. unsigned subp;
  138. IDistributedFile *subf = sfile->querySubPart(i,subp);
  139. assertex(subf);
  140. const char* lname = subf->queryLogicalName();
  141. StringBuffer pname;
  142. part->getPartName(pname);
  143. for (unsigned c=0;c<part->numCopies();c++) {
  144. RemoteFilename rfn;
  145. StringBuffer tmp;
  146. printf(" %d: %s[%d,%d] %s %s\n",i,lname,subp,c,pname.str(),part->getFilename(rfn,c).getRemotePath(tmp).str());
  147. }
  148. i++;
  149. }
  150. piter.clear();
  151. sfile.setown(queryDistributedFileDirectory().createSuperFile(TEST_SUPER_FILE"2",UNKNOWN_USER,true));
  152. transaction->start();
  153. for (i = 0;i<3;i++) {
  154. StringBuffer name(TEST_SUB_FILE);
  155. name.append(i+1);
  156. sfile->addSubFile(name,false,NULL,false,transaction);
  157. }
  158. sfile.clear(); // mustn't have owner open when commit transaction
  159. transaction->commit();
  160. sfile.setown(queryDistributedFileDirectory().createSuperFile(TEST_SUPER_FILE"3",UNKNOWN_USER,true));
  161. for (i = 0;i<3;i++) {
  162. StringBuffer name(TEST_SUB_FILE);
  163. name.append(i+1);
  164. sfile->addSubFile(name,false,NULL,false,transaction);
  165. }
  166. sfile.clear(); // mustn't have owner open when commit transaction
  167. transaction->rollback();
  168. sfile.setown(queryDistributedFileDirectory().lookupSuperFile(TEST_SUPER_FILE"2",UNKNOWN_USER));
  169. transaction->start();
  170. sfile->removeSubFile(TEST_SUB_FILE"1",false,false,transaction);
  171. StringBuffer name(TEST_SUB_FILE"4");
  172. addTestFile(name.str(),3);
  173. sfile->addSubFile(name,false,NULL,false,transaction);
  174. sfile.clear(); // mustn't have owner open when commit transaction
  175. transaction->commit();
  176. sfile.setown(queryDistributedFileDirectory().createSuperFile(TEST_SUPER_FILE"4",UNKNOWN_USER,true));
  177. transaction->start();
  178. sfile->addSubFile(TEST_SUPER_FILE"1",false,NULL,false,transaction);
  179. sfile->addSubFile(TEST_SUPER_FILE"2",false,NULL,false,transaction);
  180. sfile->addSubFile(TEST_SUPER_FILE"3",false,NULL,false,transaction);
  181. sfile.clear(); // mustn't have owner open when commit transaction
  182. transaction->commit();
  183. sfile.setown(queryDistributedFileDirectory().lookupSuperFile(TEST_SUPER_FILE"4",UNKNOWN_USER));
  184. i=0;
  185. iter.setown(sfile->getSubFileIterator());
  186. ForEach(*iter) {
  187. StringBuffer name;
  188. iter->getName(name);
  189. printf(" %d: %s\n",i+1,name.str());
  190. IDistributedFile *f = &iter->query();
  191. assertex(stricmp(f->queryLogicalName(),name.str())==0);
  192. assertex(&sfile->querySubFile(i)==f);
  193. assertex(sfile->querySubFileNamed(name.str())==f);
  194. i++;
  195. }
  196. i = 0;
  197. iter.setown(sfile->getSubFileIterator(true));
  198. ForEach(*iter) {
  199. StringBuffer name;
  200. iter->getName(name);
  201. printf(" %d: %s\n",i+1,name.str());
  202. IDistributedFile *f = &iter->query();
  203. assertex(stricmp(f->queryLogicalName(),name.str())==0);
  204. //assertex(&sfile->querySubFile(i)==f);
  205. //assertex(sfile->querySubFileNamed(name.str())==f);
  206. i++;
  207. }
  208. iter.clear();
  209. piter.setown(sfile->getIterator());
  210. n = sfile->numParts();
  211. printf("NumSubParts = %d\n",n);
  212. i = 0;
  213. ForEach(*piter) {
  214. IDistributedFilePart *part = &piter->query();
  215. unsigned subp;
  216. IDistributedFile *subf = sfile->querySubPart(i,subp);
  217. assertex(subf);
  218. const char* lname = subf->queryLogicalName();
  219. StringBuffer pname;
  220. part->getPartName(pname);
  221. for (unsigned c=0;c<part->numCopies();c++) {
  222. RemoteFilename rfn;
  223. StringBuffer tmp;
  224. printf(" %d: %s[%d,%d] %s %s\n",i,lname,subp,c,pname.str(),part->getFilename(rfn,c).getRemotePath(tmp).str());
  225. }
  226. i++;
  227. }
  228. }
  229. void Test_SuperFile2()
  230. {
  231. // create subfile
  232. // first remove if exists
  233. unsigned i;
  234. Owned<IDistributedSuperFile> sfile;
  235. queryDistributedFileDirectory().removeEntry(TEST_SUPER_FILE"B1",UNKNOWN_USER);
  236. sfile.setown(queryDistributedFileDirectory().createSuperFile(TEST_SUPER_FILE"B1",UNKNOWN_USER,true));
  237. for (unsigned tst=0;tst<2;tst++) {
  238. printf("sfile size = %" I64F "d\n",sfile->getFileSize(false,false));
  239. for (i = 0;i<3;i++) {
  240. StringBuffer name(TEST_SUB_FILE);
  241. name.append(i+1);
  242. addTestFile(name.str(),i+2);
  243. Owned<IDistributedFile> sbfile = queryDistributedFileDirectory().lookup(name,UNKNOWN_USER,AccessMode::tbdRead,false,false,nullptr,defaultNonPrivilegedUser);
  244. printf("adding size = %" I64F "d\n",sbfile->getFileSize(false,false));
  245. sfile->addSubFile(name);
  246. printf("sfile size = %" I64F "d\n",sfile->getFileSize(false,false));
  247. }
  248. sfile.clear();
  249. sfile.setown(queryDistributedFileDirectory().lookupSuperFile(TEST_SUPER_FILE"B1",UNKNOWN_USER));
  250. printf("NumSubFiles = %d\n",sfile->numSubFiles());
  251. if (tst==1) {
  252. sfile->removeSubFile(NULL,false);
  253. printf("sfile size = %" I64F "d\n",sfile->getFileSize(false,false));
  254. }
  255. else {
  256. for (i = 0;i<3;i++) {
  257. StringBuffer name(TEST_SUB_FILE);
  258. name.append(i+1);
  259. Owned<IDistributedFile> sbfile = queryDistributedFileDirectory().lookup(name,UNKNOWN_USER,AccessMode::tbdRead,false,false,nullptr,defaultNonPrivilegedUser);
  260. printf("removing size = %" I64F "d\n",sbfile->getFileSize(false,false));
  261. sfile->removeSubFile(name,false);
  262. printf("sfile size = %" I64F "d\n",sfile->getFileSize(false,false));
  263. }
  264. }
  265. printf("NumSubFiles = %d\n",sfile->numSubFiles());
  266. }
  267. }
  268. void Test_PartIter()
  269. {
  270. unsigned start = msTick();
  271. Owned<IDistributedFile> file = queryDistributedFileDirectory().lookup("nhtest::file_name_ssn20030805",UNKNOWN_USER,AccessMode::tbdRead,false,false,nullptr,defaultNonPrivilegedUser);
  272. Owned<IDistributedFilePartIterator> parts = file->getIterator();
  273. ForEach(*parts) {
  274. IDistributedFilePart & thisPart = parts->query();
  275. IPropertyTree &partProps = thisPart.queryAttributes();
  276. }
  277. printf("time taken = %d\n",msTick()-start);
  278. }
  279. void testCDfsLogicalFileName()
  280. {
  281. CDfsLogicalFileName cdlfn;
  282. const char *lfn;
  283. SocketEndpoint ep;
  284. StringBuffer eps;
  285. StringBuffer path;
  286. StringBuffer dir;
  287. StringBuffer scopes;
  288. StringBuffer tail;
  289. assertex(!cdlfn.isSet());
  290. assertex(!cdlfn.isForeign());
  291. assertex(!cdlfn.isExternal());
  292. lfn=cdlfn.get(); assertex(strlen(lfn)==0);
  293. lfn=cdlfn.get(true); assertex(strlen(lfn)==0);
  294. cdlfn.set("xYz");
  295. assertex(cdlfn.isSet());
  296. assertex(!cdlfn.isForeign());
  297. assertex(!cdlfn.isExternal());
  298. lfn=cdlfn.get(); assertex(strcmp(lfn,".::xyz")==0);
  299. lfn=cdlfn.get(true); assertex(strcmp(lfn,".::xyz")==0);
  300. cdlfn.set("X");
  301. assertex(cdlfn.isSet());
  302. assertex(!cdlfn.isForeign());
  303. assertex(!cdlfn.isExternal());
  304. lfn=cdlfn.get(); assertex(strcmp(lfn,".::x")==0);
  305. lfn=cdlfn.get(true); assertex(strcmp(lfn,".::x")==0);
  306. cdlfn.set(" xYz ");
  307. assertex(cdlfn.isSet());
  308. assertex(!cdlfn.isForeign());
  309. assertex(!cdlfn.isExternal());
  310. lfn=cdlfn.get(); assertex(strcmp(lfn,".::xyz")==0);
  311. lfn=cdlfn.get(true); assertex(strcmp(lfn,".::xyz")==0);
  312. cdlfn.getScopes(scopes.clear()); assertex(strcmp(scopes.str(),".")==0);
  313. cdlfn.getTail(tail.clear()); assertex(strcmp(tail.str(),"xyz")==0);
  314. cdlfn.set("aBc::xYz");
  315. assertex(cdlfn.isSet());
  316. assertex(!cdlfn.isForeign());
  317. assertex(!cdlfn.isExternal());
  318. lfn=cdlfn.get(); assertex(strcmp(lfn,"abc::xyz")==0);
  319. lfn=cdlfn.get(true); assertex(strcmp(lfn,"abc::xyz")==0);
  320. cdlfn.set("A::X");
  321. assertex(cdlfn.isSet());
  322. assertex(!cdlfn.isForeign());
  323. assertex(!cdlfn.isExternal());
  324. lfn=cdlfn.get(); assertex(strcmp(lfn,"a::x")==0);
  325. lfn=cdlfn.get(true); assertex(strcmp(lfn,"a::x")==0);
  326. cdlfn.set(" AbC :: xYz ");
  327. assertex(cdlfn.isSet());
  328. assertex(!cdlfn.isForeign());
  329. assertex(!cdlfn.isExternal());
  330. cdlfn.getScopes(scopes.clear()); assertex(strcmp(scopes.str(),"abc")==0);
  331. cdlfn.getTail(tail.clear()); assertex(strcmp(tail.str(),"xyz")==0);
  332. lfn=cdlfn.get(); assertex(strcmp(lfn,"abc::xyz")==0);
  333. lfn=cdlfn.get(true); assertex(strcmp(lfn,"abc::xyz")==0);
  334. cdlfn.set("123::aBc::xYz");
  335. assertex(cdlfn.isSet());
  336. assertex(!cdlfn.isForeign());
  337. assertex(!cdlfn.isExternal());
  338. lfn=cdlfn.get(); assertex(strcmp(lfn,"123::abc::xyz")==0);
  339. lfn=cdlfn.get(true); assertex(strcmp(lfn,"123::abc::xyz")==0);
  340. cdlfn.set("1:: A ::X");
  341. assertex(cdlfn.isSet());
  342. assertex(!cdlfn.isForeign());
  343. assertex(!cdlfn.isExternal());
  344. lfn=cdlfn.get(); assertex(strcmp(lfn,"1::a::x")==0);
  345. lfn=cdlfn.get(true); assertex(strcmp(lfn,"1::a::x")==0);
  346. cdlfn.set(" 123 :: AbC :: xYz ");
  347. assertex(cdlfn.isSet());
  348. assertex(!cdlfn.isForeign());
  349. assertex(!cdlfn.isExternal());
  350. lfn=cdlfn.get(); assertex(strcmp(lfn,"123::abc::xyz")==0);
  351. lfn=cdlfn.get(true); assertex(strcmp(lfn,"123::abc::xyz")==0);
  352. cdlfn.getScopes(scopes.clear()); assertex(strcmp(scopes.str(),"123::abc")==0);
  353. cdlfn.getTail(tail.clear()); assertex(strcmp(tail.str(),"xyz")==0);
  354. cdlfn.set("file::10.150.10.75::c$::test::file.xyz");
  355. assertex(cdlfn.isSet());
  356. assertex(!cdlfn.isForeign());
  357. assertex(cdlfn.isExternal());
  358. lfn=cdlfn.get(); assertex(strcmp(lfn,"file::10.150.10.75::c$::test::file.xyz")==0);
  359. lfn=cdlfn.get(true); assertex(strcmp(lfn,"file::10.150.10.75::c$::test::file.xyz")==0);
  360. verifyex(cdlfn.getEp(ep));
  361. ep.getUrlStr(eps.clear()); assertex(strcmp(eps.str(),"10.150.10.75")==0);
  362. verifyex(cdlfn.getExternalPath(path.clear(),path,true)); assertex(strcmp(path.str(),"c:\\test\\file.xyz")==0);
  363. verifyex(cdlfn.getExternalPath(dir.clear(),path.clear(),true)); assertex(strcmp(path.str(),"file.xyz")==0);
  364. verifyex(cdlfn.getExternalPath(path.clear(),path,false)); assertex(strcmp(path.str(),"/c$/test/file.xyz")==0);
  365. verifyex(cdlfn.getExternalPath(dir.clear(),path.clear(),false)); assertex(strcmp(path.str(),"file.xyz")==0);
  366. cdlfn.set("file::10.150.10.75:7100::c$::test::file.xyz");
  367. assertex(cdlfn.isSet());
  368. assertex(!cdlfn.isForeign());
  369. assertex(cdlfn.isExternal());
  370. lfn=cdlfn.get(); assertex(strcmp(lfn,"file::10.150.10.75:7100::c$::test::file.xyz")==0);
  371. verifyex(cdlfn.getEp(ep));
  372. ep.getUrlStr(eps.clear()); assertex(strcmp(eps.str(),"10.150.10.75:7100")==0);
  373. verifyex(cdlfn.getExternalPath(path.clear(),path,true)); assertex(strcmp(path.str(),"c:\\test\\file.xyz")==0);
  374. verifyex(cdlfn.getExternalPath(dir.clear(),path.clear(),true)); assertex(strcmp(path.str(),"file.xyz")==0);
  375. verifyex(cdlfn.getExternalPath(path.clear(),path,false)); assertex(strcmp(path.str(),"/c$/test/file.xyz")==0);
  376. verifyex(cdlfn.getExternalPath(dir.clear(),path.clear(),false)); assertex(strcmp(path.str(),"file.xyz")==0);
  377. cdlfn.set("foreign::10.150.10.75::test::file.xyz");
  378. assertex(cdlfn.isSet());
  379. assertex(cdlfn.isForeign());
  380. assertex(!cdlfn.isExternal());
  381. verifyex(cdlfn.getEp(ep));
  382. ep.getUrlStr(eps.clear()); assertex(strcmp(eps.str(),"10.150.10.75")==0);
  383. lfn=cdlfn.get(); assertex(strcmp(lfn,"foreign::10.150.10.75::test::file.xyz")==0);
  384. lfn=cdlfn.get(true); assertex(strcmp(lfn,"test::file.xyz")==0);
  385. cdlfn.getScopes(scopes.clear()); assertex(strcmp(scopes.str(),"foreign::10.150.10.75::test")==0);
  386. cdlfn.getScopes(scopes.clear(),true); assertex(strcmp(scopes.str(),"test")==0);
  387. cdlfn.getTail(tail.clear()); assertex(strcmp(tail.str(),"file.xyz")==0);
  388. StringBuffer baseq;
  389. CDfsLogicalFileName dlfn;
  390. /*
  391. #define T(s,a,q) \
  392. dlfn.set(s,"x"); \
  393. dlfn.makeScopeQuery(baseq.clear(),a); \
  394. assertex(strcmp(q,baseq.str())==0)
  395. */
  396. #define T(s,a,q) assertex(strcmp(q,s)==0)
  397. T("abcd::efgh",true,"abcd");
  398. /*
  399. // "Files/Scope[@name=\"abcd\"]/Scope[@name=\"efgh\"]");
  400. T("efgh",true,"Files/Scope[@name=\"efgh\"]");
  401. T(".",true,"Files/Scope[@name=\".\"]");
  402. T("",true,"Files/Scope[@name=\".\"]");
  403. T(NULL,true,"Files/Scope[@name=\".\"]");
  404. T("abcd::efgh",false,"Scope[@name=\"abcd\"]/Scope[@name=\"efgh\"]");
  405. T("efgh",false,"Scope[@name=\"efgh\"]");
  406. T(".",false,"Scope[@name=\".\"]");
  407. T("",false,"Scope[@name=\".\"]");
  408. T(NULL,false,"Scope[@name=\".\"]");
  409. */
  410. }
  411. void Test_DFS()
  412. {
  413. testCDfsLogicalFileName();
  414. #if 0
  415. IDistributedFileIterator& iter = *queryDistributedFileDirectory().getIterator("thor_data400::*");
  416. StringBuffer name;
  417. ForEach(iter) {
  418. iter.getName(name.clear());
  419. if (strchr(name.str(),'\001')) {
  420. printf("name = %s\n",name.str());
  421. break;
  422. }
  423. }
  424. iter.Release();
  425. IDistributedFile *dfiler = queryDistributedFileDirectory().lookup(name.str());
  426. if (!dfiler) {
  427. printf("not found");
  428. return;
  429. }
  430. dfiler->detach();
  431. dfiler->attach("thor_data400::unknown_corrupt");
  432. dfiler->Release();
  433. return;
  434. #endif
  435. Owned<IPropertyTree> pp = createPTree("Part");
  436. IFileDescriptor *fdesc = createFileDescriptor();
  437. fdesc->setDefaultDir("c:\\thordata\\test");
  438. INode *node = createINode("192.168.0.1");
  439. pp->setPropInt64("@size",1234);
  440. fdesc->setPart(0,node,"testfile1.d00._1_of_3",pp);
  441. node->Release();
  442. node = createINode("192.168.0.2");
  443. pp->setPropInt64("@size",2345);
  444. fdesc->setPart(1,node,"testfile1.d00._2_of_3",pp);
  445. node->Release();
  446. node = createINode("192.168.0.3");
  447. pp->setPropInt64("@size",3456);
  448. fdesc->setPart(2,node,"testfile1.d00._3_of_3",pp);
  449. node->Release();
  450. queryDistributedFileDirectory().removeEntry("nigel::test::testfile1",UNKNOWN_USER);
  451. IDistributedFile *dfile = queryDistributedFileDirectory().createNew(fdesc);
  452. dfile->attach("nigel::test::testfile1",UNKNOWN_USER);
  453. dfile->Release();
  454. fdesc->Release();
  455. fdesc = createFileDescriptor();
  456. fdesc->setDefaultDir("c:\\thordata");
  457. node = createINode("192.168.0.1");
  458. pp->setPropInt64("@size",23456);
  459. fdesc->setPart(0,node,"testfile2.d00._1_of_3");
  460. node->Release();
  461. node = createINode("192.168.0.2");
  462. pp->setPropInt64("@size",33456);
  463. fdesc->setPart(1,node,"testfile2.d00._2_of_3");
  464. node->Release();
  465. node = createINode("192.168.0.3");
  466. pp->setPropInt64("@size",43456);
  467. fdesc->setPart(2,node,"testfile2.d00._3_of_3");
  468. node->Release();
  469. node = createINode("192.168.0.4");
  470. fdesc->setPart(1,node,"testfile2.d00._2_of_3");
  471. node->Release();
  472. queryDistributedFileDirectory().removeEntry("nigel::test::testfile2",UNKNOWN_USER);
  473. dfile = queryDistributedFileDirectory().createNew(fdesc);
  474. dfile->attach("nigel::test::testfile2",UNKNOWN_USER);
  475. dfile->Release();
  476. fdesc->Release();
  477. fdesc = createFileDescriptor();
  478. fdesc->setDefaultDir("c:\\thordata");
  479. node = createINode("192.168.0.1");
  480. fdesc->setPart(0,node,"testfile3.d00._1_of_3");
  481. node->Release();
  482. node = createINode("192.168.0.2");
  483. fdesc->setPart(1,node,"testfile3.d00._2_of_3");
  484. node->Release();
  485. node = createINode("192.168.0.3");
  486. fdesc->setPart(2,node,"testfile3.d00._3_of_3");
  487. node->Release();
  488. queryDistributedFileDirectory().removeEntry("nigel::test::testfile3",UNKNOWN_USER);
  489. dfile = queryDistributedFileDirectory().createNew(fdesc);
  490. dfile->attach("nigel::test::testfile3",UNKNOWN_USER);
  491. dfile->Release();
  492. fdesc->Release();
  493. IDistributedFile *f = queryDistributedFileDirectory().lookup("nigel::test::testfile2",UNKNOWN_USER,AccessMode::tbdRead,false,false,nullptr,defaultNonPrivilegedUser);
  494. if (!f)
  495. printf("failed 1");
  496. ::Release(f);
  497. f = queryDistributedFileDirectory().lookup("nigel::zest::testfile1",UNKNOWN_USER,AccessMode::tbdRead,false,false,nullptr,defaultNonPrivilegedUser);
  498. assertex(!f);
  499. f = queryDistributedFileDirectory().lookup("nigel::test::zestfile1",UNKNOWN_USER,AccessMode::tbdRead,false,false,nullptr,defaultNonPrivilegedUser);
  500. assertex(!f);
  501. f = queryDistributedFileDirectory().lookup("nigel::test::testfile1",UNKNOWN_USER,AccessMode::tbdRead,false,false,nullptr,defaultNonPrivilegedUser);
  502. if (!f)
  503. printf("failed 2 ");
  504. ::Release(f);
  505. f = queryDistributedFileDirectory().lookup("nigel::test::testfile3",UNKNOWN_USER,AccessMode::tbdRead,false,false,nullptr,defaultNonPrivilegedUser);
  506. if (!f)
  507. printf("failed 3");
  508. StringBuffer str;
  509. for (unsigned p=0;p<f->numParts();p++) {
  510. Owned<IDistributedFilePart> part = f->getPart(p);
  511. RemoteFilename rfn;
  512. part->getFilename(rfn).getRemotePath(str.clear());
  513. printf("part[%d,0] = %s\n",p+1,str.str());
  514. part->getFilename(rfn,1).getRemotePath(str.clear());
  515. printf("part[%d,1] = %s\n",p+1,str.str());
  516. }
  517. ::Release(f);
  518. #if 0
  519. IDistributedFileIterator& iter = *queryDistributedFileDirectory().getIterator("*::*::*");
  520. ForEach(iter) {
  521. StringBuffer name;
  522. printf("name = %s\n",iter.getName(name).str());
  523. }
  524. ForEach(iter) {
  525. StringBuffer name;
  526. printf("name = %s\n",iter.getName(name).str());
  527. }
  528. iter.Release();
  529. queryDistributedFileDirectory().removeEntry("nigel::test::testfile4");
  530. f = queryDistributedFileDirectory().lookup("nigel::test::testfile2");
  531. fdesc = f->getFileDescriptor();
  532. dfile = queryDistributedFileDirectory().createNew(fdesc);
  533. dfile->attach("nigel::test::testfile4");
  534. dfile->Release();
  535. fdesc->Release();
  536. f->Release();
  537. #endif
  538. }
  539. void Test_DFSU()
  540. {
  541. Owned<IPropertyTree> pp = createPTree("Part");
  542. IFileDescriptor *fdesc = createFileDescriptor();
  543. fdesc->setDefaultDir("/c$/thordata/test");
  544. INode *node = createINode("192.168.0.1");
  545. pp->setPropInt64("@size",1234);
  546. fdesc->setPart(0,node,"testfile1.d00._1_of_3",pp);
  547. node->Release();
  548. node = createINode("192.168.0.2");
  549. pp->setPropInt64("@size",2345);
  550. fdesc->setPart(1,node,"testfile1.d00._2_of_3",pp);
  551. node->Release();
  552. node = createINode("192.168.0.3");
  553. pp->setPropInt64("@size",3456);
  554. fdesc->setPart(2,node,"testfile1.d00._3_of_3",pp);
  555. node->Release();
  556. queryDistributedFileDirectory().removeEntry("nigel::test::testfile1u",UNKNOWN_USER);
  557. IDistributedFile *dfile = queryDistributedFileDirectory().createNew(fdesc);
  558. dfile->attach("nigel::test::testfile1u",UNKNOWN_USER);
  559. dfile->Release();
  560. fdesc->Release();
  561. fdesc = createFileDescriptor();
  562. fdesc->setDefaultDir("/c$/thordata");
  563. node = createINode("192.168.0.1");
  564. pp->setPropInt64("@size",23456);
  565. fdesc->setPart(0,node,"testfile2.d00._1_of_3");
  566. node->Release();
  567. node = createINode("192.168.0.2");
  568. pp->setPropInt64("@size",33456);
  569. fdesc->setPart(1,node,"testfile2.d00._2_of_3");
  570. node->Release();
  571. node = createINode("192.168.0.3");
  572. pp->setPropInt64("@size",43456);
  573. fdesc->setPart(2,node,"testfile2.d00._3_of_3");
  574. node->Release();
  575. node = createINode("192.168.0.4");
  576. fdesc->setPart(1,node,"testfile2.d00._2_of_3");
  577. node->Release();
  578. queryDistributedFileDirectory().removeEntry("nigel::test::testfile2u",UNKNOWN_USER);
  579. dfile = queryDistributedFileDirectory().createNew(fdesc);
  580. dfile->attach("nigel::test::testfile2u",UNKNOWN_USER);
  581. dfile->Release();
  582. fdesc->Release();
  583. fdesc = createFileDescriptor();
  584. fdesc->setDefaultDir("/c$/thordata");
  585. node = createINode("192.168.0.1");
  586. fdesc->setPart(0,node,"testfile3.d00._1_of_3");
  587. node->Release();
  588. node = createINode("192.168.0.2");
  589. fdesc->setPart(1,node,"testfile3.d00._2_of_3");
  590. node->Release();
  591. node = createINode("192.168.0.3");
  592. fdesc->setPart(2,node,"testfile3.d00._3_of_3");
  593. node->Release();
  594. queryDistributedFileDirectory().removeEntry("nigel::test::testfile3u",UNKNOWN_USER);
  595. dfile = queryDistributedFileDirectory().createNew(fdesc);
  596. dfile->attach("nigel::test::testfile3u",UNKNOWN_USER);
  597. dfile->Release();
  598. fdesc->Release();
  599. IDistributedFile *f = queryDistributedFileDirectory().lookup("nigel::test::testfile2u",UNKNOWN_USER,AccessMode::tbdRead,false,false,nullptr,defaultNonPrivilegedUser);
  600. if (!f)
  601. printf("failed 1");
  602. StringBuffer str;
  603. for (unsigned p=0;p<f->numParts();p++) {
  604. Owned<IDistributedFilePart> part = f->getPart(p);
  605. RemoteFilename rfn;
  606. part->getFilename(rfn).getRemotePath(str.clear());
  607. printf("upart[%d,0] = %s\n",p+1,str.str());
  608. part->getFilename(rfn,1).getRemotePath(str.clear());
  609. printf("upart[%d,1] = %s\n",p+1,str.str());
  610. }
  611. ::Release(f);
  612. }
  613. void testcode()
  614. {
  615. }
  616. void Test_MultiFile()
  617. {
  618. Owned<IPropertyTree> pp = createPTree("Part");
  619. Owned<IFileDescriptor> fdesc = createFileDescriptor();
  620. fdesc->setDefaultDir("c:\\thordata\\test");
  621. INode *node = createINode("10.150.10.16");
  622. pp->setPropInt64("@size",1234);
  623. fdesc->setPart(0,node,"firstpart,testfile1.*._1_of_3,c:\\test\\lastpart",pp);
  624. node->Release();
  625. node = createINode("10.150.10.17");
  626. pp->setPropInt64("@size",2345);
  627. fdesc->setPart(1,node,"testfile1.d00._2_of_3",pp);
  628. node->Release();
  629. node = createINode("10.150.10.18");
  630. pp->setPropInt64("@size",3456);
  631. fdesc->setPart(2,node,"testfile1.*._3_of_3",pp);
  632. node->Release();
  633. assertex(fdesc->isMulti());
  634. unsigned n = fdesc->numParts();
  635. for (unsigned p=0;p<n;p++) {
  636. for (unsigned cpy=0;cpy<2;cpy++) {
  637. RemoteMultiFilename rmfn;
  638. fdesc->getMultiFilename(p,cpy,rmfn);
  639. bool iswild = rmfn.isWild();
  640. printf("Part %d[%d]%s%s\n",p+1,cpy,fdesc->isMulti(p)?", MULTI":"",iswild?", WILD":"");
  641. unsigned nc = rmfn.ordinality();
  642. StringBuffer rfns;
  643. for (unsigned j=0;j<nc;j++) {
  644. const RemoteFilename &rfn = rmfn.item(j);
  645. printf(" Component %d %s%s\n",j,rfn.getRemotePath(rfns.clear()).str(),
  646. rmfn.isWild(j)?", WILD":"");
  647. }
  648. if (iswild) {
  649. try {
  650. rmfn.expandWild();
  651. nc = rmfn.ordinality();
  652. for (unsigned k=0;k<nc;k++) {
  653. const RemoteFilename &rfn = rmfn.item(k);
  654. printf(" Resolved %d %s\n",k,rfn.getRemotePath(rfns.clear()).str());
  655. assertex(!rmfn.isWild(k));
  656. }
  657. }
  658. catch (IException *e) {
  659. EXCLOG(e,"expandWild");
  660. e->Release();
  661. }
  662. }
  663. }
  664. }
  665. }
  666. #define BUFFSIZE 0x8000
  667. #define COUNT 10
  668. struct RecordStruct
  669. {
  670. char fill[16];
  671. unsigned idx;
  672. unsigned key;
  673. unsigned check;
  674. unsigned sum;
  675. };
  676. #define NRECS ((__int64)1024*1024*1024*20/sizeof(RecordStruct)) // i.e. ~20GB
  677. // #define NRECS ((__int64)1024*1024*1024*2/sizeof(RecordStruct)) // i.e. ~2GB
  678. // #define NRECS ((__int64)1024*1024*500/sizeof(RecordStruct)) // i.e. ~500MB
  679. // #define NRECS ((__int64)1024*500/sizeof(RecordStruct)) // i.e. ~500KB
  680. void TestCopyFile(char *srcfname, char *dstfname) // TEST_COPYFILE
  681. {
  682. // example cmdline: datest srcfile //1.2.3.4:7100/home/username/dstfile
  683. Owned<IFile> srcfile = createIFile(srcfname);
  684. Owned<IFileIO> srcio = srcfile->open(IFOcreate);
  685. char buf[100] = { "TestCopyFile" };
  686. srcio->write(0, 18, buf);
  687. srcio->close();
  688. Owned<IFile> dstfile = createIFile(dstfname);
  689. srcfile->copyTo(dstfile,0x100000,NULL,false);
  690. }
  691. void TestRemoteFile3(int nfiles, int fsizemb)
  692. {
  693. //SocketEndpoint ep("10.150.10.8:7100");
  694. SocketEndpoint ep("127.0.1.1:7100");
  695. //ISocket *sock = ISocket::connect(7100,"10.150.10.8");
  696. // ------------------------------
  697. printf("TestRemoteFile3: nfiles = %d fsizemb = %d\n", nfiles, fsizemb);
  698. IFile *fileA[32769];
  699. IFileIO *ioA[32769];
  700. char filen[256] = { "" };
  701. size_t nrecs = (fsizemb*1024*1024) / sizeof(RecordStruct);
  702. unsigned t=msTick();
  703. if(nfiles > 0){
  704. for (int oi=0;oi<=nfiles;oi++) {
  705. sprintf(filen, "testfile.%d", oi);
  706. fileA[oi] = createRemoteFile(ep, filen);
  707. ioA[oi] = fileA[oi]->open(IFOcreate);
  708. }
  709. for (int oi=0;oi<=nfiles;oi++) {
  710. ioA[oi]->Release();
  711. fileA[oi]->Release();
  712. }
  713. unsigned r = msTick()-t;
  714. printf("elapsed time createRemoteFile (%d) = %lf (sec)\n", nfiles, (double)r/1000.0);
  715. }
  716. // ------------------------------
  717. if(nrecs > 0){
  718. #if 1
  719. for (int oi=0;oi<=0;oi++) {
  720. IFile *file = createRemoteFile(ep, "testfile20.d00");
  721. IFileIO *io = file->open(IFOcreate);
  722. byte *buffer = (byte *)malloc(0x8000);
  723. unsigned br = 0x8000/sizeof(RecordStruct);
  724. size32_t buffsize = br*sizeof(RecordStruct);
  725. unsigned curidx = 0;
  726. unsigned nr = nrecs;
  727. __int64 pos = 0;
  728. t=msTick();
  729. #if 1
  730. unsigned j;
  731. RecordStruct *rs;
  732. while (nr) {
  733. if (nr<br)
  734. br = nr;
  735. rs = (RecordStruct *)buffer;
  736. for (j=0;j<br;j++) {
  737. rs->idx = curidx++;
  738. itoa(rs->idx,rs->fill,16);
  739. unsigned k;
  740. for (k=strlen(rs->fill);k<sizeof(rs->fill);k++)
  741. rs->fill[k] = ' ';
  742. rs->key = getRandom()%1000+1;
  743. rs->check = rs->idx*rs->key;
  744. rs->sum = 0;
  745. rs++;
  746. }
  747. size32_t wr = io->write(pos, br*sizeof(RecordStruct),buffer);
  748. assertex(wr==br*sizeof(RecordStruct));
  749. pos += br*sizeof(RecordStruct);
  750. nr -= br;
  751. }
  752. io->Release();
  753. unsigned r = msTick()-t;
  754. printf("Write (buffsize = %dk): elapsed time write = %lf (sec)\n",(buffsize+1023)/1024,(double)r/1000.0);
  755. Sleep(10);
  756. #endif
  757. br = 0x2000/sizeof(RecordStruct);
  758. for (unsigned iter=1;iter<10;iter++) {
  759. t=msTick();
  760. buffsize = br*sizeof(RecordStruct);
  761. buffer = (byte *)realloc(buffer,buffsize);
  762. curidx = 0;
  763. nr = nrecs;
  764. pos = 0;
  765. unsigned r = msTick();
  766. IFileIO *io = file->open(IFOread);
  767. while (nr) {
  768. if (nr<br)
  769. br = nr;
  770. size32_t rd = io->read(pos, br*sizeof(RecordStruct),buffer);
  771. // fprintf(stderr, "nr = %u rd = %u br = %u sizeof(RecordStruct) = %lu\n", nr, rd, br, sizeof(RecordStruct));
  772. assertex(rd==br*sizeof(RecordStruct));
  773. pos += br*sizeof(RecordStruct);
  774. nr -= br;
  775. }
  776. io->Release();
  777. r = msTick()-t;
  778. printf("Read (buffsize = %dk): elapsed time read = %lf (sec)\n",(buffsize+1023)/1024,(double)r/1000.0);
  779. Sleep(10);
  780. br += br;
  781. }
  782. file->Release();
  783. }
  784. #endif
  785. }
  786. return;
  787. }
  788. void TestRemoteFile2()
  789. {
  790. unsigned t=msTick();
  791. SocketEndpoint ep("10.150.10.8:7100");
  792. //ISocket *sock = ISocket::connect(7100,"10.150.10.8");
  793. IFile *file = createRemoteFile(ep, "testfile20.d00");
  794. IFileIO *io = file->open(IFOcreate);
  795. byte *buffer = (byte *)malloc(0x8000);
  796. unsigned br = 0x8000/sizeof(RecordStruct);
  797. size32_t buffsize = br*sizeof(RecordStruct);
  798. unsigned curidx = 0;
  799. unsigned nr = NRECS;
  800. __int64 pos = 0;
  801. #if 0
  802. unsigned j;
  803. RecordStruct *rs;
  804. while (nr) {
  805. if (nr<br)
  806. br = nr;
  807. rs = (RecordStruct *)buffer;
  808. for (j=0;j<br;j++) {
  809. rs->idx = curidx++;
  810. itoa(rs->idx,rs->fill,16);
  811. unsigned k;
  812. for (k=strlen(rs->fill);k<sizeof(rs->fill);k++)
  813. rs->fill[k] = ' ';
  814. rs->key = getRandom()%1000+1;
  815. rs->check = rs->idx*rs->key;
  816. rs->sum = 0;
  817. rs++;
  818. }
  819. size32_t wr = io->write(pos, br*sizeof(RecordStruct),buffer);
  820. assertex(wr==br*sizeof(RecordStruct));
  821. pos += br*sizeof(RecordStruct);
  822. nr -= br;
  823. }
  824. io->Release();
  825. r = msTick()-t;
  826. printf("Write (buffsize = %dk): elapsed time write = %d\n",(buffsize+1023)/1024,t/1000);
  827. Sleep(10);
  828. #endif
  829. br = 0x2000/sizeof(RecordStruct);
  830. for (unsigned iter=1;iter<10;iter++) {
  831. buffsize = br*sizeof(RecordStruct);
  832. buffer = (byte *)realloc(buffer,buffsize);
  833. curidx = 0;
  834. nr = NRECS;
  835. pos = 0;
  836. unsigned r = msTick();
  837. IFileIO *io = file->open(IFOread);
  838. while (nr) {
  839. if (nr<br)
  840. br = nr;
  841. size32_t rd = io->read(pos, br*sizeof(RecordStruct),buffer);
  842. assertex(rd==br*sizeof(RecordStruct));
  843. pos += br*sizeof(RecordStruct);
  844. nr -= br;
  845. }
  846. io->Release();
  847. r = msTick()-t;
  848. printf("Read (buffsize = %dk): elapsed time write = %d\n",(buffsize+1023)/1024,r/1000);
  849. Sleep(10);
  850. br += br;
  851. }
  852. file->Release();
  853. }
  854. void TestRemoteFile(unsigned part,unsigned of)
  855. {
  856. byte *buffer = (byte *)malloc(0x10000);
  857. unsigned br = 0x8000/sizeof(RecordStruct);
  858. size32_t buffsize = br*sizeof(RecordStruct);
  859. unsigned t=msTick();
  860. unsigned nr = ((20*1024*1024)/(sizeof(RecordStruct)*of))*1024;
  861. //ISocket *sock = ISocket::connect(7100,"10.150.10.8");
  862. SocketEndpoint ep("10.150.10.8:7100");
  863. IFile *infile = createRemoteFile(ep, "testfile20.d00");
  864. IFileIO *inio = infile->open(IFOread);
  865. StringBuffer str("c:\\dali\\test1.");
  866. str.append(part);
  867. IFile *outfile = createIFile(str.str());
  868. IFileIO *outio = outfile->open(IFOcreate);
  869. offset_t inpos = (offset_t)(part-1)*nr*sizeof(RecordStruct);
  870. offset_t outpos = 0;
  871. while (nr) {
  872. if (nr<br)
  873. br = nr;
  874. size32_t rd = inio->read(inpos, br*sizeof(RecordStruct),buffer);
  875. assertex(rd==br*sizeof(RecordStruct));
  876. outio->write(outpos, br*sizeof(RecordStruct),buffer);
  877. inpos += br*sizeof(RecordStruct);
  878. outpos += br*sizeof(RecordStruct);
  879. nr -= br;
  880. }
  881. inio->Release();
  882. outio->Release();
  883. t = msTick()-t;
  884. printf("Transfer: elapsed time write = %d\n",t/1000);
  885. infile->Release();
  886. outfile->Release();
  887. free(buffer);
  888. }
  889. void QTest(bool testput)
  890. {
  891. PROGLOG("starting QTest %s",testput?"put":"get");
  892. Owned<INamedQueueConnection> conn = createNamedQueueConnection(0);
  893. Owned<IQueueChannel> channel = conn->open("testq");
  894. unsigned i;
  895. unsigned t1 = msTick();
  896. byte *buf=(byte *)malloc(1024*128);
  897. memset(buf,77,1024*128);
  898. if (!testput) {
  899. while (channel->probe()) {
  900. MemoryBuffer mb;
  901. channel->get(mb);
  902. }
  903. }
  904. unsigned qn = 0;
  905. unsigned n;
  906. for (n=1;n<=128;n++) {
  907. PROGLOG("start %d",n);
  908. unsigned t1 = msTick();
  909. for (i=0;i<1000;i++) {
  910. qn++;
  911. Sleep(getRandom()%1000);
  912. MemoryBuffer mb;
  913. if (testput) {
  914. mb.append("Hello").append(i);
  915. queryMyNode()->serialize(mb);
  916. buf[0] = qn%256;
  917. buf[1024*n-1] = 255-buf[0];
  918. mb.append(1024*n,buf);
  919. channel->put(mb);
  920. #if 1
  921. if (i%100==99) {
  922. PROGLOG("Put %i - %d on queue",i,channel->probe());
  923. PROGLOG("time taken = %d",msTick()-t1);
  924. t1 = msTick();
  925. }
  926. #endif
  927. }
  928. else {
  929. #if 1
  930. for (;;) {
  931. if (channel->get(mb,0,100))
  932. break;
  933. printf(".");
  934. }
  935. printf("\n");
  936. #else
  937. channel->get(mb);
  938. #endif
  939. StringAttr str;
  940. unsigned p;
  941. mb.read(str).read(p);
  942. Owned<INode> node = deserializeINode(mb);
  943. size32_t sz = mb.length()-mb.getPos();
  944. if (sz) {
  945. mb.read(sz,buf);
  946. if ((buf[0]!=255-buf[sz-1])||(buf[0]!=qn%256)) {
  947. PROGLOG("%d: sz=%d, buf[0]=%d, buf[sz-1]=%d %d %d",qn,sz,(int)buf[0],(int)buf[sz-1],mb.length(),mb.getPos());
  948. return;
  949. }
  950. assertex(buf[0]==255-buf[sz-1]);
  951. #if 1
  952. StringBuffer eps;
  953. if (i%100==99) {
  954. PROGLOG("Got %s - %d from %s",str.get(),n,node->endpoint().getUrlStr(eps).str());
  955. PROGLOG("time taken = %d",msTick()-t1);
  956. t1 = msTick();
  957. }
  958. #endif
  959. }
  960. }
  961. }
  962. PROGLOG("average message of %dK took %dms ",n,msTick()-t1);
  963. }
  964. free(buf);
  965. }
  966. class cNotify: public CInterface, implements ISessionNotify
  967. {
  968. public:
  969. IMPLEMENT_IINTERFACE;
  970. Semaphore sem;
  971. void closed(SessionId id)
  972. {
  973. PROGLOG("Session closed %" I64F "d",id);
  974. sem.signal();
  975. }
  976. void aborted(SessionId id)
  977. {
  978. PROGLOG("Session aborted %" I64F "d",id);
  979. sem.signal();
  980. }
  981. };
  982. void Test_Session(const char *eps) // test for sessions
  983. {
  984. if (!eps||!*eps) {
  985. for (unsigned i=0;i<100;i++) {
  986. PROGLOG("Tick %d",i);
  987. Sleep(1000);
  988. }
  989. return;
  990. }
  991. Owned<cNotify> cnotify = new cNotify;
  992. querySessionManager().subscribeSession(12345678,cnotify);
  993. cnotify->sem.wait();
  994. INode *node = createINode(eps,7777);
  995. SessionId id;
  996. for (;;) {
  997. id = querySessionManager().lookupProcessSession(node);
  998. if (id) {
  999. PROGLOG("Session looked up %" I64F "d",id);
  1000. break;
  1001. }
  1002. Sleep(1000);
  1003. }
  1004. querySessionManager().subscribeSession(id,cnotify);
  1005. cnotify->sem.wait();
  1006. Sleep(1000);
  1007. }
  1008. void QTest2(bool testput)
  1009. {
  1010. Owned<INamedQueueConnection> conn = createNamedQueueConnection(0);
  1011. Owned<IQueueChannel> channel = conn->open("testq");
  1012. CMessageBuffer mb;
  1013. if (testput) {
  1014. SessionId session = querySessionManager().startSession(0);
  1015. PROGLOG("session started = %" I64F "d",session);
  1016. mb.append(session);
  1017. channel->put(mb);
  1018. while (!querySessionManager().sessionStopped(session,1000*5))
  1019. PROGLOG("Still going!");
  1020. }
  1021. else {
  1022. channel->get(mb);
  1023. SessionId session;
  1024. mb.read(session);
  1025. PROGLOG("Started");
  1026. Sleep(1000*6);
  1027. PROGLOG("stopping session %" I64F "d",session);
  1028. querySessionManager().stopSession(session,false);
  1029. PROGLOG("Stopped");
  1030. }
  1031. }
  1032. #define TSUB
  1033. #ifdef TSUB
  1034. class TestSubscription : public CInterface, implements ISDSSubscription
  1035. {
  1036. public:
  1037. IMPLEMENT_IINTERFACE;
  1038. // ISDSSubscription impl.
  1039. virtual void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
  1040. {
  1041. static int nno = 0;
  1042. PROGLOG("%d: Notification(%" I64F "d) of %s - flags = %d", nno++, (__int64) id, xpath, flags);
  1043. if (valueData)
  1044. {
  1045. StringBuffer data;
  1046. appendURL(&data, (const char *)valueData, valueLen, 0);
  1047. PROGLOG("ValueData = %s", data.str());
  1048. }
  1049. }
  1050. };
  1051. #endif
  1052. class TSDSThread : public CInterface, implements IPooledThread
  1053. {
  1054. StringAttr path;
  1055. public:
  1056. IMPLEMENT_IINTERFACE;
  1057. virtual void init(void *param) override
  1058. {
  1059. path.set((char *) param);
  1060. }
  1061. virtual void threadmain() override
  1062. {
  1063. try
  1064. {
  1065. Owned<IRemoteConnection> conn = querySDS().connect(path, myProcessSession(), RTM_LOCK_WRITE|RTM_LOCK_SUB, 1000000);
  1066. PROGLOG("connecting to %s", path.get());
  1067. if (!conn)
  1068. throw MakeStringException(-1, "Failed to connect to path %s", path.get());
  1069. IPropertyTree *root = conn->queryRoot();
  1070. root->setPropInt("TTestProp1", fastRand());
  1071. root->setPropInt("TTestProp2", fastRand());
  1072. }
  1073. catch (IException *e)
  1074. {
  1075. PrintExceptionLog(e, NULL);
  1076. }
  1077. }
  1078. virtual bool stop() override
  1079. {
  1080. return true;
  1081. }
  1082. virtual bool canReuse() const override { return true; }
  1083. };
  1084. class TSDSTestPool : public CInterface, implements IThreadFactory
  1085. {
  1086. public:
  1087. IMPLEMENT_IINTERFACE;
  1088. virtual IPooledThread *createNew()
  1089. {
  1090. return new TSDSThread();
  1091. }
  1092. };
  1093. class CQPutTest : public Thread
  1094. {
  1095. public:
  1096. CQPutTest() : Thread("CQPutTest") { start(); }
  1097. virtual int run()
  1098. {
  1099. try {
  1100. QTest(true);
  1101. }
  1102. catch (IException *e) {
  1103. pexception("QTest(true): Exception",e);
  1104. }
  1105. return 0;
  1106. }
  1107. };
  1108. class CQGetTest : public Thread
  1109. {
  1110. public:
  1111. CQGetTest() : Thread("CQPutTest") { start(); }
  1112. virtual int run()
  1113. {
  1114. try {
  1115. QTest(false);
  1116. }
  1117. catch (IException *e) {
  1118. pexception("QTest(false): Exception",e);
  1119. }
  1120. return 0;
  1121. }
  1122. };
  1123. void testSubscription(bool subscriber, int subs, int comms)
  1124. {
  1125. class TestSubscription : public CInterface, implements ISDSSubscription
  1126. {
  1127. public:
  1128. IMPLEMENT_IINTERFACE;
  1129. virtual void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
  1130. {
  1131. PROGLOG("Notification(%" I64F "x) of %s - flags = %d",(__int64) id, xpath, flags);
  1132. if (valueData)
  1133. {
  1134. StringBuffer data;
  1135. appendURL(&data, (const char *)valueData, valueLen, 0);
  1136. PROGLOG("ValueData = %s", data.str());
  1137. }
  1138. }
  1139. };
  1140. unsigned subscriptions = subs==-1?400:subs;
  1141. unsigned commits = comms==-1?400:comms;
  1142. unsigned i;
  1143. if (subscriber) {
  1144. TestSubscription **subs = (TestSubscription **) alloca(sizeof(TestSubscription *)*subscriptions);
  1145. SubscriptionId *ids = (SubscriptionId *) alloca(sizeof(SubscriptionId)*subscriptions);
  1146. for (i=0; i<subscriptions; i++){
  1147. subs[i] = new TestSubscription;
  1148. PROGLOG("Subscribe %d",i);
  1149. StringBuffer key;
  1150. key.append("/TESTS/TEST").append(i);
  1151. ids[i] = querySDS().subscribe(key.str(), *subs[i], true);
  1152. }
  1153. PROGLOG("paused 1");
  1154. getchar();
  1155. for (i=0; i<subscriptions; i++) {
  1156. querySDS().unsubscribe(ids[i]);
  1157. subs[i]->Release();
  1158. }
  1159. PROGLOG("paused");
  1160. getchar();
  1161. }
  1162. else {
  1163. Owned<IRemoteConnection> conn = querySDS().connect("/TESTS",
  1164. myProcessSession(), RTM_CREATE_QUERY, 1000000);
  1165. IPropertyTree *root = conn->queryRoot();
  1166. unsigned i, _i;
  1167. for (_i=0; _i<commits; _i++) {
  1168. i = _i%subscriptions;
  1169. StringBuffer key;
  1170. key.append("TEST").append(i);
  1171. root->setPropTree(key.str(), createPTree());
  1172. }
  1173. conn->commit();
  1174. PROGLOG("paused 1");
  1175. getchar();
  1176. for (_i=0; _i<commits; _i++) {
  1177. i = _i%subscriptions;
  1178. StringBuffer key;
  1179. key.append("TEST").append(i).append("/index");
  1180. root->setPropInt(key.str(), i);
  1181. PROGLOG("Commit %d", i);
  1182. conn->commit();
  1183. }
  1184. PROGLOG("paused 2");
  1185. getchar();
  1186. for (_i=0; _i<commits; _i++) {
  1187. i = _i%subscriptions;
  1188. StringBuffer key;
  1189. key.append("TEST").append(i).append("/index");
  1190. root->setPropInt(key.str(), subscriptions-i);
  1191. conn->commit();
  1192. }
  1193. PROGLOG("paused 3");
  1194. getchar();
  1195. for (_i=0; _i<commits; _i++) {
  1196. i = _i%subscriptions;
  1197. StringBuffer key;
  1198. key.append("TEST").append(i).append("/index");
  1199. root->setPropInt(key.str(), i);
  1200. conn->commit();
  1201. }
  1202. }
  1203. }
  1204. class CCSub : public CInterface, implements ISDSConnectionSubscription
  1205. {
  1206. StringAttr conn;
  1207. public:
  1208. IMPLEMENT_IINTERFACE;
  1209. CCSub(const char *_conn) : conn(_conn) { }
  1210. virtual void notify()
  1211. {
  1212. PROGLOG("Connection %s changed", conn.get());
  1213. }
  1214. };
  1215. class CChange : public Thread
  1216. {
  1217. Owned<IRemoteConnection> conn;
  1218. Owned<CCSub> sub;
  1219. StringAttr path;
  1220. public:
  1221. CChange(const char *_path) : path(_path)
  1222. {
  1223. conn.setown(querySDS().connect(_path, myProcessSession(), RTM_CREATE_QUERY, 1000000));
  1224. id1 = id2 = 0;
  1225. start();
  1226. }
  1227. virtual int run()
  1228. {
  1229. for (;;)
  1230. {
  1231. conn->queryRoot()->setPropInt("testprop", fastRand()*100);
  1232. conn->commit();
  1233. if (id1)
  1234. {
  1235. conn->unsubscribe(id1);
  1236. id1 = 0;
  1237. }
  1238. else
  1239. {
  1240. StringBuffer s("sub1 ");
  1241. s.append(path);
  1242. id1 = conn->subscribe(*new CCSub(s.str()));
  1243. }
  1244. if (id2)
  1245. {
  1246. conn->unsubscribe(id2);
  1247. id2 = 0;
  1248. }
  1249. else
  1250. {
  1251. StringBuffer s("sub2 ");
  1252. s.append(path);
  1253. id2 = conn->subscribe(*new CCSub(s.str()));
  1254. }
  1255. }
  1256. throwUnexpected(); // loop never terminates, but some compilers complain about missing return without this line
  1257. }
  1258. SubscriptionId id1, id2;
  1259. };
  1260. void testConnectionSubscription()
  1261. {
  1262. IArrayOf<CChange> a;
  1263. unsigned c;
  1264. for (c=0; c<10; c++)
  1265. {
  1266. StringBuffer s("/TESTS/CONSUB");
  1267. s.append(c+1);
  1268. a.append(*new CChange(s.str()));
  1269. }
  1270. ForEachItemIn(cc, a)
  1271. a.item(cc).join();
  1272. }
  1273. void TestStress()
  1274. {
  1275. // config.
  1276. unsigned maxRunning = 40;
  1277. unsigned count = 1000;
  1278. unsigned branches = 4;
  1279. unsigned pauseWhenBusyDelay = 200;
  1280. const char *branchPrefix = "branch";
  1281. bool queues = false;
  1282. //
  1283. Owned<IRemoteConnection> conn = querySDS().connect("/Stress", myProcessSession(), RTM_CREATE_QUERY|RTM_LOCK_WRITE, 1000000);
  1284. PROGLOG("connected to /Stress");
  1285. IPropertyTree *root = conn->queryRoot();
  1286. unsigned b;
  1287. for (b=0; b<branches; b++)
  1288. {
  1289. StringBuffer branch(branchPrefix);
  1290. branch.append(b+1);
  1291. root->setPropTree(branch.str(), createPTree());
  1292. }
  1293. conn->commit();
  1294. //
  1295. Owned<CQPutTest> putTest;
  1296. Owned<CQGetTest> getTest;
  1297. //
  1298. if (queues)
  1299. {
  1300. putTest.setown(new CQPutTest());
  1301. getTest.setown(new CQGetTest());
  1302. }
  1303. TSDSTestPool poolFactory;
  1304. Owned<IThreadPool> pool = createThreadPool("TSDSTest", &poolFactory);
  1305. unsigned path = 0;
  1306. while (count)
  1307. {
  1308. Owned<IPooledThreadIterator> rIter = pool->running();
  1309. unsigned c=0;
  1310. ForEach (*rIter) c++;
  1311. if (c>=maxRunning)
  1312. {
  1313. PROGLOG("Pause");
  1314. Sleep(pauseWhenBusyDelay);
  1315. }
  1316. else
  1317. {
  1318. StringBuffer branch("Stress/");
  1319. branch.append(branchPrefix).append(++path);
  1320. pool->start((void *)branch.str());
  1321. if (path >= branches)
  1322. path = 0;
  1323. count--;
  1324. }
  1325. }
  1326. PROGLOG("Joining all TSDSThread running threads");
  1327. pool->joinAll();
  1328. pool.clear();
  1329. if (queues)
  1330. {
  1331. PROGLOG("Joining putTest");
  1332. putTest->join();
  1333. PROGLOG("Joining gettTest");
  1334. getTest->join();
  1335. }
  1336. PROGLOG("Finished");
  1337. return;
  1338. }
  1339. #define SDS_LOCK_TIMEOUT (unsigned)-1
  1340. class CStressTestBase : public CInterface, implements IInterface
  1341. {
  1342. StringAttr name;
  1343. unsigned occurence;
  1344. public:
  1345. IMPLEMENT_IINTERFACE;
  1346. CStressTestBase(const char *_name, unsigned _occurence) : name(_name), occurence(_occurence)
  1347. {
  1348. }
  1349. void threadmain()
  1350. {
  1351. PROGLOG("test; %s - start", name.get());
  1352. test();
  1353. PROGLOG("test; %s - end", name.get());
  1354. }
  1355. virtual void test() = 0;
  1356. const char *queryName() { return name; }
  1357. unsigned queryOccurence() { return occurence; }
  1358. };
  1359. class CStressTest1 : public CStressTestBase
  1360. {
  1361. public:
  1362. CStressTest1(unsigned o) : CStressTestBase("StressTest1", o) { }
  1363. virtual void test()
  1364. {
  1365. Owned<IRemoteConnection> conn = querySDS().connect("/Tests/Stress2/common", myProcessSession(), RTM_CREATE|RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT);
  1366. conn->queryRoot()->addProp(queryName(), queryName());
  1367. MilliSleep(getRandom()%100);
  1368. conn->close(true);
  1369. }
  1370. };
  1371. class CStressTest2 : public CStressTestBase
  1372. {
  1373. public:
  1374. CStressTest2(unsigned o) : CStressTestBase("StressTest2", o) { }
  1375. virtual void test()
  1376. {
  1377. Owned<IRemoteConnection> conn = querySDS().connect("/Tests/Stress2/common", myProcessSession(), RTM_LOCK_READ|RTM_CREATE_ADD|RTM_DELETE_ON_DISCONNECT, SDS_LOCK_TIMEOUT);
  1378. conn->queryRoot()->addProp(queryName(), queryName());
  1379. MilliSleep(getRandom()%100);
  1380. }
  1381. };
  1382. class CStressTest3 : public CStressTestBase
  1383. {
  1384. public:
  1385. CStressTest3(unsigned o) : CStressTestBase("StressTest3", o) { }
  1386. virtual void test()
  1387. {
  1388. // semantics a little odd, lock ensures n
  1389. Owned<IRemoteConnection> conn = querySDS().connect("/Tests/Stress2/common/", myProcessSession(), RTM_CREATE_QUERY|RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT);
  1390. conn->queryRoot()->addProp(queryName(), queryName());
  1391. MilliSleep(getRandom()%100);
  1392. conn->close(true);
  1393. }
  1394. };
  1395. class CStressTest4 : public CStressTestBase
  1396. {
  1397. public:
  1398. CStressTest4(unsigned o) : CStressTestBase("StressTest4", o) { }
  1399. virtual void test()
  1400. {
  1401. Owned<IRemoteConnection> conn = querySDS().connect("/Tests/Stress2/common", myProcessSession(), RTM_LOCK_READ, SDS_LOCK_TIMEOUT);
  1402. // possible legitimate exception : ambiguous
  1403. if (conn) // may not exist
  1404. {
  1405. conn->changeMode(RTM_LOCK_WRITE); // non-exclusive->exclusive lock, something else can grab and orphan, possible exception: orphaned node
  1406. conn->queryRoot()->addProp(queryName(), queryName());
  1407. MilliSleep(getRandom()%100);
  1408. }
  1409. }
  1410. };
  1411. class CStressTest5 : public CStressTestBase
  1412. {
  1413. public:
  1414. CStressTest5(unsigned o) : CStressTestBase("StressTest5", o) { }
  1415. virtual void test()
  1416. {
  1417. Owned<IRemoteConnection> conn = querySDS().connect("/Tests/Stress2/common", myProcessSession(), RTM_LOCK_WRITE|RTM_DELETE_ON_DISCONNECT, SDS_LOCK_TIMEOUT);
  1418. // possible legitimate exception : ambiguous
  1419. if (conn) // may not exist
  1420. {
  1421. conn->changeMode(RTM_LOCK_READ);
  1422. conn->queryRoot()->addProp(queryName(), queryName());
  1423. MilliSleep(getRandom()%100);
  1424. }
  1425. }
  1426. };
  1427. class CStressTest6 : public CStressTestBase
  1428. {
  1429. public:
  1430. CStressTest6(unsigned o) : CStressTestBase("StressTest6", o) { }
  1431. virtual void test()
  1432. {
  1433. Owned<IRemoteConnection> conn = querySDS().connect("/Tests/Stress2/ext/", myProcessSession(), RTM_CREATE_QUERY|RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT);
  1434. MemoryBuffer mb;
  1435. void *mem = mb.reserveTruncate(100*1024);
  1436. memset(mem, 1, mb.length());
  1437. conn->queryRoot()->setPropBin("bin", mb.length(), mb.toByteArray());
  1438. conn.clear();
  1439. MilliSleep(getRandom()%500);
  1440. conn.setown(querySDS().connect("/Tests/Stress2/ext/", myProcessSession(), RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT));
  1441. conn->queryRoot()->removeProp("bin");
  1442. conn.clear();
  1443. }
  1444. };
  1445. class CStressPoolFactory : public CInterface, public IThreadFactory
  1446. {
  1447. class CStressPoolHandler : public CInterface, implements IPooledThread
  1448. {
  1449. Linked<CStressTestBase> stressTest;
  1450. unsigned delay, test;
  1451. public:
  1452. IMPLEMENT_IINTERFACE;
  1453. virtual void init(void *startInfo) override
  1454. {
  1455. stressTest.set((CStressTestBase *)startInfo);
  1456. }
  1457. virtual void threadmain() override
  1458. {
  1459. try
  1460. {
  1461. stressTest->threadmain();
  1462. }
  1463. catch (IException *e)
  1464. {
  1465. StringBuffer s(stressTest->queryName());
  1466. EXCLOG(e, s.append(" failure").str());
  1467. }
  1468. }
  1469. virtual bool canReuse() const override
  1470. {
  1471. return true;
  1472. }
  1473. virtual bool stop() override
  1474. {
  1475. return true;
  1476. }
  1477. };
  1478. public:
  1479. IMPLEMENT_IINTERFACE;
  1480. IPooledThread *createNew()
  1481. {
  1482. return new CStressPoolHandler();
  1483. }
  1484. };
  1485. struct Range
  1486. {
  1487. unsigned which;
  1488. unsigned lower;
  1489. unsigned upper;
  1490. };
  1491. static int rangeFind(const void *_key, const void *e)
  1492. {
  1493. unsigned key = *(unsigned *)_key;
  1494. Range &range = *(Range *)e;
  1495. if (key < range.lower)
  1496. return -1;
  1497. else if (key >= range.upper)
  1498. return 1;
  1499. else
  1500. return 0;
  1501. }
  1502. void TestStress2()
  1503. {
  1504. Owned<IRemoteConnection> conn = querySDS().connect("/Tests/Stress2", myProcessSession(), RTM_CREATE_QUERY|RTM_LOCK_READ, SDS_LOCK_TIMEOUT);
  1505. conn->changeMode(RTM_LOCK_READ);
  1506. Owned<CStressPoolFactory> factory = new CStressPoolFactory();
  1507. Owned<IThreadPool> threadPool = createThreadPool("Stress2 Thread Pool", factory, NULL, 60);
  1508. unsigned totalCount = 0;
  1509. unsigned subCount = 1;
  1510. unsigned whichTest = (unsigned)-1;
  1511. unsigned o = testParams.ordinality();
  1512. if (o)
  1513. {
  1514. totalCount = atoi(testParams.item(0));
  1515. if (o>1)
  1516. whichTest = atoi(testParams.item(1));
  1517. }
  1518. CIArrayOf<CStressTestBase> tests;
  1519. tests.append(*new CStressTest1(20));
  1520. tests.append(*new CStressTest2(20));
  1521. tests.append(*new CStressTest3(15));
  1522. tests.append(*new CStressTest4(15));
  1523. tests.append(*new CStressTest5(15));
  1524. tests.append(*new CStressTest6(15));
  1525. MemoryBuffer rangeMb;
  1526. Range *ranges = (Range *) rangeMb.reserveTruncate(sizeof(Range) * tests.ordinality());
  1527. unsigned r = 0;
  1528. ForEachItemIn(p, tests)
  1529. {
  1530. Range &range = ranges[p];
  1531. range.which = p;
  1532. unsigned perc = tests.item(p).queryOccurence();
  1533. range.lower = r;
  1534. r += perc;
  1535. assertex(r <= 100);
  1536. range.upper = r;
  1537. }
  1538. // count # of each test
  1539. seedRandom((unsigned)get_cycles_now());
  1540. bool stop = false;
  1541. while (!stop)
  1542. {
  1543. unsigned test;
  1544. if ((unsigned)-1 == whichTest)
  1545. {
  1546. test = getRandom() % 100;
  1547. const void *result = bsearch(&test, ranges, tests.ordinality(), sizeof(Range), rangeFind);
  1548. if (result)
  1549. {
  1550. Range &range = *(Range *)result;
  1551. test = range.which;
  1552. }
  1553. else
  1554. test = NotFound;
  1555. }
  1556. else
  1557. {
  1558. test = whichTest-1; // (input = 1 based)
  1559. if (test >= tests.ordinality())
  1560. throw MakeStringException(0, "Test out of range, there are only %d tests", tests.ordinality());
  1561. }
  1562. if (NotFound == test)
  1563. {
  1564. PROGLOG("No test run this cycle");
  1565. unsigned delay = getRandom()%200;
  1566. MilliSleep(delay);
  1567. }
  1568. else
  1569. {
  1570. unsigned _subCount = subCount;
  1571. while (_subCount--)
  1572. {
  1573. unsigned delay = getRandom()%200+150;
  1574. MilliSleep(delay);
  1575. try
  1576. {
  1577. threadPool->startNoBlock(&tests.item(test));
  1578. }
  1579. catch (IException *e)
  1580. {
  1581. EXCLOG(e, NULL);
  1582. MilliSleep(1000);
  1583. }
  1584. if (totalCount && --totalCount == 0)
  1585. {
  1586. stop = true;
  1587. break;
  1588. }
  1589. }
  1590. }
  1591. }
  1592. threadPool->joinAll();
  1593. }
  1594. #define UNDERTHRESHOLD 0x10
  1595. #define OVERTHRESHOLD 0x100000
  1596. #define CREATEINITIAL
  1597. //#define REMOVEEXT
  1598. void TestExternal()
  1599. {
  1600. Owned<IRemoteConnection> conn;
  1601. IPropertyTree *root;
  1602. size32_t sz;
  1603. unsigned l;
  1604. void *test = malloc(OVERTHRESHOLD);
  1605. MemoryBuffer mb;
  1606. mb.setBuffer(OVERTHRESHOLD, test, true);
  1607. memset(test, 'A', OVERTHRESHOLD);
  1608. StringBuffer extPropName("testExternal");
  1609. extPropName.append(getRandom()%3);
  1610. for (l=0; l<2; l++)
  1611. {
  1612. conn.setown(querySDS().connect("/Tests", myProcessSession(), RTM_CREATE_QUERY|RTM_LOCK_WRITE, 2000*MDELAY));
  1613. root = conn->queryRoot();
  1614. #ifdef CREATEINITIAL
  1615. {
  1616. Owned<IFile> ifile = createIFile("c:\\utils\\pkzipc.exe");
  1617. Owned<IFileIO> fileIO = ifile->open(IFOread);
  1618. assertex(fileIO);
  1619. sz = (size32_t)ifile->size();
  1620. void *mem = malloc(sz);
  1621. fileIO->read(0, sz, mem);
  1622. root->setPropBin(extPropName.str(), sz, mem);
  1623. free(mem);
  1624. }
  1625. PROGLOG("Writing binary to SDS, size=%d", sz);
  1626. conn->commit();
  1627. PROGLOG("Written binary to SDS, size=%d", sz);
  1628. conn.clear();
  1629. #endif
  1630. conn.setown(querySDS().connect("/Tests", myProcessSession(), RTM_LOCK_READ, 2000*MDELAY));
  1631. root = conn->queryRoot();
  1632. MemoryBuffer mb;
  1633. PROGLOG("Reading binary to SDS");
  1634. verifyex(root->getPropBin(extPropName.str(), mb));
  1635. {
  1636. Owned<IFile> ifile = createIFile("testExt.exe");
  1637. Owned<IFileIO> fileIO = ifile->open(IFOcreate);
  1638. assertex(fileIO);
  1639. sz = mb.length();
  1640. fileIO->write(0, mb.length(), mb.toByteArray());
  1641. }
  1642. PROGLOG("Read back binary, size=%d", sz);
  1643. PROGLOG("Writing large string");
  1644. char *str = (char *)test;
  1645. str[32000] = '\0';
  1646. root->setProp("largeString", str);
  1647. const char *b = root->queryProp("@sds:ext");
  1648. conn->commit();
  1649. PROGLOG("Written large string");
  1650. conn.clear();
  1651. }
  1652. return;
  1653. conn.setown(querySDS().connect("/Tests", myProcessSession(), RTM_CREATE_QUERY|RTM_LOCK_WRITE, 2000*MDELAY));
  1654. root = conn->queryRoot();
  1655. root->setPropBin(extPropName.str(), UNDERTHRESHOLD, test);
  1656. PROGLOG("setting binary to small, size=%d", UNDERTHRESHOLD);
  1657. conn->commit();
  1658. PROGLOG("set binary to small, size=%d", UNDERTHRESHOLD);
  1659. root->setPropBin(extPropName.str(), 1024, test);
  1660. PROGLOG("setting binary to big, size=%d", OVERTHRESHOLD);
  1661. conn->commit();
  1662. PROGLOG("set binary to big, size=%d", OVERTHRESHOLD);
  1663. #ifdef REMOVEEXT
  1664. root->removeProp(extPropName.str());
  1665. PROGLOG("removing binary to small, size=%d", UNDERTHRESHOLD);
  1666. conn->commit();
  1667. PROGLOG("removed prop");
  1668. #endif
  1669. }
  1670. class CSubTest : public Thread
  1671. {
  1672. public:
  1673. CSubTest(const char *_path) : path(_path) { start(); }
  1674. virtual int run()
  1675. {
  1676. srand( (unsigned)time( NULL ) );
  1677. try
  1678. {
  1679. unsigned extra = fastRand()%2 ? RTM_LOCK_SUB : 0;
  1680. Owned<IRemoteConnection> conn2 = querySDS().connect(path, myProcessSession(), RTM_CREATE_QUERY|RTM_LOCK_WRITE|extra, 1000);
  1681. }
  1682. catch (IException *e)
  1683. {
  1684. StringBuffer s("Timed out connecting to");
  1685. s.append(path);
  1686. PrintExceptionLog(e, s.str());
  1687. }
  1688. return 1;
  1689. }
  1690. private:
  1691. StringAttr path;
  1692. };
  1693. void TestSubLocks()
  1694. {
  1695. IPropertyTree *root;
  1696. { Owned<IRemoteConnection> conn = querySDS().connect("/Tests", myProcessSession(), RTM_CREATE_QUERY|RTM_LOCK_WRITE, 2000*MDELAY);
  1697. }
  1698. Owned<IRemoteConnection> conn1 = querySDS().connect("/Tests/SubLocks", myProcessSession(), RTM_CREATE_QUERY|RTM_LOCK_WRITE|RTM_LOCK_SUB, WAIT_FOREVER);
  1699. root = conn1->queryRoot();
  1700. root->setProp("b1", "branch 1");
  1701. root->setProp("b2", "branch 2");
  1702. conn1->commit();
  1703. try
  1704. {
  1705. // expect to fail.
  1706. Owned<IRemoteConnection> conn2 = querySDS().connect("/Tests/SubLocks/b1", myProcessSession(), RTM_CREATE_QUERY|RTM_LOCK_WRITE|RTM_LOCK_SUB, 1000);
  1707. }
  1708. catch (IException *e)
  1709. {
  1710. PrintExceptionLog(e, "Timed out connecting to /Tests/SubLocks/b1");
  1711. }
  1712. conn1.clear();
  1713. conn1.setown(querySDS().connect("/Tests/SubLocks/b1", myProcessSession(), RTM_CREATE_QUERY|RTM_LOCK_WRITE|RTM_LOCK_SUB, 1000));
  1714. try
  1715. {
  1716. // expect to succeed.
  1717. Owned<IRemoteConnection> conn = querySDS().connect("/Tests/SubLocks/b2", myProcessSession(), RTM_CREATE_QUERY|RTM_LOCK_WRITE|RTM_LOCK_SUB, 1000);
  1718. }
  1719. catch (IException *e)
  1720. {
  1721. PrintExceptionLog(e, "Timed out connecting to /Tests/SubLocks/b2");
  1722. }
  1723. try
  1724. {
  1725. // expect to fail.
  1726. Owned<IRemoteConnection> conn2 = querySDS().connect("/Tests/SubLocks", myProcessSession(), RTM_CREATE_QUERY|RTM_LOCK_WRITE|RTM_LOCK_SUB, 1000);
  1727. }
  1728. catch (IException *e)
  1729. {
  1730. PrintExceptionLog(e, "Timed out connecting to /Tests/SubLocks");
  1731. }
  1732. // random testing
  1733. unsigned num = 100;
  1734. IArrayOf<CSubTest> threads;
  1735. StringArray paths;
  1736. paths.append("/Tests/SubLocks");
  1737. paths.append("/Tests/SubLocks/b1");
  1738. paths.append("/Tests/SubLocks/b2");
  1739. unsigned i;
  1740. for (i=0; i<num; i++)
  1741. {
  1742. CSubTest * t = new CSubTest(paths.item(fastRand()%paths.ordinality()));
  1743. threads.append(* t);
  1744. }
  1745. PROGLOG("joining");
  1746. for (i=0; i<num; i++)
  1747. {
  1748. threads.item(i).join();
  1749. }
  1750. PROGLOG("SubLocks test done");
  1751. }
  1752. void TestSDS1()
  1753. {
  1754. StringBuffer xml;
  1755. ISDSManager &sdsManager = querySDS();
  1756. IRemoteConnection *conn;
  1757. IPropertyTree *root;
  1758. #ifdef TSUB
  1759. Owned<TestSubscription> ts = new TestSubscription();
  1760. SubscriptionId id = querySDS().subscribe("/subtest", *ts, false, true);
  1761. conn = sdsManager.connect("/", myProcessSession(), RTM_LOCK_WRITE, 2000*MDELAY);
  1762. root = conn->queryRoot();
  1763. unsigned i;
  1764. for (i=0; i<10; i++)
  1765. {
  1766. while (root->removeProp("subtest"));
  1767. conn->commit();
  1768. root->setPropInt("subtest", i+1);
  1769. root->setProp("subtest[1]/aaa", "2");
  1770. root->setProp("subtest[1]/aaa/aaasub", "2");
  1771. root->setProp("subtest[1]/bbb", "2");
  1772. root->setProp("blah", "3");
  1773. conn->commit();
  1774. }
  1775. conn->Release();
  1776. querySDS().unsubscribe(id);
  1777. return;
  1778. #endif
  1779. #if 1
  1780. conn = sdsManager.connect("/", myProcessSession(), RTM_LOCK_WRITE | RTM_LOCK_SUB, 2000*MDELAY);
  1781. root = conn->queryRoot();
  1782. IPropertyTree *t1 = createPTree();
  1783. IPropertyTree *t2 = root->setPropTree("t2", createPTree());
  1784. IPropertyTree *_t1 = t2->setPropTree("t1", t1);
  1785. _t1->setPropTree("under_t1", createPTree());
  1786. IPropertyTree *t3 = root->setPropTree("t3", createPTree());
  1787. conn->commit();
  1788. _t1 = t2->getPropTree("t1");
  1789. t2->removeTree(_t1);
  1790. conn->commit();
  1791. t3->addPropTree("newName", _t1);
  1792. conn->commit();
  1793. conn->Release();
  1794. return;
  1795. Owned<IPropertyTreeIterator> diter = root->getElements("*");
  1796. while (diter->first())
  1797. {
  1798. IPropertyTree &child = diter->query();
  1799. PROGLOG("child = %s", child.queryName());
  1800. root->removeTree(&child);
  1801. }
  1802. conn->commit();
  1803. conn->Release();
  1804. #endif
  1805. #if 1 // test attribute deletion etc.
  1806. conn = sdsManager.connect("/", myProcessSession(), RTM_LOCK_WRITE, 2000*MDELAY);
  1807. root = conn->queryRoot();
  1808. root->removeProp("attrtree");
  1809. IPropertyTree *attrTree = createPTree();
  1810. attrTree->addProp("@rattr1", "a");
  1811. attrTree->addProp("@rattr2", "b");
  1812. attrTree->addProp("@rattr3", "c");
  1813. attrTree->addProp("@rattr4", "d");
  1814. attrTree->addProp("adt", "e");
  1815. attrTree->addProp("adt/@attr1", "f");
  1816. attrTree->addProp("adt/@attr2", "g");
  1817. attrTree->addProp("adt/@attr3", "h");
  1818. root->setPropTree("attrtree", attrTree);
  1819. conn->Release();
  1820. conn = sdsManager.connect("/attrtree", myProcessSession(), RTM_LOCK_WRITE, 2000*MDELAY);
  1821. root = conn->queryRoot();
  1822. verifyex(root->removeProp("@rattr3"));
  1823. verifyex(root->removeProp("adt/@attr3"));
  1824. conn->Release();
  1825. conn = sdsManager.connect("/", myProcessSession(), RTM_LOCK_WRITE, 2000*MDELAY);
  1826. root = conn->queryRoot();
  1827. verifyex(root->removeProp("attrtree"));
  1828. conn->Release();
  1829. #endif
  1830. #if 1 // test1 qualified add/set
  1831. conn = sdsManager.connect("/", myProcessSession(), RTM_LOCK_WRITE, 2000*MDELAY);
  1832. root = conn->queryRoot();
  1833. root->removeProp("blah");
  1834. root->addProp("blah", "blahv1");
  1835. conn->commit();
  1836. root->addProp("blah", "blahv2");
  1837. root->setProp("blah[2]/subb", "ggg");
  1838. conn->commit();
  1839. root->addProp("blah", "blahv3");
  1840. conn->commit();
  1841. conn->Release();
  1842. conn = sdsManager.connect("/", myProcessSession(), RTM_LOCK_WRITE, 2000*MDELAY);
  1843. root = conn->queryRoot();
  1844. root->removeProp("blah");
  1845. conn->Release();
  1846. #endif
  1847. #if 1 // test2 qualified add/set
  1848. conn = sdsManager.connect("/", myProcessSession(), RTM_LOCK_WRITE, 2000*MDELAY);
  1849. root = conn->queryRoot();
  1850. root->setProp("Software", "test");
  1851. root->addProp("Software", "jaketest1");
  1852. root->addProp("Software[1]/hello", "jaketest2-hello1");
  1853. root->addProp("Software[1]/hello[1]", "jaketest2-hello2");
  1854. root->addProp("Software", "jaketest3");
  1855. root->addProp("Software[2]", "jaketest4");
  1856. root->addProp("Software[1]", "jaketest5");
  1857. root->addProp("Software[1]/hello[1]", "jaketest5-hello1");
  1858. root->addProp("Software[2]/hello[1]", "jaketestX2-hello0");
  1859. conn->commit();
  1860. IPropertyTreeIterator *iter = root->getElements("Software");
  1861. iter->first();
  1862. int x=0;
  1863. while (iter->isValid())
  1864. {
  1865. IPropertyTree &t = iter->query();
  1866. printf("x=%d, val=%s\n", x, t.queryProp(NULL));
  1867. iter->next();
  1868. x++;
  1869. }
  1870. iter->Release();
  1871. StringBuffer s;
  1872. toXML(root, s);
  1873. printf("XML : %s\n", s.str());
  1874. conn->Release();
  1875. conn = sdsManager.connect("/", myProcessSession(), RTM_LOCK_WRITE, 2000*MDELAY);
  1876. root = conn->queryRoot();
  1877. root->removeProp("Software");
  1878. conn->Release();
  1879. #endif
  1880. #if 1 // test similar to DFS file release.
  1881. // create f (local)
  1882. IPropertyTree *f = createPTree("file", ipt_caseInsensitive);
  1883. IPropertyTree *p = createPTree("part", ipt_caseInsensitive);
  1884. p->setProp("@num","1");
  1885. p->setProp("filename","testfile1.d00._1_of_3");
  1886. p->setProp("node","192.168.0.3");
  1887. f->addPropTree("part",p);
  1888. p = createPTree("part", ipt_caseInsensitive);
  1889. p->setProp("@num","2");
  1890. p->setProp("filename","testfile1.d00._2_of_3");
  1891. p->setProp("node","192.168.0.3");
  1892. f->addPropTree("part",p);
  1893. p = createPTree("part", ipt_caseInsensitive);
  1894. p->setProp("@num","3");
  1895. p->setProp("filename","testfile1.d00._3_of_3");
  1896. p->setProp("node","192.168.0.3");
  1897. f->addPropTree("part",p);
  1898. f->setProp("directory","c:\\thordata");
  1899. f->setProp("@name","testfile1");
  1900. IPropertyTree *f2 = createPTree("file", ipt_caseInsensitive);
  1901. p = createPTree("part", ipt_caseInsensitive);
  1902. p->setProp("@num","1");
  1903. p->setProp("filename","testfile2.d00._1_of_3");
  1904. p->setProp("node","192.168.0.3");
  1905. f2->addPropTree("part",p);
  1906. p = createPTree("part", ipt_caseInsensitive);
  1907. p->setProp("@num","2");
  1908. p->setProp("filename","testfile2.d00._2_of_3");
  1909. p->setProp("node","192.168.0.3");
  1910. f2->addPropTree("part",p);
  1911. p = createPTree("part", ipt_caseInsensitive);
  1912. p->setProp("@num","3");
  1913. p->setProp("f2ilename","testfile2.d00._3_of_3");
  1914. p->setProp("node","192.168.0.3");
  1915. f2->addPropTree("part",p);
  1916. f2->setProp("directory","c:\\thordata");
  1917. f2->setProp("@name","testfile2");
  1918. IPropertyTree *f3 = createPTree("file", ipt_caseInsensitive);
  1919. p = createPTree("part", ipt_caseInsensitive);
  1920. p->setProp("@num","1");
  1921. p->setProp("filename","testfile3.d00._1_of_3");
  1922. p->setProp("node","192.168.0.3");
  1923. f3->addPropTree("part",p);
  1924. p = createPTree("part", ipt_caseInsensitive);
  1925. p->setProp("@num","2");
  1926. p->setProp("filename","testfile3.d00._2_of_3");
  1927. p->setProp("node","192.168.0.3");
  1928. f3->addPropTree("part",p);
  1929. p = createPTree("part", ipt_caseInsensitive);
  1930. p->setProp("@num","3");
  1931. p->setProp("filename","testfile3.d00._3_of_3");
  1932. p->setProp("node","192.168.0.3");
  1933. f3->addPropTree("part",p);
  1934. f3->setProp("directory","c:\\thordata");
  1935. f3->setProp("@name","testfile3");
  1936. conn = sdsManager.connect("/Files", myProcessSession(), RTM_LOCK_WRITE, 2000*MDELAY);
  1937. root = conn->getRoot();
  1938. const char *name = root->queryName();
  1939. IPropertyTree *sroot = createPTree();
  1940. sroot->setProp("@name","nigel");
  1941. sroot = root->addPropTree("scope",sroot);
  1942. IPropertyTree *sroot2 = createPTree();
  1943. sroot2->setProp("@name","test");
  1944. sroot2 = sroot->addPropTree("scope",sroot2);
  1945. bool b1 = root->removeProp("scope[@name=\"nigel\"]/scope[@name=\"test\"]/file[@name=\"testfile1\"]");
  1946. sroot2->addPropTree("file",f);
  1947. conn->commit();
  1948. bool b2 = root->removeProp("scope[@name=\"nigel\"]/scope[@name=\"test\"]/file[@name=\"testfile2\"]");
  1949. sroot2->addPropTree("file",f2);
  1950. conn->commit();
  1951. bool b3 = root->removeProp("scope[@name=\"nigel\"]/scope[@name=\"test\"]/file[@name=\"testfile3\"]");
  1952. sroot2->addPropTree("file",f3);
  1953. conn->commit();
  1954. root->Release();
  1955. conn->Release();
  1956. /////////////////////////////////////////////////////////////////////////////////////////////////////////////////
  1957. conn = sdsManager.connect("Files/scope[@name=\"nigel\"]/scope[@name=\"test\"]/file[@name=\"testfile2\"]", myProcessSession(), RTM_LOCK_WRITE, 5000*MDELAY);
  1958. root = conn->queryRoot();
  1959. toXML(root, xml.clear());
  1960. PROGLOG("previously committed file : %s", xml.str());
  1961. conn->Release();
  1962. #endif
  1963. /////////////////////////////////////////////////////////////////////////////////////////////////////////////////
  1964. // conn = sdsManager.connect("/", myProcessSession(), RTM_LOCK_WRITE, 5000*MDELAY);
  1965. // conn = sdsManager.connect("/", myProcessSession(), 0, 5000*MDELAY);
  1966. // conn = sdsManager.connect("/newbranch", myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE, 5000*MDELAY);
  1967. // conn = querySDS().connect("/Files/scope[@name=\"test\"]/file[@name=\"testfile1\"]", myProcessSession(), RTM_LOCK_READ, 10000*MDELAY);
  1968. // conn = sdsManager.connect("/", myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_UNIQUE, 5000*MDELAY);
  1969. // conn = sdsManager.connect("/nonexist", myProcessSession(), RTM_LOCK_WRITE, 5000*MDELAY);
  1970. // conn = querySDS().connect("/Files", myProcessSession(), RTM_LOCK_READ, 10000*MDELAY);
  1971. // conn = querySDS().connect("/Files/WorkUnit", myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_ADD, 10000*MDELAY);
  1972. // conn = querySDS().connect("Files/scope[@name=\"nigel\"]/scope[@name=\"test\"]/file[@name=\"testfile3\"]", myProcessSession(), RTM_LOCK_READ, 10000*MDELAY);
  1973. // conn = querySDS().connect("Files/scope[@name=\"nigel\"]/scope[@name=\"test\"]", myProcessSession(), RTM_LOCK_READ, 10000*MDELAY);
  1974. #if 1 // CREATE_ADD test
  1975. conn = sdsManager.connect("/newbranch", myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_ADD, 5000*MDELAY);
  1976. root = conn->getRoot();
  1977. root->setPropInt("@id", 5);
  1978. IPropertyTree *a = createPTree(ipt_caseInsensitive);
  1979. a->setProp("@attr1", "123");
  1980. root->setPropTree("file", LINK(a));
  1981. root->Release();
  1982. conn->Release();
  1983. //////////////////
  1984. conn = sdsManager.connect("/newbranch[@id=\"5\"]", myProcessSession(), RTM_LOCK_WRITE, 5000*MDELAY);
  1985. root = conn->queryRoot();
  1986. bool b = root->removeProp("file[@attr1=\"123\"]");
  1987. conn->Release();
  1988. #endif
  1989. #if 1 // CREATE test
  1990. conn = sdsManager.connect("/newbranch", myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE, 5000*MDELAY);
  1991. root = conn->getRoot();
  1992. root->setPropInt("@id", 10);
  1993. root->setPropInt("sub1", 5);
  1994. IPropertyTree *sub = root->queryPropTree("sub1");
  1995. IPropertyTree *subsub = createPTree();
  1996. subsub->setProp("hello", "there");
  1997. sub->setPropTree("hellosubsub", subsub);
  1998. root->Release();
  1999. conn->Release();
  2000. conn = sdsManager.connect("/newbranch[@id=\"10\"]", myProcessSession(), RTM_LOCK_WRITE, 5000*MDELAY);
  2001. root = conn->queryRoot();
  2002. sub = root->queryPropTree("sub1");
  2003. toXML(sub, xml.clear());
  2004. PROGLOG("hello = %s", xml.str());
  2005. conn->Release();
  2006. #endif
  2007. }
  2008. void testDfuStreamRead(StringArray &params)
  2009. {
  2010. // reads a DFS file
  2011. try
  2012. {
  2013. const char *fname = params.item(0);
  2014. const char *filter = nullptr;
  2015. const char *outputECLFormat = nullptr;
  2016. if (params.ordinality()>1)
  2017. {
  2018. filter = params.item(1);
  2019. if (isEmptyString(filter))
  2020. filter = nullptr;
  2021. if (params.ordinality()>2)
  2022. {
  2023. outputECLFormat = params.item(2);
  2024. if (isEmptyString(outputECLFormat))
  2025. outputECLFormat = nullptr;
  2026. }
  2027. }
  2028. Owned<IUserDescriptor> userDesc = createUserDescriptor();
  2029. userDesc->set("jsmith","password");
  2030. Owned<IDFUFileAccess> srcFile = lookupDFUFile(fname, "testDfuStreamRead", 300, userDesc);
  2031. if (!srcFile)
  2032. {
  2033. WARNLOG("File '%s' not found!", fname);
  2034. return;
  2035. }
  2036. IOutputMetaData *meta = srcFile->queryEngineInterface()->queryMeta();
  2037. CommonXmlWriter xmlWriter(XWFnoindent);
  2038. unsigned sourceN = srcFile->queryNumParts();
  2039. for (unsigned p=0; p<sourceN; p++)
  2040. {
  2041. Owned<IDFUFilePartReader> reader = srcFile->createFilePartReader(p, 0, nullptr, true);
  2042. if (outputECLFormat)
  2043. {
  2044. reader->setOutputRecordFormat(outputECLFormat);
  2045. meta = reader->queryMeta();
  2046. }
  2047. if (filter)
  2048. reader->addFieldFilter(filter);
  2049. reader->start();
  2050. while (true)
  2051. {
  2052. size32_t sz;
  2053. const void *row = reader->nextRow(sz);
  2054. if (!row)
  2055. {
  2056. if (!srcFile->queryIsGrouped())
  2057. break;
  2058. row = reader->nextRow(sz);
  2059. if (!row)
  2060. break;
  2061. }
  2062. meta->toXML((const byte *)row, xmlWriter.clear());
  2063. PROGLOG("Row: %s", xmlWriter.str());
  2064. }
  2065. }
  2066. }
  2067. catch (IException *e)
  2068. {
  2069. EXCLOG(e, nullptr);
  2070. e->Release();
  2071. }
  2072. }
  2073. void testDfuStreamWrite(const char *fname)
  2074. {
  2075. // reads a DFS file and writes it to <filename>_copy
  2076. try
  2077. {
  2078. Owned<IUserDescriptor> userDesc = createUserDescriptor();
  2079. userDesc->set("jsmith","password");
  2080. const char *newFileName = "dfsstream::newfile1";
  2081. if (!isEmptyString(fname))
  2082. newFileName = fname;
  2083. const char *newEclRecDef = "{ string10 fname; string10 sname; unsigned4 age; };";
  2084. Owned<IDFUFileAccess> tgtFile = createDFUFile(newFileName, "mythor", dft_flat, newEclRecDef, "datest-write-newfile1", 300, true, userDesc); // NB: compressed file
  2085. // NB: must match record definition
  2086. struct Row
  2087. {
  2088. std::string fname;
  2089. std::string sname;
  2090. unsigned age;
  2091. };
  2092. const std::array<Row, 6> rows = { { { "John ", "Smith ", 59 },
  2093. { "Samuel ", "Peeps ", 39 },
  2094. { "Bob ", "Marks ", 12 },
  2095. { "Jake ", "Smith ", 12 },
  2096. { "Paul ", "Smith ", 12 },
  2097. { "Sarah ", "Potters ", 28 }
  2098. }
  2099. };
  2100. offset_t fileSize = 0;
  2101. unsigned numRecs = 0;
  2102. unsigned targetN = tgtFile->queryNumParts();
  2103. for (unsigned p=0; p<targetN; p++)
  2104. {
  2105. Owned<IDFUFilePartWriter> writer = tgtFile->createFilePartWriter(p);
  2106. writer->start();
  2107. unsigned numPartRecs = 0;
  2108. offset_t partSize = 0;
  2109. for (auto &row: rows)
  2110. {
  2111. char rowMem[24];
  2112. memcpy(rowMem, row.fname.c_str(), 10);
  2113. memcpy(rowMem+10, row.sname.c_str(), 10);
  2114. memcpy(rowMem+20, &row.age, sizeof(row.age));
  2115. writer->write(sizeof(rowMem), rowMem);
  2116. partSize += sizeof(rowMem);
  2117. ++numPartRecs;
  2118. }
  2119. tgtFile->setPartPropertyInt(p, "@recordCount", numPartRecs); // JCSMORE
  2120. tgtFile->setPartPropertyInt(p, "@size", partSize);
  2121. numRecs += numPartRecs;
  2122. fileSize += partSize;
  2123. }
  2124. tgtFile->setFilePropertyInt("@recordCount", numRecs);
  2125. tgtFile->setFilePropertyInt("@size", fileSize);
  2126. publishDFUFile(tgtFile, true, userDesc);
  2127. tgtFile.clear();
  2128. // Read the file back
  2129. Owned<IDFUFileAccess> srcFile = lookupDFUFile(newFileName, "datest-read-newfile1", 300, userDesc);
  2130. if (!srcFile)
  2131. {
  2132. WARNLOG("File '%s' not found!", newFileName);
  2133. return;
  2134. }
  2135. unsigned sourceN = srcFile->queryNumParts();
  2136. for (unsigned p=0; p<sourceN; p++)
  2137. {
  2138. Owned<IDFUFilePartReader> reader = srcFile->createFilePartReader(p);
  2139. // filter by Smith and project to new format
  2140. reader->addFieldFilter("sname=['Smith']");
  2141. reader->setOutputRecordFormat("{ string5 age; string20 fname; };");
  2142. reader->start();
  2143. while (true)
  2144. {
  2145. size32_t sz;
  2146. const byte *row = (const byte *)reader->nextRow(sz);
  2147. if (!row)
  2148. {
  2149. if (!srcFile->queryIsGrouped())
  2150. break;
  2151. row = (const byte *)reader->nextRow(sz);
  2152. if (!row)
  2153. break;
  2154. }
  2155. PROGLOG("Row: age=%.*s, fname=%.*s", 5, row, 20, row+5);
  2156. }
  2157. }
  2158. }
  2159. catch (IException *e)
  2160. {
  2161. EXCLOG(e, nullptr);
  2162. e->Release();
  2163. }
  2164. }
  2165. void testDfuStreamCopy(const char *srcFileName)
  2166. {
  2167. // reads a DFS file and writes it to <filename>_copy
  2168. try
  2169. {
  2170. if (isEmptyString(srcFileName))
  2171. throw makeStringException(0, "no source logical filename supplied");
  2172. Owned<IUserDescriptor> userDesc = createUserDescriptor();
  2173. userDesc->set("jsmith","password");
  2174. Owned<IDFUFileAccess> srcFile = lookupDFUFile(srcFileName, "datest", 60, userDesc);
  2175. if (!srcFile)
  2176. {
  2177. WARNLOG("File '%s' not found", srcFileName);
  2178. return;
  2179. }
  2180. IDFUFileAccessExt *srcFileEx = srcFile->queryEngineInterface();
  2181. const char *eclRecDef = srcFileEx->queryProperties().queryProp("ECL");
  2182. if (!eclRecDef)
  2183. throw makeStringExceptionV(0, "File '%s' has no record definition", srcFileName);
  2184. IOutputMetaData *srcMeta = srcFileEx->queryMeta();
  2185. const char *srcGroup = srcFile->queryClusterGroupName();
  2186. const char *clusterName = startsWith(srcGroup, "hthor__") ? "myeclagent" : "mythor";
  2187. StringBuffer tgtFileName(srcFileName);
  2188. tgtFileName.append("_copy");
  2189. Owned<IDFUFileAccess> tgtFile = createDFUFile(tgtFileName, clusterName, dft_flat, eclRecDef, "myRequestId", 300, false, userDesc);
  2190. IDFUFileAccessExt *tgtFileEx = srcFile->queryEngineInterface();
  2191. unsigned tgtFileParts = tgtFile->queryNumParts();
  2192. unsigned currentWriterPart = 0;
  2193. Owned<IDFUFilePartWriter> writer;
  2194. unsigned numRecs = 0;
  2195. unsigned srcFileParts = srcFile->queryNumParts();
  2196. unsigned tally = srcFileParts;
  2197. for (unsigned p=0; p<srcFileParts; p++)
  2198. {
  2199. Owned<IDFUFilePartReader> reader = srcFile->createFilePartReader(p);
  2200. reader->start();
  2201. if (tally >= srcFileParts)
  2202. {
  2203. tally -= srcFileParts;
  2204. writer.setown(tgtFile->createFilePartWriter(currentWriterPart++));
  2205. writer->start();
  2206. }
  2207. tally += tgtFileParts;
  2208. while (true)
  2209. {
  2210. size32_t sz;
  2211. const void *row = reader->nextRow(sz);
  2212. if (!row)
  2213. {
  2214. if (!srcFile->queryIsGrouped())
  2215. break;
  2216. row = reader->nextRow(sz);
  2217. if (!row)
  2218. break;
  2219. }
  2220. ++numRecs;
  2221. CommonXmlWriter xmlwrite(0);
  2222. srcMeta->toXML((const byte *)row, xmlwrite);
  2223. PROGLOG("row: %s", xmlwrite.str());
  2224. writer->write(sz, row);
  2225. }
  2226. }
  2227. writer.clear();
  2228. // write some blank parts, if src # parts less than target # parts
  2229. while (currentWriterPart<tgtFileParts)
  2230. {
  2231. writer.setown(tgtFile->createFilePartWriter(currentWriterPart++));
  2232. writer->start();
  2233. writer.clear();
  2234. }
  2235. PROGLOG("numRecs writtern = %u", numRecs);
  2236. tgtFileEx->queryProperties().setPropInt64("@recordCount", numRecs);
  2237. //tgtFileEx->queryProperties().setPropInt64("@size", fileSize);
  2238. publishDFUFile(tgtFile, true, userDesc);
  2239. // read it back for good measure
  2240. Owned<IDFUFileAccess> newSrcFile = lookupDFUFile(tgtFileName, "datest", 60, userDesc);
  2241. if (!newSrcFile)
  2242. {
  2243. WARNLOG("File '%s' not found", tgtFileName.str());
  2244. return;
  2245. }
  2246. IOutputMetaData *tgtMeta = tgtFileEx->queryMeta();
  2247. CommonXmlWriter xmlWriter(0);
  2248. for (unsigned p=0; p<tgtFileParts; p++)
  2249. {
  2250. Owned<IDFUFilePartReader> reader = newSrcFile->createFilePartReader(p);
  2251. reader->start();
  2252. while (true)
  2253. {
  2254. size32_t sz;
  2255. const void *row = reader->nextRow(sz);
  2256. if (!row)
  2257. break;
  2258. ++numRecs;
  2259. tgtMeta->toXML((const byte *)row, xmlWriter);
  2260. PROGLOG("new file row: %s", xmlWriter.str());
  2261. }
  2262. }
  2263. }
  2264. catch (IException *e)
  2265. {
  2266. EXCLOG(e, nullptr);
  2267. e->Release();
  2268. }
  2269. }
  2270. class CClientTestSDS : public Thread
  2271. {
  2272. public:
  2273. CClientTestSDS() : Thread("ClientTestSDS"){ }
  2274. virtual int run()
  2275. {
  2276. try
  2277. {
  2278. TestSDS1();
  2279. }
  2280. catch (IException *e)
  2281. {
  2282. pexception("CClientTestSDS", e);
  2283. e->Release();
  2284. }
  2285. return 0;
  2286. }
  2287. };
  2288. void TestSDS2()
  2289. {
  2290. CClientTestSDS *t1 = new CClientTestSDS();
  2291. CClientTestSDS *t2 = new CClientTestSDS();
  2292. t1->start();
  2293. t2->start();
  2294. t1->join();
  2295. t2->join();
  2296. t1->Release();
  2297. t2->Release();
  2298. }
  2299. struct SDS3Params
  2300. {
  2301. ReadWriteLock *reinitLock;
  2302. IGroup *group;
  2303. };
  2304. class TestSDS3TestThread : public CInterface, implements IPooledThread
  2305. {
  2306. CThreaded threaded;
  2307. Owned<IRemoteConnection> conn;
  2308. StringAttr xpath;
  2309. unsigned mode, action;
  2310. ReadWriteLock *reinitLock;
  2311. IGroup *group;
  2312. public:
  2313. IMPLEMENT_IINTERFACE;
  2314. TestSDS3TestThread() : threaded("TestSDS3TestThread") { }
  2315. virtual void init(void *param) override
  2316. {
  2317. SDS3Params *params = (SDS3Params *)param;
  2318. reinitLock = params->reinitLock;
  2319. group = params->group;
  2320. }
  2321. virtual bool stop() override { return true; }
  2322. virtual bool canReuse() const override { return true; }
  2323. virtual void threadmain() override
  2324. {
  2325. action = getRandom() % 8;
  2326. mode = getRandom() % 2;
  2327. if (4 == action || 5 == action)
  2328. xpath.set("/aparent");
  2329. else
  2330. xpath.set("/aparent/achild");
  2331. if (3 == action || 1 == action)
  2332. mode = 0;
  2333. if (7 == action)
  2334. {
  2335. Sleep(1000);
  2336. WriteLockBlock b(*reinitLock);
  2337. PROGLOG("shutdown / reinit test");
  2338. reinitClientProcess(group, DCR_Testing);
  2339. }
  2340. else
  2341. {
  2342. unsigned times = getRandom() % 20 + 10;
  2343. while (times--)
  2344. {
  2345. MilliSleep(getRandom() %100 + 100);
  2346. ReadLockBlock b(*reinitLock);
  2347. conn.setown(querySDS().connect(xpath.get(), myProcessSession(), mode, 10000*MDELAY));
  2348. if (!conn)
  2349. {
  2350. for (;;)
  2351. {
  2352. PROGLOG("creating initial branch");
  2353. conn.setown(querySDS().connect(xpath.get(), myProcessSession(), RTM_CREATE| RTM_LOCK_WRITE, 10000*MDELAY));
  2354. if (6 == action || 5==action)
  2355. {
  2356. Owned<IRemoteConnection> conn2 = querySDS().connect(xpath.get(), myProcessSession(), RTM_CREATE_ADD| RTM_LOCK_WRITE, 10000*MDELAY); // add some ambiguity
  2357. }
  2358. conn.clear();
  2359. conn.setown(querySDS().connect(xpath.get(), myProcessSession(), mode, 10000*MDELAY));
  2360. if (conn)
  2361. break;
  2362. }
  2363. }
  2364. try
  2365. {
  2366. PROGLOG("xpath=%s, mode=%d, action=%d", xpath.get(), mode, action);
  2367. switch (action)
  2368. {
  2369. case 0:
  2370. {
  2371. conn->changeMode(0);
  2372. break;
  2373. }
  2374. case 2:
  2375. {
  2376. conn->close();
  2377. break;
  2378. }
  2379. case 1:
  2380. case 3:
  2381. {
  2382. conn->close(true);
  2383. break;
  2384. }
  2385. case 4:
  2386. {
  2387. conn->queryRoot()->removeProp("achild");
  2388. break;
  2389. }
  2390. default:
  2391. break;
  2392. }
  2393. }
  2394. catch (IException *e)
  2395. {
  2396. EXCLOG(e, NULL);
  2397. e->Release();
  2398. }
  2399. conn.clear();
  2400. }
  2401. }
  2402. }
  2403. };
  2404. void TestSDS3(IGroup *group)
  2405. {
  2406. class TSDS1 : public CInterface, implements IThreadFactory
  2407. {
  2408. public:
  2409. IMPLEMENT_IINTERFACE;
  2410. virtual IPooledThread *createNew() { return new TestSDS3TestThread(); }
  2411. } poolFactory;
  2412. unsigned nthreads = testParams.ordinality()?atoi(testParams.item(0)):10;
  2413. ReadWriteLock reinitLock;
  2414. Owned<IThreadPool> pool = createThreadPool("TSDS1", &poolFactory, NULL, nthreads);
  2415. SDS3Params params;
  2416. params.reinitLock = &reinitLock;
  2417. params.group = group;
  2418. for (;;)
  2419. {
  2420. pool->start(&params, NULL, 50000); // keep starting them as they become available
  2421. }
  2422. PROGLOG("Joining all TSDSThread running threads");
  2423. pool->joinAll();
  2424. pool.clear();
  2425. }
  2426. void TestNodeSubs()
  2427. {
  2428. class CNodeSubPool : public CSimpleInterfaceOf<IThreadFactory>
  2429. {
  2430. class CNodeSubscriber : public CSimpleInterfaceOf<ISDSNodeSubscription>
  2431. {
  2432. public:
  2433. virtual void notify(SubscriptionId id, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
  2434. {
  2435. PROGLOG("CNodeSubscriber notified");
  2436. }
  2437. };
  2438. SubscriptionId sid;
  2439. CriticalSection sidCrit;
  2440. Owned<ISDSNodeSubscription> subscriber;
  2441. void test()
  2442. {
  2443. try
  2444. {
  2445. unsigned t = getRandom()%5;
  2446. switch (t)
  2447. {
  2448. case 0:
  2449. {
  2450. // connect thread
  2451. PROGLOG("Creating SDS node");
  2452. Owned<IRemoteConnection> conn = querySDS().connect("/nodesubtest", myProcessSession(), RTM_CREATE|RTM_LOCK_WRITE, INFINITE);
  2453. MilliSleep(5+getRandom()%50);
  2454. conn.clear();
  2455. break;
  2456. }
  2457. case 1:
  2458. {
  2459. // node sub test
  2460. CriticalBlock b(sidCrit);
  2461. if (!sid)
  2462. {
  2463. PROGLOG("Subscribing to node");
  2464. sid = querySDS().subscribeExact("/nodesubtest", *subscriber, false);
  2465. }
  2466. break;
  2467. }
  2468. case 2:
  2469. {
  2470. // node sub test
  2471. CriticalBlock b(sidCrit);
  2472. if (sid)
  2473. {
  2474. PROGLOG("Unsubscribing to node");
  2475. querySDS().unsubscribeExact(sid);
  2476. sid = 0;
  2477. }
  2478. break;
  2479. }
  2480. case 3:
  2481. {
  2482. PROGLOG("Deleting node");
  2483. Owned<IRemoteConnection> conn = querySDS().connect("/nodesubtest", myProcessSession(), RTM_LOCK_WRITE, INFINITE);
  2484. if (conn)
  2485. conn->close(true);
  2486. break;
  2487. }
  2488. case 4:
  2489. {
  2490. PROGLOG("Gathering subscriber info");
  2491. StringBuffer info;
  2492. querySDS().getSubscribers(info);
  2493. if (info.length())
  2494. PROGLOG("Subscribers: \n%s", info.str());
  2495. break;
  2496. }
  2497. }
  2498. }
  2499. catch (IException *e)
  2500. {
  2501. PrintExceptionLog(e, NULL);
  2502. e->Release();
  2503. }
  2504. }
  2505. class CNodeSubThread : public CInterface, implements IPooledThread
  2506. {
  2507. CNodeSubPool &owner;
  2508. public:
  2509. IMPLEMENT_IINTERFACE;
  2510. CNodeSubThread(CNodeSubPool &_owner) : owner(_owner) { }
  2511. virtual void init(void *param) override
  2512. {
  2513. }
  2514. virtual void threadmain() override
  2515. {
  2516. owner.test();
  2517. }
  2518. virtual bool stop() override { return true; }
  2519. virtual bool canReuse() const override { return true; }
  2520. };
  2521. public:
  2522. CNodeSubPool()
  2523. {
  2524. sid = 0;
  2525. subscriber.setown(new CNodeSubscriber());
  2526. }
  2527. virtual IPooledThread *createNew()
  2528. {
  2529. return new CNodeSubThread(*this);
  2530. }
  2531. } poolFactory;
  2532. Owned<IThreadPool> pool = createThreadPool("TSDSTest", &poolFactory, NULL, 100, 100000);
  2533. unsigned tests = testParams.ordinality() ? atoi(testParams.item(0)) : 10;
  2534. for (unsigned t=0; t<tests; t++)
  2535. {
  2536. pool->start(NULL);
  2537. }
  2538. PROGLOG("Joining all TSDSThread running threads");
  2539. pool->joinAll();
  2540. pool.clear();
  2541. }
  2542. void TestSDSXPaths()
  2543. {
  2544. const char *testXML =
  2545. "<ROOT attrRoot=\"9\">"
  2546. " <A attrA=\"a1\" numA=\"1\">"
  2547. " <B attrB=\"a1b1\" numB=\"1\">bval_a1b1</B>"
  2548. " </A>"
  2549. " <A attrA=\"a2\" numA=\"2\">"
  2550. " <B attrB=\"a2b1\" numB=\"2\">bval_a2_b1"
  2551. " <C attrC=\"a2b1c1\" numC=\"1\"></C>"
  2552. " </B>"
  2553. " <B attrB=\"a2b2\" numB=\"3\">bval_a2_b2"
  2554. " <C attrC=\"a2b2c1\" numC=\"2\"></C>"
  2555. " </B>"
  2556. " <B attrB=\"a2b1\" numB=\"4\">bval_a2_b3</B>"
  2557. " </A>"
  2558. " <A numA=\"3\">"
  2559. " </A>"
  2560. " <A2 numA=\"1\">"
  2561. " <B attrB=\"a2b1\">bval_a21_b1"
  2562. " <C>"
  2563. " <B>bval_a21_b1_c1_b1"
  2564. " <C></C>"
  2565. " </B>"
  2566. " </C>"
  2567. " </B>"
  2568. " </A2>"
  2569. " <A3 numA=\"1\"></A3>"
  2570. "</ROOT>";
  2571. const char *xpathTests[] = {
  2572. "A",
  2573. "A[B]",
  2574. "A[@attrA]",
  2575. "*[@numA]",
  2576. "A/B[@attrB]/C",
  2577. "A[@attrA = \"a1\"]",
  2578. "A[@attrA = \"a1*\"]",
  2579. "A/B[@attrB=\"a2*\"]",
  2580. "A[@attrA = ~\"a1*\"]",
  2581. "A[B=\"bval2\"]",
  2582. "//B[C]",
  2583. "A//B[C]",
  2584. "A[@attrA][B=\"bval2\"]",
  2585. "A/B[@numB < \"2\"]",
  2586. "A/B[@numB <= \"2\"]",
  2587. "A/B[@numB = \"2\"]",
  2588. "A/B[@numB > \"2\"]",
  2589. "A/B[@numB >= \"2\"]",
  2590. "A/B[@attrB >> \"a1b1\"]",
  2591. NULL
  2592. };
  2593. class CSplitIFileIO : public CInterface, implements IFileIO
  2594. {
  2595. IArrayOf<IFileIO> iFileIOs;
  2596. public:
  2597. IMPLEMENT_IINTERFACE;
  2598. CSplitIFileIO() { }
  2599. void addIFileIO(IFileIO *iFileIO) { iFileIOs.append(*iFileIO); }
  2600. // IFileIO
  2601. virtual size32_t read(offset_t pos, size32_t len, void * data) { UNIMPLEMENTED; return 0; }
  2602. virtual offset_t size() { UNIMPLEMENTED; return 0; }
  2603. virtual size32_t write(offset_t pos, size32_t len, const void * data)
  2604. {
  2605. size32_t sz = iFileIOs.item(0).write(pos, len, data);
  2606. unsigned i=1;
  2607. for (i=1; i<iFileIOs.ordinality(); i++)
  2608. verifyex(sz == iFileIOs.item(i).write(pos, len, data));
  2609. return sz;
  2610. }
  2611. virtual unsigned __int64 getStatistic(StatisticKind kind) { return 0; }
  2612. virtual offset_t appendFile(IFile *file,offset_t pos=0,offset_t len=-1) { UNIMPLEMENTED; return 0; }
  2613. virtual void setSize(offset_t size) { UNIMPLEMENTED; }
  2614. virtual void flush() { }
  2615. virtual void close() { }
  2616. };
  2617. const char *newFileName = "xpathTests.out";
  2618. OwnedIFile newFile;
  2619. OwnedIFileIO newFileIO;
  2620. HANDLE out;
  2621. #ifdef WIN32
  2622. out = GetStdHandle(STD_OUTPUT_HANDLE);
  2623. #else
  2624. out = fileno(stdout);
  2625. #endif
  2626. Owned<IFileIO> stdOutFileIO = createIFileIO(out,IFOwrite);
  2627. if (testParams.ordinality())
  2628. {
  2629. newFileName = testParams.item(0);
  2630. newFile.setown(createIFile(newFileName));
  2631. newFileIO.setown(newFile->open(IFOcreate));
  2632. }
  2633. else
  2634. {
  2635. newFile.setown(createIFile(newFileName));
  2636. CSplitIFileIO *split= new CSplitIFileIO();
  2637. newFileIO.setown(split);
  2638. split->addIFileIO(newFile->open(IFOcreate));
  2639. split->addIFileIO(LINK(stdOutFileIO));
  2640. }
  2641. OwnedIFile newFileSecondary = createIFile("xpathTestsSecondary.out");
  2642. Owned<IIOStream> newFileIOStream;
  2643. Owned<IPropertyTree> originalTree = createPTreeFromXMLString(testXML);
  2644. Owned<IPropertyTree> tree;
  2645. unsigned l;
  2646. MemoryBuffer newOutput, secondary;
  2647. for (l=0; l<2; l++)
  2648. {
  2649. newFileIOStream.clear();
  2650. if (0 == l)
  2651. {
  2652. newFileIOStream.setown(createIOStream(newFileIO));
  2653. newFileIO.clear();
  2654. tree.set(originalTree);
  2655. }
  2656. else
  2657. {
  2658. OwnedIFileIO newFileSecondaryIO = newFileSecondary->open(IFOcreate);
  2659. newFileIOStream.setown(createBufferedIOStream(newFileSecondaryIO));
  2660. newFileSecondaryIO.clear();
  2661. Owned<IRemoteConnection> conn = querySDS().connect("/", myProcessSession(), RTM_LOCK_WRITE, 2000*MDELAY);
  2662. conn->queryRoot()->setPropTree("ROOT", LINK(originalTree));
  2663. tree.setown(createPTreeFromIPT(conn->queryRoot()->queryPropTree("ROOT")));
  2664. }
  2665. unsigned test = 0;
  2666. while (xpathTests[test] != NULL)
  2667. {
  2668. try
  2669. {
  2670. Owned<IPropertyTreeIterator> iter;
  2671. Owned<IRemoteConnection> conn;
  2672. iter.setown(tree->getElements(xpathTests[test]));
  2673. if (1 == l)
  2674. {
  2675. unsigned count = 0;
  2676. ForEach (*iter)
  2677. ++count;
  2678. if (count > 1)
  2679. {
  2680. // PROGLOG("SDS connection made to root to avoid connection ambiguity for test: %s", xpathTests[test]);
  2681. conn.setown(querySDS().connect("/ROOT", myProcessSession(), RTM_LOCK_READ, 2000*MDELAY));
  2682. iter.setown(conn->getElements(xpathTests[test]));
  2683. }
  2684. else
  2685. {
  2686. StringBuffer path("/ROOT/");
  2687. path.append(xpathTests[test]);
  2688. conn.setown(querySDS().connect(path.str(), myProcessSession(), RTM_LOCK_READ, 2000*MDELAY));
  2689. if (conn)
  2690. iter.setown(conn->queryRoot()->getElements(NULL));
  2691. else
  2692. iter.setown(createNullPTreeIterator());
  2693. }
  2694. }
  2695. unsigned count = 0;
  2696. StringBuffer outMsg("Test = ");
  2697. writeStringToStream(*newFileIOStream, outMsg.append(xpathTests[test]).newline().str());
  2698. ForEach (*iter)
  2699. {
  2700. IPropertyTree &match = iter->query();
  2701. StringBuffer out("Matched node = ");
  2702. out.append(match.queryName());
  2703. const char *value = match.queryProp(NULL);
  2704. if (value)
  2705. out.append(", value = ").append(value);
  2706. writeStringToStream(*newFileIOStream, out.newline().str());
  2707. count++;
  2708. }
  2709. writeStringToStream(*newFileIOStream, outMsg.clear().append("Match count = ").append(count).newline().str());
  2710. }
  2711. catch (IException *e)
  2712. {
  2713. StringBuffer errMsg("Test ");
  2714. errMsg.append(test).append(" \"").append(xpathTests[test]).append("\" failed");
  2715. EXCLOG(e, errMsg.str());
  2716. e->Release();
  2717. }
  2718. test++;
  2719. }
  2720. newFileIOStream.clear();
  2721. if (0 == l)
  2722. {
  2723. OwnedIFileIO newFileIO = newFile->open(IFOread);
  2724. read(newFileIO, 0, (size32_t)newFile->size(), newOutput);
  2725. newFileIO.clear();
  2726. }
  2727. else
  2728. {
  2729. OwnedIFileIO newFileSecondaryIO = newFileSecondary->open(IFOread);
  2730. read(newFileSecondaryIO, 0, (size32_t)newFileSecondary->size(), secondary);
  2731. newFileSecondaryIO.clear();
  2732. if (newOutput.length() != secondary.length() || 0 != memcmp(newOutput.toByteArray(), secondary.toByteArray(), newOutput.length()))
  2733. throw MakeStringException(0, "Local and SDS outputs mismatch");
  2734. newFileSecondary->remove();
  2735. }
  2736. }
  2737. }
  2738. void TestLocks()
  2739. {
  2740. #if 0
  2741. ICommunicator *comm = createCommunicator(mygroup);
  2742. CMessageBuffer mb;
  2743. DistributedLockId lockid;
  2744. if (mygroup->rank()==0) {
  2745. lockid = createDistributedLockId();
  2746. mb.append(lockid);
  2747. comm->send(mb,RANK_ALL,1);
  2748. }
  2749. else {
  2750. comm->recv(mb,RANK_ALL,1);
  2751. mb.read(lockid);
  2752. }
  2753. IDistributedLock *dl = createDistributedLock(lockid);
  2754. for (unsigned i=0;i<100;i++) {
  2755. Sleep(getRandom()%3000);
  2756. bool excl = (getRandom()%3)==0;
  2757. PROGLOG("getting %s lock",excl?"exclusive":"non-exclusive");
  2758. dl->lock(excl);
  2759. PROGLOG("got %s lock",excl?"exclusive":"non-exclusive");
  2760. Sleep(getRandom()%5000);
  2761. dl->unlock();
  2762. PROGLOG("release %s lock",excl?"exclusive":"non-exclusive");
  2763. }
  2764. dl->Release();
  2765. comm->Release();
  2766. #endif
  2767. }
  2768. void TestServerShutdown(IGroup *group)
  2769. {
  2770. myProcessSession();
  2771. unsigned i=0;
  2772. while (i<10) {
  2773. i++;
  2774. try {
  2775. printf("Test 1\n"); Sleep(5000); //_getch();
  2776. ISDSManager &sdsManager = querySDS();
  2777. printf("Test 2\n"); Sleep(1000); //_getch();
  2778. IRemoteConnection *conn;
  2779. conn = sdsManager.connect("/", myProcessSession(), RTM_LOCK_WRITE, 2000*MDELAY);
  2780. IPropertyTree *root = conn->queryRoot();
  2781. printf("Test 3\n"); Sleep(1000); //_getch();
  2782. root->setPropInt("subtest", i);
  2783. printf("Test 4\n"); Sleep(1000); //_getch();
  2784. conn->commit();
  2785. printf("Test 5\n"); Sleep(1000); //_getch();
  2786. conn->changeMode( RTM_LOCK_READ);
  2787. printf("Test 6\n"); Sleep(1000); //_getch();
  2788. root->setPropInt("subtest/test", i);
  2789. printf("Test 7\n"); Sleep(1000); //_getch();
  2790. conn->changeMode( 0);
  2791. printf("Test 9\n"); Sleep(1000); //_getch();
  2792. root->setPropInt("subtest/test", i+100);
  2793. printf("Test 10\n"); Sleep(1000); //_getch();
  2794. root->setPropInt("subtest/test", i+200);
  2795. conn->changeMode( RTM_LOCK_WRITE);
  2796. conn->Release();
  2797. printf("Test 11\n"); Sleep(5000); //_getch();
  2798. }
  2799. catch (IException *e) {
  2800. pexception("Exception",e);
  2801. }
  2802. reinitClientProcess(group, DCR_Testing);
  2803. }
  2804. }
  2805. #define NCCS 40
  2806. #define NCCSTHREAD 20
  2807. static CriticalSection *CCS[NCCS];
  2808. class TestCCSThread : public Thread
  2809. {
  2810. const char *name;
  2811. public:
  2812. TestCCSThread(const char *_name) : Thread(_name) { name = strdup(_name); }
  2813. ~TestCCSThread() { free((void *)name); }
  2814. int run()
  2815. {
  2816. try {
  2817. for (;;) {
  2818. Sleep(getRandom()%1000);
  2819. unsigned i = getRandom()%NCCS;
  2820. PROGLOG("%s locking %d",name,i);
  2821. CCS[i]->enter();
  2822. Sleep(getRandom()%1000);
  2823. unsigned j = getRandom()%NCCS;
  2824. PROGLOG("%s locking %d",name,j);
  2825. CCS[j]->enter();
  2826. if (getRandom()%2==0) {
  2827. unsigned t = i;
  2828. i = j;
  2829. j = t;
  2830. }
  2831. Sleep(getRandom()%1000);
  2832. PROGLOG("%s unlocking %d",name,j);
  2833. CCS[j]->leave();
  2834. Sleep(getRandom()%1000);
  2835. PROGLOG("%s unlocking %d",name,i);
  2836. CCS[i]->leave();
  2837. }
  2838. }
  2839. catch (IException *e) {
  2840. pexception("Exception",e);
  2841. }
  2842. return 0;
  2843. }
  2844. };
  2845. static void TestCriticalSection()
  2846. {
  2847. char id[16];
  2848. char num[8];
  2849. TestCCSThread *threads[NCCSTHREAD];
  2850. for (unsigned i=0;i<NCCSTHREAD; i++) {
  2851. itoa(i,num,10);
  2852. strcpy(id,"Thread");
  2853. strcat(id,num);
  2854. threads[i] = new TestCCSThread(id);
  2855. }
  2856. for (unsigned j=0;j<NCCS; j++) {
  2857. itoa(j,num,10);
  2858. strcpy(id,"CCS");
  2859. strcat(id,num);
  2860. CCS[j] = new CriticalSection();
  2861. }
  2862. unsigned k;
  2863. for (k=0;k<NCCSTHREAD; k++)
  2864. threads[k]->start();
  2865. for (k=0;k<NCCSTHREAD; k++)
  2866. threads[k]->join();
  2867. }
  2868. #define NMEMPTRS 523
  2869. class TestMemThread : public Thread
  2870. {
  2871. const char *name;
  2872. void *ptrs[NMEMPTRS];
  2873. public:
  2874. TestMemThread(const char *_name) : Thread(_name) { name = strdup(_name); memset(ptrs, 0, NMEMPTRS * sizeof(void*));}
  2875. ~TestMemThread() { free((void *)name); }
  2876. int run()
  2877. {
  2878. try {
  2879. for (;;) {
  2880. unsigned i = getRandom()%NMEMPTRS;
  2881. if (ptrs[i])
  2882. free(ptrs[i]);
  2883. if (getRandom() & 1)
  2884. ptrs[i] = malloc(getRandom());
  2885. }
  2886. }
  2887. catch (IException *e) {
  2888. pexception("Exception",e);
  2889. }
  2890. return 0;
  2891. }
  2892. };
  2893. static void TestMemThreads()
  2894. {
  2895. char id[16];
  2896. char num[8];
  2897. TestMemThread *threads[NCCSTHREAD];
  2898. for (unsigned i=0;i<NCCSTHREAD; i++) {
  2899. itoa(i,num,10);
  2900. strcpy(id,"Thread");
  2901. strcat(id,num);
  2902. threads[i] = new TestMemThread(id);
  2903. }
  2904. unsigned k;
  2905. for (k=0;k<NCCSTHREAD; k++)
  2906. threads[k]->start();
  2907. for (k=0;k<NCCSTHREAD; k++)
  2908. threads[k]->join();
  2909. }
  2910. #define NUMPTRS2 0x10000
  2911. class TestMemThread2 : public Thread
  2912. {
  2913. const char *name;
  2914. static void *ptrs[NUMPTRS2];
  2915. public:
  2916. TestMemThread2(const char *_name) : Thread(_name)
  2917. {
  2918. memset(ptrs, 0, NUMPTRS2 * sizeof(void*));
  2919. }
  2920. int run()
  2921. {
  2922. try {
  2923. unsigned i;
  2924. unsigned res=0;
  2925. for (i=0;i<NUMPTRS2;i++) {
  2926. ptrs[i] = malloc(0x10000);
  2927. if (!ptrs[i])
  2928. break;
  2929. if (i%100==0)
  2930. printf("%d\n",i);
  2931. memset(ptrs[i],i,0x10000);
  2932. res+=0x10000;
  2933. }
  2934. while (--i)
  2935. free(ptrs[i]);
  2936. printf("allocated %x",res);
  2937. }
  2938. catch (IException *e) {
  2939. pexception("Exception",e);
  2940. }
  2941. catch (...) {
  2942. printf("unknown exception!");
  2943. }
  2944. return 0;
  2945. }
  2946. };
  2947. void *TestMemThread2::ptrs[NUMPTRS2];
  2948. void testMultiConnect()
  2949. {
  2950. const char *grpname = "dummy" ; // "thor18way"; // "thor_data400" ;
  2951. Owned<IGroup> grp = queryNamedGroupStore().lookup(grpname);
  2952. if (!grp) {
  2953. printf("Group %s not found\n",grpname);
  2954. return;
  2955. }
  2956. SocketEndpointArray eps;
  2957. unsigned j=0;
  2958. for (j=0;j<grp->ordinality();j++) {
  2959. SocketEndpoint ep = grp->queryNode(j).endpoint();
  2960. ep.port = 5051;
  2961. eps.append(ep);
  2962. }
  2963. class cNotify: implements ISocketConnectNotify
  2964. {
  2965. void connected(unsigned idx,const SocketEndpoint &ep,ISocket *socket)
  2966. {
  2967. StringBuffer epstr;
  2968. ep.getUrlStr(epstr);
  2969. printf("%s suceeded\n",epstr.str());
  2970. }
  2971. void failed(unsigned idx,const SocketEndpoint &ep,int err)
  2972. {
  2973. StringBuffer epstr;
  2974. ep.getUrlStr(epstr);
  2975. printf("%s failed (%d)\n",epstr.str(),err);
  2976. }
  2977. } notify;
  2978. unsigned t = msTick();
  2979. IPointerArrayOf<ISocket> out;
  2980. multiConnect(eps,notify,5000);
  2981. printf("connect took %d\n",msTick()-t);
  2982. }
  2983. void testlockprop(const char *lfn)
  2984. {
  2985. Owned<IDistributedFile> f1 = queryDistributedFileDirectory().lookup(lfn,UNKNOWN_USER,AccessMode::tbdRead,false,false,nullptr,defaultNonPrivilegedUser);
  2986. Owned<IDistributedFile> f2 = queryDistributedFileDirectory().lookup(lfn,UNKNOWN_USER,AccessMode::tbdRead,false,false,nullptr,defaultNonPrivilegedUser);
  2987. f1->lockProperties();
  2988. f1->unlockProperties();
  2989. printf("done\n");
  2990. }
  2991. void usage(const char *error=NULL)
  2992. {
  2993. if (error) printf("%s\n", error);
  2994. printf("usage: DATEST <server_ip:port>* [/test <name> [<test params...>] [/NITER <iterations>]\n");
  2995. printf("where name = RANDTEST | DFS | QTEST | QTEST2 | SESSION | LOCKS | SDS1 | SDS2 | XPATHS| STRESS | STRESS2 | SHUTDOWN | EXTERNAL | SUBLOCKS | SUBSCRIPTION | CONNECTIONSUBS | MULTIFILE | NODESUBS | DFUSTREAMREAD | DFUSTREAMWRITE | DFUSTREAMCOPY\n");
  2996. printf("eg: datest . /test QTEST put -- one coven server running locally, running qtest with param \"put\"\n");
  2997. printf(" datest eq0001016 eq0001017 -- two coven servers, use default test %s\n", DEFAULT_TEST);
  2998. }
  2999. struct ReleaseAtomBlock { ~ReleaseAtomBlock() { releaseAtoms(); } };
  3000. int main(int argc, char* argv[])
  3001. {
  3002. ReleaseAtomBlock rABlock;
  3003. InitModuleObjects();
  3004. EnableSEHtoExceptionMapping();
  3005. try {
  3006. StringBuffer cmd;
  3007. splitFilename(argv[0], NULL, NULL, &cmd, NULL);
  3008. StringBuffer lf;
  3009. openLogFile(lf, cmd.toLowerCase().append(".log").str());
  3010. #if defined(TEST_MEMTHREADS)
  3011. printf("start...\n");
  3012. TestMemThread2 t("test");
  3013. t.start();
  3014. t.join();
  3015. printf("end...\n");
  3016. return 0;
  3017. #endif
  3018. // Non dali tests
  3019. #if defined(TEST_THREADS)
  3020. TestMemThreads(); // doesn't terminate
  3021. #endif
  3022. #if defined(TEST_DEADLOCK)
  3023. TestCriticalSection(); // doesn't terminate
  3024. #endif
  3025. #if defined(TEST_REMOTEFILE)
  3026. TestRemoteFile(atoi(argv[1]),18);
  3027. return 0;
  3028. #endif
  3029. #if defined(TEST_REMOTEFILE2)
  3030. TestRemoteFile2();
  3031. return 0;
  3032. #endif
  3033. #if defined(TEST_REMOTEFILE3)
  3034. int nfiles = 1000;
  3035. int fsizemb = 512;
  3036. if(argc >= 2)
  3037. nfiles = atoi(argv[1]);
  3038. if(argc >= 3)
  3039. fsizemb = atoi(argv[2]);
  3040. TestRemoteFile3(nfiles, fsizemb);
  3041. return 0;
  3042. #endif
  3043. #if defined(TEST_COPYFILE)
  3044. if(argc >= 3)
  3045. TestCopyFile(argv[1], argv[2]);
  3046. else
  3047. PROGLOG("TestCopyFile(src-file, dst-file) missing arguments");
  3048. return 0;
  3049. #endif
  3050. if (argc<2) {
  3051. usage();
  3052. return 1;
  3053. }
  3054. printf("DATEST Starting\n");
  3055. SocketEndpoint ep;
  3056. SocketEndpointArray epa;
  3057. enum { unspecified, daservers, testparams } state = daservers;
  3058. unsigned i=0;
  3059. while (i<(unsigned)argc-1)
  3060. {
  3061. if (argv[++i][0]=='/')
  3062. {
  3063. if (0 == strcmp("?", argv[i]+1) || 0 == stricmp("HELP", argv[i]))
  3064. {
  3065. usage();
  3066. return 1;
  3067. }
  3068. else if (0 == stricmp("TEST", argv[i]+1))
  3069. {
  3070. state = testparams;
  3071. if (i==argc-1) { usage("missing test name"); return 1; }
  3072. whichTest = argv[++i];
  3073. }
  3074. else if (0 == stricmp("NITER", argv[i]+1))
  3075. {
  3076. state = unspecified;
  3077. if (i==argc-1) { usage("missing /NITER #"); return 1; }
  3078. nIter = atoi(argv[++i]);
  3079. }
  3080. else
  3081. {
  3082. usage("unrecognised option switch");
  3083. return 1;
  3084. }
  3085. }
  3086. else
  3087. {
  3088. switch (state)
  3089. {
  3090. case daservers:
  3091. ep.set(argv[i],DALI_SERVER_PORT);
  3092. epa.append(ep);
  3093. break;
  3094. case testparams:
  3095. testParams.append(argv[i]);
  3096. break;
  3097. default:
  3098. assertex(false);
  3099. }
  3100. }
  3101. }
  3102. if (!epa.ordinality())
  3103. {
  3104. usage("No dali servers specified");
  3105. return 1;
  3106. }
  3107. IGroup *group = createIGroup(epa);
  3108. if (TEST("SESSION"))
  3109. initClientProcess(group,DCR_Testing, testParams.ordinality() ? 0 : 7777);
  3110. else
  3111. initClientProcess(group, DCR_Testing);
  3112. //testlockprop("test::propagated_matchrecs");
  3113. for(unsigned iter=0;iter<nIter;iter++)
  3114. {
  3115. if (TEST("RANDTEST"))
  3116. {
  3117. switch (getRandom()%12) {
  3118. case 0: Test_DFS(); break;
  3119. case 1: QTest(true); break;
  3120. case 2: QTest(false); break;
  3121. case 3: QTest2(true); break;
  3122. case 4: QTest2(false); break;
  3123. case 5: TestSDS1(); break;
  3124. case 6: TestStress(); break;
  3125. case 7: TestSDS2(); break;
  3126. case 8: TestServerShutdown(group); break;
  3127. case 9: TestExternal(); break;
  3128. case 10: TestSubLocks(); break;
  3129. case 11: TestSDS3(group); break;
  3130. case 12: TestNodeSubs(); break;
  3131. }
  3132. }
  3133. else if (TEST("DFS"))
  3134. Test_DFS();
  3135. else if (TEST("SUPERFILE"))
  3136. Test_SuperFile();
  3137. else if (TEST("SUPERFILE2"))
  3138. Test_SuperFile2();
  3139. else if (TEST("MULTIFILE"))
  3140. Test_MultiFile();
  3141. else if (TEST("DFSU"))
  3142. Test_DFSU();
  3143. else if (TEST("SESSION"))
  3144. Test_Session(testParams.ordinality()?testParams.item(0):NULL);
  3145. else if (TEST("QTEST"))
  3146. QTest(testParams.ordinality()&&0==stricmp(testParams.item(0),"PUT"));
  3147. else if (TEST("QTEST2"))
  3148. QTest2(testParams.ordinality()&&0==stricmp(testParams.item(0),"PUT"));
  3149. else if (TEST("LOCKS"))
  3150. TestLocks();
  3151. else if (TEST("SDS1"))
  3152. TestSDS1();
  3153. else if (TEST("SDS2"))
  3154. TestSDS2();
  3155. else if (TEST("SDS3"))
  3156. TestSDS3(group);
  3157. else if (TEST("NODESUBS"))
  3158. TestNodeSubs();
  3159. else if (TEST("XPATHS"))
  3160. TestSDSXPaths();
  3161. else if (TEST("STRESS"))
  3162. TestStress();
  3163. else if (TEST("STRESS2"))
  3164. TestStress2();
  3165. else if (TEST("EXTERNAL"))
  3166. TestExternal();
  3167. else if (TEST("SUBLOCKS"))
  3168. TestSubLocks();
  3169. else if (TEST("SUBSCRIPTION"))
  3170. testSubscription(testParams.ordinality()&&0!=atoi(testParams.item(0)), testParams.isItem(1)?atoi(testParams.item(1)):-1, testParams.isItem(2)?atoi(testParams.item(2)):-1);
  3171. else if (TEST("CONNECTIONSUBS"))
  3172. testConnectionSubscription();
  3173. else if (TEST("SHUTDOWN"))
  3174. TestServerShutdown(group);
  3175. else if (TEST("FILEPARTS"))
  3176. Test_PartIter();
  3177. else if (TEST("MULTICONNECT"))
  3178. testMultiConnect();
  3179. else if (TEST("DFUSTREAMREAD"))
  3180. testDfuStreamRead(testParams);
  3181. else if (TEST("DFUSTREAMWRITE"))
  3182. testDfuStreamWrite(testParams.ordinality() ? testParams.item(0) : nullptr);
  3183. else if (TEST("DFUSTREAMCOPY"))
  3184. testDfuStreamCopy(testParams.ordinality() ? testParams.item(0) : nullptr);
  3185. // else if (TEST("DALILOG"))
  3186. // testDaliLog(testParams.ordinality()&&0!=atoi(testParams.item(0)));
  3187. else
  3188. {
  3189. usage("Unknown test");
  3190. return 1;
  3191. }
  3192. }
  3193. group->Release();
  3194. closedownClientProcess();
  3195. }
  3196. catch (IException *e) {
  3197. pexception("Exception",e);
  3198. }
  3199. catch (...) { if (!TEST("RANDTEST")) throw; }
  3200. return 0;
  3201. }