swapnodelib.cpp 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "thirdparty.h"
  15. #include "jlib.hpp"
  16. #include "jfile.hpp"
  17. #include "jptree.hpp"
  18. #include "jprop.hpp"
  19. #include "jmisc.hpp"
  20. #include "mpbase.hpp"
  21. #include "daclient.hpp"
  22. #include "dadfs.hpp"
  23. #include "dafdesc.hpp"
  24. #include "dasds.hpp"
  25. #include "danqs.hpp"
  26. #include "dalienv.hpp"
  27. #include "rmtfile.hpp"
  28. #include "rmtsmtp.hpp"
  29. #include "dautils.hpp"
  30. #include "workunit.hpp"
  31. #include "swapnodelib.hpp"
  // Timeout passed to querySDS().connect() for the /Status/Servers and
  // /Environment branches (presumably milliseconds - TODO confirm against dasds.hpp).
  32. #define SDS_LOCK_TIMEOUT 30000
  // Window used by autoRestart() when matching failed workunits to recent swaps.
  // Expressed in milliseconds; callers divide by (1000*60) before passing it to
  // CDateTime::adjustTime().
  33. #define SWAPNODE_RETRY_TIME (1000*60*60*1) // 1hr
  // Job info attached to the operator-level log message emitted after an automatic swap.
  34. static const LogMsgJobInfo swapnodeJob(UnknownJob, UnknownUser);
  35. static bool ensureThorIsDown(const char *cluster, bool nofail, bool wait)
  36. {
  37. bool retry = false;
  38. do {
  39. Owned<IRemoteConnection> pStatus = querySDS().connect("/Status/Servers", myProcessSession(), RTM_NONE, SDS_LOCK_TIMEOUT);
  40. Owned<IPropertyTreeIterator> it = pStatus->queryRoot()->getElements("Server[@name='ThorMaster']");
  41. retry = false;
  42. ForEach(*it) {
  43. IPropertyTree* pServer = &it->query();
  44. if (pServer->hasProp("@cluster") && !strcmp(pServer->queryProp("@cluster"), cluster)) {
  45. if (nofail) {
  46. WARNLOG("A Thor on cluster %s is still active", cluster);
  47. if (!wait)
  48. return false;
  49. Sleep(1000*10);
  50. PROGLOG("Retrying...");
  51. retry = true;
  52. break;
  53. }
  54. throw MakeStringException(-1, "A Thor cluster node swap requires the cluster to be offline. Please stop the Thor cluster '%s' and try again.", cluster);
  55. }
  56. }
  57. } while (retry);
  58. return true;
  59. }
  60. bool WuResubmit(const char *wuid)
  61. {
  62. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  63. Owned<IWorkUnit> wu = factory->updateWorkUnit(wuid);
  64. if (!wu)
  65. {
  66. ERRLOG("WuResubmit(%s): could not find workunit",wuid);
  67. return false;
  68. }
  69. if (wu->getState()!=WUStateFailed)
  70. {
  71. ERRLOG("WuResubmit(%s): could not resubmit as workunit state is '%s'", wuid, wu->queryStateDesc());
  72. return false;
  73. }
  74. SCMStringBuffer token;
  75. wu->getSecurityToken(token);
  76. SCMStringBuffer user;
  77. SCMStringBuffer password;
  78. extractToken(token.str(), wuid, user, password);
  79. wu->resetWorkflow();
  80. wu->setState(WUStateSubmitted);
  81. wu->commit();
  82. wu.clear();
  83. submitWorkUnit(wuid,user.str(),password.str());
  84. PROGLOG("WuResubmit(%s): resubmitted",wuid);
  85. return true;
  86. }
  // SwapNode info, as stored in Dali under /SwapNode:
  //
  // SwapNode/
  //   Thor [ @group, @timeChecked ]
  //     BadNode  [ @netAddress, @timeChecked, @time, @numTimes, @code, @rank ]  (element value = error message)
  //     Swap     [ @inNetAddress, @outNetAddress, @time, @rank ]
  //     WorkUnit [ @id, @time, @resubmitted ]
  //
  // History output formats:
  //   failures: time,nodenum,ip,code,errmsg
  //   swaps:    time,nodenum,swapout,swapin
  96. class CSwapNode
  97. {
  98. protected:
  99. Linked<IPropertyTree> environment;
  100. StringAttr clusterName;
  101. StringAttr groupName, spareGroupName;
  102. IPropertyTree *options;
  103. Owned<IGroup> group, spareGroup;
  104. bool checkIfNodeInUse(IpAddress &ip, bool includespares, StringBuffer &clustname)
  105. {
  106. SocketEndpoint ep(0,ip);
  107. if (RANK_NULL != group->rank(ep)) {
  108. clustname.append(groupName);
  109. return true;
  110. }
  111. else if (includespares) {
  112. if (RANK_NULL != spareGroup->rank(ep)) {
  113. clustname.append(groupName).append(" spares");
  114. return true;
  115. }
  116. }
  117. return false;
  118. }
  119. IPropertyTree *getSwapNodeInfo(bool create)
  120. {
  121. Owned<IRemoteConnection> conn = querySDS().connect("/SwapNode", myProcessSession(), RTM_LOCK_WRITE|(create?RTM_CREATE_QUERY:0), 1000*60*5);
  122. if (!conn) {
  123. ERRLOG("SWAPNODE: could not connect to /SwapNode branch");
  124. return NULL;
  125. }
  126. StringBuffer xpath;
  127. xpath.appendf("Thor[@group=\"%s\"]",groupName.get());
  128. Owned<IPropertyTree> info = conn->queryRoot()->getPropTree(xpath.str());
  129. if (!info) {
  130. if (!create) {
  131. PROGLOG("SWAPNODE: no information for group %s",groupName.get());
  132. return NULL;
  133. }
  134. info.set(conn->queryRoot()->addPropTree("Thor",createPTree("Thor")));
  135. info->setProp("@group",groupName.get());
  136. }
  137. return info.getClear();
  138. }
  139. bool doSwap(const char *oldip, const char *newip)
  140. {
  141. Owned<INode> newNode = createINode(newip);
  142. Owned<INode> oldNode = createINode(oldip);
  143. if (!group->isMember(oldNode)) {
  144. ERRLOG("Node %s is not part of group %s", oldip, groupName.get());
  145. return false;
  146. }
  147. if (group->isMember(newNode)) {
  148. ERRLOG("Node %s is already part of group %s", newip, groupName.get());
  149. return false;
  150. }
  151. queryNamedGroupStore().swapNode(oldNode->endpoint(),newNode->endpoint());
  152. return true;
  153. }
  154. bool doSingleSwapNode(const char *oldip,const char *newip,unsigned nodenum,IPropertyTree *info,const char *timechecked)
  155. {
  156. if (doSwap(oldip,newip)) {
  157. if (info) {
  158. StringBuffer times(timechecked);
  159. if (times.length()==0) {
  160. CDateTime dt;
  161. dt.setNow();
  162. dt.getString(times);
  163. }
  164. // TBD tie up with bad node in auto?
  165. IPropertyTree *swap = info->addPropTree("Swap",createPTree("Swap"));
  166. swap->setProp("@inNetAddress",newip);
  167. swap->setProp("@outNetAddress",oldip);
  168. swap->setProp("@time",times.str());
  169. if (UINT_MAX != nodenum)
  170. swap->setPropInt("@rank",nodenum-1);
  171. }
  172. return true;
  173. }
  174. return false;
  175. }
  176. void init()
  177. {
  178. StringBuffer xpath("Software/ThorCluster[@name=\"");
  179. xpath.append(clusterName).append("\"]");
  180. Owned<IRemoteConnection> conn = querySDS().connect("/Environment", myProcessSession(), RTM_LOCK_READ, SDS_LOCK_TIMEOUT);
  181. environment.setown(createPTreeFromIPT(conn->queryRoot()));
  182. options = environment->queryPropTree(xpath.str());
  183. if (!options)
  184. throwUnexpected();
  185. groupName.set(options->queryProp("@nodeGroup"));
  186. if (groupName.isEmpty())
  187. groupName.set(options->queryProp("@name"));
  188. VStringBuffer spareS("%s_spares", groupName.get());
  189. spareGroupName.set(spareS);
  190. group.setown(queryNamedGroupStore().lookup(groupName));
  191. spareGroup.setown(queryNamedGroupStore().lookup(spareGroupName));
  192. }
  193. public:
  194. CSwapNode(const char *_clusterName) :clusterName(_clusterName)
  195. {
  196. init();
  197. }
  198. void swappedList(unsigned days, StringBuffer *out)
  199. {
  200. Owned<IPropertyTree> info = getSwapNodeInfo(true); // should put out error if returns false
  201. if (!info.get())
  202. return;
  203. CDateTime tt;
  204. CDateTime cutoff;
  205. if (days) {
  206. cutoff.setNow();
  207. cutoff.adjustTime(-60*24*(int)days);
  208. }
  209. Owned<IPropertyTreeIterator> it2 = info->getElements("Swap");
  210. ForEach(*it2) {
  211. IPropertyTree &swappednode = it2->query();
  212. const char *ts = swappednode.queryProp("@time");
  213. if (!ts)
  214. continue;
  215. if (days) {
  216. tt.setString(ts);
  217. if (cutoff.compare(tt)>0)
  218. continue;
  219. }
  220. const char *ips = swappednode.queryProp("@outNetAddress");
  221. if (!ips||!*ips)
  222. continue;
  223. IpAddress ip(ips);
  224. StringBuffer clustname;
  225. if (checkIfNodeInUse(ip,true,clustname))
  226. continue; // ignore
  227. if (out)
  228. out->append(ips).append('\n');
  229. else
  230. PROGLOG("%s",ips);
  231. }
  232. }
  233. void emailSwap(const char *msg, bool warn=false, bool sendswapped=false, bool sendhistory=false)
  234. {
  235. StringBuffer emailtarget;
  236. StringBuffer smtpserver;
  237. if (options->getProp("SwapNode/@EmailAddress",emailtarget)&&emailtarget.length()&&options->getProp("SwapNode/@EmailSMTPServer",smtpserver)&&smtpserver.length()) {
  238. const char * subject = options->queryProp("SwapNode/@EmailSubject");
  239. if (!subject)
  240. subject = "SWAPNODE automated email";
  241. StringBuffer msgs;
  242. if (!msg) {
  243. msgs.append("Swapnode command line, Cluster: ");
  244. msg = msgs.append(groupName).append('\n').str();
  245. }
  246. CDateTime dt;
  247. dt.setNow();
  248. StringBuffer out;
  249. dt.getString(out,true).append(": ").append(msg).append("\n\n");
  250. if (options->getPropBool("SwapNode/@EmailSwappedList")||sendswapped) {
  251. out.append("Currently swapped out nodes:\n");
  252. swappedList(0,&out);
  253. out.append('\n');
  254. }
  255. if (options->getPropBool("SwapNode/@EmailHistory")||sendhistory) {
  256. out.append("Swap history:\n");
  257. swapNodeHistory(0,&out);
  258. out.append('\n');
  259. }
  260. SocketEndpoint ep(smtpserver.str(),25);
  261. StringBuffer sender("swapnode@");
  262. queryHostIP().getIpText(sender);
  263. // add tbd
  264. StringBuffer ips;
  265. StringArray warnings;
  266. sendEmail(emailtarget.str(),subject,out.str(),ep.getIpText(ips).str(),ep.port,sender.str(),&warnings);
  267. ForEachItemIn(i,warnings)
  268. WARNLOG("SWAPNODE: %s",warnings.item(i));
  269. }
  270. else if (warn)
  271. WARNLOG("Either SwapNode/@EmailAddress or SwapNode/@EmailSMTPServer not set in thor.xml");
  272. }
  273. void swapNodeHistory(unsigned days,StringBuffer *out)
  274. {
  275. Owned<IPropertyTree> info = getSwapNodeInfo(true);
  276. if (!info.get()) {
  277. if (out)
  278. out->append("No swapnode info\n");
  279. else
  280. ERRLOG("No swapnode info");
  281. return;
  282. }
  283. StringBuffer line;
  284. CDateTime tt;
  285. CDateTime cutoff;
  286. if (days) {
  287. cutoff.setNow();
  288. cutoff.adjustTime(-60*24*(int)days);
  289. }
  290. unsigned i=0;
  291. if (out)
  292. out->append("Failure, Time, NodeNum, NodeIp, ErrCode, Error Message\n------------------------------------------------------\n");
  293. else {
  294. PROGLOG("Failure, Time, NodeNum, NodeIp, ErrCode, Error Message");
  295. PROGLOG("------------------------------------------------------");
  296. }
  297. Owned<IPropertyTreeIterator> it1 = info->getElements("BadNode");
  298. ForEach(*it1) {
  299. IPropertyTree &badnode = it1->query();
  300. const char *ts = badnode.queryProp("@time");
  301. if (!ts)
  302. continue;
  303. if (days) {
  304. tt.setString(ts);
  305. if (cutoff.compare(tt)>0)
  306. continue;
  307. }
  308. line.clear().append(++i).append(", ");
  309. line.append(ts).append(", ").append(badnode.getPropInt("@rank",-1)+1).append(", ");
  310. badnode.getProp("@netAddress",line);
  311. line.append(", ").append(badnode.getPropInt("@code")).append(", \"");
  312. badnode.getProp(NULL,line);
  313. line.append('\"');
  314. if (out)
  315. out->append(line).append('\n');
  316. else
  317. PROGLOG("%s",line.str());
  318. }
  319. if (out)
  320. out->append("\nSwapped, Time, NodeNum, OutIp, InIp\n-----------------------------------\n");
  321. else {
  322. PROGLOG("%s", "");
  323. PROGLOG("Swapped, Time, NodeNum, OutIp, InIp");
  324. PROGLOG("-----------------------------------");
  325. }
  326. i = 0;
  327. Owned<IPropertyTreeIterator> it2 = info->getElements("Swap");
  328. ForEach(*it2) {
  329. IPropertyTree &swappednode = it2->query();
  330. const char *ts = swappednode.queryProp("@time");
  331. if (!ts)
  332. continue;
  333. if (days) {
  334. tt.setString(ts);
  335. if (cutoff.compare(tt)>0)
  336. continue;
  337. }
  338. line.clear().append(++i).append(", ");
  339. swappednode.getProp("@time",line);
  340. line.append(", ").append(swappednode.getPropInt("@rank",-1)+1).append(", ");
  341. swappednode.getProp("@outNetAddress",line);
  342. line.append(", ");
  343. swappednode.getProp("@inNetAddress",line);
  344. if (out)
  345. out->append(line.str()).append('\n');
  346. else
  347. PROGLOG("%s",line.str());
  348. }
  349. }
  350. bool checkThorNodeSwap(const char *failedwuid, unsigned mininterval)
  351. {
  352. bool ret = false;
  353. if (mininterval==(unsigned)-1) { // called by thor
  354. mininterval = 0;
  355. if (!options||!options->getPropBool("SwapNode/@autoSwapNode"))
  356. return false;
  357. if ((!failedwuid||!*failedwuid)&&!options->getPropBool("SwapNode/@checkAfterEveryJob"))
  358. return false;
  359. }
  360. try {
  361. Owned<IPropertyTree> info = getSwapNodeInfo(true);
  362. if (info.get()) {
  363. PROGLOG("checkNodeSwap started");
  364. StringBuffer xpath;
  365. CDateTime dt;
  366. StringBuffer ts;
  367. // see if done less than mininterval ago
  368. if (mininterval) {
  369. dt.setNow();
  370. dt.adjustTime(-((int)mininterval));
  371. if (info->getProp("@timeChecked",ts)) {
  372. CDateTime dtc;
  373. dtc.setString(ts.str());
  374. if (dtc.compare(dt,false)>0) {
  375. PROGLOG("checkNodeSwap using cached validate from %s",ts.str());
  376. xpath.clear().appendf("BadNode[@timeChecked=\"%s\"]",ts.str());
  377. return info->hasProp(xpath.str());
  378. }
  379. }
  380. }
  381. Owned<IGroup> grp = queryNamedGroupStore().lookup(groupName);
  382. if (!grp)
  383. PROGLOG("%s group doesn't exist", groupName.get());
  384. else
  385. {
  386. SocketEndpointArray epa;
  387. grp->getSocketEndpoints(epa);
  388. ForEachItemIn(i1,epa) {
  389. epa.element(i1).port = getDaliServixPort();
  390. }
  391. SocketEndpointArray failures;
  392. UnsignedArray failedcodes;
  393. StringArray failedmessages;
  394. unsigned start = msTick();
  395. const char *thorname = options->queryProp("@name");
  396. StringBuffer dataDir, mirrorDir;
  397. getConfigurationDirectory(environment->queryPropTree("Software/Directories"),"data","thor",thorname,dataDir); // if not defined can't check
  398. getConfigurationDirectory(environment->queryPropTree("Software/Directories"),"mirror","thor",thorname,mirrorDir); // if not defined can't check
  399. validateNodes(epa,dataDir.str(),mirrorDir.str(),false,failures,failedcodes,failedmessages);
  400. dt.setNow();
  401. dt.getString(ts.clear());
  402. ForEachItemIn(i,failures) {
  403. SocketEndpoint ep(failures.item(i));
  404. ep.port = 0;
  405. StringBuffer ips;
  406. ep.getIpText(ips);
  407. int r = (int)grp->rank(ep);
  408. if (r<0) { // shouldn't occur
  409. ERRLOG("SWAPNODE node %s not found in group %s",ips.str(),groupName.get());
  410. continue;
  411. }
  412. PROGLOG("CheckSwapNode FAILED(%d) %s : %s",failedcodes.item(i),ips.str(),failedmessages.item(i));
  413. // SNMP TBD?
  414. ret = true;
  415. xpath.clear().appendf("BadNode[@netAddress=\"%s\"]",ips.str());
  416. IPropertyTree *bnt = info->queryPropTree(xpath.str());
  417. if (!bnt) {
  418. bnt = info->addPropTree("BadNode",createPTree("BadNode"));
  419. bnt->setProp("@netAddress",ips.str());
  420. }
  421. bnt->setPropInt("@numTimes",bnt->getPropInt("@numTimes",0)+1);
  422. bnt->setProp("@timeChecked",ts.str());
  423. bnt->setProp("@time",ts.str());
  424. bnt->setPropInt("@code",failedcodes.item(i));
  425. bnt->setPropInt("@rank",r);
  426. bnt->setProp(NULL,failedmessages.item(i));
  427. }
  428. if (failedwuid&&*failedwuid) {
  429. xpath.clear().appendf("WorkUnit[@id=\"%s\"]",failedwuid);
  430. IPropertyTree *wut = info->queryPropTree(xpath.str());
  431. if (!wut) {
  432. wut = info->addPropTree("WorkUnit",createPTree("WorkUnit"));
  433. wut->setProp("@id",failedwuid);
  434. }
  435. wut->setProp("@time",ts.str());
  436. }
  437. PROGLOG("checkNodeSwap: Time taken = %dms",msTick()-start);
  438. info->setProp("@timeChecked",ts.str());
  439. }
  440. }
  441. }
  442. catch (IException *e) {
  443. EXCLOG(e,"checkNodeSwap");
  444. }
  445. return ret;
  446. }
  447. };
  448. void swappedList(const char *clusterName, unsigned days, StringBuffer *out)
  449. {
  450. CSwapNode swapNode(clusterName);
  451. swapNode.swappedList(days, out);
  452. }
  453. void emailSwap(const char *clusterName, const char *msg, bool warn, bool sendswapped, bool sendhistory)
  454. {
  455. CSwapNode swapNode(clusterName);
  456. swapNode.emailSwap(msg, warn, sendswapped, sendhistory);
  457. }
  458. void swapNodeHistory(const char *clusterName, unsigned days, StringBuffer *out)
  459. {
  460. CSwapNode swapNode(clusterName);
  461. swapNode.swapNodeHistory(days, out);
  462. }
  463. bool checkThorNodeSwap(const char *clusterName, const char *failedwuid, unsigned mininterval)
  464. {
  465. CSwapNode swapNode(clusterName);
  466. return swapNode.checkThorNodeSwap(failedwuid, mininterval);
  467. }
  468. class CSingleSwapNode : public CSwapNode
  469. {
  470. public:
  471. CSingleSwapNode(const char *clusterName) : CSwapNode(clusterName)
  472. {
  473. }
  474. bool swap(const char *oldip, const char *newip)
  475. {
  476. ensureThorIsDown(clusterName,false,false);
  477. Owned<IPropertyTree> info = getSwapNodeInfo(true);
  478. if (!doSingleSwapNode(oldip,newip,UINT_MAX,info,NULL))
  479. return false;
  480. // check to see if it was a spare and remove
  481. SocketEndpoint spareEp(newip);
  482. if (spareGroup)
  483. {
  484. rank_t r = spareGroup->rank(spareEp);
  485. if (RANK_NULL != r)
  486. {
  487. PROGLOG("Removing spare : %s", newip);
  488. spareGroup.setown(spareGroup->remove(r));
  489. queryNamedGroupStore().add(spareGroupName, spareGroup); // NB: replace
  490. }
  491. }
  492. info.clear();
  493. PROGLOG("SwapNode finished");
  494. return true;
  495. }
  496. };
  497. bool swapNode(const char *cluster, const char *oldip, const char *newip)
  498. {
  499. PROGLOG("SWAPNODE(%s,%s,%s) starting",cluster,oldip,newip);
  500. CSingleSwapNode swapNode(cluster);
  501. return swapNode.swap(oldip, newip);
  502. }
  503. class CAutoSwapNode : public CSwapNode
  504. {
  505. bool doAutoSwapNode(bool dryRun=false)
  506. {
  507. if (!checkThorNodeSwap(NULL,dryRun?0:5)) {
  508. PROGLOG("No bad nodes detected");
  509. PROGLOG("SWAPNODE(auto) exiting");
  510. return false;
  511. }
  512. Owned<IPropertyTree> info = getSwapNodeInfo(false);
  513. if (!info.get()) { // should put out error if returns false
  514. PROGLOG("SWAPNODE(auto) exiting");
  515. return false;
  516. }
  517. StringBuffer ts;
  518. if (!info->getProp("@timeChecked",ts)) {
  519. PROGLOG("SWAPNODE(auto): no check information generated");
  520. return false;
  521. }
  522. // enumerate bad nodes
  523. StringBuffer xpath;
  524. xpath.appendf("BadNode[@time=\"%s\"]",ts.str());
  525. Owned<IPropertyTreeIterator> it = info->getElements(xpath.str());
  526. SocketEndpointArray epa1;
  527. ForEach(*it) {
  528. IPropertyTree &badnode = it->query();
  529. const char *ip = badnode.queryProp("@netAddress");
  530. if (!ip)
  531. continue;
  532. SocketEndpoint ep(ip);
  533. ep.port = getDaliServixPort();
  534. epa1.append(ep);
  535. }
  536. // recheck
  537. SocketEndpointArray badepa;
  538. UnsignedArray failedcodes;
  539. StringArray failedmessages;
  540. unsigned start = msTick();
  541. const char *thorname = options->queryProp("@name");
  542. StringBuffer dataDir, mirrorDir;
  543. if (options->getPropBool("SwapNode/@swapNodeCheckPrimaryDrive",true))
  544. getConfigurationDirectory(environment->queryPropTree("Software/Directories"),"data","thor",thorname,dataDir); // if not defined can't check
  545. if (options->getPropBool("SwapNode/@swapNodeCheckMirrorDrive",true))
  546. getConfigurationDirectory(environment->queryPropTree("Software/Directories"),"mirror","thor",thorname,mirrorDir); // if not defined can't check
  547. validateNodes(epa1, dataDir.str(), mirrorDir.str(), false, badepa, failedcodes, failedmessages);
  548. if (!badepa.ordinality()) {
  549. PROGLOG("SWAPNODE: on recheck all bad nodes passed (%s,%s)",groupName.get(),ts.str());
  550. return false;
  551. }
  552. Owned<IGroup> grp = queryNamedGroupStore().lookup(groupName);
  553. CDateTime dt;
  554. dt.setNow();
  555. dt.getString(ts.clear());
  556. bool abort=false;
  557. UnsignedArray badrank;
  558. ForEachItemIn(i1,badepa) {
  559. SocketEndpoint ep(badepa.item(i1));
  560. ep.port = 0; // should be no ports in group
  561. StringBuffer ips;
  562. ep.getIpText(ips);
  563. xpath.clear().appendf("BadNode[@netAddress=\"%s\"]",ips.str());
  564. IPropertyTree *bnt = info->queryPropTree(xpath.str());
  565. if (!bnt) {
  566. ERRLOG("SWAPNODE node %s not found in swapnode info!",ips.str());
  567. return false;
  568. }
  569. bnt->setProp("@time",ts.str());
  570. int r = bnt->getPropInt("@rank",-1);
  571. if ((int)r<0) { // shouldn't occur
  572. ERRLOG("SWAPNODE node %s rank not found in group %s",ips.str(),groupName.get());
  573. return false;
  574. }
  575. badrank.append((unsigned)r);
  576. for (unsigned j1=0;j1<i1;j1++) {
  577. SocketEndpoint ep1(badepa.item(j1));
  578. ep1.port = 0; // should be no ports in group
  579. int r1 = (int)badrank.item(j1);
  580. if ((r==(r1+1)%grp->ordinality())||
  581. (r1==(r+1)%grp->ordinality())) {
  582. StringBuffer ips1;
  583. ep1.getIpText(ips1);
  584. ERRLOG("SWAPNODE adjacent nodes %d (%s) and %d (%s) are bad!",r+1,ips.str(),r1+1,ips1.str());
  585. abort = true;
  586. }
  587. }
  588. }
  589. // now see if any of bad nodes have been swapped out recently
  590. CDateTime recent = dt;
  591. int snint = options->getPropInt("SwapNode/@swapNodeInterval",24);
  592. recent.adjustTime(-60*snint);
  593. it.setown(info->getElements("Swap"));
  594. ForEach(*it) {
  595. IPropertyTree &swappednode = it->query();
  596. CDateTime dt1;
  597. const char *dt1s = swappednode.queryProp("@time");
  598. if (!dt1s||!*dt1s)
  599. continue;
  600. dt1.setString(dt1s);
  601. if (dt1.compare(recent)<0)
  602. continue;
  603. const char *ips = swappednode.queryProp("@outNetAddress");
  604. if (!ips||!*ips)
  605. continue;
  606. int r1 = swappednode.getPropInt("@rank",-1);
  607. SocketEndpoint swappedep(ips);
  608. swappedep.port = 0;
  609. ForEachItemIn(i2,badepa) {
  610. SocketEndpoint badep(badepa.item(i2));
  611. int badr = (int)badrank.item(i2);
  612. badep.port = 0;
  613. if (swappedep.equals(badep)) {
  614. // not sure if *really* want this
  615. ERRLOG("Node %d (%s) was swapped out on %s (too recent)",badr+1,ips,dt1s);
  616. abort = true;
  617. }
  618. else if ((badr==(r1+1)%grp->ordinality())||
  619. (r1==(badr+1)%grp->ordinality())) {
  620. StringBuffer bs;
  621. ERRLOG("SWAPNODE adjacent node to bad node %d (%s), %d (%s) was swapped on %s (too recent) !",badr+1,badep.getIpText(bs).str(),r1+1,ips,dt1s);
  622. abort = true;
  623. }
  624. }
  625. }
  626. const char *intent = dryRun?"would":"will";
  627. // find spares
  628. SocketEndpointArray spareepa;
  629. StringArray swapfrom;
  630. StringArray swapto;
  631. Owned<IGroup> spareGroup;
  632. if (!abort) {
  633. spareGroup.setown(queryNamedGroupStore().lookup(spareGroupName));
  634. if (!spareGroup) {
  635. ERRLOG("SWAPNODE could not find spare group %s", spareGroupName.get());
  636. abort = true;
  637. }
  638. else
  639. {
  640. spareGroup->getSocketEndpoints(spareepa);
  641. ForEachItemIn(i3,badepa) {
  642. StringBuffer from;
  643. badepa.item(i3).getIpText(from);
  644. if (i3<spareepa.ordinality()) {
  645. StringBuffer to;
  646. spareepa.item(i3).getIpText(to);
  647. PROGLOG("SWAPNODE %s swap node %d from %s to %s",intent,badrank.item(i3)+1,from.str(),to.str());
  648. }
  649. else {
  650. abort = true;
  651. ERRLOG("SWAPNODE no spare available to swap for node %d (%s)",badrank.item(i3)+1,from.str());
  652. }
  653. }
  654. }
  655. }
  656. // now list what can do
  657. if (abort) {
  658. ERRLOG("SWAPNODE: problems found (listed above), no swap %s be attempted",intent);
  659. return false;
  660. }
  661. if (dryRun)
  662. return false;
  663. // need to release swapnode lock for multi thor not to get deadlocked
  664. info.clear(); // NB: This clears the connection to SwapNode
  665. ensureThorIsDown(clusterName,true,true);
  666. ForEachItemIn(i4,badepa) {
  667. StringBuffer from;
  668. badepa.item(i4).getIpText(from);
  669. const SocketEndpoint &spareEp = spareepa.item(i4);
  670. StringBuffer to;
  671. spareEp.getIpText(to);
  672. rank_t r = spareGroup->rank(spareEp);
  673. spareGroup.setown(spareGroup->remove(r));
  674. queryNamedGroupStore().add(spareGroupName, spareGroup); // NB: replace
  675. Owned<IPropertyTree> info = getSwapNodeInfo(false);
  676. if (doSingleSwapNode(from.str(),to.str(),badrank.item(i4)+1,info,ts.str())) {
  677. StringBuffer msg;
  678. msg.appendf("AUTOSWAPNODE: cluster %s node %d: swapped out %s, swapped in %s",groupName.get(),badrank.item(i4)+1,from.str(),to.str());
  679. emailSwap(msg.str());
  680. FLLOG(MCoperatorError, swapnodeJob, "%s", msg.str());
  681. }
  682. }
  683. return true;
  684. }
  685. void autoRestart()
  686. {
  687. // restarts any workunits that failed near to swap
  688. // let see if need resubmit any nodes
  689. StringArray toresubmit;
  690. if (options->getPropBool("SwapNode/@swapNodeRestartJob")) {
  691. Owned<IPropertyTree> info = getSwapNodeInfo(false); // should put out error if returns false
  692. if (!info.get())
  693. {
  694. PROGLOG("SWAPNODE(autoRestart) exiting");
  695. return;
  696. }
  697. CDateTime recent;
  698. recent.setNow();
  699. recent.adjustTime(-SWAPNODE_RETRY_TIME/(1000*60));
  700. Owned<IPropertyTreeIterator> it = info->getElements("WorkUnit");
  701. ForEach(*it) {
  702. IPropertyTree &wu = it->query();
  703. const char *wuid = wu.queryProp("@id");
  704. if (!wuid)
  705. continue;
  706. if (!wu.getPropBool("@resubmitted")) {
  707. // see if any swaps recently done
  708. const char *dt1s = wu.queryProp("@time");
  709. if (!dt1s||!*dt1s)
  710. continue;
  711. CDateTime dt1;
  712. dt1.setString(dt1s);
  713. dt1.adjustTime(SWAPNODE_RETRY_TIME/(1000*60));
  714. Owned<IPropertyTreeIterator> swit = info->getElements("Swap");
  715. ForEach(*swit) {
  716. IPropertyTree &swap = swit->query();
  717. const char *dt2s = swap.queryProp("@time");
  718. if (!dt2s||!*dt2s)
  719. continue;
  720. CDateTime dt2;
  721. dt2.setString(dt2s);
  722. if ((dt2.compare(recent)>0)&&(dt1.compare(dt2)>0)) {
  723. wu.setPropBool("@resubmitted",true); // only one attempt
  724. toresubmit.append(wuid);
  725. break;
  726. }
  727. }
  728. }
  729. }
  730. }
  731. ForEachItemIn(ir,toresubmit) {
  732. WuResubmit(toresubmit.item(ir));
  733. }
  734. }
  735. public:
  736. CAutoSwapNode(const char *clusterpName) : CSwapNode(clusterpName)
  737. {
  738. }
  739. public:
  740. bool swap(bool dryRun)
  741. {
  742. PROGLOG("SWAPNODE(auto%s) starting",dryRun?",dryRun":"");
  743. if (!doAutoSwapNode(dryRun)) // using info in Dali (spares etc.)
  744. return false;
  745. autoRestart();
  746. PROGLOG("AutoSwapNode finished");
  747. return true;
  748. }
  749. };
  750. bool autoSwapNode(const char *groupName, bool dryRun)
  751. {
  752. CAutoSwapNode swapNode(groupName);
  753. return swapNode.swap(dryRun);
  754. }