sockfile.cpp 235 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
73778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252425342544255425642574258425942604261426242634264426542664267426842694270427142724273427442754276427
74278427942804281428242834284428542864287428842894290429142924293429442954296429742984299430043014302430343044305430643074308430943104311431243134314431543164317431843194320432143224323432443254326432743284329433043314332433343344335433643374338433943404341434243434344434543464347434843494350435143524353435443554356435743584359436043614362436343644365436643674368436943704371437243734374437543764377437843794380438143824383438443854386438743884389439043914392439343944395439643974398439944004401440244034404440544064407440844094410441144124413441444154416441744184419442044214422442344244425442644274428442944304431443244334434443544364437443844394440444144424443444444454446444744484449445044514452445344544455445644574458445944604461446244634464446544664467446844694470447144724473447444754476447744784479448044814482448344844485448644874488448944904491449244934494449544964497449844994500450145024503450445054506450745084509451045114512451345144515451645174518451945204521452245234524452545264527452845294530453145324533453445354536453745384539454045414542454345444545454645474548454945504551455245534554455545564557455845594560456145624563456445654566456745684569457045714572457345744575457645774578457945804581458245834584458545864587458845894590459145924593459445954596459745984599460046014602460346044605460646074608460946104611461246134614461546164617461846194620462146224623462446254626462746284629463046314632463346344635463646374638463946404641464246434644464546464647464846494650465146524653465446554656465746584659466046614662466346644665466646674668466946704671467246734674467546764677467846794680468146824683468446854686468746884689469046914692469346944695469646974698469947004701470247034704470547064707470847094710471147124713471447154716471747184719472047214722472347244725472647274728472947304731473247334734473547364737473847394740474147424743474447454746474747484749475047514752475347544755475647574758475947604761476247634764476547664767476847694770477147724773477447754776477
74778477947804781478247834784478547864787478847894790479147924793479447954796479747984799480048014802480348044805480648074808480948104811481248134814481548164817481848194820482148224823482448254826482748284829483048314832483348344835483648374838483948404841484248434844484548464847484848494850485148524853485448554856485748584859486048614862486348644865486648674868486948704871487248734874487548764877487848794880488148824883488448854886488748884889489048914892489348944895489648974898489949004901490249034904490549064907490849094910491149124913491449154916491749184919492049214922492349244925492649274928492949304931493249334934493549364937493849394940494149424943494449454946494749484949495049514952495349544955495649574958495949604961496249634964496549664967496849694970497149724973497449754976497749784979498049814982498349844985498649874988498949904991499249934994499549964997499849995000500150025003500450055006500750085009501050115012501350145015501650175018501950205021502250235024502550265027502850295030503150325033503450355036503750385039504050415042504350445045504650475048504950505051505250535054505550565057505850595060506150625063506450655066506750685069507050715072507350745075507650775078507950805081508250835084508550865087508850895090509150925093509450955096509750985099510051015102510351045105510651075108510951105111511251135114511551165117511851195120512151225123512451255126512751285129513051315132513351345135513651375138513951405141514251435144514551465147514851495150515151525153515451555156515751585159516051615162516351645165516651675168516951705171517251735174517551765177517851795180518151825183518451855186518751885189519051915192519351945195519651975198519952005201520252035204520552065207520852095210521152125213521452155216521752185219522052215222522352245225522652275228522952305231523252335234523552365237523852395240524152425243524452455246524752485249525052515252525352545255525652575258525952605261526252635264526552665267526852695270527152725273527452755276527
75278527952805281528252835284528552865287528852895290529152925293529452955296529752985299530053015302530353045305530653075308530953105311531253135314531553165317531853195320532153225323532453255326532753285329533053315332533353345335533653375338533953405341534253435344534553465347534853495350535153525353535453555356535753585359536053615362536353645365536653675368536953705371537253735374537553765377537853795380538153825383538453855386538753885389539053915392539353945395539653975398539954005401540254035404540554065407540854095410541154125413541454155416541754185419542054215422542354245425542654275428542954305431543254335434543554365437543854395440544154425443544454455446544754485449545054515452545354545455545654575458545954605461546254635464546554665467546854695470547154725473547454755476547754785479548054815482548354845485548654875488548954905491549254935494549554965497549854995500550155025503550455055506550755085509551055115512551355145515551655175518551955205521552255235524552555265527552855295530553155325533553455355536553755385539554055415542554355445545554655475548554955505551555255535554555555565557555855595560556155625563556455655566556755685569557055715572557355745575557655775578557955805581558255835584558555865587558855895590559155925593559455955596559755985599560056015602560356045605560656075608560956105611561256135614561556165617561856195620562156225623562456255626562756285629563056315632563356345635563656375638563956405641564256435644564556465647564856495650565156525653565456555656565756585659566056615662566356645665566656675668566956705671567256735674567556765677567856795680568156825683568456855686568756885689569056915692569356945695569656975698569957005701570257035704570557065707570857095710571157125713571457155716571757185719572057215722572357245725572657275728572957305731573257335734573557365737573857395740574157425743574457455746574757485749575057515752575357545755575657575758575957605761576257635764576557665767576857695770577157725773577457755776577
75778577957805781578257835784578557865787578857895790579157925793579457955796579757985799580058015802580358045805580658075808580958105811581258135814581558165817581858195820582158225823582458255826582758285829583058315832583358345835583658375838583958405841584258435844584558465847584858495850585158525853585458555856585758585859586058615862586358645865586658675868586958705871587258735874587558765877587858795880588158825883588458855886588758885889589058915892589358945895589658975898589959005901590259035904590559065907590859095910591159125913591459155916591759185919592059215922592359245925592659275928592959305931593259335934593559365937593859395940594159425943594459455946594759485949595059515952595359545955595659575958595959605961596259635964596559665967596859695970597159725973597459755976597759785979598059815982598359845985598659875988598959905991599259935994599559965997599859996000600160026003600460056006600760086009601060116012601360146015601660176018601960206021602260236024602560266027602860296030603160326033603460356036603760386039604060416042604360446045604660476048604960506051605260536054605560566057605860596060606160626063606460656066606760686069607060716072607360746075607660776078607960806081608260836084608560866087608860896090609160926093609460956096609760986099610061016102610361046105610661076108610961106111611261136114611561166117611861196120612161226123612461256126612761286129613061316132613361346135613661376138613961406141614261436144614561466147614861496150615161526153615461556156615761586159616061616162616361646165616661676168616961706171617261736174617561766177617861796180618161826183618461856186618761886189619061916192619361946195619661976198619962006201620262036204620562066207620862096210621162126213621462156216621762186219622062216222622362246225622662276228622962306231623262336234623562366237623862396240624162426243624462456246624762486249625062516252625362546255625662576258625962606261626262636264626562666267626862696270627162726273627462756276627
76278627962806281628262836284628562866287628862896290629162926293629462956296629762986299630063016302630363046305630663076308630963106311631263136314631563166317631863196320632163226323632463256326632763286329633063316332633363346335633663376338633963406341634263436344634563466347634863496350635163526353635463556356635763586359636063616362636363646365636663676368636963706371637263736374637563766377637863796380638163826383638463856386638763886389639063916392639363946395639663976398639964006401640264036404640564066407640864096410641164126413641464156416641764186419642064216422642364246425642664276428642964306431643264336434643564366437643864396440644164426443644464456446644764486449645064516452645364546455645664576458645964606461646264636464646564666467646864696470647164726473647464756476647764786479648064816482648364846485648664876488648964906491649264936494649564966497649864996500650165026503650465056506650765086509651065116512651365146515651665176518651965206521652265236524652565266527652865296530653165326533653465356536653765386539654065416542654365446545654665476548654965506551655265536554655565566557655865596560656165626563656465656566656765686569657065716572657365746575657665776578657965806581658265836584658565866587658865896590659165926593659465956596659765986599660066016602660366046605660666076608660966106611661266136614661566166617661866196620662166226623662466256626662766286629663066316632663366346635663666376638663966406641664266436644664566466647664866496650665166526653665466556656665766586659666066616662666366646665666666676668666966706671667266736674667566766677667866796680668166826683668466856686668766886689669066916692669366946695669666976698669967006701670267036704670567066707670867096710671167126713671467156716671767186719672067216722672367246725672667276728672967306731673267336734673567366737673867396740674167426743674467456746674767486749675067516752675367546755675667576758675967606761676267636764676567666767676867696770677167726773677467756776677
7677867796780678167826783678467856786678767886789679067916792679367946795679667976798679968006801680268036804680568066807680868096810681168126813681468156816681768186819682068216822682368246825682668276828682968306831683268336834683568366837683868396840684168426843684468456846684768486849685068516852685368546855685668576858685968606861686268636864686568666867686868696870687168726873687468756876687768786879688068816882688368846885688668876888688968906891689268936894689568966897689868996900690169026903690469056906690769086909691069116912691369146915691669176918691969206921692269236924692569266927692869296930693169326933693469356936693769386939694069416942694369446945694669476948694969506951695269536954695569566957695869596960696169626963696469656966696769686969697069716972697369746975697669776978697969806981698269836984698569866987698869896990699169926993699469956996699769986999700070017002700370047005700670077008700970107011701270137014701570167017701870197020702170227023702470257026702770287029703070317032703370347035703670377038703970407041704270437044704570467047704870497050705170527053705470557056705770587059706070617062706370647065706670677068706970707071707270737074707570767077707870797080708170827083708470857086708770887089709070917092709370947095709670977098709971007101710271037104710571067107710871097110711171127113711471157116711771187119712071217122712371247125712671277128
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. // todo look at IRemoteFileServer stop
  14. #include "platform.h"
  15. #include "limits.h"
  16. #include "jlib.hpp"
  17. #include "jio.hpp"
  18. #include "jmutex.hpp"
  19. #include "jfile.hpp"
  20. #include "jmisc.hpp"
  21. #include "jthread.hpp"
  22. #include "jqueue.tpp"
  23. #include "securesocket.hpp"
  24. #include "sockfile.hpp"
  25. #include "portlist.h"
  26. #include "jsocket.hpp"
  27. #include "jencrypt.hpp"
  28. #include "jset.hpp"
  29. #include "jhtree.hpp"
  30. #include "remoteerr.hpp"
  31. #include <atomic>
  32. #include "rtldynfield.hpp"
  33. #include "rtlds_imp.hpp"
  34. #include "rtlread_imp.hpp"
  35. #include "rtlrecord.hpp"
  36. #include "eclhelper_dyn.hpp"
  37. #include "rtlcommon.hpp"
  38. #include "rtlformat.hpp"
  39. #define SOCKET_CACHE_MAX 500
  40. #define MIN_KEYFILTSUPPORT_VERSION 20
  41. #ifdef _DEBUG
  42. //#define SIMULATE_PACKETLOSS 1
  43. #endif
  44. #define TREECOPYTIMEOUT (60*60*1000) // 1Hr (I guess could take longer for big file but at least will stagger)
  45. #define TREECOPYPOLLTIME (60*1000*5) // for tracing that delayed
  46. #define TREECOPYPRUNETIME (24*60*60*1000) // 1 day
  47. static const unsigned __int64 defaultFileStreamChooseN = I64C(0x7fffffffffffffff); // constant should be move to common place (see eclhelper.hpp)
  48. static const unsigned __int64 defaultFileStreamSkipN = 0;
  49. static const unsigned __int64 defaultFileStreamRowLimit = (unsigned __int64) -1;
  50. static const unsigned __int64 defaultDaFSNumRecs = 100;
  51. enum OutputFormat { outFmt_Binary, outFmt_Xml, outFmt_Json };
  #if SIMULATE_PACKETLOSS

  // Simulated failure rates, expressed per 1000 calls.
  #define TESTING_FAILURE_RATE_LOST_SEND 10 // per 1000
  #define TESTING_FAILURE_RATE_LOST_RECV 10 // per 1000
  // Cap (in ms) on how long a simulated failure sleeps before throwing.
  #define DUMMY_TIMEOUT_MAX (1000*10)

  static bool errorSimulationOn = true;
  // Socket whose next readtms() must time out; set by write() when it "loses" a packet.
  static ISocket *timeoutreadsock = NULL; // used to trigger

  /*
   * Single-use wrapper around an ISocket that randomly simulates lost packets.
   *
   * Instances are heap allocated by the SOCKWRITE / SOCKREADTMS macros below and
   * delete themselves at the end of the one call made on them (see X).
   */
  struct dummyReadWrite
  {
      // Scope guard: deletes the owning dummyReadWrite when it goes out of scope,
      // whether the call returns normally or throws.
      class X
      {
          dummyReadWrite *parent;
      public:
          X(dummyReadWrite *_parent) : parent(_parent)
          {
          }
          ~X()
          {
              delete parent;
          }
      };

      // Exception thrown to mimic a socket timeout (JSOCKERR_timeout_expired).
      class TimeoutSocketException: public CInterface, public IJSOCK_Exception
      {
      public:
          IMPLEMENT_IINTERFACE;

          TimeoutSocketException()
          {
          }
          virtual ~TimeoutSocketException()
          {
          }

          int errorCode() const
          {
              return JSOCKERR_timeout_expired;
          }
          StringBuffer & errorMessage(StringBuffer &str) const
          {
              return str.append("timeout expired");
          }
          MessageAudience errorAudience() const
          {
              return MSGAUD_user;
          }
      };

      ISocket *sock;

      dummyReadWrite(ISocket *_sock) : sock(_sock)
      {
      }

      /*
       * Forwards to sock->readtms() unless the preceding write() on this socket
       * was "lost", in which case the real read is skipped. Either in that case,
       * or at random (TESTING_FAILURE_RATE_LOST_RECV per 1000), sleeps out the
       * remainder of the (capped) timeout and throws a TimeoutSocketException.
       */
      void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, time_t timeout)
      {
          X selfDeleter(this);
          const unsigned startTick = msTick();
          const unsigned roll = getRandom();

          // The trigger is consumed regardless of which socket it was armed for.
          const bool forcedTimeout = (timeoutreadsock==sock);
          timeoutreadsock = NULL;

          if (!forcedTimeout)
              sock->readtms(buf, min_size, max_size, size_read, timeout);

          const bool randomLoss = errorSimulationOn
                               && (TESTING_FAILURE_RATE_LOST_RECV>0)
                               && (roll%1000<TESTING_FAILURE_RATE_LOST_RECV);
          if (!(forcedTimeout || randomLoss))
              return;

          PrintStackReport();
          if (forcedTimeout)
              PROGLOG("** Simulate timeout");
          else
              PROGLOG("** Simulate Packet loss (size %d,%d)",min_size,max_size);

          if (timeout>DUMMY_TIMEOUT_MAX)
              timeout = DUMMY_TIMEOUT_MAX;

          // Only sleep for whatever part of the timeout has not already elapsed.
          const unsigned elapsed = msTick()-startTick;
          if (elapsed<timeout)
              Sleep(timeout-elapsed);

          IJSOCK_Exception *timeoutError = new TimeoutSocketException;
          throw timeoutError;
      }

      /*
       * Forwards to sock->write(), except that at random
       * (TESTING_FAILURE_RATE_LOST_SEND per 1000) the data is dropped: the call
       * reports 'size' bytes written and arms timeoutreadsock for this socket.
       */
      size32_t write(void const* buf, size32_t size)
      {
          X selfDeleter(this);
          timeoutreadsock = NULL;

          const unsigned roll = getRandom();
          const bool dropPacket = errorSimulationOn
                               && (TESTING_FAILURE_RATE_LOST_SEND>0)
                               && (roll%1000<TESTING_FAILURE_RATE_LOST_SEND);
          if (!dropPacket)
              return sock->write(buf,size);

          PrintStackReport();
          PROGLOG("** Simulate Packet loss (size %d)",size);
          timeoutreadsock = sock;
          return size;
      }
  };

  // Route socket I/O through a throw-away dummyReadWrite when simulating packet loss...
  #define SOCKWRITE(sock) (new dummyReadWrite(sock))->write
  #define SOCKREADTMS(sock) (new dummyReadWrite(sock))->readtms

  #else

  // ...otherwise call straight through to the socket.
  #define SOCKWRITE(sock) sock->write
  #define SOCKREADTMS(sock) sock->readtms

  #endif
  142. // backward compatible modes
  enum compatIFSHmode
  {
      compatIFSHnone,
      compatIFSHread,
      compatIFSHwrite,
      compatIFSHexec,
      compatIFSHall
  };

  // Version banner returned by remoteServerVersionString(): a fixed prefix with
  // the platform name appended directly to it.
  // dont forget FILESRV_VERSION in header
  static const char *VERSTRING =
  #ifdef _WIN32
      "DS V2.1" "Windows ";
  #else
      "DS V2.1" "Linux ";
  #endif

  typedef unsigned char RemoteFileCommandType;
  typedef int RemoteFileIOHandle;

  // Client-side limits set via clientSetRemoteFileTimeouts(); 0 is the initial value.
  static unsigned maxConnectTime = 0;
  static unsigned maxReceiveTime = 0;
  154. //Security and default port attributes
  155. static class _securitySettings
  156. {
  157. public:
  158. DAFSConnectCfg connectMethod;
  159. unsigned short daFileSrvPort;
  160. unsigned short daFileSrvSSLPort;
  161. const char * certificate;
  162. const char * privateKey;
  163. const char * passPhrase;
  164. _securitySettings()
  165. {
  166. queryDafsSecSettings(&connectMethod, &daFileSrvPort, &daFileSrvSSLPort, &certificate, &privateKey, &passPhrase);
  167. }
  168. } securitySettings;
  169. static CriticalSection secureContextCrit;
  170. static Owned<ISecureSocketContext> secureContextServer;
  171. static Owned<ISecureSocketContext> secureContextClient;
  172. #ifdef _USE_OPENSSL
  173. static ISecureSocket *createSecureSocket(ISocket *sock, SecureSocketType type)
  174. {
  175. {
  176. CriticalBlock b(secureContextCrit);
  177. if (type == ServerSocket)
  178. {
  179. if (!secureContextServer)
  180. secureContextServer.setown(createSecureSocketContextEx(securitySettings.certificate, securitySettings.privateKey, securitySettings.passPhrase, type));
  181. }
  182. else if (!secureContextClient)
  183. secureContextClient.setown(createSecureSocketContext(type));
  184. }
  185. int loglevel = SSLogNormal;
  186. #ifdef _DEBUG
  187. loglevel = SSLogMax;
  188. #endif
  189. if (type == ServerSocket)
  190. return secureContextServer->createSecureSocket(sock, loglevel);
  191. else
  192. return secureContextClient->createSecureSocket(sock, loglevel);
  193. }
  194. #endif
  195. void clientSetRemoteFileTimeouts(unsigned maxconnecttime,unsigned maxreadtime)
  196. {
  197. maxConnectTime = maxconnecttime;
  198. maxReceiveTime = maxreadtime;
  199. }
  200. struct sRFTM
  201. {
  202. CTimeMon *timemon;
  203. sRFTM(unsigned limit) { timemon = limit ? new CTimeMon(limit) : NULL; }
  204. ~sRFTM() { delete timemon; }
  205. };
  206. const char *remoteServerVersionString() { return VERSTRING; }
  // When true, connectSocket() calls sendAuthentication() after connecting.
  static bool AuthenticationEnabled = true;

  // Switches the flag above to 'on' and returns the value it had before the call.
  bool enableDafsAuthentication(bool on)
  {
      const bool previous = AuthenticationEnabled;
      AuthenticationEnabled = on;
      return previous;
  }
  214. #define CLIENT_TIMEOUT (1000*60*60*12) // long timeout in case zombies
  215. #define CLIENT_INACTIVEWARNING_TIMEOUT (1000*60*60*12) // time between logging inactive clients
  216. #define SERVER_TIMEOUT (1000*60*5) // timeout when waiting for dafilesrv to reply after command
  217. // (increased when waiting for large block)
  218. #define DAFS_CONNECT_FAIL_RETRY_TIME (1000*60*15)
  219. #ifdef SIMULATE_PACKETLOSS
  220. #define NORMAL_RETRIES (1)
  221. #define LENGTHY_RETRIES (1)
  222. #else
  223. #define NORMAL_RETRIES (3)
  224. #define LENGTHY_RETRIES (12)
  225. #endif
  226. #ifdef _DEBUG
  227. static byte traceFlags=0x30;
  228. #else
  229. static byte traceFlags=0x20;
  230. #endif
  231. #define TF_TRACE (traceFlags&1)
  232. #define TF_TRACE_PRE_IO (traceFlags&2)
  233. #define TF_TRACE_FULL (traceFlags&4)
  234. #define TF_TRACE_CLIENT_CONN (traceFlags&8)
  235. #define TF_TRACE_TREE_COPY (traceFlags&0x10)
  236. #define TF_TRACE_CLIENT_STATS (traceFlags&0x20)
  // Remote file command codes. The numbering up to RFCmaxnormal is sequential
  // from 0 and must stay in step with RFCStrings, which is indexed by it.
  enum {
      RFCopenIO = 0,
      RFCcloseIO,
      RFCread,
      RFCwrite,
      RFCsize,
      RFCexists,
      RFCremove,
      RFCrename,
      RFCgetver,
      RFCisfile,
      RFCisdirectory = 10,
      RFCisreadonly,
      RFCsetreadonly,
      RFCgettime,
      RFCsettime,
      RFCcreatedir,
      RFCgetdir,
      RFCstop,
      RFCexec,                            // legacy cmd removed
      RFCdummy1,                          // legacy placeholder
      RFCredeploy = 20,
      RFCgetcrc,
      RFCmove,
      // 1.5 features below
      RFCsetsize,
      RFCextractblobelements,
      RFCcopy,
      RFCappend,
      RFCmonitordir,
      RFCsettrace,
      RFCgetinfo,
      RFCfirewall = 30,                   // not used currently
      RFCunlock,
      RFCunlockreply,
      RFCinvalid,
      RFCcopysection,
      // 1.7e
      RFCtreecopy,
      // 1.7e - 1
      RFCtreecopytmp,
      // 1.8
      RFCsetthrottle,                     // legacy version
      // 1.9
      RFCsetthrottle2,
      RFCsetfileperms,
      // 2.0
      RFCreadfilteredindex,
      RFCreadfilteredindexcount,
      RFCreadfilteredindexblob,
      RFCmaxnormal,                       // one past the last sequential command
      RFCStreamRead = '{',
      RFCmax,
      RFCunknown = 255                    // 0 would have been more sensible, but can't break backward compatibility
  };
  // Printable names for the sequential RFC command codes, indexed by the code.
  // The last entry is the fallback used for any code at or above RFCmaxnormal.
  // Each entry is the stringified enumerator name, so the spelling here must
  // match the enum exactly.
  #define RFCText(cmd) #cmd
  const char *RFCStrings[] =
  {
      RFCText(RFCopenIO),
      RFCText(RFCcloseIO),
      RFCText(RFCread),
      RFCText(RFCwrite),
      RFCText(RFCsize),
      RFCText(RFCexists),
      RFCText(RFCremove),
      RFCText(RFCrename),
      RFCText(RFCgetver),
      RFCText(RFCisfile),
      RFCText(RFCisdirectory),
      RFCText(RFCisreadonly),
      RFCText(RFCsetreadonly),
      RFCText(RFCgettime),
      RFCText(RFCsettime),
      RFCText(RFCcreatedir),
      RFCText(RFCgetdir),
      RFCText(RFCstop),
      RFCText(RFCexec),
      RFCText(RFCdummy1),
      RFCText(RFCredeploy),
      RFCText(RFCgetcrc),
      RFCText(RFCmove),
      RFCText(RFCsetsize),
      RFCText(RFCextractblobelements),
      RFCText(RFCcopy),
      RFCText(RFCappend),
      RFCText(RFCmonitordir),
      RFCText(RFCsettrace),
      RFCText(RFCgetinfo),
      RFCText(RFCfirewall),
      RFCText(RFCunlock),
      RFCText(RFCunlockreply),
      RFCText(RFCinvalid),
      RFCText(RFCcopysection),
      RFCText(RFCtreecopy),
      RFCText(RFCtreecopytmp),
      RFCText(RFCsetthrottle), // legacy version
      RFCText(RFCsetthrottle2),
      RFCText(RFCsetfileperms),
      RFCText(RFCreadfilteredindex),
      RFCText(RFCreadfilteredindexcount), // was misspelt "RFCreadfilteredcount"
      RFCText(RFCreadfilteredindexblob),  // was misspelt "RFCreadfilteredblob"
      RFCText(RFCunknown),
  };
  340. static const char *getRFCText(RemoteFileCommandType cmd)
  341. {
  342. if (cmd==RFCStreamRead)
  343. return "RFCStreamRead";
  344. else
  345. {
  346. if (cmd > RFCmaxnormal)
  347. cmd = RFCmaxnormal;
  348. return RFCStrings[cmd];
  349. }
  350. }
  351. static const char *getRFSERRText(unsigned err)
  352. {
  353. switch (err)
  354. {
  355. case RFSERR_InvalidCommand:
  356. return "RFSERR_InvalidCommand";
  357. case RFSERR_NullFileIOHandle:
  358. return "RFSERR_NullFileIOHandle";
  359. case RFSERR_InvalidFileIOHandle:
  360. return "RFSERR_InvalidFileIOHandle";
  361. case RFSERR_TimeoutFileIOHandle:
  362. return "RFSERR_TimeoutFileIOHandle";
  363. case RFSERR_OpenFailed:
  364. return "RFSERR_OpenFailed";
  365. case RFSERR_ReadFailed:
  366. return "RFSERR_ReadFailed";
  367. case RFSERR_WriteFailed:
  368. return "RFSERR_WriteFailed";
  369. case RFSERR_RenameFailed:
  370. return "RFSERR_RenameFailed";
  371. case RFSERR_ExistsFailed:
  372. return "RFSERR_ExistsFailed";
  373. case RFSERR_RemoveFailed:
  374. return "RFSERR_RemoveFailed";
  375. case RFSERR_CloseFailed:
  376. return "RFSERR_CloseFailed";
  377. case RFSERR_IsFileFailed:
  378. return "RFSERR_IsFileFailed";
  379. case RFSERR_IsDirectoryFailed:
  380. return "RFSERR_IsDirectoryFailed";
  381. case RFSERR_IsReadOnlyFailed:
  382. return "RFSERR_IsReadOnlyFailed";
  383. case RFSERR_SetReadOnlyFailed:
  384. return "RFSERR_SetReadOnlyFailed";
  385. case RFSERR_GetTimeFailed:
  386. return "RFSERR_GetTimeFailed";
  387. case RFSERR_SetTimeFailed:
  388. return "RFSERR_SetTimeFailed";
  389. case RFSERR_CreateDirFailed:
  390. return "RFSERR_CreateDirFailed";
  391. case RFSERR_GetDirFailed:
  392. return "RFSERR_GetDirFailed";
  393. case RFSERR_GetCrcFailed:
  394. return "RFSERR_GetCrcFailed";
  395. case RFSERR_MoveFailed:
  396. return "RFSERR_MoveFailed";
  397. case RFSERR_ExtractBlobElementsFailed:
  398. return "RFSERR_ExtractBlobElementsFailed";
  399. case RFSERR_CopyFailed:
  400. return "RFSERR_CopyFailed";
  401. case RFSERR_AppendFailed:
  402. return "RFSERR_AppendFailed";
  403. case RFSERR_AuthenticateFailed:
  404. return "RFSERR_AuthenticateFailed";
  405. case RFSERR_CopySectionFailed:
  406. return "RFSERR_CopySectionFailed";
  407. case RFSERR_TreeCopyFailed:
  408. return "RFSERR_TreeCopyFailed";
  409. case RAERR_InvalidUsernamePassword:
  410. return "RAERR_InvalidUsernamePassword";
  411. case RFSERR_MasterSeemsToHaveDied:
  412. return "RFSERR_MasterSeemsToHaveDied";
  413. case RFSERR_TimeoutWaitSlave:
  414. return "RFSERR_TimeoutWaitSlave";
  415. case RFSERR_TimeoutWaitConnect:
  416. return "RFSERR_TimeoutWaitConnect";
  417. case RFSERR_TimeoutWaitMaster:
  418. return "RFSERR_TimeoutWaitMaster";
  419. case RFSERR_NoConnectSlave:
  420. return "RFSERR_NoConnectSlave";
  421. case RFSERR_NoConnectSlaveXY:
  422. return "RFSERR_NoConnectSlaveXY";
  423. case RFSERR_VersionMismatch:
  424. return "RFSERR_VersionMismatch";
  425. case RFSERR_SetThrottleFailed:
  426. return "RFSERR_SetThrottleFailed";
  427. case RFSERR_MaxQueueRequests:
  428. return "RFSERR_MaxQueueRequests";
  429. case RFSERR_KeyIndexFailed:
  430. return "RFSERR_MaxQueueRequests";
  431. }
  432. return "RFSERR_Unknown";
  433. }
  // Printable names for the throttle classes, indexed by class value
  // (see getThrottleClassText).
  #define ThrottleText(throttleClass) #throttleClass
  const char *ThrottleStrings[] =
  {
      ThrottleText(ThrottleStd),
      ThrottleText(ThrottleSlow),
  };
  // Very high upper bounds that configured throttle settings cannot exceed.
  #define THROTTLE_MAX_LIMIT        1000000
  #define THROTTLE_MAX_DELAYMS      3600000
  #define THROTTLE_MAX_CPUTHRESHOLD 100
  #define THROTTLE_MAX_QUEUELIMIT   10000000
  445. static const char *getThrottleClassText(ThrottleClass throttleClass) { return ThrottleStrings[throttleClass]; }
  446. typedef enum { ACScontinue, ACSdone, ACSerror} AsyncCommandStatus;
  447. typedef byte OnceKey[16];
  448. static void genOnce(OnceKey &key)
  449. {
  450. static __int64 inc=0;
  451. *(unsigned *)&key[0] = getRandom();
  452. *(__int64 *)&key[4] = ++inc;
  453. *(unsigned *)&key[12] = getRandom();
  454. }
  455. static void mergeOnce(OnceKey &key,size32_t sz,const void *data)
  456. {
  457. assertex(sz<=sizeof(OnceKey));
  458. const byte *p = (const byte *)data;
  459. while (sz)
  460. key[--sz] ^= *(p++);
  461. }
  462. //---------------------------------------------------------------------------
  463. class DECL_EXCEPTION CDafsException: public IDAFS_Exception, public CInterface
  464. {
  465. int errcode;
  466. StringAttr msg;
  467. public:
  468. IMPLEMENT_IINTERFACE;
  469. CDafsException(int code,const char *_msg)
  470. : errcode(code), msg(_msg)
  471. {
  472. };
  473. int errorCode() const
  474. {
  475. return errcode;
  476. }
  477. StringBuffer & errorMessage(StringBuffer &str) const
  478. {
  479. return str.append(msg);
  480. }
  481. MessageAudience errorAudience() const
  482. {
  483. return MSGAUD_user;
  484. }
  485. };
  486. static IDAFS_Exception *createDafsException(int code,const char *msg)
  487. {
  488. return new CDafsException(code,msg);
  489. }
  490. void setDafsEndpointPort(SocketEndpoint &ep)
  491. {
  492. // odd kludge (don't do this at home)
  493. byte ipb[4];
  494. if (ep.getNetAddress(sizeof(ipb),&ipb)==sizeof(ipb)) {
  495. if ((ipb[0]==255)&&(ipb[1]==255)) {
  496. ep.port = (((unsigned)ipb[2])<<8)+ipb[3];
  497. ep.ipset(queryLocalIP());
  498. }
  499. }
  500. if (ep.port==0)
  501. {
  502. if ( (securitySettings.connectMethod == SSLNone) || (securitySettings.connectMethod == UnsecureFirst) )
  503. ep.port = securitySettings.daFileSrvPort;
  504. else
  505. ep.port = securitySettings.daFileSrvSSLPort;
  506. }
  507. }
  508. inline MemoryBuffer & initSendBuffer(MemoryBuffer & buff)
  509. {
  510. buff.setEndian(__BIG_ENDIAN); // transfer as big endian...
  511. buff.append((unsigned)0); // reserve space for length prefix
  512. return buff;
  513. }
  514. inline void sendBuffer(ISocket * socket, MemoryBuffer & src, bool testSocketFlag=false)
  515. {
  516. unsigned length = src.length() - sizeof(unsigned);
  517. byte * buffer = (byte *)src.toByteArray();
  518. if (TF_TRACE_FULL)
  519. PROGLOG("sendBuffer size %d, data = %d %d %d %d",length, (int)buffer[4],(int)buffer[5],(int)buffer[6],(int)buffer[7]);
  520. if (testSocketFlag)
  521. length |= 0x80000000;
  522. _WINCPYREV(buffer, &length, sizeof(unsigned));
  523. SOCKWRITE(socket)(buffer, src.length());
  524. }
  525. inline size32_t receiveBufferSize(ISocket * socket, unsigned numtries=NORMAL_RETRIES,CTimeMon *timemon=NULL)
  526. {
  527. unsigned timeout = SERVER_TIMEOUT;
  528. if (numtries==0) {
  529. numtries = 1;
  530. timeout = 10*1000; // 10s
  531. }
  532. while (numtries--) {
  533. try {
  534. if (timemon) {
  535. unsigned remaining;
  536. if (timemon->timedout(&remaining)||(remaining<10))
  537. remaining = 10;
  538. if (remaining<timeout)
  539. timeout = remaining;
  540. }
  541. size32_t szread;
  542. size32_t gotLength;
  543. SOCKREADTMS(socket)(&gotLength, sizeof(gotLength), sizeof(gotLength), szread, timeout);
  544. _WINREV(gotLength);
  545. if (TF_TRACE_FULL)
  546. PROGLOG("receiveBufferSized %d",gotLength);
  547. return gotLength;
  548. }
  549. catch (IJSOCK_Exception *e) {
  550. if ((numtries==0)||(e->errorCode()!=JSOCKERR_timeout_expired)||(timemon&&timemon->timedout())) {
  551. throw;
  552. }
  553. StringBuffer err;
  554. char peername[256];
  555. socket->peer_name(peername,sizeof(peername)-1);
  556. WARNLOG("Remote connection %s: %s",peername,e->errorMessage(err).str()); // why no peername
  557. e->Release();
  558. Sleep(500+getRandom()%1000); // ~1s
  559. }
  560. }
  561. return 0;
  562. }
  563. static void flush(ISocket *socket)
  564. {
  565. MemoryBuffer sendbuf;
  566. initSendBuffer(sendbuf);
  567. sendbuf.append((RemoteFileCommandType)RFCgetver);
  568. sendbuf.append((unsigned)RFCgetver);
  569. MemoryBuffer reply;
  570. size32_t totread=0;
  571. try {
  572. sendBuffer(socket, sendbuf);
  573. char buf[1024];
  574. for (;;) {
  575. Sleep(1000); // breathe
  576. size32_t szread;
  577. SOCKREADTMS(socket)(buf, 1, sizeof(buf), szread, 1000*60);
  578. totread += szread;
  579. }
  580. }
  581. catch (IJSOCK_Exception *e) {
  582. if (totread)
  583. PROGLOG("%d bytes discarded",totread);
  584. if (e->errorCode()!=JSOCKERR_timeout_expired)
  585. EXCLOG(e,"flush");
  586. e->Release();
  587. }
  588. }
  589. inline void receiveBuffer(ISocket * socket, MemoryBuffer & tgt, unsigned numtries=1, size32_t maxsz=0x7fffffff)
  590. // maxsz is a guess at a resonable upper max to catch where protocol error
  591. {
  592. sRFTM tm(maxReceiveTime);
  593. size32_t gotLength = receiveBufferSize(socket, numtries,tm.timemon);
  594. if (gotLength) {
  595. size32_t origlen = tgt.length();
  596. try {
  597. if (gotLength>maxsz) {
  598. StringBuffer msg;
  599. msg.appendf("receiveBuffer maximum block size exceeded %d/%d",gotLength,maxsz);
  600. PrintStackReport();
  601. throw createDafsException(DAFSERR_protocol_failure,msg.str());
  602. }
  603. unsigned timeout = SERVER_TIMEOUT*(numtries?numtries:1);
  604. if (tm.timemon) {
  605. unsigned remaining;
  606. if (tm.timemon->timedout(&remaining)||(remaining<10))
  607. remaining = 10;
  608. if (remaining<timeout)
  609. timeout = remaining;
  610. }
  611. size32_t szread;
  612. SOCKREADTMS(socket)((gotLength<4000)?tgt.reserve(gotLength):tgt.reserveTruncate(gotLength), gotLength, gotLength, szread, timeout);
  613. }
  614. catch (IJSOCK_Exception *e) {
  615. if (e->errorCode()!=JSOCKERR_timeout_expired) {
  616. EXCLOG(e,"receiveBuffer(1)");
  617. PrintStackReport();
  618. if (!tm.timemon||!tm.timemon->timedout())
  619. flush(socket);
  620. }
  621. else {
  622. EXCLOG(e,"receiveBuffer");
  623. PrintStackReport();
  624. }
  625. tgt.setLength(origlen);
  626. throw;
  627. }
  628. catch (IException *e) {
  629. EXCLOG(e,"receiveBuffer(2)");
  630. PrintStackReport();
  631. if (!tm.timemon||!tm.timemon->timedout())
  632. flush(socket);
  633. tgt.setLength(origlen);
  634. throw;
  635. }
  636. }
  637. tgt.setEndian(__BIG_ENDIAN);
  638. }
  639. struct CConnectionRec
  640. {
  641. SocketEndpoint ep;
  642. unsigned tick;
  643. IArrayOf<ISocket> socks; // relies on isShared
  644. };
  645. //---------------------------------------------------------------------------
  646. // Local mount redirect
  647. struct CLocalMountRec: public CInterface
  648. {
  649. IpAddress ip;
  650. StringAttr dir; // dir path on remote ip
  651. StringAttr local; // local dir path
  652. };
  653. static CIArrayOf<CLocalMountRec> localMounts;
  654. static CriticalSection localMountCrit;
  655. void setDafsLocalMountRedirect(const IpAddress &ip,const char *dir,const char *mountdir)
  656. {
  657. CriticalBlock block(localMountCrit);
  658. ForEachItemInRev(i,localMounts) {
  659. CLocalMountRec &mount = localMounts.item(i);
  660. if (dir==NULL) { // remove all matching mount
  661. if (!mountdir)
  662. return;
  663. if (strcmp(mount.local,mountdir)==0)
  664. localMounts.remove(i);
  665. }
  666. else if (mount.ip.ipequals(ip)&&(strcmp(mount.dir,dir)==0)) {
  667. if (mountdir) {
  668. mount.local.set(mountdir);
  669. return;
  670. }
  671. else
  672. localMounts.remove(i);
  673. }
  674. }
  675. if (dir&&mountdir) {
  676. CLocalMountRec &mount = *new CLocalMountRec;
  677. mount.ip.ipset(ip);
  678. mount.dir.set(dir);
  679. mount.local.set(mountdir);
  680. localMounts.append(mount);
  681. }
  682. }
  683. IFile *createFileLocalMount(const IpAddress &ip, const char * filename)
  684. {
  685. CriticalBlock block(localMountCrit);
  686. ForEachItemInRev(i,localMounts) {
  687. CLocalMountRec &mount = localMounts.item(i);
  688. if (mount.ip.ipequals(ip)) {
  689. size32_t bl = mount.dir.length();
  690. if (isPathSepChar(mount.dir[bl-1]))
  691. bl--;
  692. if ((memcmp((void *)filename,(void *)mount.dir.get(),bl)==0)&&(isPathSepChar(filename[bl])||!filename[bl])) { // match
  693. StringBuffer locpath(mount.local);
  694. if (filename[bl])
  695. addPathSepChar(locpath).append(filename+bl+1);
  696. locpath.replace((PATHSEPCHAR=='\\')?'/':'\\',PATHSEPCHAR);
  697. return createIFile(locpath.str());
  698. }
  699. }
  700. }
  701. return NULL;
  702. }
  703. //---------------------------------------------------------------------------
  704. static class CConnectionTable: public SuperHashTableOf<CConnectionRec,SocketEndpoint>
  705. {
  706. void onAdd(void *) {}
  707. void onRemove(void *e)
  708. {
  709. CConnectionRec *r=(CConnectionRec *)e;
  710. delete r;
  711. }
  712. unsigned getHashFromElement(const void *e) const
  713. {
  714. const CConnectionRec &elem=*(const CConnectionRec *)e;
  715. return elem.ep.hash(0);
  716. }
  717. unsigned getHashFromFindParam(const void *fp) const
  718. {
  719. return ((const SocketEndpoint *)fp)->hash(0);
  720. }
  721. const void * getFindParam(const void *p) const
  722. {
  723. const CConnectionRec &elem=*(const CConnectionRec *)p;
  724. return (void *)&elem.ep;
  725. }
  726. bool matchesFindParam(const void * et, const void *fp, unsigned) const
  727. {
  728. return ((CConnectionRec *)et)->ep.equals(*(SocketEndpoint *)fp);
  729. }
  730. IMPLEMENT_SUPERHASHTABLEOF_REF_FIND(CConnectionRec,SocketEndpoint);
  731. unsigned numsockets;
  732. public:
  733. static CriticalSection crit;
  734. CConnectionTable()
  735. {
  736. numsockets = 0;
  737. }
  738. ~CConnectionTable() {
  739. _releaseAll();
  740. }
  741. ISocket *lookup(const SocketEndpoint &ep)
  742. {
  743. // always called from crit block
  744. CConnectionRec *r = SuperHashTableOf<CConnectionRec,SocketEndpoint>::find(&ep);
  745. if (r) {
  746. ForEachItemIn(i,r->socks) {
  747. ISocket *s = &r->socks.item(i);
  748. if (!QUERYINTERFACE(s, CInterface)->IsShared()) {
  749. r->tick = msTick();
  750. s->Link();
  751. return s;
  752. }
  753. }
  754. }
  755. return NULL;
  756. }
  757. void addLink(SocketEndpoint &ep,ISocket *sock)
  758. {
  759. // always called from crit block
  760. while (numsockets>=SOCKET_CACHE_MAX) {
  761. // find oldest
  762. CConnectionRec *c = NULL;
  763. unsigned oldest = 0;
  764. CConnectionRec *old = NULL;
  765. unsigned oldi;
  766. unsigned now = msTick();
  767. for (;;) {
  768. c = (CConnectionRec *)SuperHashTableOf<CConnectionRec,SocketEndpoint>::next(c);
  769. if (!c)
  770. break;
  771. ForEachItemIn(i,c->socks) {
  772. ISocket *s = &c->socks.item(i);
  773. if (!QUERYINTERFACE(s, CInterface)->IsShared()) { // candidate to remove
  774. unsigned t = now-c->tick;
  775. if (t>oldest) {
  776. oldest = t;
  777. old = c;
  778. oldi = i;
  779. }
  780. }
  781. }
  782. }
  783. if (!old)
  784. return;
  785. old->socks.remove(oldi);
  786. numsockets--;
  787. }
  788. CConnectionRec *r = SuperHashTableOf<CConnectionRec,SocketEndpoint>::find(&ep);
  789. if (!r) {
  790. r = new CConnectionRec;
  791. r->ep = ep;
  792. SuperHashTableOf<CConnectionRec,SocketEndpoint>::add(*r);
  793. }
  794. sock->Link();
  795. r->socks.append(*sock);
  796. numsockets++;
  797. r->tick = msTick();
  798. }
  799. void remove(SocketEndpoint &ep,ISocket *sock)
  800. {
  801. // always called from crit block
  802. CConnectionRec *r = SuperHashTableOf<CConnectionRec,SocketEndpoint>::find(&ep);
  803. if (r)
  804. if (r->socks.zap(*sock)&&numsockets)
  805. numsockets--;
  806. }
  807. } *ConnectionTable = NULL;
  808. CriticalSection CConnectionTable::crit;
  809. void clientSetDaliServixSocketCaching(bool on)
  810. {
  811. CriticalBlock block(CConnectionTable::crit);
  812. if (on) {
  813. if (!ConnectionTable)
  814. ConnectionTable = new CConnectionTable;
  815. }
  816. else {
  817. delete ConnectionTable;
  818. ConnectionTable = NULL;
  819. }
  820. }
  821. //---------------------------------------------------------------------------
  822. // TreeCopy
  823. #define TREECOPY_CACHE_SIZE 50
  824. struct CTreeCopyItem: public CInterface
  825. {
  826. StringAttr net;
  827. StringAttr mask;
  828. offset_t sz; // original size
  829. CDateTime dt; // original date
  830. RemoteFilenameArray loc; // locations for file - 0 is original
  831. Owned<IBitSet> busy;
  832. unsigned lastused;
  833. CTreeCopyItem(RemoteFilename &orig, const char *_net, const char *_mask, offset_t _sz, CDateTime &_dt)
  834. : net(_net), mask(_mask)
  835. {
  836. loc.append(orig);
  837. dt.set(_dt);
  838. sz = _sz;
  839. busy.setown(createThreadSafeBitSet());
  840. lastused = msTick();
  841. }
  842. bool equals(const RemoteFilename &orig, const char *_net, const char *_mask, offset_t _sz, CDateTime &_dt)
  843. {
  844. if (!orig.equals(loc.item(0)))
  845. return false;
  846. if (strcmp(_net,net)!=0)
  847. return false;
  848. if (strcmp(_mask,mask)!=0)
  849. return false;
  850. if (sz!=_sz)
  851. return false;
  852. return (dt.equals(_dt,false));
  853. }
  854. };
  855. static CIArrayOf<CTreeCopyItem> treeCopyArray;
  856. static CriticalSection treeCopyCrit;
  857. static unsigned treeCopyWaiting=0;
  858. static Semaphore treeCopySem;
  859. #define DEBUGSAMEIP false
  860. static void cleanupSocket(ISocket *sock)
  861. {
  862. if (!sock)
  863. return;
  864. try
  865. {
  866. sock->shutdown();
  867. }
  868. catch (IException *e)
  869. {
  870. e->Release();
  871. }
  872. try
  873. {
  874. sock->close();
  875. }
  876. catch (IException *e)
  877. {
  878. e->Release();
  879. }
  880. }
  881. //---------------------------------------------------------------------------
  882. class CRemoteBase: public CInterface
  883. {
  884. Owned<ISocket> socket;
  885. static SocketEndpoint lastfailep;
  886. static unsigned lastfailtime;
  887. DAFSConnectCfg connectMethod;
  888. void connectSocket(SocketEndpoint &ep, unsigned localConnectTime=0, unsigned localRetries=0)
  889. {
  890. unsigned retries = 3;
  891. if (localConnectTime)
  892. {
  893. if (localRetries)
  894. retries = localRetries;
  895. if (localConnectTime > maxConnectTime)
  896. localConnectTime = maxConnectTime;
  897. }
  898. else
  899. localConnectTime = maxConnectTime;
  900. sRFTM tm(localConnectTime);
  901. // called in CConnectionTable::crit
  902. if (ep.equals(lastfailep)) {
  903. if (msTick()-lastfailtime<DAFS_CONNECT_FAIL_RETRY_TIME) {
  904. StringBuffer msg("Failed to connect (host marked down) to dafilesrv/daliservix on ");
  905. ep.getUrlStr(msg);
  906. throw createDafsException(DAFSERR_connection_failed,msg.str());
  907. }
  908. lastfailep.set(NULL);
  909. retries = 1; // on probation
  910. }
  911. while(retries--) {
  912. CriticalUnblock unblock(CConnectionTable::crit); // allow others to connect
  913. StringBuffer eps;
  914. if (TF_TRACE_CLIENT_CONN) {
  915. ep.getUrlStr(eps);
  916. if (ep.port == securitySettings.daFileSrvSSLPort)
  917. PROGLOG("Connecting SECURE to %s", eps.str());
  918. else
  919. PROGLOG("Connecting to %s", eps.str());
  920. //PrintStackReport();
  921. }
  922. bool ok = true;
  923. try {
  924. if (tm.timemon) {
  925. unsigned remaining;
  926. if (tm.timemon->timedout(&remaining))
  927. throwJSocketException(JSOCKERR_connection_failed);
  928. socket.setown(ISocket::connect_timeout(ep,remaining));
  929. }
  930. else
  931. socket.setown(ISocket::connect(ep));
  932. if (ep.port == securitySettings.daFileSrvSSLPort)
  933. {
  934. #ifdef _USE_OPENSSL
  935. Owned<ISecureSocket> ssock;
  936. try
  937. {
  938. ssock.setown(createSecureSocket(socket.getClear(), ClientSocket));
  939. int status = ssock->secure_connect();
  940. if (status < 0)
  941. throw createDafsException(DAFSERR_connection_failed, "Failure to establish secure connection");
  942. socket.setown(ssock.getLink());
  943. }
  944. catch (IException *e)
  945. {
  946. cleanupSocket(ssock);
  947. ssock.clear();
  948. cleanupSocket(socket);
  949. socket.clear();
  950. StringBuffer eMsg;
  951. e->errorMessage(eMsg);
  952. e->Release();
  953. throw createDafsException(DAFSERR_connection_failed, eMsg.str());
  954. }
  955. #else
  956. throw createDafsException(DAFSERR_connection_failed,"Failure to establish secure connection: OpenSSL disabled in build");
  957. #endif
  958. }
  959. }
  960. catch (IJSOCK_Exception *e) {
  961. ok = false;
  962. if (!retries||(tm.timemon&&tm.timemon->timedout())) {
  963. if (e->errorCode()==JSOCKERR_connection_failed) {
  964. lastfailep.set(ep);
  965. lastfailtime = msTick();
  966. e->Release();
  967. StringBuffer msg("Failed to connect (setting host down) to dafilesrv/daliservix on ");
  968. ep.getUrlStr(msg);
  969. throw createDafsException(DAFSERR_connection_failed,msg.str());
  970. }
  971. throw;
  972. }
  973. StringBuffer err;
  974. WARNLOG("Remote file connect %s",e->errorMessage(err).str());
  975. e->Release();
  976. }
  977. if (ok) {
  978. if (TF_TRACE_CLIENT_CONN) {
  979. PROGLOG("Connected to %s",eps.str());
  980. }
  981. if (AuthenticationEnabled) {
  982. try {
  983. sendAuthentication(ep); // this will log error
  984. break;
  985. }
  986. catch (IJSOCK_Exception *e) {
  987. StringBuffer err;
  988. WARNLOG("Remote file authenticate %s for %s ",e->errorMessage(err).str(),ep.getUrlStr(eps.clear()).str());
  989. e->Release();
  990. if (!retries)
  991. break; // MCK - is this a warning or an error ? If an error, should we close and throw here ?
  992. }
  993. }
  994. else
  995. break;
  996. }
  997. bool timeExpired = false;
  998. unsigned sleeptime = getRandom()%3000+1000;
  999. if (tm.timemon)
  1000. {
  1001. unsigned remaining;
  1002. if (tm.timemon->timedout(&remaining))
  1003. timeExpired = true;
  1004. else
  1005. {
  1006. if (remaining/2<sleeptime)
  1007. sleeptime = remaining/2;
  1008. }
  1009. }
  1010. if (!timeExpired)
  1011. {
  1012. Sleep(sleeptime); // prevent multiple retries beating
  1013. if (ep.port == securitySettings.daFileSrvSSLPort)
  1014. PROGLOG("Retrying SECURE connect");
  1015. else
  1016. PROGLOG("Retrying connect");
  1017. }
  1018. }
  1019. if (ConnectionTable)
  1020. ConnectionTable->addLink(ep,socket);
  1021. }
  1022. void killSocket(SocketEndpoint &tep)
  1023. {
  1024. CriticalBlock block2(CConnectionTable::crit); // this is nested with crit
  1025. if (socket) {
  1026. try {
  1027. Owned<ISocket> s = socket.getClear();
  1028. if (ConnectionTable)
  1029. ConnectionTable->remove(tep,s);
  1030. }
  1031. catch (IJSOCK_Exception *e) {
  1032. e->Release(); // ignore errors closing
  1033. }
  1034. Sleep(getRandom()%1000*5+500); // prevent multiple beating
  1035. }
  1036. }
  1037. protected: friend class CRemoteFileIO;
  1038. StringAttr filename;
  1039. CriticalSection crit;
  1040. SocketEndpoint ep;
  1041. void sendRemoteCommand(MemoryBuffer & src, MemoryBuffer & reply, bool retry=true, bool lengthy=false, bool handleErrCode=true)
  1042. {
  1043. CriticalBlock block(crit); // serialize commands on same file
  1044. SocketEndpoint tep(ep);
  1045. setDafsEndpointPort(tep);
  1046. unsigned nretries = retry?3:0;
  1047. Owned<IJSOCK_Exception> firstexc; // when retrying return first error if fails
  1048. for (;;) {
  1049. try {
  1050. if (socket) {
  1051. sendBuffer(socket, src);
  1052. receiveBuffer(socket, reply, lengthy?LENGTHY_RETRIES:NORMAL_RETRIES);
  1053. break;
  1054. }
  1055. }
  1056. catch (IJSOCK_Exception *e) {
  1057. if (!nretries--) {
  1058. if (firstexc) {
  1059. e->Release();
  1060. e = firstexc.getClear();
  1061. }
  1062. killSocket(tep);
  1063. throw e;
  1064. }
  1065. StringBuffer str;
  1066. e->errorMessage(str);
  1067. WARNLOG("Remote File: %s, retrying (%d)",str.str(),nretries);
  1068. if (firstexc)
  1069. e->Release();
  1070. else
  1071. firstexc.setown(e);
  1072. killSocket(tep);
  1073. }
  1074. CriticalBlock block2(CConnectionTable::crit); // this is nested with crit
  1075. if (ConnectionTable) {
  1076. socket.setown(ConnectionTable->lookup(tep));
  1077. if (socket) {
  1078. // validate existing socket by sending an 'exists' command with short time out
  1079. // (use exists for backward compatibility)
  1080. bool ok = false;
  1081. try {
  1082. MemoryBuffer sendbuf;
  1083. initSendBuffer(sendbuf);
  1084. MemoryBuffer replybuf;
  1085. sendbuf.append((RemoteFileCommandType)RFCexists).append(filename);
  1086. sendBuffer(socket, sendbuf);
  1087. receiveBuffer(socket, replybuf, 0, 1024);
  1088. ok = true;
  1089. }
  1090. catch (IException *e) {
  1091. e->Release();
  1092. }
  1093. if (!ok)
  1094. killSocket(tep);
  1095. }
  1096. }
  1097. if (!socket)
  1098. {
  1099. bool doConnect = true;
  1100. if (connectMethod == SSLFirst || connectMethod == UnsecureFirst)
  1101. {
  1102. // MCK - could maintain a list of 100 or so previous endpoints and if connection failed
  1103. // then mark port down for a delay (like 15 min above) to avoid having to try every time ...
  1104. try
  1105. {
  1106. connectSocket(tep, 5000, 1);
  1107. doConnect = false;
  1108. }
  1109. catch (IDAFS_Exception *e)
  1110. {
  1111. if (e->errorCode() == DAFSERR_connection_failed)
  1112. {
  1113. unsigned prevPort = tep.port;
  1114. if (prevPort == securitySettings.daFileSrvSSLPort)
  1115. tep.port = securitySettings.daFileSrvPort;
  1116. else
  1117. tep.port = securitySettings.daFileSrvSSLPort;
  1118. WARNLOG("Connect failed on port %d, retrying on port %d", prevPort, tep.port);
  1119. doConnect = true;
  1120. e->Release();
  1121. }
  1122. else
  1123. throw e;
  1124. }
  1125. }
  1126. if (doConnect)
  1127. connectSocket(tep);
  1128. }
  1129. }
  1130. if (!handleErrCode)
  1131. return;
  1132. unsigned errCode;
  1133. reply.read(errCode);
  1134. if (errCode) {
  1135. StringBuffer msg;
  1136. if (filename.get())
  1137. msg.append(filename);
  1138. ep.getUrlStr(msg.append('[')).append("] ");
  1139. size32_t pos = reply.getPos();
  1140. if (pos<reply.length()) {
  1141. size32_t len = reply.length()-pos;
  1142. const byte *rest = reply.readDirect(len);
  1143. if (errCode==RFSERR_InvalidCommand) {
  1144. const char *s = (const char *)rest;
  1145. const char *e = (const char *)rest+len;
  1146. while (*s&&(s!=e))
  1147. s++;
  1148. msg.append(s-(const char *)rest,(const char *)rest);
  1149. }
  1150. else if (len&&(rest[len-1]==0))
  1151. msg.append((const char *)rest);
  1152. else {
  1153. msg.appendf("extra data[%d]",len);
  1154. for (unsigned i=0;(i<16)&&(i<len);i++)
  1155. msg.appendf(" %2x",(int)rest[i]);
  1156. }
  1157. }
  1158. else if(errCode == 8209)
  1159. msg.append("Failed to open directory.");
  1160. else
  1161. msg.append("ERROR #").append(errCode);
  1162. #ifdef _DEBUG
  1163. ERRLOG("%s",msg.str());
  1164. PrintStackReport();
  1165. #endif
  1166. throw createDafsException(errCode,msg.str());
  1167. }
  1168. }
  1169. void sendRemoteCommand(MemoryBuffer & src, bool retry)
  1170. {
  1171. MemoryBuffer reply;
  1172. sendRemoteCommand(src, reply, retry);
  1173. }
  1174. void throwUnauthenticated(const IpAddress &ip,const char *user,unsigned err=0)
  1175. {
  1176. if (err==0)
  1177. err = RFSERR_AuthenticateFailed;
  1178. StringBuffer msg;
  1179. msg.appendf("Authentication for %s on ",user);
  1180. ip.getIpText(msg);
  1181. msg.append(" failed");
  1182. throw createDafsException(err, msg.str());
  1183. }
  1184. void sendAuthentication(const IpAddress &serverip)
  1185. {
  1186. // send my sig
  1187. // first send my sig which if stream unencrypted will get returned as a bad command
  1188. OnceKey oncekey;
  1189. genOnce(oncekey);
  1190. MemoryBuffer sendbuf;
  1191. initSendBuffer(sendbuf);
  1192. MemoryBuffer replybuf;
  1193. MemoryBuffer encbuf; // because aesEncrypt clears input
  1194. sendbuf.append((RemoteFileCommandType)RFCunlock).append(sizeof(oncekey),&oncekey);
  1195. try {
  1196. sendBuffer(socket, sendbuf);
  1197. receiveBuffer(socket, replybuf, NORMAL_RETRIES, 1024);
  1198. }
  1199. catch (IException *e)
  1200. {
  1201. EXCLOG(e,"Remote file - sendAuthentication(1)");
  1202. throw;
  1203. }
  1204. unsigned errCode;
  1205. replybuf.read(errCode);
  1206. if (errCode!=0) // no authentication required
  1207. return;
  1208. SocketEndpoint ep;
  1209. ep.setLocalHost(0);
  1210. byte ipdata[16];
  1211. size32_t ipds = ep.getNetAddress(sizeof(ipdata),&ipdata);
  1212. mergeOnce(oncekey,ipds,&ipdata);
  1213. StringBuffer username;
  1214. StringBuffer password;
  1215. IPasswordProvider * pp = queryPasswordProvider();
  1216. if (pp)
  1217. pp->getPassword(serverip, username, password);
  1218. if (!username.length())
  1219. username.append("sds_system"); // default account (note if exists should have restricted access!)
  1220. if (!password.length())
  1221. password.append("sds_man");
  1222. if (replybuf.remaining()<=sizeof(size32_t))
  1223. throwUnauthenticated(serverip,username.str());
  1224. size32_t bs;
  1225. replybuf.read(bs);
  1226. if (replybuf.remaining()<bs)
  1227. throwUnauthenticated(serverip,username.str());
  1228. MemoryBuffer skeybuf;
  1229. aesDecrypt(&oncekey,sizeof(oncekey),replybuf.readDirect(bs),bs,skeybuf);
  1230. if (skeybuf.remaining()<sizeof(OnceKey))
  1231. throwUnauthenticated(serverip,username.str());
  1232. OnceKey sokey;
  1233. skeybuf.read(sizeof(OnceKey),&sokey);
  1234. // now we have the key to use to send user/password
  1235. MemoryBuffer tosend;
  1236. tosend.append((byte)2).append(username).append(password);
  1237. initSendBuffer(sendbuf.clear());
  1238. sendbuf.append((RemoteFileCommandType)RFCunlockreply);
  1239. aesEncrypt(&sokey, sizeof(oncekey), tosend.toByteArray(), tosend.length(), encbuf);
  1240. sendbuf.append(encbuf.length());
  1241. sendbuf.append(encbuf);
  1242. try {
  1243. sendBuffer(socket, sendbuf);
  1244. receiveBuffer(socket, replybuf.clear(), NORMAL_RETRIES, 1024);
  1245. }
  1246. catch (IException *e)
  1247. {
  1248. EXCLOG(e,"Remote file - sendAuthentication(2)");
  1249. throw;
  1250. }
  1251. replybuf.read(errCode);
  1252. if (errCode==0) // suceeded!
  1253. return;
  1254. throwUnauthenticated(serverip,username.str(),errCode);
  1255. }
  1256. public:
  1257. SocketEndpoint &queryEp() { return ep; };
  1258. CRemoteBase(const SocketEndpoint &_ep, const char * _filename)
  1259. : filename(_filename)
  1260. {
  1261. ep = _ep;
  1262. connectMethod = securitySettings.connectMethod;
  1263. }
  1264. void disconnect()
  1265. {
  1266. CriticalBlock block(crit);
  1267. CriticalBlock block2(CConnectionTable::crit); // this shouldn't ever block
  1268. if (socket)
  1269. {
  1270. ISocket *s = socket.getClear();
  1271. if (ConnectionTable)
  1272. {
  1273. SocketEndpoint tep(ep);
  1274. setDafsEndpointPort(tep);
  1275. ConnectionTable->remove(tep,s);
  1276. }
  1277. ::Release(s);
  1278. }
  1279. }
  1280. const char *queryLocalName()
  1281. {
  1282. return filename;
  1283. }
  1284. };
  1285. SocketEndpoint CRemoteBase::lastfailep;
  1286. unsigned CRemoteBase::lastfailtime;
  1287. //---------------------------------------------------------------------------
  1288. class CRemoteDirectoryIterator : implements IDirectoryDifferenceIterator, public CInterface
  1289. {
  1290. Owned<IFile> cur;
  1291. bool curvalid;
  1292. bool curisdir;
  1293. StringAttr curname;
  1294. CDateTime curdt;
  1295. __int64 cursize;
  1296. StringAttr dir;
  1297. SocketEndpoint ep;
  1298. byte *flags;
  1299. unsigned numflags;
  1300. unsigned curidx;
  1301. unsigned mask;
  1302. MemoryBuffer buf;
  1303. public:
  1304. static CriticalSection crit;
  1305. CRemoteDirectoryIterator(SocketEndpoint &_ep,const char *_dir)
  1306. : dir(_dir)
  1307. {
  1308. // an extended difference iterator starts with 2 (for bwd compatibility)
  1309. ep = _ep;
  1310. curisdir = false;
  1311. curvalid = false;
  1312. cursize = 0;
  1313. curidx = (unsigned)-1;
  1314. mask = 0;
  1315. numflags = 0;
  1316. flags = NULL;
  1317. }
  1318. bool appendBuf(MemoryBuffer &_buf)
  1319. {
  1320. buf.setSwapEndian(_buf.needSwapEndian());
  1321. byte hdr;
  1322. _buf.read(hdr);
  1323. if (hdr==2) {
  1324. _buf.read(numflags);
  1325. flags = (byte *)malloc(numflags);
  1326. _buf.read(numflags,flags);
  1327. }
  1328. else {
  1329. buf.append(hdr);
  1330. flags = NULL;
  1331. numflags = 0;
  1332. }
  1333. size32_t rest = _buf.length()-_buf.getPos();
  1334. const byte *rb = (const byte *)_buf.readDirect(rest);
  1335. bool ret = true;
  1336. // At the last byte of the rb (rb[rest-1]) is the stream live flag
  1337. // True if the stream has more data
  1338. // False at the end of stream
  1339. // The previous byte (rb[rest-2]) is the flag to signal there are more
  1340. // valid entries in this block
  1341. // True if there are valid directory entry follows this flag
  1342. // False if there are no more valid entry in this block aka end of block
  1343. // If there is more data in the stream, the end of block flag should be removed
  1344. if (rest&&(rb[rest-1]!=0))
  1345. {
  1346. rest--; // remove stream live flag
  1347. if(rest && (0 == rb[rest-1]))
  1348. rest--; //Remove end of block flag
  1349. ret = false; // continuation
  1350. }
  1351. buf.append(rest,rb);
  1352. return ret;
  1353. }
  1354. ~CRemoteDirectoryIterator()
  1355. {
  1356. free(flags);
  1357. }
  1358. IMPLEMENT_IINTERFACE
  1359. bool first()
  1360. {
  1361. curidx = (unsigned)-1;
  1362. buf.reset();
  1363. return next();
  1364. }
  1365. bool next()
  1366. {
  1367. for (;;) {
  1368. curidx++;
  1369. cur.clear();
  1370. curdt.clear();
  1371. curname.clear();
  1372. cursize = 0;
  1373. curisdir = false;
  1374. if (buf.getPos()>=buf.length())
  1375. return false;
  1376. byte isValidEntry;
  1377. buf.read(isValidEntry);
  1378. curvalid = isValidEntry!=0;
  1379. if (!curvalid)
  1380. return false;
  1381. buf.read(curisdir);
  1382. buf.read(cursize);
  1383. curdt.deserialize(buf);
  1384. buf.read(curname);
  1385. // kludge for bug in old linux jlibs
  1386. if (strchr(curname,'\\')&&(getPathSepChar(dir)=='/')) {
  1387. StringBuffer temp(curname);
  1388. temp.replace('\\','/');
  1389. curname.set(temp.str());
  1390. }
  1391. if ((mask==0)||(getFlags()&mask))
  1392. break;
  1393. }
  1394. return true;
  1395. }
  1396. bool isValid()
  1397. {
  1398. return curvalid;
  1399. }
  1400. IFile & query()
  1401. {
  1402. if (!cur) {
  1403. StringBuffer full(dir);
  1404. addPathSepChar(full).append(curname);
  1405. if (ep.isNull())
  1406. cur.setown(createIFile(full.str()));
  1407. else {
  1408. RemoteFilename rfn;
  1409. rfn.setPath(ep,full.str());
  1410. cur.setown(createIFile(rfn));
  1411. }
  1412. }
  1413. return *cur;
  1414. }
  1415. StringBuffer &getName(StringBuffer &buf)
  1416. {
  1417. return buf.append(curname);
  1418. }
  1419. bool isDir()
  1420. {
  1421. return curisdir;
  1422. }
  1423. __int64 getFileSize()
  1424. {
  1425. if (curisdir)
  1426. return -1;
  1427. return cursize;
  1428. }
  1429. bool getModifiedTime(CDateTime &ret)
  1430. {
  1431. ret = curdt;
  1432. return true;
  1433. }
  1434. void setMask(unsigned _mask)
  1435. {
  1436. mask = _mask;
  1437. }
  1438. virtual unsigned getFlags()
  1439. {
  1440. if (flags&&(curidx<numflags))
  1441. return flags[curidx];
  1442. return 0;
  1443. }
  1444. static bool serialize(MemoryBuffer &mb,IDirectoryIterator *iter, size32_t bufsize, bool first)
  1445. {
  1446. bool ret = true;
  1447. byte b=1;
  1448. StringBuffer tmp;
  1449. if (first ? iter->first() : iter->next()) {
  1450. for (;;) {
  1451. mb.append(b);
  1452. bool isdir = iter->isDir();
  1453. __int64 sz = isdir?0:iter->getFileSize();
  1454. CDateTime dt;
  1455. iter->getModifiedTime(dt);
  1456. iter->getName(tmp.clear());
  1457. mb.append(isdir).append(sz);
  1458. dt.serialize(mb);
  1459. mb.append(tmp.str());
  1460. if (bufsize&&(mb.length()>=bufsize-1)) {
  1461. ret = false;
  1462. break;
  1463. }
  1464. if (!iter->next())
  1465. break;
  1466. }
  1467. }
  1468. b = 0;
  1469. mb.append(b);
  1470. return ret;
  1471. }
  1472. static void serializeDiff(MemoryBuffer &mb,IDirectoryDifferenceIterator *iter)
  1473. {
  1474. // bit slow
  1475. MemoryBuffer flags;
  1476. ForEach(*iter)
  1477. flags.append((byte)iter->getFlags());
  1478. if (flags.length()) {
  1479. byte b = 2;
  1480. mb.append(b).append((unsigned)flags.length()).append(flags);
  1481. }
  1482. serialize(mb,iter,0,true);
  1483. }
  1484. void serialize(MemoryBuffer &mb,bool isdiff)
  1485. {
  1486. byte b;
  1487. if (isdiff&&numflags&&flags) {
  1488. b = 2;
  1489. mb.append(b).append(numflags).append(numflags,flags);
  1490. }
  1491. serialize(mb,this,0,true);
  1492. }
  1493. };
  1494. class CCritTable;
  1495. class CEndpointCS : public CriticalSection, public CInterface
  1496. {
  1497. CCritTable &table;
  1498. const SocketEndpoint ep;
  1499. public:
  1500. CEndpointCS(CCritTable &_table, const SocketEndpoint &_ep) : table(_table), ep(_ep) { }
  1501. const void *queryFindParam() const { return &ep; }
  1502. virtual void beforeDispose();
  1503. };
  1504. class CCritTable : private SimpleHashTableOf<CEndpointCS, const SocketEndpoint>
  1505. {
  1506. typedef SimpleHashTableOf<CEndpointCS, const SocketEndpoint> PARENT;
  1507. CriticalSection crit;
  1508. public:
  1509. CEndpointCS *getCrit(const SocketEndpoint &ep)
  1510. {
  1511. CriticalBlock b(crit);
  1512. Linked<CEndpointCS> clientCrit = find(ep);
  1513. if (!clientCrit || !clientCrit->isAlive()) // if !isAlive(), then it is in the process of being destroyed/removed.
  1514. {
  1515. clientCrit.setown(new CEndpointCS(*this, ep));
  1516. replace(*clientCrit); // NB table doesn't own
  1517. }
  1518. return clientCrit.getClear();
  1519. }
  1520. unsigned getHashFromElement(const void *e) const
  1521. {
  1522. const CEndpointCS &elem=*(const CEndpointCS *)e;
  1523. return getHashFromFindParam(elem.queryFindParam());
  1524. }
  1525. unsigned getHashFromFindParam(const void *fp) const
  1526. {
  1527. return ((const SocketEndpoint *)fp)->hash(0);
  1528. }
  1529. void removeExact(CEndpointCS *clientCrit)
  1530. {
  1531. CriticalBlock b(crit);
  1532. PARENT::removeExact(clientCrit); // NB may not exist, could have been replaced if detected !isAlive() in getCrit()
  1533. }
  1534. } *dirCSTable;
  1535. MODULE_INIT(INIT_PRIORITY_STANDARD)
  1536. {
  1537. dirCSTable = new CCritTable;
  1538. return true;
  1539. }
  1540. MODULE_EXIT()
  1541. {
  1542. delete dirCSTable;
  1543. }
  1544. void CEndpointCS::beforeDispose()
  1545. {
  1546. table.removeExact(this);
  1547. }
  1548. class CRemoteFile : public CRemoteBase, implements IFile
  1549. {
  1550. StringAttr remotefilename;
  1551. unsigned flags;
  1552. public:
  1553. IMPLEMENT_IINTERFACE
  1554. CRemoteFile(const SocketEndpoint &_ep, const char * _filename)
  1555. : CRemoteBase(_ep, _filename)
  1556. {
  1557. flags = ((unsigned)IFSHread)|((S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH)<<16);
  1558. if (filename.length()>2 && isPathSepChar(filename[0]) && isShareChar(filename[2]))
  1559. {
  1560. VStringBuffer winDriveFilename("%c:%s", filename[1], filename+3);
  1561. filename.set(winDriveFilename);
  1562. }
  1563. }
  1564. bool exists()
  1565. {
  1566. MemoryBuffer sendBuffer;
  1567. initSendBuffer(sendBuffer);
  1568. MemoryBuffer replyBuffer;
  1569. sendBuffer.append((RemoteFileCommandType)RFCexists).append(filename);
  1570. sendRemoteCommand(sendBuffer, replyBuffer);
  1571. bool ok;
  1572. replyBuffer.read(ok);
  1573. return ok;
  1574. }
  1575. bool getTime(CDateTime * createTime, CDateTime * modifiedTime, CDateTime * accessedTime)
  1576. {
  1577. CDateTime dummyTime;
  1578. if (!createTime)
  1579. createTime = &dummyTime;
  1580. if (!modifiedTime)
  1581. modifiedTime = &dummyTime;
  1582. if (!accessedTime)
  1583. accessedTime = &dummyTime;
  1584. MemoryBuffer sendBuffer;
  1585. initSendBuffer(sendBuffer);
  1586. MemoryBuffer replyBuffer;
  1587. sendBuffer.append((RemoteFileCommandType)RFCgettime).append(filename);
  1588. sendRemoteCommand(sendBuffer, replyBuffer);
  1589. bool ok;
  1590. replyBuffer.read(ok);
  1591. if (ok) {
  1592. createTime->deserialize(replyBuffer);
  1593. modifiedTime->deserialize(replyBuffer);
  1594. accessedTime->deserialize(replyBuffer);
  1595. }
  1596. return ok;
  1597. }
  1598. bool setTime(const CDateTime * createTime, const CDateTime * modifiedTime, const CDateTime * accessedTime)
  1599. {
  1600. MemoryBuffer sendBuffer;
  1601. initSendBuffer(sendBuffer);
  1602. MemoryBuffer replyBuffer;
  1603. sendBuffer.append((RemoteFileCommandType)RFCsettime).append(filename);
  1604. if (createTime) {
  1605. sendBuffer.append((bool)true);
  1606. createTime->serialize(sendBuffer);
  1607. }
  1608. else
  1609. sendBuffer.append((bool)false);
  1610. if (modifiedTime) {
  1611. sendBuffer.append((bool)true);
  1612. modifiedTime->serialize(sendBuffer);
  1613. }
  1614. else
  1615. sendBuffer.append((bool)false);
  1616. if (accessedTime) {
  1617. sendBuffer.append((bool)true);
  1618. accessedTime->serialize(sendBuffer);
  1619. }
  1620. else
  1621. sendBuffer.append((bool)false);
  1622. sendRemoteCommand(sendBuffer, replyBuffer);
  1623. bool ok;
  1624. replyBuffer.read(ok);
  1625. return ok;
  1626. }
  1627. fileBool isDirectory()
  1628. {
  1629. MemoryBuffer sendBuffer;
  1630. initSendBuffer(sendBuffer);
  1631. MemoryBuffer replyBuffer;
  1632. sendBuffer.append((RemoteFileCommandType)RFCisdirectory).append(filename);
  1633. sendRemoteCommand(sendBuffer, replyBuffer);
  1634. unsigned ret;
  1635. replyBuffer.read(ret);
  1636. return (fileBool)ret;
  1637. }
  1638. fileBool isFile()
  1639. {
  1640. MemoryBuffer sendBuffer;
  1641. initSendBuffer(sendBuffer);
  1642. MemoryBuffer replyBuffer;
  1643. sendBuffer.append((RemoteFileCommandType)RFCisfile).append(filename);
  1644. sendRemoteCommand(sendBuffer, replyBuffer);
  1645. unsigned ret;
  1646. replyBuffer.read(ret);
  1647. return (fileBool)ret;
  1648. }
  1649. fileBool isReadOnly()
  1650. {
  1651. MemoryBuffer sendBuffer;
  1652. initSendBuffer(sendBuffer);
  1653. MemoryBuffer replyBuffer;
  1654. sendBuffer.append((RemoteFileCommandType)RFCisreadonly).append(filename);
  1655. sendRemoteCommand(sendBuffer, replyBuffer);
  1656. unsigned ret;
  1657. replyBuffer.read(ret);
  1658. return (fileBool)ret;
  1659. }
  1660. IFileIO * open(IFOmode mode,IFEflags extraFlags=IFEnone);
  1661. IFileIO * openShared(IFOmode mode,IFSHmode shmode,IFEflags extraFlags=IFEnone);
  1662. IFileAsyncIO * openAsync(IFOmode mode) { return NULL; } // not supported
  1663. const char * queryFilename()
  1664. {
  1665. if (remotefilename.isEmpty()) {
  1666. RemoteFilename rfn;
  1667. rfn.setPath(ep,filename);
  1668. StringBuffer path;
  1669. rfn.getRemotePath(path);
  1670. remotefilename.set(path);
  1671. }
  1672. return remotefilename.get();
  1673. }
  1674. void resetLocalFilename(const char *name)
  1675. {
  1676. remotefilename.clear();
  1677. filename.set(name);
  1678. }
  1679. bool remove()
  1680. {
  1681. MemoryBuffer sendBuffer;
  1682. initSendBuffer(sendBuffer);
  1683. MemoryBuffer replyBuffer;
  1684. sendBuffer.append((RemoteFileCommandType)RFCremove).append(filename);
  1685. sendRemoteCommand(sendBuffer, replyBuffer);
  1686. bool ok;
  1687. replyBuffer.read(ok);
  1688. return ok;
  1689. }
  1690. void rename(const char *newname)
  1691. {
  1692. // currently ignores directory on newname (in future versions newname will be required to be tail only and not full path)
  1693. StringBuffer path;
  1694. splitDirTail(filename,path);
  1695. StringBuffer newdir;
  1696. path.append(splitDirTail(newname,newdir));
  1697. if (newdir.length()&&(strcmp(newdir.str(),path.str())!=0))
  1698. WARNLOG("CRemoteFile::rename passed full path '%s' that may not to match original directory '%s'",newname,path.str());
  1699. MemoryBuffer sendBuffer;
  1700. initSendBuffer(sendBuffer);
  1701. MemoryBuffer replyBuffer;
  1702. sendBuffer.append((RemoteFileCommandType)RFCrename).append(filename).append(path);
  1703. sendRemoteCommand(sendBuffer, replyBuffer);
  1704. filename.set(path);
  1705. remotefilename.clear();
  1706. }
  1707. void move(const char *newname)
  1708. {
  1709. // like rename except between directories
  1710. // first create replote path
  1711. if (!newname||!*newname)
  1712. return;
  1713. RemoteFilename destrfn;
  1714. if (isPathSepChar(newname[0])&&isPathSepChar(newname[1])) {
  1715. destrfn.setRemotePath(newname);
  1716. if (!destrfn.queryEndpoint().ipequals(ep)) {
  1717. StringBuffer msg;
  1718. msg.appendf("IFile::move %s to %s, destination node must match source node", queryFilename(), newname);
  1719. throw createDafsException(RFSERR_MoveFailed,msg.str());
  1720. }
  1721. }
  1722. else
  1723. destrfn.setPath(ep,newname);
  1724. StringBuffer dest;
  1725. newname = destrfn.getLocalPath(dest).str();
  1726. MemoryBuffer sendBuffer;
  1727. initSendBuffer(sendBuffer);
  1728. MemoryBuffer replyBuffer;
  1729. StringBuffer path;
  1730. splitDirTail(filename,path);
  1731. StringBuffer newdir;
  1732. const char *newtail = splitDirTail(newname,newdir);
  1733. if (strcmp(newdir.str(),path.str())==0) {
  1734. path.append(newtail);
  1735. newname = path;
  1736. sendBuffer.append((RemoteFileCommandType)RFCrename); // use rename if we can (supported on older dafilesrv)
  1737. }
  1738. else
  1739. sendBuffer.append((RemoteFileCommandType)RFCmove);
  1740. sendBuffer.append(filename).append(newname);
  1741. sendRemoteCommand(sendBuffer, replyBuffer);
  1742. filename.set(newname);
  1743. remotefilename.clear();
  1744. }
  1745. void setReadOnly(bool set)
  1746. {
  1747. MemoryBuffer sendBuffer;
  1748. initSendBuffer(sendBuffer);
  1749. MemoryBuffer replyBuffer;
  1750. sendBuffer.append((RemoteFileCommandType)RFCsetreadonly).append(filename).append(set);
  1751. sendRemoteCommand(sendBuffer, replyBuffer);
  1752. }
  1753. void setFilePermissions(unsigned fPerms)
  1754. {
  1755. MemoryBuffer sendBuffer;
  1756. initSendBuffer(sendBuffer);
  1757. MemoryBuffer replyBuffer;
  1758. sendBuffer.append((RemoteFileCommandType)RFCsetfileperms).append(filename).append(fPerms);
  1759. try
  1760. {
  1761. sendRemoteCommand(sendBuffer, replyBuffer);
  1762. }
  1763. catch (IDAFS_Exception *e)
  1764. {
  1765. if (e->errorCode() == RFSERR_InvalidCommand)
  1766. {
  1767. WARNLOG("umask setFilePermissions (0%o) not supported on remote server", fPerms);
  1768. e->Release();
  1769. }
  1770. else
  1771. throw;
  1772. }
  1773. }
  1774. offset_t size()
  1775. {
  1776. #if 1 // faster method (consistant with IFile)
  1777. // do this by using dir call (could be improved with new function but this not *too* bad)
  1778. if (isSpecialPath(filename))
  1779. return 0; // queries deemed to always exist (though don't know size).
  1780. // if needed to get size I guess could use IFileIO method and cache (bit of pain though)
  1781. StringBuffer dir;
  1782. const char *tail = splitDirTail(filename,dir);
  1783. if (!dir.length())
  1784. return false;
  1785. MemoryBuffer sendBuffer;
  1786. initSendBuffer(sendBuffer);
  1787. MemoryBuffer replyBuffer;
  1788. bool includedirs = true;
  1789. bool sub=false;
  1790. {
  1791. //Could be removed with new dafilesrv change [ (stream != 0) ], since this is not streaming.
  1792. Owned<CEndpointCS> crit = dirCSTable->getCrit(ep); // NB dirCSTable doesn't own, last reference will remove from table
  1793. CriticalBlock block(*crit);
  1794. sendBuffer.append((RemoteFileCommandType)RFCgetdir).append(dir).append(tail).append(includedirs).append(sub);
  1795. sendRemoteCommand(sendBuffer, replyBuffer);
  1796. }
  1797. // now should be 0 or 1 files returned
  1798. Owned<CRemoteDirectoryIterator> iter = new CRemoteDirectoryIterator(ep, dir.str());
  1799. iter->appendBuf(replyBuffer);
  1800. if (!iter->first())
  1801. return (offset_t)-1;
  1802. return (offset_t) iter->getFileSize();
  1803. #else
  1804. IFileIO * io = open(IFOread);
  1805. offset_t length = (offset_t)-1;
  1806. if (io)
  1807. {
  1808. length = io->size();
  1809. io->Release();
  1810. }
  1811. return length;
  1812. #endif
  1813. }
  1814. bool createDirectory()
  1815. {
  1816. MemoryBuffer sendBuffer;
  1817. initSendBuffer(sendBuffer);
  1818. MemoryBuffer replyBuffer;
  1819. sendBuffer.append((RemoteFileCommandType)RFCcreatedir).append(filename);
  1820. sendRemoteCommand(sendBuffer, replyBuffer);
  1821. bool ok;
  1822. replyBuffer.read(ok);
  1823. return ok;
  1824. }
  1825. virtual IDirectoryIterator *directoryFiles(const char *mask,bool sub,bool includedirs)
  1826. {
  1827. if (mask&&!*mask)
  1828. return createDirectoryIterator("",""); // NULL iterator
  1829. CRemoteDirectoryIterator *ret = new CRemoteDirectoryIterator(ep, filename);
  1830. byte stream = (sub || !mask || containsFileWildcard(mask)) ? 1 : 0; // no point in streaming if mask without wildcards or sub, as will only be <= 1 match.
  1831. Owned<CEndpointCS> crit = dirCSTable->getCrit(ep); // NB dirCSTable doesn't own, last reference will remove from table
  1832. CriticalBlock block(*crit);
  1833. for (;;) {
  1834. MemoryBuffer sendBuffer;
  1835. initSendBuffer(sendBuffer);
  1836. MemoryBuffer replyBuffer;
  1837. sendBuffer.append((RemoteFileCommandType)RFCgetdir).append(filename).append(mask?mask:"").append(includedirs).append(sub).append(stream);
  1838. sendRemoteCommand(sendBuffer, replyBuffer);
  1839. if (ret->appendBuf(replyBuffer))
  1840. break;
  1841. stream = 2; // NB: will never get here if streaming was off (if stream==0 above)
  1842. }
  1843. return ret;
  1844. }
  1845. IDirectoryDifferenceIterator *monitorDirectory(
  1846. IDirectoryIterator *prev=NULL, // in (NULL means use current as baseline)
  1847. const char *mask=NULL,
  1848. bool sub=false,
  1849. bool includedirs=false,
  1850. unsigned checkinterval=60*1000,
  1851. unsigned timeout=(unsigned)-1,
  1852. Semaphore *abortsem=NULL) // returns NULL if timed out
  1853. {
  1854. // abortsem not yet supported
  1855. MemoryBuffer sendBuffer;
  1856. initSendBuffer(sendBuffer);
  1857. MemoryBuffer replyBuffer;
  1858. sendBuffer.append((RemoteFileCommandType)RFCmonitordir).append(filename).append(mask?mask:"").append(includedirs).append(sub);
  1859. sendBuffer.append(checkinterval).append(timeout);
  1860. __int64 cancelid=0; // not yet used
  1861. sendBuffer.append(cancelid);
  1862. byte isprev=(prev!=NULL)?1:0;
  1863. sendBuffer.append(isprev);
  1864. if (prev)
  1865. CRemoteDirectoryIterator::serialize(sendBuffer,prev,0,true);
  1866. sendRemoteCommand(sendBuffer, replyBuffer);
  1867. byte status;
  1868. replyBuffer.read(status);
  1869. if (status==1) {
  1870. CRemoteDirectoryIterator *iter = new CRemoteDirectoryIterator(ep, filename);
  1871. iter->appendBuf(replyBuffer);
  1872. return iter;
  1873. }
  1874. return NULL;
  1875. }
  1876. bool getInfo(bool &isdir,offset_t &size,CDateTime &modtime)
  1877. {
  1878. // do this by using dir call (could be improved with new function but this not *too* bad)
  1879. StringBuffer dir;
  1880. const char *tail = splitDirTail(filename,dir);
  1881. if (!dir.length())
  1882. return false;
  1883. MemoryBuffer sendBuffer;
  1884. initSendBuffer(sendBuffer);
  1885. MemoryBuffer replyBuffer;
  1886. bool includedirs = true;
  1887. bool sub=false;
  1888. {
  1889. //Could be removed with new dafilesrv change [ (stream != 0) ], since this is not streaming.
  1890. Owned<CEndpointCS> crit = dirCSTable->getCrit(ep); // NB dirCSTable doesn't own, last reference will remove from table
  1891. CriticalBlock block(*crit);
  1892. sendBuffer.append((RemoteFileCommandType)RFCgetdir).append(dir).append(tail).append(includedirs).append(sub);
  1893. sendRemoteCommand(sendBuffer, replyBuffer);
  1894. }
  1895. // now should be 0 or 1 files returned
  1896. Owned<CRemoteDirectoryIterator> iter = new CRemoteDirectoryIterator(ep, dir.str());
  1897. iter->appendBuf(replyBuffer);
  1898. if (!iter->first())
  1899. return false;
  1900. isdir = iter->isDir();
  1901. size = (offset_t) iter->getFileSize();
  1902. iter->getModifiedTime(modtime);
  1903. return true;
  1904. }
  1905. bool setCompression(bool set)
  1906. {
  1907. assertex(!"Need to implement compress()");
  1908. return false;
  1909. }
  1910. offset_t compressedSize()
  1911. {
  1912. assertex(!"Need to implement actualSize()");
  1913. return (offset_t)-1;
  1914. }
  1915. void serialize(MemoryBuffer &tgt)
  1916. {
  1917. throwUnexpected();
  1918. }
  1919. void deserialize(MemoryBuffer &src)
  1920. {
  1921. throwUnexpected();
  1922. }
  1923. unsigned getCRC()
  1924. {
  1925. MemoryBuffer sendBuffer;
  1926. initSendBuffer(sendBuffer);
  1927. MemoryBuffer replyBuffer;
  1928. sendBuffer.append((RemoteFileCommandType)RFCgetcrc).append(filename);
  1929. sendRemoteCommand(sendBuffer, replyBuffer, true, true);
  1930. unsigned crc;
  1931. replyBuffer.read(crc);
  1932. return crc;
  1933. }
  1934. void setCreateFlags(unsigned short cflags)
  1935. {
  1936. flags &= 0xffff;
  1937. flags |= ((unsigned)cflags<<16);
  1938. }
  1939. unsigned short getCreateFlags()
  1940. {
  1941. return (unsigned short)(flags>>16);
  1942. }
  1943. void setShareMode(IFSHmode shmode)
  1944. {
  1945. flags &= ~(IFSHfull|IFSHread);
  1946. flags |= (unsigned)(shmode&(IFSHfull|IFSHread));
  1947. }
  1948. unsigned short getShareMode()
  1949. {
  1950. return (unsigned short)(flags&0xffff);
  1951. }
  1952. void remoteExtractBlobElements(const char * prefix, ExtractedBlobArray & extracted)
  1953. {
  1954. MemoryBuffer sendBuffer;
  1955. initSendBuffer(sendBuffer);
  1956. sendBuffer.append((RemoteFileCommandType)RFCextractblobelements).append(prefix).append(filename);
  1957. MemoryBuffer replyBuffer;
  1958. sendRemoteCommand(sendBuffer, replyBuffer, true, true); // handles error code
  1959. unsigned n;
  1960. replyBuffer.read(n);
  1961. for (unsigned i=0;i<n;i++) {
  1962. ExtractedBlobInfo *item = new ExtractedBlobInfo;
  1963. item->deserialize(replyBuffer);
  1964. extracted.append(*item);
  1965. }
  1966. }
  1967. bool copySectionAsync(const char *uuid,const RemoteFilename &dest, offset_t toOfs, offset_t fromOfs, offset_t size, ICopyFileProgress *progress, unsigned timeout)
  1968. {
  1969. // now if we get here is it can be assumed the source file is local to where we send the command
  1970. StringBuffer tos;
  1971. dest.getRemotePath(tos);
  1972. MemoryBuffer sendBuffer;
  1973. initSendBuffer(sendBuffer);
  1974. MemoryBuffer replyBuffer;
  1975. sendBuffer.append((RemoteFileCommandType)RFCcopysection).append(uuid).append(queryLocalName()).append(tos).append(toOfs).append(fromOfs).append(size).append(timeout);
  1976. sendRemoteCommand(sendBuffer, replyBuffer);
  1977. unsigned status;
  1978. replyBuffer.read(status);
  1979. if (progress) {
  1980. offset_t sizeDone;
  1981. offset_t totalSize;
  1982. replyBuffer.read(sizeDone).read(totalSize);
  1983. progress->onProgress(sizeDone,totalSize);
  1984. }
  1985. return (AsyncCommandStatus)status!=ACScontinue; // should only otherwise be done as errors raised by exception
  1986. }
  1987. void copySection(const RemoteFilename &dest, offset_t toOfs, offset_t fromOfs, offset_t size, ICopyFileProgress *progress, CFflags copyFlags=CFnone)
  1988. {
  1989. StringBuffer uuid;
  1990. genUUID(uuid,true);
  1991. unsigned timeout = 60*1000; // check every minute
  1992. while(!copySectionAsync(uuid.str(),dest,toOfs,fromOfs,size,progress,timeout));
  1993. }
  1994. void copyTo(IFile *dest, size32_t buffersize, ICopyFileProgress *progress, bool usetmp, CFflags copyFlags=CFnone);
  1995. virtual IMemoryMappedFile *openMemoryMapped(offset_t ofs, memsize_t len, bool write)
  1996. {
  1997. return NULL;
  1998. }
  1999. };
  2000. void clientAddSocketToCache(SocketEndpoint &ep,ISocket *socket)
  2001. {
  2002. CriticalBlock block(CConnectionTable::crit);
  2003. if (ConnectionTable)
  2004. ConnectionTable->addLink(ep,socket);
  2005. }
  2006. IFile * createRemoteFile(SocketEndpoint &ep, const char * filename)
  2007. {
  2008. IFile *ret = createFileLocalMount(ep,filename);
  2009. if (ret)
  2010. return ret;
  2011. return new CRemoteFile(ep, filename);
  2012. }
  2013. void clientDisconnectRemoteFile(IFile *file)
  2014. {
  2015. CRemoteFile *cfile = QUERYINTERFACE(file,CRemoteFile);
  2016. if (cfile)
  2017. cfile->disconnect();
  2018. }
  2019. bool clientResetFilename(IFile *file, const char *newname) // returns false if not remote
  2020. {
  2021. CRemoteFile *cfile = QUERYINTERFACE(file,CRemoteFile);
  2022. if (!cfile)
  2023. return false;
  2024. cfile->resetLocalFilename(newname);
  2025. return true;
  2026. }
  2027. extern bool clientAsyncCopyFileSection(const char *uuid,
  2028. IFile *from, // expected to be remote
  2029. RemoteFilename &to,
  2030. offset_t toOfs, // -1 created file and copies to start
  2031. offset_t fromOfs,
  2032. offset_t size,
  2033. ICopyFileProgress *progress,
  2034. unsigned timeout) // returns true when done
  2035. {
  2036. CRemoteFile *cfile = QUERYINTERFACE(from,CRemoteFile);
  2037. if (!cfile) {
  2038. // local - do sync
  2039. from->copySection(to,toOfs,fromOfs,size,progress);
  2040. return true;
  2041. }
  2042. return cfile->copySectionAsync(uuid,to,toOfs,fromOfs, size, progress, timeout);
  2043. }
  2044. //---------------------------------------------------------------------------
  2045. class CRemoteFileIO : implements IFileIO, public CInterface
  2046. {
  2047. protected:
  2048. Linked<CRemoteFile> parent;
  2049. RemoteFileIOHandle handle;
  2050. std::atomic<cycle_t> ioReadCycles;
  2051. std::atomic<cycle_t> ioWriteCycles;
  2052. std::atomic<__uint64> ioReadBytes;
  2053. std::atomic<__uint64> ioWriteBytes;
  2054. std::atomic<__uint64> ioReads;
  2055. std::atomic<__uint64> ioWrites;
  2056. std::atomic<unsigned> ioRetries;
  2057. IFOmode mode;
  2058. compatIFSHmode compatmode;
  2059. IFEflags extraFlags;
  2060. bool disconnectonexit;
  2061. public:
  2062. IMPLEMENT_IINTERFACE
  2063. CRemoteFileIO(CRemoteFile *_parent)
  2064. : parent(_parent), ioReadCycles(0), ioWriteCycles(0), ioReadBytes(0), ioWriteBytes(0), ioReads(0), ioWrites(0), ioRetries(0)
  2065. {
  2066. handle = 0;
  2067. disconnectonexit = false;
  2068. }
  2069. ~CRemoteFileIO()
  2070. {
  2071. if (handle) {
  2072. try {
  2073. close();
  2074. }
  2075. catch (IException *e) {
  2076. StringBuffer s;
  2077. e->errorMessage(s);
  2078. WARNLOG("CRemoteFileIO close file: %s",s.str());
  2079. e->Release();
  2080. }
  2081. }
  2082. if (disconnectonexit)
  2083. parent->disconnect();
  2084. }
  2085. void close()
  2086. {
  2087. if (handle) {
  2088. try {
  2089. MemoryBuffer sendBuffer;
  2090. initSendBuffer(sendBuffer);
  2091. sendBuffer.append((RemoteFileCommandType)RFCcloseIO).append(handle);
  2092. parent->sendRemoteCommand(sendBuffer,false);
  2093. }
  2094. catch (IDAFS_Exception *e) {
  2095. if ((e->errorCode()!=RFSERR_InvalidFileIOHandle)&&(e->errorCode()!=RFSERR_NullFileIOHandle))
  2096. throw;
  2097. e->Release();
  2098. }
  2099. handle = 0;
  2100. }
  2101. }
  2102. RemoteFileIOHandle getHandle() const { return handle; }
  2103. bool open(IFOmode _mode,compatIFSHmode _compatmode,IFEflags _extraFlags=IFEnone)
  2104. {
  2105. MemoryBuffer sendBuffer;
  2106. initSendBuffer(sendBuffer);
  2107. MemoryBuffer replyBuffer;
  2108. const char *localname = parent->queryLocalName();
  2109. localname = skipSpecialPath(localname);
  2110. // also send _extraFlags
  2111. // then also send sMode, cFlags
  2112. unsigned short sMode = parent->getShareMode();
  2113. unsigned short cFlags = parent->getCreateFlags();
  2114. switch ((compatIFSHmode)_compatmode)
  2115. {
  2116. case compatIFSHnone:
  2117. sMode = IFSHnone;
  2118. break;
  2119. case compatIFSHread:
  2120. sMode = IFSHread;
  2121. break;
  2122. case compatIFSHwrite:
  2123. sMode = IFSHfull;
  2124. break;
  2125. case compatIFSHall:
  2126. sMode = IFSHfull;
  2127. break;
  2128. }
  2129. sendBuffer.append((RemoteFileCommandType)RFCopenIO).append(localname).append((byte)_mode).append((byte)_compatmode).append((byte)_extraFlags).append(sMode).append(cFlags);
  2130. parent->sendRemoteCommand(sendBuffer, replyBuffer);
  2131. replyBuffer.read(handle);
  2132. if (!handle)
  2133. return false;
  2134. switch (_mode) {
  2135. case IFOcreate:
  2136. mode = IFOwrite;
  2137. break;
  2138. case IFOcreaterw:
  2139. mode = IFOreadwrite;
  2140. break;
  2141. default:
  2142. mode = _mode;
  2143. break;
  2144. }
  2145. compatmode = _compatmode;
  2146. extraFlags = _extraFlags;
  2147. return true;
  2148. }
  2149. bool reopen()
  2150. {
  2151. StringBuffer s;
  2152. PROGLOG("Attempting reopen of %s on %s",parent->queryLocalName(),parent->queryEp().getUrlStr(s).str());
  2153. if (open(mode,compatmode,extraFlags)) {
  2154. return true;
  2155. }
  2156. return false;
  2157. }
  2158. offset_t size()
  2159. {
  2160. MemoryBuffer sendBuffer;
  2161. initSendBuffer(sendBuffer);
  2162. MemoryBuffer replyBuffer;
  2163. sendBuffer.append((RemoteFileCommandType)RFCsize).append(handle);
  2164. parent->sendRemoteCommand(sendBuffer, replyBuffer, false);
  2165. // Retry using reopen TBD
  2166. offset_t ret;
  2167. replyBuffer.read(ret);
  2168. return ret;
  2169. }
  2170. virtual unsigned __int64 getStatistic(StatisticKind kind)
  2171. {
  2172. switch (kind)
  2173. {
  2174. case StCycleDiskReadIOCycles:
  2175. return ioReadCycles.load(std::memory_order_relaxed);
  2176. case StCycleDiskWriteIOCycles:
  2177. return ioWriteCycles.load(std::memory_order_relaxed);
  2178. case StTimeDiskReadIO:
  2179. return cycle_to_nanosec(ioReadCycles.load(std::memory_order_relaxed));
  2180. case StTimeDiskWriteIO:
  2181. return cycle_to_nanosec(ioWriteCycles.load(std::memory_order_relaxed));
  2182. case StSizeDiskRead:
  2183. return ioReadBytes.load(std::memory_order_relaxed);
  2184. case StSizeDiskWrite:
  2185. return ioWriteBytes.load(std::memory_order_relaxed);
  2186. case StNumDiskReads:
  2187. return ioReads.load(std::memory_order_relaxed);
  2188. case StNumDiskWrites:
  2189. return ioWrites.load(std::memory_order_relaxed);
  2190. case StNumDiskRetries:
  2191. return ioRetries.load(std::memory_order_relaxed);
  2192. }
  2193. return 0;
  2194. }
  2195. size32_t read(offset_t pos, size32_t len, void * data)
  2196. {
  2197. size32_t got;
  2198. MemoryBuffer replyBuffer;
  2199. CCycleTimer timer;
  2200. const void *b;
  2201. try
  2202. {
  2203. b = doRead(pos,len,replyBuffer,got,data);
  2204. }
  2205. catch (...)
  2206. {
  2207. ioReadCycles.fetch_add(timer.elapsedCycles());
  2208. throw;
  2209. }
  2210. ioReadCycles.fetch_add(timer.elapsedCycles());
  2211. ioReadBytes.fetch_add(got);
  2212. ++ioReads;
  2213. if (b!=data)
  2214. memcpy(data,b,got);
  2215. return got;
  2216. }
  2217. virtual void flush()
  2218. {
  2219. }
  2220. const void *doRead(offset_t pos, size32_t len, MemoryBuffer &replyBuffer, size32_t &got, void *dstbuf)
  2221. {
  2222. unsigned tries=0;
  2223. for (;;) {
  2224. try {
  2225. MemoryBuffer sendBuffer;
  2226. initSendBuffer(sendBuffer);
  2227. replyBuffer.clear();
  2228. sendBuffer.append((RemoteFileCommandType)RFCread).append(handle).append(pos).append(len);
  2229. parent->sendRemoteCommand(sendBuffer, replyBuffer,false);
  2230. // kludge dafilesrv versions <= 1.5e don't return error correctly
  2231. if (replyBuffer.length()>len+sizeof(size32_t)+sizeof(unsigned)) {
  2232. size32_t save = replyBuffer.getPos();
  2233. replyBuffer.reset(len+sizeof(size32_t)+sizeof(unsigned));
  2234. unsigned errCode;
  2235. replyBuffer.read(errCode);
  2236. if (errCode) {
  2237. StringBuffer msg;
  2238. parent->ep.getUrlStr(msg.append('[')).append("] ");
  2239. if (replyBuffer.getPos()<replyBuffer.length()) {
  2240. StringAttr s;
  2241. replyBuffer.read(s);
  2242. msg.append(s);
  2243. }
  2244. else
  2245. msg.append("ERROR #").append(errCode);
  2246. throw createDafsException(errCode, msg.str());
  2247. }
  2248. else
  2249. replyBuffer.reset(save);
  2250. }
  2251. replyBuffer.read(got);
  2252. if ((got>replyBuffer.remaining())||(got>len)) {
  2253. PROGLOG("Read beyond buffer %d,%d,%d",got,replyBuffer.remaining(),len);
  2254. throw createDafsException(RFSERR_ReadFailed, "Read beyond buffer");
  2255. }
  2256. return replyBuffer.readDirect(got);
  2257. }
  2258. catch (IJSOCK_Exception *e) {
  2259. EXCLOG(e,"CRemoteFileIO::read");
  2260. if (++tries > 3)
  2261. {
  2262. ioRetries.fetch_add(tries);
  2263. throw;
  2264. }
  2265. WARNLOG("Retrying read of %s (%d)",parent->queryLocalName(),tries);
  2266. Owned<IException> exc = e;
  2267. if (!reopen())
  2268. {
  2269. ioRetries.fetch_add(tries);
  2270. throw exc.getClear();
  2271. }
  2272. }
  2273. }
  2274. if (tries)
  2275. ioRetries.fetch_add(tries);
  2276. got = 0;
  2277. return NULL;
  2278. }
  2279. size32_t write(offset_t pos, size32_t len, const void * data)
  2280. {
  2281. unsigned tries=0;
  2282. size32_t ret = 0;
  2283. CCycleTimer timer;
  2284. for (;;) {
  2285. try {
  2286. MemoryBuffer replyBuffer;
  2287. MemoryBuffer sendBuffer;
  2288. initSendBuffer(sendBuffer);
  2289. sendBuffer.append((RemoteFileCommandType)RFCwrite).append(handle).append(pos).append(len).append(len, data);
  2290. parent->sendRemoteCommand(sendBuffer, replyBuffer, false, true);
  2291. replyBuffer.read(ret);
  2292. break;
  2293. }
  2294. catch (IJSOCK_Exception *e) {
  2295. EXCLOG(e,"CRemoteFileIO::write");
  2296. if (++tries > 3)
  2297. {
  2298. ioRetries.fetch_add(tries);
  2299. ioWriteCycles.fetch_add(timer.elapsedCycles());
  2300. throw;
  2301. }
  2302. WARNLOG("Retrying write(%" I64F "d,%d) of %s (%d)",pos,len,parent->queryLocalName(),tries);
  2303. Owned<IException> exc = e;
  2304. if (!reopen())
  2305. {
  2306. ioRetries.fetch_add(tries);
  2307. ioWriteCycles.fetch_add(timer.elapsedCycles());
  2308. throw exc.getClear();
  2309. }
  2310. }
  2311. }
  2312. if (tries)
  2313. ioRetries.fetch_add(tries);
  2314. ioWriteCycles.fetch_add(timer.elapsedCycles());
  2315. ioWriteBytes.fetch_add(ret);
  2316. ++ioWrites;
  2317. if ((ret==(size32_t)-1) || (ret < len))
  2318. throw createDafsException(DISK_FULL_EXCEPTION_CODE,"write failed, disk full?");
  2319. return ret;
  2320. }
  2321. offset_t appendFile(IFile *file,offset_t pos,offset_t len)
  2322. {
  2323. MemoryBuffer sendBuffer;
  2324. initSendBuffer(sendBuffer);
  2325. MemoryBuffer replyBuffer;
  2326. const char * fname = file->queryFilename();
  2327. sendBuffer.append((RemoteFileCommandType)RFCappend).append(handle).append(fname).append(pos).append(len);
  2328. parent->sendRemoteCommand(sendBuffer, replyBuffer, false, true); // retry not safe
  2329. offset_t ret;
  2330. replyBuffer.read(ret);
  2331. if ((ret==(offset_t)-1) || ((len != ((offset_t)-1)) && (ret < len)))
  2332. throw createDafsException(DISK_FULL_EXCEPTION_CODE,"append failed, disk full?"); // though could be file missing TBD
  2333. return ret;
  2334. }
  2335. void setSize(offset_t size)
  2336. {
  2337. MemoryBuffer sendBuffer;
  2338. initSendBuffer(sendBuffer);
  2339. MemoryBuffer replyBuffer;
  2340. sendBuffer.append((RemoteFileCommandType)RFCsetsize).append(handle).append(size);
  2341. parent->sendRemoteCommand(sendBuffer, replyBuffer, false, true);
  2342. // retry using reopen TBD
  2343. }
  2344. void setDisconnectOnExit(bool set) { disconnectonexit = set; }
  2345. void sendRemoteCommand(MemoryBuffer & sendBuffer, MemoryBuffer & replyBuffer, bool retry=true, bool lengthy=false, bool handleErrCode=true)
  2346. {
  2347. parent->sendRemoteCommand(sendBuffer, replyBuffer, retry, lengthy, handleErrCode);
  2348. }
  2349. };
  2350. void clientDisconnectRemoteIoOnExit(IFileIO *fileio,bool set)
  2351. {
  2352. CRemoteFileIO *cfileio = QUERYINTERFACE(fileio,CRemoteFileIO);
  2353. if (cfileio)
  2354. cfileio->setDisconnectOnExit(set);
  2355. }
  2356. IFileIO * CRemoteFile::openShared(IFOmode mode,IFSHmode shmode,IFEflags extraFlags)
  2357. {
  2358. assertex(((unsigned)shmode&0xffffffc7)==0);
  2359. compatIFSHmode compatmode;
  2360. unsigned fileflags = (flags>>16) & (S_IRUSR|S_IWUSR|S_IXUSR|S_IRGRP|S_IWGRP|S_IXGRP|S_IROTH|S_IWOTH|S_IXOTH);
  2361. if (fileflags&S_IXUSR) // this is bit hit and miss but backward compatible
  2362. compatmode = compatIFSHexec;
  2363. else if (fileflags&(S_IWGRP|S_IWOTH))
  2364. compatmode = compatIFSHall;
  2365. else if (shmode&IFSHfull)
  2366. compatmode = compatIFSHwrite;
  2367. else if (((shmode&(IFSHread|IFSHfull))==0) && ((fileflags&(S_IRGRP|S_IROTH))==0))
  2368. compatmode = compatIFSHnone;
  2369. else
  2370. compatmode = compatIFSHread;
  2371. Owned<CRemoteFileIO> res = new CRemoteFileIO(this);
  2372. if (res->open(mode,compatmode,extraFlags))
  2373. return res.getClear();
  2374. return NULL;
  2375. }
  2376. IFileIO * CRemoteFile::open(IFOmode mode,IFEflags extraFlags)
  2377. {
  2378. return openShared(mode,(IFSHmode)(flags&(IFSHread|IFSHfull)),extraFlags);
  2379. }
  2380. //---------------------------------------------------------------------------
  2381. void CRemoteFile::copyTo(IFile *dest, size32_t buffersize, ICopyFileProgress *progress, bool usetmp, CFflags copyFlags)
  2382. {
  2383. CRemoteFile *dstfile = QUERYINTERFACE(dest,CRemoteFile);
  2384. if (dstfile&&!dstfile->queryEp().isLocal()) {
  2385. StringBuffer tmpname;
  2386. Owned<IFile> destf;
  2387. RemoteFilename dest;
  2388. if (usetmp) {
  2389. makeTempCopyName(tmpname,dstfile->queryLocalName());
  2390. dest.setPath(dstfile->queryEp(),tmpname.str());
  2391. }
  2392. else
  2393. dest.setPath(dstfile->queryEp(),dstfile->queryLocalName());
  2394. destf.setown(createIFile(dest));
  2395. try {
  2396. // following may fail if new dafilesrv not deployed on src
  2397. copySection(dest,(offset_t)-1,0,(offset_t)-1,progress,copyFlags);
  2398. if (usetmp) {
  2399. StringAttr tail(pathTail(dstfile->queryLocalName()));
  2400. dstfile->remove();
  2401. destf->rename(tail);
  2402. }
  2403. return;
  2404. }
  2405. catch (IException *e)
  2406. {
  2407. StringBuffer s;
  2408. s.appendf("Remote File Copy (%d): ",e->errorCode());
  2409. e->errorMessage(s);
  2410. s.append(", retrying local");
  2411. WARNLOG("%s",s.str());
  2412. e->Release();
  2413. }
  2414. // delete dest
  2415. try {
  2416. destf->remove();
  2417. }
  2418. catch (IException *e)
  2419. {
  2420. EXCLOG(e,"Remote File Copy, Deleting temporary file");
  2421. e->Release();
  2422. }
  2423. }
  2424. // assumption if we get here that source remote, dest local (or equiv)
  2425. class cIntercept: implements ICopyFileIntercept
  2426. {
  2427. MemoryAttr ma;
  2428. MemoryBuffer mb;
  2429. virtual offset_t copy(IFileIO *from, IFileIO *to, offset_t ofs, size32_t sz)
  2430. {
  2431. if (ma.length()<sz)
  2432. ma.allocate(sz); // may be not used
  2433. void *buf = ma.bufferBase();
  2434. size32_t got;
  2435. CRemoteFileIO *srcio = QUERYINTERFACE(from,CRemoteFileIO);
  2436. const void *dst;
  2437. if (srcio)
  2438. dst = srcio->doRead(ofs,sz,mb.clear(),got,buf);
  2439. else {
  2440. // shouldn't ever get here if source remote
  2441. got = from->read(ofs, sz, buf);
  2442. dst = buf;
  2443. }
  2444. if (got != 0)
  2445. to->write(ofs, got, dst);
  2446. return got;
  2447. }
  2448. } intercept;
  2449. doCopyFile(dest,this,buffersize,progress,&intercept,usetmp,copyFlags);
  2450. }
  2451. /////////////////////////
  2452. ISocket *checkSocketSecure(ISocket *socket)
  2453. {
  2454. if (securitySettings.connectMethod == SSLNone)
  2455. return LINK(socket);
  2456. char pname[256];
  2457. pname[0] = 0;
  2458. int pport = socket->peer_name(pname, sizeof(pname)-1);
  2459. if ( (pport == securitySettings.daFileSrvSSLPort) && (!socket->isSecure()) )
  2460. {
  2461. #ifdef _USE_OPENSSL
  2462. Owned<ISecureSocket> ssock;
  2463. try
  2464. {
  2465. ssock.setown(createSecureSocket(LINK(socket), ClientSocket));
  2466. int status = ssock->secure_connect();
  2467. if (status < 0)
  2468. throw createDafsException(DAFSERR_connection_failed, "Failure to establish secure connection");
  2469. return ssock.getClear();
  2470. }
  2471. catch (IException *e)
  2472. {
  2473. cleanupSocket(ssock);
  2474. ssock.clear();
  2475. cleanupSocket(socket);
  2476. StringBuffer eMsg;
  2477. e->errorMessage(eMsg);
  2478. e->Release();
  2479. throw createDafsException(DAFSERR_connection_failed, eMsg.str());
  2480. }
  2481. #else
  2482. throw createDafsException(DAFSERR_connection_failed,"Failure to establish secure connection: OpenSSL disabled in build");
  2483. #endif
  2484. }
  2485. return LINK(socket);
  2486. }
  2487. ISocket *connectDafs(SocketEndpoint &ep, unsigned timeoutms)
  2488. {
  2489. Owned<ISocket> socket;
  2490. if ( (securitySettings.connectMethod == SSLNone) || (securitySettings.connectMethod == SSLOnly) )
  2491. {
  2492. socket.setown(ISocket::connect_timeout(ep, timeoutms));
  2493. return checkSocketSecure(socket);
  2494. }
  2495. // SSLFirst or UnsecureFirst ...
  2496. unsigned newtimeout = timeoutms;
  2497. if (newtimeout > 5000)
  2498. newtimeout = 5000;
  2499. int conAttempts = 2;
  2500. while (conAttempts > 0)
  2501. {
  2502. conAttempts--;
  2503. bool connected = false;
  2504. try
  2505. {
  2506. socket.setown(ISocket::connect_timeout(ep, newtimeout));
  2507. connected = true;
  2508. newtimeout = timeoutms;
  2509. }
  2510. catch (IJSOCK_Exception *e)
  2511. {
  2512. if (e->errorCode() == JSOCKERR_connection_failed)
  2513. {
  2514. e->Release();
  2515. if (ep.port == securitySettings.daFileSrvSSLPort)
  2516. ep.port = securitySettings.daFileSrvPort;
  2517. else
  2518. ep.port = securitySettings.daFileSrvSSLPort;
  2519. if (!conAttempts)
  2520. throw;
  2521. }
  2522. else
  2523. throw;
  2524. }
  2525. if (connected)
  2526. {
  2527. if (ep.port == securitySettings.daFileSrvSSLPort)
  2528. {
  2529. try
  2530. {
  2531. return checkSocketSecure(socket);
  2532. }
  2533. catch (IDAFS_Exception *e)
  2534. {
  2535. connected = false;
  2536. if (e->errorCode() == DAFSERR_connection_failed)
  2537. {
  2538. // worth logging to help identify any ssl config issues ...
  2539. StringBuffer errmsg;
  2540. e->errorMessage(errmsg);
  2541. WARNLOG("%s", errmsg.str());
  2542. e->Release();
  2543. ep.port = securitySettings.daFileSrvPort;
  2544. if (!conAttempts)
  2545. throw;
  2546. }
  2547. else
  2548. throw;
  2549. }
  2550. }
  2551. else
  2552. return socket.getClear();
  2553. }
  2554. }
  2555. throw createDafsException(DAFSERR_connection_failed, "Failed to establish connection with DaFileSrv");
  2556. }
  2557. unsigned getRemoteVersion(CRemoteFileIO &remoteFileIO, StringBuffer &ver)
  2558. {
  2559. unsigned ret;
  2560. MemoryBuffer sendBuffer;
  2561. initSendBuffer(sendBuffer);
  2562. sendBuffer.append((RemoteFileCommandType)RFCgetver);
  2563. sendBuffer.append((unsigned)RFCgetver);
  2564. MemoryBuffer replyBuffer;
  2565. try
  2566. {
  2567. remoteFileIO.sendRemoteCommand(sendBuffer, replyBuffer, true, false, false);
  2568. }
  2569. catch (IException *e)
  2570. {
  2571. EXCLOG(e);
  2572. ::Release(e);
  2573. return 0;
  2574. }
  2575. unsigned errCode;
  2576. replyBuffer.read(errCode);
  2577. if (errCode==RFSERR_InvalidCommand)
  2578. {
  2579. ver.append("DS V1.0");
  2580. return 10;
  2581. }
  2582. else if (errCode==0)
  2583. ret = 11;
  2584. else if (errCode<0x10000)
  2585. return 0;
  2586. else
  2587. ret = errCode-0x10000;
  2588. StringAttr vers;
  2589. replyBuffer.read(vers);
  2590. ver.append(vers);
  2591. return ret;
  2592. }
  2593. unsigned getRemoteVersion(ISocket *origSock, StringBuffer &ver)
  2594. {
  2595. // used to have a global critical section here
  2596. if (!origSock)
  2597. return 0;
  2598. Owned<ISocket> socket = checkSocketSecure(origSock);
  2599. unsigned ret;
  2600. MemoryBuffer sendbuf;
  2601. initSendBuffer(sendbuf);
  2602. sendbuf.append((RemoteFileCommandType)RFCgetver);
  2603. sendbuf.append((unsigned)RFCgetver);
  2604. MemoryBuffer reply;
  2605. try {
  2606. sendBuffer(socket, sendbuf);
  2607. receiveBuffer(socket, reply, 1 ,4096);
  2608. unsigned errCode;
  2609. reply.read(errCode);
  2610. if (errCode==RFSERR_InvalidCommand) {
  2611. ver.append("DS V1.0");
  2612. return 10;
  2613. }
  2614. else if (errCode==0)
  2615. ret = 11;
  2616. else if (errCode<0x10000)
  2617. return 0;
  2618. else
  2619. ret = errCode-0x10000;
  2620. }
  2621. catch (IException *e) {
  2622. EXCLOG(e);
  2623. ::Release(e);
  2624. return 0;
  2625. }
  2626. StringAttr vers;
  2627. reply.read(vers);
  2628. ver.append(vers);
  2629. return ret;
  2630. }
  2631. /////////////////////////
  2632. class CRemoteKeyManager : public CSimpleInterfaceOf<IKeyManager>
  2633. {
  2634. StringAttr filename;
  2635. Linked<IDelayedFile> delayedFile;
  2636. SegMonitorList segs;
  2637. size32_t rowDataRemaining = 0;
  2638. MemoryBuffer rowDataBuffer;
  2639. MemoryBuffer keyCursorMb; // used for continuation
  2640. unsigned __int64 totalGot = 0;
  2641. size32_t currentSize = 0;
  2642. offset_t currentFpos = 0;
  2643. const byte *currentRow = nullptr;
  2644. bool first = true;
  2645. unsigned __int64 chooseNLimit = 0;
  2646. ConstPointerArray activeBlobs;
  2647. unsigned crc = 0;
  2648. mutable bool hasRemoteSupport = false; // must check 1st
  2649. mutable Owned<IKeyManager> directKM; // failover manager if remote key support is unavailable
  2650. CRemoteFileIO *prepKeySend(MemoryBuffer &sendBuffer, RemoteFileCommandType cmd, bool segmentMonitors)
  2651. {
  2652. Owned<IFileIO> iFileIO = delayedFile->getFileIO();
  2653. if (!iFileIO)
  2654. throw MakeStringException(0, "CRemoteKeyManager: Failed to open key file: %s", filename.get());
  2655. Linked<CRemoteFileIO> remoteIO = QUERYINTERFACE(iFileIO.get(), CRemoteFileIO);
  2656. assertex(remoteIO);
  2657. initSendBuffer(sendBuffer);
  2658. size32_t keySize = 0; // backward compatibility - now ignored
  2659. sendBuffer.append(cmd).append(remoteIO->getHandle()).append(filename).append(keySize);
  2660. if (segmentMonitors)
  2661. segs.serialize(sendBuffer);
  2662. return remoteIO.getClear();
  2663. }
  2664. bool remoteSupport() const
  2665. {
  2666. if (hasRemoteSupport)
  2667. return true;
  2668. else if (directKM)
  2669. return false;
  2670. Owned<IFileIO> iFileIO = delayedFile->getFileIO();
  2671. if (!iFileIO)
  2672. throw MakeStringException(0, "CRemoteKeyManager: Failed to open key file: %s", filename.get());
  2673. Linked<CRemoteFileIO> remoteIO = QUERYINTERFACE(iFileIO.get(), CRemoteFileIO);
  2674. bool useRemote = nullptr != remoteIO.get();
  2675. if (useRemote)
  2676. {
  2677. StringBuffer verString;
  2678. unsigned ver = getRemoteVersion(*remoteIO, verString);
  2679. if (ver < MIN_KEYFILTSUPPORT_VERSION)
  2680. useRemote = false;
  2681. }
  2682. if (useRemote)
  2683. {
  2684. PROGLOG("Using remote key manager for file: %s", filename.get());
  2685. hasRemoteSupport = true;
  2686. }
  2687. else
  2688. {
  2689. Owned<IKeyIndex> keyIndex = createKeyIndex(filename, crc, *delayedFile, false, false);
  2690. directKM.setown(createLocalKeyManager(keyIndex, nullptr));
  2691. return false;
  2692. }
  2693. return true;
  2694. }
  2695. unsigned __int64 _checkCount(unsigned __int64 limit)
  2696. {
  2697. MemoryBuffer sendBuffer;
  2698. Owned<CRemoteFileIO> remoteIO = prepKeySend(sendBuffer, RFCreadfilteredindexcount, true);
  2699. sendBuffer.append(limit);
  2700. MemoryBuffer replyBuffer;
  2701. remoteIO->sendRemoteCommand(sendBuffer, replyBuffer);
  2702. unsigned __int64 count;
  2703. replyBuffer.read(count);
  2704. return count;
  2705. }
  2706. public:
  2707. CRemoteKeyManager(const char *_filename, unsigned _crc, IDelayedFile *_delayedFile) : filename(_filename), crc(_crc), delayedFile(_delayedFile)
  2708. {
  2709. }
  2710. ~CRemoteKeyManager()
  2711. {
  2712. releaseBlobs();
  2713. }
  2714. // IKeyManager impl.
  2715. virtual void reset(bool crappyHack = false) override
  2716. {
  2717. if (!remoteSupport())
  2718. {
  2719. directKM->reset(crappyHack);
  2720. return;
  2721. }
  2722. rowDataBuffer.clear();
  2723. rowDataRemaining = 0;
  2724. keyCursorMb.clear();
  2725. currentSize = 0;
  2726. currentFpos = 0;
  2727. currentRow = nullptr;
  2728. first = true;
  2729. totalGot = 0;
  2730. }
  2731. virtual void releaseSegmentMonitors() override
  2732. {
  2733. if (!remoteSupport())
  2734. {
  2735. directKM->releaseSegmentMonitors();
  2736. return;
  2737. }
  2738. segs.reset();
  2739. }
  2740. virtual const byte *queryKeyBuffer(offset_t & fpos) override
  2741. {
  2742. if (!remoteSupport())
  2743. return directKM->queryKeyBuffer(fpos);;
  2744. fpos = currentFpos;
  2745. return currentRow;
  2746. }
  2747. virtual offset_t queryFpos() override
  2748. {
  2749. if (!remoteSupport())
  2750. return directKM->queryFpos();
  2751. return currentFpos;
  2752. }
  2753. virtual unsigned queryRecordSize() override
  2754. {
  2755. if (!remoteSupport())
  2756. return directKM->queryRecordSize();
  2757. return currentSize; // this is wrong I think
  2758. }
  2759. virtual size32_t queryRowSize() override
  2760. {
  2761. if (!remoteSupport())
  2762. return directKM->queryRowSize();
  2763. return currentSize;
  2764. }
  2765. virtual unsigned __int64 querySequence() override
  2766. {
  2767. if (!remoteSupport())
  2768. return directKM->querySequence();
  2769. UNIMPLEMENTED;
  2770. }
  2771. virtual bool lookup(bool exact) override
  2772. {
  2773. if (!remoteSupport())
  2774. return directKM->lookup(exact);
  2775. while (true)
  2776. {
  2777. if (rowDataRemaining)
  2778. {
  2779. rowDataBuffer.read(currentFpos);
  2780. rowDataBuffer.read(currentSize);
  2781. currentRow = rowDataBuffer.readDirect(currentSize);
  2782. rowDataRemaining -= sizeof(currentFpos) + sizeof(currentSize) + currentSize;
  2783. return true;
  2784. }
  2785. else
  2786. {
  2787. if (!first && (0 == keyCursorMb.length())) // No keyCursor implies there is nothing more to fetch
  2788. return false;
  2789. unsigned maxRecs = 0;
  2790. if (chooseNLimit)
  2791. {
  2792. if (totalGot == chooseNLimit)
  2793. break;
  2794. unsigned __int64 max = chooseNLimit-totalGot;
  2795. if (max > UINT_MAX)
  2796. maxRecs = UINT_MAX;
  2797. else
  2798. maxRecs = (unsigned)max;
  2799. }
  2800. MemoryBuffer sendBuffer;
  2801. Owned<CRemoteFileIO> remoteIO = prepKeySend(sendBuffer, RFCreadfilteredindex, true);
  2802. sendBuffer.append(first).append(maxRecs);
  2803. if (first)
  2804. first = false;
  2805. else
  2806. {
  2807. dbgassertex(keyCursorMb.length());
  2808. sendBuffer.append(keyCursorMb);
  2809. }
  2810. rowDataBuffer.clear();
  2811. remoteIO->sendRemoteCommand(sendBuffer, rowDataBuffer);
  2812. unsigned recsGot;
  2813. rowDataBuffer.read(recsGot);
  2814. if (0 == recsGot)
  2815. {
  2816. keyCursorMb.clear(); // signals no more data if called again.
  2817. break; // end
  2818. }
  2819. totalGot += recsGot;
  2820. rowDataBuffer.read(rowDataRemaining);
  2821. unsigned pos = rowDataBuffer.getPos(); // start of row data
  2822. const void *rowData = rowDataBuffer.readDirect(rowDataRemaining);
  2823. size32_t keyCursorSz;
  2824. rowDataBuffer.read(keyCursorSz);
  2825. keyCursorMb.clear();
  2826. if (keyCursorSz)
  2827. keyCursorMb.append(keyCursorSz, rowDataBuffer.readDirect(keyCursorSz));
  2828. rowDataBuffer.reset(pos); // reposition to start of row data
  2829. }
  2830. }
  2831. return false;
  2832. }
  2833. virtual unsigned __int64 getCount() override
  2834. {
  2835. if (!remoteSupport())
  2836. return directKM->getCount();
  2837. return _checkCount((unsigned __int64)-1);
  2838. }
  2839. virtual unsigned __int64 getCurrentRangeCount(unsigned groupSegCount) override
  2840. {
  2841. if (!remoteSupport())
  2842. return directKM->getCurrentRangeCount(groupSegCount);
  2843. UNIMPLEMENTED;
  2844. }
  2845. virtual bool nextRange(unsigned groupSegCount) override
  2846. {
  2847. if (!remoteSupport())
  2848. return directKM->nextRange(groupSegCount);
  2849. UNIMPLEMENTED;
  2850. }
  2851. virtual void setKey(IKeyIndexBase * _key) override
  2852. {
  2853. if (!remoteSupport())
  2854. {
  2855. directKM->setKey(_key);
  2856. return;
  2857. }
  2858. UNIMPLEMENTED;
  2859. }
  2860. virtual void setChooseNLimit(unsigned __int64 _chooseNLimit) override
  2861. {
  2862. if (!remoteSupport())
  2863. {
  2864. directKM->setChooseNLimit(_chooseNLimit);
  2865. return;
  2866. }
  2867. chooseNLimit = _chooseNLimit;
  2868. }
  2869. virtual unsigned __int64 checkCount(unsigned __int64 limit) override
  2870. {
  2871. if (!remoteSupport())
  2872. directKM->checkCount(limit);
  2873. return _checkCount(limit);
  2874. }
  2875. virtual void serializeCursorPos(MemoryBuffer &mb) override
  2876. {
  2877. if (!remoteSupport())
  2878. {
  2879. directKM->serializeCursorPos(mb);
  2880. return;
  2881. }
  2882. UNIMPLEMENTED;
  2883. }
  2884. virtual void deserializeCursorPos(MemoryBuffer &mb) override
  2885. {
  2886. if (!remoteSupport())
  2887. {
  2888. directKM->deserializeCursorPos(mb);
  2889. return;
  2890. }
  2891. UNIMPLEMENTED;
  2892. }
  2893. virtual unsigned querySeeks() const override
  2894. {
  2895. if (!remoteSupport())
  2896. return directKM->querySeeks();
  2897. return 0;
  2898. }
  2899. virtual unsigned queryScans() const override
  2900. {
  2901. if (!remoteSupport())
  2902. return directKM->queryScans();
  2903. return 0;
  2904. }
  2905. virtual unsigned querySkips() const override
  2906. {
  2907. if (!remoteSupport())
  2908. return directKM->querySkips();
  2909. return 0;
  2910. }
  2911. virtual unsigned queryNullSkips() const override
  2912. {
  2913. if (!remoteSupport())
  2914. return directKM->queryNullSkips();
  2915. return 0;
  2916. }
  2917. virtual const byte *loadBlob(unsigned __int64 blobId, size32_t &blobSize) override
  2918. {
  2919. if (!remoteSupport())
  2920. return directKM->loadBlob(blobId, blobSize);
  2921. MemoryBuffer sendBuffer;
  2922. Owned<CRemoteFileIO> remoteIO = prepKeySend(sendBuffer, RFCreadfilteredindexblob, false);
  2923. sendBuffer.append(blobId);
  2924. MemoryBuffer replyBuffer;
  2925. remoteIO->sendRemoteCommand(sendBuffer, replyBuffer);
  2926. replyBuffer.read(blobSize);
  2927. const byte *blobData = replyBuffer.readDirect(blobSize);
  2928. activeBlobs.append(replyBuffer.detach()); // NB: don't need to retain size, but keep sz+data to avoid copy
  2929. return blobData;
  2930. }
  2931. virtual void releaseBlobs() override
  2932. {
  2933. if (!remoteSupport())
  2934. return directKM->releaseBlobs();
  2935. ForEachItemIn(idx, activeBlobs)
  2936. {
  2937. free((void *) activeBlobs.item(idx));
  2938. }
  2939. activeBlobs.kill();
  2940. }
  2941. virtual void resetCounts() override
  2942. {
  2943. if (!remoteSupport())
  2944. {
  2945. directKM->resetCounts();
  2946. return;
  2947. }
  2948. UNIMPLEMENTED;
  2949. }
  2950. virtual void setLayoutTranslator(IRecordLayoutTranslator * trans) override
  2951. {
  2952. if (!remoteSupport())
  2953. {
  2954. directKM->setLayoutTranslator(trans);
  2955. return;
  2956. }
  2957. UNIMPLEMENTED;
  2958. }
  2959. virtual void setSegmentMonitors(SegMonitorList &segmentMonitors) override
  2960. {
  2961. if (!remoteSupport())
  2962. {
  2963. directKM->setSegmentMonitors(segmentMonitors);
  2964. return;
  2965. }
  2966. segs.swapWith(segmentMonitors);
  2967. }
  2968. virtual void deserializeSegmentMonitors(MemoryBuffer &mb) override
  2969. {
  2970. if (!remoteSupport())
  2971. {
  2972. directKM->deserializeSegmentMonitors(mb);
  2973. return;
  2974. }
  2975. segs.deserialize(mb);
  2976. }
  2977. virtual void finishSegmentMonitors() override
  2978. {
  2979. if (!remoteSupport())
  2980. {
  2981. directKM->finishSegmentMonitors();
  2982. return;
  2983. }
  2984. }
  2985. virtual bool lookupSkip(const void *seek, size32_t seekGEOffset, size32_t seeklen) override
  2986. {
  2987. if (!remoteSupport())
  2988. return directKM->lookupSkip(seek, seekGEOffset, seeklen);
  2989. UNIMPLEMENTED;
  2990. }
  2991. virtual void append(IKeySegmentMonitor *segment) override
  2992. {
  2993. if (!remoteSupport())
  2994. {
  2995. directKM->append(segment);
  2996. return;
  2997. }
  2998. segs.append(segment);
  2999. }
  3000. virtual unsigned ordinality() const override
  3001. {
  3002. if (!remoteSupport())
  3003. return directKM->ordinality();
  3004. return segs.ordinality();
  3005. }
  3006. virtual IKeySegmentMonitor *item(unsigned idx) const override
  3007. {
  3008. if (!remoteSupport())
  3009. return directKM->item(idx);
  3010. return segs.item(idx);
  3011. }
  3012. virtual void setMergeBarrier(unsigned offset) override
  3013. {
  3014. if (!remoteSupport())
  3015. {
  3016. directKM->setMergeBarrier(offset);
  3017. return;
  3018. }
  3019. UNIMPLEMENTED;
  3020. }
  3021. };
  3022. IKeyManager *createRemoteKeyManager(const char *filename, unsigned crc, IDelayedFile *delayedFile)
  3023. {
  3024. return new CRemoteKeyManager(filename, crc, delayedFile);
  3025. }
  3026. IKeyManager *createKeyManager(const char *filename, unsigned crc, IDelayedFile *delayedFile, bool allowRemote, bool forceRemote)
  3027. {
  3028. RemoteFilename rfn;
  3029. rfn.setRemotePath(filename);
  3030. if (forceRemote || (allowRemote && !rfn.isLocal()))
  3031. return createRemoteKeyManager(filename, crc, delayedFile);
  3032. else
  3033. {
  3034. Owned<IKeyIndex> keyIndex = createKeyIndex(filename, crc, *delayedFile, false, false);
  3035. return createLocalKeyManager(keyIndex, nullptr);
  3036. }
  3037. }
  3038. //////////////
  3039. extern unsigned stopRemoteServer(ISocket * socket)
  3040. {
  3041. // used to have a global critical section here
  3042. if (!socket)
  3043. return 0;
  3044. MemoryBuffer sendbuf;
  3045. initSendBuffer(sendbuf);
  3046. sendbuf.append((RemoteFileCommandType)RFCstop);
  3047. sendbuf.append((unsigned)RFCstop);
  3048. MemoryBuffer replybuf;
  3049. unsigned errCode = RFSERR_InvalidCommand;
  3050. try {
  3051. sendBuffer(socket, sendbuf);
  3052. receiveBuffer(socket, replybuf, NORMAL_RETRIES, 1024);
  3053. replybuf.read(errCode);
  3054. }
  3055. catch (IJSOCK_Exception *e) {
  3056. if ((e->errorCode()!=JSOCKERR_broken_pipe)&&(e->errorCode()!=JSOCKERR_graceful_close))
  3057. EXCLOG(e);
  3058. else
  3059. errCode = 0;
  3060. }
  3061. catch (IException *e) {
  3062. EXCLOG(e);
  3063. ::Release(e);
  3064. }
  3065. return errCode;
  3066. }
  3067. int setDafsTrace(ISocket * socket,byte flags)
  3068. {
  3069. if (!socket) {
  3070. byte ret = traceFlags;
  3071. traceFlags = flags;
  3072. return ret;
  3073. }
  3074. MemoryBuffer sendbuf;
  3075. initSendBuffer(sendbuf);
  3076. sendbuf.append((RemoteFileCommandType)RFCsettrace).append(flags);
  3077. MemoryBuffer replybuf;
  3078. try {
  3079. sendBuffer(socket, sendbuf);
  3080. receiveBuffer(socket, replybuf, NORMAL_RETRIES, 1024);
  3081. int retcode;
  3082. replybuf.read(retcode);
  3083. return retcode;
  3084. }
  3085. catch (IException *e) {
  3086. EXCLOG(e);
  3087. ::Release(e);
  3088. }
  3089. return -1;
  3090. }
  3091. int setDafsThrottleLimit(ISocket * socket, ThrottleClass throttleClass, unsigned throttleLimit, unsigned throttleDelayMs, unsigned throttleCPULimit, unsigned queueLimit, StringBuffer *errMsg)
  3092. {
  3093. assertex(socket);
  3094. MemoryBuffer sendbuf;
  3095. initSendBuffer(sendbuf);
  3096. sendbuf.append((RemoteFileCommandType)RFCsetthrottle2).append((unsigned)throttleClass).append(throttleLimit);
  3097. sendbuf.append(throttleDelayMs).append(throttleCPULimit).append(queueLimit);
  3098. MemoryBuffer replybuf;
  3099. try {
  3100. sendBuffer(socket, sendbuf);
  3101. receiveBuffer(socket, replybuf, NORMAL_RETRIES, 1024);
  3102. int retcode;
  3103. replybuf.read(retcode);
  3104. if (retcode && errMsg && replybuf.remaining())
  3105. replybuf.read(*errMsg);
  3106. return retcode;
  3107. }
  3108. catch (IException *e) {
  3109. EXCLOG(e);
  3110. ::Release(e);
  3111. }
  3112. return -1;
  3113. }
  3114. int getDafsInfo(ISocket * socket, unsigned level, StringBuffer &retstr)
  3115. {
  3116. if (!socket) {
  3117. retstr.append(VERSTRING);
  3118. return 0;
  3119. }
  3120. MemoryBuffer sendbuf;
  3121. initSendBuffer(sendbuf);
  3122. sendbuf.append((RemoteFileCommandType)RFCgetinfo).append(level);
  3123. MemoryBuffer replybuf;
  3124. try {
  3125. sendBuffer(socket, sendbuf);
  3126. receiveBuffer(socket, replybuf, 1);
  3127. int retcode;
  3128. replybuf.read(retcode);
  3129. if (retcode==0) {
  3130. StringAttr s;
  3131. replybuf.read(s);
  3132. retstr.append(s);
  3133. }
  3134. return retcode;
  3135. }
  3136. catch (IException *e) {
  3137. EXCLOG(e);
  3138. ::Release(e);
  3139. }
  3140. return -1;
  3141. }
  3142. void remoteExtractBlobElements(const SocketEndpoint &ep,const char * prefix, const char * filename, ExtractedBlobArray & extracted)
  3143. {
  3144. Owned<CRemoteFile> file = new CRemoteFile (ep,filename);
  3145. file->remoteExtractBlobElements(prefix, extracted);
  3146. }
  3147. //====================================================================================================
  3148. class CAsyncCommandManager
  3149. {
  3150. class CAsyncJob: public CInterface
  3151. {
  3152. class cThread: public Thread
  3153. {
  3154. CAsyncJob *parent;
  3155. public:
  3156. cThread(CAsyncJob *_parent)
  3157. : Thread("CAsyncJob")
  3158. {
  3159. parent = _parent;
  3160. }
  3161. int run()
  3162. {
  3163. int ret = -1;
  3164. try {
  3165. ret = parent->run();
  3166. parent->setDone();
  3167. }
  3168. catch (IException *e)
  3169. {
  3170. parent->setException(e);
  3171. }
  3172. parent->signal();
  3173. return ret;
  3174. }
  3175. } *thread;
  3176. StringAttr uuid;
  3177. CAsyncCommandManager &parent;
  3178. public:
  3179. CAsyncJob(CAsyncCommandManager &_parent, const char *_uuid)
  3180. : uuid(_uuid), parent(_parent)
  3181. {
  3182. thread = new cThread(this);
  3183. hash = hashc((const byte *)uuid.get(),uuid.length(),~0U);
  3184. }
  3185. ~CAsyncJob()
  3186. {
  3187. thread->join();
  3188. thread->Release();
  3189. }
  3190. static void destroy(CAsyncJob *j)
  3191. {
  3192. j->Release();
  3193. }
  3194. void signal()
  3195. {
  3196. parent.signal();
  3197. }
  3198. void start()
  3199. {
  3200. parent.wait();
  3201. thread->start();
  3202. }
  3203. void join()
  3204. {
  3205. thread->join();
  3206. }
  3207. static unsigned getHash(const char *key)
  3208. {
  3209. return hashc((const byte *)key,strlen(key),~0U);
  3210. }
  3211. static CAsyncJob* create(const char *key) { assertex(!"CAsyncJob::create not implemented"); return NULL; }
  3212. unsigned hash;
  3213. bool eq(const char *key)
  3214. {
  3215. return stricmp(key,uuid.get())==0;
  3216. }
  3217. virtual int run()=0;
  3218. virtual void setException(IException *e)=0;
  3219. virtual void setDone()=0;
  3220. };
  3221. class CAsyncCopySection: public CAsyncJob
  3222. {
  3223. Owned<IFile> src;
  3224. RemoteFilename dst;
  3225. offset_t toOfs;
  3226. offset_t fromOfs;
  3227. offset_t size;
  3228. CFPmode mode; // not yet supported
  3229. CriticalSection sect;
  3230. offset_t done;
  3231. offset_t total;
  3232. Semaphore finished;
  3233. AsyncCommandStatus status;
  3234. Owned<IException> exc;
  3235. public:
  3236. CAsyncCopySection(CAsyncCommandManager &parent, const char *_uuid, const char *fromFile, const char *toFile, offset_t _toOfs, offset_t _fromOfs, offset_t _size)
  3237. : CAsyncJob(parent, _uuid)
  3238. {
  3239. status = ACScontinue;
  3240. src.setown(createIFile(fromFile));
  3241. dst.setRemotePath(toFile);
  3242. toOfs = _toOfs;
  3243. fromOfs = _fromOfs;
  3244. size = _size;
  3245. mode = CFPcontinue;
  3246. done = 0;
  3247. total = (offset_t)-1;
  3248. }
  3249. AsyncCommandStatus poll(offset_t &_done, offset_t &_total,unsigned timeout)
  3250. {
  3251. if (timeout&&finished.wait(timeout))
  3252. finished.signal(); // may need to call again
  3253. CriticalBlock block(sect);
  3254. if (exc)
  3255. throw exc.getClear();
  3256. _done = done;
  3257. _total = total;
  3258. return status;
  3259. }
  3260. int run()
  3261. {
  3262. class cProgress: implements ICopyFileProgress
  3263. {
  3264. CriticalSection &sect;
  3265. CFPmode &mode;
  3266. offset_t &done;
  3267. offset_t &total;
  3268. public:
  3269. cProgress(CriticalSection &_sect,offset_t &_done,offset_t &_total,CFPmode &_mode)
  3270. : sect(_sect), mode(_mode), done(_done), total(_total)
  3271. {
  3272. }
  3273. CFPmode onProgress(offset_t sizeDone, offset_t totalSize)
  3274. {
  3275. CriticalBlock block(sect);
  3276. done = sizeDone;
  3277. total = totalSize;
  3278. return mode;
  3279. }
  3280. } progress(sect,total,done,mode);
  3281. src->copySection(dst,toOfs, fromOfs, size, &progress); // exceptions will be handled by base class
  3282. return 0;
  3283. }
  3284. void setException(IException *e)
  3285. {
  3286. EXCLOG(e,"CAsyncCommandManager::CAsyncJob");
  3287. CriticalBlock block(sect);
  3288. if (exc.get())
  3289. e->Release();
  3290. else
  3291. exc.setown(e);
  3292. status = ACSerror;
  3293. }
  3294. void setDone()
  3295. {
  3296. CriticalBlock block(sect);
  3297. finished.signal();
  3298. status = ACSdone;
  3299. }
  3300. };
  3301. CMinHashTable<CAsyncJob> jobtable;
  3302. CriticalSection sect;
  3303. Semaphore threadsem;
  3304. unsigned limit;
  3305. public:
  3306. CAsyncCommandManager(unsigned _limit) : limit(_limit)
  3307. {
  3308. if (limit) // 0 == unbound
  3309. threadsem.signal(limit); // max number of async jobs
  3310. }
  3311. void join()
  3312. {
  3313. CriticalBlock block(sect);
  3314. unsigned i;
  3315. CAsyncJob *j=jobtable.first(i);
  3316. while (j) {
  3317. j->join();
  3318. j=jobtable.next(i);
  3319. }
  3320. }
  3321. void signal()
  3322. {
  3323. if (limit)
  3324. threadsem.signal();
  3325. }
  3326. void wait()
  3327. {
  3328. if (limit)
  3329. threadsem.wait();
  3330. }
  3331. AsyncCommandStatus copySection(const char *uuid, const char *fromFile, const char *toFile, offset_t toOfs, offset_t fromOfs, offset_t size, offset_t &done, offset_t &total, unsigned timeout)
  3332. {
  3333. // return 0 if continuing, 1 if done
  3334. CAsyncCopySection * job;
  3335. Linked<CAsyncJob> cjob;
  3336. {
  3337. CriticalBlock block(sect);
  3338. cjob.set(jobtable.find(uuid,false));
  3339. if (cjob) {
  3340. job = QUERYINTERFACE(cjob.get(),CAsyncCopySection);
  3341. if (!job) {
  3342. throw MakeStringException(-1,"Async job ID mismatch");
  3343. }
  3344. }
  3345. else {
  3346. job = new CAsyncCopySection(*this, uuid, fromFile, toFile, toOfs, fromOfs, size);
  3347. cjob.setown(job);
  3348. jobtable.add(cjob.getLink());
  3349. cjob->start();
  3350. }
  3351. }
  3352. AsyncCommandStatus ret = ACSerror;
  3353. Owned<IException> rete;
  3354. try {
  3355. ret = job->poll(done,total,timeout);
  3356. }
  3357. catch (IException * e) {
  3358. rete.setown(e);
  3359. }
  3360. if ((ret!=ACScontinue)||rete.get()) {
  3361. job->join();
  3362. CriticalBlock block(sect);
  3363. jobtable.remove(job);
  3364. if (rete.get())
  3365. throw rete.getClear();
  3366. }
  3367. return ret;
  3368. }
  3369. };
  3370. //====================================================================================================
  3371. inline void appendErr(MemoryBuffer &reply, unsigned e)
  3372. {
  3373. reply.append(e).append(getRFSERRText(e));
  3374. }
  3375. inline void appendErr2(MemoryBuffer &reply, unsigned e, unsigned v)
  3376. {
  3377. StringBuffer msg;
  3378. msg.append(getRFSERRText(e)).append(':').append(v);
  3379. reply.append(e).append(msg.str());
  3380. }
  3381. inline void appendErr3(MemoryBuffer &reply, unsigned e, int code, const char *errMsg)
  3382. {
  3383. StringBuffer msg;
  3384. msg.appendf("ERROR: %s(%d) '%s'", getRFSERRText(e), code, errMsg?errMsg:"");
  3385. reply.append(e);
  3386. reply.append(msg.str());
  3387. }
  3388. inline void appendCmdErr(MemoryBuffer &reply, RemoteFileCommandType e, int code, const char *errMsg)
  3389. {
  3390. StringBuffer msg;
  3391. msg.appendf("ERROR: %s(%d) '%s'", getRFCText(e), code, errMsg?errMsg:"");
  3392. // RFCOpenIO needs remapping to non-zero for client to know its an error
  3393. // perhaps we should use code here instead of e ?
  3394. unsigned err = e;
  3395. if (e == RFCopenIO)
  3396. err = RFSERR_OpenFailed;
  3397. reply.append(err);
  3398. reply.append(msg.str());
  3399. }
  3400. #define MAPCOMMAND(c,p) case c: { ret = this->p(msg, reply) ; break; }
  3401. #define MAPCOMMANDCLIENT(c,p,client) case c: { ret = this->p(msg, reply, client); break; }
  3402. #define MAPCOMMANDCLIENTTESTSOCKET(c,p,client) case c: { ret = this->p(msg, reply, client); testSocketFlag = true; break; }
  3403. #define MAPCOMMANDCLIENTTHROTTLE(c,p,client,throttler) case c: { ret = this->p(msg, reply, client, throttler); break; }
  3404. #define MAPCOMMANDSTATS(c,p,stats) case c: { ret = this->p(msg, reply, stats); break; }
  3405. #define MAPCOMMANDCLIENTSTATS(c,p,client,stats) case c: { ret = this->p(msg, reply, client, stats); break; }
  3406. static unsigned ClientCount = 0;
  3407. static unsigned MaxClientCount = 0;
  3408. static CriticalSection ClientCountSect;
  3409. #define DEFAULT_THROTTLOG_LOG_INTERVAL_SECS 60 // log total throttled delay period
  3410. class CClientStats : public CInterface
  3411. {
  3412. public:
  3413. CClientStats(const char *_client) : client(_client), count(0), bRead(0), bWritten(0) { }
  3414. const char *queryFindString() const { return client; }
  3415. inline void addRead(unsigned __int64 len)
  3416. {
  3417. bRead += len;
  3418. }
  3419. inline void addWrite(unsigned __int64 len)
  3420. {
  3421. bWritten += len;
  3422. }
  3423. void getStatus(StringBuffer & info) const
  3424. {
  3425. info.appendf("Client %s - %" I64F "d requests handled, bytes read = %" I64F "d, bytes written = % " I64F "d",
  3426. client.get(), count, bRead.load(), bWritten.load()).newline();
  3427. }
  3428. StringAttr client;
  3429. unsigned __int64 count;
  3430. std::atomic<unsigned __int64> bRead;
  3431. std::atomic<unsigned __int64> bWritten;
  3432. };
  3433. class CClientStatsTable : public OwningStringSuperHashTableOf<CClientStats>
  3434. {
  3435. typedef OwningStringSuperHashTableOf<CClientStats> PARENT;
  3436. CriticalSection crit;
  3437. unsigned cmdStats[RFCmax];
  3438. static int compareElement(void* const *ll, void* const *rr)
  3439. {
  3440. const CClientStats *l = (const CClientStats *) *ll;
  3441. const CClientStats *r = (const CClientStats *) *rr;
  3442. if (l->count == r->count)
  3443. return 0;
  3444. else if (l->count<r->count)
  3445. return 1;
  3446. else
  3447. return -1;
  3448. }
  3449. public:
  3450. CClientStatsTable()
  3451. {
  3452. memset(&cmdStats[0], 0, sizeof(cmdStats));
  3453. }
  3454. ~CClientStatsTable()
  3455. {
  3456. _releaseAll();
  3457. }
  3458. CClientStats *getClientReference(RemoteFileCommandType cmd, const char *client)
  3459. {
  3460. CriticalBlock b(crit);
  3461. CClientStats *stats = PARENT::find(client);
  3462. if (!stats)
  3463. {
  3464. stats = new CClientStats(client);
  3465. PARENT::replace(*stats);
  3466. }
  3467. if (cmd<RFCmax) // i.e. ignore duff command (which will be traced), but still record client connected
  3468. cmdStats[cmd]++;
  3469. ++stats->count;
  3470. return LINK(stats);
  3471. }
  3472. StringBuffer &getInfo(StringBuffer &info, unsigned level=1)
  3473. {
  3474. CriticalBlock b(crit);
  3475. unsigned __int64 totalCmds = 0;
  3476. for (unsigned c=0; c<RFCmax; c++)
  3477. totalCmds += cmdStats[c];
  3478. unsigned totalClients = PARENT::ordinality();
  3479. info.appendf("Commands processed = %" I64F "u, unique clients = %u", totalCmds, totalClients);
  3480. if (totalCmds)
  3481. {
  3482. info.append("Command stats:").newline();
  3483. for (unsigned c=0; c<RFCmax; c++)
  3484. {
  3485. unsigned __int64 count = cmdStats[c];
  3486. if (count)
  3487. info.append(getRFCText(c)).append(": ").append(count).newline();
  3488. }
  3489. }
  3490. if (totalClients)
  3491. {
  3492. SuperHashIteratorOf<CClientStats> iter(*this);
  3493. PointerArrayOf<CClientStats> elements;
  3494. ForEach(iter)
  3495. {
  3496. CClientStats &elem = iter.query();
  3497. elements.append(&elem);
  3498. }
  3499. elements.sort(&compareElement);
  3500. if (level < 10)
  3501. {
  3502. // list up to 10 clients ordered by # of commands processed
  3503. unsigned max=elements.ordinality();
  3504. if (max>10)
  3505. max = 10; // cap
  3506. info.append("Top 10 clients:").newline();
  3507. for (unsigned e=0; e<max; e++)
  3508. {
  3509. const CClientStats &element = *elements.item(e);
  3510. element.getStatus(info);
  3511. }
  3512. }
  3513. else // list all
  3514. {
  3515. info.append("All clients:").newline();
  3516. ForEachItemIn(e, elements)
  3517. {
  3518. const CClientStats &element = *elements.item(e);
  3519. element.getStatus(info);
  3520. }
  3521. }
  3522. }
  3523. return info;
  3524. }
  3525. void reset()
  3526. {
  3527. CriticalBlock b(crit);
  3528. memset(&cmdStats[0], 0, sizeof(cmdStats));
  3529. kill();
  3530. }
  3531. };
  3532. interface IRemoteActivity : extends IInterface
  3533. {
  3534. virtual const void *nextRow(size32_t &sz) = 0;
  3535. virtual unsigned __int64 queryProcessed() const = 0;
  3536. virtual IOutputMetaData *queryOutputMeta() const = 0;
  3537. virtual StringBuffer &getInfoStr(StringBuffer &out) const = 0;
  3538. virtual void serializeCursor(MemoryBuffer &tgt) const = 0;
  3539. virtual void restoreCursor(MemoryBuffer &src) = 0;
  3540. };
  // Bit flags describing an open file entry
  enum OpenFileFlag
  {
      of_null = 0x0,
      of_key  = 0x01
  };
  3542. struct OpenFileInfo
  3543. {
  3544. OpenFileInfo() { }
  3545. OpenFileInfo(int _handle, IFileIO *_fileIO, StringAttrItem *_filename) : handle(_handle), fileIO(_fileIO), filename(_filename) { }
  3546. OpenFileInfo(int _handle, IRemoteActivity *_activity, StringAttrItem *_filename) : handle(_handle), activity(_activity), filename(_filename) { }
  3547. Linked<IFileIO> fileIO;
  3548. Linked<IRemoteActivity> activity;
  3549. Linked<StringAttrItem> filename; // for debug
  3550. int handle = 0;
  3551. unsigned flags = 0;
  3552. };
  3553. static IOutputMetaData *getTypeInfoOutputMetaData(IPropertyTree &actNode, const char *typePropName)
  3554. {
  3555. IPropertyTree *inputJson = actNode.queryPropTree(typePropName);
  3556. if (inputJson)
  3557. return createTypeInfoOutputMetaData(*inputJson);
  3558. else
  3559. {
  3560. StringBuffer binTypePropName(typePropName);
  3561. MemoryBuffer mb;
  3562. actNode.getPropBin(binTypePropName.append("Bin").str(), mb);
  3563. return createTypeInfoOutputMetaData(mb);
  3564. }
  3565. }
  3566. class CRemoteDiskReadActivity : public CSimpleInterfaceOf<IRemoteActivity>
  3567. {
  3568. StringAttr fileName;
  3569. Linked<IHThorDiskReadArg> helper;
  3570. MemoryBuffer resultBuffer;
  3571. MemoryBufferBuilder *outBuilder = nullptr;
  3572. CThorContiguousRowBuffer prefetchBuffer;
  3573. IArrayOf<IKeySegmentMonitor> segMonitors;
  3574. Owned<ISourceRowPrefetcher> prefetcher;
  3575. Owned<ISerialStream> inputStream;
  3576. Owned<IFileIO> iFileIO;
  3577. Linked<IOutputMetaData> outMeta;
  3578. unsigned __int64 chooseN = 0;
  3579. unsigned __int64 limit = 0;
  3580. unsigned __int64 processed = 0;
  3581. unsigned __int64 startPos = 0;
  3582. bool opened = false;
  3583. bool eofSeen = false;
  3584. bool cursorDirty = false;
  3585. bool canMatchAny = false;
  3586. bool needTransform = true;
  3587. void checkOpen()
  3588. {
  3589. if (opened)
  3590. {
  3591. if (!cursorDirty)
  3592. return;
  3593. if (prefetchBuffer.tell() != startPos)
  3594. {
  3595. inputStream->reset(startPos);
  3596. prefetchBuffer.clearStream();
  3597. prefetchBuffer.setStream(inputStream);
  3598. eofSeen = !canMatchAny;
  3599. }
  3600. cursorDirty = false;
  3601. return;
  3602. }
  3603. if (!canMatchAny)
  3604. eofSeen = true;
  3605. else
  3606. {
  3607. const char *fileName = helper->getFileName();
  3608. OwnedIFile iFile = createIFile(fileName);
  3609. #if 0
  3610. bool compressed = false; // isCompressedFile(iFileIO); // Should be passed with JSON
  3611. StringBuffer encryptionkey;
  3612. if (compressed)
  3613. {
  3614. Owned<IExpander> eexp;
  3615. if (encryptionkey.length()!=0)
  3616. eexp.setown(createAESExpander256((size32_t)encryptionkey.length(),encryptionkey.bufferBase()));
  3617. iFileIO.setown(createCompressedFileReader(iFile,eexp));
  3618. if(!iFileIO && !blockcompressed) //fall back to old decompression, unless dfs marked as new
  3619. {
  3620. iFileIO.setown(iFile->open(IFOread));
  3621. if(iFileIO)
  3622. rowcompressed = true;
  3623. }
  3624. }
  3625. else
  3626. #endif
  3627. iFileIO.setown(iFile->open(IFOread));
  3628. if (!iFileIO)
  3629. throw MakeStringException(0, "Failed to open: '%s'", fileName);
  3630. inputStream.setown(createFileSerialStream(iFileIO, startPos));
  3631. prefetchBuffer.setStream(inputStream);
  3632. prefetcher.setown(helper->queryDiskRecordSize()->createDiskPrefetcher(nullptr, 0));
  3633. outBuilder = new MemoryBufferBuilder(resultBuffer, helper->queryOutputMeta()->getMinRecordSize());
  3634. chooseN = helper->getChooseNLimit();
  3635. limit = helper->getRowLimit();
  3636. }
  3637. opened = true;
  3638. }
  3639. void close()
  3640. {
  3641. iFileIO.clear();
  3642. opened = false;
  3643. }
  3644. bool segMonitorsMatch(const void *row) { return true; }
  3645. public:
  3646. CRemoteDiskReadActivity(IHThorDiskReadArg &_helper)
  3647. : helper(&_helper), prefetchBuffer(nullptr)
  3648. {
  3649. outMeta.set(helper->queryOutputMeta());
  3650. canMatchAny = helper->canMatchAny();
  3651. }
  3652. ~CRemoteDiskReadActivity()
  3653. {
  3654. if (outBuilder)
  3655. delete outBuilder;
  3656. }
  3657. virtual const void *nextRow(size32_t &retSz) override
  3658. {
  3659. checkOpen();
  3660. if (needTransform)
  3661. {
  3662. while (!eofSeen && ((chooseN == 0) || (processed < chooseN)))
  3663. {
  3664. while (!prefetchBuffer.eos())
  3665. {
  3666. prefetcher->readAhead(prefetchBuffer);
  3667. const byte * next = prefetchBuffer.queryRow();
  3668. size32_t rowSz; // use local var instead of reference param for efficiency
  3669. if (segMonitorsMatch(next))
  3670. rowSz = helper->transform(*outBuilder, next);
  3671. else
  3672. rowSz = 0;
  3673. prefetchBuffer.finishedRow();
  3674. if (rowSz)
  3675. {
  3676. if (processed >=limit)
  3677. {
  3678. resultBuffer.clear();
  3679. helper->onLimitExceeded();
  3680. return nullptr;
  3681. }
  3682. retSz = rowSz;
  3683. processed++;
  3684. return resultBuffer.toByteArray();
  3685. }
  3686. }
  3687. eofSeen = true;
  3688. }
  3689. }
  3690. close();
  3691. retSz = 0;
  3692. return nullptr;
  3693. }
  3694. // IRemoteActivity impl.
  3695. virtual void serializeCursor(MemoryBuffer &tgt) const override
  3696. {
  3697. tgt.append(prefetchBuffer.tell());
  3698. tgt.append(processed);
  3699. }
  3700. virtual void restoreCursor(MemoryBuffer &src) override
  3701. {
  3702. cursorDirty = true;
  3703. src.read(startPos);
  3704. src.read(processed);
  3705. }
  3706. virtual unsigned __int64 queryProcessed() const override
  3707. {
  3708. return processed;
  3709. }
  3710. virtual IOutputMetaData *queryOutputMeta() const override
  3711. {
  3712. return outMeta;
  3713. }
  3714. virtual StringBuffer &getInfoStr(StringBuffer &out) const override
  3715. {
  3716. return out.appendf("diskread[%s]", helper->getFileName());
  3717. }
  3718. };
  3719. IRemoteActivity *createRemoteDiskRead(IPropertyTree &actNode)
  3720. {
  3721. const char *fileName = actNode.queryProp("fileName");
  3722. unsigned __int64 chooseN = actNode.getPropInt64("choosen", defaultFileStreamChooseN);
  3723. unsigned __int64 skipN = actNode.getPropInt64("skipN", defaultFileStreamSkipN);
  3724. unsigned __int64 rowLimit = actNode.getPropInt64("rowLimit", defaultFileStreamRowLimit);
  3725. Owned<IOutputMetaData> inMeta = getTypeInfoOutputMetaData(actNode, "input");
  3726. Owned<IOutputMetaData> outMeta = getTypeInfoOutputMetaData(actNode, "output");
  3727. Owned<IHThorDiskReadArg> helper = createDiskReadArg(fileName, inMeta.getClear(), outMeta.getClear(), chooseN, skipN, rowLimit);
  3728. return new CRemoteDiskReadActivity(*helper);
  3729. }
  3730. IRemoteActivity *createRemoteActivity(IPropertyTree &actNode)
  3731. {
  3732. const char *kindStr = actNode.queryProp("kind");
  3733. ThorActivityKind kind = TAKnone;
  3734. if (strieq("diskread", kindStr))
  3735. kind = TAKdiskread;
  3736. Owned<IRemoteActivity> activity;
  3737. switch (kind)
  3738. {
  3739. case TAKdiskread:
  3740. {
  3741. activity.setown(createRemoteDiskRead(actNode));
  3742. break;
  3743. }
  3744. default:
  3745. throwUnexpected(); // for now
  3746. }
  3747. return activity.getClear();
  3748. }
  3749. IRemoteActivity *createOutputActivity(IPropertyTree &requestTree)
  3750. {
  3751. IPropertyTree *actNode = requestTree.queryPropTree("node");
  3752. assertex(actNode);
  3753. return createRemoteActivity(*actNode);
  3754. }
  3755. #define MAX_KEYDATA_SZ 0x10000
  3756. class CRemoteFileServer : implements IRemoteFileServer, public CInterface
  3757. {
  3758. class CThrottler;
  3759. class CRemoteClientHandler : implements ISocketSelectNotify, public CInterface
  3760. {
  3761. public:
  3762. CRemoteFileServer *parent;
  3763. Owned<ISocket> socket;
  3764. StringAttr peerName;
  3765. Owned<IAuthenticatedUser> user;
  3766. MemoryBuffer msg;
  3767. bool selecthandled;
  3768. size32_t left;
  3769. StructArrayOf<OpenFileInfo> openFiles;
  3770. Owned<IDirectoryIterator> opendir;
  3771. unsigned lasttick, lastInactiveTick;
  3772. atomic_t &globallasttick;
  3773. unsigned previdx; // for debug
  3774. IMPLEMENT_IINTERFACE;
  3775. CRemoteClientHandler(CRemoteFileServer *_parent,ISocket *_socket,IAuthenticatedUser *_user,atomic_t &_globallasttick)
  3776. : socket(_socket), user(_user), globallasttick(_globallasttick)
  3777. {
  3778. previdx = (unsigned)-1;
  3779. StringBuffer peerBuf;
  3780. char name[256];
  3781. name[0] = 0;
  3782. int port = socket->peer_name(name,sizeof(name)-1);
  3783. if (port>=0)
  3784. {
  3785. peerBuf.append(name);
  3786. if (port)
  3787. peerBuf.append(':').append(port);
  3788. peerName.set(peerBuf);
  3789. }
  3790. else
  3791. {
  3792. /* There's a possibility the socket closed before got here, in which case, peer name is unavailable
  3793. * May potentially be unavailable for other reasons also.
  3794. * Must be set, as used in client stats HT.
  3795. * If socket closed, the handler will start up but notice closed and quit
  3796. */
  3797. peerName.set("UNKNOWN PEER NAME");
  3798. }
  3799. {
  3800. CriticalBlock block(ClientCountSect);
  3801. if (++ClientCount>MaxClientCount)
  3802. MaxClientCount = ClientCount;
  3803. if (TF_TRACE_CLIENT_CONN)
  3804. {
  3805. StringBuffer s;
  3806. s.appendf("Connecting(%p) [%d,%d] to ",this,ClientCount,MaxClientCount);
  3807. s.append(peerName);
  3808. PROGLOG("%s", s.str());
  3809. }
  3810. }
  3811. parent = _parent;
  3812. left = 0;
  3813. msg.setEndian(__BIG_ENDIAN);
  3814. selecthandled = false;
  3815. touch();
  3816. }
  3817. ~CRemoteClientHandler()
  3818. {
  3819. {
  3820. CriticalBlock block(ClientCountSect);
  3821. ClientCount--;
  3822. if (TF_TRACE_CLIENT_CONN) {
  3823. PROGLOG("Disconnecting(%p) [%d,%d] ",this,ClientCount,MaxClientCount);
  3824. }
  3825. }
  3826. ISocket *sock = socket.getClear();
  3827. try {
  3828. sock->Release();
  3829. }
  3830. catch (IException *e) {
  3831. EXCLOG(e,"~CRemoteClientHandler");
  3832. e->Release();
  3833. }
  3834. }
  3835. bool notifySelected(ISocket *sock,unsigned selected)
  3836. {
  3837. if (TF_TRACE_FULL)
  3838. PROGLOG("notifySelected(%p)",this);
  3839. if (sock!=socket)
  3840. WARNLOG("notifySelected - invalid socket passed");
  3841. size32_t avail = (size32_t)socket->avail_read();
  3842. if (avail)
  3843. touch();
  3844. if (left==0)
  3845. {
  3846. try
  3847. {
  3848. left = avail?receiveBufferSize(socket):0;
  3849. }
  3850. catch (IException *e)
  3851. {
  3852. EXCLOG(e,"notifySelected(1)");
  3853. e->Release();
  3854. left = 0;
  3855. }
  3856. if (left)
  3857. {
  3858. avail = (size32_t)socket->avail_read();
  3859. try
  3860. {
  3861. msg.ensureCapacity(left);
  3862. }
  3863. catch (IException *e)
  3864. {
  3865. EXCLOG(e,"notifySelected(2)");
  3866. e->Release();
  3867. left = 0;
  3868. // if too big then corrupted packet so read avail to try and consume
  3869. char fbuf[1024];
  3870. while (avail)
  3871. {
  3872. size32_t rd = avail>sizeof(fbuf)?sizeof(fbuf):avail;
  3873. try
  3874. {
  3875. socket->read(fbuf, rd); // don't need timeout here
  3876. avail -= rd;
  3877. }
  3878. catch (IException *e)
  3879. {
  3880. EXCLOG(e,"notifySelected(2) flush");
  3881. e->Release();
  3882. break;
  3883. }
  3884. }
  3885. avail = 0;
  3886. left = 0;
  3887. }
  3888. }
  3889. }
  3890. size32_t toread = left>avail?avail:left;
  3891. if (toread)
  3892. {
  3893. try
  3894. {
  3895. socket->read(msg.reserve(toread), toread); // don't need timeout here
  3896. }
  3897. catch (IException *e)
  3898. {
  3899. EXCLOG(e,"notifySelected(3)");
  3900. e->Release();
  3901. toread = left;
  3902. msg.clear();
  3903. }
  3904. }
  3905. if (TF_TRACE_FULL)
  3906. PROGLOG("notifySelected %d,%d",toread,left);
  3907. if ((left!=0)&&(avail==0))
  3908. {
  3909. WARNLOG("notifySelected: Closing mid packet, %d remaining", left);
  3910. toread = left;
  3911. msg.clear();
  3912. }
  3913. left -= toread;
  3914. if (left==0)
  3915. {
  3916. // DEBUG
  3917. parent->notify(this, msg); // consumes msg
  3918. }
  3919. return false;
  3920. }
  3921. void logPrevHandle()
  3922. {
  3923. if (previdx<openFiles.ordinality())
  3924. {
  3925. const OpenFileInfo &fileInfo = openFiles.item(previdx);
  3926. PROGLOG("Previous handle(%d): %s", fileInfo.handle, fileInfo.filename->text.get());
  3927. }
  3928. }
  3929. bool throttleCommand(MemoryBuffer &msg)
  3930. {
  3931. RemoteFileCommandType cmd = RFCunknown;
  3932. Owned<IException> e;
  3933. try
  3934. {
  3935. msg.read(cmd);
  3936. parent->throttleCommand(cmd, msg, this);
  3937. return true;
  3938. }
  3939. catch (IException *_e)
  3940. {
  3941. e.setown(_e);
  3942. }
  3943. /* processCommand() will handle most exception and replies,
  3944. * but if throttleCommand fails before it gets that far, this will handle
  3945. */
  3946. MemoryBuffer reply;
  3947. initSendBuffer(reply);
  3948. StringBuffer s;
  3949. e->errorMessage(s);
  3950. appendCmdErr(reply, cmd, e->errorCode(), s.str());
  3951. parent->appendError(cmd, this, cmd, reply);
  3952. sendBuffer(socket, reply);
  3953. return false;
  3954. }
  3955. void processCommand(RemoteFileCommandType cmd, MemoryBuffer &msg, CThrottler *throttler)
  3956. {
  3957. MemoryBuffer reply;
  3958. bool testSocketFlag = parent->processCommand(cmd, msg, initSendBuffer(reply), this, throttler);
  3959. sendBuffer(socket, reply, testSocketFlag);
  3960. }
  3961. bool immediateCommand() // returns false if socket closed or failure
  3962. {
  3963. MemoryBuffer msg;
  3964. msg.setEndian(__BIG_ENDIAN);
  3965. touch();
  3966. size32_t avail = (size32_t)socket->avail_read();
  3967. if (avail==0)
  3968. return false;
  3969. receiveBuffer(socket, msg, 5); // shouldn't timeout as data is available
  3970. touch();
  3971. if (msg.length()==0)
  3972. return false;
  3973. return throttleCommand(msg);
  3974. }
  3975. void process(MemoryBuffer &msg)
  3976. {
  3977. if (selecthandled)
  3978. throttleCommand(msg);
  3979. else
  3980. {
  3981. // msg only used/filled if process() has been triggered by notify()
  3982. while (parent->threadRunningCount()<=parent->targetActiveThreads) // if too many threads add to select handler
  3983. {
  3984. int w;
  3985. try
  3986. {
  3987. w = socket->wait_read(1000);
  3988. }
  3989. catch (IException *e)
  3990. {
  3991. EXCLOG(e, "CRemoteClientHandler::main wait_read error");
  3992. e->Release();
  3993. parent->onCloseSocket(this,1);
  3994. return;
  3995. }
  3996. if (w==0)
  3997. break;
  3998. if ((w<0)||!immediateCommand())
  3999. {
  4000. if (w<0)
  4001. WARNLOG("CRemoteClientHandler::main wait_read error");
  4002. parent->onCloseSocket(this,1);
  4003. return;
  4004. }
  4005. }
  4006. /* This is a bit confusing..
  4007. * The addClient below, adds this request to a selecthandler handled by another thread
  4008. * and passes ownership of 'this' (CRemoteClientHandler)
  4009. *
  4010. * When notified, the selecthandler will launch a new pool thread to handle the request
  4011. * If the pool thread limit is hit, the selecthandler will be blocked [ see comment in CRemoteFileServer::notify() ]
  4012. *
  4013. * Either way, a thread pool slot is occupied when processing a request.
  4014. * Blocked threads, will be blocked for up to 1 minute (as defined by createThreadPool call)
  4015. * IOW, if there are lots of incoming clients that can't be serviced by the CThrottler limit,
  4016. * a large number of pool threads will build up after a while.
  4017. *
  4018. * The CThrottler mechanism, imposes a further hard limit on how many concurrent request threads can be active.
  4019. * If the thread pool had an absolute limit (instead of just introducing a delay), then I don't see the point
  4020. * in this additional layer of throttling..
  4021. */
  4022. selecthandled = true;
  4023. parent->addClient(this); // add to select handler
  4024. // NB: this (CRemoteClientHandler) is now linked by the selecthandler and owned by the 'clients' list
  4025. }
  4026. }
  4027. bool timedOut()
  4028. {
  4029. return (msTick()-lasttick)>CLIENT_TIMEOUT;
  4030. }
  4031. bool inactiveTimedOut()
  4032. {
  4033. unsigned ms = msTick();
  4034. if ((ms-lastInactiveTick)>CLIENT_INACTIVEWARNING_TIMEOUT)
  4035. {
  4036. lastInactiveTick = ms;
  4037. return true;
  4038. }
  4039. return false;
  4040. }
  4041. void touch()
  4042. {
  4043. lastInactiveTick = lasttick = msTick();
  4044. atomic_set(&globallasttick,lasttick);
  4045. }
  4046. const char *queryPeerName()
  4047. {
  4048. return peerName;
  4049. }
  4050. bool getInfo(StringBuffer &str)
  4051. {
  4052. str.append("client(");
  4053. const char *name = queryPeerName();
  4054. bool ok;
  4055. if (name)
  4056. {
  4057. ok = true;
  4058. str.append(name);
  4059. }
  4060. else
  4061. ok = false;
  4062. unsigned ms = msTick();
  4063. str.appendf("): last touch %d ms ago (%d, %d)",ms-lasttick,lasttick,ms);
  4064. ForEachItemIn(i, openFiles)
  4065. {
  4066. const OpenFileInfo &fileInfo = openFiles.item(i);
  4067. str.appendf("\n %d: ", fileInfo.handle);
  4068. str.append(fileInfo.filename->text.get());
  4069. }
  4070. return ok;
  4071. }
  4072. };
  4073. class CThrottleQueueItem : public CSimpleInterface
  4074. {
  4075. public:
  4076. RemoteFileCommandType cmd;
  4077. Linked<CRemoteClientHandler> client;
  4078. MemoryBuffer msg;
  4079. CCycleTimer timer;
  4080. CThrottleQueueItem(RemoteFileCommandType _cmd, MemoryBuffer &_msg, CRemoteClientHandler *_client) : cmd(_cmd), client(_client)
  4081. {
  4082. msg.swapWith(_msg);
  4083. }
  4084. };
  4085. class CThrottler
  4086. {
  4087. Semaphore sem;
  4088. CriticalSection crit, configureCrit;
  4089. StringAttr title;
  4090. unsigned limit, delayMs, cpuThreshold, queueLimit;
  4091. unsigned disabledLimit;
  4092. unsigned __int64 totalThrottleDelay;
  4093. CCycleTimer totalThrottleDelayTimer;
  4094. QueueOf<CThrottleQueueItem, false> queue;
  4095. unsigned statsIntervalSecs;
  4096. public:
  4097. CThrottler(const char *_title) : title(_title)
  4098. {
  4099. totalThrottleDelay = 0;
  4100. limit = 0;
  4101. delayMs = DEFAULT_STDCMD_THROTTLEDELAYMS;
  4102. cpuThreshold = DEFAULT_STDCMD_THROTTLECPULIMIT;
  4103. disabledLimit = 0;
  4104. queueLimit = DEFAULT_STDCMD_THROTTLEQUEUELIMIT;
  4105. statsIntervalSecs = DEFAULT_STDCMD_THROTTLECPULIMIT;
  4106. }
  4107. ~CThrottler()
  4108. {
  4109. for (;;)
  4110. {
  4111. Owned<CThrottleQueueItem> item = queue.dequeue();
  4112. if (!item)
  4113. break;
  4114. }
  4115. }
  4116. unsigned queryLimit() const { return limit; }
  4117. unsigned queryDelayMs() const { return delayMs; };;
  4118. unsigned queryCpuThreshold() const { return cpuThreshold; }
  4119. unsigned queryQueueLimit() const { return queueLimit; }
  4120. StringBuffer &getInfoSummary(StringBuffer &info)
  4121. {
  4122. info.appendf("Throttler(%s) - limit=%u, delayMs=%u, cpuThreshold=%u, queueLimit=%u", title.get(), limit, delayMs, cpuThreshold, queueLimit).newline();
  4123. unsigned elapsedSecs = totalThrottleDelayTimer.elapsedMs()/1000;
  4124. time_t simple;
  4125. time(&simple);
  4126. simple -= elapsedSecs;
  4127. CDateTime dt;
  4128. dt.set(simple);
  4129. StringBuffer dateStr;
  4130. dt.getTimeString(dateStr, true);
  4131. info.appendf("Throttler(%s): statistics since %s", title.get(), dateStr.str()).newline();
  4132. info.appendf("Total delay of %0.2f seconds", ((double)totalThrottleDelay)/1000).newline();
  4133. info.appendf("Requests currently queued: %u", queue.ordinality());
  4134. return info;
  4135. }
  4136. void getInfo(StringBuffer &info)
  4137. {
  4138. CriticalBlock b(crit);
  4139. getInfoSummary(info).newline();
  4140. }
  4141. void configure(unsigned _limit, unsigned _delayMs, unsigned _cpuThreshold, unsigned _queueLimit)
  4142. {
  4143. if (_limit > THROTTLE_MAX_LIMIT || _delayMs > THROTTLE_MAX_DELAYMS || _cpuThreshold > THROTTLE_MAX_CPUTHRESHOLD || _queueLimit > THROTTLE_MAX_QUEUELIMIT)
  4144. throw MakeStringException(0, "Throttler(%s), rejecting configure command: limit=%u (max permitted=%u), delayMs=%u (max permitted=%u), cpuThreshold=%u (max permitted=%u), queueLimit=%u (max permitted=%u)",
  4145. title.str(), _limit, THROTTLE_MAX_LIMIT, _delayMs, THROTTLE_MAX_DELAYMS, _cpuThreshold,
  4146. THROTTLE_MAX_CPUTHRESHOLD, _queueLimit, THROTTLE_MAX_QUEUELIMIT);
  4147. CriticalBlock b(configureCrit);
  4148. int delta = 0;
  4149. if (_limit)
  4150. {
  4151. if (disabledLimit) // if transitioning from disabled to some throttling
  4152. {
  4153. assertex(0 == limit);
  4154. delta = _limit - disabledLimit; // + or -
  4155. disabledLimit = 0;
  4156. }
  4157. else
  4158. delta = _limit - limit; // + or -
  4159. }
  4160. else if (0 == disabledLimit)
  4161. {
  4162. PROGLOG("Throttler(%s): disabled, previous limit: %u", title.get(), limit);
  4163. /* disabling - set limit immediately to let all new transaction through.
  4164. * NB: the semaphore signals are not consumed in this case, because transactions could be waiting on it.
  4165. * Instead the existing 'limit' is kept in 'disabledLimit', so that if/when throttling is
  4166. * re-enabled, it is used as a basis for increasing or consuming the semaphore signal count.
  4167. */
  4168. disabledLimit = limit;
  4169. limit = 0;
  4170. }
  4171. if (delta > 0)
  4172. {
  4173. PROGLOG("Throttler(%s): Increasing limit from %u to %u", title.get(), limit, _limit);
  4174. sem.signal(delta);
  4175. limit = _limit;
  4176. // NB: If throttling was off, this doesn't effect transactions in progress, i.e. will only throttle new transactions coming in.
  4177. }
  4178. else if (delta < 0)
  4179. {
  4180. PROGLOG("Throttler(%s): Reducing limit from %u to %u", title.get(), limit, _limit);
  4181. // NB: This is not expected to take long
  4182. CCycleTimer timer;
  4183. while (delta < 0)
  4184. {
  4185. if (sem.wait(1000))
  4186. ++delta;
  4187. else
  4188. PROGLOG("Throttler(%s): Waited %0.2f seconds so far for up to a maximum of %u (previous limit) transactions to complete, %u completed", title.get(), ((double)timer.elapsedMs())/1000, limit, -delta);
  4189. }
  4190. limit = _limit;
  4191. // NB: doesn't include transactions in progress, i.e. will only throttle new transactions coming in.
  4192. }
  4193. if (_delayMs != delayMs)
  4194. {
  4195. PROGLOG("Throttler(%s): New delayMs=%u, previous: %u", title.get(), _delayMs, delayMs);
  4196. delayMs = _delayMs;
  4197. }
  4198. if (_cpuThreshold != cpuThreshold)
  4199. {
  4200. PROGLOG("Throttler(%s): New cpuThreshold=%u, previous: %u", title.get(), _cpuThreshold, cpuThreshold);
  4201. cpuThreshold = _cpuThreshold;
  4202. }
  4203. if (((unsigned)-1) != _queueLimit && _queueLimit != queueLimit)
  4204. {
  4205. PROGLOG("Throttler(%s): New queueLimit=%u%s, previous: %u", title.get(), _queueLimit, 0==_queueLimit?"(disabled)":"", queueLimit);
  4206. queueLimit = _queueLimit;
  4207. }
  4208. }
  4209. void setStatsInterval(unsigned _statsIntervalSecs)
  4210. {
  4211. if (_statsIntervalSecs != statsIntervalSecs)
  4212. {
  4213. PROGLOG("Throttler(%s): New statsIntervalSecs=%u, previous: %u", title.get(), _statsIntervalSecs, statsIntervalSecs);
  4214. statsIntervalSecs = _statsIntervalSecs;
  4215. }
  4216. }
  4217. void take(RemoteFileCommandType cmd) // cmd for info. only
  4218. {
  4219. for (;;)
  4220. {
  4221. if (sem.wait(delayMs))
  4222. return;
  4223. PROGLOG("Throttler(%s): transaction delayed [cmd=%s]", title.get(), getRFCText(cmd));
  4224. }
  4225. }
  4226. void release()
  4227. {
  4228. sem.signal();
  4229. }
  4230. StringBuffer &getStats(StringBuffer &stats, bool reset)
  4231. {
  4232. CriticalBlock b(crit);
  4233. getInfoSummary(stats);
  4234. if (reset)
  4235. {
  4236. totalThrottleDelayTimer.reset();
  4237. totalThrottleDelay = 0;
  4238. }
  4239. return stats;
  4240. }
  4241. void addCommand(RemoteFileCommandType cmd, MemoryBuffer &msg, CRemoteClientHandler *client)
  4242. {
  4243. CCycleTimer timer;
  4244. Owned<IException> exception;
  4245. bool hadSem = true;
  4246. if (!sem.wait(delayMs))
  4247. {
  4248. CriticalBlock b(crit);
  4249. if (!sem.wait(0)) // check hasn't become available
  4250. {
  4251. unsigned cpu = getLatestCPUUsage();
  4252. if (getLatestCPUUsage()<cpuThreshold)
  4253. {
  4254. /* Allow to proceed, despite hitting throttle limit because CPU < threshold
  4255. * NB: The overall number of threads is still capped by the thread pool.
  4256. */
  4257. unsigned ms = timer.elapsedMs();
  4258. totalThrottleDelay += ms;
  4259. PROGLOG("Throttler(%s): transaction delayed [cmd=%s] for : %u milliseconds, proceeding as cpu(%u)<throttleCPULimit(%u)", title.get(), getRFCText(cmd), cpu, ms, cpuThreshold);
  4260. hadSem = false;
  4261. }
  4262. else
  4263. {
  4264. if (queueLimit && queue.ordinality()>=queueLimit)
  4265. throw MakeStringException(0, "Throttler(%s), the maxiumum number of items are queued (%u), rejecting new command[%s]", title.str(), queue.ordinality(), getRFCText(cmd));
  4266. queue.enqueue(new CThrottleQueueItem(cmd, msg, client)); // NB: takes over ownership of 'client' from running thread
  4267. PROGLOG("Throttler(%s): transaction delayed [cmd=%s], queuing (%u queueud), [client=%p, sock=%u]", title.get(), getRFCText(cmd), queue.ordinality(), client, client->socket->OShandle());
  4268. return;
  4269. }
  4270. }
  4271. }
  4272. /* Guarantee that sem is released.
  4273. * Should normally release on clean exit when queue is empty.
  4274. */
  4275. struct ReleaseSem
  4276. {
  4277. Semaphore *sem;
  4278. ReleaseSem(Semaphore *_sem) { sem = _sem; }
  4279. ~ReleaseSem() { if (sem) sem->signal(); }
  4280. } releaseSem(hadSem?&sem:NULL);
  4281. /* Whilst holding on this throttle slot (i.e. before signalling semaphore back), process
  4282. * queued items. NB: other threads that are finishing will do also.
  4283. * Queued items are processed 1st, then the current request, then anything that was queued when handling current request
  4284. * Throttle slot (semaphore) is only given back when no more to do.
  4285. */
  4286. Linked<CRemoteClientHandler> currentClient;
  4287. MemoryBuffer currentMsg;
  4288. unsigned ms;
  4289. for (;;)
  4290. {
  4291. RemoteFileCommandType currentCmd;
  4292. {
  4293. CriticalBlock b(crit);
  4294. Owned<CThrottleQueueItem> item = queue.dequeue();
  4295. if (item)
  4296. {
  4297. currentCmd = item->cmd;
  4298. currentClient.setown(item->client.getClear());
  4299. currentMsg.swapWith(item->msg);
  4300. ms = item->timer.elapsedMs();
  4301. }
  4302. else
  4303. {
  4304. if (NULL == client) // previously handled and queue empty
  4305. {
  4306. /* Commands are only queued if semaphore is exhaused (checked inside crit)
  4307. * so only signal the semaphore inside the crit, after checking if there are no queued items
  4308. */
  4309. if (hadSem)
  4310. {
  4311. releaseSem.sem = NULL;
  4312. sem.signal();
  4313. }
  4314. break;
  4315. }
  4316. currentCmd = cmd;
  4317. currentClient.set(client); // process current request after dealing with queue
  4318. currentMsg.swapWith(msg);
  4319. ms = timer.elapsedMs();
  4320. client = NULL;
  4321. }
  4322. }
  4323. if (ms >= 1000)
  4324. {
  4325. if (ms>delayMs)
  4326. PROGLOG("Throttler(%s): transaction delayed [cmd=%s] for : %u seconds", title.get(), getRFCText(currentCmd), ms/1000);
  4327. }
  4328. {
  4329. CriticalBlock b(crit);
  4330. totalThrottleDelay += ms;
  4331. }
  4332. try
  4333. {
  4334. currentClient->processCommand(currentCmd, currentMsg, this);
  4335. }
  4336. catch (IException *e)
  4337. {
  4338. EXCLOG(e, "addCommand: processCommand failed");
  4339. e->Release();
  4340. }
  4341. }
  4342. }
  4343. };
  4344. // temporarily release a throttler slot
  4345. class CThrottleReleaseBlock
  4346. {
  4347. CThrottler &throttler;
  4348. RemoteFileCommandType cmd;
  4349. public:
  4350. CThrottleReleaseBlock(CThrottler &_throttler, RemoteFileCommandType _cmd) : throttler(_throttler), cmd(_cmd)
  4351. {
  4352. throttler.release();
  4353. }
  4354. ~CThrottleReleaseBlock()
  4355. {
  4356. throttler.take(cmd);
  4357. }
  4358. };
  // Most recently issued file handle; getNextHandle() advances it and wraps from INT_MAX back to 1.
  4359. int lasthandle;
  // Held while the 'clients' list and the clients' open-file tables are searched or modified.
  4360. CriticalSection sect;
  4361. Owned<ISocket> acceptsock;
  4362. Owned<ISocket> securesock;
  // Created in the constructor; client sockets are removed from it in onCloseSocket().
  4363. Owned<ISocketSelectHandler> selecthandler;
  4364. Owned<IThreadPool> threads; // for commands
  4365. bool stopping;
  // Initialised to msTick() in the constructor.
  4366. unsigned clientcounttick;
  4367. unsigned closedclients;
  // Constructed with 'maxAsyncCopy'; joined in the destructor.
  4368. CAsyncCommandManager asyncCommandManager;
  // Configured in the constructor from the DEFAULT_STDCMD_* and DEFAULT_SLOWCMD_* settings respectively.
  4369. CThrottler stdCmdThrottler, slowCmdThrottler;
  4370. CClientStatsTable clientStatsTable;
  // Initialised to msTick() in the constructor.
  4371. atomic_t globallasttick;
  // 80% of maxThreads (minimum 1); CRemoteClientHandler::process() compares the running thread count against it.
  4372. unsigned targetActiveThreads;
  4373. int getNextHandle()
  4374. {
  4375. // called in sect critical block
  4376. for (;;) {
  4377. if (lasthandle==INT_MAX)
  4378. lasthandle = 1;
  4379. else
  4380. lasthandle++;
  4381. unsigned idx1;
  4382. unsigned idx2;
  4383. if (!findHandle(lasthandle,idx1,idx2))
  4384. return lasthandle;
  4385. }
  4386. }
  4387. bool findHandle(int handle,unsigned &clientidx,unsigned &handleidx)
  4388. {
  4389. // called in sect critical block
  4390. clientidx = (unsigned)-1;
  4391. handleidx = (unsigned)-1;
  4392. ForEachItemIn(i,clients) {
  4393. CRemoteClientHandler &client = clients.item(i);
  4394. ForEachItemIn(j, client.openFiles)
  4395. {
  4396. if (client.openFiles.item(j).handle==handle)
  4397. {
  4398. handleidx = j;
  4399. clientidx = i;
  4400. return true;
  4401. }
  4402. }
  4403. }
  4404. return false;
  4405. }
  4406. unsigned readKeyData(IKeyManager *keyManager, unsigned maxRecs, MemoryBuffer &reply, bool &maxHit)
  4407. {
  4408. DelayedSizeMarker keyDataSzReturned(reply);
  4409. unsigned numRecs = 0;
  4410. maxHit = false;
  4411. unsigned pos = reply.length();
  4412. while (keyManager->lookup(true))
  4413. {
  4414. unsigned size = keyManager->queryRecordSize();
  4415. offset_t fpos;
  4416. const byte *result = keyManager->queryKeyBuffer(fpos);
  4417. reply.append(fpos);
  4418. reply.append(size);
  4419. reply.append(size, result);
  4420. ++numRecs;
  4421. if (maxRecs && (0 == --maxRecs))
  4422. {
  4423. maxHit = true;
  4424. break;
  4425. }
  4426. if (reply.length()-pos >= MAX_KEYDATA_SZ)
  4427. {
  4428. maxHit = true;
  4429. break;
  4430. }
  4431. }
  4432. keyDataSzReturned.write();
  4433. return numRecs;
  4434. }
  4435. IKeyManager *prepKey(int handle, const char *keyname, SegMonitorList *segs)
  4436. {
  4437. OpenFileInfo fileInfo;
  4438. if (!lookupFileIOHandle(handle, fileInfo, of_key))
  4439. {
  4440. VStringBuffer errStr("Error opening key file : %s", keyname);
  4441. throw createDafsException(RFSERR_InvalidFileIOHandle, errStr.str());
  4442. }
  4443. Owned<IKeyIndex> index = createKeyIndex(keyname, 0, *fileInfo.fileIO, false, false);
  4444. if (!index)
  4445. {
  4446. VStringBuffer errStr("Error opening key file : %s", keyname);
  4447. throw createDafsException(RFSERR_KeyIndexFailed, errStr.str());
  4448. }
  4449. Owned<IKeyManager> keyManager = createLocalKeyManager(index, nullptr);
  4450. if (segs)
  4451. {
  4452. keyManager->setSegmentMonitors(*segs);
  4453. keyManager->finishSegmentMonitors();
  4454. }
  4455. keyManager->reset();
  4456. return keyManager.getLink();
  4457. }
  4458. IKeyManager *prepKey(MemoryBuffer &mb, bool segmentMonitors)
  4459. {
  4460. int handle;
  4461. StringBuffer keyName;
  4462. size32_t keySize; // backward comp
  4463. mb.read(handle).read(keyName).read(keySize);
  4464. if (segmentMonitors)
  4465. {
  4466. SegMonitorList segs;
  4467. segs.deserialize(mb);
  4468. return prepKey(handle, keyName, &segs);
  4469. }
  4470. else
  4471. return prepKey(handle, keyName, nullptr);
  4472. }
  4473. class cCommandProcessor: public CInterface, implements IPooledThread
  4474. {
  4475. Owned<CRemoteClientHandler> client;
  4476. MemoryBuffer msg;
  4477. public:
  4478. IMPLEMENT_IINTERFACE;
  4479. struct cCommandProcessorParams
  4480. {
  4481. cCommandProcessorParams() { msg.setEndian(__BIG_ENDIAN); }
  4482. CRemoteClientHandler *client;
  4483. MemoryBuffer msg;
  4484. };
  4485. virtual void init(void *_params) override
  4486. {
  4487. cCommandProcessorParams &params = *(cCommandProcessorParams *)_params;
  4488. client.setown(params.client);
  4489. msg.swapWith(params.msg);
  4490. }
  4491. virtual void threadmain() override
  4492. {
  4493. // idea is that initially we process commands inline then pass over to select handler
  4494. try
  4495. {
  4496. client->process(msg);
  4497. }
  4498. catch (IException *e)
  4499. {
  4500. // suppress some errors
  4501. EXCLOG(e,"cCommandProcessor::threadmain");
  4502. e->Release();
  4503. }
  4504. try
  4505. {
  4506. client.clear();
  4507. }
  4508. catch (IException *e)
  4509. {
  4510. // suppress some more errors clearing client
  4511. EXCLOG(e,"cCommandProcessor::threadmain(2)");
  4512. e->Release();
  4513. }
  4514. }
  4515. virtual bool stop() override
  4516. {
  4517. return true;
  4518. }
  4519. virtual bool canReuse() const override
  4520. {
  4521. return false; // want to free owned socket
  4522. }
  4523. };
  // Client handlers known to the server; searched by findHandle() and pruned in onCloseSocket(), under 'sect'.
  4524. IArrayOf<CRemoteClientHandler> clients;
  4525. class cImpersonateBlock
  4526. {
  4527. CRemoteClientHandler &client;
  4528. public:
  4529. cImpersonateBlock(CRemoteClientHandler &_client)
  4530. : client(_client)
  4531. {
  4532. if (client.user.get()) {
  4533. if (TF_TRACE)
  4534. PROGLOG("Impersonate user: %s",client.user->username());
  4535. client.user->impersonate();
  4536. }
  4537. }
  4538. ~cImpersonateBlock()
  4539. {
  4540. if (client.user.get()) {
  4541. if (TF_TRACE)
  4542. PROGLOG("Stop impersonating user: %s",client.user->username());
  4543. client.user->revert();
  4544. }
  4545. }
  4546. };
  4547. #define IMPERSONATE_USER(client) cImpersonateBlock ublock(client)
  4548. public:
  4549. IMPLEMENT_IINTERFACE
  4550. CRemoteFileServer(unsigned maxThreads, unsigned maxThreadsDelayMs, unsigned maxAsyncCopy)
  4551. : asyncCommandManager(maxAsyncCopy), stdCmdThrottler("stdCmdThrotlter"), slowCmdThrottler("slowCmdThrotlter")
  4552. {
  4553. lasthandle = 0;
  4554. selecthandler.setown(createSocketSelectHandler(NULL));
  4555. stdCmdThrottler.configure(DEFAULT_STDCMD_PARALLELREQUESTLIMIT, DEFAULT_STDCMD_THROTTLEDELAYMS, DEFAULT_STDCMD_THROTTLECPULIMIT, DEFAULT_STDCMD_THROTTLEQUEUELIMIT);
  4556. slowCmdThrottler.configure(DEFAULT_SLOWCMD_PARALLELREQUESTLIMIT, DEFAULT_SLOWCMD_THROTTLEDELAYMS, DEFAULT_SLOWCMD_THROTTLECPULIMIT, DEFAULT_SLOWCMD_THROTTLEQUEUELIMIT);
  4557. unsigned targetMinThreads=maxThreads*20/100; // 20%
  4558. if (0 == targetMinThreads) targetMinThreads = 1;
  4559. targetActiveThreads=maxThreads*80/100; // 80%
  4560. if (0 == targetActiveThreads) targetActiveThreads = 1;
  4561. class CCommandFactory : public CSimpleInterfaceOf<IThreadFactory>
  4562. {
  4563. CRemoteFileServer &parent;
  4564. public:
  4565. CCommandFactory(CRemoteFileServer &_parent) : parent(_parent) { }
  4566. virtual IPooledThread *createNew()
  4567. {
  4568. return parent.createCommandProcessor();
  4569. }
  4570. };
  4571. Owned<IThreadFactory> factory = new CCommandFactory(*this); // NB: pool links factory, so takes ownership
  4572. threads.setown(createThreadPool("CRemoteFileServerPool", factory, NULL, maxThreads, maxThreadsDelayMs,
  4573. #ifdef __64BIT__
  4574. 0, // Unlimited stack size
  4575. #else
  4576. 0x10000,
  4577. #endif
  4578. INFINITE,targetMinThreads));
  4579. threads->setStartDelayTracing(60); // trace amount delayed every minute.
  4580. PROGLOG("CRemoteFileServer: maxThreads = %u, maxThreadsDelayMs = %u, maxAsyncCopy = %u", maxThreads, maxThreadsDelayMs, maxAsyncCopy);
  4581. stopping = false;
  4582. clientcounttick = msTick();
  4583. closedclients = 0;
  4584. atomic_set(&globallasttick,msTick());
  4585. }
  4586. ~CRemoteFileServer()
  4587. {
  4588. #ifdef _DEBUG
  4589. PROGLOG("Exiting CRemoteFileServer");
  4590. #endif
  4591. asyncCommandManager.join();
  4592. clients.kill();
  4593. #ifdef _DEBUG
  4594. PROGLOG("Exited CRemoteFileServer");
  4595. #endif
  4596. }
  4597. bool lookupFileIOHandle(int handle, OpenFileInfo &fileInfo, unsigned newFlags=0)
  4598. {
  4599. if (handle<=0)
  4600. return false;
  4601. CriticalBlock block(sect);
  4602. unsigned clientidx;
  4603. unsigned handleidx;
  4604. if (!findHandle(handle,clientidx,handleidx))
  4605. return false;
  4606. CRemoteClientHandler &client = clients.item(clientidx);
  4607. OpenFileInfo &openFileInfo = client.openFiles.element(handleidx); // NB: links members
  4608. openFileInfo.flags |= newFlags;
  4609. fileInfo = openFileInfo;
  4610. client.previdx = handleidx;
  4611. return true;
  4612. }
  4613. //MORE: The file handles should timeout after a while, and accessing an old (invalid handle)
  4614. // should throw a different exception
  4615. bool checkFileIOHandle(int handle, IFileIO *&fileio, bool del=false)
  4616. {
  4617. fileio = NULL;
  4618. if (handle<=0)
  4619. return false;
  4620. CriticalBlock block(sect);
  4621. unsigned clientidx;
  4622. unsigned handleidx;
  4623. if (findHandle(handle,clientidx,handleidx))
  4624. {
  4625. CRemoteClientHandler &client = clients.item(clientidx);
  4626. const OpenFileInfo &fileInfo = client.openFiles.item(handleidx);
  4627. if (del)
  4628. {
  4629. if (fileInfo.flags & of_key)
  4630. clearKeyStoreCacheEntry(fileInfo.fileIO);
  4631. client.openFiles.remove(handleidx);
  4632. client.previdx = (unsigned)-1;
  4633. }
  4634. else
  4635. {
  4636. fileio = client.openFiles.item(handleidx).fileIO;
  4637. client.previdx = handleidx;
  4638. }
  4639. return true;
  4640. }
  4641. return false;
  4642. }
  4643. bool checkFileIOHandle(MemoryBuffer &reply, int handle, IFileIO *&fileio, bool del=false)
  4644. {
  4645. if (!checkFileIOHandle(handle, fileio, del))
  4646. {
  4647. appendErr(reply, RFSERR_InvalidFileIOHandle);
  4648. return false;
  4649. }
  4650. return true;
  4651. }
  4652. void onCloseSocket(CRemoteClientHandler *client, int which)
  4653. {
  4654. if (!client)
  4655. return;
  4656. CriticalBlock block(sect);
  4657. #ifdef _DEBUG
  4658. StringBuffer s(client->queryPeerName());
  4659. PROGLOG("onCloseSocket(%d) %s",which,s.str());
  4660. #endif
  4661. if (client->socket)
  4662. {
  4663. try
  4664. {
  4665. /* JCSMORE - shouldn't this really be dependent on whether selecthandled=true
  4666. * It has not been added to the selecthandler
  4667. * Harmless, but wasteful if so.
  4668. */
  4669. selecthandler->remove(client->socket);
  4670. }
  4671. catch (IException *e) {
  4672. EXCLOG(e,"CRemoteFileServer::onCloseSocket.1");
  4673. e->Release();
  4674. }
  4675. }
  4676. try {
  4677. clients.zap(*client);
  4678. }
  4679. catch (IException *e) {
  4680. EXCLOG(e,"CRemoteFileServer::onCloseSocket.2");
  4681. e->Release();
  4682. }
  4683. }
  4684. bool cmdOpenFileIO(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
  4685. {
  4686. IMPERSONATE_USER(client);
  4687. Owned<StringAttrItem> name = new StringAttrItem;
  4688. byte mode;
  4689. byte share;
  4690. msg.read(name->text).read(mode).read(share);
  4691. // also try to recv extra byte
  4692. byte extra = 0;
  4693. unsigned short sMode = IFUnone;
  4694. unsigned short cFlags = IFUnone;
  4695. if (msg.remaining() >= sizeof(byte))
  4696. {
  4697. msg.read(extra);
  4698. // and then try to recv extra sMode, cFlags (always sent together)
  4699. if (msg.remaining() >= (sizeof(sMode) + sizeof(cFlags)))
  4700. msg.read(sMode).read(cFlags);
  4701. }
  4702. IFEflags extraFlags = (IFEflags)extra;
  4703. // none => nocache for remote (hint)
  4704. // can revert to previous behavior with conf file setting "allow_pgcache_flush=false"
  4705. if (extraFlags == IFEnone)
  4706. extraFlags = IFEnocache;
  4707. Owned<IFile> file = createIFile(name->text);
  4708. switch ((compatIFSHmode)share) {
  4709. case compatIFSHnone:
  4710. file->setCreateFlags(S_IRUSR|S_IWUSR);
  4711. file->setShareMode(IFSHnone);
  4712. break;
  4713. case compatIFSHread:
  4714. file->setShareMode(IFSHread);
  4715. break;
  4716. case compatIFSHwrite:
  4717. file->setShareMode(IFSHfull);
  4718. break;
  4719. case compatIFSHexec:
  4720. file->setCreateFlags(S_IRUSR|S_IWUSR|S_IXUSR|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH);
  4721. break;
  4722. case compatIFSHall:
  4723. file->setCreateFlags(S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH); // bit excessive
  4724. file->setShareMode(IFSHfull);
  4725. break;
  4726. }
  4727. // use sMode, cFlags if sent
  4728. if (sMode != IFUnone && cFlags != IFUnone)
  4729. {
  4730. file->setCreateFlags(cFlags);
  4731. file->setShareMode((IFSHmode)sMode);
  4732. }
  4733. if (TF_TRACE_PRE_IO)
  4734. PROGLOG("before open file '%s', (%d,%d,%d,%d,0%o)",name->text.get(),(int)mode,(int)share,extraFlags,sMode,cFlags);
  4735. Owned<IFileIO> fileio = file->open((IFOmode)mode,extraFlags);
  4736. int handle;
  4737. if (fileio) {
  4738. CriticalBlock block(sect);
  4739. handle = getNextHandle();
  4740. client.previdx = client.openFiles.ordinality();
  4741. client.openFiles.append(OpenFileInfo(handle, fileio, name));
  4742. }
  4743. else
  4744. handle = 0;
  4745. reply.append(RFEnoerror);
  4746. reply.append(handle);
  4747. if (TF_TRACE)
  4748. PROGLOG("open file '%s', (%d,%d) handle = %d",name->text.get(),(int)mode,(int)share,handle);
  4749. return true;
  4750. }
  4751. bool cmdCloseFileIO(MemoryBuffer & msg, MemoryBuffer & reply)
  4752. {
  4753. int handle;
  4754. msg.read(handle);
  4755. IFileIO *fileio;
  4756. if (!checkFileIOHandle(reply, handle, fileio, true))
  4757. return false;
  4758. if (TF_TRACE)
  4759. PROGLOG("close file, handle = %d",handle);
  4760. reply.append(RFEnoerror);
  4761. return true;
  4762. }
  4763. bool cmdRead(MemoryBuffer & msg, MemoryBuffer & reply, CClientStats &stats)
  4764. {
  4765. int handle;
  4766. __int64 pos;
  4767. size32_t len;
  4768. msg.read(handle).read(pos).read(len);
  4769. IFileIO *fileio;
  4770. if (!checkFileIOHandle(reply, handle, fileio))
  4771. return false;
  4772. //arrange it so we read directly into the reply buffer...
  4773. unsigned posOfErr = reply.length();
  4774. reply.append((unsigned)RFEnoerror);
  4775. size32_t numRead;
  4776. unsigned posOfLength = reply.length();
  4777. if (TF_TRACE_PRE_IO)
  4778. PROGLOG("before read file, handle = %d, toread = %d",handle,len);
  4779. void * data;
  4780. {
  4781. reply.reserve(sizeof(numRead));
  4782. data = reply.reserve(len);
  4783. }
  4784. try {
  4785. numRead = fileio->read(pos,len,data);
  4786. }
  4787. catch (IException *e)
  4788. {
  4789. reply.setWritePos(posOfErr);
  4790. StringBuffer s;
  4791. e->errorMessage(s);
  4792. appendErr3(reply, RFSERR_ReadFailed, e->errorCode(), s.str());
  4793. e->Release();
  4794. return false;
  4795. }
  4796. stats.addRead(len);
  4797. if (TF_TRACE)
  4798. PROGLOG("read file, handle = %d, pos = %" I64F "d, toread = %d, read = %d",handle,pos,len,numRead);
  4799. {
  4800. reply.setLength(posOfLength + sizeof(numRead) + numRead);
  4801. reply.writeEndianDirect(posOfLength,sizeof(numRead),&numRead);
  4802. }
  4803. return true;
  4804. }
  4805. bool cmdReadFilteredIndex(MemoryBuffer & msg, MemoryBuffer & reply, CClientStats &stats)
  4806. {
  4807. Owned<IKeyManager> keyManager = prepKey(msg, true);
  4808. bool first;
  4809. unsigned maxRecs;
  4810. msg.read(first).read(maxRecs);
  4811. if (!first)
  4812. keyManager->deserializeCursorPos(msg);
  4813. reply.append((unsigned)RFEnoerror);
  4814. DelayedMarker<unsigned> numReturned(reply);
  4815. bool maxHit;
  4816. unsigned numRecs = readKeyData(keyManager, maxRecs, reply, maxHit);
  4817. numReturned.write(numRecs);
  4818. DelayedSizeMarker keyCursorSzMarker(reply);
  4819. if (maxHit) // if maximum hit, either supplied maxRecs limit, or buffer limit, return cursor
  4820. keyManager->serializeCursorPos(reply);
  4821. keyCursorSzMarker.write();
  4822. return true;
  4823. }
  4824. bool cmdReadFilteredIndexCount(MemoryBuffer & msg, MemoryBuffer & reply, CClientStats &stats)
  4825. {
  4826. Owned<IKeyManager> keyManager = prepKey(msg, true);
  4827. unsigned __int64 limit;
  4828. msg.read(limit);
  4829. unsigned __int64 count;
  4830. if (((unsigned __int64)-1) != limit)
  4831. count = keyManager->checkCount(limit);
  4832. else
  4833. count = keyManager->getCount();
  4834. reply.append((unsigned)RFEnoerror);
  4835. reply.append(count);
  4836. return true;
  4837. }
  4838. bool cmdReadFilteredIndexBlob(MemoryBuffer & msg, MemoryBuffer & reply, CClientStats &stats)
  4839. {
  4840. Owned<IKeyManager> keyManager = prepKey(msg, false);
  4841. unsigned __int64 blobId;
  4842. msg.read(blobId);
  4843. size32_t blobSize;
  4844. const byte *blobData = keyManager->loadBlob(blobId, blobSize);
  4845. reply.append((unsigned)RFEnoerror);
  4846. reply.append(blobSize);
  4847. reply.append(blobSize, blobData);
  4848. keyManager->releaseBlobs();
  4849. return true;
  4850. }
  4851. bool cmdSize(MemoryBuffer & msg, MemoryBuffer & reply)
  4852. {
  4853. int handle;
  4854. msg.read(handle);
  4855. IFileIO *fileio;
  4856. if (!checkFileIOHandle(reply, handle, fileio))
  4857. return false;
  4858. __int64 size = fileio->size();
  4859. reply.append((unsigned)RFEnoerror).append(size);
  4860. if (TF_TRACE)
  4861. PROGLOG("size file, handle = %d, size = %" I64F "d",handle,size);
  4862. return true;
  4863. }
  4864. bool cmdSetSize(MemoryBuffer & msg, MemoryBuffer & reply)
  4865. {
  4866. int handle;
  4867. offset_t size;
  4868. msg.read(handle).read(size);
  4869. IFileIO *fileio;
  4870. if (TF_TRACE)
  4871. PROGLOG("set size file, handle = %d, size = %" I64F "d",handle,size);
  4872. if (!checkFileIOHandle(reply, handle, fileio))
  4873. return false;
  4874. fileio->setSize(size);
  4875. reply.append((unsigned)RFEnoerror);
  4876. return true;
  4877. }
  4878. bool cmdWrite(MemoryBuffer & msg, MemoryBuffer & reply, CClientStats &stats)
  4879. {
  4880. int handle;
  4881. __int64 pos;
  4882. size32_t len;
  4883. msg.read(handle).read(pos).read(len);
  4884. IFileIO *fileio;
  4885. if (!checkFileIOHandle(reply, handle, fileio))
  4886. return false;
  4887. const byte *data = (const byte *)msg.readDirect(len);
  4888. if (TF_TRACE_PRE_IO)
  4889. PROGLOG("before write file, handle = %d, towrite = %d",handle,len);
  4890. size32_t numWritten = fileio->write(pos,len,data);
  4891. stats.addWrite(numWritten);
  4892. if (TF_TRACE)
  4893. PROGLOG("write file, handle = %d, towrite = %d, written = %d",handle,len,numWritten);
  4894. reply.append((unsigned)RFEnoerror).append(numWritten);
  4895. return true;
  4896. }
  4897. bool cmdExists(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
  4898. {
  4899. IMPERSONATE_USER(client);
  4900. StringAttr name;
  4901. msg.read(name);
  4902. if (TF_TRACE)
  4903. PROGLOG("exists, '%s'",name.get());
  4904. Owned<IFile> file=createIFile(name);
  4905. bool e = file->exists();
  4906. reply.append((unsigned)RFEnoerror).append(e);
  4907. return true;
  4908. }
  4909. bool cmdRemove(MemoryBuffer & msg, MemoryBuffer & reply,CRemoteClientHandler &client)
  4910. {
  4911. IMPERSONATE_USER(client);
  4912. StringAttr name;
  4913. msg.read(name);
  4914. if (TF_TRACE)
  4915. PROGLOG("remove, '%s'",name.get());
  4916. Owned<IFile> file=createIFile(name);
  4917. bool e = file->remove();
  4918. reply.append((unsigned)RFEnoerror).append(e);
  4919. return true;
  4920. }
  4921. bool cmdGetVer(MemoryBuffer & msg, MemoryBuffer & reply)
  4922. {
  4923. if (TF_TRACE)
  4924. PROGLOG("getVer");
  4925. if (msg.getPos()+sizeof(unsigned)>msg.length())
  4926. reply.append((unsigned)RFEnoerror);
  4927. else
  4928. reply.append((unsigned)FILESRV_VERSION+0x10000);
  4929. reply.append(VERSTRING);
  4930. return true;
  4931. }
  4932. bool cmdRename(MemoryBuffer & msg, MemoryBuffer & reply,CRemoteClientHandler &client)
  4933. {
  4934. IMPERSONATE_USER(client);
  4935. StringAttr fromname;
  4936. msg.read(fromname);
  4937. StringAttr toname;
  4938. msg.read(toname);
  4939. if (TF_TRACE)
  4940. PROGLOG("rename, '%s' to '%s'",fromname.get(),toname.get());
  4941. Owned<IFile> file=createIFile(fromname);
  4942. file->rename(toname);
  4943. reply.append((unsigned)RFEnoerror);
  4944. return true;
  4945. }
  4946. bool cmdMove(MemoryBuffer & msg, MemoryBuffer & reply,CRemoteClientHandler &client)
  4947. {
  4948. IMPERSONATE_USER(client);
  4949. StringAttr fromname;
  4950. msg.read(fromname);
  4951. StringAttr toname;
  4952. msg.read(toname);
  4953. if (TF_TRACE)
  4954. PROGLOG("move, '%s' to '%s'",fromname.get(),toname.get());
  4955. Owned<IFile> file=createIFile(fromname);
  4956. file->move(toname);
  4957. reply.append((unsigned)RFEnoerror);
  4958. return true;
  4959. }
  4960. bool cmdCopy(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
  4961. {
  4962. IMPERSONATE_USER(client);
  4963. StringAttr fromname;
  4964. msg.read(fromname);
  4965. StringAttr toname;
  4966. msg.read(toname);
  4967. if (TF_TRACE)
  4968. PROGLOG("copy, '%s' to '%s'",fromname.get(),toname.get());
  4969. copyFile(toname, fromname);
  4970. reply.append((unsigned)RFEnoerror);
  4971. return true;
  4972. }
  4973. bool cmdAppend(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client, CClientStats &stats)
  4974. {
  4975. IMPERSONATE_USER(client);
  4976. int handle;
  4977. __int64 pos;
  4978. __int64 len;
  4979. StringAttr srcname;
  4980. msg.read(handle).read(srcname).read(pos).read(len);
  4981. IFileIO *fileio;
  4982. if (!checkFileIOHandle(reply, handle, fileio))
  4983. return false;
  4984. Owned<IFile> file = createIFile(srcname.get());
  4985. __int64 written = fileio->appendFile(file,pos,len);
  4986. stats.addWrite(written);
  4987. if (TF_TRACE)
  4988. PROGLOG("append file, handle = %d, file=%s, pos = %" I64F "d len = %" I64F "d written = %" I64F "d",handle,srcname.get(),pos,len,written);
  4989. reply.append((unsigned)RFEnoerror).append(written);
  4990. return true;
  4991. }
  4992. bool cmdIsFile(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  4993. {
  4994. IMPERSONATE_USER(client);
  4995. StringAttr name;
  4996. msg.read(name);
  4997. if (TF_TRACE)
  4998. PROGLOG("isFile, '%s'",name.get());
  4999. Owned<IFile> file=createIFile(name);
  5000. unsigned ret = (unsigned)file->isFile();
  5001. reply.append((unsigned)RFEnoerror).append(ret);
  5002. return true;
  5003. }
  5004. bool cmdIsDir(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  5005. {
  5006. IMPERSONATE_USER(client);
  5007. StringAttr name;
  5008. msg.read(name);
  5009. if (TF_TRACE)
  5010. PROGLOG("isDir, '%s'",name.get());
  5011. Owned<IFile> file=createIFile(name);
  5012. unsigned ret = (unsigned)file->isDirectory();
  5013. reply.append((unsigned)RFEnoerror).append(ret);
  5014. return true;
  5015. }
  5016. bool cmdIsReadOnly(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  5017. {
  5018. IMPERSONATE_USER(client);
  5019. StringAttr name;
  5020. msg.read(name);
  5021. if (TF_TRACE)
  5022. PROGLOG("isReadOnly, '%s'",name.get());
  5023. Owned<IFile> file=createIFile(name);
  5024. unsigned ret = (unsigned)file->isReadOnly();
  5025. reply.append((unsigned)RFEnoerror).append(ret);
  5026. return true;
  5027. }
  5028. bool cmdSetReadOnly(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  5029. {
  5030. IMPERSONATE_USER(client);
  5031. StringAttr name;
  5032. bool set;
  5033. msg.read(name).read(set);
  5034. if (TF_TRACE)
  5035. PROGLOG("setReadOnly, '%s' %d",name.get(),(int)set);
  5036. Owned<IFile> file=createIFile(name);
  5037. file->setReadOnly(set);
  5038. reply.append((unsigned)RFEnoerror);
  5039. return true;
  5040. }
  5041. bool cmdSetFilePerms(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  5042. {
  5043. IMPERSONATE_USER(client);
  5044. StringAttr name;
  5045. unsigned fPerms;
  5046. msg.read(name).read(fPerms);
  5047. if (TF_TRACE)
  5048. PROGLOG("setFilePerms, '%s' 0%o",name.get(),fPerms);
  5049. Owned<IFile> file=createIFile(name);
  5050. file->setFilePermissions(fPerms);
  5051. reply.append((unsigned)RFEnoerror);
  5052. return true;
  5053. }
  5054. bool cmdGetTime(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  5055. {
  5056. IMPERSONATE_USER(client);
  5057. StringAttr name;
  5058. msg.read(name);
  5059. if (TF_TRACE)
  5060. PROGLOG("getTime, '%s'",name.get());
  5061. Owned<IFile> file=createIFile(name);
  5062. CDateTime createTime;
  5063. CDateTime modifiedTime;
  5064. CDateTime accessedTime;
  5065. bool ret = file->getTime(&createTime,&modifiedTime,&accessedTime);
  5066. reply.append((unsigned)RFEnoerror).append(ret);
  5067. if (ret) {
  5068. createTime.serialize(reply);
  5069. modifiedTime.serialize(reply);
  5070. accessedTime.serialize(reply);
  5071. }
  5072. return true;
  5073. }
  5074. bool cmdSetTime(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  5075. {
  5076. IMPERSONATE_USER(client);
  5077. StringAttr name;
  5078. bool creategot;
  5079. CDateTime createTime;
  5080. bool modifiedgot;
  5081. CDateTime modifiedTime;
  5082. bool accessedgot;
  5083. CDateTime accessedTime;
  5084. msg.read(name);
  5085. msg.read(creategot);
  5086. if (creategot)
  5087. createTime.deserialize(msg);
  5088. msg.read(modifiedgot);
  5089. if (modifiedgot)
  5090. modifiedTime.deserialize(msg);
  5091. msg.read(accessedgot);
  5092. if (accessedgot)
  5093. accessedTime.deserialize(msg);
  5094. if (TF_TRACE)
  5095. PROGLOG("setTime, '%s'",name.get());
  5096. Owned<IFile> file=createIFile(name);
  5097. bool ret = file->setTime(creategot?&createTime:NULL,modifiedgot?&modifiedTime:NULL,accessedgot?&accessedTime:NULL);
  5098. reply.append((unsigned)RFEnoerror).append(ret);
  5099. return true;
  5100. }
  5101. bool cmdCreateDir(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  5102. {
  5103. IMPERSONATE_USER(client);
  5104. StringAttr name;
  5105. msg.read(name);
  5106. if (TF_TRACE)
  5107. PROGLOG("CreateDir, '%s'",name.get());
  5108. Owned<IFile> dir=createIFile(name);
  5109. bool ret = dir->createDirectory();
  5110. reply.append((unsigned)RFEnoerror).append(ret);
  5111. return true;
  5112. }
  5113. bool cmdGetDir(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  5114. {
  5115. IMPERSONATE_USER(client);
  5116. StringAttr name;
  5117. StringAttr mask;
  5118. bool includedir;
  5119. bool sub;
  5120. byte stream = 0;
  5121. msg.read(name).read(mask).read(includedir).read(sub);
  5122. if (msg.remaining()>=sizeof(byte))
  5123. {
  5124. msg.read(stream);
  5125. if (stream==1)
  5126. client.opendir.clear();
  5127. }
  5128. if (TF_TRACE)
  5129. PROGLOG("GetDir, '%s', '%s', stream='%u'",name.get(),mask.get(),stream);
  5130. if (!stream && !containsFileWildcard(mask))
  5131. {
  5132. // if no streaming, and mask contains no wildcard, it is much more efficient to get the info without a directory iterator!
  5133. StringBuffer fullFilename(name);
  5134. addPathSepChar(fullFilename).append(mask);
  5135. Owned<IFile> iFile = createIFile(fullFilename);
  5136. if (!iFile->exists())
  5137. {
  5138. reply.append((unsigned)RFSERR_GetDirFailed);
  5139. return false;
  5140. }
  5141. else
  5142. {
  5143. reply.append((unsigned)RFEnoerror);
  5144. // NB: This must preserve same serialization format as CRemoteDirectoryIterator::serialize produces for 1 file.
  5145. byte b=1;
  5146. reply.append(b);
  5147. bool isDir = foundYes == iFile->isDirectory();
  5148. reply.append(isDir);
  5149. reply.append(isDir ? 0 : iFile->size());
  5150. CDateTime dt;
  5151. iFile->getTime(nullptr, &dt, nullptr);
  5152. dt.serialize(reply);
  5153. reply.append(iFile->queryFilename());
  5154. b = 0;
  5155. reply.append(b);
  5156. return true;
  5157. }
  5158. }
  5159. else
  5160. {
  5161. Owned<IFile> dir=createIFile(name);
  5162. Owned<IDirectoryIterator> iter;
  5163. if (stream>1)
  5164. iter.set(client.opendir);
  5165. else
  5166. {
  5167. iter.setown(dir->directoryFiles(mask.length()?mask.get():NULL,sub,includedir));
  5168. if (stream != 0)
  5169. client.opendir.set(iter);
  5170. }
  5171. if (!iter)
  5172. {
  5173. reply.append((unsigned)RFSERR_GetDirFailed);
  5174. return false;
  5175. }
  5176. reply.append((unsigned)RFEnoerror);
  5177. if (CRemoteDirectoryIterator::serialize(reply,iter,stream?0x100000:0,stream<2))
  5178. {
  5179. if (stream != 0)
  5180. client.opendir.clear();
  5181. }
  5182. else
  5183. {
  5184. bool cont=true;
  5185. reply.append(cont);
  5186. }
  5187. }
  5188. return true;
  5189. }
  5190. bool cmdMonitorDir(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  5191. {
  5192. IMPERSONATE_USER(client);
  5193. StringAttr name;
  5194. StringAttr mask;
  5195. bool includedir;
  5196. bool sub;
  5197. unsigned checkinterval;
  5198. unsigned timeout;
  5199. __int64 cancelid; // not yet used
  5200. msg.read(name).read(mask).read(includedir).read(sub).read(checkinterval).read(timeout).read(cancelid);
  5201. byte isprev;
  5202. msg.read(isprev);
  5203. Owned<IDirectoryIterator> prev;
  5204. if (isprev==1) {
  5205. SocketEndpoint ep;
  5206. CRemoteDirectoryIterator *di = new CRemoteDirectoryIterator(ep,name);
  5207. di->appendBuf(msg);
  5208. prev.setown(di);
  5209. }
  5210. if (TF_TRACE)
  5211. PROGLOG("MonitorDir, '%s' '%s'",name.get(),mask.get());
  5212. Owned<IFile> dir=createIFile(name);
  5213. Owned<IDirectoryDifferenceIterator> iter=dir->monitorDirectory(prev,mask.length()?mask.get():NULL,sub,includedir,checkinterval,timeout);
  5214. reply.append((unsigned)RFEnoerror);
  5215. byte state = (iter.get()==NULL)?0:1;
  5216. reply.append(state);
  5217. if (state==1)
  5218. CRemoteDirectoryIterator::serializeDiff(reply,iter);
  5219. return true;
  5220. }
  5221. bool cmdCopySection(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  5222. {
  5223. IMPERSONATE_USER(client);
  5224. StringAttr uuid;
  5225. StringAttr fromFile;
  5226. StringAttr toFile;
  5227. offset_t toOfs;
  5228. offset_t fromOfs;
  5229. offset_t size;
  5230. offset_t sizeDone=0;
  5231. offset_t totalSize=(offset_t)-1;
  5232. unsigned timeout;
  5233. msg.read(uuid).read(fromFile).read(toFile).read(toOfs).read(fromOfs).read(size).read(timeout);
  5234. AsyncCommandStatus status = asyncCommandManager.copySection(uuid,fromFile,toFile,toOfs,fromOfs,size,sizeDone,totalSize,timeout);
  5235. reply.append((unsigned)RFEnoerror).append((unsigned)status).append(sizeDone).append(totalSize);
  5236. return true;
  5237. }
  5238. static void treeCopyFile(RemoteFilename &srcfn, RemoteFilename &dstfn, const char *net, const char *mask, IpAddress &ip, bool usetmp, CThrottler *throttler, CFflags copyFlags=CFnone)
  5239. {
  5240. unsigned start = msTick();
  5241. Owned<IFile> dstfile = createIFile(dstfn);
  5242. // the following is really to check the dest node is up and working (otherwise not much point in continuing!)
  5243. if (dstfile->exists())
  5244. PROGLOG("TREECOPY overwriting '%s'",dstfile->queryFilename());
  5245. Owned<IFile> srcfile = createIFile(srcfn);
  5246. unsigned lastmin = 0;
  5247. if (!srcfn.queryIP().ipequals(dstfn.queryIP())) {
  5248. CriticalBlock block(treeCopyCrit);
  5249. for (;;) {
  5250. CDateTime dt;
  5251. offset_t sz;
  5252. try {
  5253. sz = srcfile->size();
  5254. if (sz==(offset_t)-1) {
  5255. if (TF_TRACE_TREE_COPY)
  5256. PROGLOG("TREECOPY source not found '%s'",srcfile->queryFilename());
  5257. break;
  5258. }
  5259. srcfile->getTime(NULL,&dt,NULL);
  5260. }
  5261. catch (IException *e) {
  5262. EXCLOG(e,"treeCopyFile(1)");
  5263. e->Release();
  5264. break;
  5265. }
  5266. Linked<CTreeCopyItem> tc;
  5267. unsigned now = msTick();
  5268. ForEachItemInRev(i1,treeCopyArray) {
  5269. CTreeCopyItem &item = treeCopyArray.item(i1);
  5270. // prune old entries (not strictly needed buf I think better)
  5271. if (now-item.lastused>TREECOPYPRUNETIME)
  5272. treeCopyArray.remove(i1);
  5273. else if (!tc.get()&&item.equals(srcfn,net,mask,sz,dt)) {
  5274. tc.set(&item);
  5275. item.lastused = now;
  5276. }
  5277. }
  5278. if (!tc.get()) {
  5279. if (treeCopyArray.ordinality()>=TREECOPY_CACHE_SIZE)
  5280. treeCopyArray.remove(0);
  5281. tc.setown(new CTreeCopyItem(srcfn,net,mask,sz,dt));
  5282. treeCopyArray.append(*tc.getLink());
  5283. }
  5284. ForEachItemInRev(cand,tc->loc) { // rev to choose copied locations first (maybe optional?)
  5285. if (!tc->busy->testSet(cand)) {
  5286. // check file accessible and matches
  5287. if (!cand&&dstfn.equals(tc->loc.item(cand))) // hmm trying to overwrite existing, better humor
  5288. continue;
  5289. bool ok = true;
  5290. Owned<IFile> rmtfile = createIFile(tc->loc.item(cand));
  5291. if (cand) { // only need to check if remote
  5292. try {
  5293. if (rmtfile->size()!=sz)
  5294. ok = false;
  5295. else {
  5296. CDateTime fdt;
  5297. rmtfile->getTime(NULL,&fdt,NULL);
  5298. ok = fdt.equals(dt);
  5299. }
  5300. }
  5301. catch (IException *e) {
  5302. EXCLOG(e,"treeCopyFile(2)");
  5303. e->Release();
  5304. ok = false;
  5305. }
  5306. }
  5307. if (ok) { // if not ok leave 'busy'
  5308. // finally lets try and copy!
  5309. try {
  5310. if (TF_TRACE_TREE_COPY)
  5311. PROGLOG("TREECOPY(started) %s to %s",rmtfile->queryFilename(),dstfile->queryFilename());
  5312. {
  5313. CriticalUnblock unblock(treeCopyCrit); // note we have tc linked
  5314. rmtfile->copyTo(dstfile,DEFAULT_COPY_BLKSIZE,NULL,usetmp,copyFlags);
  5315. }
  5316. if (TF_TRACE_TREE_COPY)
  5317. PROGLOG("TREECOPY(done) %s to %s",rmtfile->queryFilename(),dstfile->queryFilename());
  5318. tc->busy->set(cand,false);
  5319. if (treeCopyWaiting)
  5320. treeCopySem.signal((treeCopyWaiting>1)?2:1);
  5321. // add to known locations
  5322. tc->busy->set(tc->loc.ordinality(),false); // prob already is clear
  5323. tc->loc.append(dstfn);
  5324. ip.ipset(tc->loc.item(cand).queryIP());
  5325. return;
  5326. }
  5327. catch (IException *e) {
  5328. if (cand==0) {
  5329. tc->busy->set(0,false); // don't leave busy
  5330. if (treeCopyWaiting)
  5331. treeCopySem.signal();
  5332. throw; // what more can we do!
  5333. }
  5334. EXCLOG(e,"treeCopyFile(3)");
  5335. e->Release();
  5336. }
  5337. }
  5338. }
  5339. }
  5340. // all locations busy
  5341. if (msTick()-start>TREECOPYTIMEOUT) {
  5342. WARNLOG("Treecopy %s wait timed out", srcfile->queryFilename());
  5343. break;
  5344. }
  5345. treeCopyWaiting++; // note this isn't precise - just indication
  5346. {
  5347. CriticalUnblock unblock(treeCopyCrit);
  5348. if (throttler)
  5349. {
  5350. CThrottleReleaseBlock block(*throttler, RFCtreecopy);
  5351. treeCopySem.wait(TREECOPYPOLLTIME);
  5352. }
  5353. else
  5354. treeCopySem.wait(TREECOPYPOLLTIME);
  5355. }
  5356. treeCopyWaiting--;
  5357. if ((msTick()-start)/10*1000!=lastmin) {
  5358. lastmin = (msTick()-start)/10*1000;
  5359. PROGLOG("treeCopyFile delayed: %s to %s",srcfile->queryFilename(),dstfile->queryFilename());
  5360. }
  5361. }
  5362. }
  5363. else if (TF_TRACE_TREE_COPY)
  5364. PROGLOG("TREECOPY source on same node as destination");
  5365. if (TF_TRACE_TREE_COPY)
  5366. PROGLOG("TREECOPY(started,fallback) %s to %s",srcfile->queryFilename(),dstfile->queryFilename());
  5367. try {
  5368. GetHostIp(ip);
  5369. srcfile->copyTo(dstfile,DEFAULT_COPY_BLKSIZE,NULL,usetmp,copyFlags);
  5370. }
  5371. catch (IException *e) {
  5372. EXCLOG(e,"TREECOPY(done,fallback)");
  5373. throw;
  5374. }
  5375. if (TF_TRACE_TREE_COPY)
  5376. PROGLOG("TREECOPY(done,fallback) %s to %s",srcfile->queryFilename(),dstfile->queryFilename());
  5377. }
  5378. bool cmdTreeCopy(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client, CThrottler *throttler, bool usetmp=false)
  5379. {
  5380. IMPERSONATE_USER(client);
  5381. RemoteFilename src;
  5382. src.deserialize(msg);
  5383. RemoteFilename dst;
  5384. dst.deserialize(msg);
  5385. StringAttr net;
  5386. StringAttr mask;
  5387. msg.read(net).read(mask);
  5388. IpAddress ip;
  5389. treeCopyFile(src,dst,net,mask,ip,usetmp,throttler);
  5390. unsigned status = 0;
  5391. reply.append((unsigned)RFEnoerror).append((unsigned)status);
  5392. ip.ipserialize(reply);
  5393. return true;
  5394. }
  5395. bool cmdTreeCopyTmp(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client, CThrottler *throttler)
  5396. {
  5397. return cmdTreeCopy(msg, reply, client, throttler, true);
  5398. }
  5399. bool cmdGetCRC(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  5400. {
  5401. IMPERSONATE_USER(client);
  5402. StringAttr name;
  5403. msg.read(name);
  5404. if (TF_TRACE)
  5405. PROGLOG("getCRC, '%s'",name.get());
  5406. Owned<IFile> file=createIFile(name);
  5407. unsigned ret = file->getCRC();
  5408. reply.append((unsigned)RFEnoerror).append(ret);
  5409. return true;
  5410. }
  5411. bool cmdStop(MemoryBuffer &msg, MemoryBuffer &reply)
  5412. {
  5413. PROGLOG("Abort request received");
  5414. stopping = true;
  5415. if (acceptsock)
  5416. acceptsock->cancel_accept();
  5417. if (securesock)
  5418. securesock->cancel_accept();
  5419. reply.append((unsigned)RFEnoerror);
  5420. return false;
  5421. }
  5422. bool cmdExec(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  5423. {
  5424. StringAttr cmdLine;
  5425. msg.read(cmdLine);
  5426. // NB: legacy remoteExec used to simply pass error code and buffer back to caller.
  5427. VStringBuffer errMsg("Remote command execution no longer supported. Trying to execute cmdline=%s", cmdLine.get());
  5428. WARNLOG("%s", errMsg.str());
  5429. size32_t outSz = errMsg.length()+1; // reply with null terminated string
  5430. // reply with error code -1
  5431. reply.append((unsigned)-1).append((unsigned)0).append(outSz).append(outSz, errMsg.str());
  5432. return true;
  5433. }
  5434. bool cmdSetTrace(MemoryBuffer &msg, MemoryBuffer &reply)
  5435. {
  5436. byte flags;
  5437. msg.read(flags);
  5438. int retcode=-1;
  5439. if (flags!=255) { // escape
  5440. retcode = traceFlags;
  5441. traceFlags = flags;
  5442. }
  5443. reply.append(retcode);
  5444. return true;
  5445. }
  5446. bool cmdGetInfo(MemoryBuffer &msg, MemoryBuffer &reply)
  5447. {
  5448. unsigned level=1;
  5449. if (msg.remaining() >= sizeof(unsigned))
  5450. msg.read(level);
  5451. StringBuffer retstr;
  5452. getInfo(retstr, level);
  5453. reply.append(0).append(retstr.str());
  5454. return true;
  5455. }
  5456. bool cmdFirewall(MemoryBuffer &msg, MemoryBuffer &reply)
  5457. {
  5458. // TBD
  5459. StringBuffer retstr;
  5460. getInfo(retstr);
  5461. reply.append(0).append(retstr.str());
  5462. return true;
  5463. }
  5464. bool cmdExtractBlobElements(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  5465. {
  5466. IMPERSONATE_USER(client);
  5467. StringAttr prefix;
  5468. StringAttr filename;
  5469. msg.read(prefix).read(filename);
  5470. RemoteFilename rfn;
  5471. rfn.setLocalPath(filename);
  5472. ExtractedBlobArray extracted;
  5473. extractBlobElements(prefix, rfn, extracted);
  5474. unsigned n = extracted.ordinality();
  5475. reply.append((unsigned)RFEnoerror).append(n);
  5476. for (unsigned i=0;i<n;i++)
  5477. extracted.item(i).serialize(reply);
  5478. return true;
  5479. }
  5480. bool cmdRedeploy(MemoryBuffer &msg, MemoryBuffer &reply)
  5481. {
  5482. return false; // TBD
  5483. }
  5484. bool cmdUnknown(MemoryBuffer & msg, MemoryBuffer & reply,RemoteFileCommandType cmd)
  5485. {
  5486. appendErr2(reply, RFSERR_InvalidCommand, cmd);
  5487. return false;
  5488. }
  5489. bool cmdUnlock(MemoryBuffer & msg, MemoryBuffer & reply,CRemoteClientHandler &client)
  5490. {
  5491. // this is an attempt to authenticate when we haven't got authentication turned on
  5492. if (TF_TRACE_CLIENT_STATS)
  5493. {
  5494. StringBuffer s(client.queryPeerName());
  5495. PROGLOG("Connect from %s",s.str());
  5496. }
  5497. appendErr2(reply, RFSERR_InvalidCommand, RFCunlock);
  5498. return false;
  5499. }
  5500. bool cmdStreamReadTestSocket(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
  5501. {
  5502. unsigned replyPos = reply.length();
  5503. try
  5504. {
  5505. reply.append('J');
  5506. return cmdStreamRead(msg, reply, client);
  5507. }
  5508. catch (IException *)
  5509. {
  5510. reply.rewrite(replyPos);
  5511. reply.append('-');
  5512. throw;
  5513. }
  5514. }
  // Handles a JSON-described streaming read request.
  //
  // The whole of 'msg' is parsed as JSON. A request with no (zero) "cursor" starts a new
  // read: an output activity is created from the request and registered in
  // client.openFiles under a fresh handle. A non-zero "cursor" is looked up as an existing
  // handle; if it is unknown the handle reported back is 0 and no rows are returned.
  //
  // Up to defaultDaFSNumRecs rows are read per call. Unless the end of input was reached,
  // the activity's serialized cursor is included so that the caller can continue.
  //
  // Output format is chosen by "format": absent or "xml" gives XML, "json" gives JSON,
  // and any other value gives the binary layout:
  //   cursorHandle, [size-prefixed row data, size-prefixed cursor]  (bracketed part only if handle != 0)
  bool cmdStreamRead(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
  {
      // Optionally trace which peer issued the request
      if (TF_TRACE_CLIENT_STATS)
      {
          StringBuffer s(client.queryPeerName());
          PROGLOG("Connect from %s",s.str());
      }
      Owned<IPropertyTree> requestTree = createPTreeFromJSONString(msg.length(), msg.toByteArray());
      /* Example JSON request:
       * {
       *  "format" : "xml",
       *  "cursor" : "1234",
       *  "node" : {
       *   "kind" : "diskread",
       *   "fileName": "examplefilename",
       *   "keyfilter" : "f1='1 '",
       *   "choosen" : "5",
       *   "cursor" : "12345", // cursor handle
       *   "input" : {
       *    "f1" : "string5",
       *    "f2" : "string5"
       *   },
       *   "output" : {
       *    "f2" : "string",
       *    "f1" : "real"
       *   }
       *  }
       * }
       *
       */
      const char *outputFmtStr = requestTree->queryProp("format");
      int cursorHandle = requestTree->getPropInt("cursor");
      Owned<IPropertyTree> responseTree; // Used if xml/json
      OutputFormat outputFormat;
      if (!outputFmtStr || strieq("xml", outputFmtStr))
          outputFormat = outFmt_Xml;
      else if (strieq("json", outputFmtStr))
          outputFormat = outFmt_Json;
      else
          outputFormat = outFmt_Binary;
      if (outFmt_Binary != outputFormat)
          responseTree.setown(createPTree("Response"));
      // Optional serialized cursor supplied by the caller
      MemoryBuffer cursorMb;
      if (requestTree->getPropBin("cursorBin", cursorMb))
          cursorMb.setEndian(__BIG_ENDIAN);
      Owned<IRemoteActivity> outputActivity;
      OpenFileInfo fileInfo;
      if (!cursorHandle)
      {
          // In future this may be passed the request and build a chain of activities and return sink.
          outputActivity.setown(createOutputActivity(*requestTree));
          StringBuffer requestStr("jsonrequest:");
          outputActivity->getInfoStr(requestStr);
          Owned<StringAttrItem> name = new StringAttrItem(requestStr);
          // Handle allocation and registration in client.openFiles happen under 'sect'
          CriticalBlock block(sect);
          cursorHandle = getNextHandle();
          client.previdx = client.openFiles.ordinality();
          client.openFiles.append(OpenFileInfo(cursorHandle, outputActivity, name));
      }
      else if (!lookupFileIOHandle(cursorHandle, fileInfo))
          cursorHandle = 0; // challenge response ..
      else // known handle, continuation
          outputActivity.set(fileInfo.activity);
      if (outputActivity && cursorMb.length()) // use handle if one provided
          outputActivity->restoreCursor(cursorMb);
      // Report the handle back (0 if the supplied handle was not recognised)
      if (outFmt_Binary != outputFormat)
          responseTree->setPropInt("cursor", cursorHandle);
      else
          reply.append(cursorHandle);
      if (cursorHandle)
      {
          IOutputMetaData *out = outputActivity->queryOutputMeta();
          unsigned __int64 initProcessed = outputActivity->queryProcessed(); // NOTE(review): not used below
          bool eoi=false; // set when nextRow() returns no row
          if (outFmt_Binary == outputFormat)
          {
              DelayedSizeMarker dataLenMarker(reply); // data length
              for (unsigned __int64 i=0; i<defaultDaFSNumRecs; i++)
              {
                  size32_t rowSz;
                  const void *row = outputActivity->nextRow(rowSz);
                  if (!row)
                  {
                      eoi = true;
                      break;
                  }
                  reply.append(rowSz, row);
              }
              dataLenMarker.write();
          }
          else
          {
              // xml/json: each row becomes a "Row" child of the response tree
              CPropertyTreeWriter iptWriter;
              for (unsigned __int64 i=0; i<defaultDaFSNumRecs; i++)
              {
                  size32_t rowSz;
                  const void *row = outputActivity->nextRow(rowSz);
                  if (!row)
                  {
                      eoi = true;
                      break;
                  }
                  IPropertyTree *rowNode = responseTree->addPropTree("Row");
                  iptWriter.setRoot(*rowNode);
                  out->toXML((const byte *)row, iptWriter);
              }
          }
          // Emit the continuation cursor unless end of input was reached
          if (outFmt_Binary != outputFormat)
          {
              if (!eoi)
              {
                  MemoryBuffer cursorMb; // NB: shadows the request cursor buffer declared above
                  cursorMb.setEndian(__BIG_ENDIAN);
                  outputActivity->serializeCursor(cursorMb);
                  responseTree->setPropBin("cursorBin", cursorMb.length(), cursorMb.toByteArray());
              }
          }
          else
          {
              DelayedSizeMarker cursorLenMarker(reply); // cursor length
              if (!eoi)
                  outputActivity->serializeCursor(reply);
              cursorLenMarker.write();
          }
      }
      // For xml/json, render the response tree as text into the reply
      switch (outputFormat)
      {
          case outFmt_Xml:
          {
              StringBuffer responseXmlStr;
              toXML(responseTree, responseXmlStr);
              reply.append(responseXmlStr.length(), responseXmlStr.str());
              break;
          }
          case outFmt_Json:
          {
              StringBuffer responseJsonStr;
              toJSON(responseTree, responseJsonStr);
              reply.append(responseJsonStr.length(), responseJsonStr.str());
              break;
          }
          default:
              break;
      }
      return true;
  }
  5662. // legacy version
  5663. bool cmdSetThrottle(MemoryBuffer & msg, MemoryBuffer & reply)
  5664. {
  5665. unsigned limit, delayMs, cpuThreshold;
  5666. msg.read(limit);
  5667. msg.read(delayMs);
  5668. msg.read(cpuThreshold);
  5669. stdCmdThrottler.configure(limit, delayMs, cpuThreshold, (unsigned)-1);
  5670. reply.append((unsigned)RFEnoerror);
  5671. return true;
  5672. }
  5673. bool cmdSetThrottle2(MemoryBuffer & msg, MemoryBuffer & reply)
  5674. {
  5675. unsigned throttleClass, limit, delayMs, cpuThreshold, queueLimit;
  5676. msg.read(throttleClass);
  5677. msg.read(limit);
  5678. msg.read(delayMs);
  5679. msg.read(cpuThreshold);
  5680. msg.read(queueLimit);
  5681. setThrottle((ThrottleClass)throttleClass, limit, delayMs, cpuThreshold, queueLimit);
  5682. reply.append((unsigned)RFEnoerror);
  5683. return true;
  5684. }
  5685. void appendError(RemoteFileCommandType cmd, CRemoteClientHandler *client, unsigned ret, MemoryBuffer &reply)
  5686. {
  5687. if (reply.length()>=sizeof(unsigned)*2)
  5688. {
  5689. reply.reset();
  5690. unsigned z;
  5691. unsigned e;
  5692. reply.read(z).read(e);
  5693. StringBuffer err("ERR(");
  5694. err.append(e).append(") ");
  5695. if (client)
  5696. {
  5697. const char *peer = client->queryPeerName();
  5698. if (peer)
  5699. err.append(peer);
  5700. }
  5701. if (e&&(reply.getPos()<reply.length()))
  5702. {
  5703. StringAttr es;
  5704. reply.read(es);
  5705. err.append(" : ").append(es);
  5706. }
  5707. reply.reset();
  5708. if (cmd!=RFCunlock)
  5709. PROGLOG("%s",err.str()); // supress authentication logging
  5710. if (client)
  5711. client->logPrevHandle();
  5712. }
  5713. }
  5714. void throttleCommand(RemoteFileCommandType cmd, MemoryBuffer &msg, CRemoteClientHandler *client)
  5715. {
  5716. switch(cmd)
  5717. {
  5718. case RFCexec:
  5719. case RFCgetcrc:
  5720. case RFCcopy:
  5721. case RFCappend:
  5722. case RFCtreecopy:
  5723. case RFCtreecopytmp:
  5724. slowCmdThrottler.addCommand(cmd, msg, client);
  5725. return;
  5726. case RFCcloseIO:
  5727. case RFCopenIO:
  5728. case RFCread:
  5729. case RFCsize:
  5730. case RFCwrite:
  5731. case RFCexists:
  5732. case RFCremove:
  5733. case RFCrename:
  5734. case RFCgetver:
  5735. case RFCisfile:
  5736. case RFCisdirectory:
  5737. case RFCisreadonly:
  5738. case RFCsetreadonly:
  5739. case RFCsetfileperms:
  5740. case RFCreadfilteredindex:
  5741. case RFCreadfilteredindexcount:
  5742. case RFCreadfilteredindexblob:
  5743. case RFCgettime:
  5744. case RFCsettime:
  5745. case RFCcreatedir:
  5746. case RFCgetdir:
  5747. case RFCmonitordir:
  5748. case RFCstop:
  5749. case RFCextractblobelements:
  5750. case RFCredeploy:
  5751. case RFCmove:
  5752. case RFCsetsize:
  5753. case RFCsettrace:
  5754. case RFCgetinfo:
  5755. case RFCfirewall:
  5756. case RFCunlock:
  5757. case RFCStreamRead:
  5758. stdCmdThrottler.addCommand(cmd, msg, client);
  5759. return;
  5760. // NB: The following commands are still bound by the the thread pool
  5761. case RFCsetthrottle: // legacy version
  5762. case RFCsetthrottle2:
  5763. case RFCcopysection: // slightly odd, but has it's own limit
  5764. default:
  5765. {
  5766. client->processCommand(cmd, msg, NULL);
  5767. break;
  5768. }
  5769. }
  5770. }
  5771. bool processCommand(RemoteFileCommandType cmd, MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler *client, CThrottler *throttler)
  5772. {
  5773. Owned<CClientStats> stats = clientStatsTable.getClientReference(cmd, client->queryPeerName());
  5774. bool ret = true;
  5775. bool testSocketFlag = false;
  5776. try
  5777. {
  5778. switch(cmd)
  5779. {
  5780. MAPCOMMANDSTATS(RFCread, cmdRead, *stats);
  5781. MAPCOMMANDSTATS(RFCwrite, cmdWrite, *stats);
  5782. MAPCOMMANDSTATS(RFCreadfilteredindex, cmdReadFilteredIndex, *stats);
  5783. MAPCOMMANDSTATS(RFCreadfilteredindexcount, cmdReadFilteredIndexCount, *stats);
  5784. MAPCOMMANDSTATS(RFCreadfilteredindexblob, cmdReadFilteredIndexBlob, *stats);
  5785. MAPCOMMANDCLIENTSTATS(RFCappend, cmdAppend, *client, *stats);
  5786. MAPCOMMAND(RFCcloseIO, cmdCloseFileIO);
  5787. MAPCOMMANDCLIENT(RFCopenIO, cmdOpenFileIO, *client);
  5788. MAPCOMMAND(RFCsize, cmdSize);
  5789. MAPCOMMANDCLIENT(RFCexists, cmdExists, *client);
  5790. MAPCOMMANDCLIENT(RFCremove, cmdRemove, *client);
  5791. MAPCOMMANDCLIENT(RFCrename, cmdRename, *client);
  5792. MAPCOMMAND(RFCgetver, cmdGetVer);
  5793. MAPCOMMANDCLIENT(RFCisfile, cmdIsFile, *client);
  5794. MAPCOMMANDCLIENT(RFCisdirectory, cmdIsDir, *client);
  5795. MAPCOMMANDCLIENT(RFCisreadonly, cmdIsReadOnly, *client);
  5796. MAPCOMMANDCLIENT(RFCsetreadonly, cmdSetReadOnly, *client);
  5797. MAPCOMMANDCLIENT(RFCsetfileperms, cmdSetFilePerms, *client);
  5798. MAPCOMMANDCLIENT(RFCgettime, cmdGetTime, *client);
  5799. MAPCOMMANDCLIENT(RFCsettime, cmdSetTime, *client);
  5800. MAPCOMMANDCLIENT(RFCcreatedir, cmdCreateDir, *client);
  5801. MAPCOMMANDCLIENT(RFCgetdir, cmdGetDir, *client);
  5802. MAPCOMMANDCLIENT(RFCmonitordir, cmdMonitorDir, *client);
  5803. MAPCOMMAND(RFCstop, cmdStop);
  5804. MAPCOMMANDCLIENT(RFCexec, cmdExec, *client);
  5805. MAPCOMMANDCLIENT(RFCextractblobelements, cmdExtractBlobElements, *client);
  5806. MAPCOMMAND(RFCredeploy, cmdRedeploy); // only Windows
  5807. MAPCOMMANDCLIENT(RFCgetcrc, cmdGetCRC, *client);
  5808. MAPCOMMANDCLIENT(RFCmove, cmdMove, *client);
  5809. MAPCOMMANDCLIENT(RFCcopy, cmdCopy, *client);
  5810. MAPCOMMAND(RFCsetsize, cmdSetSize);
  5811. MAPCOMMAND(RFCsettrace, cmdSetTrace);
  5812. MAPCOMMAND(RFCgetinfo, cmdGetInfo);
  5813. MAPCOMMAND(RFCfirewall, cmdFirewall);
  5814. MAPCOMMANDCLIENT(RFCunlock, cmdUnlock, *client);
  5815. MAPCOMMANDCLIENTTESTSOCKET(RFCStreamRead, cmdStreamReadTestSocket, *client);
  5816. MAPCOMMANDCLIENT(RFCcopysection, cmdCopySection, *client);
  5817. MAPCOMMANDCLIENTTHROTTLE(RFCtreecopy, cmdTreeCopy, *client, &slowCmdThrottler);
  5818. MAPCOMMANDCLIENTTHROTTLE(RFCtreecopytmp, cmdTreeCopyTmp, *client, &slowCmdThrottler);
  5819. MAPCOMMAND(RFCsetthrottle, cmdSetThrottle); // legacy version
  5820. MAPCOMMAND(RFCsetthrottle2, cmdSetThrottle2);
  5821. default:
  5822. ret = cmdUnknown(msg,reply,cmd);
  5823. break;
  5824. }
  5825. }
  5826. catch (IException *e)
  5827. {
  5828. ret = false;
  5829. StringBuffer s;
  5830. e->errorMessage(s);
  5831. appendCmdErr(reply, cmd, e->errorCode(), s.str());
  5832. e->Release();
  5833. }
  5834. if (!ret) // append error string
  5835. {
  5836. appendError(cmd, client, cmd, reply);
  5837. }
  5838. return testSocketFlag;
  5839. }
  5840. IPooledThread *createCommandProcessor()
  5841. {
  5842. return new cCommandProcessor();
  5843. }
  5844. void run(DAFSConnectCfg _connectMethod, SocketEndpoint &listenep, unsigned sslPort)
  5845. {
  5846. SocketEndpoint sslep(listenep);
  5847. if (sslPort)
  5848. sslep.port = sslPort;
  5849. else
  5850. sslep.port = securitySettings.daFileSrvSSLPort;
  5851. Owned<ISocket> acceptSocket, acceptSSLSocket;
  5852. if (_connectMethod != SSLOnly)
  5853. {
  5854. if (listenep.port == 0)
  5855. throw createDafsException(DAFSERR_serverinit_failed, "dafilesrv port not specified");
  5856. if (listenep.isNull())
  5857. acceptSocket.setown(ISocket::create(listenep.port));
  5858. else
  5859. {
  5860. StringBuffer ips;
  5861. listenep.getIpText(ips);
  5862. acceptSocket.setown(ISocket::create_ip(listenep.port,ips.str()));
  5863. }
  5864. }
  5865. if (_connectMethod == SSLOnly || _connectMethod == SSLFirst || _connectMethod == UnsecureFirst)
  5866. {
  5867. if (sslep.port == 0)
  5868. throw createDafsException(DAFSERR_serverinit_failed, "Secure dafilesrv port not specified");
  5869. if (_connectMethod == UnsecureFirst)
  5870. {
  5871. // don't fail, but warn - this allows for fast SSL client rejections
  5872. if (!securitySettings.certificate)
  5873. WARNLOG("SSL Certificate information not found in environment.conf, cannot accept SSL connections");
  5874. else if ( !checkFileExists(securitySettings.certificate) )
  5875. {
  5876. WARNLOG("SSL Certificate File not found in environment.conf, cannot accept SSL connections");
  5877. securitySettings.certificate = nullptr;
  5878. }
  5879. if (!securitySettings.privateKey)
  5880. WARNLOG("SSL Key information not found in environment.conf, cannot accept SSL connections");
  5881. else if ( !checkFileExists(securitySettings.privateKey) )
  5882. {
  5883. WARNLOG("SSL Key File not found in environment.conf, cannot accept SSL connections");
  5884. securitySettings.privateKey = nullptr;
  5885. }
  5886. }
  5887. else
  5888. {
  5889. if (!securitySettings.certificate)
  5890. throw createDafsException(DAFSERR_serverinit_failed, "SSL Certificate information not found in environment.conf");
  5891. if (!checkFileExists(securitySettings.certificate))
  5892. throw createDafsException(DAFSERR_serverinit_failed, "SSL Certificate File not found in environment.conf");
  5893. if (!securitySettings.privateKey)
  5894. throw createDafsException(DAFSERR_serverinit_failed, "SSL Key information not found in environment.conf");
  5895. if (!checkFileExists(securitySettings.privateKey))
  5896. throw createDafsException(DAFSERR_serverinit_failed, "SSL Key File not found in environment.conf");
  5897. }
  5898. if (sslep.isNull())
  5899. acceptSSLSocket.setown(ISocket::create(sslep.port));
  5900. else
  5901. {
  5902. StringBuffer ips;
  5903. sslep.getIpText(ips);
  5904. acceptSSLSocket.setown(ISocket::create_ip(sslep.port,ips.str()));
  5905. }
  5906. }
  5907. run(_connectMethod, acceptSocket.getClear(), acceptSSLSocket.getClear());
  5908. }
  5909. void run(DAFSConnectCfg _connectMethod, ISocket *regSocket, ISocket *secureSocket)
  5910. {
  5911. if (_connectMethod != SSLOnly)
  5912. {
  5913. if (regSocket)
  5914. acceptsock.setown(regSocket);
  5915. else
  5916. throw createDafsException(DAFSERR_serverinit_failed, "Invalid non-secure socket");
  5917. }
  5918. if (_connectMethod == SSLOnly || _connectMethod == SSLFirst || _connectMethod == UnsecureFirst)
  5919. {
  5920. if (secureSocket)
  5921. securesock.setown(secureSocket);
  5922. else
  5923. throw createDafsException(DAFSERR_serverinit_failed, "Invalid secure socket");
  5924. }
  5925. selecthandler->start();
  5926. for (;;)
  5927. {
  5928. Owned<ISocket> sock;
  5929. Owned<ISocket> sockSSL;
  5930. bool sockavail = false;
  5931. bool securesockavail = false;
  5932. if (_connectMethod == SSLNone)
  5933. sockavail = acceptsock->wait_read(1000*60*1)!=0;
  5934. else if (_connectMethod == SSLOnly)
  5935. securesockavail = securesock->wait_read(1000*60*1)!=0;
  5936. else
  5937. {
  5938. UnsignedArray readSocks;
  5939. UnsignedArray waitingSocks;
  5940. readSocks.append(acceptsock->OShandle());
  5941. readSocks.append(securesock->OShandle());
  5942. int numReady = wait_read_multiple(readSocks, 1000*60*1, waitingSocks);
  5943. if (numReady > 0)
  5944. {
  5945. for (int idx = 0; idx < numReady; idx++)
  5946. {
  5947. if (waitingSocks.item(idx) == acceptsock->OShandle())
  5948. sockavail = true;
  5949. else if (waitingSocks.item(idx) == securesock->OShandle())
  5950. securesockavail = true;
  5951. }
  5952. }
  5953. }
  5954. #if 0
  5955. if (!sockavail && !securesockavail)
  5956. {
  5957. JSocketStatistics stats;
  5958. getSocketStatistics(stats);
  5959. StringBuffer s;
  5960. getSocketStatisticsString(stats,s);
  5961. PROGLOG( "Socket statistics : \n%s\n",s.str());
  5962. }
  5963. #endif
  5964. if (stopping)
  5965. break;
  5966. if (sockavail || securesockavail)
  5967. {
  5968. if (sockavail)
  5969. {
  5970. try
  5971. {
  5972. sock.setown(acceptsock->accept(true));
  5973. if (!sock||stopping)
  5974. break;
  5975. #ifdef _DEBUG
  5976. SocketEndpoint eps;
  5977. sock->getPeerEndpoint(eps);
  5978. StringBuffer sb;
  5979. eps.getUrlStr(sb);
  5980. PROGLOG("Server accepting from %s", sb.str());
  5981. #endif
  5982. }
  5983. catch (IException *e)
  5984. {
  5985. EXCLOG(e,"CRemoteFileServer");
  5986. e->Release();
  5987. break;
  5988. }
  5989. }
  5990. if (securesockavail)
  5991. {
  5992. Owned<ISecureSocket> ssock;
  5993. try
  5994. {
  5995. sockSSL.setown(securesock->accept(true));
  5996. if (!sockSSL||stopping)
  5997. break;
  5998. if ( (_connectMethod == UnsecureFirst) && (!securitySettings.certificate || !securitySettings.privateKey) )
  5999. {
  6000. // for client secure_connect() to fail quickly ...
  6001. cleanupSocket(sockSSL);
  6002. sockSSL.clear();
  6003. securesockavail = false;
  6004. }
  6005. else
  6006. {
  6007. #ifdef _USE_OPENSSL
  6008. ssock.setown(createSecureSocket(sockSSL.getClear(), ServerSocket));
  6009. int status = ssock->secure_accept();
  6010. if (status < 0)
  6011. throw createDafsException(DAFSERR_serveraccept_failed,"Failure to establish secure connection");
  6012. sockSSL.setown(ssock.getLink());
  6013. #else
  6014. throw createDafsException(DAFSERR_serveraccept_failed,"Failure to establish secure connection: OpenSSL disabled in build");
  6015. #endif
  6016. #ifdef _DEBUG
  6017. SocketEndpoint eps;
  6018. sockSSL->getPeerEndpoint(eps);
  6019. StringBuffer sb;
  6020. eps.getUrlStr(sb);
  6021. PROGLOG("Server accepting SECURE from %s", sb.str());
  6022. #endif
  6023. }
  6024. }
  6025. catch (IJSOCK_Exception *e)
  6026. {
  6027. // accept failed ...
  6028. EXCLOG(e,"CRemoteFileServer (secure)");
  6029. e->Release();
  6030. break;
  6031. }
  6032. catch (IException *e) // IDAFS_Exception also ...
  6033. {
  6034. EXCLOG(e,"CRemoteFileServer1 (secure)");
  6035. e->Release();
  6036. cleanupSocket(sockSSL);
  6037. sockSSL.clear();
  6038. cleanupSocket(ssock);
  6039. ssock.clear();
  6040. securesockavail = false;
  6041. }
  6042. }
  6043. if (sockavail)
  6044. runClient(sock.getClear());
  6045. if (securesockavail)
  6046. runClient(sockSSL.getClear());
  6047. }
  6048. else
  6049. checkTimeout();
  6050. }
  6051. if (TF_TRACE_CLIENT_STATS)
  6052. PROGLOG("CRemoteFileServer:run exiting");
  6053. selecthandler->stop(true);
  6054. }
  6055. void processUnauthenticatedCommand(RemoteFileCommandType cmd, ISocket *socket, MemoryBuffer &msg)
  6056. {
  6057. // these are unauthenticated commands
  6058. if (cmd != RFCgetver)
  6059. cmd = RFCinvalid;
  6060. MemoryBuffer reply;
  6061. bool testSocketFlag = processCommand(cmd, msg, initSendBuffer(reply), NULL, NULL);
  6062. sendBuffer(socket, reply, testSocketFlag);
  6063. }
  6064. bool checkAuthentication(ISocket *socket, IAuthenticatedUser *&ret)
  6065. {
  6066. ret = NULL;
  6067. if (!AuthenticationEnabled)
  6068. return true;
  6069. MemoryBuffer reqbuf;
  6070. MemoryBuffer reply;
  6071. MemoryBuffer encbuf; // because aesEncrypt clears input
  6072. initSendBuffer(reply);
  6073. receiveBuffer(socket, reqbuf, 1);
  6074. RemoteFileCommandType typ=0;
  6075. if (reqbuf.remaining()<sizeof(RemoteFileCommandType))
  6076. return false;
  6077. reqbuf.read(typ);
  6078. if (typ!=RFCunlock) {
  6079. processUnauthenticatedCommand(typ,socket,reqbuf);
  6080. return false;
  6081. }
  6082. if (reqbuf.remaining()<sizeof(OnceKey))
  6083. return false;
  6084. OnceKey oncekey;
  6085. reqbuf.read(sizeof(oncekey),&oncekey);
  6086. IpAddress ip;
  6087. socket->getPeerAddress(ip);
  6088. byte ipdata[16];
  6089. ip.getNetAddress(sizeof(ipdata),&ipdata);
  6090. mergeOnce(oncekey,sizeof(ipdata),&ipdata); // this is clients key
  6091. OnceKey mykey;
  6092. genOnce(mykey);
  6093. reply.append((unsigned)0); // errcode
  6094. aesEncrypt(&oncekey,sizeof(oncekey),&mykey,sizeof(oncekey),encbuf);
  6095. reply.append(encbuf.length()).append(encbuf);
  6096. sendBuffer(socket, reply); // send my oncekey
  6097. reqbuf.clear();
  6098. receiveBuffer(socket, reqbuf, 1);
  6099. if (reqbuf.remaining()>sizeof(RemoteFileCommandType)+sizeof(size32_t)) {
  6100. reqbuf.read(typ);
  6101. if (typ==RFCunlockreply) {
  6102. size32_t bs;
  6103. reqbuf.read(bs);
  6104. if (bs<=reqbuf.remaining()) {
  6105. MemoryBuffer userbuf;
  6106. aesDecrypt(&mykey,sizeof(mykey),reqbuf.readDirect(bs),bs,userbuf);
  6107. byte n;
  6108. userbuf.read(n);
  6109. if (n>=2) {
  6110. StringAttr user;
  6111. StringAttr password;
  6112. userbuf.read(user).read(password);
  6113. Owned<IAuthenticatedUser> iau = createAuthenticatedUser();
  6114. if (iau->login(user,password)) {
  6115. initSendBuffer(reply.clear());
  6116. reply.append((unsigned)0);
  6117. sendBuffer(socket, reply); // send OK
  6118. ret = iau;
  6119. return true;
  6120. }
  6121. }
  6122. }
  6123. }
  6124. }
  6125. reply.clear();
  6126. appendErr(reply, RFSERR_AuthenticateFailed);
  6127. sendBuffer(socket, reply); // send OK
  6128. return false;
  6129. }
  6130. void runClient(ISocket *sock)
  6131. {
  6132. cCommandProcessor::cCommandProcessorParams params;
  6133. IAuthenticatedUser *user=NULL;
  6134. bool authenticated = false;
  6135. try {
  6136. if (checkAuthentication(sock,user))
  6137. authenticated = true;
  6138. }
  6139. catch (IException *e) {
  6140. e->Release();
  6141. }
  6142. if (!authenticated) {
  6143. try {
  6144. sock->Release();
  6145. }
  6146. catch (IException *e) {
  6147. e->Release();
  6148. }
  6149. return;
  6150. }
  6151. params.client = new CRemoteClientHandler(this,sock,user,globallasttick);
  6152. {
  6153. CriticalBlock block(sect);
  6154. clients.append(*LINK(params.client));
  6155. }
  6156. // NB: This could be blocked, by thread pool limit
  6157. threads->start(&params);
  6158. }
  6159. void stop()
  6160. {
  6161. // stop accept loop
  6162. if (TF_TRACE_CLIENT_STATS)
  6163. PROGLOG("CRemoteFileServer::stop");
  6164. if (acceptsock)
  6165. acceptsock->cancel_accept();
  6166. if (securesock)
  6167. securesock->cancel_accept();
  6168. threads->stopAll();
  6169. threads->joinAll(true,60*1000);
  6170. }
  6171. bool notify(CRemoteClientHandler *_client, MemoryBuffer &msg)
  6172. {
  6173. Linked<CRemoteClientHandler> client;
  6174. client.set(_client);
  6175. if (TF_TRACE_FULL)
  6176. PROGLOG("notify %d", msg.length());
  6177. if (msg.length())
  6178. {
  6179. if (TF_TRACE_FULL)
  6180. PROGLOG("notify CRemoteClientHandler(%p), msg length=%u", _client, msg.length());
  6181. cCommandProcessor::cCommandProcessorParams params;
  6182. params.client = client.getClear();
  6183. params.msg.swapWith(msg);
  6184. /* This can block because the thread pool is full and therefore block the selecthandler
  6185. * This is akin to the main server blocking post accept() for the same reason.
  6186. */
  6187. threads->start(&params);
  6188. }
  6189. else
  6190. onCloseSocket(client,3); // removes owned handles
  6191. return false;
  6192. }
  6193. void addClient(CRemoteClientHandler *client)
  6194. {
  6195. if (client&&client->socket)
  6196. selecthandler->add(client->socket,SELECTMODE_READ,client);
  6197. }
  6198. void checkTimeout()
  6199. {
  6200. if (msTick()-clientcounttick>1000*60*60)
  6201. {
  6202. CriticalBlock block(ClientCountSect);
  6203. if (TF_TRACE_CLIENT_STATS && (ClientCount || MaxClientCount))
  6204. PROGLOG("Client count = %d, max = %d", ClientCount, MaxClientCount);
  6205. clientcounttick = msTick();
  6206. MaxClientCount = ClientCount;
  6207. if (closedclients)
  6208. {
  6209. if (TF_TRACE_CLIENT_STATS)
  6210. PROGLOG("Closed client count = %d",closedclients);
  6211. closedclients = 0;
  6212. }
  6213. }
  6214. CriticalBlock block(sect);
  6215. ForEachItemInRev(i,clients)
  6216. {
  6217. CRemoteClientHandler &client = clients.item(i);
  6218. if (client.timedOut())
  6219. {
  6220. StringBuffer s;
  6221. bool ok = client.getInfo(s); // will spot duff sockets
  6222. if (ok&&(client.openFiles.ordinality()!=0))
  6223. {
  6224. if (TF_TRACE_CLIENT_CONN && client.inactiveTimedOut())
  6225. WARNLOG("Inactive %s",s.str());
  6226. }
  6227. else
  6228. {
  6229. #ifndef _DEBUG
  6230. if (TF_TRACE_CLIENT_CONN)
  6231. #endif
  6232. PROGLOG("Timing out %s",s.str());
  6233. closedclients++;
  6234. onCloseSocket(&client,4); // removes owned handles
  6235. }
  6236. }
  6237. }
  6238. }
  6239. void getInfo(StringBuffer &info, unsigned level=1)
  6240. {
  6241. {
  6242. CriticalBlock block(ClientCountSect);
  6243. info.append(VERSTRING).append('\n');
  6244. info.appendf("Client count = %d\n",ClientCount);
  6245. info.appendf("Max client count = %d",MaxClientCount);
  6246. }
  6247. CriticalBlock block(sect);
  6248. ForEachItemIn(i,clients)
  6249. {
  6250. info.newline().append(i).append(": ");
  6251. clients.item(i).getInfo(info);
  6252. }
  6253. info.newline().appendf("Running threads: %u", threadRunningCount());
  6254. info.newline();
  6255. stdCmdThrottler.getInfo(info);
  6256. info.newline();
  6257. slowCmdThrottler.getInfo(info);
  6258. clientStatsTable.getInfo(info, level);
  6259. }
  6260. unsigned threadRunningCount()
  6261. {
  6262. if (!threads)
  6263. return 0;
  6264. return threads->runningCount();
  6265. }
  6266. unsigned idleTime()
  6267. {
  6268. unsigned t = (unsigned)atomic_read(&globallasttick);
  6269. return msTick()-t;
  6270. }
  6271. void setThrottle(ThrottleClass throttleClass, unsigned limit, unsigned delayMs, unsigned cpuThreshold, unsigned queueLimit)
  6272. {
  6273. switch (throttleClass)
  6274. {
  6275. case ThrottleStd:
  6276. stdCmdThrottler.configure(limit, delayMs, cpuThreshold, queueLimit);
  6277. break;
  6278. case ThrottleSlow:
  6279. slowCmdThrottler.configure(limit, delayMs, cpuThreshold, queueLimit);
  6280. break;
  6281. default:
  6282. {
  6283. StringBuffer availableClasses("{ ");
  6284. for (unsigned c=0; c<ThrottleClassMax; c++)
  6285. {
  6286. availableClasses.append(c).append(" = ").append(getThrottleClassText((ThrottleClass)c));
  6287. if (c+1<ThrottleClassMax)
  6288. availableClasses.append(", ");
  6289. }
  6290. availableClasses.append(" }");
  6291. throw MakeStringException(0, "Unknown throttle class: %u, available classes are: %s", (unsigned)throttleClass, availableClasses.str());
  6292. }
  6293. }
  6294. }
  6295. StringBuffer &getStats(StringBuffer &stats, bool reset)
  6296. {
  6297. CriticalBlock block(sect);
  6298. stdCmdThrottler.getStats(stats, reset).newline();
  6299. slowCmdThrottler.getStats(stats, reset);
  6300. if (reset)
  6301. clientStatsTable.reset();
  6302. return stats;
  6303. }
  6304. };
  6305. IRemoteFileServer * createRemoteFileServer(unsigned maxThreads, unsigned maxThreadsDelayMs, unsigned maxAsyncCopy)
  6306. {
  6307. #if SIMULATE_PACKETLOSS
  6308. errorSimulationOn = false;
  6309. #endif
  6310. return new CRemoteFileServer(maxThreads, maxThreadsDelayMs, maxAsyncCopy);
  6311. }
  6312. #ifdef _USE_CPPUNIT
  6313. #include "unittests.hpp"
  6314. #include "rmtfile.hpp"
  // Port the unit-test file server listens on; testStartServer increments it until a free port is found.
  static unsigned serverPort = DAFILESRV_PORT+1; // do not use standard port, which if in a URL will be converted to local path if IP is local
  // Prefix for every test file name: "//" + URL of the server endpoint + current directory + path separator (built in testStartServer).
  static StringBuffer basePath;
  // Thread object running the test server; created in testStartServer and cleared in testFinish.
  static Owned<CSimpleInterface> serverThread;
  6318. class RemoteFileTest : public CppUnit::TestFixture
  6319. {
  6320. CPPUNIT_TEST_SUITE(RemoteFileTest);
  6321. CPPUNIT_TEST(testStartServer);
  6322. CPPUNIT_TEST(testBasicFunctionality);
  6323. CPPUNIT_TEST(testCopy);
  6324. CPPUNIT_TEST(testOther);
  6325. CPPUNIT_TEST(testConfiguration);
  6326. CPPUNIT_TEST(testDirectoryMonitoring);
  6327. CPPUNIT_TEST(testFinish);
  6328. CPPUNIT_TEST_SUITE_END();
  6329. size32_t testLen = 1024;
  6330. protected:
  6331. void testStartServer()
  6332. {
  6333. Owned<ISocket> socket;
  6334. unsigned endPort = MP_END_PORT;
  6335. while (1)
  6336. {
  6337. try
  6338. {
  6339. socket.setown(ISocket::create(serverPort));
  6340. break;
  6341. }
  6342. catch (IJSOCK_Exception *e)
  6343. {
  6344. if (e->errorCode() != JSOCKERR_port_in_use)
  6345. {
  6346. StringBuffer eStr;
  6347. e->errorMessage(eStr);
  6348. e->Release();
  6349. CPPUNIT_ASSERT_MESSAGE(eStr.str(), 0);
  6350. }
  6351. else if (serverPort == endPort)
  6352. {
  6353. e->Release();
  6354. CPPUNIT_ASSERT_MESSAGE("Could not find a free port to use for remote file server", 0);
  6355. }
  6356. }
  6357. ++serverPort;
  6358. }
  6359. basePath.append("//");
  6360. SocketEndpoint ep(serverPort);
  6361. ep.getUrlStr(basePath);
  6362. char cpath[_MAX_DIR];
  6363. if (!GetCurrentDirectory(_MAX_DIR, cpath))
  6364. CPPUNIT_ASSERT_MESSAGE("Current directory path too big", 0);
  6365. else
  6366. basePath.append(cpath);
  6367. addPathSepChar(basePath);
  6368. PROGLOG("basePath = %s", basePath.str());
  6369. class CServerThread : public CSimpleInterface, implements IThreaded
  6370. {
  6371. CThreaded threaded;
  6372. Owned<CRemoteFileServer> server;
  6373. Linked<ISocket> socket;
  6374. public:
  6375. CServerThread(CRemoteFileServer *_server, ISocket *_socket) : server(_server), socket(_socket), threaded("CServerThread")
  6376. {
  6377. threaded.init(this);
  6378. }
  6379. ~CServerThread()
  6380. {
  6381. threaded.join();
  6382. }
  6383. // IThreaded
  6384. virtual void threadmain() override
  6385. {
  6386. DAFSConnectCfg sslCfg = SSLNone;
  6387. server->run(sslCfg, socket, nullptr);
  6388. }
  6389. };
  6390. enableDafsAuthentication(false);
  6391. Owned<IRemoteFileServer> server = createRemoteFileServer();
  6392. serverThread.setown(new CServerThread(QUERYINTERFACE(server.getClear(), CRemoteFileServer), socket.getClear()));
  6393. }
  6394. void testBasicFunctionality()
  6395. {
  6396. VStringBuffer filePath("%s%s", basePath.str(), "file1");
  6397. // create file
  6398. Owned<IFile> iFile = createIFile(filePath);
  6399. CPPUNIT_ASSERT(iFile);
  6400. Owned<IFileIO> iFileIO = iFile->open(IFOcreate);
  6401. CPPUNIT_ASSERT(iFileIO);
  6402. // write out 1k of random data and crc
  6403. MemoryBuffer mb;
  6404. char *buf = (char *)mb.reserveTruncate(testLen);
  6405. for (unsigned b=0; b<1024; b++)
  6406. buf[b] = getRandom()%256;
  6407. CRC32 crc;
  6408. crc.tally(testLen, buf);
  6409. unsigned writeCrc = crc.get();
  6410. size32_t sz = iFileIO->write(0, testLen, buf);
  6411. CPPUNIT_ASSERT(sz == testLen);
  6412. // close file
  6413. iFileIO.clear();
  6414. // validate remote crc
  6415. CPPUNIT_ASSERT(writeCrc == iFile->getCRC());
  6416. // exists
  6417. CPPUNIT_ASSERT(iFile->exists());
  6418. // validate size
  6419. CPPUNIT_ASSERT(iFile->size() == testLen);
  6420. // read back and validate read data's crc against written
  6421. iFileIO.setown(iFile->open(IFOread));
  6422. CPPUNIT_ASSERT(iFileIO);
  6423. sz = iFileIO->read(0, testLen, buf);
  6424. iFileIO.clear();
  6425. CPPUNIT_ASSERT(sz == testLen);
  6426. crc.reset();
  6427. crc.tally(testLen, buf);
  6428. CPPUNIT_ASSERT(writeCrc == crc.get());
  6429. }
  6430. void testCopy()
  6431. {
  6432. VStringBuffer filePath("%s%s", basePath.str(), "file1");
  6433. Owned<IFile> iFile = createIFile(filePath);
  6434. // test file copy
  6435. VStringBuffer filePathCopy("%s%s", basePath.str(), "file1copy");
  6436. Owned<IFile> iFile1Copy = createIFile(filePathCopy);
  6437. iFile->copyTo(iFile1Copy);
  6438. // read back copy and validate read data's crc against written
  6439. Owned<IFileIO> iFileIO = iFile1Copy->open(IFOreadwrite); // open read/write for appendFile in next step.
  6440. CPPUNIT_ASSERT(iFileIO);
  6441. MemoryBuffer mb;
  6442. char *buf = (char *)mb.reserveTruncate(testLen);
  6443. size32_t sz = iFileIO->read(0, testLen, buf);
  6444. CPPUNIT_ASSERT(sz == testLen);
  6445. CRC32 crc;
  6446. crc.tally(testLen, buf);
  6447. CPPUNIT_ASSERT(iFile->getCRC() == crc.get());
  6448. // check appendFile functionality. NB after this "file1copy" should be 2*testLen
  6449. CPPUNIT_ASSERT(testLen == iFileIO->appendFile(iFile));
  6450. iFileIO.clear();
  6451. // validate new size
  6452. CPPUNIT_ASSERT(iFile1Copy->size() == 2 * testLen);
  6453. // setSize test, truncate copy to original size
  6454. iFileIO.setown(iFile1Copy->open(IFOreadwrite));
  6455. iFileIO->setSize(testLen);
  6456. // validate new size
  6457. CPPUNIT_ASSERT(iFile1Copy->size() == testLen);
  6458. }
  6459. void testOther()
  6460. {
  6461. VStringBuffer filePath("%s%s", basePath.str(), "file1");
  6462. Owned<IFile> iFile = createIFile(filePath);
  6463. // rename
  6464. iFile->rename("file2");
  6465. // create a directory
  6466. VStringBuffer subDirPath("%s%s", basePath.str(), "subdir1");
  6467. Owned<IFile> subDirIFile = createIFile(subDirPath);
  6468. subDirIFile->createDirectory();
  6469. // check isDirectory result
  6470. CPPUNIT_ASSERT(subDirIFile->isDirectory());
  6471. // move previous created and renamed file into new sub-directory
  6472. // ensure not present before move
  6473. VStringBuffer subDirFilePath("%s/%s", subDirPath.str(), "file2");
  6474. Owned<IFile> iFile2 = createIFile(subDirFilePath);
  6475. iFile2->remove();
  6476. iFile->move(subDirFilePath);
  6477. // count sub-directory files with a wildcard
  6478. unsigned count=0;
  6479. Owned<IDirectoryIterator> iter = subDirIFile->directoryFiles("*2");
  6480. ForEach(*iter)
  6481. ++count;
  6482. CPPUNIT_ASSERT(1 == count);
  6483. // check isFile result
  6484. CPPUNIT_ASSERT(iFile2->isFile());
  6485. // validate isReadOnly before after setting
  6486. CPPUNIT_ASSERT(!iFile2->isReadOnly());
  6487. iFile2->setReadOnly(true);
  6488. CPPUNIT_ASSERT(iFile2->isReadOnly());
  6489. // get/set Time and validate result
  6490. CDateTime createTime, modifiedTime, accessedTime;
  6491. CPPUNIT_ASSERT(subDirIFile->getTime(&createTime, &modifiedTime, &accessedTime));
  6492. CDateTime newModifiedTime = modifiedTime;
  6493. newModifiedTime.adjustTime(-86400); // -1 day
  6494. CPPUNIT_ASSERT(subDirIFile->setTime(&createTime, &newModifiedTime, &accessedTime));
  6495. CPPUNIT_ASSERT(subDirIFile->getTime(&createTime, &modifiedTime, &accessedTime));
  6496. CPPUNIT_ASSERT(modifiedTime == newModifiedTime);
  6497. // test set file permissions
  6498. iFile2->setFilePermissions(0777);
  6499. }
  6500. void testConfiguration()
  6501. {
  6502. SocketEndpoint ep(serverPort); // test trace open connections
  6503. CPPUNIT_ASSERT(setDafileSvrTraceFlags(ep, 0x08));
  6504. StringBuffer infoStr;
  6505. CPPUNIT_ASSERT(RFEnoerror == getDafileSvrInfo(ep, 10, infoStr));
  6506. CPPUNIT_ASSERT(RFEnoerror == setDafileSvrThrottleLimit(ep, ThrottleStd, DEFAULT_STDCMD_PARALLELREQUESTLIMIT+1, DEFAULT_STDCMD_THROTTLEDELAYMS+1, DEFAULT_STDCMD_THROTTLECPULIMIT+1, DEFAULT_STDCMD_THROTTLEQUEUELIMIT+1));
  6507. }
  6508. void testDirectoryMonitoring()
  6509. {
  6510. VStringBuffer subDirPath("%s%s", basePath.str(), "subdir1");
  6511. Owned<IFile> subDirIFile = createIFile(subDirPath);
  6512. subDirIFile->createDirectory();
  6513. VStringBuffer filePath("%s/%s", subDirPath.str(), "file1");
  6514. class CDelayedFileCreate : implements IThreaded
  6515. {
  6516. CThreaded threaded;
  6517. StringAttr filePath;
  6518. Semaphore doneSem;
  6519. public:
  6520. CDelayedFileCreate(const char *_filePath) : filePath(_filePath), threaded("CDelayedFileCreate")
  6521. {
  6522. threaded.init(this);
  6523. }
  6524. ~CDelayedFileCreate()
  6525. {
  6526. stop();
  6527. }
  6528. void stop()
  6529. {
  6530. doneSem.signal();
  6531. threaded.join();
  6532. }
  6533. // IThreaded impl.
  6534. virtual void threadmain() override
  6535. {
  6536. MilliSleep(1000); // give monitorDirectory a chance to be monitoring
  6537. // create file
  6538. Owned<IFile> iFile = createIFile(filePath);
  6539. CPPUNIT_ASSERT(iFile);
  6540. Owned<IFileIO> iFileIO = iFile->open(IFOcreate);
  6541. CPPUNIT_ASSERT(iFileIO);
  6542. iFileIO.clear();
  6543. doneSem.wait(60 * 1000);
  6544. CPPUNIT_ASSERT(iFile->remove());
  6545. }
  6546. } delayedFileCreate(filePath);
  6547. Owned<IDirectoryDifferenceIterator> iter = subDirIFile->monitorDirectory(nullptr, nullptr, false, false, 2000, 60 * 1000);
  6548. ForEach(*iter)
  6549. {
  6550. StringBuffer fname;
  6551. iter->getName(fname);
  6552. PROGLOG("fname = %s", fname.str());
  6553. }
  6554. delayedFileCreate.stop();
  6555. }
  6556. void testFinish()
  6557. {
  6558. // clearup
  6559. VStringBuffer filePathCopy("%s%s", basePath.str(), "file1copy");
  6560. Owned<IFile> iFile1Copy = createIFile(filePathCopy);
  6561. CPPUNIT_ASSERT(iFile1Copy->remove());
  6562. VStringBuffer subDirPath("%s%s", basePath.str(), "subdir1");
  6563. VStringBuffer subDirFilePath("%s/%s", subDirPath.str(), "file2");
  6564. Owned<IFile> iFile2 = createIFile(subDirFilePath);
  6565. CPPUNIT_ASSERT(iFile2->remove());
  6566. Owned<IFile> subDirIFile = createIFile(subDirPath);
  6567. CPPUNIT_ASSERT(subDirIFile->remove());
  6568. SocketEndpoint ep(serverPort);
  6569. Owned<ISocket> sock = ISocket::connect_timeout(ep, 60 * 1000);
  6570. CPPUNIT_ASSERT(RFEnoerror == stopRemoteServer(sock));
  6571. serverThread.clear();
  6572. }
  6573. };
  6574. CPPUNIT_TEST_SUITE_REGISTRATION( RemoteFileTest );
  6575. CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( RemoteFileTest, "RemoteFileTests" );
  6576. #endif // _USE_CPPUNIT