12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #include <string>
- #include <unordered_map>
- #include "platform.h"
- #include "portlist.h"
- #include "jlib.hpp"
- #include "jflz.hpp"
- #include "jio.hpp"
- #include "jlog.hpp"
- #include "jregexp.hpp"
- #include "jmutex.hpp"
- #include "jfile.hpp"
- #include "jhtree.hpp"
- #include "rtldynfield.hpp"
- #include "rtlds_imp.hpp"
- #include "rtlread_imp.hpp"
- #include "rtlrecord.hpp"
- #include "eclhelper_dyn.hpp"
- #include "rtlcommon.hpp"
- #include "rtlformat.hpp"
- #include "digisign.hpp"
- #include "remoteerr.hpp"
- #include "rmtclient.hpp"
- #include "rmtclient_impl.hpp"
- #include "rmtfile.hpp"
- //----------------------------------------------------------------------------
- //#define TEST_DAFILESRV_FOR_UNIX_PATHS // probably not needed
- static class CSecuritySettings
- {
- unsigned short daliServixPort;
- public:
- CSecuritySettings()
- {
- querySecuritySettings(nullptr, &daliServixPort, nullptr, nullptr, nullptr);
- }
- unsigned short queryDaliServixPort() { return daliServixPort; }
- } securitySettings;
- unsigned short getDaliServixPort()
- {
- return securitySettings.queryDaliServixPort();
- }
- void setCanAccessDirectly(RemoteFilename & file,bool set)
- {
- if (set)
- file.setPort(0);
- else if (file.getPort()==0) // foreign daliservix may be passed in
- file.setPort(getDaliServixPort());
- }
- bool canAccessDirectly(const RemoteFilename & file) // not that well named but historical
- {
- return (file.getPort()==0);
- }
- void setLocalMountRedirect(const IpAddress &ip,const char *dir,const char *mountdir)
- {
- setDafsLocalMountRedirect(ip,dir,mountdir);
- }
- class CDaliServixFilter : public CInterface
- {
- protected:
- StringAttr dir, sourceRangeText;
- SocketEndpointArray sourceRangeIps;
- bool sourceRangeHasPorts, trace;
- bool checkForPorts(SocketEndpointArray &ips)
- {
- ForEachItemIn(i, ips)
- {
- if (ips.item(i).port)
- return true;
- }
- return false;
- }
- public:
- CDaliServixFilter(const char *_dir, const char *sourceRange, bool _trace) : dir(_dir), trace(_trace)
- {
- if (sourceRange)
- {
- sourceRangeText.set(sourceRange);
- sourceRangeIps.fromText(sourceRange, 0);
- sourceRangeHasPorts = checkForPorts(sourceRangeIps);
- }
- else
- sourceRangeHasPorts = false;
- }
- bool queryTrace() const { return trace; }
- const char *queryDirectory() const { return dir; }
- bool testPath(const char *path) const
- {
- if (!dir) // if no dir in filter, match any
- return true;
- else
- return startsWith(path, dir.get());
- }
- bool applyFilter(const SocketEndpoint &ep) const
- {
- if (sourceRangeText.length())
- {
- SocketEndpoint _ep = ep;
- if (!sourceRangeHasPorts) // if source range doesn't have ports, only check ip
- _ep.port = 0;
- return NotFound != sourceRangeIps.find(_ep);
- }
- // NB: If no source range, use target range to decide if filter should apply
- return testEp(ep);
- }
- virtual bool testEp(const SocketEndpoint &ep) const = 0;
- virtual StringBuffer &getInfo(StringBuffer &info)
- {
- if (dir.length())
- info.append(", dir=").append(dir.get());
- if (sourceRangeText.get())
- info.append(", sourcerange=").append(sourceRangeText.get());
- info.append(", trace=(").append(trace ? "true" : "false").append(")");
- return info;
- }
- };
- class CDaliServixSubnetFilter : public CDaliServixFilter
- {
- IpSubNet ipSubNet;
- public:
- CDaliServixSubnetFilter(const char *subnet, const char *mask, const char *dir, const char *sourceRange, bool trace) :
- CDaliServixFilter(dir, sourceRange, trace)
- {
- if (!ipSubNet.set(subnet, mask))
- throw MakeStringException(0, "Invalid sub net definition: %s, %s", subnet, mask);
- }
- virtual bool testEp(const SocketEndpoint &ep) const
- {
- return ipSubNet.test(ep);
- }
- virtual StringBuffer &getInfo(StringBuffer &info)
- {
- info.append("subnet=");
- ipSubNet.getNetText(info);
- info.append(", mask=");
- ipSubNet.getMaskText(info);
- CDaliServixFilter::getInfo(info);
- return info;
- }
- };
- class CDaliServixRangeFilter : public CDaliServixFilter
- {
- StringAttr rangeText;
- SocketEndpointArray rangeIps;
- bool rangeIpsHavePorts;
- public:
- CDaliServixRangeFilter(const char *_range, const char *dir, const char *sourceRange, bool trace)
- : CDaliServixFilter(dir, sourceRange, trace)
- {
- rangeText.set(_range);
- rangeIps.fromText(_range, 0);
- rangeIpsHavePorts = checkForPorts(rangeIps);
- }
- virtual bool testEp(const SocketEndpoint &ep) const
- {
- SocketEndpoint _ep = ep;
- if (!rangeIpsHavePorts) // if range doesn't have ports, only check ip
- _ep.port = 0;
- return NotFound != rangeIps.find(_ep);
- }
- virtual StringBuffer &getInfo(StringBuffer &info)
- {
- info.append("range=").append(rangeText.get());
- CDaliServixFilter::getInfo(info);
- return info;
- }
- };
- CDaliServixFilter *createDaliServixFilter(IPropertyTree &filterProps)
- {
- CDaliServixFilter *filter = NULL;
- const char *dir = filterProps.queryProp("@directory");
- const char *sourceRange = filterProps.queryProp("@sourcerange");
- bool trace = filterProps.getPropBool("@trace");
- if (filterProps.hasProp("@subnet"))
- filter = new CDaliServixSubnetFilter(filterProps.queryProp("@subnet"), filterProps.queryProp("@mask"), dir, sourceRange, trace);
- else if (filterProps.hasProp("@range"))
- filter = new CDaliServixRangeFilter(filterProps.queryProp("@range"), dir, sourceRange, trace);
- else
- throw MakeStringException(0, "Unknown DaliServix filter definition");
- return filter;
- }
- class CDaliServixIntercept: public CInterface, implements IDaFileSrvHook
- {
- CIArrayOf<CDaliServixFilter> filters;
- StringAttr forceRemotePattern;
- void addFilter(CDaliServixFilter *filter)
- {
- filters.append(*filter);
- StringBuffer msg("DaFileSrvHook: adding translateToLocal [");
- filter->getInfo(msg);
- msg.append("]");
- PROGLOG("%s", msg.str());
- }
- public:
- IMPLEMENT_IINTERFACE;
- virtual void forceRemote(const char *pattern)
- {
- forceRemotePattern.set(pattern);
- }
- virtual IFile * createIFile(const RemoteFilename & filename)
- {
- SocketEndpoint ep = filename.queryEndpoint();
- bool noport = (ep.port==0);
- setDafsEndpointPort(ep);
- if (!filename.isLocal()||(ep.port!=DAFILESRV_PORT && ep.port!=SECURE_DAFILESRV_PORT)) // assume standard port is running on local machine
- {
- #ifdef __linux__
- #ifndef USE_SAMBA
- if (noport && filters.ordinality())
- {
- ForEachItemIn(sn, filters)
- {
- CDaliServixFilter &filter = filters.item(sn);
- if (filter.testEp(ep))
- {
- StringBuffer lPath;
- filename.getLocalPath(lPath);
- if (filter.testPath(lPath.str()))
- {
- if (filter.queryTrace())
- {
- StringBuffer fromPath;
- filename.getRemotePath(fromPath);
- PROGLOG("Redirecting path: '%s' to '%s", fromPath.str(), lPath.str());
- }
- return ::createIFile(lPath.str());
- }
- }
- }
- }
- return createDaliServixFile(filename);
- #endif
- #endif
- if (!noport) // expect all filenames that specify port to be dafilesrc or daliservix
- return createDaliServixFile(filename);
- if (filename.isUnixPath()
- #ifdef TEST_DAFILESRV_FOR_UNIX_PATHS
- &&testDaliServixPresent(ep)
- #endif
- )
- return createDaliServixFile(filename);
- }
- else if (forceRemotePattern)
- {
- StringBuffer localPath;
- filename.getLocalPath(localPath);
- // must be local to be here, check if matches forceRemotePattern
- if (WildMatch(localPath, forceRemotePattern, false))
- return createDaliServixFile(filename);
- }
- return NULL;
- }
- virtual void addSubnetFilter(const char *subnet, const char *mask, const char *dir, const char *sourceRange, bool trace)
- {
- Owned<CDaliServixFilter> filter = new CDaliServixSubnetFilter(subnet, mask, dir, sourceRange, trace);
- addFilter(filter.getClear());
- }
- virtual void addRangeFilter(const char *range, const char *dir, const char *sourceRange, bool trace)
- {
- Owned<CDaliServixFilter> filter = new CDaliServixRangeFilter(range, dir, sourceRange, trace);
- addFilter(filter.getClear());
- }
- virtual IPropertyTree *addFilters(IPropertyTree *config, const SocketEndpoint *myEp)
- {
- if (!config)
- return NULL;
- Owned<IPropertyTree> result;
- Owned<IPropertyTreeIterator> iter = config->getElements("Filter");
- ForEach(*iter)
- {
- Owned<CDaliServixFilter> filter = createDaliServixFilter(iter->query());
- // Only add filters where myIP matches filter criteria
- if (!myEp || filter->applyFilter(*myEp))
- {
- addFilter(filter.getClear());
- if (!result)
- result.setown(createPTree());
- result->addPropTree("Filter", LINK(&iter->query()));
- }
- }
- return result.getClear();
- }
- virtual IPropertyTree *addMyFilters(IPropertyTree *config, SocketEndpoint *myEp)
- {
- if (myEp)
- return addFilters(config, myEp);
- else
- {
- SocketEndpoint ep;
- GetHostIp(ep);
- return addFilters(config, &ep);
- }
- }
- virtual void clearFilters()
- {
- filters.kill();
- }
- } *DaliServixIntercept = NULL;
- void remoteExtractBlobElements(const char * prefix, const RemoteFilename &file, ExtractedBlobArray & extracted)
- {
- SocketEndpoint ep(file.queryEndpoint());
- setDafsEndpointPort(ep);
- if (ep.isNull())
- return;
- StringBuffer filename;
- remoteExtractBlobElements(ep, prefix, file.getLocalPath(filename).str(), extracted);
- }
- //---------------------------------------------------------------------------
- class CRemoteDirectoryIterator : implements IDirectoryDifferenceIterator, public CInterface
- {
- Owned<IFile> cur;
- bool curvalid;
- bool curisdir;
- StringAttr curname;
- CDateTime curdt;
- __int64 cursize;
- StringAttr dir;
- SocketEndpoint ep;
- byte *flags;
- unsigned numflags;
- unsigned curidx;
- unsigned mask;
- MemoryBuffer buf;
- public:
- static CriticalSection crit;
- CRemoteDirectoryIterator(const SocketEndpoint &_ep,const char *_dir)
- : dir(_dir)
- {
- // an extended difference iterator starts with 2 (for bwd compatibility)
- ep = _ep;
- curisdir = false;
- curvalid = false;
- cursize = 0;
- curidx = (unsigned)-1;
- mask = 0;
- numflags = 0;
- flags = NULL;
- }
- bool appendBuf(MemoryBuffer &_buf)
- {
- buf.setSwapEndian(_buf.needSwapEndian());
- byte hdr;
- _buf.read(hdr);
- if (hdr==2) {
- _buf.read(numflags);
- flags = (byte *)malloc(numflags);
- _buf.read(numflags,flags);
- }
- else {
- buf.append(hdr);
- flags = NULL;
- numflags = 0;
- }
- size32_t rest = _buf.length()-_buf.getPos();
- const byte *rb = (const byte *)_buf.readDirect(rest);
- bool ret = true;
- // At the last byte of the rb (rb[rest-1]) is the stream live flag
- // True if the stream has more data
- // False at the end of stream
- // The previous byte (rb[rest-2]) is the flag to signal there are more
- // valid entries in this block
- // True if there are valid directory entry follows this flag
- // False if there are no more valid entry in this block aka end of block
- // If there is more data in the stream, the end of block flag should be removed
- if (rest&&(rb[rest-1]!=0))
- {
- rest--; // remove stream live flag
- if(rest && (0 == rb[rest-1]))
- rest--; //Remove end of block flag
- ret = false; // continuation
- }
- buf.append(rest,rb);
- return ret;
- }
- ~CRemoteDirectoryIterator()
- {
- free(flags);
- }
- IMPLEMENT_IINTERFACE
- bool first()
- {
- curidx = (unsigned)-1;
- buf.reset();
- return next();
- }
- bool next()
- {
- for (;;) {
- curidx++;
- cur.clear();
- curdt.clear();
- curname.clear();
- cursize = 0;
- curisdir = false;
- if (buf.getPos()>=buf.length())
- return false;
- byte isValidEntry;
- buf.read(isValidEntry);
- curvalid = isValidEntry!=0;
- if (!curvalid)
- return false;
- buf.read(curisdir);
- buf.read(cursize);
- curdt.deserialize(buf);
- buf.read(curname);
- // kludge for bug in old linux jlibs
- if (strchr(curname,'\\')&&(getPathSepChar(dir)=='/')) {
- StringBuffer temp(curname);
- temp.replace('\\','/');
- curname.set(temp.str());
- }
- if ((mask==0)||(getFlags()&mask))
- break;
- }
- return true;
- }
- bool isValid()
- {
- return curvalid;
- }
- IFile & query()
- {
- if (!cur) {
- StringBuffer full(dir);
- addPathSepChar(full).append(curname);
- if (ep.isNull())
- cur.setown(createIFile(full.str()));
- else {
- RemoteFilename rfn;
- rfn.setPath(ep,full.str());
- cur.setown(createIFile(rfn));
- }
- }
- return *cur;
- }
- StringBuffer &getName(StringBuffer &buf)
- {
- return buf.append(curname);
- }
- bool isDir()
- {
- return curisdir;
- }
- __int64 getFileSize()
- {
- if (curisdir)
- return -1;
- return cursize;
- }
- bool getModifiedTime(CDateTime &ret)
- {
- ret = curdt;
- return true;
- }
- void setMask(unsigned _mask)
- {
- mask = _mask;
- }
- virtual unsigned getFlags()
- {
- if (flags&&(curidx<numflags))
- return flags[curidx];
- return 0;
- }
- static bool serialize(MemoryBuffer &mb,IDirectoryIterator *iter, size32_t bufsize, bool first)
- {
- bool ret = true;
- byte b=1;
- StringBuffer tmp;
- if (first ? iter->first() : iter->next()) {
- for (;;) {
- mb.append(b);
- bool isdir = iter->isDir();
- __int64 sz = isdir?0:iter->getFileSize();
- CDateTime dt;
- iter->getModifiedTime(dt);
- iter->getName(tmp.clear());
- mb.append(isdir).append(sz);
- dt.serialize(mb);
- mb.append(tmp.str());
- if (bufsize&&(mb.length()>=bufsize-1)) {
- ret = false;
- break;
- }
- if (!iter->next())
- break;
- }
- }
- b = 0;
- mb.append(b);
- return ret;
- }
- static void serializeDiff(MemoryBuffer &mb,IDirectoryDifferenceIterator *iter)
- {
- // bit slow
- MemoryBuffer flags;
- ForEach(*iter)
- flags.append((byte)iter->getFlags());
- if (flags.length()) {
- byte b = 2;
- mb.append(b).append((unsigned)flags.length()).append(flags);
- }
- serialize(mb,iter,0,true);
- }
- void serialize(MemoryBuffer &mb,bool isdiff)
- {
- byte b;
- if (isdiff&&numflags&&flags) {
- b = 2;
- mb.append(b).append(numflags).append(numflags,flags);
- }
- serialize(mb,this,0,true);
- }
- };
- IDirectoryIterator *createRemoteDirectorIterator(const SocketEndpoint &ep, const char *name, MemoryBuffer &state)
- {
- Owned<CRemoteDirectoryIterator> di = new CRemoteDirectoryIterator(ep, name);
- di->appendBuf(state);
- return di.getClear();
- }
- bool serializeRemoteDirectoryIterator(MemoryBuffer &tgt, IDirectoryIterator *iter, size32_t bufsize, bool first)
- {
- return CRemoteDirectoryIterator::serialize(tgt, iter, bufsize, first);
- }
- void serializeRemoteDirectoryDiff(MemoryBuffer &tgt, IDirectoryDifferenceIterator *iter)
- {
- CRemoteDirectoryIterator::serializeDiff(tgt, iter);
- }
- class CCritTable;
- class CEndpointCS : public CriticalSection, public CInterface
- {
- CCritTable &table;
- const SocketEndpoint ep;
- public:
- CEndpointCS(CCritTable &_table, const SocketEndpoint &_ep) : table(_table), ep(_ep) { }
- const void *queryFindParam() const { return &ep; }
- virtual void beforeDispose();
- };
- class CCritTable : private SimpleHashTableOf<CEndpointCS, const SocketEndpoint>
- {
- typedef SimpleHashTableOf<CEndpointCS, const SocketEndpoint> PARENT;
- CriticalSection crit;
- public:
- CEndpointCS *getCrit(const SocketEndpoint &ep)
- {
- CriticalBlock b(crit);
- CEndpointCS * clientCrit = find(ep);
- if (!clientCrit || !clientCrit->isAliveAndLink()) // if !isAliveAndLink(), then it is in the process of being destroyed/removed.
- {
- clientCrit = new CEndpointCS(*this, ep);
- replace(*clientCrit); // NB table doesn't own
- }
- return clientCrit;
- }
- unsigned getHashFromElement(const void *e) const
- {
- const CEndpointCS &elem=*(const CEndpointCS *)e;
- return getHashFromFindParam(elem.queryFindParam());
- }
- unsigned getHashFromFindParam(const void *fp) const
- {
- return ((const SocketEndpoint *)fp)->hash(0);
- }
- void removeExact(CEndpointCS *clientCrit)
- {
- CriticalBlock b(crit);
- PARENT::removeExact(clientCrit); // NB may not exist, could have been replaced if detected !isAlive() in getCrit()
- }
- } *dirCSTable;
- MODULE_INIT(INIT_PRIORITY_STANDARD)
- {
- dirCSTable = new CCritTable;
- return true;
- }
- MODULE_EXIT()
- {
- delete dirCSTable;
- }
- void CEndpointCS::beforeDispose()
- {
- table.removeExact(this);
- }
- //---------------------------------------------------------------------------
- class CRemoteFile : public CRemoteBase, implements IFile
- {
- StringAttr remotefilename;
- unsigned flags;
- bool isShareSet;
- public:
- IMPLEMENT_IINTERFACE_O_USING(CRemoteBase);
- CRemoteFile(const SocketEndpoint &_ep, const char * _filename)
- : CRemoteBase(_ep, _filename)
- {
- flags = ((unsigned)IFSHread)|((S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH)<<16);
- isShareSet = false;
- if (filename.length()>2 && isPathSepChar(filename[0]) && isShareChar(filename[2]))
- {
- VStringBuffer winDriveFilename("%c:%s", filename[1], filename+3);
- filename.set(winDriveFilename);
- }
- }
- bool exists()
- {
- MemoryBuffer sendBuffer;
- initSendBuffer(sendBuffer);
- MemoryBuffer replyBuffer;
- sendBuffer.append((RemoteFileCommandType)RFCexists).append(filename);
- sendRemoteCommand(sendBuffer, replyBuffer);
- bool ok;
- replyBuffer.read(ok);
- return ok;
- }
- bool getTime(CDateTime * createTime, CDateTime * modifiedTime, CDateTime * accessedTime)
- {
- CDateTime dummyTime;
- if (!createTime)
- createTime = &dummyTime;
- if (!modifiedTime)
- modifiedTime = &dummyTime;
- if (!accessedTime)
- accessedTime = &dummyTime;
- MemoryBuffer sendBuffer;
- initSendBuffer(sendBuffer);
- MemoryBuffer replyBuffer;
- sendBuffer.append((RemoteFileCommandType)RFCgettime).append(filename);
- sendRemoteCommand(sendBuffer, replyBuffer);
- bool ok;
- replyBuffer.read(ok);
- if (ok) {
- createTime->deserialize(replyBuffer);
- modifiedTime->deserialize(replyBuffer);
- accessedTime->deserialize(replyBuffer);
- }
- return ok;
- }
- bool setTime(const CDateTime * createTime, const CDateTime * modifiedTime, const CDateTime * accessedTime)
- {
- MemoryBuffer sendBuffer;
- initSendBuffer(sendBuffer);
- MemoryBuffer replyBuffer;
- sendBuffer.append((RemoteFileCommandType)RFCsettime).append(filename);
- if (createTime)
- {
- sendBuffer.append((bool)true);
- createTime->serialize(sendBuffer);
- }
- else
- sendBuffer.append((bool)false);
- if (modifiedTime)
- {
- sendBuffer.append((bool)true);
- modifiedTime->serialize(sendBuffer);
- }
- else
- sendBuffer.append((bool)false);
- if (accessedTime)
- {
- sendBuffer.append((bool)true);
- accessedTime->serialize(sendBuffer);
- }
- else
- sendBuffer.append((bool)false);
- sendRemoteCommand(sendBuffer, replyBuffer);
- bool ok;
- replyBuffer.read(ok);
- return ok;
- }
- fileBool isDirectory()
- {
- MemoryBuffer sendBuffer;
- initSendBuffer(sendBuffer);
- MemoryBuffer replyBuffer;
- sendBuffer.append((RemoteFileCommandType)RFCisdirectory).append(filename);
- sendRemoteCommand(sendBuffer, replyBuffer);
- unsigned ret;
- replyBuffer.read(ret);
- return (fileBool)ret;
- }
- fileBool isFile()
- {
- MemoryBuffer sendBuffer;
- initSendBuffer(sendBuffer);
- MemoryBuffer replyBuffer;
- sendBuffer.append((RemoteFileCommandType)RFCisfile).append(filename);
- sendRemoteCommand(sendBuffer, replyBuffer);
- unsigned ret;
- replyBuffer.read(ret);
- return (fileBool)ret;
- }
- fileBool isReadOnly()
- {
- MemoryBuffer sendBuffer;
- initSendBuffer(sendBuffer);
- MemoryBuffer replyBuffer;
- sendBuffer.append((RemoteFileCommandType)RFCisreadonly).append(filename);
- sendRemoteCommand(sendBuffer, replyBuffer);
- unsigned ret;
- replyBuffer.read(ret);
- return (fileBool)ret;
- }
- IFileIO * open(IFOmode mode,IFEflags extraFlags=IFEnone);
- IFileIO * openShared(IFOmode mode,IFSHmode shmode,IFEflags extraFlags=IFEnone);
- IFileAsyncIO * openAsync(IFOmode mode) { return NULL; } // not supported
- const char * queryFilename()
- {
- if (remotefilename.isEmpty()) {
- RemoteFilename rfn;
- rfn.setPath(ep,filename);
- StringBuffer path;
- rfn.getRemotePath(path);
- remotefilename.set(path);
- }
- return remotefilename.get();
- }
- void resetLocalFilename(const char *name)
- {
- remotefilename.clear();
- filename.set(name);
- }
- bool remove()
- {
- MemoryBuffer sendBuffer;
- initSendBuffer(sendBuffer);
- MemoryBuffer replyBuffer;
- sendBuffer.append((RemoteFileCommandType)RFCremove).append(filename);
- sendRemoteCommand(sendBuffer, replyBuffer);
- bool ok;
- replyBuffer.read(ok);
- return ok;
- }
- void rename(const char *newname)
- {
- // currently ignores directory on newname (in future versions newname will be required to be tail only and not full path)
- StringBuffer path;
- splitDirTail(filename,path);
- StringBuffer newdir;
- path.append(splitDirTail(newname,newdir));
- if (newdir.length()&&(strcmp(newdir.str(),path.str())!=0))
- WARNLOG("CRemoteFile::rename passed full path '%s' that may not to match original directory '%s'",newname,path.str());
- MemoryBuffer sendBuffer;
- initSendBuffer(sendBuffer);
- MemoryBuffer replyBuffer;
- sendBuffer.append((RemoteFileCommandType)RFCrename).append(filename).append(path);
- sendRemoteCommand(sendBuffer, replyBuffer);
- filename.set(path);
- remotefilename.clear();
- }
- void move(const char *newname)
- {
- // like rename except between directories
- // first create replote path
- if (!newname||!*newname)
- return;
- RemoteFilename destrfn;
- if (isPathSepChar(newname[0])&&isPathSepChar(newname[1])) {
- destrfn.setRemotePath(newname);
- if (!destrfn.queryEndpoint().ipequals(ep)) {
- StringBuffer msg;
- msg.appendf("IFile::move %s to %s, destination node must match source node", queryFilename(), newname);
- throw createDafsException(RFSERR_MoveFailed,msg.str());
- }
- }
- else
- destrfn.setPath(ep,newname);
- StringBuffer dest;
- newname = destrfn.getLocalPath(dest).str();
- MemoryBuffer sendBuffer;
- initSendBuffer(sendBuffer);
- MemoryBuffer replyBuffer;
- StringBuffer path;
- splitDirTail(filename,path);
- StringBuffer newdir;
- const char *newtail = splitDirTail(newname,newdir);
- if (strcmp(newdir.str(),path.str())==0)
- {
- path.append(newtail);
- newname = path;
- sendBuffer.append((RemoteFileCommandType)RFCrename); // use rename if we can (supported on older dafilesrv)
- }
- else
- sendBuffer.append((RemoteFileCommandType)RFCmove);
- sendBuffer.append(filename).append(newname);
- sendRemoteCommand(sendBuffer, replyBuffer);
- filename.set(newname);
- remotefilename.clear();
- }
- void setReadOnly(bool set)
- {
- MemoryBuffer sendBuffer;
- initSendBuffer(sendBuffer);
- MemoryBuffer replyBuffer;
- sendBuffer.append((RemoteFileCommandType)RFCsetreadonly).append(filename).append(set);
- sendRemoteCommand(sendBuffer, replyBuffer);
- }
- void setFilePermissions(unsigned fPerms)
- {
- MemoryBuffer sendBuffer;
- initSendBuffer(sendBuffer);
- MemoryBuffer replyBuffer;
- sendBuffer.append((RemoteFileCommandType)RFCsetfileperms).append(filename).append(fPerms);
- try
- {
- sendRemoteCommand(sendBuffer, replyBuffer);
- }
- catch (IDAFS_Exception *e)
- {
- if (e->errorCode() == RFSERR_InvalidCommand)
- {
- WARNLOG("umask setFilePermissions (0%o) not supported on remote server", fPerms);
- e->Release();
- }
- else
- throw;
- }
- }
- offset_t size()
- {
- #if 1 // faster method (consistant with IFile)
- // do this by using dir call (could be improved with new function but this not *too* bad)
- if (isSpecialPath(filename))
- return 0; // queries deemed to always exist (though don't know size).
- // if needed to get size I guess could use IFileIO method and cache (bit of pain though)
- StringBuffer dir;
- const char *tail = splitDirTail(filename,dir);
- if (!dir.length())
- return false;
- MemoryBuffer sendBuffer;
- initSendBuffer(sendBuffer);
- MemoryBuffer replyBuffer;
- bool includedirs = true;
- bool sub=false;
- {
- //Could be removed with new dafilesrv change [ (stream != 0) ], since this is not streaming.
- Owned<CEndpointCS> crit = dirCSTable->getCrit(ep); // NB dirCSTable doesn't own, last reference will remove from table
- CriticalBlock block(*crit);
- sendBuffer.append((RemoteFileCommandType)RFCgetdir).append(dir).append(tail).append(includedirs).append(sub);
- try
- {
- sendRemoteCommand(sendBuffer, replyBuffer);
- }
- catch (IDAFS_Exception * e)
- {
- if (e->errorCode() == RFSERR_GetDirFailed)
- {
- e->Release();
- return (offset_t)-1;
- }
- else
- throw e;
- }
- }
- // now should be 0 or 1 files returned
- Owned<CRemoteDirectoryIterator> iter = new CRemoteDirectoryIterator(ep, dir.str());
- iter->appendBuf(replyBuffer);
- if (!iter->first())
- return (offset_t)-1;
- return (offset_t) iter->getFileSize();
- #else
- IFileIO * io = open(IFOread);
- offset_t length = (offset_t)-1;
- if (io)
- {
- length = io->size();
- io->Release();
- }
- return length;
- #endif
- }
- bool createDirectory()
- {
- MemoryBuffer sendBuffer;
- initSendBuffer(sendBuffer);
- MemoryBuffer replyBuffer;
- sendBuffer.append((RemoteFileCommandType)RFCcreatedir).append(filename);
- sendRemoteCommand(sendBuffer, replyBuffer);
- bool ok;
- replyBuffer.read(ok);
- return ok;
- }
- virtual IDirectoryIterator *directoryFiles(const char *mask,bool sub,bool includedirs)
- {
- if (mask&&!*mask)
- return createDirectoryIterator("",""); // NULL iterator
- CRemoteDirectoryIterator *ret = new CRemoteDirectoryIterator(ep, filename);
- byte stream = (sub || !mask || containsFileWildcard(mask)) ? 1 : 0; // no point in streaming if mask without wildcards or sub, as will only be <= 1 match.
- Owned<CEndpointCS> crit = dirCSTable->getCrit(ep); // NB dirCSTable doesn't own, last reference will remove from table
- CriticalBlock block(*crit);
- for (;;)
- {
- MemoryBuffer sendBuffer;
- initSendBuffer(sendBuffer);
- MemoryBuffer replyBuffer;
- sendBuffer.append((RemoteFileCommandType)RFCgetdir).append(filename).append(mask?mask:"").append(includedirs).append(sub).append(stream);
- sendRemoteCommand(sendBuffer, replyBuffer);
- if (ret->appendBuf(replyBuffer))
- break;
- stream = 2; // NB: will never get here if streaming was off (if stream==0 above)
- }
- return ret;
- }
- IDirectoryDifferenceIterator *monitorDirectory(
- IDirectoryIterator *prev=NULL, // in (NULL means use current as baseline)
- const char *mask=NULL,
- bool sub=false,
- bool includedirs=false,
- unsigned checkinterval=60*1000,
- unsigned timeout=(unsigned)-1,
- Semaphore *abortsem=NULL) // returns NULL if timed out
- {
- // abortsem not yet supported
- MemoryBuffer sendBuffer;
- initSendBuffer(sendBuffer);
- MemoryBuffer replyBuffer;
- sendBuffer.append((RemoteFileCommandType)RFCmonitordir).append(filename).append(mask?mask:"").append(includedirs).append(sub);
- sendBuffer.append(checkinterval).append(timeout);
- __int64 cancelid=0; // not yet used
- sendBuffer.append(cancelid);
- byte isprev=(prev!=NULL)?1:0;
- sendBuffer.append(isprev);
- if (prev)
- CRemoteDirectoryIterator::serialize(sendBuffer,prev,0,true);
- sendRemoteCommand(sendBuffer, replyBuffer);
- byte status;
- replyBuffer.read(status);
- if (status==1)
- {
- CRemoteDirectoryIterator *iter = new CRemoteDirectoryIterator(ep, filename);
- iter->appendBuf(replyBuffer);
- return iter;
- }
- return NULL;
- }
- bool getInfo(bool &isdir,offset_t &size,CDateTime &modtime)
- {
- // do this by using dir call (could be improved with new function but this not *too* bad)
- StringBuffer dir;
- const char *tail = splitDirTail(filename,dir);
- if (!dir.length())
- return false;
- MemoryBuffer sendBuffer;
- initSendBuffer(sendBuffer);
- MemoryBuffer replyBuffer;
- bool includedirs = true;
- bool sub=false;
- {
- //Could be removed with new dafilesrv change [ (stream != 0) ], since this is not streaming.
- Owned<CEndpointCS> crit = dirCSTable->getCrit(ep); // NB dirCSTable doesn't own, last reference will remove from table
- CriticalBlock block(*crit);
- sendBuffer.append((RemoteFileCommandType)RFCgetdir).append(dir).append(tail).append(includedirs).append(sub);
- sendRemoteCommand(sendBuffer, replyBuffer);
- }
- // now should be 0 or 1 files returned
- Owned<CRemoteDirectoryIterator> iter = new CRemoteDirectoryIterator(ep, dir.str());
- iter->appendBuf(replyBuffer);
- if (!iter->first())
- return false;
- isdir = iter->isDir();
- size = (offset_t) iter->getFileSize();
- iter->getModifiedTime(modtime);
- return true;
- }
- bool setCompression(bool set)
- {
- assertex(!"Need to implement compress()");
- return false;
- }
- offset_t compressedSize()
- {
- assertex(!"Need to implement actualSize()");
- return (offset_t)-1;
- }
- void serialize(MemoryBuffer &tgt)
- {
- throwUnexpected();
- }
- void deserialize(MemoryBuffer &src)
- {
- throwUnexpected();
- }
- unsigned getCRC()
- {
- MemoryBuffer sendBuffer;
- initSendBuffer(sendBuffer);
- MemoryBuffer replyBuffer;
- sendBuffer.append((RemoteFileCommandType)RFCgetcrc).append(filename);
- sendRemoteCommand(sendBuffer, replyBuffer, true, true);
- unsigned crc;
- replyBuffer.read(crc);
- return crc;
- }
- void setCreateFlags(unsigned short cflags)
- {
- flags &= 0xffff;
- flags |= ((unsigned)cflags<<16);
- }
- unsigned short getCreateFlags()
- {
- return (unsigned short)(flags>>16);
- }
- void setShareMode(IFSHmode shmode)
- {
- flags &= ~(IFSHfull|IFSHread);
- flags |= (unsigned)(shmode&(IFSHfull|IFSHread));
- isShareSet = true;
- }
- unsigned short getShareMode()
- {
- return (unsigned short)(flags&0xffff);
- }
- bool getIsShareSet()
- {
- return isShareSet;
- }
- void remoteExtractBlobElements(const char * prefix, ExtractedBlobArray & extracted)
- {
- MemoryBuffer sendBuffer;
- initSendBuffer(sendBuffer);
- sendBuffer.append((RemoteFileCommandType)RFCextractblobelements).append(prefix).append(filename);
- MemoryBuffer replyBuffer;
- sendRemoteCommand(sendBuffer, replyBuffer, true, true); // handles error code
- unsigned n;
- replyBuffer.read(n);
- for (unsigned i=0;i<n;i++) {
- ExtractedBlobInfo *item = new ExtractedBlobInfo;
- item->deserialize(replyBuffer);
- extracted.append(*item);
- }
- }
- bool copySectionAsync(const char *uuid,const RemoteFilename &dest, offset_t toOfs, offset_t fromOfs, offset_t size, ICopyFileProgress *progress, unsigned timeout)
- {
- // now if we get here is it can be assumed the source file is local to where we send the command
- StringBuffer tos;
- dest.getRemotePath(tos);
- MemoryBuffer sendBuffer;
- initSendBuffer(sendBuffer);
- MemoryBuffer replyBuffer;
- sendBuffer.append((RemoteFileCommandType)RFCcopysection).append(uuid).append(queryLocalName()).append(tos).append(toOfs).append(fromOfs).append(size).append(timeout);
- sendRemoteCommand(sendBuffer, replyBuffer);
- unsigned status;
- replyBuffer.read(status);
- if (progress)
- {
- offset_t sizeDone;
- offset_t totalSize;
- replyBuffer.read(sizeDone).read(totalSize);
- progress->onProgress(sizeDone,totalSize);
- }
- return (AsyncCommandStatus)status!=ACScontinue; // should only otherwise be done as errors raised by exception
- }
- void copySection(const RemoteFilename &dest, offset_t toOfs, offset_t fromOfs, offset_t size, ICopyFileProgress *progress, CFflags copyFlags=CFnone)
- {
- StringBuffer uuid;
- genUUID(uuid,true);
- unsigned timeout = 60*1000; // check every minute
- while(!copySectionAsync(uuid.str(),dest,toOfs,fromOfs,size,progress,timeout));
- }
- void copyTo(IFile *dest, size32_t buffersize, ICopyFileProgress *progress, bool usetmp, CFflags copyFlags=CFnone);
- virtual IMemoryMappedFile *openMemoryMapped(offset_t ofs, memsize_t len, bool write)
- {
- return NULL;
- }
- };
- class CRemoteFileIO : implements IFileIO, public CInterface
- {
- protected:
- Linked<CRemoteFile> parent;
- RemoteFileIOHandle handle;
- std::atomic<cycle_t> ioReadCycles;
- std::atomic<cycle_t> ioWriteCycles;
- std::atomic<__uint64> ioReadBytes;
- std::atomic<__uint64> ioWriteBytes;
- std::atomic<__uint64> ioReads;
- std::atomic<__uint64> ioWrites;
- std::atomic<unsigned> ioRetries;
- IFOmode mode;
- compatIFSHmode compatmode;
- IFEflags extraFlags;
- bool disconnectonexit;
- public:
- IMPLEMENT_IINTERFACE
- CRemoteFileIO(CRemoteFile *_parent)
- : parent(_parent), ioReadCycles(0), ioWriteCycles(0), ioReadBytes(0), ioWriteBytes(0), ioReads(0), ioWrites(0), ioRetries(0)
- {
- handle = 0;
- disconnectonexit = false;
- }
- ~CRemoteFileIO()
- {
- if (handle) {
- try {
- close();
- }
- catch (IException *e) {
- StringBuffer s;
- e->errorMessage(s);
- WARNLOG("CRemoteFileIO close file: %s",s.str());
- e->Release();
- }
- }
- if (disconnectonexit)
- parent->disconnect();
- }
- void close()
- {
- if (handle)
- {
- try
- {
- MemoryBuffer sendBuffer;
- initSendBuffer(sendBuffer);
- sendBuffer.append((RemoteFileCommandType)RFCcloseIO).append(handle);
- parent->sendRemoteCommand(sendBuffer,false);
- }
- catch (IDAFS_Exception *e)
- {
- if ((e->errorCode()!=RFSERR_InvalidFileIOHandle)&&(e->errorCode()!=RFSERR_NullFileIOHandle))
- throw;
- e->Release();
- }
- handle = 0;
- }
- }
- RemoteFileIOHandle getHandle() const { return handle; }
- bool open(IFOmode _mode,compatIFSHmode _compatmode,IFEflags _extraFlags=IFEnone)
- {
- MemoryBuffer sendBuffer;
- initSendBuffer(sendBuffer);
- MemoryBuffer replyBuffer;
- const char *localname = parent->queryLocalName();
- localname = skipSpecialPath(localname);
- // also send _extraFlags
- // then also send sMode, cFlags
- unsigned short sMode = parent->getShareMode();
- unsigned short cFlags = parent->getCreateFlags();
- if (!(parent->getIsShareSet()))
- {
- switch ((compatIFSHmode)_compatmode)
- {
- case compatIFSHnone:
- sMode = IFSHnone;
- break;
- case compatIFSHread:
- sMode = IFSHread;
- break;
- case compatIFSHwrite:
- sMode = IFSHfull;
- break;
- case compatIFSHall:
- sMode = IFSHfull;
- break;
- }
- }
- sendBuffer.append((RemoteFileCommandType)RFCopenIO).append(localname).append((byte)_mode).append((byte)_compatmode).append((byte)_extraFlags).append(sMode).append(cFlags);
- parent->sendRemoteCommand(sendBuffer, replyBuffer);
- replyBuffer.read(handle);
- if (!handle)
- return false;
- switch (_mode) {
- case IFOcreate:
- mode = IFOwrite;
- break;
- case IFOcreaterw:
- mode = IFOreadwrite;
- break;
- default:
- mode = _mode;
- break;
- }
- compatmode = _compatmode;
- extraFlags = _extraFlags;
- return true;
- }
- bool reopen()
- {
- StringBuffer s;
- PROGLOG("Attempting reopen of %s on %s",parent->queryLocalName(),parent->queryEp().getUrlStr(s).str());
- if (open(mode,compatmode,extraFlags))
- return true;
- return false;
- }
- offset_t size()
- {
- MemoryBuffer sendBuffer;
- initSendBuffer(sendBuffer);
- MemoryBuffer replyBuffer;
- sendBuffer.append((RemoteFileCommandType)RFCsize).append(handle);
- parent->sendRemoteCommand(sendBuffer, replyBuffer, false);
- // Retry using reopen TBD
- offset_t ret;
- replyBuffer.read(ret);
- return ret;
- }
- virtual unsigned __int64 getStatistic(StatisticKind kind)
- {
- switch (kind)
- {
- case StCycleDiskReadIOCycles:
- return ioReadCycles.load(std::memory_order_relaxed);
- case StCycleDiskWriteIOCycles:
- return ioWriteCycles.load(std::memory_order_relaxed);
- case StTimeDiskReadIO:
- return cycle_to_nanosec(ioReadCycles.load(std::memory_order_relaxed));
- case StTimeDiskWriteIO:
- return cycle_to_nanosec(ioWriteCycles.load(std::memory_order_relaxed));
- case StSizeDiskRead:
- return ioReadBytes.load(std::memory_order_relaxed);
- case StSizeDiskWrite:
- return ioWriteBytes.load(std::memory_order_relaxed);
- case StNumDiskReads:
- return ioReads.load(std::memory_order_relaxed);
- case StNumDiskWrites:
- return ioWrites.load(std::memory_order_relaxed);
- case StNumDiskRetries:
- return ioRetries.load(std::memory_order_relaxed);
- }
- return 0;
- }
- size32_t read(offset_t pos, size32_t len, void * data)
- {
- size32_t got;
- MemoryBuffer replyBuffer;
- CCycleTimer timer;
- const void *b;
- try
- {
- b = doRead(pos,len,replyBuffer,got,data);
- }
- catch (...)
- {
- ioReadCycles.fetch_add(timer.elapsedCycles());
- throw;
- }
- ioReadCycles.fetch_add(timer.elapsedCycles());
- ioReadBytes.fetch_add(got);
- ++ioReads;
- if (b!=data)
- memcpy(data,b,got);
- return got;
- }
- virtual void flush()
- {
- }
- const void *doRead(offset_t pos, size32_t len, MemoryBuffer &replyBuffer, size32_t &got, void *dstbuf)
- {
- unsigned tries=0;
- for (;;)
- {
- try
- {
- MemoryBuffer sendBuffer;
- initSendBuffer(sendBuffer);
- replyBuffer.clear();
- sendBuffer.append((RemoteFileCommandType)RFCread).append(handle).append(pos).append(len);
- parent->sendRemoteCommand(sendBuffer, replyBuffer,false);
- // kludge dafilesrv versions <= 1.5e don't return error correctly
- if (replyBuffer.length()>len+sizeof(size32_t)+sizeof(unsigned))
- {
- size32_t save = replyBuffer.getPos();
- replyBuffer.reset(len+sizeof(size32_t)+sizeof(unsigned));
- unsigned errCode;
- replyBuffer.read(errCode);
- if (errCode)
- {
- StringBuffer msg;
- parent->ep.getUrlStr(msg.append('[')).append("] ");
- if (replyBuffer.getPos()<replyBuffer.length())
- {
- StringAttr s;
- replyBuffer.read(s);
- msg.append(s);
- }
- else
- msg.append("ERROR #").append(errCode);
- throw createDafsException(errCode, msg.str());
- }
- else
- replyBuffer.reset(save);
- }
- replyBuffer.read(got);
- if ((got>replyBuffer.remaining())||(got>len))
- {
- PROGLOG("Read beyond buffer %d,%d,%d",got,replyBuffer.remaining(),len);
- throw createDafsException(RFSERR_ReadFailed, "Read beyond buffer");
- }
- return replyBuffer.readDirect(got);
- }
- catch (IJSOCK_Exception *e)
- {
- EXCLOG(e,"CRemoteFileIO::read");
- if (++tries > 3)
- {
- ioRetries.fetch_add(tries);
- throw;
- }
- WARNLOG("Retrying read of %s (%d)",parent->queryLocalName(),tries);
- Owned<IException> exc = e;
- if (!reopen())
- {
- ioRetries.fetch_add(tries);
- throw exc.getClear();
- }
- }
- }
- if (tries)
- ioRetries.fetch_add(tries);
- got = 0;
- return NULL;
- }
- size32_t write(offset_t pos, size32_t len, const void * data)
- {
- unsigned tries=0;
- size32_t ret = 0;
- CCycleTimer timer;
- for (;;)
- {
- try
- {
- MemoryBuffer replyBuffer;
- MemoryBuffer sendBuffer;
- initSendBuffer(sendBuffer);
- sendBuffer.append((RemoteFileCommandType)RFCwrite).append(handle).append(pos).append(len).append(len, data);
- parent->sendRemoteCommand(sendBuffer, replyBuffer, false, true);
- replyBuffer.read(ret);
- break;
- }
- catch (IJSOCK_Exception *e)
- {
- EXCLOG(e,"CRemoteFileIO::write");
- if (++tries > 3)
- {
- ioRetries.fetch_add(tries);
- ioWriteCycles.fetch_add(timer.elapsedCycles());
- throw;
- }
- WARNLOG("Retrying write(%" I64F "d,%d) of %s (%d)",pos,len,parent->queryLocalName(),tries);
- Owned<IException> exc = e;
- if (!reopen())
- {
- ioRetries.fetch_add(tries);
- ioWriteCycles.fetch_add(timer.elapsedCycles());
- throw exc.getClear();
- }
- }
- }
- if (tries)
- ioRetries.fetch_add(tries);
- ioWriteCycles.fetch_add(timer.elapsedCycles());
- ioWriteBytes.fetch_add(ret);
- ++ioWrites;
- if ((ret==(size32_t)-1) || (ret < len))
- throw createDafsException(DISK_FULL_EXCEPTION_CODE,"write failed, disk full?");
- return ret;
- }
- offset_t appendFile(IFile *file,offset_t pos,offset_t len)
- {
- MemoryBuffer sendBuffer;
- initSendBuffer(sendBuffer);
- MemoryBuffer replyBuffer;
- const char * fname = file->queryFilename();
- sendBuffer.append((RemoteFileCommandType)RFCappend).append(handle).append(fname).append(pos).append(len);
- parent->sendRemoteCommand(sendBuffer, replyBuffer, false, true); // retry not safe
- offset_t ret;
- replyBuffer.read(ret);
- if ((ret==(offset_t)-1) || ((len != ((offset_t)-1)) && (ret < len)))
- throw createDafsException(DISK_FULL_EXCEPTION_CODE,"append failed, disk full?"); // though could be file missing TBD
- return ret;
- }
- void setSize(offset_t size)
- {
- MemoryBuffer sendBuffer;
- initSendBuffer(sendBuffer);
- MemoryBuffer replyBuffer;
- sendBuffer.append((RemoteFileCommandType)RFCsetsize).append(handle).append(size);
- parent->sendRemoteCommand(sendBuffer, replyBuffer, false, true);
- // retry using reopen TBD
- }
- void setDisconnectOnExit(bool set) { disconnectonexit = set; }
- void sendRemoteCommand(MemoryBuffer & sendBuffer, MemoryBuffer & replyBuffer, bool retry=true, bool lengthy=false, bool handleErrCode=true)
- {
- parent->sendRemoteCommand(sendBuffer, replyBuffer, retry, lengthy, handleErrCode);
- }
- };
- IFileIO *CRemoteFile::openShared(IFOmode mode,IFSHmode shmode,IFEflags extraFlags)
- {
- // 0x0, 0x8, 0x10 and 0x20 are only share modes supported in this assert
- // currently only 0x0 (IFSHnone), 0x8 (IFSHread) and 0x10 (IFSHfull) are used so this could be 0xffffffe7
- // note: IFSHfull also includes read sharing (ie write|read)
- assertex(((unsigned)shmode&0xffffffc7)==0);
- compatIFSHmode compatmode;
- unsigned fileflags = (flags>>16) & (S_IRUSR|S_IWUSR|S_IXUSR|S_IRGRP|S_IWGRP|S_IXGRP|S_IROTH|S_IWOTH|S_IXOTH);
- if (fileflags&S_IXUSR) // this is bit hit and miss but backward compatible
- compatmode = compatIFSHexec;
- else if (fileflags&(S_IWGRP|S_IWOTH))
- compatmode = compatIFSHall;
- else if (shmode&IFSHfull)
- compatmode = compatIFSHwrite;
- else if (((shmode&(IFSHread|IFSHfull))==0) && ((fileflags&(S_IRGRP|S_IROTH))==0))
- compatmode = compatIFSHnone;
- else
- compatmode = compatIFSHread;
- Owned<CRemoteFileIO> res = new CRemoteFileIO(this);
- if (res->open(mode,compatmode,extraFlags))
- return res.getClear();
- return NULL;
- }
- IFileIO * CRemoteFile::open(IFOmode mode,IFEflags extraFlags)
- {
- return openShared(mode,(IFSHmode)(flags&(IFSHread|IFSHfull)),extraFlags);
- }
- //---------------------------------------------------------------------------
- void CRemoteFile::copyTo(IFile *dest, size32_t buffersize, ICopyFileProgress *progress, bool usetmp, CFflags copyFlags)
- {
- CRemoteFile *dstfile = QUERYINTERFACE(dest,CRemoteFile);
- if (dstfile&&!dstfile->queryEp().isLocal()) {
- StringBuffer tmpname;
- Owned<IFile> destf;
- RemoteFilename dest;
- if (usetmp) {
- makeTempCopyName(tmpname,dstfile->queryLocalName());
- dest.setPath(dstfile->queryEp(),tmpname.str());
- }
- else
- dest.setPath(dstfile->queryEp(),dstfile->queryLocalName());
- destf.setown(createIFile(dest));
- try {
- // following may fail if new dafilesrv not deployed on src
- copySection(dest,(offset_t)-1,0,(offset_t)-1,progress,copyFlags);
- if (usetmp) {
- StringAttr tail(pathTail(dstfile->queryLocalName()));
- dstfile->remove();
- destf->rename(tail);
- }
- return;
- }
- catch (IException *e)
- {
- StringBuffer s;
- s.appendf("Remote File Copy (%d): ",e->errorCode());
- e->errorMessage(s);
- s.append(", retrying local");
- WARNLOG("%s",s.str());
- e->Release();
- }
- // delete dest
- try {
- destf->remove();
- }
- catch (IException *e)
- {
- EXCLOG(e,"Remote File Copy, Deleting temporary file");
- e->Release();
- }
- }
- // assumption if we get here that source remote, dest local (or equiv)
- class cIntercept: implements ICopyFileIntercept
- {
- MemoryAttr ma;
- MemoryBuffer mb;
- virtual offset_t copy(IFileIO *from, IFileIO *to, offset_t ofs, size32_t sz)
- {
- if (ma.length()<sz)
- ma.allocate(sz); // may be not used
- void *buf = ma.bufferBase();
- size32_t got;
- CRemoteFileIO *srcio = QUERYINTERFACE(from,CRemoteFileIO);
- const void *dst;
- if (srcio)
- dst = srcio->doRead(ofs,sz,mb.clear(),got,buf);
- else {
- // shouldn't ever get here if source remote
- got = from->read(ofs, sz, buf);
- dst = buf;
- }
- if (got != 0)
- to->write(ofs, got, dst);
- return got;
- }
- } intercept;
- doCopyFile(dest,this,buffersize,progress,&intercept,usetmp,copyFlags);
- }
- ////////////////
- void remoteExtractBlobElements(const SocketEndpoint &ep,const char * prefix, const char * filename, ExtractedBlobArray & extracted)
- {
- Owned<CRemoteFile> file = new CRemoteFile (ep,filename);
- file->remoteExtractBlobElements(prefix, extracted);
- }
- //---------------------------------------------------------------------------
- // Local mount redirect
- struct CLocalMountRec: public CInterface
- {
- IpAddress ip;
- StringAttr dir; // dir path on remote ip
- StringAttr local; // local dir path
- };
- static CIArrayOf<CLocalMountRec> localMounts;
- static CriticalSection localMountCrit;
- void setDafsLocalMountRedirect(const IpAddress &ip,const char *dir,const char *mountdir)
- {
- CriticalBlock block(localMountCrit);
- ForEachItemInRev(i,localMounts) {
- CLocalMountRec &mount = localMounts.item(i);
- if (dir==NULL) { // remove all matching mount
- if (!mountdir)
- return;
- if (strcmp(mount.local,mountdir)==0)
- localMounts.remove(i);
- }
- else if (mount.ip.ipequals(ip)&&(strcmp(mount.dir,dir)==0)) {
- if (mountdir) {
- mount.local.set(mountdir);
- return;
- }
- else
- localMounts.remove(i);
- }
- }
- if (dir&&mountdir) {
- CLocalMountRec &mount = *new CLocalMountRec;
- mount.ip.ipset(ip);
- mount.dir.set(dir);
- mount.local.set(mountdir);
- localMounts.append(mount);
- }
- }
- IFile *createFileLocalMount(const IpAddress &ip, const char * filename)
- {
- CriticalBlock block(localMountCrit);
- ForEachItemInRev(i,localMounts) {
- CLocalMountRec &mount = localMounts.item(i);
- if (mount.ip.ipequals(ip)) {
- size32_t bl = mount.dir.length();
- if (isPathSepChar(mount.dir[bl-1]))
- bl--;
- if ((memcmp((void *)filename,(void *)mount.dir.get(),bl)==0)&&(isPathSepChar(filename[bl])||!filename[bl])) { // match
- StringBuffer locpath(mount.local);
- if (filename[bl])
- addPathSepChar(locpath).append(filename+bl+1);
- locpath.replace((PATHSEPCHAR=='\\')?'/':'\\',PATHSEPCHAR);
- return createIFile(locpath.str());
- }
- }
- }
- return NULL;
- }
- IFile * createRemoteFile(SocketEndpoint &ep, const char * filename)
- {
- IFile *ret = createFileLocalMount(ep,filename);
- if (ret)
- return ret;
- return new CRemoteFile(ep, filename);
- }
- void clientDisconnectRemoteFile(IFile *file)
- {
- CRemoteFile *cfile = QUERYINTERFACE(file,CRemoteFile);
- if (cfile)
- cfile->disconnect();
- }
- bool clientResetFilename(IFile *file, const char *newname) // returns false if not remote
- {
- CRemoteFile *cfile = QUERYINTERFACE(file,CRemoteFile);
- if (!cfile)
- return false;
- cfile->resetLocalFilename(newname);
- return true;
- }
- IFile *createDaliServixFile(const RemoteFilename & file)
- {
- SocketEndpoint ep(file.queryEndpoint());
- setDafsEndpointPort(ep);
- if (ep.isNull())
- return NULL;
- StringBuffer path;
- file.getLocalPath(path);
- return createRemoteFile(ep, path.str());
- }
- void clientDisconnectRemoteIoOnExit(IFileIO *fileio, bool set)
- {
- CRemoteFileIO *cfileio = QUERYINTERFACE(fileio,CRemoteFileIO);
- if (cfileio)
- cfileio->setDisconnectOnExit(set);
- }
- void setDaliServixSocketCaching(bool set)
- {
- clientSetDaliServixSocketCaching(set);
- }
- void disconnectRemoteFile(IFile *file)
- {
- clientDisconnectRemoteFile(file);
- }
- void disconnectRemoteIoOnExit(IFileIO *fileio,bool set)
- {
- clientDisconnectRemoteIoOnExit(fileio,set);
- }
- bool resetRemoteFilename(IFile *file, const char *newname)
- {
- return clientResetFilename(file,newname);
- }
- extern bool clientAsyncCopyFileSection(const char *uuid,
- IFile *from, // expected to be remote
- RemoteFilename &to,
- offset_t toOfs, // -1 created file and copies to start
- offset_t fromOfs,
- offset_t size,
- ICopyFileProgress *progress,
- unsigned timeout) // returns true when done
- {
- CRemoteFile *cfile = QUERYINTERFACE(from,CRemoteFile);
- if (!cfile) {
- // local - do sync
- from->copySection(to,toOfs,fromOfs,size,progress);
- return true;
- }
- return cfile->copySectionAsync(uuid,to,toOfs,fromOfs, size, progress, timeout);
- }
- bool asyncCopyFileSection(const char *uuid, // from genUUID - must be same for subsequent calls
- IFile *from, // expected to be remote
- RemoteFilename &to,
- offset_t toofs, // (offset_t)-1 created file and copies to start
- offset_t fromofs,
- offset_t size, // (offset_t)-1 for all file
- ICopyFileProgress *progress,
- unsigned timeout // 0 to start, non-zero to wait
- )
- {
- return clientAsyncCopyFileSection(uuid,from,to,toofs,fromofs,size,progress,timeout);
- }
- void setRemoteFileTimeouts(unsigned maxconnecttime,unsigned maxreadtime)
- {
- clientSetRemoteFileTimeouts(maxconnecttime,maxreadtime);
- }
- unsigned validateNodes(const SocketEndpointArray &epso,const char *dataDir, const char *mirrorDir, bool chkver, SocketEndpointArray &failures, UnsignedArray &failedcodes, StringArray &failedmessages, const char *filename)
- {
- // used for detecting duff nodes
- IPointerArrayOf<ISocket> sockets;
- // dedup nodes
- SocketEndpointArray eps;
- ForEachItemIn(i1,epso)
- eps.appendUniq(epso.element(i1));
- unsigned to=30*1000;
- unsigned n=eps.ordinality(); // use approx log scale (timeout is long but only for failure situation)
- while (n>1) {
- n/=2;
- to+=30*1000;
- }
- multiConnect(eps,sockets,to);
- ForEachItemIn(i,eps) {
- if (sockets.item(i)==NULL) {
- failures.append(eps.item(i));
- failedcodes.append(DAFS_VALIDATE_CONNECT_FAIL);
- failedmessages.append("Connect failure");
- }
- }
- CriticalSection sect;
- class casyncfor: public CAsyncFor
- {
- const SocketEndpointArray &eps;
- const IPointerArrayOf<ISocket> &sockets;
- SocketEndpointArray &failures;
- StringArray &failedmessages;
- UnsignedArray &failedcodes;
- CriticalSection §
- StringAttr dataDir, mirrorDir;
- bool chkv;
- const char *filename;
- public:
- casyncfor(const SocketEndpointArray &_eps,const IPointerArrayOf<ISocket> &_sockets,const char *_dataDir,const char *_mirrorDir,bool _chkv, const char *_filename,SocketEndpointArray &_failures, StringArray &_failedmessages,UnsignedArray &_failedcodes,CriticalSection &_sect)
- : eps(_eps), sockets(_sockets), failures(_failures),
- failedmessages(_failedmessages), failedcodes(_failedcodes), sect(_sect),
- dataDir(_dataDir), mirrorDir(_mirrorDir)
- {
- chkv = _chkv;
- filename = _filename;
- }
- void Do(unsigned i)
- {
- ISocket *sock = sockets.item(i);
- if (!sock)
- return;
- SocketEndpoint ep = eps.item(i);
- bool iswin;
- unsigned code = 0;
- StringBuffer errstr;
- StringBuffer ver;
- try {
- getRemoteVersion(sock,ver);
- iswin = (strstr(ver.str(),"Windows")!=NULL);
- }
- catch (IException *e)
- {
- code = DAFS_VALIDATE_CONNECT_FAIL;
- e->errorMessage(errstr);
- e->Release();
- }
- if (!code&&chkv) {
- const char *rv = ver.str();
- const char *v = DAFILESRV_VERSIONSTRING;
- while (*v&&(*v!='-')&&(*v==*rv)) {
- v++;
- rv++;
- }
- if (*rv!=*v) {
- if (*rv) {
- while (*rv&&(*rv!='-'))
- rv++;
- while (*v&&(*v!='-'))
- v++;
- StringBuffer wanted(v-DAFILESRV_VERSIONSTRING,DAFILESRV_VERSIONSTRING);
- ver.setLength(rv-ver.str());
- if (strcmp(ver.str(),wanted.str())<0) { // allow >
- code = DAFS_VALIDATE_BAD_VERSION;
- errstr.appendf("Mismatch dafilesrv version ");
- errstr.append(rv-ver.str(),ver.str());
- errstr.append(", wanted ");
- errstr.append(v-DAFILESRV_VERSIONSTRING,DAFILESRV_VERSIONSTRING);
- }
- }
- else {
- code = DAFS_VALIDATE_CONNECT_FAIL;
- errstr.appendf("could not contact dafilesrv");
- }
- }
- }
- if (!code&&(dataDir.get()||mirrorDir.get())) {
- clientAddSocketToCache(ep,sock);
- const char *drivePath = NULL;
- const char *drivePaths[2];
- unsigned drives=0;
- if (mirrorDir.get()) drivePaths[drives++] = mirrorDir.get();
- if (dataDir.get()) drivePaths[drives++] = dataDir.get();
- do
- {
- StringBuffer path(drivePaths[--drives]);
- addPathSepChar(path);
- if (filename)
- path.append(filename);
- else {
- path.append("dafs_");
- genUUID(path);
- path.append(".tmp");
- }
- RemoteFilename rfn;
- rfn.setPath(ep,path);
- Owned<IFile> file = createIFile(rfn);
- size32_t sz;
- StringBuffer ds;
- try {
- StringBuffer fullPath;
- rfn.getRemotePath(fullPath);
- recursiveCreateDirectoryForFile(fullPath);
- Owned<IFileIO> fileio = file->open(IFOcreate);
- CDateTime dt;
- dt.setNow();
- dt.getString(ds);
- sz = ds.length()+1;
- assertex(sz<64);
- fileio->write(0,sz,ds.str());
- }
- catch (IException *e) {
- if (e->errorCode()==DISK_FULL_EXCEPTION_CODE)
- code |= (drivePath==dataDir.get()?DAFS_VALIDATE_DISK_FULL_DATA:DAFS_VALIDATE_DISK_FULL_MIRROR);
- else
- code |= (drivePath==dataDir.get()?DAFS_VALIDATE_WRITE_FAIL_DATA:DAFS_VALIDATE_WRITE_FAIL_MIRROR);
- if (errstr.length())
- errstr.append(',');
- e->errorMessage(errstr);
- e->Release();
- continue; // no use trying read
- }
- try {
- Owned<IFileIO> fileio = file->open(IFOread);
- char buf[64];
- size32_t rd = fileio->read(0,sizeof(buf)-1,buf);
- if ((rd!=sz)||(memcmp(buf,ds.str(),sz)!=0)) {
- StringBuffer s;
- ep.getIpText(s);
- throw MakeStringException(-1,"Data discrepancy on disk read of %s of %s",path.str(),s.str());
- }
- }
- catch (IException *e) {
- code |= (drivePath==dataDir.get()?DAFS_VALIDATE_READ_FAIL_DATA:DAFS_VALIDATE_READ_FAIL_MIRROR);
- if (errstr.length())
- errstr.append(',');
- e->errorMessage(errstr);
- e->Release();
- }
- if (!filename||!*filename) {
- // delete file created
- try {
- file->remove();
- }
- catch (IException *e) {
- e->Release(); // supress error
- }
- }
- }
- while (0 != drives);
- }
- if (code) {
- CriticalBlock block(sect);
- failures.append(ep);
- failedcodes.append(code);
- failedmessages.append(errstr.str());
- }
- }
- } afor(eps,sockets,dataDir,mirrorDir,chkver,filename,failures,failedmessages,failedcodes,sect);
- afor.For(eps.ordinality(), 10, false, true);
- return failures.ordinality();
- }
- static PointerArrayOf<SharedObject> *hookDlls;
- static void installFileHook(const char *hookFileSpec);
- extern DAFSCLIENT_API void installFileHooks(const char *hookFileSpec)
- {
- if (!hookDlls)
- hookDlls = new PointerArrayOf<SharedObject>;
- const char * cursor = hookFileSpec;
- for (;*cursor;)
- {
- StringBuffer file;
- while (*cursor && *cursor != ENVSEPCHAR)
- file.append(*cursor++);
- if(*cursor)
- cursor++;
- if(!file.length())
- continue;
- installFileHook(file);
- }
- }
- typedef void (*HookInstallFunction)();
- static void installFileHook(const char *hookFile)
- {
- StringBuffer dirPath, dirTail, absolutePath;
- splitFilename(hookFile, &dirPath, &dirPath, &dirTail, &dirTail);
- makeAbsolutePath(dirPath.str(), absolutePath);
- if (!containsFileWildcard(dirTail))
- {
- addPathSepChar(absolutePath).append(dirTail);
- Owned<IFile> file = createIFile(absolutePath);
- if (file->isDirectory() == foundYes)
- {
- installFileHooks(addPathSepChar(absolutePath).append('*'));
- }
- else if (file->isFile() == foundYes)
- {
- HookInstallFunction hookInstall;
- SharedObject *so = new SharedObject(); // MORE - this leaks! Kind-of deliberate right now...
- if (so->load(file->queryFilename(), false) &&
- (hookInstall = (HookInstallFunction) GetSharedProcedure(so->getInstanceHandle(), "installFileHook")) != NULL)
- {
- hookInstall();
- hookDlls->append(so);
- }
- else
- {
- so->unload();
- delete so;
- DBGLOG("File hook library %s could not be loaded", hookFile);
- }
- }
- else
- {
- DBGLOG("File hook library %s not found", hookFile);
- }
- }
- else
- {
- Owned<IDirectoryIterator> dir = createDirectoryIterator(absolutePath, dirTail);
- ForEach(*dir)
- {
- const char *name = dir->query().queryFilename();
- if (name && *name && *name != '.')
- installFileHook(name);
- }
- }
- }
- // Should be called before closedown, ideally. MODEXIT tries to mop up but may be too late to do so cleanly
- extern DAFSCLIENT_API void removeFileHooks()
- {
- if (hookDlls)
- {
- ForEachItemIn(idx, *hookDlls)
- {
- SharedObject *so = hookDlls->item(idx);
- HookInstallFunction hookInstall = (HookInstallFunction) GetSharedProcedure(so->getInstanceHandle(), "removeFileHook");
- if (hookInstall)
- hookInstall();
- delete so;
- }
- delete hookDlls;
- hookDlls = NULL;
- }
- }
- MODULE_INIT(INIT_PRIORITY_DAFSCLIENT)
- {
- if(!DaliServixIntercept)
- {
- DaliServixIntercept = new CDaliServixIntercept;
- addIFileCreateHook(DaliServixIntercept);
- }
- return true;
- }
- MODULE_EXIT()
- {
- if(DaliServixIntercept)
- {
- // delete ConnectionTable; // too late to delete (jsocket closed down)
- removeIFileCreateHook(DaliServixIntercept);
- ::Release(DaliServixIntercept);
- DaliServixIntercept = NULL;
- }
- removeFileHooks();
- }
- IDaFileSrvHook *queryDaFileSrvHook()
- {
- return DaliServixIntercept;
- }
- void enableForceRemoteReads()
- {
- const char *forceRemotePattern = queryEnvironmentConf().queryProp("forceRemotePattern");
- if (!isEmptyString(forceRemotePattern))
- queryDaFileSrvHook()->forceRemote(forceRemotePattern);
- }
- bool testForceRemote(const char *path)
- {
- const char *forceRemotePattern = queryEnvironmentConf().queryProp("forceRemotePattern");
- return !isEmptyString(forceRemotePattern) && WildMatch(path, forceRemotePattern, false);
- }
- //// legacy implementation of streaming implementations, to be replaced by dafsstream.*
- ///
- //
- class CRemoteFilteredFileIOBase : public CRemoteBase, implements IRemoteFileIO
- {
- typedef CRemoteBase PARENT;
- public:
- IMPLEMENT_IINTERFACE_O_USING(CRemoteBase);
- // Really a stream, but life (maybe) easier elsewhere if looks like a file
- // Sometime should refactor to be based on ISerialStream instead - or maybe IRowStream.
- CRemoteFilteredFileIOBase(SocketEndpoint &ep, const char *filename, IOutputMetaData *actual, IOutputMetaData *projected, const RowFilter &fieldFilters, unsigned __int64 chooseN)
- : CRemoteBase(ep, filename)
- {
- // NB: inputGrouped == outputGrouped for now, but may want output to be ungrouped
- openRequest();
- if (queryOutputCompressionDefault())
- {
- expander.setown(getExpander(queryOutputCompressionDefault()));
- if (expander)
- {
- expandMb.setEndian(__BIG_ENDIAN);
- request.appendf("\"outputCompression\" : \"%s\",\n", queryOutputCompressionDefault());
- }
- else
- WARNLOG("Failed to created compression decompressor for: %s", queryOutputCompressionDefault());
- }
- request.appendf("\"format\" : \"binary\",\n"
- "\"node\" : {\n"
- " \"fileName\" : \"%s\"", filename);
- if (chooseN)
- request.appendf(",\n \"chooseN\" : \"%" I64F "u\"", chooseN);
- if (fieldFilters.numFilterFields())
- {
- request.append(",\n \"keyFilter\" : [\n ");
- for (unsigned idx=0; idx < fieldFilters.numFilterFields(); idx++)
- {
- auto &filter = fieldFilters.queryFilter(idx);
- StringBuffer filterString;
- filter.serialize(filterString);
- if (idx)
- request.append(",\n ");
- request.append("\"");
- encodeJSON(request, filterString.length(), filterString.str());
- request.append("\"");
- }
- request.append("\n ]");
- }
- MemoryBuffer actualTypeInfo;
- if (!dumpTypeInfo(actualTypeInfo, actual->querySerializedDiskMeta()->queryTypeInfo()))
- throw createDafsException(DAFSERR_cmdstream_unsupported_recfmt, "Format not supported by remote read");
- request.append(",\n \"inputBin\" : \"");
- JBASE64_Encode(actualTypeInfo.toByteArray(), actualTypeInfo.length(), request, false);
- request.append("\"");
- if (actual != projected)
- {
- MemoryBuffer projectedTypeInfo;
- dumpTypeInfo(projectedTypeInfo, projected->querySerializedDiskMeta()->queryTypeInfo());
- if (actualTypeInfo.length() != projectedTypeInfo.length() ||
- memcmp(actualTypeInfo.toByteArray(), projectedTypeInfo.toByteArray(), actualTypeInfo.length()))
- {
- request.append(",\n \"outputBin\": \"");
- JBASE64_Encode(projectedTypeInfo.toByteArray(), projectedTypeInfo.length(), request, false);
- request.append("\"");
- }
- }
- bufPos = 0;
- }
- virtual size32_t read(offset_t pos, size32_t len, void * data) override
- {
- assertex(pos == bufPos); // Must read sequentially
- if (!bufRemaining && !eof)
- refill();
- if (eof)
- return 0;
- if (len > bufRemaining)
- len = bufRemaining;
- bufPos += len;
- bufRemaining -= len;
- memcpy(data, reply.readDirect(len), len);
- return len;
- }
- virtual offset_t size() override { return -1; }
- virtual size32_t write(offset_t pos, size32_t len, const void * data) override { throwUnexpected(); }
- virtual offset_t appendFile(IFile *file,offset_t pos=0,offset_t len=(offset_t)-1) override { throwUnexpected(); }
- virtual void setSize(offset_t size) override { throwUnexpected(); }
- virtual void flush() override { throwUnexpected(); }
- virtual void close() override
- {
- PARENT::close(handle);
- handle = 0;
- }
- virtual unsigned __int64 getStatistic(StatisticKind kind) override
- {
- /* NB: Would need new stat. categories added for this to make sense,
- * but this class is implemented as a IFileIO for convenience for now,
- * it may be refactored into another form later.
- */
- return 0;
- }
- // IRemoteFileIO
- virtual void addVirtualFieldMapping(const char *fieldName, const char *fieldValue) override
- {
- virtualFields[fieldName] = fieldValue;
- }
- virtual void ensureAvailable() override
- {
- if (firstRequest)
- handleFirstRequest();
- }
- protected:
- StringBuffer &openRequest()
- {
- return request.append("{\n");
- }
- StringBuffer &closeRequest()
- {
- return request.append("\n }\n");
- }
- void addVirtualFields()
- {
- request.append(", \n \"virtualFields\" : {\n");
- bool first=true;
- for (auto &e : virtualFields)
- {
- if (!first)
- request.append(",\n");
- request.appendf(" \"%s\" : \"%s\"", e.first.c_str(), e.second.c_str());
- first = false;
- }
- request.append(" }");
- }
- void handleFirstRequest()
- {
- firstRequest = false;
- addVirtualFields();
- closeRequest();
- sendRequest(0, nullptr);
- }
- void refill()
- {
- if (firstRequest)
- {
- handleFirstRequest();
- return;
- }
- size32_t cursorLength;
- reply.read(cursorLength);
- if (!cursorLength)
- {
- eof = true;
- return;
- }
- MemoryBuffer mrequest;
- MemoryBuffer newReply;
- initSendBuffer(mrequest);
- mrequest.append((RemoteFileCommandType)RFCStreamRead);
- VStringBuffer json("{ \"handle\" : %u, \"format\" : \"binary\" }", handle);
- mrequest.append(json.length(), json.str());
- sendRemoteCommand(mrequest, newReply);
- unsigned newHandle;
- newReply.read(newHandle);
- if (newHandle == handle)
- {
- reply.swapWith(newReply);
- reply.read(bufRemaining);
- eof = (bufRemaining == 0);
- if (expander)
- {
- size32_t expandedSz = expander->init(reply.bytes()+reply.getPos());
- expandMb.clear().reserve(expandedSz);
- expander->expand(expandMb.bufferBase());
- expandMb.swapWith(reply);
- }
- }
- else
- {
- assertex(newHandle == 0);
- sendRequest(cursorLength, reply.readDirect(cursorLength));
- }
- }
- void sendRequest(unsigned cursorLen, const void *cursorData)
- {
- MemoryBuffer mrequest;
- initSendBuffer(mrequest);
- mrequest.append((RemoteFileCommandType)RFCStreamRead);
- mrequest.append(request.length(), request.str());
- if (cursorLen)
- {
- StringBuffer cursorInfo;
- cursorInfo.append(",\"cursorBin\": \"");
- JBASE64_Encode(cursorData, cursorLen, cursorInfo, false);
- cursorInfo.append("\"\n");
- mrequest.append(cursorInfo.length(), cursorInfo.str());
- }
- if (TF_TRACE_FULL)
- PROGLOG("req = <%s}>", request.str());
- mrequest.append(3, " \n}");
- sendRemoteCommand(mrequest, reply);
- reply.read(handle);
- reply.read(bufRemaining);
- eof = (bufRemaining == 0);
- if (expander)
- {
- size32_t expandedSz = expander->init(reply.bytes()+reply.getPos());
- expandMb.clear().reserve(expandedSz);
- expander->expand(expandMb.bufferBase());
- expandMb.swapWith(reply);
- }
- }
- StringBuffer request;
- MemoryBuffer reply;
- unsigned handle = 0;
- size32_t bufRemaining = 0;
- offset_t bufPos = 0;
- bool eof = false;
- bool firstRequest = true;
- std::unordered_map<std::string, std::string> virtualFields;
- Owned<IExpander> expander;
- MemoryBuffer expandMb;
- };
- class CRemoteFilteredFileIO : public CRemoteFilteredFileIOBase
- {
- public:
- // Really a stream, but life (maybe) easier elsewhere if looks like a file
- // Sometime should refactor to be based on ISerialStream instead - or maybe IRowStream.
- CRemoteFilteredFileIO(SocketEndpoint &ep, const char *filename, IOutputMetaData *actual, IOutputMetaData *projected, const RowFilter &fieldFilters, bool compressed, bool grouped, unsigned __int64 chooseN)
- : CRemoteFilteredFileIOBase(ep, filename, actual, projected, fieldFilters, chooseN)
- {
- // NB: inputGrouped == outputGrouped for now, but may want output to be ungrouped
- request.appendf(",\n \"kind\" : \"diskread\",\n"
- " \"compressed\" : \"%s\",\n"
- " \"inputGrouped\" : \"%s\",\n"
- " \"outputGrouped\" : \"%s\"", boolToStr(compressed), boolToStr(grouped), boolToStr(grouped));
- }
- };
- class CRemoteFilteredRowStream : public CRemoteFilteredFileIO, implements IRowStream
- {
- public:
- CRemoteFilteredRowStream(const RtlRecord &_recInfo, SocketEndpoint &ep, const char * filename, IOutputMetaData *actual, IOutputMetaData *projected, const RowFilter &fieldFilters, bool compressed, bool grouped)
- : CRemoteFilteredFileIO(ep, filename, actual, projected, fieldFilters, compressed, grouped, 0), recInfo(_recInfo)
- {
- }
- virtual const byte *queryNextRow() // NOTE - rows returned must NOT be freed
- {
- if (!bufRemaining && !eof)
- refill();
- if (eof)
- return nullptr;
- unsigned len = recInfo.getRecordSize(reply.readDirect(0));
- bufPos += len;
- bufRemaining -= len;
- return reply.readDirect(len);
- }
- virtual void stop() override
- {
- close();
- eof = true;
- }
- protected:
- const RtlRecord &recInfo;
- };
- static StringAttr remoteOutputCompressionDefault;
- void setRemoteOutputCompressionDefault(const char *type)
- {
- if (!isEmptyString(type))
- remoteOutputCompressionDefault.set(type);
- }
- const char *queryOutputCompressionDefault() { return remoteOutputCompressionDefault; }
- extern IRemoteFileIO *createRemoteFilteredFile(SocketEndpoint &ep, const char * filename, IOutputMetaData *actual, IOutputMetaData *projected, const RowFilter &fieldFilters, bool compressed, bool grouped, unsigned __int64 chooseN)
- {
- try
- {
- return new CRemoteFilteredFileIO(ep, filename, actual, projected, fieldFilters, compressed, grouped, chooseN);
- }
- catch (IException *e)
- {
- EXCLOG(e, nullptr);
- e->Release();
- }
- return nullptr;
- }
- class CRemoteFilteredKeyIO : public CRemoteFilteredFileIOBase
- {
- public:
- // Really a stream, but life (maybe) easier elsewhere if looks like a file
- // Sometime should refactor to be based on ISerialStream instead - or maybe IRowStream.
- CRemoteFilteredKeyIO(SocketEndpoint &ep, const char *filename, unsigned crc, IOutputMetaData *actual, IOutputMetaData *projected, const RowFilter &fieldFilters, unsigned __int64 chooseN)
- : CRemoteFilteredFileIOBase(ep, filename, actual, projected, fieldFilters, chooseN)
- {
- request.appendf(",\n \"kind\" : \"indexread\"");
- request.appendf(",\n \"crc\" : \"%u\"", crc);
- }
- };
- class CRemoteFilteredKeyCountIO : public CRemoteFilteredFileIOBase
- {
- public:
- // Really a stream, but life (maybe) easier elsewhere if looks like a file
- // Sometime should refactor to be based on ISerialStream instead - or maybe IRowStream.
- CRemoteFilteredKeyCountIO(SocketEndpoint &ep, const char *filename, unsigned crc, IOutputMetaData *actual, const RowFilter &fieldFilters, unsigned __int64 rowLimit)
- : CRemoteFilteredFileIOBase(ep, filename, actual, actual, fieldFilters, rowLimit)
- {
- request.appendf(",\n \"kind\" : \"indexcount\"");
- request.appendf(",\n \"crc\" : \"%u\"", crc);
- }
- };
- class CRemoteKey : public CSimpleInterfaceOf<IIndexLookup>
- {
- Owned<IRemoteFileIO> iRemoteFileIO;
- offset_t pos = 0;
- Owned<ISourceRowPrefetcher> prefetcher;
- CThorContiguousRowBuffer prefetchBuffer;
- Owned<ISerialStream> strm;
- bool pending = false;
- SocketEndpoint ep;
- StringAttr filename;
- unsigned crc;
- Linked<IOutputMetaData> actual, projected;
- RowFilter fieldFilters;
- public:
- CRemoteKey(SocketEndpoint &_ep, const char *_filename, unsigned _crc, IOutputMetaData *_actual, IOutputMetaData *_projected, const RowFilter &_fieldFilters, unsigned __int64 rowLimit)
- : ep(_ep), filename(_filename), crc(_crc), actual(_actual), projected(_projected)
- {
- for (unsigned f=0; f<_fieldFilters.numFilterFields(); f++)
- fieldFilters.addFilter(OLINK(_fieldFilters.queryFilter(f)));
- iRemoteFileIO.setown(new CRemoteFilteredKeyIO(ep, filename, crc, actual, projected, fieldFilters, rowLimit));
- if (!iRemoteFileIO)
- throwStringExceptionV(DAFSERR_cmdstream_openfailure, "Unable to open remote key part: '%s'", filename.get());
- strm.setown(createFileSerialStream(iRemoteFileIO));
- prefetcher.setown(projected->createDiskPrefetcher());
- assertex(prefetcher);
- prefetchBuffer.setStream(strm);
- }
- // IIndexLookup
- virtual void ensureAvailable() override
- {
- iRemoteFileIO->ensureAvailable(); // will throw an exception if fails
- }
- virtual unsigned __int64 getCount() override
- {
- return checkCount(0);
- }
- virtual unsigned __int64 checkCount(unsigned __int64 limit) override
- {
- Owned<IFileIO> iFileIO = new CRemoteFilteredKeyCountIO(ep, filename, crc, actual, fieldFilters, limit);
- unsigned __int64 result;
- iFileIO->read(0, sizeof(result), &result);
- return result;
- }
- virtual const void *nextKey() override
- {
- if (pending)
- prefetchBuffer.finishedRow();
- if (prefetchBuffer.eos())
- return nullptr;
- prefetcher->readAhead(prefetchBuffer);
- pending = true;
- return prefetchBuffer.queryRow();
- }
- virtual unsigned querySeeks() const override { return 0; } // not sure how best to handle these, perhaps should log/record somewhere on server-side
- virtual unsigned queryScans() const override { return 0; }
- virtual unsigned querySkips() const override { return 0; }
- };
- extern IIndexLookup *createRemoteFilteredKey(SocketEndpoint &ep, const char * filename, unsigned crc, IOutputMetaData *actual, IOutputMetaData *projected, const RowFilter &fieldFilters, unsigned __int64 chooseN)
- {
- try
- {
- return new CRemoteKey(ep, filename, crc, actual, projected, fieldFilters, chooseN);
- }
- catch (IException *e)
- {
- EXCLOG(e, nullptr);
- e->Release();
- }
- return nullptr;
- }
|