dfuwutest.cpp 46 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "jlib.hpp"
  15. #include "jmisc.hpp"
  16. #include "dfuwu.hpp"
  17. #include "jsuperhash.hpp"
  18. #include "mpbase.hpp"
  19. #include "mpcomm.hpp"
  20. #include "rmtfile.hpp"
  21. #include "daclient.hpp"
  22. #include "dafdesc.hpp"
  23. #include "dadfs.hpp"
  24. #include "dautils.hpp"
  25. #include "dfuutil.hpp"
  /// Placeholder for a progress-update test; currently performs no work.
  void testProgressUpdate()
  {
      // TBD: no progress-update scenario has been implemented yet.
  }
  30. void testAbort(const char *wuid)
  31. {
  32. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  33. Owned<IConstDFUWorkUnit> wu = factory->openWorkUnit(wuid,false);
  34. if (wu) {
  35. wu->requestAbort();
  36. }
  37. else
  38. UERRLOG("WUID %s not found", wuid);
  39. }
  40. StringBuffer& constructFileMask(const char* filename, StringBuffer& filemask)
  41. {
  42. filemask.clear().append(filename).toLowerCase().append("._$P$_of_$N$");
  43. return filemask;
  44. }
  45. void testProgressMonitor(const char *wuid)
  46. {
  47. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  48. Owned<IConstDFUWorkUnit> wu = factory->openWorkUnit(wuid,false);
  49. class cProgressMon: public CInterface, implements IDFUprogressSubscriber
  50. {
  51. public:
  52. IMPLEMENT_IINTERFACE;
  53. void notify(IConstDFUprogress *progress)
  54. {
  55. StringBuffer s;
  56. progress->formatProgressMessage(s);
  57. PROGLOG(" %s",s.str());
  58. }
  59. } progressmon;
  60. wu->subscribeProgress(&progressmon);
  61. wu->waitForCompletion(1000*10*60);
  62. wu->subscribeProgress(NULL);
  63. IConstDFUprogress* progress = wu->queryProgress();
  64. StringBuffer s;
  65. PROGLOG(" state = %s",encodeDFUstate(progress->getState(),s.clear()).str());
  66. progress->formatSummaryMessage(s.clear());
  67. PROGLOG(" %s",s.str());
  68. }
  69. void copyTest()
  70. {
  71. for (unsigned i=0;i<1000;i++) {
  72. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  73. Owned<IDFUWorkUnit> wu = factory->createWorkUnit();
  74. wu->setClusterName("thor18way");
  75. StringBuffer teststr;
  76. teststr.append("Test").append(i+1);
  77. wu->setJobName(teststr.str());
  78. wu->setQueue("dfuserver_queue");
  79. wu->setUser("Nigel");
  80. IDFUfileSpec *source = wu->queryUpdateSource();
  81. IDFUfileSpec *destination = wu->queryUpdateDestination();
  82. IDFUoptions *options = wu->queryUpdateOptions();
  83. IDFUprogress *progress = wu->queryUpdateProgress();
  84. StringBuffer srcname;
  85. srcname.append("thor_dev::testfilem").append(i%8+1);
  86. StringBuffer dstname;
  87. dstname.append("thor_dev::testfilem").append(i%8+2);
  88. wu->setCommand(DFUcmd_copy);
  89. source->setLogicalName(srcname.str());
  90. Owned<IFileDescriptor> fdesc = createFileDescriptor();
  91. Owned<IGroup> grp = queryNamedGroupStore().lookup("thor18way");
  92. for (unsigned j=0;j<18;j++) {
  93. StringBuffer filename;
  94. filename.append("/c$/thordata/testfile").append(i%8+2).append("M._");
  95. filename.append(j+1);
  96. filename.append("_of_18");
  97. fdesc->setPart(j,&grp->queryNode(j),filename.str());
  98. }
  99. destination->setFromFileDescriptor(*fdesc);
  100. destination->setLogicalName(dstname.str());
  101. options->setReplicate(true);
  102. options->setOverwrite(true);
  103. submitDFUWorkUnit(wu.getClear());
  104. }
  105. }
  106. void importTest()
  107. {
  108. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  109. Owned<IDFUWorkUnit> wu = factory->createWorkUnit();
  110. wu->setClusterName("thor18way");
  111. StringBuffer teststr;
  112. teststr.append("Import Test");
  113. wu->setJobName(teststr.str());
  114. wu->setQueue("dfuserver_queue");
  115. wu->setUser("Nigel");
  116. IDFUfileSpec *source = wu->queryUpdateSource();
  117. IDFUfileSpec *destination = wu->queryUpdateDestination();
  118. IDFUoptions *options = wu->queryUpdateOptions();
  119. IDFUprogress *progress = wu->queryUpdateProgress();
  120. StringBuffer dstname;
  121. dstname.append("thor_dev::testinput");
  122. wu->setCommand(DFUcmd_import);
  123. SocketEndpoint destep("rigel",7071);
  124. RemoteFilename destfn;
  125. destfn.setPath(destep,"/export/home/nhicks/daliservix/testfile.d00");
  126. source->setSingleFilename(destfn);
  127. source->setRecordSize(64); // needed cause going to split file
  128. Owned<IFileDescriptor> fdesc = createFileDescriptor();
  129. Owned<IGroup> grp = queryNamedGroupStore().lookup("thor18way");
  130. for (unsigned j=0;j<18;j++) {
  131. StringBuffer filename;
  132. filename.append("/c$/thordata/importtest._");
  133. filename.append(j+1);
  134. filename.append("_of_18");
  135. fdesc->setPart(j, &grp->queryNode(j),filename.str());
  136. }
  137. destination->setFromFileDescriptor(*fdesc);
  138. destination->setLogicalName(dstname.str());
  139. options->setReplicate(true);
  140. options->setOverwrite(true);
  141. submitDFUWorkUnit(wu.getClear());
  142. }
  143. void SprayTest(unsigned num)
  144. {
  145. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  146. StringBuffer tmp;
  147. for (unsigned n = 0; n<num; n++) {
  148. for (int spray=1;spray>=0;spray--) {
  149. Owned<IDFUWorkUnit> wu = factory->createWorkUnit();
  150. wu->setClusterName("thor_fi_01");
  151. if (spray)
  152. tmp.clear().append("Test spray #").append(n+1);
  153. else
  154. tmp.clear().append("Test despray #").append(n+1);
  155. wu->setJobName(tmp.str());
  156. wu->setQueue("dfuserver_queue");
  157. wu->setUser("Nigel");
  158. IDFUfileSpec *source = wu->queryUpdateSource();
  159. IDFUfileSpec *destination = wu->queryUpdateDestination();
  160. IDFUoptions *options = wu->queryUpdateOptions();
  161. IDFUprogress *progress = wu->queryUpdateProgress();
  162. if (spray) {
  163. wu->setCommand(DFUcmd_import);
  164. RemoteFilename rfn;
  165. SocketEndpoint ep("192.168.168.254");
  166. rfn.setPath(ep,"/data/testfile1.d00");
  167. source->setSingleFilename(rfn);
  168. source->setTitle("testfile1.d00");
  169. source->setRecordSize(32); // needed cause going to split file
  170. destination->setLogicalName("testing::nigel::testfile1");
  171. destination->setDirectory("/c$/thordata/testing/nigel");
  172. destination->setFileMask("testfile1._$P$_of_$N$");
  173. destination->setGroupName("thor_fi_01");
  174. options->setReplicate(true);
  175. options->setOverwrite(true);
  176. }
  177. else {
  178. wu->setCommand(DFUcmd_export);
  179. source->setLogicalName("testing::nigel::testfile1");
  180. RemoteFilename rfn;
  181. SocketEndpoint ep("192.168.168.254");
  182. rfn.setPath(ep,"/data/testfile1.d01");
  183. destination->setSingleFilename(rfn);
  184. destination->setTitle("testfile1.d02");
  185. options->setOverwrite(true);
  186. }
  187. PROGLOG("submitting %s",wu->queryId());
  188. submitDFUWorkUnit(wu.getClear());
  189. }
  190. }
  191. }
  192. static void printDesc(IFileDescriptor *desc)
  193. {
  194. if (desc) {
  195. Owned<IPropertyTree> pt = desc->getFileTree();
  196. StringBuffer out;
  197. toXML(pt,out);
  198. PROGLOG("\n%s",out.str());
  199. unsigned n = desc->numParts();
  200. PROGLOG(" numParts = %d",n);
  201. PROGLOG(" numCopies(0) = %d",desc->numCopies(0));
  202. StringBuffer tmp1;
  203. unsigned i;
  204. for (i=0;i<n;i++) {
  205. unsigned copy;
  206. for (copy = 0;copy<desc->numCopies(i);copy++) {
  207. RemoteFilename rfn;
  208. desc->getFilename(i,copy,rfn);
  209. PROGLOG(" file (%d,%d) = %s",copy,i,rfn.getRemotePath(tmp1.clear()).str());
  210. }
  211. }
  212. }
  213. }
  214. void testWUcreate(int kind,StringBuffer &wuidout)
  215. {
  216. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  217. Owned<IDFUWorkUnit> wu = factory->createWorkUnit();
  218. wu->setClusterName("thor_data400");
  219. wu->setJobName("test job");
  220. wu->setQueue("nigel_dfuserver_queue");
  221. wu->setUser("Nigel");
  222. IDFUfileSpec *source = wu->queryUpdateSource();
  223. IDFUfileSpec *destination = wu->queryUpdateDestination();
  224. IDFUoptions *options = wu->queryUpdateOptions();
  225. IDFUprogress *progress = wu->queryUpdateProgress();
  226. switch (kind) {
  227. case 1: // 1->N spray
  228. {
  229. wu->setCommand(DFUcmd_import);
  230. RemoteFilename rfn;
  231. SocketEndpoint ep("172.16.18.175");
  232. rfn.setPath(ep,"/export/home/nhicks/test/testfile1.d00");
  233. source->setSingleFilename(rfn);
  234. source->setTitle("testfile1.d00");
  235. source->setRecordSize(32); // needed cause going to split file
  236. destination->setLogicalName("thor_dev::nigel::testfile1");
  237. destination->setDirectory("c:\\thordata\\thor_dev\\nigel");
  238. destination->setFileMask("testfile1._$P$_of_$N$");
  239. destination->setGroupName("thor_dev");
  240. options->setReplicate(true);
  241. options->setOverwrite(true);
  242. }
  243. break;
  244. case 2: // N->1 despray
  245. {
  246. wu->setCommand(DFUcmd_export);
  247. source->setLogicalName("thor_dev::nigel::testfile1");
  248. RemoteFilename rfn;
  249. SocketEndpoint ep("172.16.18.175");
  250. rfn.setPath(ep,"/export/home/nhicks/test/testfile2.d00");
  251. destination->setSingleFilename(rfn);
  252. destination->setTitle("testfile2.d00");
  253. }
  254. break;
  255. case 3: // N->N copy
  256. { // simple copy using group as destination
  257. wu->setCommand(DFUcmd_copy);
  258. source->setLogicalName("thor_dev::nigel::testfile1");
  259. destination->setLogicalName("thor_dev::nigel::testfile2");
  260. destination->setDirectory("c:\\thordata\\thor_dev\\nigel");
  261. destination->setFileMask("testfile2._$P$_of_$N$");
  262. destination->setGroupName("thor_dev");
  263. options->setReplicate(true);
  264. }
  265. break;
  266. case 4: // N->M copy
  267. { // copy specifying nodes
  268. wu->setCommand(DFUcmd_copy);
  269. source->setLogicalName("thor_dev::nigel::testfile2");
  270. destination->setLogicalName("nigeltest::testfile3");
  271. Owned<IFileDescriptor> fdesc = createFileDescriptor();
  272. Owned<INode> node;
  273. node.setown(createINode("10.173.4.3"));
  274. fdesc->setPart(0,node,"/c$/roxiedata/part._1_of_10");
  275. node.setown(createINode("10.173.4.4"));
  276. fdesc->setPart(1,node,"/c$/roxiedata/part._2_of_10");
  277. node.setown(createINode("10.173.4.5"));
  278. fdesc->setPart(2,node,"/c$/roxiedata/part._3_of_10");
  279. node.setown(createINode("10.173.4.6"));
  280. fdesc->setPart(3,node,"/c$/roxiedata/part._4_of_10");
  281. node.setown(createINode("10.173.4.7"));
  282. fdesc->setPart(4,node,"/c$/roxiedata/part._5_of_10");
  283. node.setown(createINode("10.173.4.8"));
  284. fdesc->setPart(5,node,"/c$/roxiedata/part._6_of_10");
  285. node.setown(createINode("10.173.4.9"));
  286. fdesc->setPart(6,node,"/c$/roxiedata/part._7_of_10");
  287. node.setown(createINode("10.173.4.10"));
  288. fdesc->setPart(7,node,"/c$/roxiedata/part._8_of_10");
  289. node.setown(createINode("10.173.4.11"));
  290. fdesc->setPart(8,node,"/c$/roxiedata/part._9_of_10");
  291. node.setown(createINode("10.173.4.12"));
  292. fdesc->setPart(9,node,"/c$/roxiedata/part._10_of_10");
  293. destination->setFromFileDescriptor(*fdesc);
  294. options->setReplicate(true);
  295. destination->setWrap(true);
  296. destination->setNumPartsOverride(50);
  297. Owned<IFileDescriptor> fdesc2 = destination->getFileDescriptor(false);
  298. printDesc(fdesc2);
  299. }
  300. break;
  301. // tests
  302. case 5: // N->N copy
  303. { // simple copy using group as destination
  304. wu->setCommand(DFUcmd_copy);
  305. source->setLogicalName("nigel18::dist1.out");
  306. destination->setLogicalName("nigel18::dist1.cpy");
  307. destination->setDirectory("c:\\thordata\\nigel18");
  308. destination->setFileMask("dist1.cpy._$P$_of_$N$");
  309. destination->setGroupName("nigel18");
  310. options->setReplicate(true);
  311. }
  312. break;
  313. case 6: // 1->N spray
  314. { // simple copy using group as destination
  315. wu->setCommand(DFUcmd_import);
  316. RemoteFilename rfn;
  317. SocketEndpoint ep("10.150.10.16");
  318. rfn.setPath(ep,"c:\\thordata\\out1.d00._1_of_1");
  319. source->setSingleFilename(rfn);
  320. source->setTitle("testfile1.d00");
  321. source->setRecordSize(16); // needed cause going to split file
  322. destination->setLogicalName("nigel18::testout1");
  323. destination->setDirectory("c:\\thordata\\nigel18");
  324. destination->setFileMask("testout1._$P$_of_$N$");
  325. destination->setGroupName("nigel18");
  326. options->setReplicate(true);
  327. options->setOverwrite(true);
  328. }
  329. break;
  330. case 7: // copy from an external Dali
  331. {
  332. wu->setCommand(DFUcmd_copy);
  333. source->setLogicalName("nigel18::dist1.out");
  334. SocketEndpoint ep("10.150.10.75");
  335. source->setForeignDali(ep);
  336. destination->setLogicalName("nigel18::dist2.cpy");
  337. destination->setDirectory("c:\\thordata\\nigel18");
  338. destination->setFileMask("dist2.cpy._$P$_of_$N$");
  339. destination->setGroupName("nigel18");
  340. options->setReplicate(true);
  341. options->setOverwrite(true);
  342. }
  343. break;
  344. case 8: // monitor test
  345. {
  346. wu->setCommand(DFUcmd_monitor);
  347. wu->setQueue("dfumonitor_queue");
  348. IDFUmonitor *monitor = wu->queryUpdateMonitor();
  349. source->setLogicalName("nigel18::dist1.out");
  350. monitor->setShotLimit(1);
  351. }
  352. break;
  353. case 9: // copy test
  354. {
  355. wu->setCommand(DFUcmd_copy);
  356. SocketEndpoint ep("10.150.29.161");
  357. source->setForeignDali(ep);
  358. source->setForeignUser("nhicks","h1ck5");
  359. source->setLogicalName("thor_data400::nhtest::testspray1");
  360. destination->setLogicalName("thor_data400::nhtest::testspray1");
  361. destination->setDirectory("c:\\thordata\\thor_data400\\nhtest");
  362. destination->setFileMask("testspray1._$P$_of_$N$");
  363. destination->setGroupName("thor_data400");
  364. options->setReplicate(true);
  365. options->setOverwrite(true);
  366. }
  367. break;
  368. case 10: // copy test
  369. {
  370. wu->setCommand(DFUcmd_copy);
  371. SocketEndpoint ep("10.150.29.161");
  372. source->setForeignDali(ep);
  373. source->setForeignUser("nhicks","h1ck5");
  374. source->setLogicalName("thor_data400::base::hss_name_phonew20050107-143927");
  375. destination->setLogicalName("thor_data400::nhtest::testcopy1");
  376. destination->setDirectory("c:\\thordata\\thor_data400\\nhtest");
  377. destination->setFileMask("testcopy1._$P$_of_$N$");
  378. destination->setGroupName("thor_data400");
  379. options->setReplicate(true);
  380. options->setOverwrite(true);
  381. }
  382. break;
  383. }
  384. wuidout.append(wu->queryId());
  385. submitDFUWorkUnit(wu.getClear());
  386. }
  387. IFileDescriptor *createRoxieFileDescriptor(const char *cluster, const char *lfn, bool servers)
  388. {
  389. Owned<IRemoteConnection> envconn = querySDS().connect("/Environment",myProcessSession(),RTM_LOCK_READ, INFINITE);
  390. if (!envconn) {
  391. DBGLOG("Could not connect to %s",lfn);
  392. return NULL;
  393. }
  394. Owned<IPropertyTree> envroot = envconn->getRoot();
  395. StringBuffer grpname(cluster);
  396. if (servers)
  397. grpname.append("__servers");
  398. Owned<IGroup> grp = queryNamedGroupStore().lookup(grpname.str());
  399. if (!grp) {
  400. UERRLOG("Logical group %s not found",grpname.str());
  401. return NULL;
  402. }
  403. Owned<IFileDescriptor> ret = createFileDescriptor();
  404. unsigned width = grp->ordinality();
  405. for (unsigned i=0;i<width;i++) {
  406. StringBuffer filename;
  407. StringBuffer dirxpath;
  408. dirxpath.appendf("Software/RoxieCluster[@name=\"%s\"]/Roxie%sProcess[@channel=\"%d\"]/@dataDirectory",cluster,servers?"Server":"Slave",i+1);
  409. const char * dir = envroot->queryProp(dirxpath.str());
  410. if (!dir) {
  411. UERRLOG("dataDirectory not specified");
  412. return NULL;
  413. }
  414. makePhysicalPartName(lfn,i+1,width,filename,0,DFD_OSdefault,dir,false,0);
  415. RemoteFilename rfn;
  416. rfn.setPath(grp->queryNode(i).endpoint(),filename.str());
  417. ret->setPart(i,rfn,NULL);
  418. }
  419. return ret.getClear();
  420. }
  421. void testRoxieDest()
  422. {
  423. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  424. Owned<IDFUWorkUnit> wu = factory->createWorkUnit();
  425. wu->setClusterName("thor_data400");
  426. wu->setJobName("test job");
  427. wu->setQueue("nigel_dfuserver_queue");
  428. wu->setUser("Nigel");
  429. IDFUfileSpec *source = wu->queryUpdateSource();
  430. IDFUfileSpec *destination = wu->queryUpdateDestination();
  431. IDFUoptions *options = wu->queryUpdateOptions();
  432. IDFUprogress *progress = wu->queryUpdateProgress();
  433. destination->setLogicalName("thor_data50::key::stuminiquery_name_password");
  434. destination->setDirectory("/e$/test");
  435. Owned<IFileDescriptor> desc = createRoxieFileDescriptor("roxie","thor_data50::key::stuminiquery_name_password",false);
  436. destination->setFromFileDescriptor(*desc);
  437. StringBuffer fileMask;
  438. constructFileMask("thor_data50::key::stuminiquery_name_password", fileMask);
  439. destination->setFileMask(fileMask.str());
  440. destination->setGroupName("roxie");
  441. PROGLOG("%s",wu->queryId());
  442. }
  443. void testRoxieCopies()
  444. {
  445. queryNamedGroupStore().add("__test_cluster_10", { "10.173.10.81-90" }, true);
  446. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  447. for (unsigned i= 0; i<18;i++) {
  448. Owned<IDFUWorkUnit> wu = factory->createWorkUnit();
  449. wu->setClusterName("thor_data400");
  450. wu->setJobName("test job");
  451. wu->setQueue("nigel_dfuserver_queue");
  452. wu->setUser("Nigel");
  453. IDFUfileSpec *source = wu->queryUpdateSource();
  454. IDFUfileSpec *destination = wu->queryUpdateDestination();
  455. IDFUoptions *options = wu->queryUpdateOptions();
  456. IDFUprogress *progress = wu->queryUpdateProgress();
  457. #if 1
  458. destination->setLogicalName("thor_data50::key::testfile");
  459. #else
  460. destination->setDirectory("/c$/roxiedata/thor_data50/key");
  461. destination->setFileMask("testfile._$P$_of_$N$");
  462. #endif
  463. destination->setGroupName("__test_cluster_10");
  464. unsigned np = (i>=9)?51:10;
  465. ClusterPartDiskMapSpec mspec;
  466. switch(i%9) {
  467. case 0:
  468. if (np==10)
  469. np = 5;
  470. PROGLOG("%d parts, full_redundancy",np);
  471. mspec.setRoxie(1,1,0);
  472. destination->setClusterPartDiskMapSpec("__test_cluster_10",mspec);
  473. destination->setClusterPartDefaultBaseDir("__test_cluster_10","/c$/roxiedata");
  474. options->setReplicate();
  475. break;
  476. case 1:
  477. PROGLOG("%d parts, cyclic_redundancy",np);
  478. mspec.setRoxie(1,2,1);
  479. destination->setClusterPartDiskMapSpec("__test_cluster_10",mspec);
  480. destination->setClusterPartDefaultBaseDir("__test_cluster_10","/c$/roxiedata");
  481. options->setReplicate();
  482. break;
  483. case 2:
  484. PROGLOG("%d parts, no_redundancy",np);
  485. mspec.setRoxie(0,1);
  486. destination->setClusterPartDiskMapSpec("__test_cluster_10",mspec);
  487. destination->setClusterPartDefaultBaseDir("__test_cluster_10","/c$/roxiedata");
  488. break;
  489. case 3:
  490. PROGLOG("%d parts, overloaded",np);
  491. if (np==10)
  492. np = 20;
  493. mspec.setRoxie(0,2);
  494. destination->setClusterPartDiskMapSpec("__test_cluster_10",mspec);
  495. destination->setClusterPartDefaultBaseDir("__test_cluster_10","/c$/roxiedata");
  496. break;
  497. case 4:
  498. PROGLOG("%d parts, thor_cyclic_redundancy",np);
  499. mspec.setRoxie(1,2,1);
  500. destination->setClusterPartDiskMapSpec("__test_cluster_10",mspec);
  501. destination->setClusterPartDefaultBaseDir("__test_cluster_10","/c$/roxiedata");
  502. options->setReplicate();
  503. break;
  504. case 5:
  505. options->setReplicate();
  506. // fall through
  507. default:
  508. if (i==8)
  509. np = 20;
  510. PROGLOG("%d parts, mapping mode %d",np,(i%9-5));
  511. destination->setClusterPartDiskMapping((DFUclusterPartDiskMapping)(i%9-5),"/c$/roxiedata", "__test_cluster_10");
  512. }
  513. #if 1
  514. destination->setNumPartsOverride(np);
  515. destination->setWrap(true); //??
  516. #endif
  517. PROGLOG("A======================");
  518. StringBuffer buf;
  519. wu->toXML(buf);
  520. PROGLOG("\n%s",buf.str());
  521. Owned<IFileDescriptor> fdesc2 = destination->getFileDescriptor(false);
  522. PROGLOG("B--------------------------");
  523. printDesc(fdesc2);
  524. PROGLOG("C--------------------------");
  525. Owned<IDistributedFile> file = queryDistributedFileDirectory().createNew(fdesc2);
  526. Owned<IFileDescriptor> fdesc3 = file->getFileDescriptor();
  527. printDesc(fdesc3);
  528. PROGLOG("E--------------------------");
  529. StringBuffer fn("testing::file");
  530. fn.append(i);
  531. queryDistributedFileDirectory().removeEntry(fn.str(),UNKNOWN_USER);
  532. file->attach(fn.str(),UNKNOWN_USER);
  533. file.clear();
  534. file.setown(queryDistributedFileDirectory().lookup(fn.str(),UNKNOWN_USER,AccessMode::tbdRead,false,false,nullptr,defaultNonPrivilegedUser));
  535. Owned<IFileDescriptor> fdesc4 = file->getFileDescriptor();
  536. printDesc(fdesc4);
  537. }
  538. }
  539. void test2()
  540. {
  541. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  542. Owned<IConstDFUWorkUnit> wu = factory->openWorkUnit("D20060303-110019",false);
  543. IConstDFUfileSpec *destination = wu->queryDestination();
  544. if (destination->getWrap())
  545. destination->setNumPartsOverride(51);
  546. Owned<IFileDescriptor> desc = destination->getFileDescriptor();
  547. printDesc(desc);
  548. }
  549. void test3()
  550. {
  551. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  552. Owned<IConstDFUWorkUnit> wu = factory->openWorkUnit("D20060815-145448",false);
  553. IConstDFUfileSpec *src = wu->querySource();
  554. Owned<IFileDescriptor> desc = src->getFileDescriptor();
  555. printDesc(desc);
  556. Owned<IFileDescriptor> desc2 = src->getFileDescriptor();
  557. printDesc(desc2);
  558. }
  559. void testMultiFilename()
  560. {
  561. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  562. Owned<IDFUWorkUnit> wu = factory->createWorkUnit();
  563. wu->setClusterName("thor_data400");
  564. RemoteMultiFilename rmfn;
  565. rmfn.append("\"c:\\import\\tmp1\\x,y\",*._?_of_3");
  566. wu->queryUpdateSource()->setMultiFilename(rmfn);
  567. }
  568. void testIterate()
  569. {
  570. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  571. Owned<IConstDFUWorkUnitIterator> iter = //factory->getWorkUnitsByOwner("Nigel");
  572. factory->getWorkUnitsByState(DFUstate_started);
  573. StringBuffer wuid;
  574. StringBuffer s;
  575. ForEach(*iter) {
  576. if (iter->getId(wuid.clear()).length()) {
  577. Owned<IConstDFUWorkUnit> wu = iter->get();
  578. PROGLOG("%s:",wuid.str());
  579. PROGLOG(" cluster = %s",wu->getClusterName(s.clear()).str());
  580. PROGLOG(" job name = %s",wu->getJobName(s.clear()).str());
  581. PROGLOG(" queue = %s",wu->getQueue(s.clear()).str());
  582. PROGLOG(" state = %s",encodeDFUstate(wu->queryProgress()->getState(),s.clear()).str());
  583. IConstDFUfileSpec * file = wu->queryDestination();
  584. StringBuffer tmp;
  585. PROGLOG(" dest = %s",file->getTitle(tmp).str());
  586. }
  587. }
  588. }
  589. void testPagedIterate()
  590. {
  591. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  592. __int64 cachehint=0;
  593. unsigned n=0;
  594. for (unsigned page=0;page<3;page++) {
  595. DFUsortfield sortorder[] = {DFUsf_user,DFUsf_state,DFUsf_term};
  596. Owned<IConstDFUWorkUnitIterator> iter = factory->getWorkUnitsSorted(sortorder, NULL, NULL, page*10, 10, "nigel", &cachehint, NULL);
  597. StringBuffer s;
  598. ForEach(*iter) {
  599. Owned<IConstDFUWorkUnit> wu = iter->get();
  600. PROGLOG("%s:",wu->queryId());
  601. PROGLOG(" cluster = %s",wu->getClusterName(s.clear()).str());
  602. PROGLOG(" job name = %s",wu->getJobName(s.clear()).str());
  603. PROGLOG(" queue = %s",wu->getQueue(s.clear()).str());
  604. PROGLOG(" user = %s",wu->getUser(s.clear()).str());
  605. PROGLOG(" state = %s",encodeDFUstate(wu->queryProgress()->getState(),s.clear()).str());
  606. IConstDFUfileSpec * file = wu->queryDestination();
  607. StringBuffer tmp;
  608. PROGLOG(" dest = %s",file->getTitle(tmp).str());
  609. }
  610. }
  611. }
  612. #if 0
  613. void testDFUwuqueue()
  614. {
  615. StringAttrArray wulist;
  616. unsigned njobs = queuedJobs("nigel_dfuserver_queue",wulist);
  617. PROGLOG("njobs = %d",njobs);
  618. for (unsigned i=0; i<wulist.ordinality(); i++) {
  619. PROGLOG("job[%d] = %s",i,wulist.item(i).text.get());
  620. }
  621. }
  622. #else
  623. void testDFUwuqueue(const char *name)
  624. {
  625. StringAttrArray wulist;
  626. unsigned running = queuedJobs(name,wulist);
  627. StringBuffer cmd;
  628. StringBuffer cname;
  629. StringBuffer jname;
  630. StringBuffer uname;
  631. ForEachItemIn(i,wulist) {
  632. const char *wuid = wulist.item(i).text.get();
  633. Owned<IConstDFUWorkUnit> wu = getDFUWorkUnitFactory()->openWorkUnit(wuid,false);
  634. if (wu)
  635. PROGLOG("%s: %s,%s,%s,%s,%s",wuid,i<running?"Running":"Queued",
  636. encodeDFUcommand(wu->getCommand(),cmd.clear()).str(),
  637. wu->getClusterName(cname.clear()).str(),
  638. wu->getUser(uname.clear()).str(),
  639. wu->getJobName(jname.clear()).str()
  640. );
  641. }
  642. }
  643. #endif
  644. #define WAIT_SECONDS 30
  645. void testSuperRemoteCopy(const char *remoteip,const char *file)
  646. {
  647. Owned<IDfuFileCopier> copier = createRemoteFileCopier("dfuserver_queue","thor","Test1",true);
  648. Owned<IDFUhelper> helper = createIDFUhelper();
  649. SocketEndpoint ep(remoteip);
  650. helper->superForeignCopy(file,ep,file,UNKNOWN_USER,UNKNOWN_USER,false,copier);
  651. }
  652. void testRepeatedFiles1(StringBuffer &wuid)
  653. {
  654. // IMPORT single cluster, part repeated
  655. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  656. Owned<IDFUWorkUnit> wu = factory->createWorkUnit();
  657. wuid.append(wu->queryId());
  658. wu->setClusterName("thor");
  659. wu->setJobName("test job 1");
  660. wu->setQueue("dfuserver_queue");
  661. wu->setUser("nhicks");
  662. IDFUfileSpec *source = wu->queryUpdateSource();
  663. IDFUfileSpec *destination = wu->queryUpdateDestination();
  664. IDFUoptions *options = wu->queryUpdateOptions();
  665. IDFUprogress *progress = wu->queryUpdateProgress();
  666. wu->setCommand(DFUcmd_import);
  667. RemoteFilename rfn;
  668. SocketEndpoint ep("10.173.34.80");
  669. rfn.setPath(ep,"d:\\thordata\\test1._1_of_1");
  670. source->setSingleFilename(rfn);
  671. source->setTitle("test1");
  672. source->setRecordSize(10); // needed cause going to split file
  673. Owned<IFileDescriptor> fdesc = createFileDescriptor();
  674. Owned<IGroup> grp = queryNamedGroupStore().lookup("thor");
  675. fdesc->setDefaultDir("/c$/thordata/thor_dev/nigel");
  676. fdesc->setPartMask("testspray1._$P$_of_$N$");
  677. fdesc->setNumParts(19);
  678. ClusterPartDiskMapSpec mapping;
  679. mapping.setRepeatedCopies(18,false);
  680. fdesc->addCluster("thor",grp,mapping);
  681. destination->setFromFileDescriptor(*fdesc);
  682. destination->setLogicalName("thor_dev::nigel::testspray1");
  683. options->setReplicate(true);
  684. options->setOverwrite(true);
  685. submitDFUWorkUnit(wu.getClear());
  686. }
  687. void testRepeatedFiles2(StringBuffer &wuid)
  688. {
  689. // COPY single cluster, part repeated (uses file created by 1)
  690. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  691. Owned<IDFUWorkUnit> wu = factory->createWorkUnit();
  692. wuid.append(wu->queryId());
  693. wu->setClusterName("thor");
  694. wu->setJobName("test job 2");
  695. wu->setQueue("dfuserver_queue");
  696. wu->setUser("nhicks");
  697. IDFUfileSpec *source = wu->queryUpdateSource();
  698. IDFUfileSpec *destination = wu->queryUpdateDestination();
  699. IDFUoptions *options = wu->queryUpdateOptions();
  700. IDFUprogress *progress = wu->queryUpdateProgress();
  701. wu->setCommand(DFUcmd_copy);
  702. source->setLogicalName("thor_dev::nigel::testspray1");
  703. source->setTitle("test2");
  704. source->setRecordSize(10); // needed cause going to split file
  705. Owned<IFileDescriptor> fdesc = createFileDescriptor();
  706. Owned<IGroup> grp = queryNamedGroupStore().lookup("thor");
  707. fdesc->setDefaultDir("/c$/thordata/thor_dev/nigel");
  708. fdesc->setPartMask("testcopy1._$P$_of_$N$");
  709. fdesc->setNumParts(19);
  710. ClusterPartDiskMapSpec mapping;
  711. mapping.setRepeatedCopies(18,false);
  712. fdesc->addCluster("thor",grp,mapping);
  713. destination->setFromFileDescriptor(*fdesc);
  714. destination->setLogicalName("thor_dev::nigel::testcopy1");
  715. options->setReplicate(true);
  716. options->setOverwrite(true);
  717. submitDFUWorkUnit(wu.getClear());
  718. }
  719. static IGroup *getAuxGroup()
  720. {
  721. IGroup *auxgrp = queryNamedGroupStore().lookup("test_dummy_group");
  722. if (!auxgrp)
  723. {
  724. queryNamedGroupStore().add("test_dummy_group", { "10.173.34.70-77" }, true, "/c$/dummydata");
  725. auxgrp = queryNamedGroupStore().lookup("test_dummy_group");
  726. }
  727. return auxgrp;
  728. }
  729. void testRepeatedFiles3(StringBuffer &wuid)
  730. {
  731. // lets make a dummy cluster if doesn't exist already
  732. Owned<IGroup> auxgrp = getAuxGroup();
  733. // IMPORT double cluster, part repeated
  734. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  735. Owned<IDFUWorkUnit> wu = factory->createWorkUnit();
  736. wuid.append(wu->queryId());
  737. wu->setClusterName("thor");
  738. wu->setJobName("test job 3");
  739. wu->setQueue("dfuserver_queue");
  740. wu->setUser("nhicks");
  741. IDFUfileSpec *source = wu->queryUpdateSource();
  742. IDFUfileSpec *destination = wu->queryUpdateDestination();
  743. IDFUoptions *options = wu->queryUpdateOptions();
  744. IDFUprogress *progress = wu->queryUpdateProgress();
  745. wu->setCommand(DFUcmd_import);
  746. RemoteFilename rfn;
  747. SocketEndpoint ep("10.173.34.80");
  748. rfn.setPath(ep,"d:\\thordata\\test1._1_of_1");
  749. source->setSingleFilename(rfn);
  750. source->setTitle("test3");
  751. source->setRecordSize(10); // needed cause going to split file
  752. Owned<IFileDescriptor> fdesc = createFileDescriptor();
  753. Owned<IGroup> grp = queryNamedGroupStore().lookup("thor");
  754. fdesc->setDefaultDir("/c$/thordata/thor_dev/nigel");
  755. fdesc->setPartMask("testspray3._$P$_of_$N$");
  756. fdesc->setNumParts(19);
  757. ClusterPartDiskMapSpec mapping;
  758. mapping.setRepeatedCopies(18,false);
  759. fdesc->addCluster("thor",grp,mapping);
  760. ClusterPartDiskMapSpec auxmapping;
  761. auxmapping.setRepeatedCopies(18,true);
  762. auxmapping.setDefaultBaseDir("/c$/dummydata");
  763. fdesc->addCluster("test_dummy_group",grp,auxmapping);
  764. destination->setFromFileDescriptor(*fdesc);
  765. destination->setLogicalName("thor_dev::nigel::testspray2");
  766. options->setReplicate(true);
  767. options->setOverwrite(true);
  768. submitDFUWorkUnit(wu.getClear());
  769. }
  770. void testRepeatedFiles4(StringBuffer &wuid)
  771. {
  772. // COPY dual cluster, part repeated on cluster 1 only repeated cluster2 (uses file created by 1)
  773. Owned<IGroup> auxgrp = getAuxGroup();
  774. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  775. Owned<IDFUWorkUnit> wu = factory->createWorkUnit();
  776. wuid.append(wu->queryId());
  777. wu->setClusterName("thor");
  778. wu->setJobName("test job 2");
  779. wu->setQueue("dfuserver_queue");
  780. wu->setUser("nhicks");
  781. IDFUfileSpec *source = wu->queryUpdateSource();
  782. IDFUfileSpec *destination = wu->queryUpdateDestination();
  783. IDFUoptions *options = wu->queryUpdateOptions();
  784. IDFUprogress *progress = wu->queryUpdateProgress();
  785. wu->setCommand(DFUcmd_copy);
  786. source->setLogicalName("thor_dev::nigel::testspray1");
  787. source->setTitle("test4");
  788. source->setRecordSize(10); // needed cause going to split file
  789. destination->setLogicalName("thor_dev::nigel::testcopy2");
  790. // destination->setDirectory("/c$/thordata/thor_dev/nigel");
  791. destination->setFileMask("testcopy2._$P$_of_$N$");
  792. destination->setClusterPartDiskMapping(DFUcpdm_c_replicated_by_d,"/c$/thordata","thor",true,false);
  793. destination->setClusterPartDiskMapping(DFUcpdm_c_replicated_by_d,"/c$/dummydata","test_dummy_group",true,true);
  794. destination->setWrap(true);
  795. options->setReplicate(true);
  796. options->setOverwrite(true);
  797. submitDFUWorkUnit(wu.getClear());
  798. }
  799. void testRepeatedFiles5(StringBuffer &wuid)
  800. {
  801. // COPY dual cluster, part repeated on cluster 1 only repeated cluster2 (uses file created by 1)
  802. Owned<IGroup> auxgrp = getAuxGroup();
  803. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  804. Owned<IDFUWorkUnit> wu = factory->createWorkUnit();
  805. wuid.append(wu->queryId());
  806. wu->setClusterName("thor");
  807. wu->setJobName("test job 2");
  808. wu->setQueue("dfuserver_queue");
  809. wu->setUser("nhicks");
  810. IDFUfileSpec *source = wu->queryUpdateSource();
  811. IDFUfileSpec *destination = wu->queryUpdateDestination();
  812. IDFUoptions *options = wu->queryUpdateOptions();
  813. IDFUprogress *progress = wu->queryUpdateProgress();
  814. wu->setCommand(DFUcmd_copy);
  815. source->setLogicalName("thor_data400::in::uccv2::nyc::party::init");
  816. SocketEndpoint ep("10.173.28.12");
  817. source->setForeignDali(ep);
  818. source->setTitle("test4");
  819. source->setRecordSize(10); // needed cause going to split file
  820. destination->setLogicalName("thor_dev::nigel::testcopy3");
  821. // destination->setDirectory("/c$/thordata/thor_dev/nigel");
  822. destination->setFileMask("testcopy2._$P$_of_$N$");
  823. destination->setClusterPartDiskMapping(DFUcpdm_c_replicated_by_d,"/c$/thordata","thor",true,false);
  824. destination->setClusterPartDiskMapping(DFUcpdm_c_replicated_by_d,"/c$/dummydata","test_dummy_group",true,true);
  825. destination->setWrap(true);
  826. options->setReplicate(true);
  827. options->setOverwrite(true);
  828. options->setSuppressNonKeyRepeats(true); // **** only repeat last part when src kind = key
  829. submitDFUWorkUnit(wu.getClear());
  830. }
  831. void testSuperCopy1(StringBuffer &wuid)
  832. {
  833. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  834. Owned<IDFUWorkUnit> wu = factory->createWorkUnit();
  835. wuid.append(wu->queryId());
  836. wu->setClusterName("thor");
  837. wu->setJobName("test super copy 1");
  838. wu->setQueue("dfuserver_queue");
  839. wu->setUser("nhicks");
  840. IDFUfileSpec *source = wu->queryUpdateSource();
  841. IDFUfileSpec *destination = wu->queryUpdateDestination();
  842. IDFUoptions *options = wu->queryUpdateOptions();
  843. IDFUprogress *progress = wu->queryUpdateProgress();
  844. wu->setCommand(DFUcmd_supercopy);
  845. //source->setLogicalName("thor_data400::in::vehreg_nv_ttl_update");
  846. source->setLogicalName("thor_data400::in::uccv2::nyc::party");
  847. SocketEndpoint ep("10.173.28.12");
  848. source->setForeignDali(ep);
  849. destination->setLogicalName("nigel::testsupercopy1");
  850. // destination->setDirectory("/c$/thordata/thor_dev/nigel");
  851. // destination->setFileMask("testcopy2._$P$_of_$N$");
  852. destination->setClusterPartDiskMapping(DFUcpdm_c_replicated_by_d,"/c$/thordata","thor",true,false);
  853. destination->setClusterPartDiskMapping(DFUcpdm_c_replicated_by_d,"/c$/dummydata","test_dummy_group",true,true);
  854. destination->setWrap(true);
  855. destination->setRoxiePrefix("testprefix");
  856. options->setReplicate(true);
  857. options->setOverwrite(true);
  858. options->setSuppressNonKeyRepeats(true);
  859. submitDFUWorkUnit(wu.getClear());
  860. }
  861. struct ReleaseAtomBlock { ~ReleaseAtomBlock() { releaseAtoms(); } };
  862. int main(int argc, char* argv[])
  863. {
  864. ReleaseAtomBlock rABlock;
  865. InitModuleObjects();
  866. // EnableSEHtoExceptionMapping();
  867. try {
  868. CDfsLogicalFileName dlfn;
  869. dlfn.setValidate("foreign::10.173.28.12:7070::thor_data400::in::uccv2::20061115::nyc::party",true);
  870. StringBuffer cmd;
  871. splitFilename(argv[0], NULL, NULL, &cmd, NULL);
  872. StringBuffer lf;
  873. openLogFile(lf, cmd.toLowerCase().append(".log").str());
  874. PROGLOG("DFUWUTEST Starting");
  875. SocketEndpoint ep;
  876. SocketEndpointArray epa;
  877. ep.set(argv[1],DALI_SERVER_PORT);
  878. epa.append(ep);
  879. Owned<IGroup> group = createIGroup(epa);
  880. initClientProcess(group,DCR_Testing);
  881. if (0) {
  882. //test2();
  883. //testMultiFilename();
  884. //testPagedIterate();
  885. //test3();
  886. //testSuperRemoteCopy(argv[2],argv[3]);
  887. }
  888. else if ((argc>3)&&(stricmp(argv[2],"abort")==0)) {
  889. testAbort(argv[3]);
  890. }
  891. else {
  892. // testRoxieDest();
  893. // SprayTest(atoi(argv[2]));
  894. // importTest();
  895. // testRoxieCopies();
  896. StringBuffer wuid;
  897. // testRepeatedFiles1(wuid);
  898. // testRepeatedFiles2(wuid);
  899. // testRepeatedFiles3(wuid);
  900. // testRepeatedFiles4(wuid);
  901. testSuperCopy1(wuid);
  902. // testRepeatedFiles5(wuid);
  903. // testWUcreate(1,wuid.clear());
  904. // testWUcreate(2,wuid.clear());
  905. // testWUcreate(3,wuid.clear());
  906. // testWUcreate(4,wuid.clear());
  907. // testWUcreate(5,wuid.clear());
  908. // testWUcreate(6,wuid.clear());
  909. // testWUcreate(7,wuid.clear());
  910. // testIterate();
  911. // testProgressMonitor(wuid.str());
  912. // testWUcreate(8,wuid.clear());
  913. // testWUcreate(9,wuid.clear());
  914. // testWUcreate(10,wuid.clear());
  915. PROGLOG("WUID = %s",wuid.str());
  916. // testDFUwuqueue("dfuserver_queue");
  917. }
  918. closedownClientProcess();
  919. }
  920. catch (IException *e) {
  921. EXCLOG(e,"Exception");
  922. e->Release();
  923. }
  924. return 0;
  925. }
  926. //=========================================================================
  927. #if 0 // EXAMPLES for ROXIE
  928. void simpleRoxieCopy()
  929. {
  930. const char * queueName;
  931. const char * srcName;
  932. const char * dstName;
  933. const char * destCluster;
  934. const char * user;
  935. const char * password;
  936. const char * srcDali = NULL;
  937. const char * srcUser;
  938. const char * srcPassword;
  939. const char * fileMask;
  940. bool compressed = false;
  941. bool overwrite;
  942. DFUclusterPartDiskMapping val; // roxie
  943. const char * baseDir; // roxie
  944. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  945. Owned<IDFUWorkUnit> wu = factory->createWorkUnit();
  946. wu->setJobName(dstName);
  947. wu->setQueue(queueName);
  948. wu->setUser(user);
  949. wu->setPassword(password);
  950. wu->setClusterName(destCluster);
  951. IDFUfileSpec *source = wu->queryUpdateSource();
  952. wu->setCommand(DFUcmd_copy);
  953. source->setLogicalName(srcName);
  954. if (srcDali) // remote copy
  955. {
  956. SocketEndpoint ep(srcDali);
  957. source->setForeignDali(ep);
  958. source->setForeignUser(srcUser, srcPassword);
  959. }
  960. IDFUfileSpec *destination = wu->queryUpdateDestination();
  961. destination->setLogicalName(dstName);
  962. destination->setFileMask(fileMask);
  963. destination->setClusterPartDiskMapping(val, baseDir, destCluster); // roxie
  964. if(compressed)
  965. destination->setCompressed(true);
  966. destination->setWrap(true); // *** roxie always wraps
  967. IDFUoptions *options = wu->queryUpdateOptions();
  968. options->setOverwrite(overwrite);
  969. options->setReplicate(val==DFUcpdm_c_replicated_by_d); // roxie
  970. // other options
  971. wu->submit();
  972. }
  973. void repeatedLastPartRoxieCopy()
  974. {
  975. const char * queueName;
  976. const char * srcName;
  977. const char * dstName;
  978. const char * destCluster;
  979. const char * user;
  980. const char * password;
  981. const char * srcDali = NULL;
  982. const char * srcUser;
  983. const char * srcPassword;
  984. const char * fileMask;
  985. bool compressed = false;
  986. bool overwrite;
  987. DFUclusterPartDiskMapping val; // roxie
  988. const char * baseDir; // roxie
  989. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  990. Owned<IDFUWorkUnit> wu = factory->createWorkUnit();
  991. wu->setJobName(dstName);
  992. wu->setQueue(queueName);
  993. wu->setUser(user);
  994. wu->setPassword(password);
  995. wu->setClusterName(destCluster);
  996. IDFUfileSpec *source = wu->queryUpdateSource();
  997. wu->setCommand(DFUcmd_copy);
  998. source->setLogicalName(srcName);
  999. if (srcDali) // remote copy
  1000. {
  1001. SocketEndpoint ep(srcDali);
  1002. source->setForeignDali(ep);
  1003. source->setForeignUser(srcUser, srcPassword);
  1004. }
  1005. IDFUfileSpec *destination = wu->queryUpdateDestination();
  1006. destination->setLogicalName(dstName);
  1007. destination->setFileMask(fileMask);
  1008. destination->setClusterPartDiskMapping(val, baseDir, destCluster, true); // **** repeat last part
  1009. if(compressed)
  1010. destination->setCompressed(true);
  1011. destination->setWrap(true); // roxie always wraps
  1012. IDFUoptions *options = wu->queryUpdateOptions();
  1013. options->setOverwrite(overwrite);
  1014. options->setReplicate(val==DFUcpdm_c_replicated_by_d); // roxie
  1015. options->setSuppressNonKeyRepeats(true); // **** only repeat last part when src kind = key
  1016. // other options
  1017. wu->submit();
  1018. }
  1019. void repeatedLastPartWithServersRoxieCopy()
  1020. {
  1021. const char * queueName;
  1022. const char * srcName;
  1023. const char * dstName;
  1024. const char * destCluster;
  1025. const char * user;
  1026. const char * password;
  1027. const char * srcDali = NULL;
  1028. const char * srcUser;
  1029. const char * srcPassword;
  1030. const char * fileMask;
  1031. bool compressed = false;
  1032. bool overwrite;
  1033. DFUclusterPartDiskMapping val; // roxie
  1034. const char * baseDir; // roxie
  1035. const char *farmname; // **** RoxieFarmProcess/@name in Environment
  1036. const char *farmBaseDir; // **** base directory for farm
  1037. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  1038. Owned<IDFUWorkUnit> wu = factory->createWorkUnit();
  1039. wu->setJobName(dstName);
  1040. wu->setQueue(queueName);
  1041. wu->setUser(user);
  1042. wu->setPassword(password);
  1043. wu->setClusterName(destCluster);
  1044. IDFUfileSpec *source = wu->queryUpdateSource();
  1045. wu->setCommand(DFUcmd_copy);
  1046. source->setLogicalName(srcName);
  1047. if (srcDali) // remote copy
  1048. {
  1049. SocketEndpoint ep(srcDali);
  1050. source->setForeignDali(ep);
  1051. source->setForeignUser(srcUser, srcPassword);
  1052. }
  1053. IDFUfileSpec *destination = wu->queryUpdateDestination();
  1054. destination->setLogicalName(dstName);
  1055. destination->setFileMask(fileMask);
  1056. destination->setClusterPartDiskMapping(val, baseDir, destCluster, true); // **** last part repeated
  1057. StringBuffer farmCluster(destCluster);
  1058. farmCluster.append("__").append(farmname); // dali stores server cluster as combination of roxie cluster name and farm name
  1059. destination->setClusterPartDiskMapping(val,farmBaseDir,farmCluster.str(),true,true); // **** only last part
  1060. if(compressed)
  1061. destination->setCompressed(true);
  1062. destination->setWrap(true); // roxie always wraps
  1063. IDFUoptions *options = wu->queryUpdateOptions();
  1064. options->setOverwrite(overwrite);
  1065. options->setReplicate(val==DFUcpdm_c_replicated_by_d); // roxie
  1066. options->setSuppressNonKeyRepeats(true); // **** only repeat last part when src kind = key
  1067. // other options
  1068. wu->submit();
  1069. }
  1070. void simpleRoxieSuperCopy()
  1071. {
  1072. const char * queueName;
  1073. const char * srcName;
  1074. const char * dstName; // *** name of target superfile
  1075. // **** must already contain extra (i.e. roxie) leading prefix
  1076. const char * destCluster;
  1077. const char * user;
  1078. const char * password;
  1079. const char * srcDali = NULL;
  1080. const char * srcUser;
  1081. const char * srcPassword;
  1082. const char * fileMask;
  1083. bool compressed = false;
  1084. bool overwrite;
  1085. DFUclusterPartDiskMapping val; // roxie
  1086. const char * baseDir; // roxie
  1087. const char * extraPrefix; // **** extra leading prefix for sub-file names (e.g. "roxie1")
  1088. // should *not* contain trailing "::"
  1089. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  1090. Owned<IDFUWorkUnit> wu = factory->createWorkUnit();
  1091. wu->setJobName(dstName);
  1092. wu->setQueue(queueName);
  1093. wu->setUser(user);
  1094. wu->setPassword(password);
  1095. wu->setClusterName(destCluster);
  1096. IDFUfileSpec *source = wu->queryUpdateSource();
  1097. wu->setCommand(DFUcmd_supercopy); // **** super copy
  1098. source->setLogicalName(srcName);
  1099. if (srcDali) // remote copy
  1100. {
  1101. SocketEndpoint ep(srcDali);
  1102. source->setForeignDali(ep);
  1103. source->setForeignUser(srcUser, srcPassword);
  1104. }
  1105. IDFUfileSpec *destination = wu->queryUpdateDestination();
  1106. destination->setLogicalName(dstName);
  1107. destination->setFileMask(fileMask);
  1108. destination->setClusterPartDiskMapping(val, baseDir, destCluster); // roxie
  1109. destination->setRoxiePrefix(extraPrefix); // added to start of each sub file and main name
  1110. if(compressed)
  1111. destination->setCompressed(true);
  1112. destination->setWrap(true); // roxie always wraps
  1113. IDFUoptions *options = wu->queryUpdateOptions();
  1114. options->setOverwrite(overwrite);
  1115. options->setReplicate(val==DFUcpdm_c_replicated_by_d); // roxie
  1116. // other options
  1117. wu->submit();
  1118. }
  1119. #endif