/*##############################################################################
Copyright (C) 2011 HPCC Systems.
All rights reserved. This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see .
############################################################################## */
#ifdef _WIN32
#define _WIN32_WINNT 0x0400
#include
#endif
#include "platform.h"
#include "thirdparty.h"
#include "jlib.hpp"
#include "jhtree.hpp"
#include "jio.hpp"
#include "jstring.hpp"
#include "jfile.hpp"
#include "jexcept.hpp"
#include "jsocket.hpp"
#include "jlog.hpp"
#include "rmtfile.hpp"
#define USE_JLOG
extern bool outputPartsFiles(const char *daliserver,const char *cluster,const char *outdir,StringBuffer &errstr);
extern void applyPartsFile(IFileIO *in,void (* applyfn)(const char *,const char *));
static AtomRefTable *ignoreExt = NULL;
MODULE_INIT(INIT_PRIORITY_STANDARD)
{
ignoreExt = new AtomRefTable(true);
return true;
}
MODULE_EXIT()
{
::Release(ignoreExt);
}
#define BUFSIZE 0x10000
#define MINCOMPRESS_THRESHOLD 0x8000
static StringAttr unixmirror("/mnt/mirror");
static bool checkMode = false;
static bool silent = false;
static bool verbose = false;
static bool compressAll = false;
static bool noCheckSlaveCount = false;
static bool inexactDateMatch = false;
#ifdef USE_JLOG // and why not?
#define println PROGLOG
#define printerr ERRLOG
#else
static void println(const char *format, ...) __attribute__((format(printf, 1, 2)))
{
va_list x;
va_start(x, format);
vfprintf(stdout,format, x);
fprintf(stdout,"\n");
fflush(stdout);
va_end(x);
}
static void printerr(const char *format, ...) __attribute__((format(printf, 1, 2)))
{
va_list x;
va_start(x, format);
fprintf(stderr,"ERROR: ");
vfprintf(stderr,format, x);
fprintf(stderr,"\n");
fflush(stderr);
va_end(x);
}
#endif
static bool shouldCompressFile(const char *name)
{
if (compressAll)
return true;
OwnedIFile file = createIFile(name);
bool iskey = false;
unsigned __int64 filesize = file->size();
if (filesize < MINCOMPRESS_THRESHOLD)
{
if (verbose)
println("File %s is too small to compress", name);
return false;
}
return !isCompressedIndex(name);
}
static bool CopySingleFile(IFile *srcfile,IFile *dstfile, bool compress, bool suppressnotfounderrs)
{
const char *source = srcfile->queryFilename();
const char *target = dstfile->queryFilename();
#ifdef _WIN32
if (compress && shouldCompressFile(source))
{
if (!silent)
println("Copy %s to %s with compress", source, target);
if (!checkMode)
{
HANDLE hTarget=::CreateFile(target,GENERIC_READ|GENERIC_WRITE,0,NULL,CREATE_NEW,0,NULL);
USHORT compression=COMPRESSION_FORMAT_DEFAULT;
DWORD bytes;
if(::DeviceIoControl(hTarget, FSCTL_SET_COMPRESSION, &compression, sizeof(compression), NULL, 0, &bytes, NULL))
{
HANDLE hSource=::CreateFile(source,GENERIC_READ,0,NULL,OPEN_EXISTING,0,NULL);
void *buf = malloc(BUFSIZE);
loop
{
DWORD read;
if (!::ReadFile(hSource, buf, BUFSIZE, &read, NULL))
throw MakeOsException(GetLastError(), "Failed to read file %s", source);
if (read)
{
DWORD wrote;
if (!::WriteFile(hTarget, buf, read, &wrote, NULL))
throw MakeOsException(GetLastError(), "Failed to write file %s", target);
assertex(wrote==read);
}
else
break;
}
FILETIME c, a, w;
::GetFileTime(hSource, &c, &a, &w);
::SetFileTime(hTarget, &c, &a, &w);
::CloseHandle(hSource);
::CloseHandle(hTarget);
return true;
}
DWORD err=::GetLastError();
::CloseHandle(hTarget);
}
return checkMode;
}
#endif
if (!silent)
println("Copy %s to %s", source, target);
if(checkMode)
return false;
try {
recursiveCreateDirectoryForFile(target); // maybe should only do if fails
dstfile->remove();
srcfile->copyTo(dstfile,0x100000,NULL,true);
}
catch (IException *e) {
if (suppressnotfounderrs) {
if (srcfile&&!srcfile->exists()) { // its gone!
if (verbose)
printerr("File %s no longer exists", source);
e->Release();
return true;
}
}
StringBuffer msg("CopyFile(");
msg.append(source).append(',').append(target).append("): ");
e->errorMessage(msg);
printerr("%s",msg.str());
e->Release();
return false;
}
return true;
}
void syncFile(const char *src, const char *dst)
{
// from must exist otherwise ignore
Owned srcfile = createIFile(src);
bool isdir;
CDateTime srcdt;
offset_t srcsz;
if (srcfile->getInfo(isdir,srcsz,srcdt)) { // ignore if not there
if (isdir)
printerr("src file %s is directory, ignoring copy", src);
else {
Owned dstfile = createIFile(dst);
CDateTime dstdt;
offset_t dstsz;
if (dstfile->getInfo(isdir,dstsz,dstdt)) { // check if there
if (isdir) {
printerr("dst file %s is directory, ignoring copy", dst);
return;
}
if ((srcsz==dstsz)&&srcdt.equals(dstdt,!inexactDateMatch))
return;
}
CopySingleFile(srcfile,dstfile, false, true);
}
}
}
static void usage()
{
printf("\nBACKUPNODE sourcepath targetpath [options]\n");
printf(" Copies and optionally compresses files from source to target\n\n");
printf("BACKUPNODE -T slavesfile slaveno path1 path2 path3...\n");
printf(" Thor node backup mode - syncs named paths with adjacent d: drive\n\n");
printf(" if no paths specified use DAT files in directory specified by -X\n\n");
printf("BACKUPNODE -W slavesfile dir\n");
printf(" Waits for .ERR files in the specified directory then concatenates into a log file\n\n");
printf("BACKUPNODE -O daliip cluster outdir\n");
printf(" generates data files in outdir containing all files to be checked (*.DAT) \n\n");
printf("Options:\n");
printf(" -A - compression options apply to all files (normally excludes small files and all keys)\n");
printf(" -B - use /mnt/mirror for replicate target\n");
printf(" -C - compress files on target (including existing files)\n");
printf(" -D - overwrite existing files if size/date mismatch\n");
printf(" -E - set compression state of existing files\n");
printf(" -F - use option XML file\n");
printf(" -I - ignore files that have specified extention\n");
printf(" -M - ignore sub-second differences when comparing file dates\n");
printf(" -N - Include files even if slave count does not match filename\n");
printf(" -Q - quiet mode: only errors are reported\n");
printf(" -V - verbose mode\n");
printf(" -Y - report what would have been copied/compressed but do nothing\n");
printf(" -S - snmp enabled\n");
printf(" -X - read part lists (%%n.DAT) from and write %%n.ERR to specified dir\n");
exit(2);
}
static bool different(IFile &target, IFile &source)
{
CDateTime tmt, smt;
if (target.size() != source.size())
return true;
target.getTime(NULL, &tmt, NULL);
if (inexactDateMatch)
{
unsigned hour, min, sec, nanosec;
tmt.getTime(hour, min, sec, nanosec);
tmt.setTime(hour, min, sec, 0);
}
source.getTime(NULL, &smt, NULL);
if (inexactDateMatch)
{
unsigned hour, min, sec, nanosec;
smt.getTime(hour, min, sec, nanosec);
smt.setTime(hour, min, sec, 0);
}
return tmt.compare(smt) != 0;
}
static bool includeFile(IFile &file, unsigned numSlaves)
{
StringBuffer ext;
splitFilename(file.queryFilename(), NULL, NULL, NULL, &ext);
const char *_ext = ext.length()?ext.str()+1:"";
if (ignoreExt->find(*_ext))
return false;
if (!numSlaves || noCheckSlaveCount)
return true;
const char *partcount = strstr(ext.str(), "_of_");
if (partcount)
{
unsigned clusterSize = atoi(partcount+4);
return clusterSize==numSlaves || clusterSize==numSlaves+1;
}
else
return false;
}
static void CopyDirectory(const char *source, const char *target, unsigned numSlaves, bool compress, bool sourceIsMaster)
{
if (verbose)
println("Copy directory %s to %s", source, target);
bool first = true;
Owned dir = createDirectoryIterator(source, "*");
ForEach (*dir)
{
IFile &sourceFile = dir->query();
if (sourceFile.isFile())
{
if (includeFile(sourceFile, numSlaves))
{
StringBuffer targetname(target);
targetname.append(PATHSEPCHAR);
dir->getName(targetname);
OwnedIFile destFile = createIFile(targetname.str());
if ((destFile->size()==-1) || (sourceIsMaster && different(*destFile, sourceFile)))
{
if (first && !checkMode)
{
if (!recursiveCreateDirectory(target)) {
throw MakeStringException(-1,"Cannot create directory %s",target);
}
first = false;
}
if (!CopySingleFile(&sourceFile, destFile, compress, true))
printerr("File %s copy to %s failed", sourceFile.queryFilename(), destFile->queryFilename());
}
else if (verbose)
{
println("File %s already exists", destFile->queryFilename());
}
}
else if (verbose)
println("Skipping file %s (cluster size mismatch)", sourceFile.queryFilename());
}
else if (sourceFile.isDirectory())
{
StringBuffer newSource(source);
StringBuffer newTarget(target);
newSource.append(PATHSEPCHAR);
newTarget.append(PATHSEPCHAR);
dir->getName(newSource);
dir->getName(newTarget);
CopyDirectory(newSource.str(), newTarget.str(), numSlaves, compress, sourceIsMaster);
}
}
if (verbose)
println("Copied directory %s to %s", source, target);
}
static void CompressDirectory(const char *target, unsigned numSlaves, bool compress)
{
#ifdef _WIN32
if (verbose)
println("%s directory %s", compress ? "Compress" : "Decompress", target);
Owned dir = createDirectoryIterator(target, "*");
ForEach (*dir)
{
IFile &targetFile = dir->query();
if (targetFile.isFile())
{
if (includeFile(targetFile, numSlaves))
{
// Quick test to see if it's a key file.
bool compressThis = compress && shouldCompressFile(targetFile.queryFilename());
DWORD attr=::GetFileAttributes(targetFile.queryFilename());
if (attr==-1)
printerr("Could not read compression state of %s: error %x", targetFile.queryFilename(), ::GetLastError());
else
{
bool compressed = (attr & FILE_ATTRIBUTE_COMPRESSED) != 0;
if (compressed != compressThis)
{
if (!silent)
{
if (compressThis)
println("Compress %s before %"I64F"d", targetFile.queryFilename(), targetFile.size());
else
println("Decompress %s before %"I64F"d", targetFile.queryFilename(), targetFile.compressedSize());
}
if (!checkMode)
targetFile.setCompression(compressThis);
if (!silent)
{
if (compressThis)
{
if (checkMode)
println(""); // size after not known
else
println("after %"I64F"d", targetFile.compressedSize());
}
else
println("after %"I64F"d", targetFile.size());
}
}
}
}
}
else if (targetFile.isDirectory())
{
StringBuffer newTarget(target);
newTarget.append(PATHSEPCHAR);
dir->getName(newTarget);
CompressDirectory(newTarget.str(), numSlaves, compress);
}
}
if (verbose)
println("%s directory %s", compress ? "Compressed" : "Decompressed", target);
#endif
}
#define MAX_SLAVES 1000
static StringAttr slaveIP[MAX_SLAVES+1];
static unsigned numSlaves;
static void loadSlaves(const char *slavesName)
{
FILE *slavesFile = fopen(slavesName, "rt");
if( !slavesFile)
{
printerr("failed to open slaves file %s", slavesName);
throw MakeStringException(MSGAUD_operator, 0, "failed to open slaves file %s", slavesName);
}
char inbuf[1000];
numSlaves = 0;
while (fgets( inbuf, sizeof(inbuf), slavesFile))
{
char *hash = strchr(inbuf, '#');
if (hash)
*hash = 0;
char *finger = inbuf;
loop
{
while (isspace(*finger))
finger++;
char *start = finger;
while (*finger && !isspace(*finger))
finger++;
if (finger > start)
slaveIP[numSlaves ++].set(start, finger - start);
else
break;
if (numSlaves > MAX_SLAVES)
{
printerr("Too many slaves - invalid slaves file %s?", slavesName);
throw MakeStringException(MSGAUD_operator, 0, "Too many slaves - invalid slaves file %s?", slavesName);
}
}
}
fclose(slavesFile);
slaveIP[numSlaves].set(slaveIP[0].get());
}
static void waitSlaves(const char *dir,unsigned num,StringAttr *slaves)
{
unsigned start=msTick();
unsigned last=0;
bool *done = (bool *)calloc(num,sizeof(bool));
unsigned ndone = 0;
unsigned errors = 0;
StringBuffer name;
while (ndone file = createIFile(name.str());
Owned fio = file->open(IFOread);
if (fio) {
size32_t sz = (size32_t)fio->size();
if (sz) {
StringBuffer s;
fio->read(0,sz,s.reserve(sz));
println("%s: %s",slaves[i].get(),s.str());
errors++;
}
else {
try {
fio.clear();
file->remove();
}
catch (IException *e) {
StringBuffer msg("waitSlaves.1: ");
e->errorMessage(msg);
println("%s",msg.str());
e->Release();
}
println("%s: DONE",slaves[i].get());
}
break;
}
}
catch (IException *e) {
if (attempt==9) {
StringBuffer msg("waitSlaves.2: ");
e->errorMessage(msg);
println("%s",msg.str());
}
e->Release();
}
Sleep(5000);
}
}
}
}
if (startndone==ndone) {
Sleep(5000);
}
unsigned t = (msTick()-start)/(5*1000*60);
if (t!=last) {
last = t;
println("Running: %d minutes taken, %d slave%s complete of %d",t*5,ndone,(ndone==1)?"":"s",num);
if (num-ndone<10) {
StringBuffer waiting;
for (unsigned j=0;jqueryCreate(argv[argNo++]);
break;
}
case 'M':
inexactDateMatch = true;
break;
case 'N':
noCheckSlaveCount = true;
break;
case 'O':
outputMode = true;
break;
case 'Q':
if (verbose)
println("Silent and verbose specified - silent will be ignored");
else
silent = true;
break;
case 'S':
snmpEnabled = true;
break;
case 'T':
thorMode = true;
break;
case 'V':
if (silent)
{
println("Silent and verbose specified - silent will be ignored");
silent = false;
}
verbose = true;
break;
case 'W':
waitMode = true;
break;
case 'X':
if ((int)argNonumSlaves)
{
printerr("'%s' is not a valid slave number (range is 1 to %d)", args.item(1), numSlaves);
throw MakeStringException(-1, "'%s' is not a valid slave number (range is 1 to %d)", args.item(1), numSlaves);
}
if (!forceSlaveIP)
{
IpAddress myip;
GetHostIp(myip);
IpAddress myipfromSlaves(slaveIP[slaveNum-1]);
if (!myip.ipequals(myipfromSlaves))
{
StringBuffer ips1, ips2;
myipfromSlaves.getIpText(ips1);
myip.getIpText(ips2);
printerr("IP address %d in slaves file %s does not match this machine %s", slaveNum, ips1.str(), ips2.str());
throw MakeStringException(-1, "IP address %d in slaves file %s does not match this machine %s", slaveNum, ips1.str(), ips2.str());
}
}
if (usedatfile) {
StringBuffer datafile(errdatdir);
addPathSepChar(datafile).append(slaveNum).append(".DAT");
Owned file = createIFile(datafile.str());
Owned fio;
// add a slight stagger
Sleep(slaveNum*200);
for (unsigned attempt=0;attempt<10;attempt++) {
try {
fio.setown(file->open(IFOread));
if (fio)
break;
}
catch (IException *e) {
if (attempt==9) {
StringBuffer msg;
e->errorMessage(msg);
printerr("%s",msg.str());
}
e->Release();
}
Sleep(5000);
}
if (fio)
applyPartsFile(fio,syncFile);
else {
printerr("Could not read file %s",datafile.str());
throw MakeStringException(-1, "Could not read file %s",datafile.str());
}
}
else {
aindex_t numArgs = args.ordinality();
for (aindex_t idx = 2; idxerrorMessage(erroutstr);
printerr("%s",erroutstr.str());
E->Release();
retValue = 2;
}
if (errdatdir.length()&&slaveNum) {
StringBuffer errfilename(errdatdir);
addPathSepChar(errfilename).append(slaveNum).append(".ERR");
Owned file = createIFile(errfilename.str());
for (unsigned attempt=0;attempt<10;attempt++) {
try {
Owned fio = file->open(IFOcreate);
if (fio) {
if (erroutstr.length()) {
if (erroutstr.charAt(erroutstr.length()-1)!='\n')
erroutstr.append('\n');
fio->write(0,erroutstr.length(),erroutstr.str());
}
releaseAtoms();
return retValue;
}
}
catch (IException *e) {
if (attempt==9) {
StringBuffer msg;
e->errorMessage(msg);
printerr("%s",msg.str());
}
e->Release();
}
Sleep(5000);
}
printerr("Could not write to %s",errfilename.str());
}
releaseAtoms();
return retValue;
}