dfuwu.cpp 98 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jlib.hpp"
  14. #include "jprop.hpp"
  15. #include "jmisc.hpp"
  16. #include "jexcept.hpp"
  17. #include "jiter.ipp"
  18. #include "jptree.hpp"
  19. #include "jencrypt.hpp"
  20. #ifndef dfuwu_decl
  21. #define dfuwu_decl DECL_EXPORT
  22. #endif
  23. #include "mpbase.hpp"
  24. #include "daclient.hpp"
  25. #include "dasds.hpp"
  26. #include "dasess.hpp"
  27. #include "dadfs.hpp"
  28. #include "dautils.hpp"
  29. #include "dafdesc.hpp"
  30. #include "wujobq.hpp"
  31. #include "dfuutil.hpp"
  32. #include "dfuwu.hpp"
  33. #define COPY_WAIT_SECONDS 30
  34. #define SDS_LOCK_TIMEOUT 30000
  35. static StringBuffer &getXPathBase(StringBuffer &wuRoot)
  36. {
  37. return wuRoot.append("/DFU/WorkUnits");
  38. }
  39. static StringBuffer &getXPath(StringBuffer &wuRoot, const char *wuid)
  40. {
  41. getXPathBase(wuRoot);
  42. return wuRoot.append('/').append(wuid);
  43. }
  44. static void removeTree(IPropertyTree *root,const char *name)
  45. {
  46. IPropertyTree *t = root->queryPropTree(name);
  47. if (t)
  48. root->removeTree(t);
  49. }
  50. static StringBuffer &newWUID(StringBuffer &wuid)
  51. {
  52. wuid.append('D');
  53. char result[32];
  54. time_t ltime;
  55. time( &ltime );
  56. tm *today = localtime( &ltime );
  57. strftime(result, sizeof(result), "%Y%m%d-%H%M%S", today);
  58. wuid.append(result);
  59. return wuid;
  60. }
  61. struct DFUstateStruct { int val; const char *str; } DFUstates[] =
  62. {
  63. {DFUstate_unknown,"unknown"},
  64. {DFUstate_scheduled, "scheduled"},
  65. {DFUstate_queued,"queued"},
  66. {DFUstate_started,"started"},
  67. {DFUstate_aborted,"aborted"},
  68. {DFUstate_failed,"failed"},
  69. {DFUstate_finished,"finished"},
  70. {DFUstate_monitoring,"monitoring"},
  71. {DFUstate_aborting,"aborting"},
  72. {DFUstate_unknown,""} // must be last
  73. };
  74. struct DFUcmdStruct { int val; const char *str; } DFUcmds[] =
  75. {
  76. {DFUcmd_copy, "copy"},
  77. {DFUcmd_remove, "remove"},
  78. {DFUcmd_move, "move"},
  79. {DFUcmd_rename, "rename"},
  80. {DFUcmd_replicate, "replicate"},
  81. {DFUcmd_import, "import"},
  82. {DFUcmd_export, "export"},
  83. {DFUcmd_add, "add"},
  84. {DFUcmd_transfer, "transfer"},
  85. {DFUcmd_savemap, "savemap"},
  86. {DFUcmd_addgroup, "addgroup"},
  87. {DFUcmd_server, "server"},
  88. {DFUcmd_monitor, "monitor"},
  89. {DFUcmd_copymerge, "copymerge"},
  90. {DFUcmd_supercopy, "supercopy"},
  91. {DFUcmd_none, ""} // must be last
  92. };
  93. struct DFUsortField { int val; const char *str; } DFUsortfields[] =
  94. {
  95. {DFUsf_wuid, "@"}, //This duplicated item is added for getDFUSortFieldXPath()
  96. {DFUsf_user, "@submitID"},
  97. {DFUsf_cluster, "@cluster"},
  98. {DFUsf_state, "Progress/@state"},
  99. {DFUsf_command, "@command"},
  100. {DFUsf_job, "@jobName"},
  101. {DFUsf_wuid, "@"},
  102. {DFUsf_pcdone, "Progress/@percentdone"},
  103. {DFUsf_protected, "@protected"},
  104. {DFUsf_term, ""}
  105. };
  106. const char *getDFUSortFieldXPath(DFUsortfield sortField)
  107. {
  108. if (sortField < sizeof(DFUsortfields)/sizeof(DFUsortField))
  109. return DFUsortfields[sortField].str;
  110. return NULL;
  111. }
  112. DFUcmd decodeDFUcommand(const char * str)
  113. {
  114. if (!str)
  115. return DFUcmd_none;
  116. unsigned i=0;
  117. for (;;) {
  118. const char *cmp=DFUcmds[i].str;
  119. if (!*cmp||(stricmp(str,cmp)==0))
  120. break;
  121. i++;
  122. }
  123. return (DFUcmd)DFUcmds[i].val;
  124. }
  125. StringBuffer &encodeDFUcommand(DFUcmd cmd,StringBuffer &str)
  126. {
  127. unsigned i=0;
  128. for (;;) {
  129. if (!*DFUcmds[i].str||(DFUcmds[i].val==(int)cmd))
  130. break;
  131. i++;
  132. }
  133. return str.append(DFUcmds[i].str);
  134. }
  135. DFUstate decodeDFUstate(const char * str)
  136. {
  137. if (!str)
  138. return DFUstate_unknown;
  139. unsigned i=0;
  140. for (;;) {
  141. const char *cmp=DFUstates[i].str;
  142. if (!*cmp||(stricmp(str,cmp)==0))
  143. break;
  144. i++;
  145. }
  146. return (DFUstate)DFUstates[i].val;
  147. }
  148. StringBuffer &encodeDFUstate(DFUstate state,StringBuffer &str)
  149. {
  150. unsigned i=0;
  151. for (;;) {
  152. if (!*DFUstates[i].str||(DFUstates[i].val==(int)state))
  153. break;
  154. i++;
  155. }
  156. return str.append(DFUstates[i].str);
  157. }
  158. DFUsortfield decodeDFUsortfield(const char * s)
  159. {
  160. if (!s)
  161. return DFUsf_term;
  162. int mod = 0;
  163. while (*s) {
  164. if (*s=='-')
  165. mod |= DFUsf_reverse;
  166. else if (*s=='?')
  167. mod |= DFUsf_nocase;
  168. else if (*s=='#')
  169. mod |= DFUsf_numeric;
  170. else
  171. break;
  172. s++;
  173. }
  174. unsigned i=0;
  175. for (;;) {
  176. const char *cmp=DFUsortfields[i].str;
  177. if (!*cmp||(DFUsortfields[i].val==(int)DFUsf_term))
  178. return DFUsf_term;
  179. if (stricmp(s,cmp)==0)
  180. break;
  181. i++;
  182. }
  183. return (DFUsortfield)(DFUsortfields[i].val+mod);
  184. }
  185. StringBuffer &encodeDFUsortfield(DFUsortfield fmt,StringBuffer &str,bool incmodifier)
  186. {
  187. if (incmodifier) {
  188. if (((int)fmt)&DFUsf_reverse)
  189. str.append('-');
  190. if (((int)fmt)&DFUsf_nocase)
  191. str.append('?');
  192. if (((int)fmt)&DFUsf_numeric)
  193. str.append('#');
  194. }
  195. fmt = (DFUsortfield)(((int)fmt)&0xff);
  196. unsigned i=0;
  197. for (;;) {
  198. if ((DFUsortfields[i].val==(int)DFUsf_term)||(DFUsortfields[i].val==(int)fmt))
  199. break;
  200. i++;
  201. }
  202. return str.append(DFUsortfields[i].str);
  203. }
  204. class CDFUWorkUnitBase: public CInterface, implements IDFUWorkUnit , implements ISDSSubscription
  205. {
  206. protected: friend class CLinkedDFUWUchild; friend class CDFUprogress; friend class CDFUfileSpec; friend class CDFUoptions; friend class CDFUmonitor;
  207. Owned<IRemoteConnection> conn;
  208. Owned<IPropertyTree> root;
  209. mutable CriticalSection crit;
  210. virtual ~CDFUWorkUnitBase()
  211. {
  212. root.clear();
  213. conn.clear();
  214. }
  215. public:
  216. IMPLEMENT_IINTERFACE;
  217. };
  218. class CLinkedDFUWUchild : public CInterface
  219. {
  220. mutable Linked<IPropertyTree> root;
  221. StringAttr name;
  222. protected:
  223. CDFUWorkUnitBase *parent;
  224. public:
  225. CLinkedDFUWUchild()
  226. {
  227. parent = NULL;
  228. }
  229. ~CLinkedDFUWUchild()
  230. {
  231. root.clear();
  232. parent = NULL;
  233. }
  234. void init(CDFUWorkUnitBase *_parent,const char *_name, bool lazy)
  235. {
  236. name.set(_name);
  237. parent = _parent;
  238. if (!lazy)
  239. queryRoot();
  240. }
  241. IPropertyTree *queryRoot() const
  242. {
  243. if (!root) {
  244. root.set(parent->root->queryPropTree(name.get()));
  245. if (!root)
  246. root.set(parent->root->setPropTree(name,createPTree(name)));
  247. assertex(root);
  248. }
  249. return root.get();
  250. }
  251. void reinit()
  252. {
  253. root.set(parent->root->queryPropTree(name.get()));
  254. }
  255. virtual void Link(void) const { parent->Link(); }
  256. virtual bool Release(void) const { return parent->Release(); }
  257. };
  258. #define IMPLEMENT_DFUWUCHILD virtual void Link(void) const { CLinkedDFUWUchild::Link(); } \
  259. virtual bool Release(void) const { return CLinkedDFUWUchild::Release(); }
  260. class CDFUprogress: public CLinkedDFUWUchild, implements IDFUprogress
  261. {
  262. public:
  263. IMPLEMENT_DFUWUCHILD;
  264. bool getReplicating() const
  265. {
  266. CriticalBlock block(parent->crit);
  267. return queryRoot()->getPropInt("@replicating",0)!=0;
  268. }
  269. unsigned getPercentDone() const
  270. {
  271. CriticalBlock block(parent->crit);
  272. return (unsigned)queryRoot()->getPropInt("@percentdone");
  273. }
  274. unsigned getSecsLeft() const
  275. {
  276. CriticalBlock block(parent->crit);
  277. return (unsigned)queryRoot()->getPropInt("@secsleft");
  278. }
  279. StringBuffer &getTimeLeft(StringBuffer &str) const
  280. {
  281. CriticalBlock block(parent->crit);
  282. queryRoot()->getProp("@timeleft",str);
  283. return str;
  284. }
  285. unsigned __int64 getScaledDone() const
  286. {
  287. CriticalBlock block(parent->crit);
  288. return (unsigned __int64)queryRoot()->getPropInt64("@scaleddone");
  289. }
  290. unsigned __int64 getScaledTotal() const
  291. {
  292. CriticalBlock block(parent->crit);
  293. return (unsigned __int64)queryRoot()->getPropInt64("@scaledtotal");
  294. }
  295. StringBuffer &getScale(StringBuffer &str) const
  296. {
  297. CriticalBlock block(parent->crit);
  298. queryRoot()->getProp("@scale",str);
  299. return str;
  300. }
  301. unsigned getKbPerSecAve() const
  302. {
  303. CriticalBlock block(parent->crit);
  304. return (unsigned)queryRoot()->getPropInt("@kbpersecave");
  305. }
  306. unsigned getKbPerSec() const
  307. {
  308. CriticalBlock block(parent->crit);
  309. return (unsigned)queryRoot()->getPropInt("@kbpersec");
  310. }
  311. unsigned getSlavesDone() const
  312. {
  313. CriticalBlock block(parent->crit);
  314. return (unsigned)queryRoot()->getPropInt("@slavesdone");
  315. }
  316. unsigned getTotalNodes() const
  317. {
  318. CriticalBlock block(parent->crit);
  319. return (unsigned)queryRoot()->getPropInt("@totalnodes");
  320. }
  321. StringBuffer &getTimeTaken(StringBuffer &str) const
  322. {
  323. CriticalBlock block(parent->crit);
  324. queryRoot()->getProp("@timetaken",str);
  325. return str;
  326. }
  327. StringBuffer &formatProgressMessage(StringBuffer &str) const
  328. {
  329. CriticalBlock block(parent->crit);
  330. StringBuffer timeleft;
  331. StringBuffer scale;
  332. unsigned pc = getPercentDone();
  333. str.appendf("%d%% Done",pc);
  334. bool replicating = getReplicating();
  335. if (replicating&&(pc<100))
  336. str.appendf(", Replicating");
  337. getTimeLeft(timeleft);
  338. if (timeleft.length()&&((pc!=100)||(stricmp(timeleft.str(),"unknown")!=0)))
  339. str.appendf(", %s left",timeleft.str());
  340. if (!replicating) {
  341. unsigned __int64 sdone = getScaledDone();
  342. unsigned __int64 stotal = getScaledTotal();
  343. getScale(scale);
  344. getKbPerSecAve();
  345. unsigned kbs = getKbPerSecAve();
  346. if ((kbs!=0)||(sdone!=0)||(stotal!=0)) {
  347. str.append(" (");
  348. if ((sdone!=0)||(stotal!=0)) {
  349. str.appendf("%" I64F "d/%" I64F "d%s",sdone,stotal,scale.str());
  350. if (kbs!=0)
  351. str.append(' ');
  352. }
  353. if (kbs!=0)
  354. str.appendf("@%dKB/sec",kbs);
  355. str.append(')');
  356. }
  357. kbs = getKbPerSec();
  358. if (kbs!=0)
  359. str.appendf(" current rate=%dKB/sec",kbs);
  360. }
  361. unsigned totnodes=getTotalNodes();
  362. if (totnodes==0) { // print subdone/done
  363. StringBuffer s;
  364. if (queryRoot()->getProp("@subinprogress",s)&&s.length())
  365. str.appendf(" %s in progress",s.str());
  366. if (queryRoot()->getProp("@subdone",s.clear())&&s.length())
  367. str.appendf(" %s completed",s.str());
  368. }
  369. else
  370. str.appendf(" [%d/%dnodes]",getSlavesDone(),totnodes);
  371. return str;
  372. }
  373. StringBuffer &formatSummaryMessage(StringBuffer &str) const
  374. {
  375. CriticalBlock block(parent->crit);
  376. Owned<IExceptionIterator> ei = parent->getExceptionIterator();
  377. if (ei->first()) {
  378. IException &e = ei->query();
  379. str.append("Failed: ");
  380. e.errorMessage(str);
  381. }
  382. else {
  383. StringBuffer timetaken;
  384. str.appendf("Total time taken %s", getTimeTaken(timetaken).str());
  385. unsigned kbs = getKbPerSecAve();
  386. if (kbs!=0)
  387. str.appendf(", Average transfer %dKB/sec", kbs);
  388. }
  389. return str;
  390. }
  391. DFUstate getState() const
  392. {
  393. CriticalBlock block(parent->crit);
  394. return decodeDFUstate(queryRoot()->queryProp("@state"));
  395. }
  396. CDateTime &getTimeStarted(CDateTime &val) const
  397. {
  398. CriticalBlock block(parent->crit);
  399. StringBuffer str;
  400. queryRoot()->getProp("@timestarted",str);
  401. val.setString(str.str());
  402. return val;
  403. }
  404. CDateTime &getTimeStopped(CDateTime &val) const
  405. {
  406. CriticalBlock block(parent->crit);
  407. StringBuffer str;
  408. queryRoot()->getProp("@timestopped",str);
  409. val.setString(str.str());
  410. return val;
  411. }
  412. void setProgress( unsigned percentDone, unsigned secsLeft, const char * timeLeft,
  413. unsigned __int64 scaledDone, unsigned __int64 scaledTotal, const char * scale,
  414. unsigned kbPerSecAve, unsigned kbPerSecRate,
  415. unsigned slavesDone, bool replicating, unsigned __int64 numReads, unsigned __int64 numWrites)
  416. {
  417. CriticalBlock block(parent->crit);
  418. queryRoot()->setPropInt("@percentdone",(int)percentDone);
  419. queryRoot()->setPropInt("@secsleft",(int)secsLeft);
  420. queryRoot()->setProp("@timeleft",timeLeft);
  421. queryRoot()->setPropInt64("@scaleddone",scaledDone);
  422. queryRoot()->setPropInt64("@scaledtotal",scaledTotal);
  423. queryRoot()->setProp("@scale",scale);
  424. queryRoot()->setPropInt("@kbpersecave",(int)kbPerSecAve);
  425. queryRoot()->setPropInt("@kbpersec",(int)kbPerSecRate);
  426. queryRoot()->setPropInt("@slavesdone",(int)slavesDone);
  427. queryRoot()->setPropInt("@replicating",replicating?1:0);
  428. queryRoot()->setPropInt("@numreads",numReads);
  429. queryRoot()->setPropInt("@numwrites",numWrites);
  430. parent->commit();
  431. }
  432. void setPercentDone(unsigned percentDone)
  433. {
  434. CriticalBlock block(parent->crit);
  435. queryRoot()->setPropInt("@percentdone",(int)percentDone);
  436. }
  437. void clearProgress()
  438. {
  439. CriticalBlock block(parent->crit);
  440. queryRoot()->removeProp("@percentdone");
  441. queryRoot()->removeProp("@secsleft");
  442. queryRoot()->removeProp("@timetaken");
  443. queryRoot()->removeProp("@timeleft");
  444. queryRoot()->removeProp("@scaleddone");
  445. queryRoot()->removeProp("@scaledtotal");
  446. queryRoot()->removeProp("@scale");
  447. queryRoot()->removeProp("@kbpersecave");
  448. queryRoot()->removeProp("@kbpersec");
  449. queryRoot()->removeProp("@slavesdone");
  450. parent->commit();
  451. }
  452. void setDone(const char * timeTaken, unsigned kbPerSec, bool set100pc)
  453. {
  454. CriticalBlock block(parent->crit);
  455. if (timeTaken) {
  456. queryRoot()->setProp("@timetaken",timeTaken);
  457. queryRoot()->setPropInt("@kbpersecave",(int)kbPerSec);
  458. queryRoot()->setPropInt("@kbpersec",(int)kbPerSec);
  459. }
  460. if (set100pc)
  461. queryRoot()->setPropInt("@percentdone",(int)100);
  462. queryRoot()->setPropInt("@replicating",0);
  463. parent->commit();
  464. }
  465. void setState(DFUstate state)
  466. {
  467. CriticalBlock block(parent->crit);
  468. CDateTime dt;
  469. switch (state) {
  470. case DFUstate_started:
  471. dt.setNow();
  472. setTimeStarted(dt);
  473. break;
  474. case DFUstate_aborting:
  475. {
  476. DFUstate oldstate = getState();
  477. if ((oldstate==DFUstate_aborted)||(oldstate==DFUstate_failed)||(oldstate==DFUstate_finished))
  478. state = oldstate;
  479. }
  480. // fall through
  481. case DFUstate_aborted:
  482. case DFUstate_failed:
  483. case DFUstate_finished:
  484. if (parent->removeQueue()&&(state==DFUstate_aborting))
  485. state = DFUstate_aborted;
  486. dt.setNow();
  487. setTimeStopped(dt);
  488. break;
  489. }
  490. StringBuffer s;
  491. encodeDFUstate(state,s);
  492. queryRoot()->setProp("@state",s.str());
  493. parent->commit();
  494. }
  495. void setTimeStarted(const CDateTime &val)
  496. {
  497. CriticalBlock block(parent->crit);
  498. StringBuffer str;
  499. val.getString(str);
  500. queryRoot()->setProp("@timestarted",str.str());
  501. }
  502. void setTimeStopped(const CDateTime &val)
  503. {
  504. CriticalBlock block(parent->crit);
  505. StringBuffer str;
  506. val.getString(str);
  507. queryRoot()->setProp("@timestopped",str.str());
  508. }
  509. void setTotalNodes(unsigned val)
  510. {
  511. CriticalBlock block(parent->crit);
  512. queryRoot()->setPropInt("@totalnodes",(int)val);
  513. }
  514. StringBuffer &getSubInProgress(StringBuffer &str) const
  515. {
  516. CriticalBlock block(parent->crit);
  517. queryRoot()->getProp("@subinprogress",str);
  518. return str;
  519. }
  520. StringBuffer &getSubDone(StringBuffer &str) const
  521. {
  522. CriticalBlock block(parent->crit);
  523. queryRoot()->getProp("@subdone",str);
  524. return str;
  525. }
  526. double getFileAccessCost() const
  527. {
  528. CriticalBlock block(parent->crit);
  529. return queryRoot()->getPropInt64("@fileAccessCost");
  530. }
  531. void setSubInProgress(const char *str)
  532. {
  533. CriticalBlock block(parent->crit);
  534. queryRoot()->setProp("@subinprogress",str);
  535. }
  536. void setSubDone(const char *str)
  537. {
  538. CriticalBlock block(parent->crit);
  539. queryRoot()->setProp("@subdone",str);
  540. }
  541. void setFileAccessCost(double fileAccessCost)
  542. {
  543. CriticalBlock block(parent->crit);
  544. queryRoot()->setPropReal("@fileAccessCost", fileAccessCost);
  545. }
  546. };
  547. class CDFUmonitor: public CLinkedDFUWUchild, implements IDFUmonitor
  548. {
  549. public:
  550. IMPLEMENT_DFUWUCHILD;
  551. unsigned getCycleCount() const
  552. {
  553. CriticalBlock block(parent->crit);
  554. return (unsigned)queryRoot()->getPropInt("@cycles");
  555. }
  556. unsigned getShotCount() const
  557. {
  558. CriticalBlock block(parent->crit);
  559. return (unsigned)queryRoot()->getPropInt("@shots");
  560. }
  561. bool getHandlerEp(SocketEndpoint &ep) const
  562. {
  563. const char *s = queryRoot()->queryProp("@handler");
  564. if (s&&*s) {
  565. ep.set(s);
  566. if (!ep.isNull())
  567. return true;
  568. }
  569. return false;
  570. }
  571. StringBuffer &getEventName(StringBuffer &str)const
  572. {
  573. queryRoot()->getProp("@eventname",str);
  574. return str;
  575. }
  576. bool getSub()const
  577. {
  578. return queryRoot()->getPropBool("@sub");
  579. }
  580. unsigned getShotLimit() const
  581. {
  582. return queryRoot()->getPropInt("@shotlimit");
  583. }
  584. unsigned getTriggeredList(StringAttrArray &files) const
  585. {
  586. MemoryBuffer buf;
  587. if (!queryRoot()->getPropBin("triggeredList",buf)||(buf.length()<sizeof(unsigned)))
  588. return 0;
  589. unsigned n;
  590. buf.read(n);
  591. for (unsigned i=0;i<n;i++) {
  592. StringAttrItem &item = *new StringAttrItem;
  593. buf.read(item.text);
  594. files.append(item);
  595. }
  596. return n;
  597. }
  598. void setCycleCount(unsigned val)
  599. {
  600. CriticalBlock block(parent->crit);
  601. queryRoot()->setPropInt("@cycles",(int)val);
  602. }
  603. void setShotCount(unsigned val)
  604. {
  605. CriticalBlock block(parent->crit);
  606. queryRoot()->setPropInt("@shots",(int)val);
  607. }
  608. void setHandlerEp(const SocketEndpoint &ep)
  609. {
  610. if (ep.isNull())
  611. queryRoot()->removeProp("@handler");
  612. else {
  613. StringBuffer s;
  614. queryRoot()->setProp("@handler",ep.getUrlStr(s).str());
  615. }
  616. }
  617. void setSub(bool sub)
  618. {
  619. queryRoot()->setPropBool("@sub",sub);
  620. }
  621. void setEventName(const char *lfn)
  622. {
  623. queryRoot()->setProp("@eventname",lfn);
  624. }
  625. void setShotLimit(unsigned limit)
  626. {
  627. queryRoot()->setPropInt("@shotlimit",limit);
  628. }
  629. void setTriggeredList(const StringAttrArray &files)
  630. {
  631. MemoryBuffer buf;
  632. unsigned n = files.ordinality();
  633. buf.append(n);
  634. for (unsigned i=0;i<n;i++) {
  635. StringAttrItem &item = files.item(i);
  636. buf.append(item.text.get());
  637. }
  638. queryRoot()->setPropBin("triggeredList",buf.length(),buf.toByteArray());
  639. }
  640. };
  641. static void printDesc(IFileDescriptor *desc)
  642. {
  643. // ** TBD
  644. if (desc) {
  645. StringBuffer tmp1;
  646. StringBuffer tmp2;
  647. unsigned n = desc->numParts();
  648. PROGLOG(" numParts = %d",n);
  649. PROGLOG(" numCopies(0) = %d",desc->numCopies(0));
  650. // PROGLOG(" groupWidth = %d",desc->queryClusterGroup(0)->ordinality());
  651. // PROGLOG(" numSubFiles = %d",desc->getNumSubFiles());
  652. // Owned<IGroup> group = desc->getGroup(0);
  653. // PROGLOG(" group(0) = %d,%s,...,%s",group->ordinality(),group->queryNode(0).endpoint().getUrlStr(tmp1.clear()).str(),group->queryNode(group->ordinality()-1).endpoint().getUrlStr(tmp2.clear()).str());
  654. // group.setown(desc->getGroup(1));
  655. // PROGLOG(" group(1) = %d,%s,...,%s",group->ordinality(),group->queryNode(0).endpoint().getUrlStr(tmp1.clear()).str(),group->queryNode(group->ordinality()-1).endpoint().getUrlStr(tmp2.clear()).str());
  656. unsigned copy;
  657. for (copy = 0;copy<2;copy++) {
  658. unsigned i;
  659. for (i=0;i<n;i++) {
  660. RemoteFilename rfn;
  661. desc->getFilename(i,copy,rfn);
  662. PROGLOG(" file (%d,%d) = %s",copy,i,rfn.getRemotePath(tmp1.clear()).str());
  663. }
  664. }
  665. }
  666. }
  667. class CDFUfileSpec: public CLinkedDFUWUchild, implements IDFUfileSpec
  668. {
  669. unsigned numpartsoverride;
  670. mutable DFD_OS os;
  671. mutable Owned<IPropertyTree> nullattr;
  672. public:
  673. CDFUfileSpec()
  674. {
  675. numpartsoverride = 0;
  676. os = DFD_OSdefault;
  677. }
  678. IMPLEMENT_DFUWUCHILD;
  679. IFileDescriptor *getFileDescriptor(bool iskey,bool ignorerepeats) const
  680. {
  681. unsigned nc = numClusters();
  682. unsigned n=nc?getNumParts(0,iskey):0;
  683. if (!n) {
  684. StringBuffer lname;
  685. if (getLogicalName(lname).length()) {
  686. CDfsLogicalFileName lfn;
  687. lfn.set(lname.str());
  688. Owned<IUserDescriptor> userdesc = createUserDescriptor();
  689. SocketEndpoint foreignep;
  690. bool isforeign = getForeignDali(foreignep);
  691. if (lfn.isForeign())
  692. isforeign = true;
  693. else if (isforeign)
  694. lfn.setForeign(foreignep,false);
  695. StringBuffer username;
  696. StringBuffer password;
  697. if (!isforeign||!getForeignUser(username,password)) {
  698. parent->getUser(username);
  699. parent->getPassword(password);
  700. }
  701. userdesc->set(username.str(),password.str());
  702. Owned<IDistributedFile> file = queryDistributedFileDirectory().lookup(lfn,userdesc,AccessMode::tbdRead,false,false,nullptr,defaultPrivilegedUser);
  703. if (file)
  704. return file->getFileDescriptor();
  705. }
  706. StringBuffer s;
  707. SocketEndpoint ep;
  708. if ((getGroupName(0,s).length()!=0)&&!getForeignDali(ep) ) {
  709. Owned<IGroup> grp = queryNamedGroupStore().lookup(s.str());
  710. if (!grp)
  711. throw MakeStringException(-1,"CDFUfileSpec: Cluster %s not found",s.str());
  712. }
  713. throw MakeStringException(-1,"CDFUfileSpec: No parts found for file!");
  714. }
  715. IPropertyTree *p = createPTreeFromIPT(queryProperties());
  716. if (iskey) {
  717. p->removeProp("@blockCompressed"); // can't compress keys
  718. }
  719. Owned<IFileDescriptor> ret = createFileDescriptor(p);
  720. StringBuffer s;
  721. bool dirgot = false;
  722. if (getDirectory(s).length()) {
  723. dirgot = true;
  724. ret->setDefaultDir(s.str());
  725. }
  726. if (getTitle(s.clear()).length())
  727. ret->setTraceName(s.str());
  728. else if (getLogicalName(s).length())
  729. ret->setTraceName(s.str());
  730. else if (getFileMask(s).length())
  731. ret->setTraceName(s.str());
  732. bool initdone = false;
  733. StringBuffer partmask;
  734. if (getFileMask(partmask).length())
  735. ret->setPartMask(partmask.str());
  736. for (unsigned clustnum=0;clustnum<nc;clustnum++) {
  737. if (clustnum)
  738. n=getNumParts(clustnum,iskey);
  739. if (!n)
  740. continue;
  741. ClusterPartDiskMapSpec mspec;
  742. getClusterPartDiskMapSpec(clustnum,mspec);
  743. if (ignorerepeats&&(mspec.repeatedPart!=CPDMSRP_notRepeated)) {
  744. if (mspec.repeatedPart&CPDMSRP_onlyRepeated)
  745. continue; // ignore only repeated cluster
  746. mspec.repeatedPart = CPDMSRP_notRepeated;
  747. mspec.flags &= ~CPDMSF_repeatedPart;
  748. }
  749. const char *grpname=NULL;
  750. StringBuffer gs;
  751. if (getGroupName(clustnum,gs).length())
  752. grpname = gs.str();
  753. Owned<IGroup> grp(getGroup(clustnum));
  754. if (dirgot&&grp.get()&&partmask.length()) { // not sure if need dir here
  755. if (!initdone) {
  756. ret->setNumParts(n); // NB first cluster determines number of parts
  757. initdone = true;
  758. }
  759. ret->addCluster(grpname,grp,mspec);
  760. }
  761. else if (!initdone) { // don't do if added cluster already
  762. unsigned i;
  763. for (i=0;i<n;i++) {
  764. RemoteFilename rfn;
  765. getPartFilename(clustnum,i,rfn,iskey);
  766. ret->setPart(i,rfn,queryPartProperties(i));
  767. }
  768. ret->endCluster(mspec);
  769. break;
  770. }
  771. else
  772. throw MakeStringException(-1,"CDFUfileSpec: getFileDescriptor: Could not find group for cluster %d", clustnum);
  773. }
  774. return ret.getClear();
  775. }
  776. StringBuffer &getTitle(StringBuffer &str) const
  777. {
  778. queryRoot()->getProp("@title",str);
  779. return str;
  780. }
  781. StringBuffer &getRawDirectory(StringBuffer &str) const
  782. {
  783. queryRoot()->getProp("@directory",str);
  784. return str;
  785. }
  786. StringBuffer &getDirectory(StringBuffer &str) const
  787. {
  788. if (!queryRoot()->getProp("@directory",str)) {
  789. StringBuffer tmp;
  790. if (getRoxiePrefix(tmp).length())
  791. tmp.append("::");
  792. size32_t tmpl = tmp.length();
  793. if (getLogicalName(tmp).length()>tmpl) {
  794. CDfsLogicalFileName lfn;
  795. if (!lfn.setValidate(tmp.str(),true)) {
  796. throw MakeStringException(-1,"DFUWU: Logical name %s invalid(2)",tmp.str());
  797. }
  798. StringBuffer baseoverride;
  799. getClusterPartDefaultBaseDir(NULL,baseoverride);
  800. bool iswin;
  801. getWindowsOS(iswin); // sets os
  802. getLFNDirectoryUsingBaseDir(str, lfn.get(), baseoverride.str());
  803. }
  804. }
  805. return str;
  806. }
  807. StringBuffer &getLogicalName(StringBuffer &str)const
  808. {
  809. queryRoot()->getProp("OrigName",str);
  810. return str;
  811. }
  812. StringBuffer &getFileMask(StringBuffer &str) const
  813. {
  814. if (!queryRoot()->getProp("@partmask",str)) {
  815. StringBuffer tmp;
  816. getLogicalName(tmp);
  817. if (tmp.length()) {
  818. CDfsLogicalFileName lfn;
  819. if (!lfn.setValidate(tmp.str()))
  820. throw MakeStringException(-1,"DFUWU: Logical name %s invalid",tmp.str());
  821. lfn.getTail(str);
  822. str.append("._$P$_of_$N$");
  823. }
  824. }
  825. return str;
  826. }
  827. StringBuffer &getRawFileMask(StringBuffer &str) const
  828. {
  829. queryRoot()->getProp("@partmask",str);
  830. return str;
  831. }
  832. StringBuffer &getGroupName(unsigned clustnum,StringBuffer &str) const
  833. {
  834. // first see if Cluster spec
  835. if (clustnum<numClusters()) {
  836. StringBuffer xpath;
  837. xpath.appendf("Cluster[%d]",clustnum+1);
  838. IPropertyTree *ct = queryRoot()->queryPropTree(xpath.str());
  839. if (ct&&ct->getProp("@name",str))
  840. return str;
  841. StringBuffer s; // old style
  842. queryRoot()->getProp("@group",s);
  843. StringArray gs;
  844. getFileGroups(s.str(),gs);
  845. if (clustnum<gs.ordinality())
  846. str.append(gs.item(clustnum));
  847. }
  848. return str;
  849. }
  850. IPropertyTree *queryProperties() const
  851. {
  852. IPropertyTree *ret = queryRoot()->queryPropTree("Attr");
  853. if (!ret) {
  854. ret = nullattr.get();
  855. if (!ret) {
  856. nullattr.setown(createPTree("Attr"));
  857. ret = nullattr.get();
  858. }
  859. }
  860. return ret;
  861. }
  862. IPropertyTree *queryUpdateProperties()
  863. {
  864. IPropertyTree *ret = queryRoot()->queryPropTree("Attr");
  865. if (!ret)
  866. ret = setProperties(createPTree("Attr"));
  867. return ret;
  868. }
  869. size32_t getRecordSize() const
  870. {
  871. return queryProperties()->getPropInt("@recordSize");
  872. }
  873. bool isCompressed() const
  874. {
  875. bool blocked;
  876. if (!::isCompressed(*queryProperties(),&blocked))
  877. return false;
  878. return blocked; // only block compression supported
  879. }
  880. IGroup *getGroup(unsigned clustnum) const
  881. {
  882. StringBuffer gs;
  883. if (getGroupName(clustnum,gs).length()) {
  884. Owned<IGroup> grp = queryNamedGroupStore().lookup(gs.str());
  885. if (!grp)
  886. throw MakeStringException(-1,"DFUWU: Logical group %s not found",gs.str());
  887. return grp.getClear();
  888. }
  889. return NULL;
  890. }
  891. RemoteFilename &getPartFilename(unsigned clustnum,unsigned partidx, RemoteFilename &rfn, bool iskey) const
  892. {
  893. // supports both with and without Part
  894. StringBuffer tmp;
  895. StringBuffer tmpmask;
  896. StringBuffer tmpfn;
  897. SocketEndpoint ep;
  898. CDfsLogicalFileName lfn;
  899. StringBuffer dir;
  900. ClusterPartDiskMapSpec mspec;
  901. getClusterPartDiskMapSpec(clustnum,mspec);
  902. const char *mask = getFileMask(tmpmask).str();
  903. const char *fn = NULL;
  904. // now read part
  905. Owned<IGroup> grp(getGroup(clustnum));
  906. unsigned np = getNumParts(clustnum,iskey);
  907. unsigned npt = queryRoot()->getPropInt("@numparts",np);
  908. StringBuffer xpath;
  909. xpath.append("Part[@num=\"").append((partidx%npt)+1).append("\"]");
  910. IPropertyTree *part = queryRoot()->queryPropTree(xpath.str());
  911. if (part) {
  912. const char *ns=part->queryProp("@node");
  913. if (ns)
  914. ep.set(ns);
  915. if (!getWrap()&&(partidx<npt)) { // override
  916. const char *n=part->queryProp("@name");
  917. if (n&&*n) {
  918. if (findPathSepChar(n))
  919. fn = splitDirTail(n,dir);
  920. else
  921. fn = n;
  922. }
  923. }
  924. }
  925. else { // if Parts specified
  926. MemoryBuffer mb;
  927. if (queryRoot()->getPropBin("Parts",mb)) {
  928. Owned<IPropertyTreeIterator> pi = deserializePartAttrIterator(mb);
  929. ForEach(*pi) {
  930. IPropertyTree &part = pi->get();
  931. if (part.getPropInt("@num")==(partidx%npt)+1) {
  932. const char *ns=part.queryProp("@node");
  933. if (ns)
  934. ep.set(ns);
  935. if (!getWrap()&&(partidx<npt)) { // override
  936. const char *n=part.queryProp("@name");
  937. if (n&&*n) {
  938. if (findPathSepChar(n))
  939. fn = splitDirTail(n,dir);
  940. else
  941. fn = n;
  942. }
  943. }
  944. break;
  945. }
  946. }
  947. }
  948. }
  949. if (!fn)
  950. fn = mask?expandMask(tmpfn,mask,partidx,np).str():NULL;
  951. unsigned nn;
  952. unsigned dn;
  953. mspec.calcPartLocation(partidx,np,0,grp.get()?grp->ordinality():np,nn,dn);
  954. // now we should have tail name and possibly ep and dir
  955. if (!fn||!*fn)
  956. throw MakeStringException(-1,"DFUWU: cannot construct part file name");
  957. if (ep.isNull()) {
  958. if (!grp)
  959. throw MakeStringException(-1,"DFUWU: cannot determine endpoint for part file");
  960. ep = grp->queryNode(nn).endpoint();
  961. }
  962. StringBuffer tmpout;
  963. // now its a bit of a kludge but can be multiple filenames
  964. if (strchr(fn,',')) {
  965. StringArray sub;
  966. RemoteMultiFilename::expand(fn,sub);
  967. StringBuffer prevdir;
  968. ForEachItemIn(i1,sub) {
  969. const char *subfn = sub.item(i1);
  970. if (!subfn||!*subfn)
  971. continue;
  972. if (tmpout.length())
  973. tmpout.append(',');
  974. if (!isAbsolutePath(subfn)) {
  975. if (!dir.length())
  976. getDirectory(dir);
  977. if ((dir.length()==0)&&(prevdir.length()==0))
  978. throw MakeStringException(-1,"DFUWU: cannot determine file part directory for %s",subfn);
  979. if (prevdir.length())
  980. tmpout.append(prevdir);
  981. else
  982. tmpout.append(dir);
  983. addPathSepChar(tmpout);
  984. }
  985. else
  986. splitDirTail(subfn,prevdir.clear());
  987. tmpout.append(subfn);
  988. }
  989. fn = tmpout.str();
  990. }
  991. else if (!isAbsolutePath(fn)) { // shouldn't be absolute or tail
  992. if (!dir.length())
  993. getDirectory(dir);
  994. if (dir.length()==0)
  995. throw MakeStringException(-1,"DFUWU: cannot determine file part directory for %s",fn);
  996. fn = addPathSepChar(dir).append(fn).str();
  997. }
  998. StringBuffer filename;
  999. if (dn) {
  1000. filename.append(fn);
  1001. setReplicateFilename(filename,dn);
  1002. fn = filename.str();
  1003. }
  1004. rfn.setPath(ep,fn);
  1005. return rfn;
  1006. }
  1007. StringBuffer &getPartUrl(unsigned clustnum,unsigned partidx, StringBuffer &url, bool iskey) const
  1008. { // loses port
  1009. RemoteFilename rfn;
  1010. getPartFilename(clustnum,partidx,rfn,iskey);
  1011. if (rfn.queryIP().isNull())
  1012. rfn.getLocalPath(url);
  1013. else
  1014. rfn.getRemotePath(url);
  1015. return url;
  1016. }
  1017. IPropertyTree *queryPartProperties(unsigned partidx) const
  1018. {
  1019. StringBuffer path;
  1020. path.append("Part[@num=\"").append(partidx+1).append("\"]");
  1021. return queryRoot()->queryPropTree(path.str());
  1022. }
  1023. IPropertyTree *queryUpdatePartProperties(unsigned partidx)
  1024. {
  1025. StringBuffer path;
  1026. path.append("Part[@num=\"").append(partidx+1).append("\"]");
  1027. IPropertyTree *ret = queryRoot()->queryPropTree(path.str());
  1028. if (!ret) {
  1029. ret = queryRoot()->addPropTree("Part",createPTree("Part"));
  1030. ret->setPropInt("@num",partidx+1);
  1031. }
  1032. return ret;
  1033. }
  1034. unsigned getNumParts(unsigned clustnum,bool iskey) const
  1035. {
  1036. unsigned n = numpartsoverride?numpartsoverride:(unsigned)queryRoot()->getPropInt("@numparts",0);
  1037. if (!n) {
  1038. StringBuffer s;
  1039. SocketEndpoint ep;
  1040. if ((getGroupName(clustnum,s).length()!=0)&&!getForeignDali(ep) ) {
  1041. Owned<IGroup> grp = queryNamedGroupStore().lookup(s.str());
  1042. if (grp)
  1043. n = grp->ordinality();
  1044. else {
  1045. OERRLOG("DFUWU: Logical group %s not found",s.str());
  1046. return 0;
  1047. }
  1048. ClusterPartDiskMapSpec mspec;
  1049. getClusterPartDiskMapSpec(clustnum,mspec);
  1050. if (mspec.flags&CPDMSF_wrapToNextDrv)
  1051. n*=mspec.maxDrvs;
  1052. if (iskey)
  1053. n++;
  1054. }
  1055. }
  1056. return n;
  1057. }
  1058. void setTitle(const char *val)
  1059. {
  1060. queryRoot()->setProp("@title",val);
  1061. }
  1062. void setDirectory(const char *val)
  1063. {
  1064. queryRoot()->setProp("@directory",val);
  1065. }
  1066. void setLogicalName(const char *val)
  1067. {
  1068. const char *tail=val;
  1069. for (;;) {
  1070. const char *n = strstr(tail,"::");
  1071. if (!n)
  1072. break;
  1073. tail = n+2;
  1074. }
  1075. queryRoot()->setProp("@name",tail);
  1076. queryRoot()->setProp("OrigName",val);
  1077. }
  1078. void setFileMask(const char *val)
  1079. {
  1080. queryRoot()->setProp("@partmask",val);
  1081. }
  1082. void setNumParts(unsigned val)
  1083. {
  1084. queryRoot()->setPropInt("@numparts",val);
  1085. }
  1086. void setGroupName(const char *val)
  1087. {
  1088. StringArray gs;
  1089. getFileGroups(val,gs);
  1090. ForEachItemIn(i,gs) {
  1091. addCluster(gs.item(i));
  1092. }
  1093. }
  1094. IPropertyTree *setProperties(IPropertyTree *val)
  1095. {
  1096. return queryRoot()->setPropTree("Attr",val);
  1097. }
  1098. void setRecordSize(size32_t size)
  1099. {
  1100. if (size)
  1101. queryUpdateProperties()->setPropInt("@recordSize",(int)size);
  1102. else
  1103. queryUpdateProperties()->removeProp("@recordSize");
  1104. }
  1105. void setCompressed(bool set)
  1106. {
  1107. // only block compressed supported
  1108. if (set)
  1109. queryUpdateProperties()->setPropBool("@blockCompressed",true);
  1110. else
  1111. queryUpdateProperties()->removeProp("@blockCompressed");
  1112. }
  1113. virtual void setFromFileDescriptor(IFileDescriptor &fd)
  1114. {
  1115. // use dfsdesc for hard work
  1116. queryRoot()->setPropTree(NULL,fd.getFileTree(CPDMSF_packParts));
  1117. }
  1118. StringBuffer &quoteStringIfNecessary(const char *s,StringBuffer &dest)
  1119. {
  1120. // we could add other characters
  1121. if (strchr(s,',')||strchr(s,'"')) {
  1122. dest.append('"');
  1123. while (*s) {
  1124. if (*s=='"')
  1125. dest.append('"');
  1126. dest.append(*s);
  1127. s++;
  1128. }
  1129. dest.append('"');
  1130. }
  1131. else
  1132. dest.append(s);
  1133. return dest;
  1134. }
  1135. virtual void setMultiFilename(RemoteMultiFilename &rmfn)
  1136. {
  1137. // first check for common directory
  1138. StringBuffer path;
  1139. StringBuffer dir;
  1140. ForEachItemIn(i1,rmfn) {
  1141. const RemoteFilename &rfn = rmfn.item(i1);
  1142. rfn.getLocalPath(path.clear());
  1143. const char *s=path.str();
  1144. size32_t dirlen = 0;
  1145. while (*s) {
  1146. if (isPathSepChar(*s)) { // must support unix/windows from either platform
  1147. dirlen = s-path.str();
  1148. if ((dirlen==0)||((dirlen==2)&&!isPathSepChar(path.charAt(0))&&(path.charAt(1)==':')))
  1149. dirlen++;
  1150. }
  1151. s++;
  1152. }
  1153. if (i1==0)
  1154. dir.append(dirlen,path.str());
  1155. else {
  1156. size32_t dl = dir.length();
  1157. if (dl>dirlen)
  1158. dl = dirlen;
  1159. s = path.str();
  1160. const char *t = dir.str();
  1161. size32_t l = 0;
  1162. while ((l<dl)&&(s[l]==t[l])) // we should probably case insensitive for windows
  1163. l++;
  1164. while (l&&!isPathSepChar(s[l]))
  1165. l--;
  1166. if (l<dir.length()) {
  1167. while (l&&!isPathSepChar(t[l]))
  1168. l--;
  1169. dir.setLength(l);
  1170. }
  1171. }
  1172. if (dir.length()==0)
  1173. break;
  1174. }
  1175. if ((dir.length()==2)&&(dir.charAt(1)==':'))
  1176. dir.append('\\');
  1177. setDirectory(dir.str());
  1178. StringBuffer mask;
  1179. ForEachItemIn(i2,rmfn) { // now set mask
  1180. const RemoteFilename &rfn = rmfn.item(i2);
  1181. rfn.getLocalPath(path.clear());
  1182. const char *s=path.str()+dir.length();
  1183. if (isPathSepChar(*s)&&dir.length())
  1184. s++;
  1185. if (mask.length())
  1186. mask.append(',');
  1187. quoteStringIfNecessary(s,mask);
  1188. }
  1189. setFileMask(mask.str());
  1190. queryRoot()->setPropInt("@numparts",1);
  1191. IPropertyTree * part = queryRoot()->setPropTree("Part",createPTree("Part"));
  1192. part->setPropInt("@num",1);
  1193. StringBuffer url;
  1194. rmfn.queryEndpoint().getUrlStr(url);
  1195. part->setProp("@node",url.str());
  1196. }
  1197. virtual void setSingleFilename(RemoteFilename &rfn)
  1198. {
  1199. RemoteMultiFilename rmfn;
  1200. rmfn.append(rfn);
  1201. setMultiFilename(rmfn);
  1202. }
  1203. size32_t getMaxRecordSize() const
  1204. {
  1205. size32_t ret = queryProperties()->getPropInt("@maxRecordSize",8192);
  1206. if (ret==0) {
  1207. if (getFormat()==DFUff_fixed)
  1208. ret = getRecordSize(); // if fixed defaults to recordSize
  1209. ret = 8192;
  1210. }
  1211. return ret;
  1212. }
  1213. virtual void setMaxRecordSize(size32_t size)
  1214. {
  1215. if (size)
  1216. queryUpdateProperties()->setPropInt("@maxRecordSize",(int)size);
  1217. else
  1218. queryUpdateProperties()->removeProp("@maxRecordSize");
  1219. }
  1220. virtual void setFormat(DFUfileformat format)
  1221. {
  1222. StringBuffer s;
  1223. CDFUfileformat::encode(format,s);
  1224. queryUpdateProperties()->setProp("@format",s.str());
  1225. }
  1226. virtual DFUfileformat getFormat() const
  1227. {
  1228. return CDFUfileformat::decode(queryProperties()->queryProp("@format"));
  1229. }
  1230. virtual void getCsvOptions(StringBuffer &separate,StringBuffer &terminate,StringBuffer &quote,StringBuffer &escape,bool &quotedTerminator) const
  1231. {
  1232. IPropertyTree *t = queryProperties();
  1233. const char *sep=t->queryProp("@csvSeparate");
  1234. separate.append(sep?sep:"\\,");
  1235. const char *ter=t->queryProp("@csvTerminate");
  1236. terminate.append(ter?ter:"\\n,\\r\\n");
  1237. const char *quo=t->queryProp("@csvQuote");
  1238. quote.append(quo?quo:"\"");
  1239. const char *esc=t->queryProp("@csvEscape");
  1240. if (esc && *esc)
  1241. escape.set(esc);
  1242. quotedTerminator = t->getPropBool("@quotedTerminator", true);
  1243. }
  1244. void setCsvOptions(const char *separate,const char *terminate,const char *quote,const char *escape,bool quotedTerminator)
  1245. {
  1246. IPropertyTree *t = queryUpdateProperties();
  1247. if (separate) //Enable to pass zero string to override default separator
  1248. t->setProp("@csvSeparate",separate);
  1249. if (terminate && *terminate)
  1250. t->setProp("@csvTerminate",terminate);
  1251. if (quote) //Enable to pass zero string to override default quote
  1252. t->setProp("@csvQuote",quote);
  1253. if (escape && *escape)
  1254. t->setProp("@csvEscape",escape);
  1255. t->setPropBool("@quotedTerminator", quotedTerminator);
  1256. }
  1257. StringBuffer &getRowTag(StringBuffer &str)const
  1258. {
  1259. queryProperties()->getProp("@rowTag",str);
  1260. return str;
  1261. }
  1262. void setRowTag(const char *str)
  1263. {
  1264. IPropertyTree *t = queryUpdateProperties();
  1265. t->setProp("@rowTag",str);
  1266. }
  1267. void setFromXML(const char *xml)
  1268. {
  1269. // the following is slightly odd: xml->tree->file->tree
  1270. Owned<IPropertyTree> t = createPTreeFromXMLString(xml);
  1271. Owned<IFileDescriptor> fdesc = deserializeFileDescriptorTree(t,&queryNamedGroupStore(),0);
  1272. setFromFileDescriptor(*fdesc);
  1273. }
  1274. void setForeignDali(const SocketEndpoint &ep)
  1275. {
  1276. // only used for source of copy
  1277. IPropertyTree *t = queryUpdateProperties();
  1278. StringBuffer s;
  1279. t->setProp("@foreignDali",ep.getUrlStr(s).str());
  1280. }
  1281. bool getForeignDali(SocketEndpoint &ep) const
  1282. {
  1283. // only used for source of copy
  1284. const char *s = queryProperties()->queryProp("@foreignDali");
  1285. if (!s||!*s)
  1286. return false;
  1287. ep.set(s);
  1288. return true;
  1289. }
  1290. void setForeignUser(const char *user,const char *password)
  1291. {
  1292. IPropertyTree *t = queryUpdateProperties();
  1293. t->setProp("@foreignUser",user);
  1294. StringBuffer pw; // minimal encryprion to obscure (will need improvement)
  1295. pw.append(parent->queryId());
  1296. pw.append(password);
  1297. StringBuffer buf;
  1298. encrypt(buf,pw.str());
  1299. t->setProp("@foreignPassword",buf.str());
  1300. }
  1301. bool getForeignUser(StringBuffer &user,StringBuffer &password) const
  1302. {
  1303. IPropertyTree *t = queryProperties();
  1304. const char *s = t->queryProp("@foreignUser");
  1305. if (!s||!*s)
  1306. return false;
  1307. user.append(s);
  1308. StringBuffer pw;
  1309. t->getProp("@foreignPassword",pw);
  1310. if (pw.length()) {
  1311. StringBuffer buf;
  1312. decrypt(buf,pw.str()); // minimal encryprion to obscure (will need improvement)
  1313. const char *p = buf.str();
  1314. const char *i = parent->queryId();
  1315. while (*p&&*i&&(*p==*i)) {
  1316. p++;
  1317. i++;
  1318. }
  1319. password.append(p);
  1320. }
  1321. return true;
  1322. }
  1323. bool getWrap() const
  1324. {
  1325. return queryRoot()->getPropInt("@wrap")!=0;
  1326. }
  1327. bool getMultiCopy() const
  1328. {
  1329. return queryRoot()->getPropInt("@multiCopy")!=0;
  1330. }
  1331. void setWrap(bool val)
  1332. {
  1333. queryRoot()->setPropInt("@wrap",val?1:0);
  1334. }
  1335. void setMultiCopy(bool val)
  1336. {
  1337. queryRoot()->setPropInt("@multicopy",val?1:0);
  1338. }
  1339. void setNumPartsOverride(unsigned num)
  1340. {
  1341. numpartsoverride = num;
  1342. }
  1343. StringBuffer &getDiffKey(StringBuffer &str) const
  1344. {
  1345. queryRoot()->getProp("@diffKey",str);
  1346. return str;
  1347. }
  1348. void setDiffKey(const char *str)
  1349. {
  1350. queryRoot()->setProp("@diffKey",str);
  1351. }
  1352. void getClusterPartDiskMapSpec(unsigned clusternum, ClusterPartDiskMapSpec &spec) const
  1353. {
  1354. unsigned nc = numClusters();
  1355. StringBuffer xpath;
  1356. xpath.appendf("Cluster[%d]",clusternum+1);
  1357. IPropertyTree *pt = queryRoot()->queryPropTree(xpath.str());
  1358. if (pt)
  1359. spec.fromProp(pt);
  1360. else {
  1361. ClusterPartDiskMapSpec defspec;
  1362. spec = defspec;
  1363. }
  1364. }
  1365. bool getClusterPartDiskMapSpec(const char *clustername, ClusterPartDiskMapSpec &spec) const
  1366. {
  1367. unsigned clusternum;
  1368. if (!findCluster(clustername,clusternum))
  1369. return false;
  1370. getClusterPartDiskMapSpec(clusternum,spec);
  1371. return true;
  1372. }
  1373. unsigned findCluster(const char *clustername, unsigned &clusternum) const
  1374. {
  1375. Owned<IPropertyTreeIterator> iter = queryRoot()->getElements("Cluster");
  1376. if (!clustername) {
  1377. if (!iter->first()) {
  1378. iter.clear();
  1379. IPropertyTree *pt= createPTree("Cluster");
  1380. ClusterPartDiskMapSpec spec;
  1381. spec.toProp(pt);
  1382. StringBuffer grpname; // this shouldn't be set but if it is then use
  1383. if (queryRoot()->getProp("@group",grpname)) {
  1384. const char * s = grpname.str();
  1385. const char *e = strchr(s,',');
  1386. if (e)
  1387. grpname.setLength(e-s);
  1388. }
  1389. if (grpname.length()) {
  1390. queryRoot()->setProp("@group",grpname.str());
  1391. pt->setProp("@name",grpname.str());
  1392. }
  1393. queryRoot()->addPropTree("Cluster",pt);
  1394. queryRoot()->setPropInt("@numclusters",1);
  1395. }
  1396. clusternum = 0;
  1397. return true;
  1398. }
  1399. // done via iterate to catch correct index
  1400. clusternum = 0;
  1401. ForEach(*iter) {
  1402. const char *name = iter->query().queryProp("@name");
  1403. if (name&&(stricmp(name,clustername)==0))
  1404. return true;
  1405. clusternum++;
  1406. }
  1407. return false;
  1408. }
  1409. void setClusterPartDiskMapSpec(unsigned clusternum, ClusterPartDiskMapSpec &spec)
  1410. {
  1411. StringBuffer xpath;
  1412. xpath.appendf("Cluster[%d]",clusternum+1);
  1413. IPropertyTree *pt = queryRoot()->queryPropTree(xpath.str());
  1414. if (pt)
  1415. spec.toProp(pt);
  1416. }
  1417. unsigned addCluster(const char *clustername)
  1418. {
  1419. StringBuffer _clustername;
  1420. if (clustername)
  1421. clustername = _clustername.append(clustername).trim().toLowerCase().str();
  1422. unsigned clusternum;
  1423. if (!findCluster(clustername,clusternum)) {
  1424. IPropertyTree *pt = createPTree("Cluster");
  1425. if (clustername&&*clustername)
  1426. pt->setProp("@name",clustername);
  1427. queryRoot()->addPropTree("Cluster",pt);
  1428. queryRoot()->setPropInt("@numclusters",clusternum+1);
  1429. StringBuffer grps;
  1430. Owned<IPropertyTreeIterator> iter = queryRoot()->getElements("Cluster");
  1431. ForEach(*iter) {
  1432. const char *name = iter->query().queryProp("@name");
  1433. if (name&&*name) {
  1434. if (grps.length())
  1435. grps.append(',');
  1436. grps.append(name);
  1437. }
  1438. }
  1439. queryRoot()->setProp("@group",grps.str());
  1440. }
  1441. return clusternum;
  1442. }
  1443. void setClusterPartDiskMapSpec(const char *clustername, ClusterPartDiskMapSpec &spec)
  1444. {
  1445. setClusterPartDiskMapSpec(addCluster(clustername),spec);
  1446. }
  1447. void setClusterPartDefaultBaseDir(const char *clustername,const char *basedir)
  1448. {
  1449. unsigned clusternum;
  1450. if (findCluster(clustername,clusternum)) {
  1451. ClusterPartDiskMapSpec spec;
  1452. getClusterPartDiskMapSpec(clusternum, spec);
  1453. spec.setDefaultBaseDir(basedir);
  1454. setClusterPartDiskMapSpec(clusternum, spec);
  1455. }
  1456. }
  1457. void setClusterPartDiskMapping(DFUclusterPartDiskMapping val,const char *basedir, const char *clustername, bool repeatlast, bool onlyrepeated)
  1458. {
  1459. ClusterPartDiskMapSpec spec;
  1460. switch(val) {
  1461. case DFUcpdm_c_replicated_by_d:
  1462. spec.defaultCopies = DFD_DefaultCopies;
  1463. break;
  1464. case DFUcpdm_c_only:
  1465. spec.defaultCopies = DFD_NoCopies;
  1466. break;
  1467. case DFUcpdm_d_only:
  1468. spec.defaultCopies = DFD_NoCopies;
  1469. spec.startDrv = 1;
  1470. break;
  1471. case DFUcpdm_c_then_d:
  1472. spec.defaultCopies = DFD_NoCopies;
  1473. spec.flags = CPDMSF_wrapToNextDrv;
  1474. break;
  1475. }
  1476. if (basedir&&*basedir)
  1477. spec.setDefaultBaseDir(basedir);
  1478. if (repeatlast)
  1479. spec.setRepeatedCopies(CPDMSRP_lastRepeated,onlyrepeated);
  1480. setClusterPartDiskMapSpec(clustername,spec);
  1481. }
  1482. StringBuffer &getClusterPartDefaultBaseDir(const char *clustername,StringBuffer &str) const
  1483. {
  1484. ClusterPartDiskMapSpec spec;
  1485. if (getClusterPartDiskMapSpec(clustername,spec)&&!spec.defaultBaseDir.isEmpty())
  1486. str.append(spec.defaultBaseDir);
  1487. return str;
  1488. }
  1489. unsigned numClusters() const
  1490. {
  1491. return queryRoot()->getPropInt("@numclusters",1);
  1492. }
  1493. void setReplicateOffset(int val)
  1494. {
  1495. unsigned nc = numClusters(); // sets for all
  1496. for (unsigned i=0;i<nc;i++) {
  1497. ClusterPartDiskMapSpec spec;
  1498. getClusterPartDiskMapSpec(i,spec);
  1499. spec.replicateOffset = val;
  1500. setClusterPartDiskMapSpec(i,spec);
  1501. }
  1502. }
  1503. void setWindowsOS(bool iswin)
  1504. {
  1505. os = iswin?DFD_OSwindows:DFD_OSunix;
  1506. }
  1507. bool getWindowsOS(bool &iswin) const
  1508. {
  1509. #ifdef _WIN32
  1510. iswin = true;
  1511. #else
  1512. iswin = false;
  1513. #endif
  1514. switch (os) {
  1515. case DFD_OSwindows:
  1516. iswin = true;
  1517. return true;
  1518. case DFD_OSunix:
  1519. iswin = true;
  1520. return true;
  1521. }
  1522. StringBuffer dir;
  1523. if (!queryRoot()->getProp("@directory",dir))
  1524. getClusterPartDefaultBaseDir(NULL,dir);
  1525. if (!dir.length())
  1526. return false;
  1527. iswin = getPathSepChar(dir.str())=='\\';
  1528. os = iswin?DFD_OSwindows:DFD_OSunix;
  1529. return true;
  1530. }
  1531. void setRoxiePrefix(const char *val)
  1532. {
  1533. queryRoot()->setProp("@roxiePrefix",val);
  1534. }
  1535. StringBuffer &getRoxiePrefix(StringBuffer &str) const
  1536. {
  1537. queryRoot()->getProp("@roxiePrefix",str);
  1538. return str.toLowerCase();
  1539. }
  1540. bool getRemoteGroupOverride() const
  1541. {
  1542. return queryRoot()->getPropBool("@remoteGroupOverride");
  1543. }
  1544. void setRemoteGroupOverride(bool set)
  1545. {
  1546. queryRoot()->setPropBool("@remoteGroupOverride",set);
  1547. }
  1548. };
  1549. class CDFUoptions: public CLinkedDFUWUchild, implements IDFUoptions
  1550. {
  1551. public:
  1552. IMPLEMENT_DFUWUCHILD;
  1553. bool getNoSplit() const
  1554. {
  1555. return queryRoot()->getPropInt("@nosplit")!=0;
  1556. }
  1557. bool getReplicate() const
  1558. {
  1559. return (queryRoot()->getPropInt("@replicate")!=0);
  1560. }
  1561. bool getRecover() const
  1562. {
  1563. return queryRoot()->getPropInt("@recover")!=0;
  1564. }
  1565. bool getNoRecover() const
  1566. {
  1567. return queryRoot()->getPropInt("@noRecover")!=0;
  1568. }
  1569. bool getIfNewer() const
  1570. {
  1571. return queryRoot()->getPropInt("@ifNewer")!=0;
  1572. }
  1573. bool getIfModified() const
  1574. {
  1575. return queryRoot()->getPropInt("@ifModified")!=0;
  1576. }
  1577. bool getSuppressNonKeyRepeats() const
  1578. {
  1579. return queryRoot()->getPropInt("@suppressNonKeyRepeats")!=0;
  1580. }
  1581. bool getSlavePathOverride(StringBuffer &path) const
  1582. {
  1583. return queryRoot()->getProp("@slave",path)&&(path.length()!=0);
  1584. }
  1585. bool getCrcCheck() const
  1586. {
  1587. return queryRoot()->getPropInt("@crcCheck")!=0;
  1588. }
  1589. __int64 getRecover_ID() const
  1590. {
  1591. return queryRoot()->getPropInt64("@recover_ID");
  1592. }
  1593. unsigned getmaxConnections() const
  1594. {
  1595. return (unsigned)queryRoot()->getPropInt("@maxConnections");
  1596. }
  1597. bool getCrc() const
  1598. {
  1599. return queryRoot()->getPropInt("@crc")!=0;
  1600. }
  1601. unsigned getRetry() const
  1602. {
  1603. return queryRoot()->getPropInt("@retry")!=0;
  1604. }
  1605. bool getPush() const
  1606. {
  1607. return queryRoot()->getPropInt("@push")!=0;
  1608. }
  1609. bool getKeepHeader() const
  1610. {
  1611. return queryRoot()->getPropInt("@keepHeader")!=0;
  1612. }
  1613. bool getPull() const
  1614. {
  1615. return queryRoot()->getPropInt("@pull")!=0;
  1616. }
  1617. unsigned getThrottle() const
  1618. {
  1619. return queryRoot()->getPropInt("@throttle");
  1620. }
  1621. size32_t getTransferBufferSize() const
  1622. {
  1623. return(size32_t)queryRoot()->getPropInt("@transferBufferSize");
  1624. }
  1625. bool getVerify() const
  1626. {
  1627. return queryRoot()->getPropInt("@verify")!=0;
  1628. }
  1629. bool getOverwrite() const
  1630. {
  1631. return queryRoot()->getPropInt("@overwrite")!=0;
  1632. }
  1633. DFUreplicateMode getReplicateMode(StringBuffer &cluster, bool &repeatlast,bool &onlyrepeated) const
  1634. {
  1635. repeatlast = false;
  1636. onlyrepeated = false;
  1637. const char *s = queryRoot()->queryProp("@replicatecluster");
  1638. if (s&&*s) {
  1639. cluster.append(s);
  1640. repeatlast = queryRoot()->getPropInt("@repeatlast")!=0;
  1641. onlyrepeated = queryRoot()->getPropInt("@onlyrepeated")!=0;
  1642. }
  1643. return (DFUreplicateMode)queryRoot()->getPropInt("@replicatemode");
  1644. }
  1645. IPropertyTree *queryTree() const
  1646. {
  1647. return queryRoot();
  1648. }
  1649. const char * queryPartFilter() const
  1650. {
  1651. return queryRoot()->queryProp("@partfilter");
  1652. }
  1653. const char * queryFooter() const
  1654. {
  1655. return queryRoot()->queryProp("@footer");
  1656. }
  1657. const char * queryHeader() const
  1658. {
  1659. return queryRoot()->queryProp("@header");
  1660. }
  1661. const char * queryGlue() const
  1662. {
  1663. return queryRoot()->queryProp("@glue");
  1664. }
  1665. const char * queryLengthPrefix() const
  1666. {
  1667. return queryRoot()->queryProp("@prefix");
  1668. }
  1669. const char * querySplitPrefix() const
  1670. {
  1671. return queryRoot()->queryProp("@splitPrefix");
  1672. }
  1673. void setNoDelete(bool val)
  1674. {
  1675. queryRoot()->setPropInt("@nodelete",val?1:0);
  1676. }
  1677. void setNoRecover(bool val)
  1678. {
  1679. queryRoot()->setPropInt("@noRecover",val?1:0);
  1680. }
  1681. void setIfNewer(bool val)
  1682. {
  1683. queryRoot()->setPropInt("@ifNewer",val?1:0);
  1684. }
  1685. void setIfModified(bool val)
  1686. {
  1687. queryRoot()->setPropInt("@ifModified",val?1:0);
  1688. }
  1689. void setSuppressNonKeyRepeats(bool val)
  1690. {
  1691. queryRoot()->setPropInt("@suppressNonKeyRepeats",val?1:0);
  1692. }
  1693. void setSlavePathOverride(const char *path)
  1694. {
  1695. if (path&&*path)
  1696. queryRoot()->setProp("@slave",path);
  1697. else
  1698. queryRoot()->removeProp("@slave");
  1699. }
  1700. void setCrcCheck(bool val)
  1701. {
  1702. queryRoot()->setPropInt("@crcCheck",val?1:0);
  1703. }
  1704. void setNoSplit(bool val=true)
  1705. {
  1706. queryRoot()->setPropInt("@nosplit",val?1:0);
  1707. }
  1708. void setReplicate(bool val=true)
  1709. {
  1710. queryRoot()->setPropInt("@replicate",val?1:0);
  1711. }
  1712. void setRecover(bool val=true)
  1713. {
  1714. queryRoot()->setPropInt("@recover",val?1:0);
  1715. }
  1716. void setRecover_ID(__int64 val)
  1717. {
  1718. queryRoot()->setPropInt64("@recover_ID",val);
  1719. }
  1720. void setmaxConnections(unsigned val)
  1721. {
  1722. queryRoot()->setPropInt64("@maxConnections",(int)val);
  1723. }
  1724. void setCrc(bool val=true)
  1725. {
  1726. queryRoot()->setPropInt("@crc",val?1:0);
  1727. }
  1728. void setRetry(unsigned val)
  1729. {
  1730. queryRoot()->setPropInt("@retry",val?1:0);
  1731. }
  1732. void setPush(bool val=true)
  1733. {
  1734. queryRoot()->setPropInt("@push",val?1:0);
  1735. }
  1736. void setKeepHeader(bool val=true)
  1737. {
  1738. queryRoot()->setPropInt("@keepHeader",val?1:0);
  1739. }
  1740. void setPull(bool val=true)
  1741. {
  1742. queryRoot()->setPropInt("@pull",val?1:0);
  1743. }
  1744. void setThrottle(unsigned val)
  1745. {
  1746. queryRoot()->setPropInt("@throttle",val);
  1747. }
  1748. void setTransferBufferSize(unsigned val)
  1749. {
  1750. queryRoot()->setPropInt("@transferBufferSize",val);
  1751. }
  1752. void setVerify(bool val=true)
  1753. {
  1754. queryRoot()->setPropInt("@verify",val?1:0);
  1755. }
  1756. void setOverwrite(bool val=true)
  1757. {
  1758. queryRoot()->setPropInt("@overwrite",val?1:0);
  1759. }
  1760. void setReplicateMode(DFUreplicateMode val,const char *cluster=NULL,bool repeatlast=false,bool onlyrepeated=false)
  1761. {
  1762. queryRoot()->setPropInt("@replicatemode",(int)val);
  1763. if (cluster) {
  1764. queryRoot()->setProp("@replicatecluster",cluster);
  1765. queryRoot()->setPropInt("@repeatlast",repeatlast?1:0);
  1766. queryRoot()->setPropInt("@onlyrepeated",onlyrepeated?1:0);
  1767. }
  1768. }
  1769. void setPartFilter(const char *filter)
  1770. {
  1771. queryRoot()->setProp("@partfilter",filter);
  1772. }
  1773. void setHeader(const char *str)
  1774. {
  1775. queryRoot()->setProp("@header",str);
  1776. }
  1777. void setGlue(const char *str)
  1778. {
  1779. queryRoot()->setProp("@glue",str);
  1780. }
  1781. void setFooter(const char *str)
  1782. {
  1783. queryRoot()->setProp("@footer",str);
  1784. }
  1785. void setLengthPrefix(const char *str)
  1786. {
  1787. queryRoot()->setProp("@prefix",str);
  1788. }
  1789. void setSplitPrefix(const char *str)
  1790. {
  1791. queryRoot()->setProp("@splitPrefix",str);
  1792. }
  1793. void setSubfileCopy(bool set)
  1794. {
  1795. queryRoot()->setPropBool("@subFileCopy",set);
  1796. }
  1797. bool getSubfileCopy() const
  1798. {
  1799. return queryRoot()->getPropBool("@subFileCopy");
  1800. }
  1801. void setEncDec(const char *enc,const char *dec)
  1802. {
  1803. assertex(parent);
  1804. const char *wuid = parent->root->queryName();
  1805. assertex(wuid&&*wuid);
  1806. MemoryBuffer mb;
  1807. mb.append(enc);
  1808. mb.append(dec);
  1809. while (mb.length()<1024) // salt
  1810. mb.append((char)getRandom()%255); // 255 deliberate so I can add stuff later
  1811. Csimplecrypt c((const byte *)wuid, strlen(wuid), mb.length());
  1812. c.encrypt((void *)mb.toByteArray());
  1813. queryRoot()->setPropBin("Data",mb.length(),mb.toByteArray());
  1814. }
  1815. virtual bool getEncDec(StringAttr &enc,StringAttr &dec)
  1816. {
  1817. MemoryBuffer mb;
  1818. if (queryRoot()->getPropBin("Data",mb)) {
  1819. assertex(parent);
  1820. const char *wuid = parent->root->queryName();
  1821. assertex(wuid&&*wuid);
  1822. Csimplecrypt c((const byte *)wuid, strlen(wuid), mb.length());
  1823. c.decrypt((void *)mb.toByteArray());
  1824. mb.read(enc).read(dec);
  1825. return true;
  1826. }
  1827. return false;
  1828. }
  1829. bool getFailIfNoSourceFile() const
  1830. {
  1831. return queryRoot()->getPropBool("@failIfNoSourceFile");
  1832. }
  1833. void setFailIfNoSourceFile(bool val)
  1834. {
  1835. queryRoot()->setPropBool("@failIfNoSourceFile",val);
  1836. }
  1837. bool getRecordStructurePresent() const
  1838. {
  1839. return queryRoot()->getPropBool("@recordStructurePresent");
  1840. }
  1841. void setRecordStructurePresent(bool val)
  1842. {
  1843. queryRoot()->setPropBool("@recordStructurePresent",val);
  1844. }
  1845. bool getQuotedTerminator() const
  1846. {
  1847. return queryRoot()->getPropBool("@quotedTerminator");
  1848. }
  1849. void setQuotedTerminator(bool val)
  1850. {
  1851. queryRoot()->setPropBool("@quotedTerminator",val);
  1852. }
  1853. bool getPreserveCompression() const
  1854. {
  1855. return queryRoot()->getPropBool("@preserveCompression");
  1856. }
  1857. void setPreserveCompression(bool val)
  1858. {
  1859. queryRoot()->setPropBool("@preserveCompression",val);
  1860. }
  1861. StringBuffer &getUMask(StringBuffer &str)const
  1862. {
  1863. if (queryRoot()->hasProp("@umask"))
  1864. queryRoot()->getProp("@umask",str);
  1865. return str;
  1866. }
  1867. void setUMask(const char *val)
  1868. {
  1869. queryRoot()->setProp("@umask",val);
  1870. }
  1871. int getExpireDays() const
  1872. {
  1873. if (queryRoot()->hasProp("@expireDays"))
  1874. return queryRoot()->getPropInt("@expireDays", -1);
  1875. return -1;
  1876. }
  1877. void setExpireDays(int val)
  1878. {
  1879. queryRoot()->setPropInt("@expireDays",val);
  1880. }
  1881. bool getNoCommon() const
  1882. {
  1883. return queryRoot()->getPropBool("@noCommon", true);
  1884. }
  1885. void setNoCommon(bool val)
  1886. {
  1887. queryRoot()->setPropBool("@noCommon",val);
  1888. }
  1889. };
  1890. class CExceptionIterator: implements IExceptionIterator, public CInterface
  1891. {
  1892. Linked<IPropertyTree> tree;
  1893. unsigned i;
  1894. Owned<IException> cur;
  1895. public:
  1896. IMPLEMENT_IINTERFACE;
  1897. CExceptionIterator(IPropertyTree *_tree)
  1898. : tree(_tree)
  1899. {
  1900. i = 0;
  1901. }
  1902. bool first()
  1903. {
  1904. i = 0;
  1905. return next();
  1906. }
  1907. bool next()
  1908. {
  1909. StringBuffer key;
  1910. key.append("Exception[").append(++i).append(']');
  1911. IPropertyTree *et = tree.get()?tree->queryPropTree(key.str()):NULL;
  1912. if (!et) {
  1913. cur.clear();
  1914. return false;
  1915. }
  1916. int code = et->getPropInt("@exceptionCode");
  1917. StringBuffer msg;
  1918. et->getProp("@exceptionMessage",msg);
  1919. cur.setown(MakeStringException(code, "%s", msg.str()));
  1920. return true;
  1921. }
  1922. bool isValid()
  1923. {
  1924. return cur.get()!=NULL;
  1925. }
  1926. IException & query()
  1927. {
  1928. return *cur.get();
  1929. }
  1930. };
  1931. class CDFUWorkUnit: public CDFUWorkUnitBase
  1932. {
  1933. mutable CDFUprogress progress;
  1934. mutable CDFUfileSpec source;
  1935. mutable CDFUfileSpec destination;
  1936. mutable CDFUoptions options;
  1937. mutable CDFUmonitor monitor;
  1938. Mutex updatelock;
  1939. bool updating;
  1940. Linked<IDFUprogressSubscriber> subscriber;
  1941. Linked<IDFUabortSubscriber> abortsubscriber;
  1942. SubscriptionId subscriberid;
  1943. Semaphore completed;
  1944. unsigned localedition;
  1945. Linked<IDFUWorkUnitFactory> parent;
  1946. public:
  1947. bool checkconn()
  1948. {
  1949. if (!conn) {
  1950. StringBuffer wuRoot;
  1951. getXPath(wuRoot, queryId());
  1952. conn.setown(querySDS().connect(wuRoot.str(), myProcessSession() , 0, SDS_LOCK_TIMEOUT));
  1953. if (!conn)
  1954. return false;
  1955. root.setown(conn->getRoot());
  1956. }
  1957. return true;
  1958. }
  1959. CDFUWorkUnit(IDFUWorkUnitFactory *_parent,IRemoteConnection *_conn,IPropertyTree *tree,bool _lock=false)
  1960. : parent(_parent)
  1961. {
  1962. updating = false;
  1963. subscriberid = 0;
  1964. if (_conn) {
  1965. conn.setown(_conn);
  1966. root.setown(conn->getRoot());
  1967. }
  1968. else
  1969. root.set(tree);
  1970. localedition = _lock?(unsigned)root->getPropInt("Progress/Edition",0):0;
  1971. progress.init(this,"Progress",!_lock);
  1972. source.init(this,"Source",!_lock);
  1973. destination.init(this,"Destination",!_lock);
  1974. options.init(this,"Options",!_lock);
  1975. monitor.init(this,"Monitor",!_lock);
  1976. if (_lock) {
  1977. updatelock.lock();
  1978. assertex(!updating);
  1979. updating = true;
  1980. }
  1981. }
  1982. ~CDFUWorkUnit()
  1983. {
  1984. CriticalBlock block(crit);
  1985. try {
  1986. subscriber.clear();
  1987. abortsubscriber.clear();
  1988. unsubscribe();
  1989. if (updating) {
  1990. conn.clear();
  1991. updatelock.unlock();
  1992. }
  1993. else if (conn) {
  1994. conn->rollback(); // prevent writing created branches
  1995. conn.clear();
  1996. }
  1997. }
  1998. catch (IException *e) {
  1999. // destructor should always succeed
  2000. EXCLOG(e,"~CDFUWorkUnit");
  2001. e->Release();
  2002. }
  2003. }
  2004. const char *queryId() const
  2005. {
  2006. return root->queryName();
  2007. }
  2008. void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
  2009. { // for progress and/or state changed
  2010. Linked<IDFUabortSubscriber> notifyabortsubscriber;
  2011. {
  2012. CriticalBlock block(crit);
  2013. if (subscriber) {
  2014. queryProgress(true);
  2015. subscriber->notify(&progress);
  2016. }
  2017. else if (abortsubscriber)
  2018. queryProgress(true); // reload progress
  2019. DFUstate state = progress.getState();
  2020. switch (state) {
  2021. case DFUstate_aborting:
  2022. if (abortsubscriber) {
  2023. notifyabortsubscriber.set(abortsubscriber.getClear());
  2024. notifyabortsubscriber->notifyAbort();
  2025. }
  2026. return;
  2027. case DFUstate_aborted:
  2028. if (abortsubscriber) {
  2029. notifyabortsubscriber.set(abortsubscriber.getClear());
  2030. break;
  2031. }
  2032. // fall through
  2033. case DFUstate_failed:
  2034. case DFUstate_finished:
  2035. completed.signal();
  2036. break;
  2037. }
  2038. }
  2039. if (notifyabortsubscriber) {
  2040. notifyabortsubscriber->notifyAbort();
  2041. completed.signal();
  2042. }
  2043. }
  2044. void requestAbort()
  2045. {
  2046. updatelock.lock();
  2047. progress.setState(DFUstate_aborting);
  2048. updatelock.unlock();
  2049. }
  2050. StringBuffer &getDFUServerName(StringBuffer &str) const
  2051. {
  2052. root->getProp("@dfuserver",str);
  2053. return str;
  2054. }
  2055. StringBuffer &getClusterName(StringBuffer &str) const
  2056. {
  2057. root->getProp("@cluster",str);
  2058. return str;
  2059. }
  2060. StringBuffer &getJobName(StringBuffer &str) const
  2061. {
  2062. root->getProp("@jobName",str);
  2063. return str;
  2064. }
  2065. StringBuffer &getQueue(StringBuffer &str) const
  2066. {
  2067. root->getProp("@queue",str);
  2068. return str;
  2069. }
  2070. StringBuffer &getUser(StringBuffer &str) const
  2071. {
  2072. root->getProp("@submitID",str);
  2073. return str;
  2074. }
  2075. StringBuffer &getPassword(StringBuffer &str) const
  2076. {
  2077. StringBuffer pw;
  2078. root->getProp("@password",pw);
  2079. if (pw.length()) {
  2080. StringBuffer buf;
  2081. decrypt(buf,pw.str()); // minimal encryprion to obscure (will need improvement)
  2082. const char *p = buf.str();
  2083. const char *i = queryId();
  2084. while (*p&&*i&&(*p==*i)) {
  2085. p++;
  2086. i++;
  2087. }
  2088. str.append(p);
  2089. }
  2090. return str;
  2091. }
  2092. bool isProtected() const
  2093. {
  2094. return root->getPropInt("@protected",0)!=0;
  2095. }
  2096. IDFUWorkUnit *openUpdate(bool exclusive)
  2097. {
  2098. updatelock.lock();
  2099. assertex(!updating);
  2100. updating = true;
  2101. if (!checkconn())
  2102. return NULL;
  2103. conn->changeMode(exclusive?RTM_LOCK_WRITE:RTM_LOCK_READ, SDS_LOCK_TIMEOUT);
  2104. reinit();
  2105. localedition = (unsigned)root->getPropInt("Progress/Edition",0);
  2106. Link();
  2107. return this;
  2108. }
  2109. void closeUpdate()
  2110. {
  2111. assertex(updating);
  2112. conn->changeMode(0, SDS_LOCK_TIMEOUT);
  2113. reinit();
  2114. updating = false;
  2115. updatelock.unlock();
  2116. }
  2117. DFUcmd getCommand() const
  2118. {
  2119. StringBuffer s;
  2120. root->getProp("@command",s);
  2121. return decodeDFUcommand(s.str());
  2122. }
  2123. StringBuffer &getCommandName(StringBuffer &str) const
  2124. {
  2125. root->getProp("@command",str);
  2126. return str;
  2127. }
  2128. CDateTime &getTimeScheduled(CDateTime &val) const
  2129. {
  2130. StringBuffer str;
  2131. root->getProp("@timescheduled",str);
  2132. val.setString(str.str());
  2133. return val;
  2134. }
  2135. void setTimeScheduled(const CDateTime &val)
  2136. {
  2137. StringBuffer str;
  2138. val.getString(str);
  2139. root->setProp("@timescheduled",str.str());
  2140. }
  2141. IConstDFUoptions *queryOptions() const
  2142. {
  2143. return &options;
  2144. }
  2145. IConstDFUfileSpec *querySource() const
  2146. {
  2147. return &source;
  2148. }
  2149. IConstDFUfileSpec *queryDestination() const
  2150. {
  2151. return &destination;
  2152. }
  2153. IConstDFUprogress *queryProgress(bool reload=true)
  2154. {
  2155. CriticalBlock block(crit);
  2156. if (reload) {
  2157. if (!checkconn())
  2158. return NULL;
  2159. if (updating)
  2160. conn->commit();
  2161. else
  2162. conn->rollback(); // prevent writing created branches
  2163. conn->reload("Progress");
  2164. if (!checkconn())
  2165. return NULL;
  2166. progress.reinit();
  2167. }
  2168. return &progress;
  2169. }
  2170. IConstDFUmonitor *queryMonitor(bool reload=true)
  2171. {
  2172. CriticalBlock block(crit);
  2173. if (reload) {
  2174. if (!checkconn())
  2175. return NULL;
  2176. if (updating)
  2177. conn->commit();
  2178. else
  2179. conn->rollback(); // prevent writing created branches
  2180. conn->reload("Monitor");
  2181. if (!checkconn())
  2182. return NULL;
  2183. monitor.reinit();
  2184. }
  2185. return &monitor;
  2186. }
  2187. void subscribe()
  2188. {
  2189. // called with crit locked
  2190. if (!subscriberid) {
  2191. StringBuffer xpath;
  2192. getXPath(xpath,queryId()).append("/Progress/Edition");
  2193. if (parent)
  2194. subscriberid = (SubscriptionId)parent->subscribe(xpath.str(),QUERYINTERFACE(this,ISDSSubscription));
  2195. }
  2196. }
  2197. void unsubscribe()
  2198. {
  2199. // NOT called with crit locked (as causes deadlock in notifyAbort)
  2200. if (subscriberid) {
  2201. if (parent)
  2202. parent->subscribe(NULL,QUERYINTERFACE(this,ISDSSubscription));
  2203. subscriberid = 0;
  2204. }
  2205. }
  2206. void subscribeProgress(IDFUprogressSubscriber *sub)
  2207. {
  2208. {
  2209. CriticalBlock block(crit);
  2210. if (sub) {
  2211. subscriber.set(sub);
  2212. subscribe();
  2213. return;
  2214. }
  2215. else {
  2216. subscriber.clear();
  2217. if (abortsubscriber.get())
  2218. return;
  2219. }
  2220. }
  2221. unsubscribe(); // call outside crit
  2222. }
  2223. void subscribeAbort(IDFUabortSubscriber *sub)
  2224. {
  2225. {
  2226. CriticalBlock block(crit);
  2227. if (sub) {
  2228. abortsubscriber.set(sub);
  2229. subscribe();
  2230. return;
  2231. }
  2232. else {
  2233. abortsubscriber.clear();
  2234. if (subscriber.get())
  2235. return;
  2236. }
  2237. }
  2238. unsubscribe(); // call outside crit
  2239. }
  2240. DFUstate waitForCompletion(int timeout)
  2241. {
  2242. {
  2243. CriticalBlock block(crit);
  2244. subscribe();
  2245. }
  2246. for (;;) {
  2247. DFUstate ret = queryProgress(true)->getState();
  2248. switch (ret) {
  2249. case DFUstate_aborted:
  2250. case DFUstate_failed:
  2251. case DFUstate_finished:
  2252. return ret;
  2253. }
  2254. if (!completed.wait(timeout)) // should only go round loop once
  2255. break;
  2256. }
  2257. return queryProgress(true)->getState();
  2258. }
  2259. void reinit()
  2260. {
  2261. if (checkconn()) {
  2262. root.setown(conn->getRoot());
  2263. progress.reinit();
  2264. source.reinit();
  2265. destination.reinit();
  2266. options.reinit();
  2267. monitor.reinit();
  2268. }
  2269. }
  2270. unsigned commit()
  2271. {
  2272. CriticalBlock block(crit);
  2273. if (!conn)
  2274. return 0;
  2275. assertex(updating);
  2276. localedition++;
  2277. root->setPropInt("Progress/Edition",localedition);
  2278. conn->commit();
  2279. reinit();
  2280. return localedition;
  2281. }
  2282. void rollback()
  2283. {
  2284. CriticalBlock block(crit);
  2285. if (conn) {
  2286. conn->rollback();
  2287. reinit();
  2288. }
  2289. }
  2290. unsigned getEdition(bool local)
  2291. {
  2292. CriticalBlock block(crit);
  2293. if (local)
  2294. return localedition;
  2295. if (!conn)
  2296. return 0;
  2297. conn->reload("Progress/Edition"); // this may cause problems TBI
  2298. return root->getPropInt("Progress/Edition",0);
  2299. }
  2300. void protect(bool protectMode)
  2301. {
  2302. root->setPropInt("@protected", protectMode?1:0);
  2303. }
  2304. void setDFUServerName(const char * val)
  2305. {
  2306. root->setProp("@dfuserver",val);
  2307. }
  2308. void setClusterName(const char * val)
  2309. {
  2310. root->setProp("@cluster",val);
  2311. }
  2312. void setJobName(const char * val)
  2313. {
  2314. root->setProp("@jobName",val);
  2315. }
  2316. void setQueue(const char * val)
  2317. {
  2318. root->setProp("@queue",val);
  2319. }
  2320. void setUser(const char * val)
  2321. {
  2322. root->setProp("@submitID",val);
  2323. }
  2324. void setPassword(const char * val)
  2325. {
  2326. if (!val||!*val)
  2327. return;
  2328. StringBuffer pw; // minimal encryprion to obscure (will need improvement)
  2329. pw.append(queryId());
  2330. pw.append(val);
  2331. StringBuffer buf;
  2332. encrypt(buf,pw.str());
  2333. root->setProp("@password",buf.str());
  2334. }
  2335. void setCommand(DFUcmd cmd)
  2336. {
  2337. StringBuffer s;
  2338. encodeDFUcommand(cmd,s);
  2339. root->setProp("@command",s.str());
  2340. }
  2341. IDFUoptions *queryUpdateOptions()
  2342. {
  2343. return &options;
  2344. }
  2345. IDFUfileSpec *queryUpdateSource()
  2346. {
  2347. return &source;
  2348. }
  2349. IDFUfileSpec *queryUpdateDestination()
  2350. {
  2351. return &destination;
  2352. }
  2353. void addOptions(IPropertyTree *tree)
  2354. {
  2355. // TBD
  2356. }
  2357. IDFUprogress *queryUpdateProgress()
  2358. {
  2359. return &progress;
  2360. }
  2361. IDFUmonitor *queryUpdateMonitor()
  2362. {
  2363. return &monitor;
  2364. }
  2365. void cleanupAndDelete()
  2366. {
  2367. if (isProtected())
  2368. throw MakeStringException(-1, "DFU Workunit is protected");
  2369. switch (progress.getState()) {
  2370. case DFUstate_unknown:
  2371. case DFUstate_aborted:
  2372. case DFUstate_failed:
  2373. case DFUstate_finished:
  2374. break;
  2375. default:
  2376. throw MakeStringException(-1, "DFU Workunit is active");
  2377. break;
  2378. }
  2379. if (checkconn()) {
  2380. conn->changeMode(RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT); // make sure not locked
  2381. root.clear();
  2382. conn->close(true);
  2383. conn.clear();
  2384. }
  2385. }
  2386. void queryRecoveryStore(IRemoteConnection *& _conn,IPropertyTree *&_tree, StringBuffer &runningpath)
  2387. {
  2388. if (!conn)
  2389. reinit();
  2390. _conn = conn;
  2391. _tree = root->queryPropTree("Recovery");
  2392. if (!_tree)
  2393. _tree = root->addPropTree("Recovery",createPTree("Recovery"));
  2394. getXPath(runningpath,queryId()).append("/Running");
  2395. }
  2396. void removeRecoveryStore()
  2397. {
  2398. IPropertyTree *tree = root->queryPropTree("Recovery");
  2399. root->removeTree(tree);
  2400. }
  2401. void addException(IException *e)
  2402. {
  2403. IPropertyTree *tree = root->queryPropTree("Exceptions");
  2404. if (!tree)
  2405. tree = root->addPropTree("Exceptions",createPTree("Exceptions"));
  2406. IPropertyTree *et = tree->addPropTree("Exception",createPTree("Exception"));
  2407. et->setPropInt("@exceptionCode", e->errorCode());
  2408. StringBuffer msg;
  2409. et->setProp("@exceptionMessage",e->errorMessage(msg).str());
  2410. }
  2411. IExceptionIterator *getExceptionIterator()
  2412. {
  2413. IPropertyTree *tree = root->queryPropTree("Exceptions");
  2414. return new CExceptionIterator(tree);
  2415. }
  2416. void clearExceptions()
  2417. {
  2418. removeTree(root,"Exceptions");
  2419. }
  2420. StringBuffer& getApplicationValue(const char *app, const char *propname, StringBuffer &str) const
  2421. {
  2422. IPropertyTree *tree = root->queryPropTree("Application");
  2423. if (tree) {
  2424. StringBuffer prop;
  2425. prop.append(app).append('/').append(propname);
  2426. tree->getProp(prop.str(),str);
  2427. }
  2428. return str;
  2429. }
  2430. int getApplicationValueInt(const char *app, const char *propname, int ret) const
  2431. {
  2432. IPropertyTree *tree = root->queryPropTree("Application");
  2433. if (tree) {
  2434. StringBuffer prop;
  2435. prop.append(app).append('/').append(propname);
  2436. ret = tree->getPropInt(prop.str(),ret);
  2437. }
  2438. return ret;
  2439. }
  2440. void setApplicationValue(const char *app, const char *propname, const char *value, bool overwrite)
  2441. {
  2442. IPropertyTree *tree = root->queryPropTree("Application");
  2443. if (!tree)
  2444. tree = root->addPropTree("Application",createPTree("Application"));
  2445. IPropertyTree *sub = tree->queryPropTree(app);
  2446. if (!sub)
  2447. sub = tree->addPropTree(app,createPTree(app));
  2448. if (overwrite || !sub->hasProp(propname))
  2449. sub->setProp(propname, value);
  2450. }
  2451. void setApplicationValueInt(const char *app, const char *propname, int value, bool overwrite)
  2452. {
  2453. StringBuffer str;
  2454. str.append(value);
  2455. setApplicationValue( app, propname, str.str(), overwrite);
  2456. }
  2457. StringBuffer &getDebugValue(const char *propname, StringBuffer &str) const
  2458. {
  2459. StringBuffer prop("Debug/");
  2460. prop.append(propname);
  2461. const char * val = root->queryProp(prop.str());
  2462. if (!val)
  2463. return str;
  2464. return str.append(val);
  2465. }
  2466. void setDebugValue(const char *propname, const char *value, bool overwrite)
  2467. {
  2468. IPropertyTree *tree = root->queryPropTree("Debug");
  2469. if (!tree)
  2470. tree = root->addPropTree("Debug",createPTree("Debug"));
  2471. if (overwrite || !tree->hasProp(propname))
  2472. tree->setProp(propname, value);
  2473. }
  2474. StringBuffer &toXML(StringBuffer &str)
  2475. {
  2476. if (root)
  2477. ::toXML(root, str, 0, XML_Format|XML_SortTags);
  2478. return str;
  2479. }
  2480. bool removeQueue()
  2481. {
  2482. StringBuffer qname;
  2483. if (getQueue(qname).length()!=0) {
  2484. Owned<IJobQueue> queue = createJobQueue(qname.str());
  2485. if (queue.get()) {
  2486. Owned<IJobQueueItem> item = queue->take(queryId());
  2487. if (item.get())
  2488. return true;
  2489. }
  2490. }
  2491. return false;
  2492. }
  2493. };
  2494. class CConstDFUWorkUnitIterator: implements IConstDFUWorkUnitIterator, public CInterface
  2495. {
  2496. Linked<IRemoteConnection> conn;
  2497. Linked<IPropertyTreeIterator> iter;
  2498. Linked<IDFUWorkUnitFactory> parent;
  2499. public:
  2500. IMPLEMENT_IINTERFACE;
  2501. CConstDFUWorkUnitIterator(IDFUWorkUnitFactory *_parent,IRemoteConnection *_conn,IPropertyTreeIterator *_iter) // takes ownership of conn and iter
  2502. : parent(_parent), conn(_conn),iter(_iter)
  2503. {
  2504. }
  2505. ~CConstDFUWorkUnitIterator()
  2506. {
  2507. iter.clear();
  2508. conn.clear();
  2509. }
  2510. bool first()
  2511. {
  2512. return iter?iter->first():false;
  2513. }
  2514. bool next()
  2515. {
  2516. return iter?iter->next():false;
  2517. }
  2518. bool isValid()
  2519. {
  2520. return iter&&iter->isValid();
  2521. }
  2522. StringBuffer &getId(StringBuffer &str)
  2523. {
  2524. IPropertyTree &pt=iter->query();
  2525. return pt.getName(str);
  2526. }
  2527. virtual IConstDFUWorkUnit * get()
  2528. {
  2529. if (!isValid())
  2530. return NULL;
  2531. StringBuffer wuid;
  2532. return parent?parent->openWorkUnit(getId(wuid).str(),false):NULL;
  2533. }
  2534. };
  2535. class CConstDFUWUArrayIterator : implements IConstDFUWorkUnitIterator, public CInterface
  2536. {
  2537. unsigned idx;
  2538. IArrayOf<IConstDFUWorkUnit> wua;
  2539. public:
  2540. IMPLEMENT_IINTERFACE;
  2541. CConstDFUWUArrayIterator(IDFUWorkUnitFactory *_parent,IRemoteConnection *_conn, IArrayOf<IPropertyTree> &trees)
  2542. {
  2543. idx = 0;
  2544. ForEachItemIn(i,trees) {
  2545. IPropertyTree &tree = trees.item(i);
  2546. wua.append(*(IConstDFUWorkUnit *) new CDFUWorkUnit(_parent,NULL,&tree));
  2547. }
  2548. }
  2549. bool first()
  2550. {
  2551. idx = 0;
  2552. return isValid();
  2553. }
  2554. bool isValid()
  2555. {
  2556. return idx<wua.ordinality();
  2557. }
  2558. bool next()
  2559. {
  2560. idx++;
  2561. return isValid();
  2562. }
  2563. IConstDFUWorkUnit & query()
  2564. {
  2565. return wua.item(idx);
  2566. }
  2567. IConstDFUWorkUnit * get()
  2568. {
  2569. if (!isValid())
  2570. return NULL;
  2571. IConstDFUWorkUnit *ret = &wua.item(idx);
  2572. return LINK(ret);
  2573. }
  2574. virtual StringBuffer &getId(StringBuffer &str)
  2575. {
  2576. return str.append(query().queryId());
  2577. }
  2578. };
  2579. class CDFUWorkUnitFactory : implements IDFUWorkUnitFactory, implements ISDSSubscription, public CInterface
  2580. {
  2581. CriticalSection proxylock;
  2582. PointerArray subscribers;
  2583. Int64Array subscriberids;
  2584. Int64Array active; // active TIDS
  2585. void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
  2586. {
  2587. __uint64 tid = (__uint64) GetCurrentThreadId();
  2588. Linked<ISDSSubscription> dest;
  2589. {
  2590. CriticalBlock block(proxylock);
  2591. if (active.find(tid)!=NotFound)
  2592. return;
  2593. active.append(tid);
  2594. unsigned i1 = subscriberids.find(id);
  2595. if (i1!=NotFound)
  2596. dest.set((ISDSSubscription *)subscribers.item(i1));
  2597. }
  2598. try {
  2599. if (dest.get())
  2600. dest->notify(id,xpath,flags,valueLen,valueData);
  2601. }
  2602. catch (IException *e) {
  2603. EXCLOG(e,"CDFUWorkUnitFactory:notify");
  2604. e->Release();
  2605. }
  2606. CriticalBlock block(proxylock);
  2607. active.zap(tid);
  2608. }
  2609. __int64 subscribe (const char *xpath,void *iface)
  2610. {
  2611. // idea is to avoid subscribing/unsubscribing while processing a notify
  2612. // *unless* on my own thread
  2613. // this happens when waiting to abort
  2614. CriticalBlock block(proxylock);
  2615. unsigned __int64 tid = (unsigned __int64) GetCurrentThreadId();
  2616. ThreadId atid = 0;
  2617. for (unsigned i=0;i<100;i++) {
  2618. bool ok = true;
  2619. ForEachItemInRev(j,active) {
  2620. if (active.item(j)!=tid) {
  2621. ok = false;
  2622. atid = (ThreadId)active.item(j);
  2623. }
  2624. }
  2625. if (ok)
  2626. break;
  2627. if (i%10==9)
  2628. DBGLOG("CDFUWorkUnitFactory: Subscription(%d,%" I64F "d) busy %s",i,(__int64)atid,xpath?xpath:"");
  2629. CriticalUnblock unblock(proxylock);
  2630. Sleep(i*10);
  2631. if (i==99)
  2632. PrintStackReport();
  2633. }
  2634. SubscriptionId subscriberid = 0;
  2635. ForEachItemInRev(i1,subscribers) {
  2636. if (subscribers.item(i1)==iface) {
  2637. querySDS().unsubscribe(subscriberids.item(i1));
  2638. subscribers.remove(i1);
  2639. subscriberids.remove(i1);
  2640. }
  2641. }
  2642. if (xpath) {
  2643. subscriberid = querySDS().subscribe(xpath, *this, false);
  2644. subscribers.append(iface);
  2645. subscriberids.append(subscriberid);
  2646. }
  2647. return subscriberid;
  2648. }
  2649. public:
  2650. IMPLEMENT_IINTERFACE;
  2651. IDFUWorkUnit * createWorkUnit()
  2652. {
  2653. StringBuffer wuid;
  2654. newWUID(wuid);
  2655. StringBuffer wuRoot;
  2656. getXPath(wuRoot, wuid.str());
  2657. IRemoteConnection* conn = querySDS().connect(wuRoot.str(), myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_UNIQUE, SDS_LOCK_TIMEOUT);
  2658. conn->queryRoot()->setProp("@xmlns:xsi", "http://www.w3.org/1999/XMLSchema-instance");
  2659. IDFUWorkUnit *ret = new CDFUWorkUnit(this, conn, NULL, true);
  2660. // created time stamp? TBD
  2661. return ret;
  2662. }
  2663. bool deleteWorkUnit(const char * wuid)
  2664. {
  2665. StringBuffer wuids(wuid);
  2666. wuids.trim();
  2667. if (!wuids.length())
  2668. return false;
  2669. StringBuffer wuRoot;
  2670. getXPath(wuRoot, wuids.str());
  2671. IRemoteConnection *conn = querySDS().connect(wuRoot.str(), myProcessSession(), RTM_LOCK_READ, SDS_LOCK_TIMEOUT);
  2672. if (!conn)
  2673. return false;
  2674. Owned<CDFUWorkUnit> cw = new CDFUWorkUnit(this,conn,NULL,true);
  2675. try
  2676. {
  2677. cw->cleanupAndDelete();
  2678. return true;
  2679. }
  2680. catch (IException *e)
  2681. {
  2682. EXCLOG(e, "DFUWU Exception: ");
  2683. e->Release();
  2684. return false;
  2685. }
  2686. return false;
  2687. }
  2688. IConstDFUWorkUnit * openWorkUnit(const char * wuid, bool lock)
  2689. {
  2690. StringBuffer wuids(wuid);
  2691. wuids.trim();
  2692. if (!wuids.length())
  2693. return NULL;
  2694. StringBuffer wuRoot;
  2695. getXPath(wuRoot, wuids.str());
  2696. IRemoteConnection* conn = querySDS().connect(wuRoot.str(), myProcessSession() , lock ? RTM_LOCK_READ : 0, SDS_LOCK_TIMEOUT);
  2697. if (!conn)
  2698. return NULL;
  2699. return new CDFUWorkUnit(this, conn, NULL, false);
  2700. }
  2701. IConstDFUWorkUnitIterator * getWorkUnitsByXPath(const char *xpath)
  2702. {
  2703. StringBuffer wuRoot;
  2704. getXPathBase(wuRoot);
  2705. Owned<IRemoteConnection> conn = querySDS().connect(wuRoot.str(), myProcessSession() , 0, SDS_LOCK_TIMEOUT);
  2706. if (!conn.get())
  2707. return new CConstDFUWorkUnitIterator(this,NULL,NULL);
  2708. CDaliVersion serverVersionNeeded("3.2");
  2709. Owned<IPropertyTreeIterator> iter(queryDaliServerVersion().compare(serverVersionNeeded) < 0 ?
  2710. conn->queryRoot()->getElements(xpath) :
  2711. conn->getElements(xpath));
  2712. return new CConstDFUWorkUnitIterator(this,conn,iter);
  2713. }
  2714. IConstDFUWorkUnitIterator * getWorkUnitsByOwner(const char * owner)
  2715. {
  2716. StringBuffer path("*");
  2717. if (owner && *owner)
  2718. path.append("[@submitID=\"").append(owner).append("\"]");
  2719. return getWorkUnitsByXPath(path.str());
  2720. }
  2721. IConstDFUWorkUnitIterator * getWorkUnitsByState(DFUstate state)
  2722. {
  2723. StringBuffer path;
  2724. encodeDFUstate(state,path.append("*[Progress/@state=\"")).append("\"]");
  2725. return getWorkUnitsByXPath(path.str());
  2726. }
  2727. IDFUWorkUnit * updateWorkUnit(const char * wuid, bool exclusive)
  2728. {
  2729. StringBuffer wuids(wuid);
  2730. wuids.trim();
  2731. if (!wuids.length())
  2732. return NULL;
  2733. StringBuffer wuRoot;
  2734. getXPath(wuRoot, wuids.str());
  2735. IRemoteConnection* conn = querySDS().connect(wuRoot.str(), myProcessSession(), exclusive?RTM_LOCK_WRITE:RTM_LOCK_READ, SDS_LOCK_TIMEOUT);
  2736. if (!conn)
  2737. return NULL;
  2738. return new CDFUWorkUnit(this, conn, NULL, true);
  2739. }
  2740. IConstDFUWorkUnitIterator* getWorkUnitsSorted( DFUsortfield *sortorder, // list of fields to sort by (terminated by WUSFterm)
  2741. DFUsortfield *filters, // list of fields to filter by (terminated by WUSFterm)
  2742. const void *filterbuf,
  2743. unsigned startoffset,
  2744. unsigned maxnum,
  2745. const char *queryowner,
  2746. __int64 *cachehint,
  2747. unsigned *total)
  2748. {
  2749. class CDFUWorkUnitsPager : implements IElementsPager, public CSimpleInterface
  2750. {
  2751. StringAttr xPath;
  2752. StringAttr sortOrder;
  2753. StringAttr nameFilterLo;
  2754. StringAttr nameFilterHi;
  2755. StringArray unknownAttributes;
  2756. public:
  2757. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  2758. CDFUWorkUnitsPager(const char* _xPath, const char *_sortOrder, const char* _nameFilterLo, const char* _nameFilterHi, StringArray& _unknownAttributes)
  2759. : xPath(_xPath), sortOrder(_sortOrder), nameFilterLo(_nameFilterLo), nameFilterHi(_nameFilterHi)
  2760. {
  2761. ForEachItemIn(x, _unknownAttributes)
  2762. unknownAttributes.append(_unknownAttributes.item(x));
  2763. }
  2764. virtual IRemoteConnection* getElements(IArrayOf<IPropertyTree> &elements)
  2765. {
  2766. Owned<IRemoteConnection> conn = querySDS().connect("DFU/WorkUnits", myProcessSession(), 0, SDS_LOCK_TIMEOUT);
  2767. if (!conn)
  2768. return NULL;
  2769. Owned<IPropertyTreeIterator> iter = conn->getElements(xPath);
  2770. if (!iter)
  2771. return NULL;
  2772. sortElements(iter, sortOrder.get(), nameFilterLo.get(), nameFilterHi.get(), unknownAttributes, elements);
  2773. return conn.getClear();
  2774. }
  2775. virtual bool allMatchingElementsReceived() { return true; }//For now, dali always returns all of matched WUs.
  2776. };
  2777. StringBuffer query;
  2778. StringAttr namefilter("*");
  2779. StringBuffer so;
  2780. const char *field;
  2781. StringBuffer sf;
  2782. StringAttr namefilterlo;
  2783. StringAttr namefilterhi;
  2784. StringArray unknownAttributes;
  2785. if (filters)
  2786. {
  2787. const char *fv = (const char *)filterbuf;
  2788. for (unsigned i=0;filters[i]!=DFUsf_term;i++)
  2789. {
  2790. DFUsortfield fmt = filters[i];
  2791. if (fmt==DFUsf_wuid)
  2792. namefilterlo.set(fv);
  2793. else if (fmt==DFUsf_wuidhigh)
  2794. namefilterhi.set(fv);
  2795. else if (fmt==DFUsf_wildwuid)
  2796. namefilter.set(fv);
  2797. else if (!fv || !*fv)
  2798. {
  2799. const char* attr = getDFUSortFieldXPath(fmt);
  2800. if (attr && *attr)
  2801. unknownAttributes.append(attr);
  2802. }
  2803. else
  2804. {
  2805. field = encodeDFUsortfield(fmt,sf.clear(),false).str();
  2806. query.append('[').append(field).append('=');
  2807. if (((int)fmt)&DFUsf_nocase)
  2808. query.append('?');
  2809. if (((int)fmt)&DFUsf_wild)
  2810. query.append('~');
  2811. query.append('"').append(fv).append("\"]");
  2812. }
  2813. fv += strlen(fv)+1;
  2814. }
  2815. }
  2816. query.insert(0, namefilter.get());
  2817. if (sortorder)
  2818. {
  2819. for (unsigned i=0;sortorder[i]!=DFUsf_term;i++)
  2820. {
  2821. field = encodeDFUsortfield(sortorder[0],sf.clear(),true).str();
  2822. if (so.length())
  2823. so.append(',');
  2824. so.append(field);
  2825. }
  2826. }
  2827. IArrayOf<IPropertyTree> results;
  2828. Owned<IElementsPager> elementsPager = new CDFUWorkUnitsPager(query.str(), so.length()?so.str():NULL, namefilterlo.get(), namefilterhi.get(), unknownAttributes);
  2829. Owned<IRemoteConnection> conn=getElementsPaged(elementsPager,startoffset,maxnum,NULL,queryowner,cachehint,results,total, NULL);
  2830. return new CConstDFUWUArrayIterator(this,conn,results);
  2831. }
  2832. virtual unsigned numWorkUnits()
  2833. {
  2834. Owned<IRemoteConnection> conn = querySDS().connect("DFU/WorkUnits", myProcessSession(), 0, SDS_LOCK_TIMEOUT);
  2835. if (!conn)
  2836. return 0;
  2837. IPropertyTree *root = conn->queryRoot();
  2838. return root->numChildren();
  2839. }
  2840. };
  2841. IDFUWorkUnitFactory * getDFUWorkUnitFactory()
  2842. {
  2843. return new CDFUWorkUnitFactory;
  2844. }
  2845. dfuwu_decl unsigned queuedJobs(const char *queuename,StringAttrArray &wulist)
  2846. {
  2847. unsigned ret = 0;
  2848. try{
  2849. Owned<IRemoteConnection> conn = querySDS().connect("/Status/Servers",myProcessSession(),RTM_LOCK_READ,SDS_LOCK_TIMEOUT);
  2850. if (conn) {
  2851. StringBuffer mask;
  2852. mask.appendf("Server[@name=\"DFUserver\"]/Queue[@name=\"%s\"]",queuename);
  2853. Owned<IPropertyTreeIterator> iterq = conn->queryRoot()->getElements(mask.str());
  2854. ForEach(*iterq) {
  2855. Owned<IPropertyTreeIterator> iterj = iterq->query().getElements("Job");
  2856. ForEach(*iterj) {
  2857. const char *wuid = iterj->query().queryProp("@wuid");
  2858. if (wuid&&*wuid&&(*wuid!='!')) { // filter escapes
  2859. wulist.append(*new StringAttrItem(wuid));
  2860. ret++;
  2861. }
  2862. }
  2863. }
  2864. }
  2865. }
  2866. catch(IException* e){
  2867. StringBuffer msg;
  2868. e->errorMessage(msg);
  2869. IERRLOG("DFUWU runningJobs(%s) %s",queuename,msg.str());
  2870. e->Release();
  2871. }
  2872. try{
  2873. Owned<IJobQueue> queue = createJobQueue(queuename);
  2874. if (queue) {
  2875. CJobQueueContents contents;
  2876. queue->copyItems(contents);
  2877. Owned<IJobQueueIterator> iter = contents.getIterator();
  2878. ForEach(*iter) {
  2879. const char *wuid = iter->query().queryWUID();
  2880. if (wuid&&*wuid&&(*wuid!='!')) { // filter escapes
  2881. wulist.append(*new StringAttrItem(wuid));
  2882. }
  2883. }
  2884. }
  2885. }
  2886. catch(IException* e){
  2887. StringBuffer msg;
  2888. e->errorMessage(msg);
  2889. IERRLOG("DFUWU queuedJobs(%s) %s",queuename,msg.str());
  2890. e->Release();
  2891. }
  2892. return ret;
  2893. }
  2894. IDfuFileCopier *createRemoteFileCopier(const char *qname,const char *clustername, const char *jobname, bool replicate)
  2895. {
  2896. class cCopier: public CInterface, implements IDfuFileCopier
  2897. {
  2898. Owned<IDFUWorkUnitFactory> factory;
  2899. StringAttr qname;
  2900. StringAttr clustername;
  2901. StringAttr jobname;
  2902. // DFD_OS os;
  2903. StringArray wuids;
  2904. bool replicate;
  2905. public:
  2906. IMPLEMENT_IINTERFACE;
  2907. cCopier(const char *_qname,const char *_clustername, const char *_jobname, bool _replicate)
  2908. : qname(_qname), clustername(_clustername), jobname(_jobname)
  2909. {
  2910. factory.setown(getDFUWorkUnitFactory());
  2911. replicate = _replicate;
  2912. }
  2913. bool copyFile(const char *lfn,SocketEndpoint &srcdali,const char *srclfn,IUserDescriptor *srcuser,IUserDescriptor *user)
  2914. {
  2915. Owned<IDFUWorkUnit> wu = factory->createWorkUnit();
  2916. wu->setClusterName(clustername);
  2917. wu->setJobName(jobname);
  2918. wu->setQueue(qname);
  2919. if (user) {
  2920. StringBuffer uname;
  2921. user->getUserName(uname);
  2922. wu->setUser(uname.str());
  2923. StringBuffer pwd;
  2924. wu->setPassword(user->getPassword(pwd).str());
  2925. }
  2926. IDFUfileSpec *source = wu->queryUpdateSource();
  2927. IDFUfileSpec *destination = wu->queryUpdateDestination();
  2928. IDFUoptions *options = wu->queryUpdateOptions();
  2929. wu->setCommand(DFUcmd_copy);
  2930. source->setLogicalName(srclfn);
  2931. source->setForeignDali(srcdali);
  2932. destination->setLogicalName(lfn);
  2933. destination->setGroupName(clustername);
  2934. options->setReplicate(true);
  2935. // should be no need for overwrite
  2936. const char *wuid = wu->queryId();
  2937. StringBuffer eps;
  2938. PROGLOG("%s: Copy %s from %s to %s",wuid,srclfn,srcdali.getUrlStr(eps).str(),lfn);
  2939. wuids.append(wuid);
  2940. submitDFUWorkUnit(wu.getClear());
  2941. return true;
  2942. }
  2943. bool wait()
  2944. {
  2945. ForEachItemIn(i,wuids) {
  2946. const char *wuid = wuids.item(i);
  2947. Owned<IConstDFUWorkUnit> dfuwu = factory->openWorkUnit(wuid,false);
  2948. if (!dfuwu)
  2949. throw MakeStringException(-1,"DFUWU %s could not be found",wuid);
  2950. IConstDFUprogress *progress = dfuwu->queryProgress();
  2951. PROGLOG("Waiting for %s",wuid);
  2952. DFUstate state = dfuwu->waitForCompletion(1000*60*60*24*4); // big timeout
  2953. switch(state)
  2954. {
  2955. case DFUstate_unknown:
  2956. case DFUstate_scheduled:
  2957. case DFUstate_queued:
  2958. case DFUstate_started:
  2959. case DFUstate_monitoring:
  2960. case DFUstate_aborting:
  2961. return false;
  2962. case DFUstate_aborted:
  2963. return false;
  2964. case DFUstate_failed:
  2965. return false;
  2966. case DFUstate_finished:
  2967. break;
  2968. }
  2969. Sleep(COPY_WAIT_SECONDS*1000);
  2970. }
  2971. return true;
  2972. }
  2973. };
  2974. return new cCopier(qname,clustername,jobname,replicate);
  2975. }
  2976. extern dfuwu_decl void submitDFUWorkUnit(IDFUWorkUnit *workunit)
  2977. {
  2978. Owned<IDFUWorkUnit> wu(workunit);
  2979. StringBuffer qname;
  2980. if (wu->getQueue(qname).length()==0) {
  2981. throw MakeStringException(-1, "DFU no queue name specified");
  2982. }
  2983. Owned<IJobQueue> queue = createJobQueue(qname.str());
  2984. if (!queue.get()) {
  2985. throw MakeStringException(-1, "Cound not create queue");
  2986. }
  2987. StringBuffer user;
  2988. wu->getUser(user);
  2989. IDFUprogress *progress = wu->queryUpdateProgress();
  2990. progress->setState(DFUstate_queued);
  2991. progress->clearProgress();
  2992. wu->clearExceptions();
  2993. wu->commit();
  2994. StringAttr wuid(wu->queryId());
  2995. wu.clear();
  2996. IJobQueueItem *item = createJobQueueItem(wuid.get());
  2997. item->setEndpoint(queryMyNode()->endpoint());
  2998. if (user.length()!=0)
  2999. item->setOwner(user.str());
  3000. queue->enqueue(item);
  3001. }
  3002. extern dfuwu_decl void submitDFUWorkUnit(const char *wuid)
  3003. {
  3004. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  3005. Owned<IDFUWorkUnit> wu = factory->updateWorkUnit(wuid);
  3006. if(!wu)
  3007. throw MakeStringException(-1, "DFU workunit %s could not be opened for update", wuid);
  3008. submitDFUWorkUnit(wu.getClear());
  3009. }