/*##############################################################################
Copyright (C) 2011 HPCC Systems.
All rights reserved. This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see .
############################################################################## */
#include "platform.h"
#include "thirdparty.h"
#include "jlib.hpp"
#include "jfile.hpp"
#include "jptree.hpp"
#include "jprop.hpp"
#include "jmisc.hpp"
#include "mpbase.hpp"
#include "daclient.hpp"
#include "dadfs.hpp"
#include "dafdesc.hpp"
#include "dasds.hpp"
#include "danqs.hpp"
#include "dalienv.hpp"
#include "rmtfile.hpp"
#include "rmtsmtp.hpp"
#ifndef _ESP
#include "dautils.hpp"
#include "workunit.hpp"
#else
#include "swapnodemain.hpp"
#endif
#define SDS_LOCK_TIMEOUT 30000
#define SWAPNODE_RETRY_TIME (1000*60*60*1) // 1hr
#ifdef _DEBUG
#ifdef NIGEL_TESTING
#define FILES_WRITE_PREFIX "test_"
#else
#define FILES_WRITE_PREFIX ""
#endif
#else
#define FILES_WRITE_PREFIX ""
#endif
#ifndef _ESP
static void doAutoSwapNode(IRemoteConnection *connEnv,IRemoteConnection *connFiles,IPropertyTree *options,bool doswap);
static const LogMsgJobInfo swapnodeJob(UnknownJob, UnknownUser);
static void autoRestart(IPropertyTree *options);
#endif
static IRemoteConnection* GetRemoteLock(const char* path, unsigned int mode, bool nonfatalinuse)
{
// this code not nice - could do with rewrite when time permits!
IRemoteConnection* pRemoteConnection = NULL;
try
{
PROGLOG("Getting a lock on %s ...", path);
pRemoteConnection = querySDS().connect(path, myProcessSession(), mode, SDS_LOCK_TIMEOUT);
}
catch (IException* e)
{
StringBuffer sErrMsg;
e->errorMessage(sErrMsg);
e->Release();
/*typical error message when lock fails is as follows:
SDS: Lock timeout
SDS Reply Error : SDS: Lock timeout
Failed to establish lock to NewEnvironment/
Existing lock status: Locks on path: /NewEnvironment/
Endpoint |SessionId |ConnectionId |mode
172.16.48.175:7254 |c00000038 |c0000003b |653
*/
const char* pattern = "Failed to establish lock to ";
const char* match = strstr(sErrMsg.str(), pattern);
if (match)
{
match += strlen(pattern);
const char* eol = strchr(match, '\n');
StringBuffer path;
path.append(eol - match - 1, match);
//if we can extract IP address of computer holding the lock then
//show a customized message.
//
//Retrieve IP address of computer holding the lock...
char achHost[128] = "";
const char* p = strstr(sErrMsg.str(), "\n\n");
if (p && *(p+=2))
{
const char* q = strchr(p, ':');
if (q)
{
const int len = q-p;
strncpy(achHost, p, len);
achHost[len] = '\0';
}
}
StringBuffer sMsg;
sMsg.appendf("Failed to get a lock on /%s", path.str());
if (achHost[0])
sMsg.appendf(" because it is locked by computer %s.", achHost);
else
sMsg.append(":\n\n").append(sErrMsg);
if (nonfatalinuse) {
WARNLOG("%s",sMsg.str());
return NULL;
}
throw ::MakeStringException(-1, "%s", sMsg.str());
}
else
throw ::MakeStringException(-1, "%s", sErrMsg.str());
}
return pRemoteConnection;
}
static bool ensureThorIsDown(const char* cluster, bool nofail, bool wait)
{
bool retry = false;
do {
Owned pStatus = querySDS().connect("/Status/Servers", myProcessSession(), RTM_NONE, SDS_LOCK_TIMEOUT);
Owned it = pStatus->queryRoot()->getElements("Server[@name='ThorMaster']");
retry = false;
ForEach(*it) {
IPropertyTree* pServer = &it->query();
if (pServer->hasProp("@cluster") && !strcmp(pServer->queryProp("@cluster"), cluster)) {
if (nofail) {
WARNLOG("A Thor on cluster %s is still active", cluster);
if (!wait)
return false;
Sleep(1000*10);
PROGLOG("Retrying...");
retry = true;
break;
}
throw MakeStringException(-1, "A Thor cluster node swap requires the cluster to be offline. Please stop the Thor cluster '%s' and try again.", cluster);
}
}
} while (retry);
return true;
}
static bool doEnvironment(IPropertyTree* root, const char* clustername, const char* oldip, const char* newip, unsigned nodenum, bool& bNewMachineIsLinux)
{
IPropertyTree* pHardware = root->queryPropTree("Hardware");
IPropertyTree* oldmachine = pHardware->queryPropTree(StringBuffer("Computer[@netAddress=\"").append(oldip).append("\"]").str());
if(!oldmachine) {
ERRLOG("Could not find computer with ip=%s", oldip);
return false;
}
IPropertyTree* newmachine = pHardware->queryPropTree(StringBuffer("Computer[@netAddress=\"").append(newip).append("\"]").str());
if(!newmachine) {
ERRLOG("Could not find computer with ip=%s", newip);
return false;
}
StringBuffer xpath;
//determine if the new machine is linux so we can properly update its slaves file
const char* newMachineType = newmachine->queryProp("@computerType");
xpath.clear().appendf("ComputerType[@name='%s']", newMachineType);
IPropertyTree* newMachineTypeNode = pHardware->queryPropTree( xpath.str() );
if (!newMachineTypeNode) {
ERRLOG("computer type '%s' of the new slave is not defined!", newMachineType);
return false;
}
const char* os = newMachineTypeNode->queryProp("@opSys");
bNewMachineIsLinux = os && !strcmp(os, "linux");
DBGLOG("NewMachine=%s (%s)",newmachine->queryProp("@name"),bNewMachineIsLinux?"linux":"windows");
DBGLOG("OldMachine=%s",oldmachine->queryProp("@name"));
IPropertyTree* pSoftware = root->queryPropTree("Software");
if(!pSoftware) {
ERRLOG("Could not find /Software!");
return false;
}
// look for all ThorCluster entries with correct nodegroup (needed for multithor)
Owned clusters = pSoftware->getElements("ThorCluster");
ForEach(*clusters) {
IPropertyTree &cluster = clusters->query();
const char *groupname = cluster.queryProp("@nodeGroup");
if (!groupname||!*groupname)
groupname = cluster.queryProp("@name");
if (strcmp(groupname,clustername)!=0)
continue;
xpath.clear().appendf("ThorSlaveProcess[@computer='%s']", oldmachine->queryProp("@name"));
IPropertyTree* slave = cluster.queryPropTree(xpath.str());
if(!slave) {
ERRLOG("Could not find slave %s in thor %s", oldmachine->queryProp("@name"), cluster.queryProp("@name"));
return false;
}
xpath.clear().appendf("ThorSlaveProcess[@computer='%s']", newmachine->queryProp("@name"));
if (cluster.queryPropTree(xpath.str())) {
ERRLOG("This would duplicate slave %s in thor %s", oldmachine->queryProp("@name"), cluster.queryProp("@name"));
return false;
}
StringBuffer sn;
if (nodenum!=0) {
sn.append('s').append(nodenum);
if (strcmp(slave->queryProp("@name"),sn.str())!=0) {
ERRLOG("Incorrect slave number %d for slave %s(%s) in thor %s",
nodenum,oldmachine->queryProp("@name"), slave->queryProp("@name"), cluster.queryProp("@name"));
return false;
}
}
xpath.clear().appendf("ThorSpareProcess[@computer='%s']", newmachine->queryProp("@name"));
IPropertyTree* spare = cluster.queryPropTree(xpath.str());
if(spare) {
const char *state = slave->queryProp("@state"); // not sure if anyone actuall sets this but bwd compat.
DBGLOG("Removing Spare:%s%s%s",slave->queryProp("@name"),state?" with status: ":"",state?state:"");
cluster.removeTree(spare);
}
slave->setProp("@computer",newmachine->queryProp("@name"));
newmachine->setProp("@state","Unavailable");
}
return true;
}
static bool resolveComputerName(IPropertyTree *rootEnv,const char *name,IpAddress &ip)
{
StringBuffer query;
query.appendf("Hardware/Computer[@name=\"%s\"]",name);
Owned machine = rootEnv->getPropTree(query.str());
const char *node = machine?machine->queryProp("@netAddress"):NULL;
if (!node||!*node)
false;
ip.ipset(node);
return true;
}
class CfixDaliDFS
{
void writeSlavesFile(IPropertyTree* rootEnv, const char *newep, IPropertyTree& cluster, bool bNewMachineIsLinux)
{
Owned newnode = createINode(newep);
const char *groupname = cluster.queryProp("@nodeGroup");
if (!groupname||!*groupname)
groupname = cluster.queryProp("@name");
Owned grp = queryNamedGroupStore().lookup(groupname);
if (!grp) {
ERRLOG("writeSlavesFile: group not found for cluster %s",groupname);
return;
}
if (!grp->isMember(newnode))
return;
PROGLOG("Writing slaves file for cluster %s",groupname);
const char *computer = cluster.queryProp("@computer");
if (!computer||!*computer) {
ERRLOG("writeSlavesFile: cluster has no computer specified");
return;
}
const char *dir = cluster.queryProp("@directory");
if (!dir||!*dir) {
ERRLOG("writeSlavesFile: cluster has no directory specified");
return;
}
IpAddress masterip;
if (!resolveComputerName(rootEnv,computer,masterip)) {
ERRLOG("writeSlavesFile: cannot resolve thor master at %s",computer);
return;
}
char sep = bNewMachineIsLinux?'/':'\\';
StringBuffer filename;
filename.append(sep).append(sep);
masterip.getIpText(filename);
if (dir&&*dir&&!isPathSepChar(*dir))
filename.append(sep);
while (dir&&*dir) {
if (isPathSepChar(*dir))
filename.append(sep);
else
filename.append(*dir);
dir++;
}
addPathSepChar(filename,sep);
size32_t dirsz = filename.length();
filename.append(FILES_WRITE_PREFIX "slaves");
StringBuffer str;
ForEachNodeInGroup(r,*grp) {
grp->queryNode(r).endpoint().getUrlStr(str);
if (!bNewMachineIsLinux)
str.append('\r'); // not sure a good idea but consistent with deploy engine
str.append('\n');
}
PROGLOG("Writing slaves to %s",filename.str());
Owned outfile = createIFile(filename.str());
Owned outfileio = outfile->open(IFOcreate);
if (!outfileio)
throw MakeStringException (-1,"Cannot create slaves file %s",filename.str());
outfileio->write(0,str.length(),str.str());
outfileio.clear();
outfile.clear();
str.clear();
Owned spares = cluster.getElements("ThorSpareProcess");
ForEach(*spares) {
computer = spares->query().queryProp("@computer");
if (!computer||!*computer) {
WARNLOG("writeSlavesFile: spare has no computer specified");
continue;
}
IpAddress nodeip;
if (!resolveComputerName(rootEnv,computer,nodeip)) {
WARNLOG("writeSlavesFile: cannot resolve spare at %s",computer);
str.append(computer);
}
else
nodeip.getIpText(str);
if (!bNewMachineIsLinux)
str.append('\r'); // not sure a good idea but consistent with deploy engine
str.append('\n');
}
filename.setLength(dirsz);
filename.append(FILES_WRITE_PREFIX "spares");
PROGLOG("Writing spares to %s",filename.str());
outfile.setown(createIFile(filename.str()));
outfileio.setown(outfile->open(IFOcreate));
if (!outfileio)
throw MakeStringException (-1,"Cannot create spares file %s",filename.str());
outfileio->write(0,str.length(),str.str());
}
public:
void doThorSlavesFiles(const char *newip, IPropertyTree* rootEnv, bool bNewMachineIsLinux)
{ // recreates DFS Groups (bit over the top for this usage, but effective)
Owned clusters= rootEnv->getElements("Software/ThorCluster");
ForEach(*clusters) {
IPropertyTree &cluster = clusters->query();
writeSlavesFile(rootEnv, newip, cluster, bNewMachineIsLinux);
}
}
bool doFiles(IPropertyTree* filesRoot, const char* thor, const char* oldip, const char* newip,unsigned partno)
{
class cfilescan
{
void processScopes(IPropertyTree &root,StringBuffer &name)
{
size32_t ns = name.length();
if (ns)
name.append("::");
size32_t ns2 = name.length();
Owned iter = root.getElements("Scope");
if (iter->first()) {
do {
IPropertyTree &scope = iter->query();
name.append(scope.queryProp("@name"));
processScopes(scope,name);
name.setLength(ns2);
} while (iter->next());
}
processFiles(root,name);
name.setLength(ns);
}
void processFiles(IPropertyTree &root,StringBuffer &name)
{
size32_t ns = name.length();
Owned iter = root.getElements("File");
if (iter->first()) {
do {
IPropertyTree &file = iter->query();
name.append(file.queryProp("@name"));
processFile(file,name);
name.setLength(ns);
} while (iter->next());
}
}
void processFile(IPropertyTree &file,StringBuffer &name)
{
Owned iter = file.getElements(frompart.str());
if (iter->first())
{
loop
{
IPropertyTree &item = iter->query();
if (!partno || item.getPropInt("@num",0)==partno) {
PROGLOG("Processing file %s",name.str());
item.setProp("@node",to);
}
else
WARNLOG("ignoring node on file %s parts don't match (%d,%d)",name.str(),item.getPropInt("@num",0),partno);
if (!iter->next())
break;
}
}
}
public:
void scan(IPropertyTree *sroot)
{
StringBuffer name;
processScopes(*sroot,name);
}
StringBuffer frompart;
const char* to;
unsigned partno;
} filescan;
filescan.frompart.append("Part[@node=\"").append(oldip).append("\"]");
filescan.to = newip;
filescan.partno = partno;
filescan.scan(filesRoot);
return true;
}
};
static bool doSingleSwapNode(IRemoteConnection *connEnv,IRemoteConnection *connFiles,const char* cluster,const char* oldip,const char* newip,unsigned nodenum,IPropertyTree *info,const char *timechecked)
{
IPropertyTree* rootEnv = connEnv->queryRoot();
IPropertyTree* rootFiles = connFiles->queryRoot();
bool bNewMachineIsLinux;
if (doEnvironment(rootEnv, cluster,oldip,newip,nodenum, bNewMachineIsLinux)) {
CfixDaliDFS fixdfs;
fixdfs.doFiles(rootFiles, cluster,oldip,newip,nodenum);
// no turning back now
connEnv->commit();
connFiles->commit();
SocketEndpoint ipfrom(oldip);
SocketEndpoint ipto(newip);
queryNamedGroupStore().swapNode(ipfrom,ipto);
fixdfs.doThorSlavesFiles(newip,connEnv->queryRoot(), bNewMachineIsLinux); // must be done after doEnvironment
if (info) {
StringBuffer times(timechecked);
if (times.length()==0) {
CDateTime dt;
dt.setNow();
dt.getString(times);
}
StringBuffer xpath;
// TBD tie up with bad node in auto?
IPropertyTree *swap = info->addPropTree("Swap",createPTree("Swap"));
swap->setProp("@inNetAddress",newip);
swap->setProp("@outNetAddress",oldip);
swap->setProp("@time",times.str());
swap->setPropInt("@rank",nodenum-1);
}
return true;
}
return false;
}
static bool doSwapNode(IPropertyTree *options,bool doswap,const char* cluster,const char* oldip,const char* newip,unsigned nodenum, bool nofail)
{
Owned connNewEnv; // only used as lock (apparently)
Owned connEnv;
Owned connFiles;
try {
const unsigned int mode = RTM_CREATE | RTM_CREATE_QUERY | RTM_LOCK_READ | RTM_LOCK_WRITE | RTM_DELETE_ON_DISCONNECT;
const unsigned int mode2 = RTM_LOCK_READ; // only lock for read as NewEnvironment will protect against configenv
connNewEnv.setown(GetRemoteLock("/NewEnvironment",mode,nofail));
if (!connNewEnv)
return false;
connEnv.setown(GetRemoteLock("/Environment",mode2,nofail));
if (!connEnv)
return false;
if (doswap) {
connFiles.setown(GetRemoteLock("/Files",mode2, nofail));
if (!connFiles)
return false;
}
#ifndef _ESP
if (options) {
doAutoSwapNode(connEnv,connFiles,options,doswap);
autoRestart(options);
}
else
#endif
{
ensureThorIsDown(cluster,false,false);
Owned info;
#ifndef _ESP
Owned opt = createPTree(ipt_caseInsensitive);
opt->setProp("@nodeGroup",cluster);
Owned grp;
Owned connSwapNode;
StringAttr grpname;
getSwapNodeInfo(opt,grpname,grp,connSwapNode,info,true);
#endif
doSingleSwapNode(connEnv,connFiles,cluster,oldip,newip,nodenum,info,NULL);
}
}
catch (IException *) {
if (connEnv)
connEnv->rollback();
if (connFiles)
connFiles->rollback();
throw;
}
PROGLOG("SwapNode finished");
return true;
}
void SwapNode(const char* cluster,const char* oldip,const char* newip,unsigned nodenum)
{
PROGLOG("SWAPNODE(%s,%s,%s,%d) starting",cluster,oldip,newip,nodenum);
doSwapNode(NULL,true,cluster,oldip,newip,nodenum,false);
}
#ifndef _ESP
bool WuResubmit(const char *wuid)
{
Owned factory = getWorkUnitFactory();
Owned wu = factory->updateWorkUnit(wuid);
if (!wu) {
ERRLOG("WuResubmit(%s): could not find workunit",wuid);
return false;
}
if (wu->getState()!=WUStateFailed) {
SCMStringBuffer state;
wu->getStateDesc(state);
ERRLOG("WuResubmit(%s): could not resubmit as workunit state is '%s'",wuid,state.str());
return false;
}
SCMStringBuffer token;
wu->getSecurityToken(token);
SCMStringBuffer user;
SCMStringBuffer password;
extractToken(token.str(), wuid, user, password);
wu->resetWorkflow();
wu->setState(WUStateSubmitted);
wu->commit();
wu.clear();
submitWorkUnit(wuid,user.str(),password.str());
PROGLOG("WuResubmit(%s): resubmitted",wuid);
return true;
}
void swapNodeHistory(IPropertyTree *options,unsigned days,StringBuffer *out)
{
Owned grp;
Owned connSwapNode;
Owned info;
StringAttr grpname;
if (!getSwapNodeInfo(options,grpname,grp,connSwapNode,info,true)) {
if (out)
out->append("No swapnode info\n");
else
ERRLOG("No swapnode info");
return;
}
StringBuffer line;
CDateTime tt;
CDateTime cutoff;
if (days) {
cutoff.setNow();
cutoff.adjustTime(-60*24*(int)days);
}
unsigned i=0;
if (out)
out->append("Failure, Time, NodeNum, NodeIp, ErrCode, Error Message\n------------------------------------------------------\n");
else {
PROGLOG("Failure, Time, NodeNum, NodeIp, ErrCode, Error Message");
PROGLOG("------------------------------------------------------");
}
Owned it1 = info->getElements("BadNode");
ForEach(*it1) {
IPropertyTree &badnode = it1->query();
const char *ts = badnode.queryProp("@time");
if (!ts)
continue;
if (days) {
tt.setString(ts);
if (cutoff.compare(tt)>0)
continue;
}
line.clear().append(++i).append(", ");
line.append(ts).append(", ").append(badnode.getPropInt("@rank",-1)+1).append(", ");
badnode.getProp("@netAddress",line);
line.append(", ").append(badnode.getPropInt("@code")).append(", \"");
badnode.getProp(NULL,line);
line.append('\"');
if (out)
out->append(line).append('\n');
else
PROGLOG("%s",line.str());
}
if (out)
out->append("\nSwapped, Time, NodeNum, OutIp, InIp\n-----------------------------------\n");
else {
PROGLOG("%s", "");
PROGLOG("Swapped, Time, NodeNum, OutIp, InIp");
PROGLOG("-----------------------------------");
}
i = 0;
Owned it2 = info->getElements("Swap");
ForEach(*it2) {
IPropertyTree &swappednode = it2->query();
const char *ts = swappednode.queryProp("@time");
if (!ts)
continue;
if (days) {
tt.setString(ts);
if (cutoff.compare(tt)>0)
continue;
}
line.clear().append(++i).append(", ");
swappednode.getProp("@time",line);
line.append(", ").append(swappednode.getPropInt("@rank",-1)+1).append(", ");
swappednode.getProp("@outNetAddress",line);
line.append(", ");
swappednode.getProp("@inNetAddress",line);
if (out)
out->append(line.str()).append('\n');
else
PROGLOG("%s",line.str());
}
}
bool checkIfNodeInUse(IPropertyTree *root, IpAddress &ip, bool includespares, StringBuffer &clustname)
{
SocketEndpoint ep(0,ip);
IPropertyTree* pSoftware = root->queryPropTree("Software");
if(!pSoftware)
throw MakeStringException(-1,"Could not find /Environment/Software!");
// look for all ThorCluster entries with correct nodegroup (needed for multithor)
StringBuffer endpoint;
Owned clusters = pSoftware->getElements("ThorCluster");
StringBuffer xpath;
ForEach(*clusters) {
IPropertyTree &cluster = clusters->query();
const char *groupname = cluster.queryProp("@nodeGroup");
if (!groupname||!*groupname)
groupname = cluster.queryProp("@name");
Owned grp = queryNamedGroupStore().lookup(groupname);
if (!grp) {
ERRLOG("writeSlavesFile: group not found for cluster %s",groupname);
continue;
}
if ((int)grp->rank(ep)>=0) {
clustname.append(groupname);
return true;
}
if (!includespares)
continue;
Owned spares = cluster.getElements("ThorSpareProcess");
ForEach(*spares) {
const char *computer = spares->query().queryProp("@computer");
if (!computer||!*computer) {
WARNLOG("checkIfNodeInUse: spare has no computer specified");
continue;
}
IpAddress nodeip;
if (!resolveComputerName(root,computer,nodeip)) {
WARNLOG("checkIfNodeInUse: cannot resolve spare at %s",computer);
continue;
}
if (nodeip.ipequals(ip)) {
clustname.append(groupname).append(" spares");
return true;
}
}
}
return false;
}
void swappedList(IPropertyTree *options,unsigned days, StringBuffer *out)
{
Owned conn = querySDS().connect("/Environment", myProcessSession(), 0, SDS_LOCK_TIMEOUT);
if (!conn)
return;
Owned grp;
Owned connSwapNode;
Owned info;
StringAttr grpname;
if (!getSwapNodeInfo(options,grpname,grp,connSwapNode,info,true)) { // should put out error if returns false
return;
}
CDateTime tt;
CDateTime cutoff;
if (days) {
cutoff.setNow();
cutoff.adjustTime(-60*24*(int)days);
}
Owned it2 = info->getElements("Swap");
ForEach(*it2) {
IPropertyTree &swappednode = it2->query();
const char *ts = swappednode.queryProp("@time");
if (!ts)
continue;
if (days) {
tt.setString(ts);
if (cutoff.compare(tt)>0)
continue;
}
const char *ips = swappednode.queryProp("@outNetAddress");
if (!ips||!*ips)
continue;
IpAddress ip(ips);
StringBuffer clustname;
if (checkIfNodeInUse(conn->queryRoot(),ip,true,clustname))
continue; // ignore
if (out)
out->append(ips).append('\n');
else
PROGLOG("%s",ips);
}
}
void EmailSwap(IPropertyTree *options, const char *msg, bool warn=false, bool sendswapped=false, bool sendhistory=false)
{
StringBuffer emailtarget;
StringBuffer smtpserver;
if (options->getProp("SwapNode/@EmailAddress",emailtarget)&&emailtarget.length()&&options->getProp("SwapNode/@EmailSMTPServer",smtpserver)&&smtpserver.length()) {
const char * subject = options->queryProp("SwapNode/@EmailSubject");
if (!subject)
subject = "SWAPNODE automated email";
StringBuffer msgs;
if (!msg) {
StringAttr grpname;
grpname.set(options->queryProp("@nodeGroup"));
if (grpname.isEmpty())
grpname.set(options->queryProp("@name"));
msgs.append("Swapnode command line, Cluster: ");
msg = msgs.append(grpname).append('\n').str();
}
CDateTime dt;
dt.setNow();
StringBuffer out;
dt.getString(out,true).append(": ").append(msg).append("\n\n");
if (options->getPropBool("SwapNode/@EmailSwappedList")||sendswapped) {
out.append("Currently swapped out nodes:\n");
swappedList(options,0,&out);
out.append('\n');
}
if (options->getPropBool("SwapNode/@EmailHistory")||sendhistory) {
out.append("Swap history:\n");
swapNodeHistory(options,0,&out);
out.append('\n');
}
SocketEndpoint ep(smtpserver.str(),25);
StringBuffer sender("swapnode@");
queryHostIP().getIpText(sender);
// add tbd
StringBuffer ips;
StringArray warnings;
sendEmail(emailtarget.str(),subject,out.str(),ep.getIpText(ips).str(),ep.port,sender.str(),&warnings);
ForEachItemIn(i,warnings)
WARNLOG("SWAPNODE: %s",warnings.item(i));
}
else if (warn)
WARNLOG("Either SwapNode/@EmailAddress or SwapNode/@EmailSMTPServer not set in thor.xml");
}
// SwapNode info
//
// SwapNode/
// Thor [ @group, @timeChecked ]
// BadNode [ @netAddress, @timeChecked, @time, @numTimes, @code, @rank, @ (msg)
// Swap [ @inNetAddress, @outNetAddress, @time, @rank]
// WorkUnit [ @id @time @resubmitted ]
//time,nodenum,ip,code,errmsg
//time,nodenum,swapout,swapin
static void autoRestart(IPropertyTree *options)
{
// restarts any workunits that failed near to swap
// let see if need resubmit any nodes
StringArray toresubmit;
if (options->getPropBool("SwapNode/@swapNodeRestartJob")) {
Owned grp;
Owned connSwapNode;
Owned info;
StringAttr grpname;
if (!getSwapNodeInfo(options,grpname,grp,connSwapNode,info,false)) { // should put out error if returns false
PROGLOG("SWAPNODE(autoRestart) exiting");
return;
}
CDateTime recent;
recent.setNow();
recent.adjustTime(-SWAPNODE_RETRY_TIME/(1000*60));
Owned it = info->getElements("WorkUnit");
ForEach(*it) {
IPropertyTree &wu = it->query();
const char *wuid = wu.queryProp("@id");
if (!wuid)
continue;
if (!wu.getPropBool("@resubmitted")) {
// see if any swaps recently done
const char *dt1s = wu.queryProp("@time");
if (!dt1s||!*dt1s)
continue;
CDateTime dt1;
dt1.setString(dt1s);
dt1.adjustTime(SWAPNODE_RETRY_TIME/(1000*60));
Owned swit = info->getElements("Swap");
ForEach(*swit) {
IPropertyTree &swap = swit->query();
const char *dt2s = swap.queryProp("@time");
if (!dt2s||!*dt2s)
continue;
CDateTime dt2;
dt2.setString(dt2s);
if ((dt2.compare(recent)>0)&&(dt1.compare(dt2)>0)) {
wu.setPropBool("@resubmitted",true); // only one attempt
toresubmit.append(wuid);
break;
}
}
}
}
}
ForEachItemIn(ir,toresubmit) {
WuResubmit(toresubmit.item(ir));
}
}
static void doAutoSwapNode(IRemoteConnection *connEnv,IRemoteConnection *connFiles,IPropertyTree *options,bool doswap)
{
if (!checkThorNodeSwap(options,NULL,doswap?5:0)) {
PROGLOG("No bad nodes detected");
PROGLOG("SWAPNODE(auto) exiting");
return;
}
Owned grp;
Owned connSwapNode;
Owned info;
StringAttr grpname;
if (!getSwapNodeInfo(options,grpname,grp,connSwapNode,info,false)) { // should put out error if returns false
PROGLOG("SWAPNODE(auto) exiting");
return;
}
StringBuffer ts;
if (!info->getProp("@timeChecked",ts)) {
PROGLOG("SWAPNODE(auto): no check information generated");
return;
}
// enumerate bad nodes
StringBuffer xpath;
xpath.appendf("BadNode[@time=\"%s\"]",ts.str());
Owned it = info->getElements(xpath.str());
SocketEndpointArray epa1;
ForEach(*it) {
IPropertyTree &badnode = it->query();
const char *ip = badnode.queryProp("@netAddress");
if (!ip)
continue;
SocketEndpoint ep(ip);
ep.port = getDaliServixPort();
epa1.append(ep);
}
// recheck
SocketEndpointArray badepa;
UnsignedArray failedcodes;
StringArray failedmessages;
unsigned start = msTick();
validateNodes(epa1,options->getPropBool("SwapNode/@swapNodeCheckC",true),options->getPropBool("SwapNode/@swapNodeCheckD",false),false,options->queryProp("SwapNode/@swapNodeCheckScript"),options->getPropInt("SwapNode/@swapNodeCheckScriptTimeout")*1000,badepa,failedcodes,failedmessages);
if (!badepa.ordinality()) {
PROGLOG("SWAPNODE: on recheck all bad nodes passed (%s,%s)",grpname.get(),ts.str());
return;
}
CDateTime dt;
dt.setNow();
dt.getString(ts.clear());
bool abort=false;
UnsignedArray badrank;
ForEachItemIn(i1,badepa) {
SocketEndpoint ep(badepa.item(i1));
ep.port = 0; // should be no ports in group
StringBuffer ips;
ep.getIpText(ips);
xpath.clear().appendf("BadNode[@netAddress=\"%s\"]",ips.str());
IPropertyTree *bnt = info->queryPropTree(xpath.str());
if (!bnt) {
ERRLOG("SWAPNODE node %s not found in swapnode info!",ips.str());
return;
}
bnt->setProp("@time",ts.str());
int r = bnt->getPropInt("@rank",-1);
if ((int)r<0) { // shouldn't occur
ERRLOG("SWAPNODE node %s rank not found in group %s",ips.str(),grpname.get());
return;
}
badrank.append((unsigned)r);
for (unsigned j1=0;j1ordinality())||
(r1==(r+1)%grp->ordinality())) {
StringBuffer ips1;
ep1.getIpText(ips1);
ERRLOG("SWAPNODE adjacent nodes %d (%s) and %d (%s) are bad!",r+1,ips.str(),r1+1,ips1.str());
abort = true;
}
}
}
// now see if any of bad nodes have been swapped out recently
CDateTime recent = dt;
int snint = options->getPropInt("SwapNode/@swapNodeInterval",24);
recent.adjustTime(-60*snint);
it.setown(info->getElements("Swap"));
ForEach(*it) {
IPropertyTree &swappednode = it->query();
CDateTime dt1;
const char *dt1s = swappednode.queryProp("@time");
if (!dt1s||!*dt1s)
continue;
dt1.setString(dt1s);
if (dt1.compare(recent)<0)
continue;
const char *ips = swappednode.queryProp("@outNetAddress");
if (!ips||!*ips)
continue;
int r1 = swappednode.getPropInt("@rank",-1);
SocketEndpoint swappedep(ips);
swappedep.port = 0;
ForEachItemIn(i2,badepa) {
SocketEndpoint badep(badepa.item(i2));
int badr = (int)badrank.item(i2);
badep.port = 0;
if (swappedep.equals(badep)) {
// not sure if *really* want this
ERRLOG("Node %d (%s) was swapped out on %s (too recent)",badr+1,ips,dt1s);
abort = true;
}
else if ((badr==(r1+1)%grp->ordinality())||
(r1==(badr+1)%grp->ordinality())) {
StringBuffer bs;
ERRLOG("SWAPNODE adjacent node to bad node %d (%s), %d (%s) was swapped on %s (too recent) !",badr+1,badep.getIpText(bs).str(),r1+1,ips,dt1s);
abort = true;
}
}
}
const char *intent = doswap?"will":"would";
// find spares
IPropertyTree* rootEnv = connEnv->queryRoot();
SocketEndpointArray spareepa;
StringArray swapfrom;
StringArray swapto;
if (!abort) {
Owned clusters = connEnv->queryRoot()->getElements("Software/ThorCluster");
ForEach(*clusters) {
IPropertyTree &cluster = clusters->query();
const char *cname = cluster.queryProp("@nodeGroup");
if (!cname||!*cname)
cname = cluster.queryProp("@name");
if (strcmp(grpname.get(),cname)!=0)
continue;
Owned spares = cluster.getElements("ThorSpareProcess");
ForEach(*spares) {
const char *computer = spares->query().queryProp("@computer");
if (!computer||!*computer) {
WARNLOG("SWAPNODE: spare has no computer specified");
continue;
}
SocketEndpoint nodeep;
if (!resolveComputerName(rootEnv,computer,nodeep)) {
WARNLOG("SWAPNODE: cannot resolve spare at %s",computer);
continue;
}
nodeep.port = 0;
bool found = false;
ForEachItemIn(j1,spareepa) {
if (spareepa.item(j1).ipequals(nodeep)) {
found = true;
break;
}
}
if (!found)
spareepa.append(nodeep);
}
}
ForEachItemIn(i3,badepa) {
StringBuffer from;
badepa.item(i3).getIpText(from);
if (i3SWAPNODE_RETRY_TIME) {
ERRLOG("Retry time exceeded, exiting");
break;
}
WARNLOG("Swapnode pausing before retry");
Sleep(60+(getRandom()%60));
}
}
struct DaliClient
{
DaliClient(const char* daliserver): serverGroup(createIGroup(daliserver, DALI_SERVER_PORT))
{
if (!serverGroup)
throw MakeStringException(0, "Could not instantiate IGroup");
if (!initClientProcess(serverGroup,DCR_Util))
throw MakeStringException(0, "Could not initializing client process");
setPasswordsFromSDS();
closeEnvironment();
}
~DaliClient()
{
clearPasswordsFromSDS();
closedownClientProcess();
}
Owned serverGroup;
};
void suppressStdOut(bool suppress=true)
{
static HANDLE out;
static HANDLE saveout;
#ifdef WIN32
if (suppress) {
saveout = GetStdHandle(STD_OUTPUT_HANDLE);
out = ::CreateFile("nul",GENERIC_WRITE,0,NULL,CREATE_ALWAYS,FILE_FLAG_WRITE_THROUGH,NULL);
SetStdHandle(STD_OUTPUT_HANDLE,out);
}
else {
SetStdHandle(STD_OUTPUT_HANDLE,saveout);
CloseHandle(out);
}
#else
saveout = fileno(stdout);
#endif
}
int main(int argc,char** argv)
{
InitModuleObjects();
int ret = 0;
bool isauto = (argc>=2)&&(stricmp(argv[1],"auto")==0);
bool ishistory = (argc>=2)&&(stricmp(argv[1],"history")==0);
bool isswapped = (argc>=2)&&(stricmp(argv[1],"swapped")==0);
bool isemail = (argc>=2)&&(stricmp(argv[1],"email")==0);
if ((argc<5)&&!isauto&&!ishistory&&!isswapped&&!isemail) {
fprintf(stderr,"Usage: swapnode \n");
fprintf(stderr," or: swapnode history [] -- list swap history \n");
fprintf(stderr," or: swapnode history [] 2> outfile.csv -- save swap history \n");
fprintf(stderr," or: swapnode swapped [] -- list currently swapped nodes\n");
fprintf(stderr," or: swapnode email -- tests email\n");
fprintf(stderr," or: swapnode auto [swap]\n");
fprintf(stderr,"NB auto,history,swapped and email must be run in a thor deploy directory \n");
fprintf(stderr," (e.g. /c$/thor) or in a directory with copy of thor.xml\n");
fprintf(stderr,"if 'swap' not specified after 'auto' then only displays what *would* be swapped\n");
ret = 2;
}
else {
try {
const char* daliserver;
Owned options;
if (isauto||ishistory|isswapped|isemail) {
options.setown(createPTreeFromXMLFile("thor.xml", ipt_caseInsensitive));
daliserver = options?options->queryProp("@daliServers"):NULL;
if (!daliserver||!*daliserver)
throw MakeStringException(-1,"Either thor.xml not found or DALISERVERS not found in thor.xml");
}
else {
options.setown(createPTree(ipt_caseInsensitive)); // don't use thor.xml
daliserver = argv[1];
}
DaliClient dclient(daliserver);
StringBuffer logname;
splitFilename(argv[0], NULL, NULL, &logname, NULL);
addFileTimestamp(logname, true);
logname.append(".log");
StringBuffer lf;
openLogFile(lf, logname.str(),0,false,true);
queryStderrLogMsgHandler()->setMessageFields(MSGFIELD_prefix);
if (options&&options->getPropBool("@enableSysLog",true))
UseSysLogForOperatorMessages();
if (argc>=5) {
//DOM- Moved logic to swapnodemain.cpp so I use it within the management console...
const char* thor=argv[2];
const char* oldip=argv[3];
const char* newip=argv[4];
unsigned nodenum=(argc>5)?atoi(argv[5]):0;
SwapNode(thor,oldip,newip,nodenum);
}
else if (isauto)
autoSwapNode(options,(argc>2)&&(stricmp(argv[2],"swap")==0));
else if (ishistory)
swapNodeHistory(options,(argc>2)?atoi(argv[2]):0,NULL);
else if (isswapped)
swappedList(options,(argc>2)?atoi(argv[2]):0,NULL);
else if (isemail) {
bool sendswapped = (argc>=3)&&(stricmp(argv[2],"swapped")==0);
bool sendhistory = (argc>=3)&&(stricmp(argv[2],"history")==0);
EmailSwap(options, NULL,true, sendswapped,sendhistory);
}
}
catch (IException *e) {
EXCLOG(e,"SWAPNODE");
e->Release();
ret = -1;
}
}
UseSysLogForOperatorMessages(false);
ExitModuleObjects();
return ret;
}
#endif