/*##############################################################################
Copyright (C) 2011 HPCC Systems.
All rights reserved. This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
############################################################################## */
#include "platform.h"
#include "jlib.hpp"
#include "jiface.hpp"
#include "jptree.hpp"
#include "jmisc.hpp"
#include "jregexp.hpp"
#include "jset.hpp"
#include "jflz.hpp"
#include "mpbase.hpp"
#include "mpcomm.hpp"
#include "daclient.hpp"
#include "dadfs.hpp"
#include "dautils.hpp"
#include "dasds.hpp"
#include "rmtfile.hpp"
#define LOGPFX "backupnode: "
#define FLZCOMPRESS
class CFileListWriter
{
public:
bool abort;
bool verbose;
StringArray clustersin;
StringArray clustersout;
IGroup *group;
IpAddress *iphash;
unsigned *ipnum;
unsigned iphashsz;
unsigned numparts;
unsigned numfiles;
IArrayOf *outStreams;
void log(const char * format, ...) __attribute__((format(printf, 2, 3)))
{
va_list args;
va_start(args, format);
StringBuffer line;
line.valist_appendf(format, args);
va_end(args);
PROGLOG(LOGPFX "%s",line.str());
}
void error(const char * format, ...) __attribute__((format(printf, 2, 3)))
{
va_list args;
va_start(args, format);
StringBuffer line;
line.valist_appendf(format, args);
va_end(args);
ERRLOG(LOGPFX "%s",line.str());
}
void warn(const char * format, ...) __attribute__((format(printf, 2, 3)))
{
va_list args;
va_start(args, format);
StringBuffer line;
line.valist_appendf(format, args);
va_end(args);
WARNLOG(LOGPFX "%s",line.str());
}
void addIpHash(const IpAddress &ip,unsigned n)
{
unsigned r;
_cpyrev4(&r,&ip);
unsigned h = hashc((const byte *)&r,sizeof(r),0)%iphashsz;
while (!iphash[h].isNull())
if (++h==iphashsz)
h = 0;
iphash[h] = ip;
ipnum[h] = n;
}
unsigned checkIpHash(const IpAddress &ip)
{
unsigned r;
_cpyrev4(&r,&ip);
unsigned h = hashc((const byte *)&r,sizeof(r),0)%iphashsz;
while (!iphash[h].isNull()) {
if (iphash[h].ipequals(ip))
return ipnum[h];
if (++h==iphashsz)
h = 0;
}
return NotFound;
}
CFileListWriter()
{
abort = false;
verbose = false;
iphash = NULL;
ipnum = NULL;
iphashsz = 0;
numfiles = 0;
numparts = 0;
}
~CFileListWriter()
{
if (iphash)
delete [] iphash;
delete [] ipnum;
}
void write(IGroup *_group,IArrayOf &_outStreams)
{
if (!_group||abort)
return;
group = _group;
outStreams = &_outStreams;
delete [] iphash;
iphash = NULL;
delete [] ipnum;
iphashsz = group->ordinality()*2;
iphash = new IpAddress[iphashsz];
ipnum = new unsigned[iphashsz];
bool grphasports = false;
ForEachNodeInGroup(i,*group) {
const SocketEndpoint &ep = group->queryNode(i).endpoint();
if (ep.port!=0)
grphasports = true;
addIpHash(ep,i);
}
if (grphasports)
ERRLOG(LOGPFX "Group has ports!");
class cfilescan1 : public CSDSFileScanner
{
Owned conn;
CFileListWriter &parent;
bool &abort;
bool checkFileOk(IPropertyTree &file,const char *filename)
{
if (abort)
return false;
StringArray groups;
getFileGroups(&file,groups);
if (groups.ordinality()==0) {
parent.error("File has no group defined: %s",filename);
return false;
}
ForEachItemIn(i,groups) {
const char *cluster = groups.item(i);
ForEachItemIn(j1,parent.clustersin) {
if (strcmp(parent.clustersin.item(j1),cluster)==0)
return true;
}
bool excluded = false;
ForEachItemIn(j2,parent.clustersout) {
if (strcmp(parent.clustersout.item(j2),cluster)==0) {
excluded = true;
break;
}
}
if (excluded)
continue;
Owned group = queryNamedGroupStore().lookup(cluster);
if (!group) {
parent.error("cannot find cluster %s",cluster);
parent.clustersout.append(cluster);
continue;
}
ForEachNodeInGroup(i,*group) {
unsigned nn = parent.checkIpHash(group->queryNode(i).endpoint());
if (nn!=NotFound) {
parent.clustersin.append(cluster);
return true;
}
}
}
return false;
}
bool checkScopeOk(const char *scopename)
{
return !abort;
}
void processFile(IPropertyTree &file,StringBuffer &name)
{
if (abort)
return;
if (parent.verbose)
parent.log("Process file %s",name.str());
Owned fdesc;
try {
fdesc.setown(deserializeFileDescriptorTree(&file,&queryNamedGroupStore()));
}
catch (IException *e) {
EXCLOG(e,LOGPFX "processFile");
e->Release();
}
if (fdesc) {
unsigned np = fdesc->numParts();
if (np==0) {
parent.error("File has no parts %s",name.str());
return;
}
parent.numfiles++;
StringBuffer fn;
StringBuffer dir;
bool incluster = true;
StringBuffer ln;
for (unsigned p=0;pnumCopies(p);
unsigned c;
UnsignedArray map;
unsigned nf = 0;
for (c=0;cqueryNode(p,c);
unsigned nn = parent.checkIpHash(node->endpoint());
map.append(nn);
if (nn!=NotFound)
nf++;
}
if (nf>1) { // 1 not much use
parent.numparts++;
ForEachItemIn(i,map) {
unsigned from = map.item(i);
if (from!=NotFound) {
ForEachItemIn(j,map) {
if (i!=j) {
unsigned to = map.item(j);
if (to!=NotFound) {
// right lets go for it
IFileIOStream &out = parent.outStreams->item(from);
RemoteFilename rfn;
fdesc->getFilename(p,i,rfn);
rfn.getLocalPath(ln.clear());
ln.append('|');
fdesc->getFilename(p,j,rfn);
rfn.getRemotePath(ln);
ln.append('\n');
out.write(ln.length(),ln.str());
}
}
}
}
}
}
}
}
else
parent.error("cannot create file descriptor",name.str());
}
public:
cfilescan1(CFileListWriter &_parent,bool &_abort)
: parent(_parent), abort(_abort)
{
}
~cfilescan1()
{
}
void scan()
{
if (abort)
return;
conn.setown(querySDS().connect("/Files", myProcessSession(), 0, 100000));
if (!conn||abort)
return;
CSDSFileScanner::scan(conn);
}
} filescan(*this,abort);
filescan.scan();
log("File scan complete, %d files, %d parts",numfiles,numparts);
}
};
// Line-oriented reader over an IFileIOStream (applyPartsFile() uses it).
// The buffer is buffsize+maxlinesize+1 bytes; presumably the extra room holds
// a partial line carried over between reads (TODO confirm).
class CFileStreamReader // this ought to be in jlib really
{
Linked strm;
// owns the memory behind buf
MemoryAttr mba;
size32_t maxlinesize;
size32_t buffsize;
char *buf;
// number of bytes currently held in buf
size32_t lbsize;
// presumably the start of the next unread line within buf (TODO confirm)
char *p;
// set once a read from strm returns 0 bytes
bool eof;
public:
CFileStreamReader(IFileIOStream * _in,size32_t _maxlinesize=8192,size32_t _buffsize=0x10000)
: strm(_in)
{
maxlinesize = _maxlinesize;
buffsize = _buffsize;
buf = (char *)mba.allocate(buffsize+maxlinesize+1);
lbsize = 0;
p=NULL;
eof = false;
}
// Returns the next line (NULL when no data remains) and sets lnsize.
// A final line without a trailing '\n' has one appended.
char* nextLine(size32_t &lnsize)
{
// NOTE(review): the next line is malformed in this copy of the file; text
// between "lbsize" and "read(" appears to be missing. Restore from version
// control before relying on this function.
if (lbsizeread(buffsize,buf+lbsize);
if (rd==0) {
eof = true;
if (lbsize==0)
return NULL;
if (buf[lbsize-1]!='\n')
buf[lbsize++] = '\n'; // terminate unfinished line
}
else
lbsize += rd;
}
else if (lbsize==0)
return NULL;
}
size32_t len = 0;
char *ret = p;
// NOTE(review): the next line is malformed in this copy. The rest of
// nextLine(), the end of CFileStreamReader and the start of the following
// function all appear to be missing. The missing start includes its
// signature and the declarations of serverGroup, daliserver, cluster, outdir,
// errstr and dalistarted. Restore from version control.
while ((len serverGroup = createIGroup(daliserver,DALI_SERVER_PORT);
// Register as a Dali client. Once dalistarted is set, every exit path below
// calls closedownClientProcess().
initClientProcess(serverGroup, DCR_BackupGen, 0, NULL, NULL, 1000*60*5);
dalistarted = true;
CFileListWriter writer;
Owned group = queryNamedGroupStore().lookup(cluster);
if (group) {
// One output stream per node of the cluster, written to
// <outdir>/<1-based node number>.DAT in group order.
IArrayOf outStreams;
StringBuffer path;
ForEachNodeInGroup(i,*group) {
addPathSepChar(path.clear().append(outdir)).append(i+1).append(".DAT");
Owned outf = createIFile(path.str());
Owned outio = outf?outf->open(IFOcreate):NULL;
// FLZCOMPRESS (defined near the top of this file) also selects the
// matching reader in applyPartsFile().
#ifdef FLZCOMPRESS
Owned out = outio?createFastLZStreamWrite(outio):NULL;
#else
Owned out = outio?createBufferedIOStream(outio):NULL;
#endif
if (!out) {
errstr.appendf(LOGPFX "cannot create file %s",path.str());
closedownClientProcess();
return false;
}
outStreams.append(*out.getClear());
}
// NOTE(review): outStreams are released (and presumably flushed) only when
// they go out of scope at the return below, i.e. after
// closedownClientProcess(). Confirm the stream writers flush on release.
writer.write(group,outStreams);
closedownClientProcess();
return true;
}
else
errstr.appendf(LOGPFX "cannot find cluster %s",cluster);
}
// Exceptions are turned into an errstr message, so the function returns false.
catch (IException *e) {
errstr.append(LOGPFX "outPartsFile : ");
e->errorMessage(errstr);
e->Release();
}
}
else
errstr.append(LOGPFX "no dali server specified");
if (dalistarted)
closedownClientProcess();
// success means no error text was recorded
return errstr.length()==0;
}
// Reads a parts file (lines of the form "<source>|<target>", as emitted by
// CFileListWriter) from `in` and calls sync(source, target) for each line.
// Lines that contain no '|' separator are ignored.
void applyPartsFile(IFileIO *in,void (* sync)(const char *,const char *))
{
    // The reader must match the writer chosen by the same FLZCOMPRESS switch.
#ifdef FLZCOMPRESS
    Owned<IFileIOStream> strm = createFastLZStreamRead(in);
#else
    Owned<IFileIOStream> strm = createBufferedIOStream(in);
#endif
    CFileStreamReader reader(strm);
    size32_t linelen;
    while (char *src = reader.nextLine(linelen)) {
        char *sep = strchr(src,'|');
        if (!sep)
            continue;
        *sep++ = 0;     // terminate the source part; sep now points at the target
        sync(src,sep);
    }
}