/*############################################################################## HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ############################################################################## */ #include "platform.h" #include "thirdparty.h" #include "jlib.hpp" #include "jfile.hpp" #include "jptree.hpp" #include "jprop.hpp" #include "jmisc.hpp" #include "mpbase.hpp" #include "daclient.hpp" #include "dadfs.hpp" #include "dafdesc.hpp" #include "dasds.hpp" #include "danqs.hpp" #include "dalienv.hpp" #include "rmtfile.hpp" #include "rmtsmtp.hpp" #include "dautils.hpp" #include "workunit.hpp" #include "swapnodelib.hpp" #define SDS_LOCK_TIMEOUT 30000 #define SWAPNODE_RETRY_TIME (1000*60*60*1) // 1hr static const LogMsgJobInfo swapnodeJob(UnknownJob, UnknownUser); static bool ensureThorIsDown(const char *cluster, bool nofail, bool wait) { bool retry = false; do { Owned pStatus = querySDS().connect("/Status/Servers", myProcessSession(), RTM_NONE, SDS_LOCK_TIMEOUT); Owned it = pStatus->queryRoot()->getElements("Server[@name='ThorMaster']"); retry = false; ForEach(*it) { IPropertyTree* pServer = &it->query(); if (pServer->hasProp("@cluster") && !strcmp(pServer->queryProp("@cluster"), cluster)) { if (nofail) { WARNLOG("A Thor on cluster %s is still active", cluster); if (!wait) return false; Sleep(1000*10); PROGLOG("Retrying..."); retry = true; break; } throw MakeStringException(-1, "A Thor cluster node swap requires the cluster to be offline. Please stop the Thor cluster '%s' and try again.", cluster); } } } while (retry); return true; } bool WuResubmit(const char *wuid) { Owned factory = getWorkUnitFactory(); Owned wu = factory->updateWorkUnit(wuid); if (!wu) { ERRLOG("WuResubmit(%s): could not find workunit",wuid); return false; } if (wu->getState()!=WUStateFailed) { ERRLOG("WuResubmit(%s): could not resubmit as workunit state is '%s'", wuid, wu->queryStateDesc()); return false; } SCMStringBuffer daToken; StringBuffer user; wu->getWorkunitDistributedAccessToken(daToken);//TODO Can we use wu->queryUser() instead? extractFromWorkunitDAToken(daToken.str(), nullptr, &user, nullptr);//get user from token wu->resetWorkflow(); wu->setState(WUStateSubmitted); wu->commit(); wu.clear(); submitWorkUnit(wuid,user.str(),daToken.str());//use workunit token as password PROGLOG("WuResubmit(%s): resubmitted",wuid); return true; } // SwapNode info // // SwapNode/ // Thor [ @group, @timeChecked ] // BadNode [ @netAddress, @timeChecked, @time, @numTimes, @code, @rank, @ (msg) // Swap [ @inNetAddress, @outNetAddress, @time, @rank] // WorkUnit [ @id @time @resubmitted ] //time,nodenum,ip,code,errmsg //time,nodenum,swapout,swapin class CSwapNode { protected: Linked environment; StringAttr clusterName; StringAttr groupName, spareGroupName; IPropertyTree *options; Owned group, spareGroup; bool checkIfNodeInUse(IpAddress &ip, bool includespares, StringBuffer &clustname) { SocketEndpoint ep(0,ip); if (RANK_NULL != group->rank(ep)) { clustname.append(groupName); return true; } else if (includespares) { if (RANK_NULL != spareGroup->rank(ep)) { clustname.append(groupName).append(" spares"); return true; } } return false; } IPropertyTree *getSwapNodeInfo(bool create) { Owned conn = querySDS().connect("/SwapNode", myProcessSession(), RTM_LOCK_WRITE|(create?RTM_CREATE_QUERY:0), 1000*60*5); if (!conn) { ERRLOG("SWAPNODE: could not connect to /SwapNode branch"); return NULL; } StringBuffer xpath; xpath.appendf("Thor[@group=\"%s\"]",groupName.get()); Owned info = conn->queryRoot()->getPropTree(xpath.str()); if (!info) { if (!create) { PROGLOG("SWAPNODE: no information for group %s",groupName.get()); return NULL; } info.set(conn->queryRoot()->addPropTree("Thor",createPTree("Thor"))); info->setProp("@group",groupName.get()); } return info.getClear(); } bool doSwap(const char *oldHost, const char *newHost) { Owned newNode = createINode(newHost); Owned oldNode = createINode(oldHost); if (!group->isMember(oldNode)) { ERRLOG("Node %s is not part of group %s", oldHost, groupName.get()); return false; } if (group->isMember(newNode)) { ERRLOG("Node %s is already part of group %s", newHost, groupName.get()); return false; } queryNamedGroupStore().swapNode(oldHost, newHost); return true; } bool doSingleSwapNode(const char *oldHost, const char *newHost, unsigned nodenum, IPropertyTree *info, const char *timechecked) { if (doSwap(oldHost, newHost)) { if (info) { StringBuffer times(timechecked); if (times.length()==0) { CDateTime dt; dt.setNow(); dt.getString(times); } // TBD tie up with bad node in auto? IPropertyTree *swap = info->addPropTree("Swap", createPTree("Swap")); swap->setProp("@inNetAddress", newHost); swap->setProp("@outNetAddress", oldHost); swap->setProp("@time", times.str()); if (UINT_MAX != nodenum) swap->setPropInt("@rank", nodenum-1); } return true; } return false; } void init() { StringBuffer xpath("Software/ThorCluster[@name=\""); xpath.append(clusterName).append("\"]"); Owned conn = querySDS().connect("/Environment", myProcessSession(), RTM_LOCK_READ, SDS_LOCK_TIMEOUT); environment.setown(createPTreeFromIPT(conn->queryRoot())); options = environment->queryPropTree(xpath.str()); if (!options) throwUnexpected(); groupName.set(options->queryProp("@nodeGroup")); if (groupName.isEmpty()) groupName.set(options->queryProp("@name")); VStringBuffer spareS("%s_spares", groupName.get()); spareGroupName.set(spareS); group.setown(queryNamedGroupStore().lookup(groupName)); spareGroup.setown(queryNamedGroupStore().lookup(spareGroupName)); } public: CSwapNode(const char *_clusterName) :clusterName(_clusterName) { init(); } void swappedList(unsigned days, StringBuffer *out) { Owned info = getSwapNodeInfo(true); // should put out error if returns false if (!info.get()) return; CDateTime tt; CDateTime cutoff; if (days) { cutoff.setNow(); cutoff.adjustTime(-60*24*(int)days); } Owned it2 = info->getElements("Swap"); ForEach(*it2) { IPropertyTree &swappednode = it2->query(); const char *ts = swappednode.queryProp("@time"); if (!ts) continue; if (days) { tt.setString(ts); if (cutoff.compare(tt)>0) continue; } const char *ips = swappednode.queryProp("@outNetAddress"); if (!ips||!*ips) continue; IpAddress ip(ips); StringBuffer clustname; if (checkIfNodeInUse(ip,true,clustname)) continue; // ignore if (out) out->append(ips).append('\n'); else PROGLOG("%s",ips); } } void emailSwap(const char *msg, bool warn=false, bool sendswapped=false, bool sendhistory=false) { StringBuffer emailtarget; StringBuffer smtpserver; if (options->getProp("SwapNode/@EmailAddress",emailtarget)&&emailtarget.length()&&options->getProp("SwapNode/@EmailSMTPServer",smtpserver)&&smtpserver.length()) { const char * subject = options->queryProp("SwapNode/@EmailSubject"); if (!subject) subject = "SWAPNODE automated email"; StringBuffer msgs; if (!msg) { msgs.append("Swapnode command line, Cluster: "); msg = msgs.append(groupName).append('\n').str(); } CDateTime dt; dt.setNow(); StringBuffer out; dt.getString(out,true).append(": ").append(msg).append("\n\n"); if (options->getPropBool("SwapNode/@EmailSwappedList")||sendswapped) { out.append("Currently swapped out nodes:\n"); swappedList(0,&out); out.append('\n'); } if (options->getPropBool("SwapNode/@EmailHistory")||sendhistory) { out.append("Swap history:\n"); swapNodeHistory(0,&out); out.append('\n'); } SocketEndpoint ep(smtpserver.str(),25); StringBuffer sender("swapnode@"); queryHostIP().getIpText(sender); // add tbd StringBuffer ips; StringArray warnings; sendEmail(emailtarget.str(),subject,out.str(),ep.getIpText(ips).str(),ep.port,sender.str(),&warnings); ForEachItemIn(i,warnings) WARNLOG("SWAPNODE: %s",warnings.item(i)); } else if (warn) WARNLOG("Either SwapNode/@EmailAddress or SwapNode/@EmailSMTPServer not set in thor.xml"); } void swapNodeHistory(unsigned days,StringBuffer *out) { Owned info = getSwapNodeInfo(true); if (!info.get()) { if (out) out->append("No swapnode info\n"); else ERRLOG("No swapnode info"); return; } StringBuffer line; CDateTime tt; CDateTime cutoff; if (days) { cutoff.setNow(); cutoff.adjustTime(-60*24*(int)days); } unsigned i=0; if (out) out->append("Failure, Time, NodeNum, NodeIp, ErrCode, Error Message\n------------------------------------------------------\n"); else { PROGLOG("Failure, Time, NodeNum, NodeIp, ErrCode, Error Message"); PROGLOG("------------------------------------------------------"); } Owned it1 = info->getElements("BadNode"); ForEach(*it1) { IPropertyTree &badnode = it1->query(); const char *ts = badnode.queryProp("@time"); if (!ts) continue; if (days) { tt.setString(ts); if (cutoff.compare(tt)>0) continue; } line.clear().append(++i).append(", "); line.append(ts).append(", ").append(badnode.getPropInt("@rank",-1)+1).append(", "); badnode.getProp("@netAddress",line); line.append(", ").append(badnode.getPropInt("@code")).append(", \""); badnode.getProp(NULL,line); line.append('\"'); if (out) out->append(line).append('\n'); else PROGLOG("%s",line.str()); } if (out) out->append("\nSwapped, Time, NodeNum, OutIp, InIp\n-----------------------------------\n"); else { PROGLOG("%s", ""); PROGLOG("Swapped, Time, NodeNum, OutIp, InIp"); PROGLOG("-----------------------------------"); } i = 0; Owned it2 = info->getElements("Swap"); ForEach(*it2) { IPropertyTree &swappednode = it2->query(); const char *ts = swappednode.queryProp("@time"); if (!ts) continue; if (days) { tt.setString(ts); if (cutoff.compare(tt)>0) continue; } line.clear().append(++i).append(", "); swappednode.getProp("@time",line); line.append(", ").append(swappednode.getPropInt("@rank",-1)+1).append(", "); swappednode.getProp("@outNetAddress",line); line.append(", "); swappednode.getProp("@inNetAddress",line); if (out) out->append(line.str()).append('\n'); else PROGLOG("%s",line.str()); } } bool checkThorNodeSwap(const char *failedwuid, unsigned mininterval) { bool ret = false; if (mininterval==(unsigned)-1) { // called by thor mininterval = 0; if (!options||!options->getPropBool("SwapNode/@autoSwapNode")) return false; if ((!failedwuid||!*failedwuid)&&!options->getPropBool("SwapNode/@checkAfterEveryJob")) return false; } try { Owned info = getSwapNodeInfo(true); if (info.get()) { PROGLOG("checkNodeSwap started"); StringBuffer xpath; CDateTime dt; StringBuffer ts; // see if done less than mininterval ago if (mininterval) { dt.setNow(); dt.adjustTime(-((int)mininterval)); if (info->getProp("@timeChecked",ts)) { CDateTime dtc; dtc.setString(ts.str()); if (dtc.compare(dt,false)>0) { PROGLOG("checkNodeSwap using cached validate from %s",ts.str()); xpath.clear().appendf("BadNode[@timeChecked=\"%s\"]",ts.str()); return info->hasProp(xpath.str()); } } } Owned grp = queryNamedGroupStore().lookup(groupName); if (!grp) PROGLOG("%s group doesn't exist", groupName.get()); else { SocketEndpointArray epa; grp->getSocketEndpoints(epa); ForEachItemIn(i1,epa) { epa.element(i1).port = getDaliServixPort(); } SocketEndpointArray failures; UnsignedArray failedcodes; StringArray failedmessages; unsigned start = msTick(); const char *thorname = options->queryProp("@name"); StringBuffer dataDir, mirrorDir; getConfigurationDirectory(environment->queryPropTree("Software/Directories"),"data","thor",thorname,dataDir); // if not defined can't check getConfigurationDirectory(environment->queryPropTree("Software/Directories"),"mirror","thor",thorname,mirrorDir); // if not defined can't check validateNodes(epa,dataDir.str(),mirrorDir.str(),false,failures,failedcodes,failedmessages); dt.setNow(); dt.getString(ts.clear()); ForEachItemIn(i,failures) { SocketEndpoint ep(failures.item(i)); ep.port = 0; StringBuffer ips; ep.getIpText(ips); int r = (int)grp->rank(ep); if (r<0) { // shouldn't occur ERRLOG("SWAPNODE node %s not found in group %s",ips.str(),groupName.get()); continue; } PROGLOG("CheckSwapNode FAILED(%d) %s : %s",failedcodes.item(i),ips.str(),failedmessages.item(i)); // SNMP TBD? ret = true; xpath.clear().appendf("BadNode[@netAddress=\"%s\"]",ips.str()); IPropertyTree *bnt = info->queryPropTree(xpath.str()); if (!bnt) { bnt = info->addPropTree("BadNode",createPTree("BadNode")); bnt->setProp("@netAddress",ips.str()); } bnt->setPropInt("@numTimes",bnt->getPropInt("@numTimes",0)+1); bnt->setProp("@timeChecked",ts.str()); bnt->setProp("@time",ts.str()); bnt->setPropInt("@code",failedcodes.item(i)); bnt->setPropInt("@rank",r); bnt->setProp(NULL,failedmessages.item(i)); } if (failedwuid&&*failedwuid) { xpath.clear().appendf("WorkUnit[@id=\"%s\"]",failedwuid); IPropertyTree *wut = info->queryPropTree(xpath.str()); if (!wut) { wut = info->addPropTree("WorkUnit",createPTree("WorkUnit")); wut->setProp("@id",failedwuid); } wut->setProp("@time",ts.str()); } PROGLOG("checkNodeSwap: Time taken = %dms",msTick()-start); info->setProp("@timeChecked",ts.str()); } } } catch (IException *e) { EXCLOG(e,"checkNodeSwap"); } return ret; } }; void swappedList(const char *clusterName, unsigned days, StringBuffer *out) { CSwapNode swapNode(clusterName); swapNode.swappedList(days, out); } void emailSwap(const char *clusterName, const char *msg, bool warn, bool sendswapped, bool sendhistory) { CSwapNode swapNode(clusterName); swapNode.emailSwap(msg, warn, sendswapped, sendhistory); } void swapNodeHistory(const char *clusterName, unsigned days, StringBuffer *out) { CSwapNode swapNode(clusterName); swapNode.swapNodeHistory(days, out); } bool checkThorNodeSwap(const char *clusterName, const char *failedwuid, unsigned mininterval) { CSwapNode swapNode(clusterName); return swapNode.checkThorNodeSwap(failedwuid, mininterval); } class CSingleSwapNode : public CSwapNode { public: CSingleSwapNode(const char *clusterName) : CSwapNode(clusterName) { } bool swap(const char *oldHost, const char *newHost) { ensureThorIsDown(clusterName,false,false); Owned info = getSwapNodeInfo(true); if (!doSingleSwapNode(oldHost, newHost, UINT_MAX, info, NULL)) return false; // check to see if it was a spare and remove SocketEndpoint spareEp(newHost); if (spareGroup) { rank_t r = spareGroup->rank(spareEp); if (RANK_NULL != r) { PROGLOG("Removing spare : %s", newHost); queryNamedGroupStore().removeNode(spareGroupName, newHost); } } info.clear(); PROGLOG("SwapNode finished"); return true; } }; bool swapNode(const char *cluster, const char *oldHost, const char *newHost) { PROGLOG("SWAPNODE(%s,%s,%s) starting",cluster, oldHost, newHost); CSingleSwapNode swapNode(cluster); return swapNode.swap(oldHost, newHost); } class CAutoSwapNode : public CSwapNode { bool doAutoSwapNode(bool dryRun=false) { if (!checkThorNodeSwap(NULL,dryRun?0:5)) { PROGLOG("No bad nodes detected"); PROGLOG("SWAPNODE(auto) exiting"); return false; } Owned info = getSwapNodeInfo(false); if (!info.get()) { // should put out error if returns false PROGLOG("SWAPNODE(auto) exiting"); return false; } StringBuffer ts; if (!info->getProp("@timeChecked",ts)) { PROGLOG("SWAPNODE(auto): no check information generated"); return false; } // enumerate bad nodes StringBuffer xpath; xpath.appendf("BadNode[@time=\"%s\"]",ts.str()); Owned it = info->getElements(xpath.str()); SocketEndpointArray epa1; ForEach(*it) { IPropertyTree &badnode = it->query(); const char *ip = badnode.queryProp("@netAddress"); if (!ip) continue; SocketEndpoint ep(ip); ep.port = getDaliServixPort(); epa1.append(ep); } // recheck SocketEndpointArray badepa; UnsignedArray failedcodes; StringArray failedmessages; unsigned start = msTick(); const char *thorname = options->queryProp("@name"); StringBuffer dataDir, mirrorDir; if (options->getPropBool("SwapNode/@swapNodeCheckPrimaryDrive",true)) getConfigurationDirectory(environment->queryPropTree("Software/Directories"),"data","thor",thorname,dataDir); // if not defined can't check if (options->getPropBool("SwapNode/@swapNodeCheckMirrorDrive",true)) getConfigurationDirectory(environment->queryPropTree("Software/Directories"),"mirror","thor",thorname,mirrorDir); // if not defined can't check validateNodes(epa1, dataDir.str(), mirrorDir.str(), false, badepa, failedcodes, failedmessages); if (!badepa.ordinality()) { PROGLOG("SWAPNODE: on recheck all bad nodes passed (%s,%s)",groupName.get(),ts.str()); return false; } Owned grp = queryNamedGroupStore().lookup(groupName); CDateTime dt; dt.setNow(); dt.getString(ts.clear()); bool abort=false; UnsignedArray badrank; ForEachItemIn(i1,badepa) { SocketEndpoint ep(badepa.item(i1)); ep.port = 0; // should be no ports in group StringBuffer ips; ep.getIpText(ips); xpath.clear().appendf("BadNode[@netAddress=\"%s\"]",ips.str()); IPropertyTree *bnt = info->queryPropTree(xpath.str()); if (!bnt) { ERRLOG("SWAPNODE node %s not found in swapnode info!",ips.str()); return false; } bnt->setProp("@time",ts.str()); int r = bnt->getPropInt("@rank",-1); if ((int)r<0) { // shouldn't occur ERRLOG("SWAPNODE node %s rank not found in group %s",ips.str(),groupName.get()); return false; } badrank.append((unsigned)r); for (unsigned j1=0;j1ordinality())|| (r1==(r+1)%grp->ordinality())) { StringBuffer ips1; ep1.getIpText(ips1); ERRLOG("SWAPNODE adjacent nodes %d (%s) and %d (%s) are bad!",r+1,ips.str(),r1+1,ips1.str()); abort = true; } } } // now see if any of bad nodes have been swapped out recently CDateTime recent = dt; int snint = options->getPropInt("SwapNode/@swapNodeInterval",24); recent.adjustTime(-60*snint); it.setown(info->getElements("Swap")); ForEach(*it) { IPropertyTree &swappednode = it->query(); CDateTime dt1; const char *dt1s = swappednode.queryProp("@time"); if (!dt1s||!*dt1s) continue; dt1.setString(dt1s); if (dt1.compare(recent)<0) continue; const char *ips = swappednode.queryProp("@outNetAddress"); if (!ips||!*ips) continue; int r1 = swappednode.getPropInt("@rank",-1); SocketEndpoint swappedep(ips); swappedep.port = 0; ForEachItemIn(i2,badepa) { SocketEndpoint badep(badepa.item(i2)); int badr = (int)badrank.item(i2); badep.port = 0; if (swappedep.equals(badep)) { // not sure if *really* want this ERRLOG("Node %d (%s) was swapped out on %s (too recent)",badr+1,ips,dt1s); abort = true; } else if ((badr==(r1+1)%grp->ordinality())|| (r1==(badr+1)%grp->ordinality())) { StringBuffer bs; ERRLOG("SWAPNODE adjacent node to bad node %d (%s), %d (%s) was swapped on %s (too recent) !",badr+1,badep.getIpText(bs).str(),r1+1,ips,dt1s); abort = true; } } } const char *intent = dryRun?"would":"will"; // find spares SocketEndpointArray spareepa; StringArray swapfrom; StringArray swapto; Owned spareGroup; if (!abort) { spareGroup.setown(queryNamedGroupStore().lookup(spareGroupName)); if (!spareGroup) { ERRLOG("SWAPNODE could not find spare group %s", spareGroupName.get()); abort = true; } else { spareGroup->getSocketEndpoints(spareepa); ForEachItemIn(i3,badepa) { StringBuffer from; badepa.item(i3).getIpText(from); if (i3rank(spareEp); dbgassertex(RANK_NULL != r); spareGroup.setown(spareGroup->remove(r)); queryNamedGroupStore().removeNode(spareGroupName, to); Owned info = getSwapNodeInfo(false); if (doSingleSwapNode(from.str(),to.str(),badrank.item(i4)+1,info,ts.str())) { StringBuffer msg; msg.appendf("AUTOSWAPNODE: cluster %s node %d: swapped out %s, swapped in %s",groupName.get(),badrank.item(i4)+1,from.str(),to.str()); emailSwap(msg.str()); FLLOG(MCoperatorError, swapnodeJob, "%s", msg.str()); } } return true; } void autoRestart() { // restarts any workunits that failed near to swap // let see if need resubmit any nodes StringArray toresubmit; if (options->getPropBool("SwapNode/@swapNodeRestartJob")) { Owned info = getSwapNodeInfo(false); // should put out error if returns false if (!info.get()) { PROGLOG("SWAPNODE(autoRestart) exiting"); return; } CDateTime recent; recent.setNow(); recent.adjustTime(-SWAPNODE_RETRY_TIME/(1000*60)); Owned it = info->getElements("WorkUnit"); ForEach(*it) { IPropertyTree &wu = it->query(); const char *wuid = wu.queryProp("@id"); if (!wuid) continue; if (!wu.getPropBool("@resubmitted")) { // see if any swaps recently done const char *dt1s = wu.queryProp("@time"); if (!dt1s||!*dt1s) continue; CDateTime dt1; dt1.setString(dt1s); dt1.adjustTime(SWAPNODE_RETRY_TIME/(1000*60)); Owned swit = info->getElements("Swap"); ForEach(*swit) { IPropertyTree &swap = swit->query(); const char *dt2s = swap.queryProp("@time"); if (!dt2s||!*dt2s) continue; CDateTime dt2; dt2.setString(dt2s); if ((dt2.compare(recent)>0)&&(dt1.compare(dt2)>0)) { wu.setPropBool("@resubmitted",true); // only one attempt toresubmit.append(wuid); break; } } } } } ForEachItemIn(ir,toresubmit) { WuResubmit(toresubmit.item(ir)); } } public: CAutoSwapNode(const char *clusterpName) : CSwapNode(clusterpName) { } public: bool swap(bool dryRun) { PROGLOG("SWAPNODE(auto%s) starting",dryRun?",dryRun":""); if (!doAutoSwapNode(dryRun)) // using info in Dali (spares etc.) return false; autoRestart(); PROGLOG("AutoSwapNode finished"); return true; } }; bool autoSwapNode(const char *groupName, bool dryRun) { CAutoSwapNode swapNode(groupName); return swapNode.swap(dryRun); }