swapnodelib.cpp 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "thirdparty.h"
  15. #include "jlib.hpp"
  16. #include "jfile.hpp"
  17. #include "jptree.hpp"
  18. #include "jprop.hpp"
  19. #include "jmisc.hpp"
  20. #include "mpbase.hpp"
  21. #include "daclient.hpp"
  22. #include "dadfs.hpp"
  23. #include "dafdesc.hpp"
  24. #include "dasds.hpp"
  25. #include "danqs.hpp"
  26. #include "dalienv.hpp"
  27. #include "rmtfile.hpp"
  28. #include "rmtsmtp.hpp"
  29. #include "dautils.hpp"
  30. #include "workunit.hpp"
  31. #include "swapnodelib.hpp"
  32. #define SDS_LOCK_TIMEOUT 30000
  33. #define SWAPNODE_RETRY_TIME (1000*60*60*1) // 1hr
  34. static const LogMsgJobInfo swapnodeJob(UnknownJob, UnknownUser);
  35. static bool ensureThorIsDown(const char *cluster, bool nofail, bool wait)
  36. {
  37. bool retry = false;
  38. do {
  39. Owned<IRemoteConnection> pStatus = querySDS().connect("/Status/Servers", myProcessSession(), RTM_NONE, SDS_LOCK_TIMEOUT);
  40. Owned<IPropertyTreeIterator> it = pStatus->queryRoot()->getElements("Server[@name='ThorMaster']");
  41. retry = false;
  42. ForEach(*it) {
  43. IPropertyTree* pServer = &it->query();
  44. if (pServer->hasProp("@cluster") && !strcmp(pServer->queryProp("@cluster"), cluster)) {
  45. if (nofail) {
  46. WARNLOG("A Thor on cluster %s is still active", cluster);
  47. if (!wait)
  48. return false;
  49. Sleep(1000*10);
  50. PROGLOG("Retrying...");
  51. retry = true;
  52. break;
  53. }
  54. throw MakeStringException(-1, "A Thor cluster node swap requires the cluster to be offline. Please stop the Thor cluster '%s' and try again.", cluster);
  55. }
  56. }
  57. } while (retry);
  58. return true;
  59. }
  60. bool WuResubmit(const char *wuid)
  61. {
  62. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  63. Owned<IWorkUnit> wu = factory->updateWorkUnit(wuid);
  64. if (!wu)
  65. {
  66. ERRLOG("WuResubmit(%s): could not find workunit",wuid);
  67. return false;
  68. }
  69. if (wu->getState()!=WUStateFailed)
  70. {
  71. ERRLOG("WuResubmit(%s): could not resubmit as workunit state is '%s'", wuid, wu->queryStateDesc());
  72. return false;
  73. }
  74. SCMStringBuffer daToken;
  75. StringBuffer user;
  76. wu->getWorkunitDistributedAccessToken(daToken);//TODO Can we use wu->queryUser() instead?
  77. extractFromWorkunitDAToken(daToken.str(), nullptr, &user, nullptr);//get user from token
  78. wu->resetWorkflow();
  79. wu->setState(WUStateSubmitted);
  80. wu->commit();
  81. wu.clear();
  82. submitWorkUnit(wuid,user.str(),daToken.str());//use workunit token as password
  83. PROGLOG("WuResubmit(%s): resubmitted",wuid);
  84. return true;
  85. }
  // SwapNode info kept in Dali:
  //
  // SwapNode/
  //   Thor [ @group, @timeChecked ]              -- one element per Thor node group
  //     BadNode [ @netAddress, @timeChecked, @time, @numTimes, @code, @rank ]   (element text = error message)
  //     Swap [ @inNetAddress, @outNetAddress, @time, @rank ]
  //     WorkUnit [ @id, @time, @resubmitted ]
  //
  // History report columns:
  //   failures: time, nodenum, ip, code, errmsg
  //   swaps:    time, nodenum, swapout, swapin
  95. class CSwapNode
  96. {
  97. protected:
  98. Linked<IPropertyTree> environment;
  99. StringAttr clusterName;
  100. StringAttr groupName, spareGroupName;
  101. IPropertyTree *options;
  102. Owned<IGroup> group, spareGroup;
  103. bool checkIfNodeInUse(IpAddress &ip, bool includespares, StringBuffer &clustname)
  104. {
  105. SocketEndpoint ep(0,ip);
  106. if (RANK_NULL != group->rank(ep)) {
  107. clustname.append(groupName);
  108. return true;
  109. }
  110. else if (includespares) {
  111. if (RANK_NULL != spareGroup->rank(ep)) {
  112. clustname.append(groupName).append(" spares");
  113. return true;
  114. }
  115. }
  116. return false;
  117. }
  118. IPropertyTree *getSwapNodeInfo(bool create)
  119. {
  120. Owned<IRemoteConnection> conn = querySDS().connect("/SwapNode", myProcessSession(), RTM_LOCK_WRITE|(create?RTM_CREATE_QUERY:0), 1000*60*5);
  121. if (!conn) {
  122. ERRLOG("SWAPNODE: could not connect to /SwapNode branch");
  123. return NULL;
  124. }
  125. StringBuffer xpath;
  126. xpath.appendf("Thor[@group=\"%s\"]",groupName.get());
  127. Owned<IPropertyTree> info = conn->queryRoot()->getPropTree(xpath.str());
  128. if (!info) {
  129. if (!create) {
  130. PROGLOG("SWAPNODE: no information for group %s",groupName.get());
  131. return NULL;
  132. }
  133. info.set(conn->queryRoot()->addPropTree("Thor",createPTree("Thor")));
  134. info->setProp("@group",groupName.get());
  135. }
  136. return info.getClear();
  137. }
  138. bool doSwap(const char *oldHost, const char *newHost)
  139. {
  140. Owned<INode> newNode = createINode(newHost);
  141. Owned<INode> oldNode = createINode(oldHost);
  142. if (!group->isMember(oldNode))
  143. {
  144. ERRLOG("Node %s is not part of group %s", oldHost, groupName.get());
  145. return false;
  146. }
  147. if (group->isMember(newNode))
  148. {
  149. ERRLOG("Node %s is already part of group %s", newHost, groupName.get());
  150. return false;
  151. }
  152. queryNamedGroupStore().swapNode(oldHost, newHost);
  153. return true;
  154. }
  155. bool doSingleSwapNode(const char *oldHost, const char *newHost, unsigned nodenum, IPropertyTree *info, const char *timechecked)
  156. {
  157. if (doSwap(oldHost, newHost)) {
  158. if (info) {
  159. StringBuffer times(timechecked);
  160. if (times.length()==0) {
  161. CDateTime dt;
  162. dt.setNow();
  163. dt.getString(times);
  164. }
  165. // TBD tie up with bad node in auto?
  166. IPropertyTree *swap = info->addPropTree("Swap", createPTree("Swap"));
  167. swap->setProp("@inNetAddress", newHost);
  168. swap->setProp("@outNetAddress", oldHost);
  169. swap->setProp("@time", times.str());
  170. if (UINT_MAX != nodenum)
  171. swap->setPropInt("@rank", nodenum-1);
  172. }
  173. return true;
  174. }
  175. return false;
  176. }
  177. void init()
  178. {
  179. StringBuffer xpath("Software/ThorCluster[@name=\"");
  180. xpath.append(clusterName).append("\"]");
  181. Owned<IRemoteConnection> conn = querySDS().connect("/Environment", myProcessSession(), RTM_LOCK_READ, SDS_LOCK_TIMEOUT);
  182. environment.setown(createPTreeFromIPT(conn->queryRoot()));
  183. options = environment->queryPropTree(xpath.str());
  184. if (!options)
  185. throwUnexpected();
  186. groupName.set(options->queryProp("@nodeGroup"));
  187. if (groupName.isEmpty())
  188. groupName.set(options->queryProp("@name"));
  189. VStringBuffer spareS("%s_spares", groupName.get());
  190. spareGroupName.set(spareS);
  191. group.setown(queryNamedGroupStore().lookup(groupName));
  192. spareGroup.setown(queryNamedGroupStore().lookup(spareGroupName));
  193. }
  194. public:
  195. CSwapNode(const char *_clusterName) :clusterName(_clusterName)
  196. {
  197. init();
  198. }
  199. void swappedList(unsigned days, StringBuffer *out)
  200. {
  201. Owned<IPropertyTree> info = getSwapNodeInfo(true); // should put out error if returns false
  202. if (!info.get())
  203. return;
  204. CDateTime tt;
  205. CDateTime cutoff;
  206. if (days) {
  207. cutoff.setNow();
  208. cutoff.adjustTime(-60*24*(int)days);
  209. }
  210. Owned<IPropertyTreeIterator> it2 = info->getElements("Swap");
  211. ForEach(*it2) {
  212. IPropertyTree &swappednode = it2->query();
  213. const char *ts = swappednode.queryProp("@time");
  214. if (!ts)
  215. continue;
  216. if (days) {
  217. tt.setString(ts);
  218. if (cutoff.compare(tt)>0)
  219. continue;
  220. }
  221. const char *ips = swappednode.queryProp("@outNetAddress");
  222. if (!ips||!*ips)
  223. continue;
  224. IpAddress ip(ips);
  225. StringBuffer clustname;
  226. if (checkIfNodeInUse(ip,true,clustname))
  227. continue; // ignore
  228. if (out)
  229. out->append(ips).append('\n');
  230. else
  231. PROGLOG("%s",ips);
  232. }
  233. }
  234. void emailSwap(const char *msg, bool warn=false, bool sendswapped=false, bool sendhistory=false)
  235. {
  236. StringBuffer emailtarget;
  237. StringBuffer smtpserver;
  238. if (options->getProp("SwapNode/@EmailAddress",emailtarget)&&emailtarget.length()&&options->getProp("SwapNode/@EmailSMTPServer",smtpserver)&&smtpserver.length()) {
  239. const char * subject = options->queryProp("SwapNode/@EmailSubject");
  240. if (!subject)
  241. subject = "SWAPNODE automated email";
  242. StringBuffer msgs;
  243. if (!msg) {
  244. msgs.append("Swapnode command line, Cluster: ");
  245. msg = msgs.append(groupName).append('\n').str();
  246. }
  247. CDateTime dt;
  248. dt.setNow();
  249. StringBuffer out;
  250. dt.getString(out,true).append(": ").append(msg).append("\n\n");
  251. if (options->getPropBool("SwapNode/@EmailSwappedList")||sendswapped) {
  252. out.append("Currently swapped out nodes:\n");
  253. swappedList(0,&out);
  254. out.append('\n');
  255. }
  256. if (options->getPropBool("SwapNode/@EmailHistory")||sendhistory) {
  257. out.append("Swap history:\n");
  258. swapNodeHistory(0,&out);
  259. out.append('\n');
  260. }
  261. SocketEndpoint ep(smtpserver.str(),25);
  262. StringBuffer sender("swapnode@");
  263. queryHostIP().getIpText(sender);
  264. // add tbd
  265. StringBuffer ips;
  266. StringArray warnings;
  267. sendEmail(emailtarget.str(),subject,out.str(),ep.getIpText(ips).str(),ep.port,sender.str(),&warnings);
  268. ForEachItemIn(i,warnings)
  269. WARNLOG("SWAPNODE: %s",warnings.item(i));
  270. }
  271. else if (warn)
  272. WARNLOG("Either SwapNode/@EmailAddress or SwapNode/@EmailSMTPServer not set in thor.xml");
  273. }
  274. void swapNodeHistory(unsigned days,StringBuffer *out)
  275. {
  276. Owned<IPropertyTree> info = getSwapNodeInfo(true);
  277. if (!info.get()) {
  278. if (out)
  279. out->append("No swapnode info\n");
  280. else
  281. ERRLOG("No swapnode info");
  282. return;
  283. }
  284. StringBuffer line;
  285. CDateTime tt;
  286. CDateTime cutoff;
  287. if (days) {
  288. cutoff.setNow();
  289. cutoff.adjustTime(-60*24*(int)days);
  290. }
  291. unsigned i=0;
  292. if (out)
  293. out->append("Failure, Time, NodeNum, NodeIp, ErrCode, Error Message\n------------------------------------------------------\n");
  294. else {
  295. PROGLOG("Failure, Time, NodeNum, NodeIp, ErrCode, Error Message");
  296. PROGLOG("------------------------------------------------------");
  297. }
  298. Owned<IPropertyTreeIterator> it1 = info->getElements("BadNode");
  299. ForEach(*it1) {
  300. IPropertyTree &badnode = it1->query();
  301. const char *ts = badnode.queryProp("@time");
  302. if (!ts)
  303. continue;
  304. if (days) {
  305. tt.setString(ts);
  306. if (cutoff.compare(tt)>0)
  307. continue;
  308. }
  309. line.clear().append(++i).append(", ");
  310. line.append(ts).append(", ").append(badnode.getPropInt("@rank",-1)+1).append(", ");
  311. badnode.getProp("@netAddress",line);
  312. line.append(", ").append(badnode.getPropInt("@code")).append(", \"");
  313. badnode.getProp(NULL,line);
  314. line.append('\"');
  315. if (out)
  316. out->append(line).append('\n');
  317. else
  318. PROGLOG("%s",line.str());
  319. }
  320. if (out)
  321. out->append("\nSwapped, Time, NodeNum, OutIp, InIp\n-----------------------------------\n");
  322. else {
  323. PROGLOG("%s", "");
  324. PROGLOG("Swapped, Time, NodeNum, OutIp, InIp");
  325. PROGLOG("-----------------------------------");
  326. }
  327. i = 0;
  328. Owned<IPropertyTreeIterator> it2 = info->getElements("Swap");
  329. ForEach(*it2) {
  330. IPropertyTree &swappednode = it2->query();
  331. const char *ts = swappednode.queryProp("@time");
  332. if (!ts)
  333. continue;
  334. if (days) {
  335. tt.setString(ts);
  336. if (cutoff.compare(tt)>0)
  337. continue;
  338. }
  339. line.clear().append(++i).append(", ");
  340. swappednode.getProp("@time",line);
  341. line.append(", ").append(swappednode.getPropInt("@rank",-1)+1).append(", ");
  342. swappednode.getProp("@outNetAddress",line);
  343. line.append(", ");
  344. swappednode.getProp("@inNetAddress",line);
  345. if (out)
  346. out->append(line.str()).append('\n');
  347. else
  348. PROGLOG("%s",line.str());
  349. }
  350. }
  351. bool checkThorNodeSwap(const char *failedwuid, unsigned mininterval)
  352. {
  353. bool ret = false;
  354. if (mininterval==(unsigned)-1) { // called by thor
  355. mininterval = 0;
  356. if (!options||!options->getPropBool("SwapNode/@autoSwapNode"))
  357. return false;
  358. if ((!failedwuid||!*failedwuid)&&!options->getPropBool("SwapNode/@checkAfterEveryJob"))
  359. return false;
  360. }
  361. try {
  362. Owned<IPropertyTree> info = getSwapNodeInfo(true);
  363. if (info.get()) {
  364. PROGLOG("checkNodeSwap started");
  365. StringBuffer xpath;
  366. CDateTime dt;
  367. StringBuffer ts;
  368. // see if done less than mininterval ago
  369. if (mininterval) {
  370. dt.setNow();
  371. dt.adjustTime(-((int)mininterval));
  372. if (info->getProp("@timeChecked",ts)) {
  373. CDateTime dtc;
  374. dtc.setString(ts.str());
  375. if (dtc.compare(dt,false)>0) {
  376. PROGLOG("checkNodeSwap using cached validate from %s",ts.str());
  377. xpath.clear().appendf("BadNode[@timeChecked=\"%s\"]",ts.str());
  378. return info->hasProp(xpath.str());
  379. }
  380. }
  381. }
  382. Owned<IGroup> grp = queryNamedGroupStore().lookup(groupName);
  383. if (!grp)
  384. PROGLOG("%s group doesn't exist", groupName.get());
  385. else
  386. {
  387. SocketEndpointArray epa;
  388. grp->getSocketEndpoints(epa);
  389. ForEachItemIn(i1,epa) {
  390. epa.element(i1).port = getDaliServixPort();
  391. }
  392. SocketEndpointArray failures;
  393. UnsignedArray failedcodes;
  394. StringArray failedmessages;
  395. unsigned start = msTick();
  396. const char *thorname = options->queryProp("@name");
  397. StringBuffer dataDir, mirrorDir;
  398. getConfigurationDirectory(environment->queryPropTree("Software/Directories"),"data","thor",thorname,dataDir); // if not defined can't check
  399. getConfigurationDirectory(environment->queryPropTree("Software/Directories"),"mirror","thor",thorname,mirrorDir); // if not defined can't check
  400. validateNodes(epa,dataDir.str(),mirrorDir.str(),false,failures,failedcodes,failedmessages);
  401. dt.setNow();
  402. dt.getString(ts.clear());
  403. ForEachItemIn(i,failures) {
  404. SocketEndpoint ep(failures.item(i));
  405. ep.port = 0;
  406. StringBuffer ips;
  407. ep.getIpText(ips);
  408. int r = (int)grp->rank(ep);
  409. if (r<0) { // shouldn't occur
  410. ERRLOG("SWAPNODE node %s not found in group %s",ips.str(),groupName.get());
  411. continue;
  412. }
  413. PROGLOG("CheckSwapNode FAILED(%d) %s : %s",failedcodes.item(i),ips.str(),failedmessages.item(i));
  414. // SNMP TBD?
  415. ret = true;
  416. xpath.clear().appendf("BadNode[@netAddress=\"%s\"]",ips.str());
  417. IPropertyTree *bnt = info->queryPropTree(xpath.str());
  418. if (!bnt) {
  419. bnt = info->addPropTree("BadNode",createPTree("BadNode"));
  420. bnt->setProp("@netAddress",ips.str());
  421. }
  422. bnt->setPropInt("@numTimes",bnt->getPropInt("@numTimes",0)+1);
  423. bnt->setProp("@timeChecked",ts.str());
  424. bnt->setProp("@time",ts.str());
  425. bnt->setPropInt("@code",failedcodes.item(i));
  426. bnt->setPropInt("@rank",r);
  427. bnt->setProp(NULL,failedmessages.item(i));
  428. }
  429. if (failedwuid&&*failedwuid) {
  430. xpath.clear().appendf("WorkUnit[@id=\"%s\"]",failedwuid);
  431. IPropertyTree *wut = info->queryPropTree(xpath.str());
  432. if (!wut) {
  433. wut = info->addPropTree("WorkUnit",createPTree("WorkUnit"));
  434. wut->setProp("@id",failedwuid);
  435. }
  436. wut->setProp("@time",ts.str());
  437. }
  438. PROGLOG("checkNodeSwap: Time taken = %dms",msTick()-start);
  439. info->setProp("@timeChecked",ts.str());
  440. }
  441. }
  442. }
  443. catch (IException *e) {
  444. EXCLOG(e,"checkNodeSwap");
  445. }
  446. return ret;
  447. }
  448. };
  449. void swappedList(const char *clusterName, unsigned days, StringBuffer *out)
  450. {
  451. CSwapNode swapNode(clusterName);
  452. swapNode.swappedList(days, out);
  453. }
  454. void emailSwap(const char *clusterName, const char *msg, bool warn, bool sendswapped, bool sendhistory)
  455. {
  456. CSwapNode swapNode(clusterName);
  457. swapNode.emailSwap(msg, warn, sendswapped, sendhistory);
  458. }
  459. void swapNodeHistory(const char *clusterName, unsigned days, StringBuffer *out)
  460. {
  461. CSwapNode swapNode(clusterName);
  462. swapNode.swapNodeHistory(days, out);
  463. }
  464. bool checkThorNodeSwap(const char *clusterName, const char *failedwuid, unsigned mininterval)
  465. {
  466. CSwapNode swapNode(clusterName);
  467. return swapNode.checkThorNodeSwap(failedwuid, mininterval);
  468. }
  469. class CSingleSwapNode : public CSwapNode
  470. {
  471. public:
  472. CSingleSwapNode(const char *clusterName) : CSwapNode(clusterName)
  473. {
  474. }
  475. bool swap(const char *oldHost, const char *newHost)
  476. {
  477. ensureThorIsDown(clusterName,false,false);
  478. Owned<IPropertyTree> info = getSwapNodeInfo(true);
  479. if (!doSingleSwapNode(oldHost, newHost, UINT_MAX, info, NULL))
  480. return false;
  481. // check to see if it was a spare and remove
  482. SocketEndpoint spareEp(newHost);
  483. if (spareGroup)
  484. {
  485. rank_t r = spareGroup->rank(spareEp);
  486. if (RANK_NULL != r)
  487. {
  488. PROGLOG("Removing spare : %s", newHost);
  489. queryNamedGroupStore().removeNode(spareGroupName, newHost);
  490. }
  491. }
  492. info.clear();
  493. PROGLOG("SwapNode finished");
  494. return true;
  495. }
  496. };
  497. bool swapNode(const char *cluster, const char *oldHost, const char *newHost)
  498. {
  499. PROGLOG("SWAPNODE(%s,%s,%s) starting",cluster, oldHost, newHost);
  500. CSingleSwapNode swapNode(cluster);
  501. return swapNode.swap(oldHost, newHost);
  502. }
  503. class CAutoSwapNode : public CSwapNode
  504. {
  505. bool doAutoSwapNode(bool dryRun=false)
  506. {
  507. if (!checkThorNodeSwap(NULL,dryRun?0:5)) {
  508. PROGLOG("No bad nodes detected");
  509. PROGLOG("SWAPNODE(auto) exiting");
  510. return false;
  511. }
  512. Owned<IPropertyTree> info = getSwapNodeInfo(false);
  513. if (!info.get()) { // should put out error if returns false
  514. PROGLOG("SWAPNODE(auto) exiting");
  515. return false;
  516. }
  517. StringBuffer ts;
  518. if (!info->getProp("@timeChecked",ts)) {
  519. PROGLOG("SWAPNODE(auto): no check information generated");
  520. return false;
  521. }
  522. // enumerate bad nodes
  523. StringBuffer xpath;
  524. xpath.appendf("BadNode[@time=\"%s\"]",ts.str());
  525. Owned<IPropertyTreeIterator> it = info->getElements(xpath.str());
  526. SocketEndpointArray epa1;
  527. ForEach(*it) {
  528. IPropertyTree &badnode = it->query();
  529. const char *ip = badnode.queryProp("@netAddress");
  530. if (!ip)
  531. continue;
  532. SocketEndpoint ep(ip);
  533. ep.port = getDaliServixPort();
  534. epa1.append(ep);
  535. }
  536. // recheck
  537. SocketEndpointArray badepa;
  538. UnsignedArray failedcodes;
  539. StringArray failedmessages;
  540. unsigned start = msTick();
  541. const char *thorname = options->queryProp("@name");
  542. StringBuffer dataDir, mirrorDir;
  543. if (options->getPropBool("SwapNode/@swapNodeCheckPrimaryDrive",true))
  544. getConfigurationDirectory(environment->queryPropTree("Software/Directories"),"data","thor",thorname,dataDir); // if not defined can't check
  545. if (options->getPropBool("SwapNode/@swapNodeCheckMirrorDrive",true))
  546. getConfigurationDirectory(environment->queryPropTree("Software/Directories"),"mirror","thor",thorname,mirrorDir); // if not defined can't check
  547. validateNodes(epa1, dataDir.str(), mirrorDir.str(), false, badepa, failedcodes, failedmessages);
  548. if (!badepa.ordinality()) {
  549. PROGLOG("SWAPNODE: on recheck all bad nodes passed (%s,%s)",groupName.get(),ts.str());
  550. return false;
  551. }
  552. Owned<IGroup> grp = queryNamedGroupStore().lookup(groupName);
  553. CDateTime dt;
  554. dt.setNow();
  555. dt.getString(ts.clear());
  556. bool abort=false;
  557. UnsignedArray badrank;
  558. ForEachItemIn(i1,badepa) {
  559. SocketEndpoint ep(badepa.item(i1));
  560. ep.port = 0; // should be no ports in group
  561. StringBuffer ips;
  562. ep.getIpText(ips);
  563. xpath.clear().appendf("BadNode[@netAddress=\"%s\"]",ips.str());
  564. IPropertyTree *bnt = info->queryPropTree(xpath.str());
  565. if (!bnt) {
  566. ERRLOG("SWAPNODE node %s not found in swapnode info!",ips.str());
  567. return false;
  568. }
  569. bnt->setProp("@time",ts.str());
  570. int r = bnt->getPropInt("@rank",-1);
  571. if ((int)r<0) { // shouldn't occur
  572. ERRLOG("SWAPNODE node %s rank not found in group %s",ips.str(),groupName.get());
  573. return false;
  574. }
  575. badrank.append((unsigned)r);
  576. for (unsigned j1=0;j1<i1;j1++) {
  577. SocketEndpoint ep1(badepa.item(j1));
  578. ep1.port = 0; // should be no ports in group
  579. int r1 = (int)badrank.item(j1);
  580. if ((r==(r1+1)%grp->ordinality())||
  581. (r1==(r+1)%grp->ordinality())) {
  582. StringBuffer ips1;
  583. ep1.getIpText(ips1);
  584. ERRLOG("SWAPNODE adjacent nodes %d (%s) and %d (%s) are bad!",r+1,ips.str(),r1+1,ips1.str());
  585. abort = true;
  586. }
  587. }
  588. }
  589. // now see if any of bad nodes have been swapped out recently
  590. CDateTime recent = dt;
  591. int snint = options->getPropInt("SwapNode/@swapNodeInterval",24);
  592. recent.adjustTime(-60*snint);
  593. it.setown(info->getElements("Swap"));
  594. ForEach(*it) {
  595. IPropertyTree &swappednode = it->query();
  596. CDateTime dt1;
  597. const char *dt1s = swappednode.queryProp("@time");
  598. if (!dt1s||!*dt1s)
  599. continue;
  600. dt1.setString(dt1s);
  601. if (dt1.compare(recent)<0)
  602. continue;
  603. const char *ips = swappednode.queryProp("@outNetAddress");
  604. if (!ips||!*ips)
  605. continue;
  606. int r1 = swappednode.getPropInt("@rank",-1);
  607. SocketEndpoint swappedep(ips);
  608. swappedep.port = 0;
  609. ForEachItemIn(i2,badepa) {
  610. SocketEndpoint badep(badepa.item(i2));
  611. int badr = (int)badrank.item(i2);
  612. badep.port = 0;
  613. if (swappedep.equals(badep)) {
  614. // not sure if *really* want this
  615. ERRLOG("Node %d (%s) was swapped out on %s (too recent)",badr+1,ips,dt1s);
  616. abort = true;
  617. }
  618. else if ((badr==(r1+1)%grp->ordinality())||
  619. (r1==(badr+1)%grp->ordinality())) {
  620. StringBuffer bs;
  621. ERRLOG("SWAPNODE adjacent node to bad node %d (%s), %d (%s) was swapped on %s (too recent) !",badr+1,badep.getIpText(bs).str(),r1+1,ips,dt1s);
  622. abort = true;
  623. }
  624. }
  625. }
  626. const char *intent = dryRun?"would":"will";
  627. // find spares
  628. SocketEndpointArray spareepa;
  629. StringArray swapfrom;
  630. StringArray swapto;
  631. Owned<IGroup> spareGroup;
  632. if (!abort) {
  633. spareGroup.setown(queryNamedGroupStore().lookup(spareGroupName));
  634. if (!spareGroup) {
  635. ERRLOG("SWAPNODE could not find spare group %s", spareGroupName.get());
  636. abort = true;
  637. }
  638. else
  639. {
  640. spareGroup->getSocketEndpoints(spareepa);
  641. ForEachItemIn(i3,badepa) {
  642. StringBuffer from;
  643. badepa.item(i3).getIpText(from);
  644. if (i3<spareepa.ordinality()) {
  645. StringBuffer to;
  646. spareepa.item(i3).getIpText(to);
  647. PROGLOG("SWAPNODE %s swap node %d from %s to %s",intent,badrank.item(i3)+1,from.str(),to.str());
  648. }
  649. else {
  650. abort = true;
  651. ERRLOG("SWAPNODE no spare available to swap for node %d (%s)",badrank.item(i3)+1,from.str());
  652. }
  653. }
  654. }
  655. }
  656. // now list what can do
  657. if (abort) {
  658. ERRLOG("SWAPNODE: problems found (listed above), no swap %s be attempted",intent);
  659. return false;
  660. }
  661. if (dryRun)
  662. return false;
  663. // need to release swapnode lock for multi thor not to get deadlocked
  664. info.clear(); // NB: This clears the connection to SwapNode
  665. ensureThorIsDown(clusterName,true,true);
  666. ForEachItemIn(i4,badepa) {
  667. StringBuffer from;
  668. badepa.item(i4).getIpText(from);
  669. const SocketEndpoint &spareEp = spareepa.item(i4);
  670. StringBuffer to;
  671. spareEp.getIpText(to);
  672. rank_t r = spareGroup->rank(spareEp);
  673. dbgassertex(RANK_NULL != r);
  674. spareGroup.setown(spareGroup->remove(r));
  675. queryNamedGroupStore().removeNode(spareGroupName, to);
  676. Owned<IPropertyTree> info = getSwapNodeInfo(false);
  677. if (doSingleSwapNode(from.str(),to.str(),badrank.item(i4)+1,info,ts.str())) {
  678. StringBuffer msg;
  679. msg.appendf("AUTOSWAPNODE: cluster %s node %d: swapped out %s, swapped in %s",groupName.get(),badrank.item(i4)+1,from.str(),to.str());
  680. emailSwap(msg.str());
  681. FLLOG(MCoperatorError, swapnodeJob, "%s", msg.str());
  682. }
  683. }
  684. return true;
  685. }
  686. void autoRestart()
  687. {
  688. // restarts any workunits that failed near to swap
  689. // let see if need resubmit any nodes
  690. StringArray toresubmit;
  691. if (options->getPropBool("SwapNode/@swapNodeRestartJob")) {
  692. Owned<IPropertyTree> info = getSwapNodeInfo(false); // should put out error if returns false
  693. if (!info.get())
  694. {
  695. PROGLOG("SWAPNODE(autoRestart) exiting");
  696. return;
  697. }
  698. CDateTime recent;
  699. recent.setNow();
  700. recent.adjustTime(-SWAPNODE_RETRY_TIME/(1000*60));
  701. Owned<IPropertyTreeIterator> it = info->getElements("WorkUnit");
  702. ForEach(*it) {
  703. IPropertyTree &wu = it->query();
  704. const char *wuid = wu.queryProp("@id");
  705. if (!wuid)
  706. continue;
  707. if (!wu.getPropBool("@resubmitted")) {
  708. // see if any swaps recently done
  709. const char *dt1s = wu.queryProp("@time");
  710. if (!dt1s||!*dt1s)
  711. continue;
  712. CDateTime dt1;
  713. dt1.setString(dt1s);
  714. dt1.adjustTime(SWAPNODE_RETRY_TIME/(1000*60));
  715. Owned<IPropertyTreeIterator> swit = info->getElements("Swap");
  716. ForEach(*swit) {
  717. IPropertyTree &swap = swit->query();
  718. const char *dt2s = swap.queryProp("@time");
  719. if (!dt2s||!*dt2s)
  720. continue;
  721. CDateTime dt2;
  722. dt2.setString(dt2s);
  723. if ((dt2.compare(recent)>0)&&(dt1.compare(dt2)>0)) {
  724. wu.setPropBool("@resubmitted",true); // only one attempt
  725. toresubmit.append(wuid);
  726. break;
  727. }
  728. }
  729. }
  730. }
  731. }
  732. ForEachItemIn(ir,toresubmit) {
  733. WuResubmit(toresubmit.item(ir));
  734. }
  735. }
  736. public:
  737. CAutoSwapNode(const char *clusterpName) : CSwapNode(clusterpName)
  738. {
  739. }
  740. public:
  741. bool swap(bool dryRun)
  742. {
  743. PROGLOG("SWAPNODE(auto%s) starting",dryRun?",dryRun":"");
  744. if (!doAutoSwapNode(dryRun)) // using info in Dali (spares etc.)
  745. return false;
  746. autoRestart();
  747. PROGLOG("AutoSwapNode finished");
  748. return true;
  749. }
  750. };
  751. bool autoSwapNode(const char *groupName, bool dryRun)
  752. {
  753. CAutoSwapNode swapNode(groupName);
  754. return swapNode.swap(dryRun);
  755. }