/*##############################################################################

    Copyright (C) 2011 HPCC Systems.

    All rights reserved. This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License as
    published by the Free Software Foundation, either version 3 of the
    License, or (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with this program.  If not, see http://www.gnu.org/licenses/.
############################################################################## */

// NOTE(review): this copy of the file is damaged and will not compile as-is.
//   * Template argument lists have been lost (e.g. "Owned conn", "Linked strm",
//     "IArrayOf *outStreams").
//   * Two spans inside processFile()'s loops are missing.
//   * Everything between the middle of CFileStreamReader::nextLine() and the
//     body of the function that logs as "outPartsFile" is missing.
// The damaged text is kept verbatim and marked with NOTE(review) where it
// occurs; restore it from version control rather than by guesswork.

#include "platform.h"

#include "jlib.hpp"
#include "jiface.hpp"
#include "jptree.hpp"
#include "jmisc.hpp"
#include "jregexp.hpp"
#include "jset.hpp"
#include "jflz.hpp"

#include "mpbase.hpp"
#include "mpcomm.hpp"
#include "daclient.hpp"
#include "dadfs.hpp"
#include "dautils.hpp"
#include "dasds.hpp"
#include "rmtfile.hpp"

#define LOGPFX "backupnode: "

// When defined, the parts lists are written/read through the FastLZ stream
// wrappers instead of plain buffered streams.
#define FLZCOMPRESS

// Scans the "/Files" branch obtained from querySDS() and, for every file part
// that has more than one copy located on nodes of the target group, writes
// "localpath|remotepath\n" lines describing each (source copy, other copy)
// pair to the output stream belonging to the node that holds the source copy.
class CFileListWriter
{
public:
    bool abort;                 // checked before each step of the scan; when true the scan stops
    bool verbose;               // when true each processed file is logged
    StringArray clustersin;     // cluster names checkFileOk has accepted
    StringArray clustersout;    // cluster names checkFileOk could not look up
    IGroup *group;              // target group, set by write()
    IpAddress *iphash;          // open-addressed table of node IPs (null IpAddress == empty slot)
    unsigned *ipnum;            // node index stored alongside iphash[h]
    unsigned iphashsz;          // number of slots in iphash/ipnum
    unsigned numparts;          // parts for which at least two copies were found in the group
    unsigned numfiles;          // files with at least one part that were processed
    IArrayOf *outStreams;       // per-node output streams, set by write() (not owned)

    // printf-style progress message, emitted via PROGLOG with the LOGPFX prefix.
    void log(const char * format, ...) __attribute__((format(printf, 2, 3)))
    {
        va_list args;
        va_start(args, format);
        StringBuffer line;
        line.valist_appendf(format, args);
        va_end(args);
        PROGLOG(LOGPFX "%s",line.str());
    }

    // printf-style error message, emitted via ERRLOG with the LOGPFX prefix.
    void error(const char * format, ...) __attribute__((format(printf, 2, 3)))
    {
        va_list args;
        va_start(args, format);
        StringBuffer line;
        line.valist_appendf(format, args);
        va_end(args);
        ERRLOG(LOGPFX "%s",line.str());
    }

    // printf-style warning message, emitted via WARNLOG with the LOGPFX prefix.
    void warn(const char * format, ...) __attribute__((format(printf, 2, 3)))
    {
        va_list args;
        va_start(args, format);
        StringBuffer line;
        line.valist_appendf(format, args);
        va_end(args);
        WARNLOG(LOGPFX "%s",line.str());
    }

    // Records ip -> n in the table, probing linearly (with wrap-around) from
    // the hashed slot until an empty slot is found.
    // The table must have at least one empty slot or the probe never ends;
    // write() guarantees this by allocating two slots per group node.
    void addIpHash(const IpAddress &ip,unsigned n)
    {
        unsigned r;
        _cpyrev4(&r,&ip);
        unsigned h = hashc((const byte *)&r,sizeof(r),0)%iphashsz;
        while (!iphash[h].isNull())
            if (++h==iphashsz)
                h = 0;
        iphash[h] = ip;
        ipnum[h] = n;
    }

    // Returns the node index recorded for ip by addIpHash, or NotFound when
    // the ip is not in the table (including when the table is empty).
    unsigned checkIpHash(const IpAddress &ip)
    {
        // An empty table (before write(), or an empty group) would otherwise
        // make the modulo below divide by zero.
        if (iphashsz==0)
            return NotFound;
        unsigned r;
        _cpyrev4(&r,&ip);
        unsigned h = hashc((const byte *)&r,sizeof(r),0)%iphashsz;
        while (!iphash[h].isNull()) {
            if (iphash[h].ipequals(ip))
                return ipnum[h];
            if (++h==iphashsz)
                h = 0;
        }
        return NotFound;
    }

    // Starts with no table and zeroed counters; group and outStreams are left
    // unset until write() is called.
    CFileListWriter()
    {
        abort = false;
        verbose = false;
        iphash = NULL;
        ipnum = NULL;
        iphashsz = 0;
        numfiles = 0;
        numparts = 0;
    }

    ~CFileListWriter()
    {
        if (iphash)
            delete [] iphash;
        delete [] ipnum;
    }

    // Builds the IP table for _group and scans all files, writing the
    // copy lists to _outStreams (indexed by node position within _group).
    // Does nothing when _group is NULL or abort is already set.
    // _outStreams is held by pointer for the duration of the scan.
    void write(IGroup *_group,IArrayOf &_outStreams)
    {
        if (!_group||abort)
            return;
        group = _group;
        outStreams = &_outStreams;
        delete [] iphash;
        iphash = NULL;
        delete [] ipnum;
        iphashsz = group->ordinality()*2;   // two slots per node keeps half the table empty
        iphash = new IpAddress[iphashsz];
        ipnum = new unsigned[iphashsz];
        bool grphasports = false;
        ForEachNodeInGroup(i,*group) {
            const SocketEndpoint &ep = group->queryNode(i).endpoint();
            if (ep.port!=0)
                grphasports = true;
            addIpHash(ep,i);
        }
        if (grphasports)
            ERRLOG(LOGPFX "Group has ports!");  // reported only; the scan still runs

        // File scanner that filters files by cluster and emits the copy lists.
        class cfilescan1 : public CSDSFileScanner
        {
            Owned conn;
            CFileListWriter &parent;
            bool &abort;

            // Accepts a file when one of its groups is already in
            // parent.clustersin, or names a group sharing at least one node IP
            // with the target group (that cluster is then added to clustersin).
            // Clusters that cannot be looked up are logged and added to
            // parent.clustersout so they are skipped next time.
            bool checkFileOk(IPropertyTree &file,const char *filename)
            {
                if (abort)
                    return false;
                StringArray groups;
                getFileGroups(&file,groups);
                if (groups.ordinality()==0) {
                    parent.error("File has no group defined: %s",filename);
                    return false;
                }
                ForEachItemIn(i,groups) {
                    const char *cluster = groups.item(i);
                    ForEachItemIn(j1,parent.clustersin) {
                        if (strcmp(parent.clustersin.item(j1),cluster)==0)
                            return true;
                    }
                    bool excluded = false;
                    ForEachItemIn(j2,parent.clustersout) {
                        if (strcmp(parent.clustersout.item(j2),cluster)==0) {
                            excluded = true;
                            break;
                        }
                    }
                    if (excluded)
                        continue;
                    Owned group = queryNamedGroupStore().lookup(cluster);
                    if (!group) {
                        parent.error("cannot find cluster %s",cluster);
                        parent.clustersout.append(cluster);
                        continue;
                    }
                    ForEachNodeInGroup(i,*group) {
                        unsigned nn = parent.checkIpHash(group->queryNode(i).endpoint());
                        if (nn!=NotFound) {
                            parent.clustersin.append(cluster);
                            return true;
                        }
                    }
                }
                return false;
            }

            // Every scope is accepted unless the scan has been aborted.
            bool checkScopeOk(const char *scopename)
            {
                return !abort;
            }

            // For each part of the file, maps every copy to its node index in
            // the target group (NotFound if the copy lives elsewhere). When at
            // least two copies are in the group, writes one line per ordered
            // pair (i,j) of such copies, "local path of copy i|remote path of
            // copy j", to the stream of the node holding copy i.
            void processFile(IPropertyTree &file,StringBuffer &name)
            {
                if (abort)
                    return;
                if (parent.verbose)
                    parent.log("Process file %s",name.str());
                Owned fdesc;
                try {
                    fdesc.setown(deserializeFileDescriptorTree(&file,&queryNamedGroupStore()));
                }
                catch (IException *e) {
                    // logged and swallowed; fdesc stays empty and is reported below
                    EXCLOG(e,LOGPFX "processFile");
                    e->Release();
                }
                if (fdesc) {
                    unsigned np = fdesc->numParts();
                    if (np==0) {
                        parent.error("File has no parts %s",name.str());
                        return;
                    }
                    parent.numfiles++;
                    StringBuffer fn;
                    StringBuffer dir;
                    bool incluster = true;
                    StringBuffer ln;
                    // NOTE(review): the next line and the inner "for (c=0;" line
                    // are damaged in this copy (text between the loop variable
                    // and the fdesc member call is missing).
                    for (unsigned p=0;pnumCopies(p);
                        unsigned c;
                        UnsignedArray map;
                        unsigned nf = 0;
                        for (c=0;cqueryNode(p,c);
                            unsigned nn = parent.checkIpHash(node->endpoint());
                            map.append(nn);
                            if (nn!=NotFound)
                                nf++;
                        }
                        if (nf>1) { // 1 not much use
                            parent.numparts++;
                            ForEachItemIn(i,map) {
                                unsigned from = map.item(i);
                                if (from!=NotFound) {
                                    ForEachItemIn(j,map) {
                                        if (i!=j) {
                                            unsigned to = map.item(j);
                                            if (to!=NotFound) {
                                                // right lets go for it
                                                IFileIOStream &out = parent.outStreams->item(from);
                                                RemoteFilename rfn;
                                                fdesc->getFilename(p,i,rfn);
                                                rfn.getLocalPath(ln.clear());
                                                ln.append('|');
                                                fdesc->getFilename(p,j,rfn);
                                                rfn.getRemotePath(ln);
                                                ln.append('\n');
                                                out.write(ln.length(),ln.str());
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
                else
                    parent.error("cannot create file descriptor %s",name.str());
            }

        public:
            cfilescan1(CFileListWriter &_parent,bool &_abort)
                : parent(_parent), abort(_abort)
            {
            }
            ~cfilescan1()
            {
            }

            // Connects to "/Files" and runs the base-class scan over it.
            // The final connect() argument (100000) is presumably a timeout -
            // TODO confirm units against the SDS API.
            void scan()
            {
                if (abort)
                    return;
                conn.setown(querySDS().connect("/Files", myProcessSession(), 0, 100000));
                if (!conn||abort)
                    return;
                CSDSFileScanner::scan(conn);
            }

        } filescan(*this,abort);

        filescan.scan();
        log("File scan complete, %d files, %d parts",numfiles,numparts);
    }
};

// Line reader over an IFileIOStream, using a single buffer of
// buffsize+maxlinesize+1 bytes owned by mba.
class CFileStreamReader // this ought to be in jlib really
{
    Linked strm;
    MemoryAttr mba;
    size32_t maxlinesize;
    size32_t buffsize;
    char *buf;
    size32_t lbsize;
    char *p;
    bool eof;
public:
    CFileStreamReader(IFileIOStream * _in,size32_t _maxlinesize=8192,size32_t _buffsize=0x10000)
        : strm(_in)
    {
        maxlinesize = _maxlinesize;
        buffsize = _buffsize;
        buf = (char *)mba.allocate(buffsize+maxlinesize+1);
        lbsize = 0;
        p=NULL;
        eof = false;
    }

    // Returns the next line, or NULL when no data is left. Callers in this
    // file treat the result as a NUL-terminated string.
    // NOTE(review): this function is damaged in this copy. The text after
    // "if (lbsize" is incomplete, and everything from "while ((len" to the
    // middle of the function that logs as "outPartsFile" is missing. What
    // follows "while ((len" below is the tail of that other function, which
    // uses daliserver, cluster, outdir, errstr and dalistarted declared in the
    // missing part.
    char* nextLine(size32_t &lnsize)
    {
        if (lbsizeread(buffsize,buf+lbsize);
                if (rd==0) {
                    eof = true;
                    if (lbsize==0)
                        return NULL;
                    if (buf[lbsize-1]!='\n')
                        buf[lbsize++] = '\n'; // terminate unfinished line
                }
                else
                    lbsize += rd;
            }
            else if (lbsize==0)
                return NULL;
        }
        size32_t len = 0;
        char *ret = p;
        while ((len serverGroup = createIGroup(daliserver,DALI_SERVER_PORT);
            // --- remainder of the "outPartsFile" function (header not visible) ---
            // Starts a Dali client session, looks up the named cluster, creates
            // one output file "N.DAT" under outdir per node (N = node index + 1)
            // and runs CFileListWriter::write over them. Returns true on
            // success; on failure a message is appended to errstr.
            initClientProcess(serverGroup, DCR_BackupGen, 0, NULL, NULL, 1000*60*5);
            dalistarted = true;
            CFileListWriter writer;
            Owned group = queryNamedGroupStore().lookup(cluster);
            if (group) {
                IArrayOf outStreams;
                StringBuffer path;
                ForEachNodeInGroup(i,*group) {
                    addPathSepChar(path.clear().append(outdir)).append(i+1).append(".DAT");
                    Owned outf = createIFile(path.str());
                    Owned outio = outf?outf->open(IFOcreate):NULL;
#ifdef FLZCOMPRESS
                    Owned out = outio?createFastLZStreamWrite(outio):NULL;
#else
                    Owned out = outio?createBufferedIOStream(outio):NULL;
#endif
                    if (!out) {
                        errstr.appendf(LOGPFX "cannot create file %s",path.str());
                        closedownClientProcess();
                        return false;
                    }
                    outStreams.append(*out.getClear());
                }
                writer.write(group,outStreams);
                closedownClientProcess();
                return true;
            }
            else
                errstr.appendf(LOGPFX "cannot find cluster %s",cluster);
        }
        catch (IException *e) {
            errstr.append(LOGPFX "outPartsFile : ");
            e->errorMessage(errstr);
            e->Release();
        }
    }
    else
        errstr.append(LOGPFX "no dali server specified");
    if (dalistarted)
        closedownClientProcess();
    return errstr.length()==0;
}

// Reads a parts list from 'in' (through the FastLZ reader when FLZCOMPRESS is
// defined, otherwise a buffered stream) and, for every line containing '|',
// splits it at the first '|' and calls sync(left, right). Lines without a '|'
// are ignored.
//   in   - file to read the list from
//   sync - callback invoked once per entry; given the format written by
//          CFileListWriter the arguments are presumably (local path,
//          remote path) - confirm against the caller
void applyPartsFile(IFileIO *in,void (* sync)(const char *,const char *))
{
#ifdef FLZCOMPRESS
    Owned strm = createFastLZStreamRead(in);
#else
    Owned strm = createBufferedIOStream(in);
#endif
    CFileStreamReader reader(strm);
    loop {
        size32_t sz;
        char *line = reader.nextLine(sz);
        if (!line)
            break;
        char *split = strchr(line,'|');
        if (split) {
            *(split++) = 0;     // terminate the left half in place
            sync(line,split);
        }
    }
}