/*############################################################################## Copyright (C) 2011 HPCC Systems. All rights reserved. This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . ############################################################################## */ // todo look at IRemoteFileServer stop #include "platform.h" #include "limits.h" #include "jlib.hpp" #include "jio.hpp" #include "jmutex.hpp" #include "jfile.hpp" #include "jmisc.hpp" #include "jthread.hpp" #include "sockfile.hpp" #include "portlist.h" #include "jsocket.hpp" #include "jencrypt.hpp" #include "jset.hpp" #include "remoteerr.hpp" #define SOCKET_CACHE_MAX 500 #define MAX_THREADS 100 #define TARGET_MIN_THREADS 20 #define TARGET_ACTIVE_THREADS 80 #ifdef _DEBUG //#define SIMULATE_PACKETLOSS 1 #endif #define TREECOPYTIMEOUT (60*60*1000) // 1Hr (I guess could take longer for big file but at least will stagger) #define TREECOPYPOLLTIME (60*1000*5) // for tracing that delayed #define TREECOPYPRUNETIME (24*60*60*1000) // 1 day #if SIMULATE_PACKETLOSS #define TESTING_FAILURE_RATE_LOST_SEND 10 // per 1000 #define TESTING_FAILURE_RATE_LOST_RECV 10 // per 1000 #define DUMMY_TIMEOUT_MAX (1000*10) static bool errorSimulationOn = true; static ISocket *timeoutreadsock = NULL; // used to trigger struct dummyReadWrite { class X { dummyReadWrite *parent; public: X(dummyReadWrite *_parent) { parent = _parent; } ~X() { delete parent; } }; class TimeoutSocketException: public CInterface, public IJSOCK_Exception { public: IMPLEMENT_IINTERFACE; TimeoutSocketException() { } virtual ~TimeoutSocketException() { } int errorCode() const { return JSOCKERR_timeout_expired; } StringBuffer & errorMessage(StringBuffer &str) const { return str.append("timeout expired"); } MessageAudience errorAudience() const { return MSGAUD_user; } }; ISocket *sock; dummyReadWrite(ISocket *_sock) { sock = _sock; } void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, time_t timeout) { X x(this); unsigned t = msTick(); unsigned r = getRandom(); bool timeoutread = (timeoutreadsock==sock); timeoutreadsock=NULL; if (!timeoutread) sock->readtms(buf, min_size, max_size, size_read, timeout); if (timeoutread||(errorSimulationOn&&(TESTING_FAILURE_RATE_LOST_RECV>0)&&(r%1000DUMMY_TIMEOUT_MAX) timeout = DUMMY_TIMEOUT_MAX; t = msTick()-t; if (t0)&&(r%1000write(buf,size); } }; #define SOCKWRITE(sock) (new dummyReadWrite(sock))->write #define SOCKREADTMS(sock) (new dummyReadWrite(sock))->readtms #else #define SOCKWRITE(sock) sock->write #define SOCKREADTMS(sock) sock->readtms #endif // backward compatible modes typedef enum { compatIFSHnone, compatIFSHread, compatIFSHwrite, compatIFSHexec, compatIFSHall} compatIFSHmode; static const char *VERSTRING= "DS V1.7e - 6 " // dont forget FILESRV_VERSION in header #ifdef _WIN32 "Windows "; #else "Linux "; #endif typedef unsigned char RemoteFileCommandType; typedef int RemoteFileIOHandle; static unsigned maxConnectTime = 0; static unsigned maxReceiveTime = 0; void clientSetRemoteFileTimeouts(unsigned maxconnecttime,unsigned maxreadtime) { maxConnectTime = maxconnecttime; maxReceiveTime = maxreadtime; } struct sRFTM { CTimeMon *timemon; sRFTM() { timemon = maxReceiveTime?new CTimeMon(maxReceiveTime):NULL; } ~sRFTM() { delete timemon; } }; const char *remoteServerVersionString() { return VERSTRING; } static bool AuthenticationEnabled = true; bool enableDafsAuthentication(bool on) { bool ret = AuthenticationEnabled; AuthenticationEnabled = on; return ret; } #define CLIENT_TIMEOUT (1000*60*60*12) // long timeout in case zombies #define SERVER_TIMEOUT (1000*60*5) // timeout when waiting for dafilesrv to reply after command // (increased when waiting for large block) #define DAFS_CONNECT_FAIL_RETRY_TIME (1000*60*15) #ifdef SIMULATE_PACKETLOSS #define NORMAL_RETRIES (1) #define LENGTHY_RETRIES (1) #else #define NORMAL_RETRIES (3) #define LENGTHY_RETRIES (12) #endif #ifdef _DEBUG static byte traceFlags=0x30; #else static byte traceFlags=0x20; #endif #define TF_TRACE (traceFlags&1) #define TF_TRACE_PRE_IO (traceFlags&2) #define TF_TRACE_FULL (traceFlags&4) #define TF_TRACE_CLIENT_CONN (traceFlags&8) #define TF_TRACE_TREE_COPY (traceFlags&0x10) #define TF_TRACE_CLIENT_STATS (traceFlags&0x20) enum { RFCopenIO, // 0 RFCcloseIO, RFCread, RFCwrite, RFCsize, RFCexists, RFCremove, RFCrename, RFCgetver, RFCisfile, RFCisdirectory, // 10 RFCisreadonly, RFCsetreadonly, RFCgettime, RFCsettime, RFCcreatedir, RFCgetdir, RFCstop, RFCexec, RFCkill, RFCredeploy, // 20 RFCgetcrc, RFCmove, // 1.5 features below RFCsetsize, RFCextractblobelements, RFCcopy, RFCappend, RFCmonitordir, RFCsettrace, RFCgetinfo, RFCfirewall, // not used currently // 30 RFCunlock, RFCunlockreply, RFCinvalid, RFCcopysection, // 1.7e RFCtreecopy, // 1.7e - 1 RFCtreecopytmp, RFCmax, }; typedef enum { ACScontinue, ACSdone, ACSerror} AsyncCommandStatus; typedef byte OnceKey[16]; static void genOnce(OnceKey &key) { static __int64 inc=0; *(unsigned *)&key[0] = getRandom(); *(__int64 *)&key[4] = ++inc; *(unsigned *)&key[12] = getRandom(); } static void mergeOnce(OnceKey &key,size32_t sz,const void *data) { assertex(sz<=sizeof(OnceKey)); const byte *p = (const byte *)data; while (sz) key[--sz] ^= *(p++); } //--------------------------------------------------------------------------- class CThrottler { Semaphore &sem; bool got; public: CThrottler(Semaphore &_sem) : sem(_sem) { got = false; loop { if (sem.wait(5000)) { got = true; break; } unsigned cpu = getLatestCPUUsage(); PROGLOG("Throttler stalled (%d%% cpu)",cpu); if (getLatestCPUUsage()<75) break; } } ~CThrottler() { if (got) sem.signal(); } }; #define THROTTLE(throttlesem) CThrottler throttle(throttlesem); //--------------------------------------------------------------------------- class CDafsException: public CInterface, public IDAFS_Exception { StringAttr msg; int errcode; public: IMPLEMENT_IINTERFACE; CDafsException(int code,const char *_msg) : errcode(code), msg(_msg) { }; int errorCode() const { return errcode; } StringBuffer & errorMessage(StringBuffer &str) const { return str.append(msg); } MessageAudience errorAudience() const { return MSGAUD_user; } }; static IDAFS_Exception *createDafsException(int code,const char *msg) { return new CDafsException(code,msg); } void setDafsEndpointPort(SocketEndpoint &ep) { // odd kludge (don't do this at home) byte ipb[4]; if (ep.getNetAddress(sizeof(ipb),&ipb)==sizeof(ipb)) { if ((ipb[0]==255)&&(ipb[1]==255)) { ep.port = (((unsigned)ipb[2])<<8)+ipb[3]; ep.ipset(queryLocalIP()); } } if (ep.port==0) ep.port = DAFILESRV_PORT; } inline MemoryBuffer & initSendBuffer(MemoryBuffer & buff) { buff.setEndian(__BIG_ENDIAN); // transfer as big endian... buff.append((unsigned)0); // reserve space for length prefix return buff; } inline void sendBuffer(ISocket * socket, MemoryBuffer & src) { unsigned length = src.length() - sizeof(unsigned); byte * buffer = (byte *)src.toByteArray(); if (TF_TRACE_FULL) PROGLOG("sendBuffer size %d, data = %d %d %d %d",length, (int)buffer[4],(int)buffer[5],(int)buffer[6],(int)buffer[7]); _WINCPYREV(buffer, &length, sizeof(unsigned)); SOCKWRITE(socket)(buffer, src.length()); } inline size32_t receiveBufferSize(ISocket * socket, unsigned numtries=NORMAL_RETRIES,CTimeMon *timemon=NULL) { unsigned timeout = SERVER_TIMEOUT; if (numtries==0) { numtries = 1; timeout = 10*1000; // 10s } while (numtries--) { try { if (timemon) { unsigned remaining; if (timemon->timedout(&remaining)||(remaining<10)) remaining = 10; if (remainingerrorCode()!=JSOCKERR_timeout_expired)||(timemon&&timemon->timedout())) { throw; } StringBuffer err; char peername[256]; socket->peer_name(peername,sizeof(peername)-1); WARNLOG("Remote connection %s: %s",peername,e->errorMessage(err).str()); // why no peername e->Release(); Sleep(500+getRandom()%1000); // ~1s } } return 0; } static void flush(ISocket *socket) { MemoryBuffer sendbuf; initSendBuffer(sendbuf); sendbuf.append((RemoteFileCommandType)RFCgetver); sendbuf.append((unsigned)RFCgetver); MemoryBuffer reply; size32_t totread=0; try { sendBuffer(socket, sendbuf); char buf[1024]; loop { Sleep(1000); // breathe size32_t szread; SOCKREADTMS(socket)(buf, 1, sizeof(buf), szread, 1000*60); totread += szread; } } catch (IJSOCK_Exception *e) { if (totread) PROGLOG("%d bytes discarded",totread); if (e->errorCode()!=JSOCKERR_timeout_expired) EXCLOG(e,"flush"); e->Release(); } } inline void receiveBuffer(ISocket * socket, MemoryBuffer & tgt, unsigned numtries=1, size32_t maxsz=0x7fffffff) // maxsz is a guess at a resonable upper max to catch where protocol error { sRFTM tm; size32_t gotLength = receiveBufferSize(socket, numtries,tm.timemon); if (gotLength) { size32_t origlen = tgt.length(); try { if (gotLength>maxsz) { StringBuffer msg; msg.appendf("receiveBuffer maximum block size exceeded %d/%d",gotLength,maxsz); PrintStackReport(); throw createDafsException(DAFSERR_protocol_failure,msg.str()); } unsigned timeout = SERVER_TIMEOUT*(numtries?numtries:1); if (tm.timemon) { unsigned remaining; if (tm.timemon->timedout(&remaining)||(remaining<10)) remaining = 10; if (remainingerrorCode()!=JSOCKERR_timeout_expired) { EXCLOG(e,"receiveBuffer(1)"); PrintStackReport(); if (!tm.timemon||!tm.timemon->timedout()) flush(socket); } else { EXCLOG(e,"receiveBuffer"); PrintStackReport(); } tgt.setLength(origlen); throw; } catch (IException *e) { EXCLOG(e,"receiveBuffer(2)"); PrintStackReport(); if (!tm.timemon||!tm.timemon->timedout()) flush(socket); tgt.setLength(origlen); throw; } } tgt.setEndian(__BIG_ENDIAN); } struct CConnectionRec { SocketEndpoint ep; unsigned tick; IArrayOf socks; // relies on isShared }; //--------------------------------------------------------------------------- // Local mount redirect struct CLocalMountRec: public CInterface { IpAddress ip; StringAttr dir; // dir path on remote ip StringAttr local; // local dir path }; static CIArrayOf localMounts; static CriticalSection localMountCrit; void setDafsLocalMountRedirect(const IpAddress &ip,const char *dir,const char *mountdir) { CriticalBlock block(localMountCrit); ForEachItemInRev(i,localMounts) { CLocalMountRec &mount = localMounts.item(i); if (dir==NULL) { // remove all matching mount if (!mountdir) return; if (strcmp(mount.local,mountdir)==0) localMounts.remove(i); } else if (mount.ip.ipequals(ip)&&(strcmp(mount.dir,dir)==0)) { if (mountdir) { mount.local.set(mountdir); return; } else localMounts.remove(i); } } if (dir&&mountdir) { CLocalMountRec &mount = *new CLocalMountRec; mount.ip.ipset(ip); mount.dir.set(dir); mount.local.set(mountdir); localMounts.append(mount); } } IFile *createFileLocalMount(const IpAddress &ip, const char * filename) { CriticalBlock block(localMountCrit); ForEachItemInRev(i,localMounts) { CLocalMountRec &mount = localMounts.item(i); if (mount.ip.ipequals(ip)) { size32_t bl = mount.dir.length(); if (isPathSepChar(mount.dir[bl-1])) bl--; if ((memcmp((void *)filename,(void *)mount.dir.get(),bl)==0)&&(isPathSepChar(filename[bl])||!filename[bl])) { // match StringBuffer locpath(mount.local); if (filename[bl]) addPathSepChar(locpath).append(filename+bl+1); locpath.replace((PATHSEPCHAR=='\\')?'/':'\\',PATHSEPCHAR); return createIFile(locpath.str()); } } } return NULL; } //--------------------------------------------------------------------------- static class CConnectionTable: public SuperHashTableOf { void onAdd(void *) {} void onRemove(void *e) { CConnectionRec *r=(CConnectionRec *)e; delete r; } unsigned getHashFromElement(const void *e) const { const CConnectionRec &elem=*(const CConnectionRec *)e; return elem.ep.hash(0); } unsigned getHashFromFindParam(const void *fp) const { return ((const SocketEndpoint *)fp)->hash(0); } const void * getFindParam(const void *p) const { const CConnectionRec &elem=*(const CConnectionRec *)p; return (void *)&elem.ep; } bool matchesFindParam(const void * et, const void *fp, unsigned) const { return ((CConnectionRec *)et)->ep.equals(*(SocketEndpoint *)fp); } IMPLEMENT_SUPERHASHTABLEOF_REF_FIND(CConnectionRec,SocketEndpoint); unsigned numsockets; public: static CriticalSection crit; CConnectionTable() { numsockets = 0; } ~CConnectionTable() { releaseAll(); } ISocket *lookup(const SocketEndpoint &ep) { // always called from crit block CConnectionRec *r = SuperHashTableOf::find(&ep); if (r) { ForEachItemIn(i,r->socks) { ISocket *s = &r->socks.item(i); if (!QUERYINTERFACE(s, CInterface)->IsShared()) { r->tick = msTick(); s->Link(); return s; } } } return NULL; } void addLink(SocketEndpoint &ep,ISocket *sock) { // always called from crit block while (numsockets>=SOCKET_CACHE_MAX) { // find oldest CConnectionRec *c = NULL; unsigned oldest = 0; CConnectionRec *old = NULL; unsigned oldi; unsigned now = msTick(); loop { c = (CConnectionRec *)SuperHashTableOf::next(c); if (!c) break; ForEachItemIn(i,c->socks) { ISocket *s = &c->socks.item(i); if (!QUERYINTERFACE(s, CInterface)->IsShared()) { // candidate to remove unsigned t = now-c->tick; if (t>oldest) { oldest = t; old = c; oldi = i; } } } } if (!old) return; old->socks.remove(oldi); numsockets--; } CConnectionRec *r = SuperHashTableOf::find(&ep); if (!r) { r = new CConnectionRec; r->ep = ep; SuperHashTableOf::add(*r); } sock->Link(); r->socks.append(*sock); numsockets++; r->tick = msTick(); } void remove(SocketEndpoint &ep,ISocket *sock) { // always called from crit block CConnectionRec *r = SuperHashTableOf::find(&ep); if (r) if (r->socks.zap(*sock)&&numsockets) numsockets--; } } *ConnectionTable = NULL; CriticalSection CConnectionTable::crit; void clientSetDaliServixSocketCaching(bool on) { CriticalBlock block(CConnectionTable::crit); if (on) { if (!ConnectionTable) ConnectionTable = new CConnectionTable; } else { delete ConnectionTable; ConnectionTable = NULL; } } //--------------------------------------------------------------------------- // TreeCopy #define TREECOPY_CACHE_SIZE 50 struct CTreeCopyItem: public CInterface { StringAttr net; StringAttr mask; offset_t sz; // original size CDateTime dt; // original date RemoteFilenameArray loc; // locations for file - 0 is original Owned busy; unsigned lastused; CTreeCopyItem(RemoteFilename &orig, const char *_net, const char *_mask, offset_t _sz, CDateTime &_dt) : net(_net), mask(_mask) { loc.append(orig); dt.set(_dt); sz = _sz; busy.setown(createBitSet()); lastused = msTick(); } bool equals(const RemoteFilename &orig, const char *_net, const char *_mask, offset_t _sz, CDateTime &_dt) { if (!orig.equals(loc.item(0))) return false; if (strcmp(_net,net)!=0) return false; if (strcmp(_mask,mask)!=0) return false; if (sz!=_sz) return false; return (dt.equals(_dt,false)); } }; static CIArrayOf treeCopyArray; static CriticalSection treeCopyCrit; static unsigned treeCopyWaiting=0; static Semaphore treeCopySem; #define DEBUGSAMEIP false static void treeCopyFile(RemoteFilename &srcfn, RemoteFilename &dstfn, const char *net, const char *mask, IpAddress &ip, bool usetmp) { unsigned start = msTick(); Owned dstfile = createIFile(dstfn); // the following is really to check the dest node is up and working (otherwise not much point in continuing!) if (dstfile->exists()) PROGLOG("TREECOPY overwriting '%s'",dstfile->queryFilename()); Owned srcfile = createIFile(srcfn); unsigned lastmin = 0; if (!srcfn.queryIP().ipequals(dstfn.queryIP())) { CriticalBlock block(treeCopyCrit); loop { CDateTime dt; offset_t sz; try { sz = srcfile->size(); if (sz==(offset_t)-1) { if (TF_TRACE_TREE_COPY) PROGLOG("TREECOPY source not found '%s'",srcfile->queryFilename()); break; } srcfile->getTime(NULL,&dt,NULL); } catch (IException *e) { EXCLOG(e,"treeCopyFile(1)"); e->Release(); break; } Linked tc; unsigned now = msTick(); ForEachItemInRev(i1,treeCopyArray) { CTreeCopyItem &item = treeCopyArray.item(i1); // prune old entries (not strictly needed buf I think better) if (now-item.lastused>TREECOPYPRUNETIME) treeCopyArray.remove(i1); else if (!tc.get()&&item.equals(srcfn,net,mask,sz,dt)) { tc.set(&item); item.lastused = now; } } if (!tc.get()) { if (treeCopyArray.ordinality()>=TREECOPY_CACHE_SIZE) treeCopyArray.remove(0); tc.setown(new CTreeCopyItem(srcfn,net,mask,sz,dt)); treeCopyArray.append(*tc.getLink()); } ForEachItemInRev(cand,tc->loc) { // rev to choose copied locations first (maybe optional?) if (!tc->busy->testSet(cand)) { // check file accessible and matches if (!cand&&dstfn.equals(tc->loc.item(cand))) // hmm trying to overwrite existing, better humor continue; bool ok = true; Owned rmtfile = createIFile(tc->loc.item(cand)); if (cand) { // only need to check if remote try { if (rmtfile->size()!=sz) ok = false; else { CDateTime fdt; rmtfile->getTime(NULL,&fdt,NULL); ok = fdt.equals(dt); } } catch (IException *e) { EXCLOG(e,"treeCopyFile(2)"); e->Release(); ok = false; } } if (ok) { // if not ok leave 'busy' // finally lets try and copy! try { if (TF_TRACE_TREE_COPY) PROGLOG("TREECOPY(started) %s to %s",rmtfile->queryFilename(),dstfile->queryFilename()); { CriticalUnblock unblock(treeCopyCrit); // note we have tc linked rmtfile->copyTo(dstfile,0x100000,NULL,usetmp); } if (TF_TRACE_TREE_COPY) PROGLOG("TREECOPY(done) %s to %s",rmtfile->queryFilename(),dstfile->queryFilename()); tc->busy->set(cand,false); if (treeCopyWaiting) treeCopySem.signal((treeCopyWaiting>1)?2:1); // add to known locations tc->busy->set(tc->loc.ordinality(),false); // prob already is clear tc->loc.append(dstfn); ip.ipset(tc->loc.item(cand).queryIP()); return; } catch (IException *e) { if (cand==0) { tc->busy->set(0,false); // don't leave busy if (treeCopyWaiting) treeCopySem.signal(); throw; // what more can we do! } EXCLOG(e,"treeCopyFile(3)"); e->Release(); } } } } // all locations busy if (msTick()-start>TREECOPYTIMEOUT) { WARNLOG("Treecopy %s wait timed out", srcfile->queryFilename()); break; } treeCopyWaiting++; // note this isn't precise - just indication { CriticalUnblock unblock(treeCopyCrit); treeCopySem.wait(TREECOPYPOLLTIME); } treeCopyWaiting--; if ((msTick()-start)/10*1000!=lastmin) { lastmin = (msTick()-start)/10*1000; PROGLOG("treeCopyFile delayed: %s to %s",srcfile->queryFilename(),dstfile->queryFilename()); } } } else if (TF_TRACE_TREE_COPY) PROGLOG("TREECOPY source on same node as destination"); if (TF_TRACE_TREE_COPY) PROGLOG("TREECOPY(started,fallback) %s to %s",srcfile->queryFilename(),dstfile->queryFilename()); try { GetHostIp(ip); srcfile->copyTo(dstfile,0x100000,NULL,usetmp); } catch (IException *e) { EXCLOG(e,"TREECOPY(done,fallback)"); throw; } if (TF_TRACE_TREE_COPY) PROGLOG("TREECOPY(done,fallback) %s to %s",srcfile->queryFilename(),dstfile->queryFilename()); } //--------------------------------------------------------------------------- class CRemoteBase: public CInterface { Owned socket; static SocketEndpoint lastfailep; static unsigned lastfailtime; void connectSocket(SocketEndpoint &ep) { sRFTM tm; // called in CConnectionTable::crit unsigned retries = 3; if (ep.equals(lastfailep)) { if (msTick()-lastfailtimetimedout(&remaining); socket.setown(ISocket::connect_timeout(ep,remaining)); } else socket.setown(ISocket::connect(ep)); } catch (IJSOCK_Exception *e) { ok = false; if (!retries||(tm.timemon&&tm.timemon->timedout())) { if (e->errorCode()==JSOCKERR_connection_failed) { lastfailep.set(ep); lastfailtime = msTick(); e->Release(); StringBuffer msg("Failed to connect to dafilesrv/daliservix on "); ep.getUrlStr(msg); throw createDafsException(DAFSERR_connection_failed,msg.str()); } throw; } StringBuffer err; WARNLOG("Remote file connect %s",e->errorMessage(err).str()); e->Release(); } if (ok) { if (TF_TRACE_CLIENT_CONN) { PROGLOG("Connected to %s",eps.str()); } if (AuthenticationEnabled) { try { sendAuthentication(ep); // this will log error break; } catch (IJSOCK_Exception *e) { StringBuffer err; WARNLOG("Remote file authenticate %s for %s ",e->errorMessage(err).str(),ep.getUrlStr(eps.clear()).str()); e->Release(); if (!retries) break; } } else break; } unsigned sleeptime = getRandom()%3000+1000; if (tm.timemon) { unsigned remaining; tm.timemon->timedout(&remaining); if (remaining/2addLink(ep,socket); } void killSocket(SocketEndpoint &tep) { CriticalBlock block2(CConnectionTable::crit); // this is nested with crit if (socket) { try { Owned s = socket.getClear(); if (ConnectionTable) ConnectionTable->remove(tep,s); } catch (IJSOCK_Exception *e) { e->Release(); // ignore errors closing } Sleep(getRandom()%1000*5+500); // prevent multiple beating } } protected: friend class CRemoteFileIO; StringAttr filename; CriticalSection crit; SocketEndpoint ep; void sendRemoteCommand(MemoryBuffer & src, MemoryBuffer & reply, bool retry=true, bool lengthy=false) { CriticalBlock block(crit); // serialize commands on same file SocketEndpoint tep(ep); setDafsEndpointPort(tep); unsigned nretries = retry?3:0; Owned firstexc; // when retrying return first error if fails loop { try { if (socket) { sendBuffer(socket, src); receiveBuffer(socket, reply, lengthy?LENGTHY_RETRIES:NORMAL_RETRIES); break; } } catch (IJSOCK_Exception *e) { if (!nretries--) { if (firstexc) { e->Release(); e = firstexc.getClear(); } killSocket(tep); throw e; } StringBuffer str; e->errorMessage(str); WARNLOG("Remote File: %s, retrying (%d)",str.str(),nretries); if (firstexc) e->Release(); else firstexc.setown(e); killSocket(tep); } CriticalBlock block2(CConnectionTable::crit); // this is nested with crit if (ConnectionTable) { socket.setown(ConnectionTable->lookup(tep)); if (socket) { // validate existing socket by sending an 'exists' command with short time out // (use exists for backward compatibility) bool ok = false; try { MemoryBuffer sendbuf; initSendBuffer(sendbuf); MemoryBuffer replybuf; sendbuf.append((RemoteFileCommandType)RFCexists).append(filename); sendBuffer(socket, sendbuf); receiveBuffer(socket, replybuf, 0, 1024); ok = true; } catch (IException *e) { e->Release(); } if (!ok) killSocket(tep); } } if (!socket) { connectSocket(tep); } } unsigned errCode; reply.read(errCode); if (errCode) { StringBuffer msg; if (filename.get()) msg.append(filename); ep.getUrlStr(msg.append('[')).append("] "); size32_t pos = reply.getPos(); if (posgetPassword(serverip, username, password); if (!username.length()) username.append("sds_system"); // default account (note if exists should have restricted access!) if (!password.length()) password.append("sds_man"); if (replybuf.remaining()<=sizeof(size32_t)) throwUnauthenticated(serverip,username.str()); size32_t bs; replybuf.read(bs); if (replybuf.remaining()remove(tep,s); } ::Release(s); } const char *queryLocalName() { return filename; } }; SocketEndpoint CRemoteBase::lastfailep; unsigned CRemoteBase::lastfailtime; //--------------------------------------------------------------------------- class CRemoteDirectoryIterator : public CInterface, implements IDirectoryDifferenceIterator { Owned cur; bool curvalid; bool curisdir; StringAttr curname; CDateTime curdt; __int64 cursize; StringAttr dir; SocketEndpoint ep; byte *flags; unsigned numflags; unsigned curidx; unsigned mask; MemoryBuffer buf; public: static CriticalSection crit; CRemoteDirectoryIterator(SocketEndpoint &_ep,const char *_dir) : dir(_dir) { // an extended difference iterator starts with 2 (for bwd compatibility) ep = _ep; curisdir = false; cursize = 0; curidx = (unsigned)-1; mask = 0; } bool appendBuf(MemoryBuffer &_buf) { buf.setSwapEndian(_buf.needSwapEndian()); byte hdr; _buf.read(hdr); if (hdr==2) { _buf.read(numflags); flags = (byte *)malloc(numflags); _buf.read(numflags,flags); } else { buf.append(hdr); flags = NULL; numflags = 0; } size32_t rest = _buf.length()-_buf.getPos(); const byte *rb = (const byte *)_buf.readDirect(rest); bool ret = true; if (rest&&(rb[rest-1]!=0)) { rest--; ret = false; // continuation } buf.append(rest,rb); return ret; } ~CRemoteDirectoryIterator() { free(flags); } IMPLEMENT_IINTERFACE bool first() { curidx = (unsigned)-1; buf.reset(); return next(); } bool next() { loop { curidx++; cur.clear(); curdt.clear(); curname.clear(); cursize = 0; curisdir = false; if (buf.getPos()>=buf.length()) return false; byte b; buf.read(b); curvalid = b!=0; if (!curvalid) return false; buf.read(curisdir); buf.read(cursize); curdt.deserialize(buf); buf.read(curname); // kludge for bug in old linux jlibs if (strchr(curname,'\\')&&(getPathSepChar(dir)=='/')) { StringBuffer temp(curname); temp.replace('\\','/'); curname.set(temp.str()); } if ((mask==0)||(getFlags()&mask)) break; } return true; } bool isValid() { return curvalid; } IFile & query() { if (!cur) { StringBuffer full(dir); addPathSepChar(full).append(curname); if (ep.isNull()) cur.setown(createIFile(full.str())); else { RemoteFilename rfn; rfn.setPath(ep,full.str()); cur.setown(createIFile(rfn)); } } return *cur; } StringBuffer &getName(StringBuffer &buf) { return buf.append(curname); } bool isDir() { return curisdir; } __int64 getFileSize() { if (curisdir) return -1; return cursize; } bool getModifiedTime(CDateTime &ret) { ret = curdt; return true; } void setMask(unsigned _mask) { mask = _mask; } virtual unsigned getFlags() { if (flags&&(curidxisDir(); __int64 sz = isdir?0:iter->getFileSize(); CDateTime dt; iter->getModifiedTime(dt); iter->getName(tmp.clear()); mb.append(isdir).append(sz); dt.serialize(mb); mb.append(tmp.str()); if (bufsize&&(mb.length()>=bufsize-1)) { ret = false; break; } } b = 0; mb.append(b); return ret; } static void serializeDiff(MemoryBuffer &mb,IDirectoryDifferenceIterator *iter) { // bit slow MemoryBuffer flags; ForEach(*iter) flags.append((byte)iter->getFlags()); if (flags.length()) { byte b = 2; mb.append(b).append((unsigned)flags.length()).append(flags); } serialize(mb,iter,0); } void serialize(MemoryBuffer &mb,bool isdiff) { byte b; if (isdiff&&numflags&&flags) { b = 2; mb.append(b).append(numflags).append(numflags,flags); } serialize(mb,this,0); } }; CriticalSection CRemoteDirectoryIterator::crit; class CRemoteFile : public CRemoteBase, implements IFile { StringAttr remotefilename; unsigned flags; public: IMPLEMENT_IINTERFACE CRemoteFile(const SocketEndpoint &_ep, const char * _filename) : CRemoteBase(_ep, _filename) { flags = ((unsigned)IFSHread)|((S_IRUSR|S_IWUSR)<<16); } bool exists() { MemoryBuffer sendBuffer; initSendBuffer(sendBuffer); MemoryBuffer replyBuffer; sendBuffer.append((RemoteFileCommandType)RFCexists).append(filename); sendRemoteCommand(sendBuffer, replyBuffer); bool ok; replyBuffer.read(ok); return ok; } bool getTime(CDateTime * createTime, CDateTime * modifiedTime, CDateTime * accessedTime) { CDateTime dummyTime; if (!createTime) createTime = &dummyTime; if (!modifiedTime) modifiedTime = &dummyTime; if (!accessedTime) accessedTime = &dummyTime; MemoryBuffer sendBuffer; initSendBuffer(sendBuffer); MemoryBuffer replyBuffer; sendBuffer.append((RemoteFileCommandType)RFCgettime).append(filename); sendRemoteCommand(sendBuffer, replyBuffer); bool ok; replyBuffer.read(ok); if (ok) { createTime->deserialize(replyBuffer); modifiedTime->deserialize(replyBuffer); accessedTime->deserialize(replyBuffer); } return ok; } bool setTime(const CDateTime * createTime, const CDateTime * modifiedTime, const CDateTime * accessedTime) { MemoryBuffer sendBuffer; initSendBuffer(sendBuffer); MemoryBuffer replyBuffer; sendBuffer.append((RemoteFileCommandType)RFCsettime).append(filename); if (createTime) { sendBuffer.append((bool)true); createTime->serialize(sendBuffer); } else sendBuffer.append((bool)false); if (modifiedTime) { sendBuffer.append((bool)true); modifiedTime->serialize(sendBuffer); } else sendBuffer.append((bool)false); if (accessedTime) { sendBuffer.append((bool)true); accessedTime->serialize(sendBuffer); } else sendBuffer.append((bool)false); sendRemoteCommand(sendBuffer, replyBuffer); bool ok; replyBuffer.read(ok); return ok; } fileBool isDirectory() { MemoryBuffer sendBuffer; initSendBuffer(sendBuffer); MemoryBuffer replyBuffer; sendBuffer.append((RemoteFileCommandType)RFCisdirectory).append(filename); sendRemoteCommand(sendBuffer, replyBuffer); unsigned ret; replyBuffer.read(ret); return (fileBool)ret; } fileBool isFile() { MemoryBuffer sendBuffer; initSendBuffer(sendBuffer); MemoryBuffer replyBuffer; sendBuffer.append((RemoteFileCommandType)RFCisfile).append(filename); sendRemoteCommand(sendBuffer, replyBuffer); unsigned ret; replyBuffer.read(ret); return (fileBool)ret; } fileBool isReadOnly() { MemoryBuffer sendBuffer; initSendBuffer(sendBuffer); MemoryBuffer replyBuffer; sendBuffer.append((RemoteFileCommandType)RFCisreadonly).append(filename); sendRemoteCommand(sendBuffer, replyBuffer); unsigned ret; replyBuffer.read(ret); return (fileBool)ret; } IFileIO * open(IFOmode mode); IFileIO * openShared(IFOmode mode,IFSHmode shmode); IFileAsyncIO * openAsync(IFOmode mode) { return NULL; } // not supported const char * queryFilename() { if (remotefilename.isEmpty()) { RemoteFilename rfn; rfn.setPath(ep,filename); StringBuffer path; rfn.getRemotePath(path); remotefilename.set(path); } return remotefilename.get(); } void resetLocalFilename(const char *name) { remotefilename.clear(); filename.set(name); } bool remove() { MemoryBuffer sendBuffer; initSendBuffer(sendBuffer); MemoryBuffer replyBuffer; sendBuffer.append((RemoteFileCommandType)RFCremove).append(filename); sendRemoteCommand(sendBuffer, replyBuffer); bool ok; replyBuffer.read(ok); return ok; } void rename(const char *newname) { // currently ignores directory on newname (in future versions newname will be required to be tail only and not full path) StringBuffer path; splitDirTail(filename,path); StringBuffer newdir; path.append(splitDirTail(newname,newdir)); if (newdir.length()&&(strcmp(newdir.str(),path.str())!=0)) WARNLOG("CRemoteFile::rename passed full path '%s' that may not to match original directory '%s'",newname,path.str()); MemoryBuffer sendBuffer; initSendBuffer(sendBuffer); MemoryBuffer replyBuffer; sendBuffer.append((RemoteFileCommandType)RFCrename).append(filename).append(path); sendRemoteCommand(sendBuffer, replyBuffer); filename.set(path); remotefilename.clear(); } void move(const char *newname) { // like rename except between directories // first create replote path if (!newname||!*newname) return; RemoteFilename destrfn; if (isPathSepChar(newname[0])&&isPathSepChar(newname[1])) { destrfn.setRemotePath(newname); if (!destrfn.queryEndpoint().ipequals(ep)) { StringBuffer msg; msg.appendf("IFile::move %s to %s, destination node must match source node", queryFilename(), newname); throw createDafsException(RFSERR_MoveFailed,msg.str()); } } else destrfn.setPath(ep,newname); StringBuffer dest; newname = destrfn.getLocalPath(dest).str(); MemoryBuffer sendBuffer; initSendBuffer(sendBuffer); MemoryBuffer replyBuffer; StringBuffer path; splitDirTail(filename,path); StringBuffer newdir; const char *newtail = splitDirTail(newname,newdir); if (strcmp(newdir.str(),path.str())==0) { path.append(newtail); newname = path; sendBuffer.append((RemoteFileCommandType)RFCrename); // use rename if we can (supported on older dafilesrv) } else sendBuffer.append((RemoteFileCommandType)RFCmove); sendBuffer.append(filename).append(newname); sendRemoteCommand(sendBuffer, replyBuffer); filename.set(newname); remotefilename.clear(); } void setReadOnly(bool set) { MemoryBuffer sendBuffer; initSendBuffer(sendBuffer); MemoryBuffer replyBuffer; sendBuffer.append((RemoteFileCommandType)RFCsetreadonly).append(filename).append(set); sendRemoteCommand(sendBuffer, replyBuffer); } offset_t size() { #if 1 // faster method (consistant with IFile) // do this by using dir call (could be improved with new function but this not *too* bad) if (isSpecialPath(filename)) return 0; // queries deemed to always exist (though don't know size). // if needed to get size I guess could use IFileIO method and cache (bit of pain though) StringBuffer dir; const char *tail = splitDirTail(filename,dir); if (!dir.length()) return false; MemoryBuffer sendBuffer; initSendBuffer(sendBuffer); MemoryBuffer replyBuffer; bool includedirs = true; bool sub=false; sendBuffer.append((RemoteFileCommandType)RFCgetdir).append(dir).append(tail).append(includedirs).append(sub); sendRemoteCommand(sendBuffer, replyBuffer); // now should be 0 or 1 files returned CriticalBlock block(CRemoteDirectoryIterator::crit); Owned iter = new CRemoteDirectoryIterator(ep, dir.str()); iter->appendBuf(replyBuffer); if (!iter->first()) return (offset_t)-1; return (offset_t) iter->getFileSize(); #else IFileIO * io = open(IFOread); offset_t length = (offset_t)-1; if (io) { length = io->size(); io->Release(); } return length; #endif } bool createDirectory() { MemoryBuffer sendBuffer; initSendBuffer(sendBuffer); MemoryBuffer replyBuffer; sendBuffer.append((RemoteFileCommandType)RFCcreatedir).append(filename); sendRemoteCommand(sendBuffer, replyBuffer); bool ok; replyBuffer.read(ok); return ok; } virtual IDirectoryIterator *directoryFiles(const char *mask,bool sub,bool includedirs) { if (mask&&!*mask) return createDirectoryIterator("",""); // NULL iterator CriticalBlock block(CRemoteDirectoryIterator::crit); CRemoteDirectoryIterator *ret = new CRemoteDirectoryIterator(ep, filename); byte stream=1; loop { MemoryBuffer sendBuffer; initSendBuffer(sendBuffer); MemoryBuffer replyBuffer; sendBuffer.append((RemoteFileCommandType)RFCgetdir).append(filename).append(mask?mask:"").append(includedirs).append(sub).append(stream); sendRemoteCommand(sendBuffer, replyBuffer); if (ret->appendBuf(replyBuffer)) break; stream = 2; } return ret; } IDirectoryDifferenceIterator *monitorDirectory( IDirectoryIterator *prev=NULL, // in (NULL means use current as baseline) const char *mask=NULL, bool sub=false, bool includedirs=false, unsigned checkinterval=60*1000, unsigned timeout=(unsigned)-1, Semaphore *abortsem=NULL) // returns NULL if timed out { // abortsem not yet supported CriticalBlock block(CRemoteDirectoryIterator::crit); MemoryBuffer sendBuffer; initSendBuffer(sendBuffer); MemoryBuffer replyBuffer; sendBuffer.append((RemoteFileCommandType)RFCmonitordir).append(filename).append(mask?mask:"").append(includedirs).append(sub); sendBuffer.append(checkinterval).append(timeout); __int64 cancelid=0; // not yet used sendBuffer.append(cancelid); byte isprev=(prev!=NULL)?1:0; sendBuffer.append(isprev); if (prev) CRemoteDirectoryIterator::serialize(sendBuffer,prev,0); sendRemoteCommand(sendBuffer, replyBuffer); byte status; replyBuffer.read(status); if (status==1) { CRemoteDirectoryIterator *iter = new CRemoteDirectoryIterator(ep, filename); iter->appendBuf(replyBuffer); return iter; } return NULL; } bool getInfo(bool &isdir,offset_t &size,CDateTime &modtime) { // do this by using dir call (could be improved with new function but this not *too* bad) CriticalBlock block(CRemoteDirectoryIterator::crit); StringBuffer dir; const char *tail = splitDirTail(filename,dir); if (!dir.length()) return false; MemoryBuffer sendBuffer; initSendBuffer(sendBuffer); MemoryBuffer replyBuffer; bool includedirs = true; bool sub=false; sendBuffer.append((RemoteFileCommandType)RFCgetdir).append(dir).append(tail).append(includedirs).append(sub); sendRemoteCommand(sendBuffer, replyBuffer); // now should be 0 or 1 files returned Owned iter = new CRemoteDirectoryIterator(ep, dir.str()); iter->appendBuf(replyBuffer); if (!iter->first()) return false; isdir = iter->isDir(); size = (offset_t) iter->getFileSize(); iter->getModifiedTime(modtime); return true; } bool setCompression(bool set) { assertex(!"Need to implement compress()"); return false; } offset_t compressedSize() { assertex(!"Need to implement actualSize()"); return (offset_t)-1; } void serialize(MemoryBuffer &tgt) { throwUnexpected(); } void deserialize(MemoryBuffer &src) { throwUnexpected(); } unsigned getCRC() { MemoryBuffer sendBuffer; initSendBuffer(sendBuffer); MemoryBuffer replyBuffer; sendBuffer.append((RemoteFileCommandType)RFCgetcrc).append(filename); sendRemoteCommand(sendBuffer, replyBuffer, true, true); unsigned crc; replyBuffer.read(crc); return crc; } void setCreateFlags(unsigned cflags) { flags |= (cflags<<16); } void setShareMode(IFSHmode shmode) { flags &= ~(IFSHfull|IFSHread); flags |= (unsigned)(shmode&(IFSHfull|IFSHread)); } void remoteExtractBlobElements(const char * prefix, ExtractedBlobArray & extracted) { MemoryBuffer sendBuffer; initSendBuffer(sendBuffer); sendBuffer.append((RemoteFileCommandType)RFCextractblobelements).append(prefix).append(filename); MemoryBuffer replyBuffer; sendRemoteCommand(sendBuffer, replyBuffer, true, true); // handles error code unsigned n; replyBuffer.read(n); for (unsigned i=0;ideserialize(replyBuffer); extracted.append(*item); } } bool copySectionAsync(const char *uuid,const RemoteFilename &dest, offset_t toOfs, offset_t fromOfs, offset_t size, ICopyFileProgress *progress, unsigned timeout) { // now if we get here is it can be assumed the source file is local to where we send the command StringBuffer tos; dest.getRemotePath(tos); MemoryBuffer sendBuffer; initSendBuffer(sendBuffer); MemoryBuffer replyBuffer; sendBuffer.append((RemoteFileCommandType)RFCcopysection).append(uuid).append(queryLocalName()).append(tos).append(toOfs).append(fromOfs).append(size).append(timeout); sendRemoteCommand(sendBuffer, replyBuffer); unsigned status; replyBuffer.read(status); if (progress) { offset_t sizeDone; offset_t totalSize; replyBuffer.read(sizeDone).read(totalSize); progress->onProgress(sizeDone,totalSize); } return (AsyncCommandStatus)status!=ACScontinue; // should only otherwise be done as errors raised by exception } void copySection(const RemoteFilename &dest, offset_t toOfs, offset_t fromOfs, offset_t size, ICopyFileProgress *progress) { StringBuffer uuid; genUUID(uuid,true); unsigned timeout = 60*1000; // check every minute while(!copySectionAsync(uuid.str(),dest,toOfs,fromOfs,size,progress,timeout)); } void copyTo(IFile *dest, size32_t buffersize, ICopyFileProgress *progress, bool usetmp); virtual IMemoryMappedFile *openMemoryMapped(offset_t ofs, memsize_t len, bool write) { return NULL; } void treeCopyTo(IFile *dest,IpSubNet &subnet,IpAddress &resfrom,bool usetmp) { resfrom.ipset(NULL); MemoryBuffer sendBuffer; initSendBuffer(sendBuffer); MemoryBuffer replyBuffer; sendBuffer.append((RemoteFileCommandType)(usetmp?RFCtreecopytmp:RFCtreecopy)); RemoteFilename rfn; rfn.setPath(ep,filename); rfn.serialize(sendBuffer); const char *d = dest->queryFilename(); if (!isAbsolutePath(d)) throw MakeStringException(-1,"treeCopyFile destination '%s' is not an absolute path", d); rfn.setRemotePath(d); rfn.serialize(sendBuffer); StringBuffer tmp; subnet.getNetText(tmp); sendBuffer.append(tmp); subnet.getMaskText(tmp.clear()); sendBuffer.append(tmp); unsigned status=1; try { sendRemoteCommand(sendBuffer, replyBuffer); replyBuffer.read(status); } catch (IDAFS_Exception *e) { if (e->errorCode()!=RFSERR_InvalidCommand) throw; e->Release(); status = (unsigned)-1; } if (status==-1) { resfrom.ipset(ep); StringBuffer tmp; WARNLOG("dafilesrv on %s does not support treeCopyTo - falling back to copyTo",resfrom.getIpText(tmp).str()); copyTo(dest,0x100000,NULL,usetmp); status = 0; } else if (status==0) resfrom.ipdeserialize(replyBuffer); } }; void clientCacheFileConnect(SocketEndpoint &_ep,unsigned timeout) { if (!timeout) { SocketEndpoint ep(_ep); setDafsEndpointPort(ep); Owned cfile = new CRemoteFile(ep, "null"); cfile->connect(); return; // frees file and adds its socket to cache } // timeout needed so start a thread (that may become orphaned) class cThread: public Thread { SocketEndpoint ep; public: cThread(SocketEndpoint &_ep) : Thread("cacheFileConnect") { ep = _ep; } int run() { try { clientCacheFileConnect(ep,0); } catch (IException *e) { CriticalBlock block(sect); except.setown(e); } return 0; } Owned except; CriticalSection sect; } *thread; thread = new cThread(_ep); thread->start(); IException *e =NULL; if (!thread->join(timeout)) { StringBuffer msg("Timed out connecting to "); _ep.getUrlStr(msg); e = createDafsException(RFSERR_AuthenticateFailed,msg.str()); } { CriticalBlock block(thread->sect); if (!e&&thread->except) e = thread->except.getClear(); } thread->Release(); if (e) throw e; } void clientAddSocketToCache(SocketEndpoint &ep,ISocket *socket) { CriticalBlock block(CConnectionTable::crit); if (ConnectionTable) ConnectionTable->addLink(ep,socket); } IFile * createRemoteFile(SocketEndpoint &ep, const char * filename) { IFile *ret = createFileLocalMount(ep,filename); if (ret) return ret; return new CRemoteFile(ep, filename); } void clientDisconnectRemoteFile(IFile *file) { CRemoteFile *cfile = QUERYINTERFACE(file,CRemoteFile); if (cfile) cfile->disconnect(); } bool clientResetFilename(IFile *file, const char *newname) // returns false if not remote { CRemoteFile *cfile = QUERYINTERFACE(file,CRemoteFile); if (!cfile) return false; cfile->resetLocalFilename(newname); return true; } extern bool clientAsyncCopyFileSection(const char *uuid, IFile *from, // expected to be remote RemoteFilename &to, offset_t toOfs, // -1 created file and copies to start offset_t fromOfs, offset_t size, ICopyFileProgress *progress, unsigned timeout) // returns true when done { CRemoteFile *cfile = QUERYINTERFACE(from,CRemoteFile); if (!cfile) { // local - do sync from->copySection(to,toOfs,fromOfs,size,progress); return true; } return cfile->copySectionAsync(uuid,to,toOfs,fromOfs, size, progress, timeout); } //--------------------------------------------------------------------------- class CRemoteFileIO : public CInterface, implements IFileIO { protected: Linked parent; RemoteFileIOHandle handle; IFOmode mode; compatIFSHmode compatmode; bool disconnectonexit; public: IMPLEMENT_IINTERFACE CRemoteFileIO(CRemoteFile *_parent) : parent(_parent) { handle = 0; disconnectonexit = false; } ~CRemoteFileIO() { if (handle) { try { MemoryBuffer sendBuffer; initSendBuffer(sendBuffer); sendBuffer.append((RemoteFileCommandType)RFCcloseIO).append(handle); parent->sendRemoteCommand(sendBuffer,false); } catch (IDAFS_Exception *e) { if ((e->errorCode()!=RFSERR_InvalidFileIOHandle)&&(e->errorCode()!=RFSERR_NullFileIOHandle)) { // ignore already disconnected StringBuffer s; e->errorMessage(s); WARNLOG("CRemoteFileIO close file: %s",s.str()); } e->Release(); } catch (IException *e) { StringBuffer s; e->errorMessage(s); WARNLOG("CRemoteFileIO close file: %s",s.str()); e->Release(); } } if (disconnectonexit) parent->disconnect(); handle = 0; } bool open(IFOmode _mode,compatIFSHmode _compatmode) { MemoryBuffer sendBuffer; initSendBuffer(sendBuffer); MemoryBuffer replyBuffer; const char *localname = parent->queryLocalName(); localname = skipSpecialPath(localname); sendBuffer.append((RemoteFileCommandType)RFCopenIO).append(localname).append((byte)_mode).append((byte)_compatmode); parent->sendRemoteCommand(sendBuffer, replyBuffer); replyBuffer.read(handle); if (!handle) return false; switch (_mode) { case IFOcreate: mode = IFOwrite; break; case IFOcreaterw: mode = IFOreadwrite; break; default: mode = _mode; } compatmode = _compatmode; return true; } bool reopen() { StringBuffer s; PROGLOG("Attempting reopen of %s on %s",parent->queryLocalName(),parent->queryEp().getUrlStr(s).str()); if (open(mode,compatmode)) { return true; } return false; } offset_t size() { MemoryBuffer sendBuffer; initSendBuffer(sendBuffer); MemoryBuffer replyBuffer; sendBuffer.append((RemoteFileCommandType)RFCsize).append(handle); parent->sendRemoteCommand(sendBuffer, replyBuffer, false); // Retry using reopen TBD offset_t ret; replyBuffer.read(ret); return ret; } size32_t read(offset_t pos, size32_t len, void * data) { size32_t got; MemoryBuffer replyBuffer; const void *b = doRead(pos,len,replyBuffer,got,data); if (b!=data) memcpy(data,b,got); return got; } const void *doRead(offset_t pos, size32_t len, MemoryBuffer &replyBuffer, size32_t &got, void *dstbuf) { unsigned tries=0; loop { try { MemoryBuffer sendBuffer; initSendBuffer(sendBuffer); replyBuffer.clear(); sendBuffer.append((RemoteFileCommandType)RFCread).append(handle).append(pos).append(len); parent->sendRemoteCommand(sendBuffer, replyBuffer,false); // kludge dafilesrv versions <= 1.5e don't return error correctly if (replyBuffer.length()>len+sizeof(size32_t)+sizeof(unsigned)) { size32_t save = replyBuffer.getPos(); replyBuffer.reset(len+sizeof(size32_t)+sizeof(unsigned)); unsigned errCode; replyBuffer.read(errCode); if (errCode) { StringBuffer msg; parent->ep.getUrlStr(msg.append('[')).append("] "); if (replyBuffer.getPos()replyBuffer.remaining())||(got>len)) { PROGLOG("Read beyond buffer %d,%d,%d",got,replyBuffer.remaining(),len); throw createDafsException(RFSERR_ReadFailed, "Read beyond buffer"); } return replyBuffer.readDirect(got); } catch (IJSOCK_Exception *e) { EXCLOG(e,"CRemoteFileIO::read"); if (++tries>3) throw; WARNLOG("Retrying read of %s (%d)",parent->queryLocalName(),tries); Owned exc = e; if (!reopen()) throw exc.getClear(); } } got = 0; return NULL; } size32_t write(offset_t pos, size32_t len, const void * data) { unsigned tries=0; size32_t ret = 0; loop { try { MemoryBuffer replyBuffer; MemoryBuffer sendBuffer; initSendBuffer(sendBuffer); sendBuffer.append((RemoteFileCommandType)RFCwrite).append(handle).append(pos).append(len).append(len, data); parent->sendRemoteCommand(sendBuffer, replyBuffer, false, true); replyBuffer.read(ret); break; } catch (IJSOCK_Exception *e) { EXCLOG(e,"CRemoteFileIO::write"); if (++tries>3) throw; WARNLOG("Retrying write(%"I64F"d,%d) of %s (%d)",pos,len,parent->queryLocalName(),tries); Owned exc = e; if (!reopen()) throw exc.getClear(); } } if ((ret==(size32_t)-1) || (ret < len)) throw createDafsException(DISK_FULL_EXCEPTION_CODE,"write failed, disk full?"); return ret; } offset_t appendFile(IFile *file,offset_t pos,offset_t len) { MemoryBuffer sendBuffer; initSendBuffer(sendBuffer); MemoryBuffer replyBuffer; const char * fname = file->queryFilename(); sendBuffer.append((RemoteFileCommandType)RFCappend).append(handle).append(fname).append(pos).append(len); parent->sendRemoteCommand(sendBuffer, replyBuffer, false, true); // retry not safe offset_t ret; replyBuffer.read(ret); if ((ret==(offset_t)-1) || (ret < len)) throw createDafsException(DISK_FULL_EXCEPTION_CODE,"append failed, disk full?"); // though could be file missing TBD return ret; } void setSize(offset_t size) { MemoryBuffer sendBuffer; initSendBuffer(sendBuffer); MemoryBuffer replyBuffer; sendBuffer.append((RemoteFileCommandType)RFCsetsize).append(handle).append(size); parent->sendRemoteCommand(sendBuffer, replyBuffer, false, true); // retry using reopen TBD } void setDisconnectOnExit(bool set) { disconnectonexit = set; } }; void clientDisconnectRemoteIoOnExit(IFileIO *fileio,bool set) { CRemoteFileIO *cfileio = QUERYINTERFACE(fileio,CRemoteFileIO); if (cfileio) cfileio->setDisconnectOnExit(set); } IFileIO * CRemoteFile::openShared(IFOmode mode,IFSHmode shmode) { assertex(((unsigned)shmode&0xffffffc7)==0); compatIFSHmode compatmode; unsigned fileflags = (flags>>16) & (S_IRUSR|S_IWUSR|S_IXUSR|S_IRGRP|S_IWGRP|S_IXGRP|S_IROTH|S_IWOTH|S_IXOTH); if (fileflags&S_IXUSR) // this is bit hit and miss but backward compatible compatmode = compatIFSHexec; else if (fileflags&(S_IWGRP|S_IWOTH)) compatmode = compatIFSHall; else if (shmode&IFSHfull) compatmode = compatIFSHwrite; else if (((shmode&(IFSHread|IFSHfull))==0) && ((fileflags&(S_IRGRP|S_IROTH))==0)) compatmode = compatIFSHnone; else compatmode = compatIFSHread; Owned res = new CRemoteFileIO(this); if (res->open(mode,compatmode)) return res.getClear(); return NULL; } IFileIO * CRemoteFile::open(IFOmode mode) { return openShared(mode,(IFSHmode)(flags&(IFSHread|IFSHfull))); } //--------------------------------------------------------------------------- void CRemoteFile::copyTo(IFile *dest, size32_t buffersize, ICopyFileProgress *progress, bool usetmp) { CRemoteFile *dstfile = QUERYINTERFACE(dest,CRemoteFile); if (dstfile&&!dstfile->queryEp().isLocal()) { StringBuffer tmpname; Owned destf; RemoteFilename dest; if (usetmp) { makeTempCopyName(tmpname,dstfile->queryLocalName()); dest.setPath(dstfile->queryEp(),tmpname.str()); } else dest.setPath(dstfile->queryEp(),dstfile->queryLocalName()); destf.setown(createIFile(dest)); try { // following may fail if new dafilesrv not deployed on src copySection(dest,(offset_t)-1,0,(offset_t)-1,progress); if (usetmp) { StringAttr tail(pathTail(dstfile->queryLocalName())); dstfile->remove(); destf->rename(tail); } return; } catch (IException *e) { StringBuffer s; s.appendf("Remote File Copy (%d): ",e->errorCode()); e->errorMessage(s); s.append(", retrying local"); WARNLOG("%s",s.str()); e->Release(); } // delete dest try { destf->remove(); } catch (IException *e) { EXCLOG(e,"Remote File Copy, Deleting temporary file"); e->Release(); } } // assumption if we get here that source remote, dest local (or equiv) class cIntercept: implements ICopyFileIntercept { MemoryAttr ma; MemoryBuffer mb; virtual offset_t copy(IFileIO *from, IFileIO *to, offset_t ofs, size32_t sz) { if (ma.length()doRead(ofs,sz,mb.clear(),got,buf); else { // shouldn't ever get here if source remote got = from->read(ofs, sz, buf); dst = buf; } if (got != 0) to->write(ofs, got, dst); return got; } } intercept; doCopyFile(dest,this,buffersize,progress,&intercept,usetmp); } unsigned getRemoteVersion(ISocket * socket, StringBuffer &ver) { static CriticalSection sect; CriticalBlock block(sect); if (!socket) return 0; unsigned ret; MemoryBuffer sendbuf; initSendBuffer(sendbuf); sendbuf.append((RemoteFileCommandType)RFCgetver); sendbuf.append((unsigned)RFCgetver); MemoryBuffer reply; try { sendBuffer(socket, sendbuf); receiveBuffer(socket, reply, 1 ,4096); unsigned errCode; reply.read(errCode); if (errCode==RFSERR_InvalidCommand) { ver.append("DS V1.0"); return 10; } else if (errCode==0) ret = 11; else if (errCode<0x10000) return 0; else ret = errCode-0x10000; } catch (IException *e) { EXCLOG(e); ::Release(e); return 0; } StringAttr vers; reply.read(vers); ver.append(vers); return ret; } extern unsigned stopRemoteServer(ISocket * socket) { static CriticalSection sect; CriticalBlock block(sect); if (!socket) return 0; MemoryBuffer sendbuf; initSendBuffer(sendbuf); sendbuf.append((RemoteFileCommandType)RFCstop); sendbuf.append((unsigned)RFCstop); MemoryBuffer replybuf; unsigned errCode = RFSERR_InvalidCommand; try { sendBuffer(socket, sendbuf); receiveBuffer(socket, replybuf, NORMAL_RETRIES, 1024); replybuf.read(errCode); } catch (IJSOCK_Exception *e) { if ((e->errorCode()!=JSOCKERR_broken_pipe)&&(e->errorCode()!=JSOCKERR_graceful_close)) EXCLOG(e); else errCode = 0; } catch (IException *e) { EXCLOG(e); ::Release(e); } return errCode; } int remoteExec(ISocket * socket, const char *cmdline, const char *workdir,bool sync, size32_t insize, void *inbuf, MemoryBuffer *outbuf) { if (!socket) return -1; bool hasoutput = (outbuf!=NULL); if (!inbuf) insize = 0; MemoryBuffer sendbuf; initSendBuffer(sendbuf); sendbuf.append((RemoteFileCommandType)RFCexec).append(cmdline).append(workdir).append(sync). append(hasoutput).append(insize); if (insize) sendbuf.append(insize, inbuf); MemoryBuffer replybuf; try { sendBuffer(socket, sendbuf); receiveBuffer(socket, replybuf, LENGTHY_RETRIES); // we don't know how long program will take really - assume <1hr int retcode; unsigned phandle; size32_t outsz; replybuf.read(retcode).read(phandle).read(outsz); if (outsz&&outbuf) replybuf.read(outsz,outbuf->reserve(outsz)); return retcode; } catch (IException *e) { EXCLOG(e); ::Release(e); } return -1; } int setDafsTrace(ISocket * socket,byte flags) { if (!socket) { byte ret = traceFlags; traceFlags = flags; return ret; } MemoryBuffer sendbuf; initSendBuffer(sendbuf); sendbuf.append((RemoteFileCommandType)RFCsettrace).append(flags); MemoryBuffer replybuf; try { sendBuffer(socket, sendbuf); receiveBuffer(socket, replybuf, NORMAL_RETRIES, 1024); int retcode; replybuf.read(retcode); return retcode; } catch (IException *e) { EXCLOG(e); ::Release(e); } return -1; } int getDafsInfo(ISocket * socket,StringBuffer &retstr) { if (!socket) { retstr.append(VERSTRING); return 0; } MemoryBuffer sendbuf; initSendBuffer(sendbuf); sendbuf.append((RemoteFileCommandType)RFCgetinfo); MemoryBuffer replybuf; try { sendBuffer(socket, sendbuf); receiveBuffer(socket, replybuf, 1); int retcode; replybuf.read(retcode); if (retcode==0) { StringAttr s; replybuf.read(s); retstr.append(s); } return retcode; } catch (IException *e) { EXCLOG(e); ::Release(e); } return -1; } void remoteExtractBlobElements(const SocketEndpoint &ep,const char * prefix, const char * filename, ExtractedBlobArray & extracted) { Owned file = new CRemoteFile (ep,filename); file->remoteExtractBlobElements(prefix, extracted); } //==================================================================================================== class CAsyncCommandManager { class CAsyncJob: public CInterface { class cThread: public Thread { CAsyncJob *parent; public: cThread(CAsyncJob *_parent) : Thread("CAsyncJob") { parent = _parent; } int run() { int ret; try { ret = parent->run(); parent->setDone(); } catch (IException *e) { parent->setException(e); } parent->threadsem.signal(); return ret; } } *thread; StringAttr uuid; public: Semaphore &threadsem; CAsyncJob(const char *_uuid,Semaphore &_threadsem) : uuid(_uuid),threadsem(_threadsem) { thread = new cThread(this); hash = hashc((const byte *)uuid.get(),uuid.length(),~0U); } ~CAsyncJob() { thread->join(); thread->Release(); } static void destroy(CAsyncJob *j) { j->Release(); } void start() { threadsem.wait(); thread->start(); } void join() { thread->join(); } static unsigned getHash(const char *key) { return hashc((const byte *)key,strlen(key),~0U); } static CAsyncJob* create(const char *key) { assertex(!"CAsyncJob::create not implemented"); return NULL; } unsigned hash; bool eq(const char *key) { return stricmp(key,uuid.get())==0; } virtual int run()=0; virtual void setException(IException *e)=0; virtual void setDone()=0; }; class CAsyncCopySection: public CAsyncJob { Owned src; RemoteFilename dst; offset_t toOfs; offset_t fromOfs; offset_t size; CFPmode mode; // not yet supported CriticalSection sect; offset_t done; offset_t total; Semaphore finished; AsyncCommandStatus status; Owned exc; public: CAsyncCopySection(const char *_uuid, const char *fromFile, const char *toFile, offset_t _toOfs, offset_t _fromOfs, offset_t _size, Semaphore &threadsem) : CAsyncJob(_uuid,threadsem) { status = ACScontinue; src.setown(createIFile(fromFile)); dst.setRemotePath(toFile); toOfs = _toOfs; fromOfs = _fromOfs; size = _size; mode = CFPcontinue; done = 0; total = (offset_t)-1; } AsyncCommandStatus poll(offset_t &_done, offset_t &_total,unsigned timeout) { if (timeout&&finished.wait(timeout)) finished.signal(); // may need to call again CriticalBlock block(sect); if (exc) throw exc.getClear(); _done = done; _total = total; return status; } int run() { class cProgress: implements ICopyFileProgress { CriticalSection § CFPmode &mode; offset_t &done; offset_t &total; public: cProgress(CriticalSection &_sect,offset_t &_done,offset_t &_total,CFPmode &_mode) : sect(_sect), done(_done), total(_total), mode(_mode) { } CFPmode onProgress(offset_t sizeDone, offset_t totalSize) { CriticalBlock block(sect); done = sizeDone; total = totalSize; return mode; } } progress(sect,total,done,mode); src->copySection(dst,toOfs, fromOfs, size, &progress); // exceptions will be handled by base class return 0; } void setException(IException *e) { EXCLOG(e,"CAsyncCommandManager::CAsyncJob"); CriticalBlock block(sect); if (exc.get()) e->Release(); else exc.setown(e); status = ACSerror; } void setDone() { CriticalBlock block(sect); finished.signal(); status = ACSdone; } }; CMinHashTable jobtable; CriticalSection sect; Semaphore threadsem; public: CAsyncCommandManager() { threadsem.signal(10); // max number of async jobs } void join() { CriticalBlock block(sect); unsigned i; CAsyncJob *j=jobtable.first(i); while (j) { j->join(); j=jobtable.next(i); } } AsyncCommandStatus copySection(const char *uuid, const char *fromFile, const char *toFile, offset_t toOfs, offset_t fromOfs, offset_t size, offset_t &done, offset_t &total, unsigned timeout) { // return 0 if continuing, 1 if done CAsyncCopySection * job; Linked cjob; { CriticalBlock block(sect); cjob.set(jobtable.find(uuid,false)); if (cjob) { job = QUERYINTERFACE(cjob.get(),CAsyncCopySection); if (!job) { throw MakeStringException(-1,"Async job ID mismatch"); } } else { job = new CAsyncCopySection(uuid, fromFile, toFile, toOfs, fromOfs, size, threadsem); cjob.setown(job); jobtable.add(cjob.getLink()); cjob->start(); } } AsyncCommandStatus ret; Owned rete; try { ret = job->poll(done,total,timeout); } catch (IException * e) { rete.setown(e); } if ((ret!=ACScontinue)||rete.get()) { job->join(); CriticalBlock block(sect); jobtable.remove(job); if (rete.get()) throw rete.getClear(); } return ret; } }; //==================================================================================================== #define throwErr3(e,v,s) { StringBuffer msg; \ msg.appendf("ERROR: %s(%d) '%s'",#e,v,s?s:""); \ reply.append(e); reply.append(msg.str()); } #define throwErr(e) { reply.append(e).append(#e); } #define throwErr2(e,v) { StringBuffer tmp; tmp.append(#e).append(':').append(v); reply.append(e).append(tmp.str()); } #define MAPCOMMAND(c,p) case c: { ret = this->p(msg, reply) ; break; } #define MAPCOMMANDCLIENT(c,p,client) case c: { ret = this->p(msg, reply, client); break; } static unsigned ClientCount = 0; static unsigned MaxClientCount = 0; static CriticalSection ClientCountSect; class CRemoteFileServer : public CInterface, implements IRemoteFileServer, implements IThreadFactory { int lasthandle; CriticalSection sect; Owned acceptsock; Owned selecthandler; Owned threads; // for commands bool stopping; unsigned clientcounttick; unsigned closedclients; CAsyncCommandManager asyncCommandManager; Semaphore throttlesem; atomic_t globallasttick; int getNextHandle() { // called in sect critical block loop { if (lasthandle==INT_MAX) lasthandle = 1; else lasthandle++; unsigned idx1; unsigned idx2; if (!findHandle(lasthandle,idx1,idx2)) return lasthandle; } } bool findHandle(int handle,unsigned &clientidx,unsigned &handleidx) { // called in sect critical block clientidx = (unsigned)-1; handleidx = (unsigned)-1; ForEachItemIn(i,clients) { CRemoteClientHandler &client = clients.item(i); ForEachItemIn(j,client.handles) { if (client.handles.item(j)==handle) { handleidx = j; clientidx = i; return true; } } } return false; } struct CRemoteClientHandler: public CInterface, implements ISocketSelectNotify { CRemoteFileServer *parent; Owned socket; Owned user; MemoryBuffer buf; bool selecthandled; size32_t left; IArrayOf openfiles; // kept in sync with handles Owned opendir; StringAttrArray opennames; // for debug IntArray handles; unsigned lasttick; atomic_t &globallasttick; unsigned previdx; // for debug IMPLEMENT_IINTERFACE; CRemoteClientHandler(CRemoteFileServer *_parent,ISocket *_socket,IAuthenticatedUser *_user,atomic_t &_globallasttick) : socket(_socket), user(_user), globallasttick(_globallasttick) { previdx = (unsigned)-1; CriticalBlock block(ClientCountSect); if (++ClientCount>MaxClientCount) MaxClientCount = ClientCount; if (TF_TRACE_CLIENT_CONN) { StringBuffer s; s.appendf("Connecting(%x) [%d,%d] to ",(unsigned)(long)this,ClientCount,MaxClientCount); peerName(s); PROGLOG("%s",s.str()); } parent = _parent; left = 0; buf.setEndian(__BIG_ENDIAN); selecthandled = false; touch(); } ~CRemoteClientHandler() { { CriticalBlock block(ClientCountSect); ClientCount--; if (TF_TRACE_CLIENT_CONN) { PROGLOG("Disconnecting(%x) [%d,%d] ",(unsigned)(long)this,ClientCount,MaxClientCount); } } ISocket *sock = socket.getClear(); try { sock->Release(); } catch (IException *e) { EXCLOG(e,"~CRemoteClientHandler"); e->Release(); } } bool notifySelected(ISocket *sock,unsigned selected) { if (TF_TRACE_FULL) PROGLOG("notifySelected(%x)",(unsigned)(long)this); if (sock!=socket) WARNLOG("notifySelected - invalid socket passed"); size32_t avail = (size32_t)socket->avail_read(); if (avail) touch(); if (left==0) { try { left = avail?receiveBufferSize(socket):0; } catch (IException *e) { EXCLOG(e,"notifySelected(1)"); e->Release(); left = 0; } if (left) { avail = (size32_t)socket->avail_read(); try { buf.ensureCapacity(left); } catch (IException *e) { EXCLOG(e,"notifySelected(2)"); e->Release(); left = 0; // if too big then corrupted packet so read avail to try and consume char fbuf[1024]; while (avail) { size32_t rd = avail>sizeof(fbuf)?sizeof(fbuf):avail; try { socket->read(fbuf, rd); // don't need timeout here avail -= rd; } catch (IException *e) { EXCLOG(e,"notifySelected(2) flush"); e->Release(); break; } } avail = 0; left = 0; } } } size32_t toread = left>avail?avail:left; if (toread) { try { socket->read(buf.reserve(toread), toread); // don't need timeout here } catch (IException *e) { EXCLOG(e,"notifySelected(3)"); e->Release(); toread = left; buf.clear(); } } if (TF_TRACE_FULL) PROGLOG("notifySelected %d,%d",toread,left); if ((left!=0)&&(avail==0)) { WARNLOG("notifySelected: Closing mid packet, %d remaining", left); toread = left; buf.clear(); } left -= toread; if (left==0) { // DEBUG parent->notify(this); } return false; } void logPrevHandle() { if (previdxthrottleSem()); MemoryBuffer reply; parent->dispatchCommand(buf, initSendBuffer(reply),this); buf.clear(); sendBuffer(socket, reply); } bool immediateCommand() // returns false if socket closed or failure { try { buf.clear(); touch(); size32_t avail = (size32_t)socket->avail_read(); if (avail==0) return false; receiveBuffer(socket,buf, 5); // shouldn't timeout as data is available touch(); if (buf.length()==0) return false; processCommand(); } catch (IException *e) { EXCLOG(e,"CRemoteClientHandler::immediateCommand"); e->Release(); buf.clear(); return false; } return true; } void process() { if (selecthandled) processCommand(); // buffer already filled else { while (parent->threadRunningCount()<=TARGET_ACTIVE_THREADS) { // if too many threads add to select handler int w = socket->wait_read(1000); if (w==0) break; if ((w<0)||!immediateCommand()) { if (w<0) WARNLOG("CRemoteClientHandler::main wait_read error"); parent->onCloseSocket(this,1); return; } } selecthandled = true; parent->addClient(this); // add to select handler } } bool timedOut() { return (msTick()-lasttick)>CLIENT_TIMEOUT; } void touch() { lasttick = msTick(); atomic_set(&globallasttick,lasttick); } bool peerName(StringBuffer &buf) { if (socket) { char name[256]; name[0] = 0; int port = socket->peer_name(name,sizeof(name)-1); if (port>=0) { buf.append(name); if (port) buf.append(':').append(port); return true; } } return false; } bool getInfo(StringBuffer &str) { str.append("client("); bool ok = peerName(str); unsigned ms = msTick(); str.appendf("): last touch %d ms ago (%d, %d)",ms-lasttick,lasttick,ms); ForEachItemIn(i,handles) { str.appendf("\n %d: ",handles.item(i)); str.append(opennames.item(i).text); } return ok; } }; class cCommandProcessor: public CInterface, implements IPooledThread { Owned client; public: IMPLEMENT_IINTERFACE; struct cCommandProcessorParams { CRemoteClientHandler *client; }; void init(void *_params) { cCommandProcessorParams ¶ms = *(cCommandProcessorParams *)_params; client.setown(params.client); } void main() { // idea is that initially we process commands inline then pass over to select handler try { client->process(); } catch (IException *e) { // suppress some errors EXCLOG(e,"cCommandProcessor::main"); e->Release(); } try { client.clear(); } catch (IException *e) { // suppress some more errors clearing client EXCLOG(e,"cCommandProcessor::main(2)"); } } bool stop() { return true; } bool canReuse() { return false; // want to free owned osocke } }; IArrayOf clients; class cImpersonateBlock { CRemoteClientHandler &client; public: cImpersonateBlock(CRemoteClientHandler &_client) : client(_client) { if (client.user.get()) { if (TF_TRACE) PROGLOG("Impersonate user: %s",client.user->username()); client.user->impersonate(); } } ~cImpersonateBlock() { if (client.user.get()) { if (TF_TRACE) PROGLOG("Stop impersonating user: %s",client.user->username()); client.user->revert(); } } }; #define IMPERSONATE_USER(client) cImpersonateBlock ublock(client) public: IMPLEMENT_IINTERFACE CRemoteFileServer() { throttlesem.signal(10); lasthandle = 0; selecthandler.setown(createSocketSelectHandler(NULL)); threads.setown(createThreadPool("CRemoteFileServerPool",this,NULL,MAX_THREADS,60*1000, #ifdef __64BIT__ 0, #else 0x10000, #endif INFINITE,TARGET_MIN_THREADS)); stopping = false; clientcounttick = msTick(); closedclients = 0; atomic_set(&globallasttick,msTick()); } ~CRemoteFileServer() { #ifdef _DEBUG PROGLOG("Exiting CRemoteFileServer"); #endif asyncCommandManager.join(); clients.kill(); #ifdef _DEBUG PROGLOG("Exited CRemoteFileServer"); #endif } //MORE: The file handles should timeout after a while, and accessing an old (invalid handle) // should throw a different exception bool checkFileIOHandle(MemoryBuffer &reply, int handle, IFileIO *&fileio, bool del=false) { CriticalBlock block(sect); fileio = NULL; if (handle<=0) { throwErr(RFSERR_NullFileIOHandle); return false; } unsigned clientidx; unsigned handleidx; if (findHandle(handle,clientidx,handleidx)) { CRemoteClientHandler &client = clients.item(clientidx); if (del) { client.handles.remove(handleidx); client.openfiles.remove(handleidx); client.opennames.remove(handleidx); client.previdx = (unsigned)-1; } else { fileio = &client.openfiles.item(handleidx); client.previdx = handleidx; } return true; } throwErr(RFSERR_InvalidFileIOHandle); return false; } void onCloseSocket(CRemoteClientHandler *client, int which) { if (!client) return; CriticalBlock block(sect); #ifdef _DEBUG StringBuffer s; client->peerName(s); PROGLOG("onCloseSocket(%d) %s",which,s.str()); #endif if (client->socket) { try { selecthandler->remove(client->socket); } catch (IException *e) { EXCLOG(e,"CRemoteFileServer::onCloseSocket.1"); e->Release(); } } try { clients.zap(*client); } catch (IException *e) { EXCLOG(e,"CRemoteFileServer::onCloseSocket.2"); e->Release(); } } bool cmdOpenFileIO(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client) { IMPERSONATE_USER(client); Owned name = new StringAttrItem; byte mode; byte share; msg.read(name->text).read(mode).read(share); try { Owned file = createIFile(name->text); unsigned smode = 0; switch ((compatIFSHmode)share) { case compatIFSHnone: file->setCreateFlags(S_IRUSR|S_IWUSR); file->setShareMode(IFSHnone); break; case compatIFSHread: file->setShareMode(IFSHread); break; case compatIFSHwrite: file->setShareMode(IFSHfull); break; case compatIFSHexec: file->setCreateFlags(S_IRUSR|S_IWUSR|S_IXUSR|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH); break; case compatIFSHall: file->setCreateFlags(S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH); // bit excessive file->setShareMode(IFSHfull); break; } if (TF_TRACE_PRE_IO) PROGLOG("before open file '%s', (%d,%d)",name->text.get(),(int)mode,(int)share); IFileIO *fileio = file->open((IFOmode)mode); int handle; if (fileio) { CriticalBlock block(sect); handle = getNextHandle(); client.previdx = client.opennames.ordinality(); client.handles.append(handle); client.openfiles.append(*fileio); client.opennames.append(*name.getLink()); } else handle = 0; reply.append(RFEnoerror); reply.append(handle); if (TF_TRACE) PROGLOG("open file '%s', (%d,%d) handle = %d",name->text.get(),(int)mode,(int)share,handle); return true; } catch (IException *e) { StringBuffer s; e->errorMessage(s); throwErr3(RFSERR_OpenFailed,e->errorCode(),s.str()); e->Release(); } return false; } bool cmdCloseFileIO(MemoryBuffer & msg, MemoryBuffer & reply) { int handle; msg.read(handle); IFileIO *fileio; if (!checkFileIOHandle(reply, handle, fileio, true)) return false; if (TF_TRACE) PROGLOG("close file, handle = %d",handle); reply.append(RFEnoerror); return true; } bool cmdRead(MemoryBuffer & msg, MemoryBuffer & reply) { int handle; __int64 pos; size32_t len; msg.read(handle).read(pos).read(len); IFileIO *fileio; if (!checkFileIOHandle(reply, handle, fileio)) return false; //arrange it so we read directly into the reply buffer... unsigned posOfErr = reply.length(); reply.append((unsigned)RFEnoerror); size32_t numRead; unsigned posOfLength = reply.length(); if (TF_TRACE_PRE_IO) PROGLOG("before read file, handle = %d, toread = %d",handle,len); void * data; { reply.reserve(sizeof(numRead)); data = reply.reserve(len); } try { numRead = fileio->read(pos,len,data); } catch (IException *e) { reply.setWritePos(posOfErr); StringBuffer s; e->errorMessage(s); throwErr3(RFSERR_ReadFailed,e->errorCode(),s.str()); e->Release(); return false; } if (TF_TRACE) PROGLOG("read file, handle = %d, pos = %"I64F"d, toread = %d, read = %d",handle,pos,len,numRead); { reply.setLength(posOfLength + sizeof(numRead) + numRead); reply.writeEndianDirect(posOfLength,sizeof(numRead),&numRead); } return true; } bool cmdSize(MemoryBuffer & msg, MemoryBuffer & reply) { int handle; msg.read(handle); IFileIO *fileio; if (!checkFileIOHandle(reply, handle, fileio)) return false; __int64 size = fileio->size(); reply.append((unsigned)RFEnoerror).append(size); if (TF_TRACE) PROGLOG("size file, handle = %d, size = %"I64F"d",handle,size); return true; } bool cmdSetSize(MemoryBuffer & msg, MemoryBuffer & reply) { int handle; offset_t size; msg.read(handle).read(size); IFileIO *fileio; if (TF_TRACE) PROGLOG("set size file, handle = %d, size = %"I64F"d",handle,size); if (!checkFileIOHandle(reply, handle, fileio)) return false; fileio->setSize(size); reply.append((unsigned)RFEnoerror); return true; } bool cmdWrite(MemoryBuffer & msg, MemoryBuffer & reply) { int handle; __int64 pos; size32_t len; msg.read(handle).read(pos).read(len); IFileIO *fileio; if (!checkFileIOHandle(reply, handle, fileio)) return false; const byte *data = (const byte *)msg.readDirect(len); try { if (TF_TRACE_PRE_IO) PROGLOG("before write file, handle = %d, towrite = %d",handle,len); size32_t numWritten = fileio->write(pos,len,data); if (TF_TRACE) PROGLOG("write file, handle = %d, towrite = %d, written = %d",handle,len,numWritten); reply.append((unsigned)RFEnoerror).append(numWritten); return true; } catch (IException *e) { StringBuffer s; e->errorMessage(s); throwErr3(RFSERR_WriteFailed,e->errorCode(),s.str()); e->Release(); } return false; } bool cmdExists(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client) { IMPERSONATE_USER(client); StringAttr name; msg.read(name); if (TF_TRACE) PROGLOG("exists, '%s'",name.get()); Owned file=createIFile(name); try { bool e = file->exists(); reply.append((unsigned)RFEnoerror).append(e); return true; } catch (IException *e) { StringBuffer s; e->errorMessage(s); throwErr3(RFSERR_ExistsFailed,e->errorCode(),s.str()); e->Release(); } return false; } bool cmdRemove(MemoryBuffer & msg, MemoryBuffer & reply,CRemoteClientHandler &client) { IMPERSONATE_USER(client); StringAttr name; msg.read(name); if (TF_TRACE) PROGLOG("remove, '%s'",name.get()); Owned file=createIFile(name); try { bool e = file->remove(); reply.append((unsigned)RFEnoerror).append(e); return true; } catch (IException *e) { StringBuffer s; e->errorMessage(s); throwErr3(RFSERR_RemoveFailed,e->errorCode(),s.str()); e->Release(); } return false; } bool cmdGetVer(MemoryBuffer & msg, MemoryBuffer & reply) { if (TF_TRACE) PROGLOG("getVer"); if (msg.getPos()+sizeof(unsigned)>msg.length()) reply.append((unsigned)RFEnoerror); else reply.append((unsigned)FILESRV_VERSION+0x10000); reply.append(VERSTRING); return true; } bool cmdRename(MemoryBuffer & msg, MemoryBuffer & reply,CRemoteClientHandler &client) { IMPERSONATE_USER(client); StringAttr fromname; msg.read(fromname); StringAttr toname; msg.read(toname); if (TF_TRACE) PROGLOG("rename, '%s' to '%s'",fromname.get(),toname.get()); Owned file=createIFile(fromname); try { file->rename(toname); reply.append((unsigned)RFEnoerror); return true; } catch (IException *e) { StringBuffer s; e->errorMessage(s); throwErr3(RFSERR_RenameFailed,e->errorCode(),s.str()); e->Release(); } return false; } bool cmdMove(MemoryBuffer & msg, MemoryBuffer & reply,CRemoteClientHandler &client) { IMPERSONATE_USER(client); StringAttr fromname; msg.read(fromname); StringAttr toname; msg.read(toname); if (TF_TRACE) PROGLOG("move, '%s' to '%s'",fromname.get(),toname.get()); Owned file=createIFile(fromname); try { file->move(toname); reply.append((unsigned)RFEnoerror); return true; } catch (IException *e) { StringBuffer s; e->errorMessage(s); throwErr3(RFSERR_MoveFailed,e->errorCode(),s.str()); e->Release(); } return false; } bool cmdCopy(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client) { IMPERSONATE_USER(client); StringAttr fromname; msg.read(fromname); StringAttr toname; msg.read(toname); if (TF_TRACE) PROGLOG("copy, '%s' to '%s'",fromname.get(),toname.get()); try { copyFile(toname, fromname); reply.append((unsigned)RFEnoerror); return true; } catch (IException *e) { StringBuffer s; e->errorMessage(s); throwErr3(RFSERR_CopyFailed,e->errorCode(),s.str()); e->Release(); } return false; } bool cmdAppend(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client) { IMPERSONATE_USER(client); int handle; __int64 pos; __int64 len; StringAttr srcname; msg.read(handle).read(srcname).read(pos).read(len); IFileIO *fileio; if (!checkFileIOHandle(reply, handle, fileio)) return false; try { Owned file = createIFile(srcname.get()); __int64 written = fileio->appendFile(file,pos,len); if (TF_TRACE) PROGLOG("append file, handle = %d, file=%s, pos = %"I64F"d len = %"I64F"d written = %"I64F"d",handle,srcname.get(),pos,len,written); reply.append((unsigned)RFEnoerror).append(written); return true; } catch (IException *e) { StringBuffer s; e->errorMessage(s); throwErr3(RFSERR_AppendFailed,e->errorCode(),s.str()); e->Release(); } return false; } bool cmdIsFile(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client) { IMPERSONATE_USER(client); StringAttr name; msg.read(name); if (TF_TRACE) PROGLOG("isFile, '%s'",name.get()); Owned file=createIFile(name); try { unsigned ret = (unsigned)file->isFile(); reply.append((unsigned)RFEnoerror).append(ret); return true; } catch (IException *e) { StringBuffer s; e->errorMessage(s); throwErr3(RFSERR_IsFileFailed,e->errorCode(),s.str()); e->Release(); } return false; } bool cmdIsDir(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client) { IMPERSONATE_USER(client); StringAttr name; msg.read(name); if (TF_TRACE) PROGLOG("isDir, '%s'",name.get()); Owned file=createIFile(name); try { unsigned ret = (unsigned)file->isDirectory(); reply.append((unsigned)RFEnoerror).append(ret); return true; } catch (IException *e) { StringBuffer s; e->errorMessage(s); throwErr3(RFSERR_IsDirectoryFailed,e->errorCode(),s.str()); e->Release(); } return false; } bool cmdIsReadOnly(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client) { IMPERSONATE_USER(client); StringAttr name; msg.read(name); if (TF_TRACE) PROGLOG("isReadOnly, '%s'",name.get()); Owned file=createIFile(name); try { unsigned ret = (unsigned)file->isReadOnly(); reply.append((unsigned)RFEnoerror).append(ret); return true; } catch (IException *e) { StringBuffer s; e->errorMessage(s); throwErr3(RFSERR_IsReadOnlyFailed,e->errorCode(),s.str()); e->Release(); } return false; } bool cmdSetReadOnly(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client) { IMPERSONATE_USER(client); StringAttr name; bool set; msg.read(name).read(set); if (TF_TRACE) PROGLOG("setReadOnly, '%s' %d",name.get(),(int)set); Owned file=createIFile(name); try { file->setReadOnly(set); reply.append((unsigned)RFEnoerror); return true; } catch (IException *e) { StringBuffer s; e->errorMessage(s); throwErr3(RFSERR_SetReadOnlyFailed,e->errorCode(),s.str()); e->Release(); } return false; } bool cmdGetTime(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client) { IMPERSONATE_USER(client); StringAttr name; msg.read(name); if (TF_TRACE) PROGLOG("getTime, '%s'",name.get()); Owned file=createIFile(name); CDateTime createTime; CDateTime modifiedTime; CDateTime accessedTime; try { bool ret = file->getTime(&createTime,&modifiedTime,&accessedTime); reply.append((unsigned)RFEnoerror).append(ret); if (ret) { createTime.serialize(reply); modifiedTime.serialize(reply); accessedTime.serialize(reply); } return true; } catch (IException *e) { StringBuffer s; e->errorMessage(s); throwErr3(RFSERR_GetTimeFailed,e->errorCode(),s.str()); e->Release(); } return false; } bool cmdSetTime(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client) { IMPERSONATE_USER(client); StringAttr name; bool creategot; CDateTime createTime; bool modifiedgot; CDateTime modifiedTime; bool accessedgot; CDateTime accessedTime; msg.read(name); msg.read(creategot); if (creategot) createTime.deserialize(msg); msg.read(modifiedgot); if (modifiedgot) modifiedTime.deserialize(msg); msg.read(accessedgot); if (accessedgot) accessedTime.deserialize(msg); if (TF_TRACE) PROGLOG("setTime, '%s'",name.get()); Owned file=createIFile(name); try { bool ret = file->setTime(creategot?&createTime:NULL,modifiedgot?&modifiedTime:NULL,accessedgot?&accessedTime:NULL); reply.append((unsigned)RFEnoerror).append(ret); return true; } catch (IException *e) { StringBuffer s; e->errorMessage(s); throwErr3(RFSERR_SetTimeFailed,e->errorCode(),s.str()); e->Release(); } return false; } bool cmdCreateDir(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client) { IMPERSONATE_USER(client); StringAttr name; msg.read(name); if (TF_TRACE) PROGLOG("CreateDir, '%s'",name.get()); Owned dir=createIFile(name); try { bool ret = dir->createDirectory(); reply.append((unsigned)RFEnoerror).append(ret); return true; } catch (IException *e) { StringBuffer s; e->errorMessage(s); throwErr3(RFSERR_CreateDirFailed,e->errorCode(),s.str()); e->Release(); } return false; } bool cmdGetDir(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client) { IMPERSONATE_USER(client); StringAttr name; StringAttr mask; bool includedir; bool sub; byte stream = 0; msg.read(name).read(mask).read(includedir).read(sub); if (msg.remaining()>sizeof(byte)) { msg.read(stream); if (stream==1) client.opendir.clear(); } if (TF_TRACE) PROGLOG("GetDir, '%s', '%s'",name.get(),mask.get()); try { Owned dir=createIFile(name); Owned iter; if (stream>1) iter.set(client.opendir); else { iter.setown(dir->directoryFiles(mask.length()?mask.get():NULL,sub,includedir)); client.opendir.set(iter); } if (!iter) { reply.append((unsigned)RFSERR_GetDirFailed); return false; } reply.append((unsigned)RFEnoerror); if (CRemoteDirectoryIterator::serialize(reply,iter,stream?0x100000:0)) client.opendir.clear(); else { bool cont=true; reply.append(cont); } return true; } catch (IException *e) { StringBuffer s; e->errorMessage(s); throwErr3(RFSERR_GetDirFailed,e->errorCode(),s.str()); e->Release(); } return false; } bool cmdMonitorDir(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client) { IMPERSONATE_USER(client); StringAttr name; StringAttr mask; bool includedir; bool sub; unsigned checkinterval; unsigned timeout; __int64 cancelid; // not yet used msg.read(name).read(mask).read(includedir).read(sub).read(checkinterval).read(timeout).read(cancelid); byte isprev; msg.read(isprev); Owned prev; if (isprev==1) { SocketEndpoint ep; CRemoteDirectoryIterator *di = new CRemoteDirectoryIterator(ep,name); di->appendBuf(msg); prev.setown(di); } if (TF_TRACE) PROGLOG("MonitorDir, '%s' '%s'",name.get(),mask.get()); try { Owned dir=createIFile(name); Owned iter=dir->monitorDirectory(prev,mask.length()?mask.get():NULL,sub,includedir,checkinterval,timeout); reply.append((unsigned)RFEnoerror); byte state = (iter.get()==NULL)?0:1; reply.append(state); if (state==1) CRemoteDirectoryIterator::serializeDiff(reply,iter); return true; } catch (IException *e) { StringBuffer s; e->errorMessage(s); throwErr3(RFSERR_GetDirFailed,e->errorCode(),s.str()); e->Release(); } return false; } bool cmdCopySection(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client) { IMPERSONATE_USER(client); StringAttr uuid; StringAttr fromFile; StringAttr toFile; offset_t toOfs; offset_t fromOfs; offset_t size; offset_t sizeDone=0; offset_t totalSize=(offset_t)-1; unsigned timeout; msg.read(uuid).read(fromFile).read(toFile).read(toOfs).read(fromOfs).read(size).read(timeout); try { AsyncCommandStatus status = asyncCommandManager.copySection(uuid,fromFile,toFile,toOfs,fromOfs,size,sizeDone,totalSize,timeout); reply.append((unsigned)RFEnoerror).append((unsigned)status).append(sizeDone).append(totalSize); } catch (IException *e) { StringBuffer s; e->errorMessage(s); throwErr3(RFSERR_CopySectionFailed,e->errorCode(),s.str()); e->Release(); } return true; } bool cmdTreeCopy(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client,bool usetmp=false) { IMPERSONATE_USER(client); RemoteFilename src; src.deserialize(msg); RemoteFilename dst; dst.deserialize(msg); StringAttr net; StringAttr mask; msg.read(net).read(mask); try { IpAddress ip; treeCopyFile(src,dst,net,mask,ip,usetmp); unsigned status = 0; reply.append((unsigned)RFEnoerror).append((unsigned)status); ip.ipserialize(reply); } catch (IException *e) { StringBuffer s; e->errorMessage(s); throwErr3(RFSERR_TreeCopyFailed,e->errorCode(),s.str()); e->Release(); } return true; } bool cmdTreeCopyTmp(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client) { return cmdTreeCopy(msg, reply, client,true); } bool cmdGetCRC(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client) { IMPERSONATE_USER(client); StringAttr name; msg.read(name); if (TF_TRACE) PROGLOG("getCRC, '%s'",name.get()); Owned file=createIFile(name); try { unsigned ret = file->getCRC(); reply.append((unsigned)RFEnoerror).append(ret); return true; } catch (IException *e) { StringBuffer s; e->errorMessage(s); throwErr3(RFSERR_GetCrcFailed,e->errorCode(),s.str()); e->Release(); } return false; } bool cmdStop(MemoryBuffer &msg, MemoryBuffer &reply) { PROGLOG("Abort request received"); stopping = true; if (acceptsock) acceptsock->cancel_accept(); reply.append((unsigned)RFEnoerror); return false; } bool cmdExec(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client) { StringAttr cmdline; StringAttr workdir; bool sync; bool hasoutput; size32_t insize; MemoryAttr inbuf; msg.read(cmdline).read(workdir).read(sync).read(hasoutput).read(insize); if (insize) msg.read(insize, inbuf.allocate(insize)); Owned pipe = createPipeProcess(); int retcode=-1; HANDLE phandle=(HANDLE)0; MemoryBuffer outbuf; if (pipe->run("EXEC",cmdline,workdir,insize!=0,hasoutput)) { if (insize) { pipe->write(insize, inbuf.get()); pipe->closeInput(); } if (hasoutput) { byte buf[4096]; loop { size32_t read = pipe->read(sizeof(buf),buf); if (!read) break; outbuf.append(read,buf); } } if (sync) retcode = pipe->wait(); else { phandle = pipe->getProcessHandle(); retcode = 0; } } size32_t outsz = outbuf.length(); reply.append(retcode).append((unsigned)phandle).append(outsz); if (outsz) reply.append(outbuf); return true; } bool cmdSetTrace(MemoryBuffer &msg, MemoryBuffer &reply) { byte flags; msg.read(flags); int retcode=-1; if (flags!=255) { // escape retcode = traceFlags; traceFlags = flags; } reply.append(retcode); return true; } bool cmdGetInfo(MemoryBuffer &msg, MemoryBuffer &reply) { StringBuffer retstr; int retcode = getInfo(retstr); reply.append(retcode).append(retstr.str()); return true; } bool cmdFirewall(MemoryBuffer &msg, MemoryBuffer &reply) { // TBD StringBuffer retstr; int retcode = getInfo(retstr); reply.append(retcode).append(retstr.str()); return true; } bool cmdExtractBlobElements(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client) { IMPERSONATE_USER(client); try { StringAttr prefix; StringAttr filename; msg.read(prefix).read(filename); RemoteFilename rfn; rfn.setLocalPath(filename); ExtractedBlobArray extracted; extractBlobElements(prefix, rfn, extracted); unsigned n = extracted.ordinality(); reply.append((unsigned)RFEnoerror).append(n); for (unsigned i=0;ierrorMessage(s); throwErr3(RFSERR_ExtractBlobElementsFailed,e->errorCode(),s.str()); e->Release(); } return true; } bool cmdRedeploy(MemoryBuffer &msg, MemoryBuffer &reply) { return false; // TBD } bool cmdKill(MemoryBuffer & msg, MemoryBuffer & reply) { // TBD throwErr2(RFSERR_InvalidCommand,(unsigned)RFCkill); return false; } bool cmdUnknown(MemoryBuffer & msg, MemoryBuffer & reply,RemoteFileCommandType cmd) { throwErr2(RFSERR_InvalidCommand,(unsigned)cmd); return false; } bool cmdUnlock(MemoryBuffer & msg, MemoryBuffer & reply,CRemoteClientHandler &client) { // this is an attempt to authenticate when we haven't got authentication turned on StringBuffer s; client.peerName(s); if (TF_TRACE_CLIENT_STATS) PROGLOG("Connect from %s",s.str()); throwErr2(RFSERR_InvalidCommand,(unsigned)RFCunlock); return false; } bool dispatchCommand(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler *client) { RemoteFileCommandType cmd; msg.read(cmd); bool ret = true; switch(cmd) { MAPCOMMAND(RFCcloseIO, cmdCloseFileIO); MAPCOMMANDCLIENT(RFCopenIO, cmdOpenFileIO, *client); MAPCOMMAND(RFCread, cmdRead); MAPCOMMAND(RFCsize, cmdSize); MAPCOMMAND(RFCwrite, cmdWrite); MAPCOMMANDCLIENT(RFCexists, cmdExists, *client); MAPCOMMANDCLIENT(RFCremove, cmdRemove, *client); MAPCOMMANDCLIENT(RFCrename, cmdRename, *client); MAPCOMMAND(RFCgetver, cmdGetVer); MAPCOMMANDCLIENT(RFCisfile, cmdIsFile, *client); MAPCOMMANDCLIENT(RFCisdirectory, cmdIsDir, *client); MAPCOMMANDCLIENT(RFCisreadonly, cmdIsReadOnly, *client); MAPCOMMANDCLIENT(RFCsetreadonly, cmdSetReadOnly, *client); MAPCOMMANDCLIENT(RFCgettime, cmdGetTime, *client); MAPCOMMANDCLIENT(RFCsettime, cmdSetTime, *client); MAPCOMMANDCLIENT(RFCcreatedir, cmdCreateDir, *client); MAPCOMMANDCLIENT(RFCgetdir, cmdGetDir, *client); MAPCOMMANDCLIENT(RFCmonitordir, cmdMonitorDir, *client); MAPCOMMAND(RFCstop, cmdStop); MAPCOMMANDCLIENT(RFCexec, cmdExec, *client); MAPCOMMANDCLIENT(RFCextractblobelements, cmdExtractBlobElements, *client); MAPCOMMAND(RFCkill, cmdKill); MAPCOMMAND(RFCredeploy, cmdRedeploy); // only Windows MAPCOMMANDCLIENT(RFCgetcrc, cmdGetCRC, *client); MAPCOMMANDCLIENT(RFCmove, cmdMove, *client); MAPCOMMANDCLIENT(RFCcopy, cmdCopy, *client); MAPCOMMANDCLIENT(RFCappend, cmdAppend, *client); MAPCOMMAND(RFCsetsize, cmdSetSize); MAPCOMMAND(RFCsettrace, cmdSetTrace); MAPCOMMAND(RFCgetinfo, cmdGetInfo); MAPCOMMAND(RFCfirewall, cmdFirewall); MAPCOMMANDCLIENT(RFCunlock, cmdUnlock, *client); MAPCOMMANDCLIENT(RFCcopysection, cmdCopySection, *client); MAPCOMMANDCLIENT(RFCtreecopy, cmdTreeCopy, *client); MAPCOMMANDCLIENT(RFCtreecopytmp, cmdTreeCopyTmp, *client); default: ret = cmdUnknown(msg,reply,cmd); } if (!ret) { // append error string if (reply.length()>=sizeof(unsigned)*2) { reply.reset(); unsigned z; unsigned e; reply.read(z).read(e); StringBuffer err("ERR("); err.append(e).append(") "); if (client&&(client->peerName(err))) err.append(" : "); if (e&&(reply.getPos()logPrevHandle(); } } return ret; } virtual IPooledThread *createNew() { return new cCommandProcessor(); } void run(SocketEndpoint &listenep) { if (listenep.isNull()) acceptsock.setown(ISocket::create(listenep.port)); else { StringBuffer ips; listenep.getIpText(ips); acceptsock.setown(ISocket::create_ip(listenep.port,ips.str())); } selecthandler->start(); loop { Owned sock; bool sockavail; try { sockavail = acceptsock->wait_read(1000*60*1)!=0; #if 0 if (!sockavail) { JSocketStatistics stats; getSocketStatistics(stats); StringBuffer s; getSocketStatisticsString(stats,s); PROGLOG( "Socket statistics : \n%s\n",s.str()); } #endif } catch (IException *e) { EXCLOG(e,"CRemoteFileServer(1)"); e->Release(); // not sure what to do so just accept sockavail = true; } if (stopping) break; if (sockavail) { try { sock.setown(acceptsock->accept(true)); if (!sock||stopping) break; } catch (IException *e) { EXCLOG(e,"CRemoteFileServer"); e->Release(); break; } runClient(sock.getClear()); } else checkTimeout(); } if (TF_TRACE_CLIENT_STATS) PROGLOG("CRemoteFileServer:run exiting"); selecthandler->stop(true); } void processUnauthenticatedCommand(RemoteFileCommandType typ,ISocket *socket, MemoryBuffer &msg) { // these are unauthenticated commands switch (typ) { case RFCgetver: break; default: typ = RFCinvalid; msg.writeDirect(msg.getPos()-sizeof(RemoteFileCommandType),sizeof(RemoteFileCommandType),&typ); } MemoryBuffer reply; dispatchCommand(msg, initSendBuffer(reply),NULL); sendBuffer(socket, reply); } bool checkAuthentication(ISocket *socket, IAuthenticatedUser *&ret) { ret = NULL; if (!AuthenticationEnabled) return true; MemoryBuffer reqbuf; MemoryBuffer reply; MemoryBuffer encbuf; // because aesEncrypt clears input initSendBuffer(reply); receiveBuffer(socket, reqbuf, 1); RemoteFileCommandType typ=0; if (reqbuf.remaining()getPeerAddress(ip); byte ipdata[16]; size32_t ipds = ip.getNetAddress(sizeof(ipdata),&ipdata); mergeOnce(oncekey,sizeof(ipdata),&ipdata); // this is clients key OnceKey mykey; genOnce(mykey); reply.append((unsigned)0); // errcode aesEncrypt(&oncekey,sizeof(oncekey),&mykey,sizeof(oncekey),encbuf); reply.append(encbuf.length()).append(encbuf); sendBuffer(socket, reply); // send my oncekey reqbuf.clear(); receiveBuffer(socket, reqbuf, 1); if (reqbuf.remaining()>sizeof(RemoteFileCommandType)+sizeof(size32_t)) { reqbuf.read(typ); if (typ==RFCunlockreply) { size32_t bs; reqbuf.read(bs); if (bs<=reqbuf.remaining()) { MemoryBuffer userbuf; aesDecrypt(&mykey,sizeof(mykey),reqbuf.readDirect(bs),bs,userbuf); byte n; userbuf.read(n); if (n>=2) { StringAttr user; StringAttr password; userbuf.read(user).read(password); Owned iau = createAuthenticatedUser(); if (iau->login(user,password)) { initSendBuffer(reply.clear()); reply.append((unsigned)0); sendBuffer(socket, reply); // send OK ret = iau; return true; } } } } } reply.clear(); throwErr(RFSERR_AuthenticateFailed); sendBuffer(socket, reply); // send OK return false; } void runClient(ISocket *sock) { cCommandProcessor::cCommandProcessorParams params; IAuthenticatedUser *user=NULL; bool authenticated = false; try { if (checkAuthentication(sock,user)) authenticated = true; } catch (IException *e) { e->Release(); } if (!authenticated) { try { sock->Release(); } catch (IException *e) { e->Release(); } return; } params.client = new CRemoteClientHandler(this,sock,user,globallasttick); { CriticalBlock block(sect); params.client->Link(); clients.append(*params.client); } threads->start(¶ms); } void stop() { // stop accept loop if (TF_TRACE_CLIENT_STATS) PROGLOG("CRemoteFileServer::stop"); if (acceptsock) acceptsock->cancel_accept(); threads->stopAll(); threads->joinAll(true,60*1000); } bool notify(CRemoteClientHandler *_client) { Linked client; client.set(_client); if (TF_TRACE_FULL) PROGLOG("notify %d",client->buf.length()); if (client->buf.length()) { cCommandProcessor::cCommandProcessorParams params; params.client = client.getClear(); threads->start(¶ms); } else onCloseSocket(client,3); // removes owned handles return false; } void addClient(CRemoteClientHandler *client) { if (client&&client->socket) selecthandler->add(client->socket,SELECTMODE_READ,client); } void checkTimeout() { if (msTick()-clientcounttick>1000*60*60) { CriticalBlock block(ClientCountSect); if ((ClientCount!=0)||(MaxClientCount!=0)&&TF_TRACE_CLIENT_STATS) PROGLOG("Client count = %d, max = %d",ClientCount,MaxClientCount); clientcounttick = msTick(); MaxClientCount = ClientCount; if (closedclients) { if (TF_TRACE_CLIENT_STATS) PROGLOG("Closed client count = %d",closedclients); closedclients = 0; } } CriticalBlock block(sect); ForEachItemInRev(i,clients) { CRemoteClientHandler &client = clients.item(i); if (client.timedOut()) { StringBuffer s; bool ok = client.getInfo(s); // will spot duff sockets if (ok&&(client.handles.ordinality()!=0)) { if (TF_TRACE_CLIENT_CONN) WARNLOG("Inactive %s",s.str()); } else { #ifndef _DEBUG if (TF_TRACE_CLIENT_CONN) #endif PROGLOG("Timing out %s",s.str()); closedclients++; onCloseSocket(&client,4); // removes owned handles } } } } int getInfo(StringBuffer &str) { { CriticalBlock block(ClientCountSect); str.append(VERSTRING).append('\n'); str.appendf("Client count = %d\n",ClientCount); str.appendf("Max client count = %d",MaxClientCount); } CriticalBlock block(sect); ForEachItemIn(i,clients) { str.append('\n').append(i).append(": "); clients.item(i).getInfo(str); } return 0; } unsigned threadRunningCount() { if (!threads) return 0; return threads->runningCount(); } Semaphore &throttleSem() { return throttlesem; } unsigned idleTime() { unsigned t = (unsigned)atomic_read(&globallasttick); return msTick()-t; } }; IRemoteFileServer * createRemoteFileServer() { #if SIMULATE_PACKETLOSS errorSimulationOn = false; #endif return new CRemoteFileServer(); }