dafsserver.cpp 202 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
73778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252425342544255425642574258425942604261426242634264426542664267426842694270427142724273427442754276427
74278427942804281428242834284428542864287428842894290429142924293429442954296429742984299430043014302430343044305430643074308430943104311431243134314431543164317431843194320432143224323432443254326432743284329433043314332433343344335433643374338433943404341434243434344434543464347434843494350435143524353435443554356435743584359436043614362436343644365436643674368436943704371437243734374437543764377437843794380438143824383438443854386438743884389439043914392439343944395439643974398439944004401440244034404440544064407440844094410441144124413441444154416441744184419442044214422442344244425442644274428442944304431443244334434443544364437443844394440444144424443444444454446444744484449445044514452445344544455445644574458445944604461446244634464446544664467446844694470447144724473447444754476447744784479448044814482448344844485448644874488448944904491449244934494449544964497449844994500450145024503450445054506450745084509451045114512451345144515451645174518451945204521452245234524452545264527452845294530453145324533453445354536453745384539454045414542454345444545454645474548454945504551455245534554455545564557455845594560456145624563456445654566456745684569457045714572457345744575457645774578457945804581458245834584458545864587458845894590459145924593459445954596459745984599460046014602460346044605460646074608460946104611461246134614461546164617461846194620462146224623462446254626462746284629463046314632463346344635463646374638463946404641464246434644464546464647464846494650465146524653465446554656465746584659466046614662466346644665466646674668466946704671467246734674467546764677467846794680468146824683468446854686468746884689469046914692469346944695469646974698469947004701470247034704470547064707470847094710471147124713471447154716471747184719472047214722472347244725472647274728472947304731473247334734473547364737473847394740474147424743474447454746474747484749475047514752475347544755475647574758475947604761476247634764476547664767476847694770477147724773477447754776477
74778477947804781478247834784478547864787478847894790479147924793479447954796479747984799480048014802480348044805480648074808480948104811481248134814481548164817481848194820482148224823482448254826482748284829483048314832483348344835483648374838483948404841484248434844484548464847484848494850485148524853485448554856485748584859486048614862486348644865486648674868486948704871487248734874487548764877487848794880488148824883488448854886488748884889489048914892489348944895489648974898489949004901490249034904490549064907490849094910491149124913491449154916491749184919492049214922492349244925492649274928492949304931493249334934493549364937493849394940494149424943494449454946494749484949495049514952495349544955495649574958495949604961496249634964496549664967496849694970497149724973497449754976497749784979498049814982498349844985498649874988498949904991499249934994499549964997499849995000500150025003500450055006500750085009501050115012501350145015501650175018501950205021502250235024502550265027502850295030503150325033503450355036503750385039504050415042504350445045504650475048504950505051505250535054505550565057505850595060506150625063506450655066506750685069507050715072507350745075507650775078507950805081508250835084508550865087508850895090509150925093509450955096509750985099510051015102510351045105510651075108510951105111511251135114511551165117511851195120512151225123512451255126512751285129513051315132513351345135513651375138513951405141514251435144514551465147514851495150515151525153515451555156515751585159516051615162516351645165516651675168516951705171517251735174517551765177517851795180518151825183518451855186518751885189519051915192519351945195519651975198519952005201520252035204520552065207520852095210521152125213521452155216521752185219522052215222522352245225522652275228522952305231523252335234523552365237523852395240524152425243524452455246524752485249525052515252525352545255525652575258525952605261526252635264526552665267526852695270527152725273527452755276527
752785279528052815282528352845285528652875288528952905291529252935294529552965297529852995300530153025303530453055306530753085309531053115312531353145315531653175318531953205321532253235324532553265327532853295330533153325333533453355336533753385339534053415342534353445345534653475348534953505351535253535354535553565357535853595360536153625363536453655366536753685369537053715372537353745375537653775378537953805381538253835384538553865387538853895390539153925393539453955396539753985399540054015402540354045405540654075408540954105411541254135414541554165417541854195420542154225423542454255426542754285429543054315432543354345435543654375438543954405441544254435444544554465447544854495450545154525453545454555456545754585459546054615462546354645465546654675468546954705471547254735474547554765477547854795480548154825483548454855486548754885489549054915492549354945495549654975498549955005501550255035504550555065507550855095510551155125513551455155516551755185519552055215522552355245525552655275528552955305531553255335534553555365537553855395540554155425543554455455546554755485549555055515552555355545555555655575558555955605561556255635564556555665567556855695570557155725573557455755576557755785579558055815582558355845585558655875588558955905591559255935594559555965597559855995600560156025603560456055606560756085609561056115612561356145615561656175618561956205621562256235624562556265627562856295630563156325633563456355636563756385639564056415642564356445645564656475648564956505651565256535654565556565657565856595660566156625663566456655666566756685669567056715672567356745675567656775678567956805681568256835684568556865687568856895690569156925693569456955696569756985699570057015702570357045705570657075708570957105711571257135714571557165717571857195720572157225723572457255726572757285729573057315732573357345735573657375738573957405741574257435744
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. // todo look at IRemoteFileServer stop
  14. #include "platform.h"
  15. #include "limits.h"
  16. #include "jlib.hpp"
  17. #include "jio.hpp"
  18. #include "jmutex.hpp"
  19. #include "jfile.hpp"
  20. #include "jmisc.hpp"
  21. #include "jthread.hpp"
  22. #include "jqueue.tpp"
  23. #include "securesocket.hpp"
  24. #include "portlist.h"
  25. #include "jsocket.hpp"
  26. #include "jencrypt.hpp"
  27. #include "jlzw.hpp"
  28. #include "jset.hpp"
  29. #include "jhtree.hpp"
  30. #include "dadfs.hpp"
  31. #include "remoteerr.hpp"
  32. #include <atomic>
  33. #include <string>
  34. #include <unordered_map>
  35. #include "rtldynfield.hpp"
  36. #include "rtlds_imp.hpp"
  37. #include "rtlread_imp.hpp"
  38. #include "rtlrecord.hpp"
  39. #include "eclhelper_dyn.hpp"
  40. #include "rtlcommon.hpp"
  41. #include "rtlformat.hpp"
  42. #include "jflz.hpp"
  43. #include "digisign.hpp"
  44. #include "dafdesc.hpp"
  45. #include "thorcommon.hpp"
  46. #include "csvsplitter.hpp"
  47. #include "thorxmlread.hpp"
  48. #include "dafscommon.hpp"
  49. #include "rmtfile.hpp"
  50. #include "rmtclient_impl.hpp"
  51. #include "dafsserver.hpp"
  52. using namespace cryptohelper;
  // Size cap for a socket cache (NOTE(review): the cache itself is not visible in this chunk — confirm its use).
  #define SOCKET_CACHE_MAX 500
  // Tree-copy timing, all expressed in milliseconds.
  #define TREECOPYTIMEOUT (60*60*1000) // 1Hr (I guess could take longer for big file but at least will stagger)
  #define TREECOPYPOLLTIME (60*1000*5) // 5 mins - for tracing that delayed
  #define TREECOPYPRUNETIME (24*60*60*1000) // 1 day
  // Stream-read defaults (NOTE(review): semantics inferred from names — "choose N" limit is effectively unlimited, skip count is 0).
  static const unsigned __int64 defaultFileStreamChooseNLimit = I64C(0x7fffffffffffffff); // constant should be moved to a common place (see eclhelper.hpp)
  static const unsigned __int64 defaultFileStreamSkipN = 0;
  static const unsigned defaultDaFSReplyLimitKB = 1024; // 1MB
  // Output encodings available by name: binary, XML, JSON.
  enum OutputFormat:byte { outFmt_Binary, outFmt_Xml, outFmt_Json };
  ///////////////////////////
  // Connect/receive time limits, both defaulting to 0.
  // NOTE(review): presumably 0 means "no limit" (compare sRFTM, which creates no CTimeMon for 0) — confirm against callers.
  static unsigned maxConnectTime = 0;
  static unsigned maxReceiveTime = 0;
  64. //Security and default port attributes
  65. static class _securitySettings
  66. {
  67. public:
  68. DAFSConnectCfg connectMethod;
  69. unsigned short daFileSrvPort;
  70. unsigned short daFileSrvSSLPort;
  71. const char * certificate;
  72. const char * privateKey;
  73. const char * passPhrase;
  74. _securitySettings()
  75. {
  76. queryDafsSecSettings(&connectMethod, &daFileSrvPort, &daFileSrvSSLPort, &certificate, &privateKey, &passPhrase);
  77. }
  78. } securitySettings;
  79. static CriticalSection secureContextCrit;
  80. static Owned<ISecureSocketContext> secureContextServer;
  81. static Owned<ISecureSocketContext> secureContextClient;
  82. #ifdef _USE_OPENSSL
  83. static ISecureSocket *createSecureSocket(ISocket *sock, SecureSocketType type)
  84. {
  85. {
  86. CriticalBlock b(secureContextCrit);
  87. if (type == ServerSocket)
  88. {
  89. if (!secureContextServer)
  90. secureContextServer.setown(createSecureSocketContextEx(securitySettings.certificate, securitySettings.privateKey, securitySettings.passPhrase, type));
  91. }
  92. else if (!secureContextClient)
  93. secureContextClient.setown(createSecureSocketContext(type));
  94. }
  95. int loglevel = SSLogNormal;
  96. #ifdef _DEBUG
  97. loglevel = SSLogMax;
  98. #endif
  99. if (type == ServerSocket)
  100. return secureContextServer->createSecureSocket(sock, loglevel);
  101. else
  102. return secureContextClient->createSecureSocket(sock, loglevel);
  103. }
  104. #else
  105. static ISecureSocket *createSecureSocket(ISocket *sock, SecureSocketType type)
  106. {
  107. throwUnexpected();
  108. }
  109. #endif
  110. struct sRFTM
  111. {
  112. CTimeMon *timemon;
  113. sRFTM(unsigned limit) { timemon = limit ? new CTimeMon(limit) : NULL; }
  114. ~sRFTM() { delete timemon; }
  115. };
  116. const char *remoteServerVersionString() { return DAFILESRV_VERSIONSTRING; }
  // Timeouts in milliseconds.
  #define CLIENT_TIMEOUT (1000*60*60*12) // 12 hours - long timeout in case of zombie clients
  #define CLIENT_INACTIVEWARNING_TIMEOUT (1000*60*60*12) // 12 hours - time between logging inactive clients
  #define SERVER_TIMEOUT (1000*60*5) // 5 mins - timeout when waiting for dafilesrv to reply after command
  // (increased when waiting for large block)
  // Stringize a command identifier, e.g. RFCText(RFCread) -> "RFCread".
  #define RFCText(cmd) #cmd
  // Command names indexed by RemoteFileCommandType value (see getRFCText).
  // The entry order must stay in step with the enum's numbering; do not reorder or remove entries.
  const char *RFCStrings[] =
  {
  RFCText(RFCopenIO),
  RFCText(RFCcloseIO),
  RFCText(RFCread),
  RFCText(RFCwrite),
  RFCText(RFCsize),
  RFCText(RFCexists),
  RFCText(RFCremove),
  RFCText(RFCrename),
  RFCText(RFCgetver),
  RFCText(RFCisfile),
  RFCText(RFCisdirectory),
  RFCText(RFCisreadonly),
  RFCText(RFCsetreadonly),
  RFCText(RFCgettime),
  RFCText(RFCsettime),
  RFCText(RFCcreatedir),
  RFCText(RFCgetdir),
  RFCText(RFCstop),
  RFCText(RFCexec),
  RFCText(RFCdummy1),
  RFCText(RFCredeploy),
  RFCText(RFCgetcrc),
  RFCText(RFCmove),
  RFCText(RFCsetsize),
  RFCText(RFCextractblobelements),
  RFCText(RFCcopy),
  RFCText(RFCappend),
  RFCText(RFCmonitordir),
  RFCText(RFCsettrace),
  RFCText(RFCgetinfo),
  RFCText(RFCfirewall),
  RFCText(RFCunlock),
  RFCText(RFCunlockreply),
  RFCText(RFCinvalid),
  RFCText(RFCcopysection),
  RFCText(RFCtreecopy),
  RFCText(RFCtreecopytmp),
  RFCText(RFCsetthrottle), // legacy version
  RFCText(RFCsetthrottle2),
  RFCText(RFCsetfileperms),
  RFCText(RFCreadfilteredindex),
  RFCText(RFCreadfilteredcount),
  RFCText(RFCreadfilteredblob),
  RFCText(RFCStreamRead),
  RFCText(RFCStreamReadTestSocket),
  RFCText(RFCStreamGeneral),
  RFCText(RFCStreamReadJSON),
  RFCText(RFCmaxnormal),
  };
  173. static const char *getRFCText(RemoteFileCommandType cmd)
  174. {
  175. if (cmd==RFCStreamReadJSON)
  176. return "RFCStreamReadJSON";
  177. else
  178. {
  179. unsigned elems = sizeof(RFCStrings) / sizeof(RFCStrings[0]);
  180. if (cmd >= elems)
  181. return "RFCunknown";
  182. return RFCStrings[cmd];
  183. }
  184. }
  185. static const char *getRFSERRText(unsigned err)
  186. {
  187. switch (err)
  188. {
  189. case RFSERR_InvalidCommand:
  190. return "RFSERR_InvalidCommand";
  191. case RFSERR_NullFileIOHandle:
  192. return "RFSERR_NullFileIOHandle";
  193. case RFSERR_InvalidFileIOHandle:
  194. return "RFSERR_InvalidFileIOHandle";
  195. case RFSERR_TimeoutFileIOHandle:
  196. return "RFSERR_TimeoutFileIOHandle";
  197. case RFSERR_OpenFailed:
  198. return "RFSERR_OpenFailed";
  199. case RFSERR_ReadFailed:
  200. return "RFSERR_ReadFailed";
  201. case RFSERR_WriteFailed:
  202. return "RFSERR_WriteFailed";
  203. case RFSERR_RenameFailed:
  204. return "RFSERR_RenameFailed";
  205. case RFSERR_ExistsFailed:
  206. return "RFSERR_ExistsFailed";
  207. case RFSERR_RemoveFailed:
  208. return "RFSERR_RemoveFailed";
  209. case RFSERR_CloseFailed:
  210. return "RFSERR_CloseFailed";
  211. case RFSERR_IsFileFailed:
  212. return "RFSERR_IsFileFailed";
  213. case RFSERR_IsDirectoryFailed:
  214. return "RFSERR_IsDirectoryFailed";
  215. case RFSERR_IsReadOnlyFailed:
  216. return "RFSERR_IsReadOnlyFailed";
  217. case RFSERR_SetReadOnlyFailed:
  218. return "RFSERR_SetReadOnlyFailed";
  219. case RFSERR_GetTimeFailed:
  220. return "RFSERR_GetTimeFailed";
  221. case RFSERR_SetTimeFailed:
  222. return "RFSERR_SetTimeFailed";
  223. case RFSERR_CreateDirFailed:
  224. return "RFSERR_CreateDirFailed";
  225. case RFSERR_GetDirFailed:
  226. return "RFSERR_GetDirFailed";
  227. case RFSERR_GetCrcFailed:
  228. return "RFSERR_GetCrcFailed";
  229. case RFSERR_MoveFailed:
  230. return "RFSERR_MoveFailed";
  231. case RFSERR_ExtractBlobElementsFailed:
  232. return "RFSERR_ExtractBlobElementsFailed";
  233. case RFSERR_CopyFailed:
  234. return "RFSERR_CopyFailed";
  235. case RFSERR_AppendFailed:
  236. return "RFSERR_AppendFailed";
  237. case RFSERR_AuthenticateFailed:
  238. return "RFSERR_AuthenticateFailed";
  239. case RFSERR_CopySectionFailed:
  240. return "RFSERR_CopySectionFailed";
  241. case RFSERR_TreeCopyFailed:
  242. return "RFSERR_TreeCopyFailed";
  243. case RAERR_InvalidUsernamePassword:
  244. return "RAERR_InvalidUsernamePassword";
  245. case RFSERR_MasterSeemsToHaveDied:
  246. return "RFSERR_MasterSeemsToHaveDied";
  247. case RFSERR_TimeoutWaitSlave:
  248. return "RFSERR_TimeoutWaitSlave";
  249. case RFSERR_TimeoutWaitConnect:
  250. return "RFSERR_TimeoutWaitConnect";
  251. case RFSERR_TimeoutWaitMaster:
  252. return "RFSERR_TimeoutWaitMaster";
  253. case RFSERR_NoConnectSlave:
  254. return "RFSERR_NoConnectSlave";
  255. case RFSERR_NoConnectSlaveXY:
  256. return "RFSERR_NoConnectSlaveXY";
  257. case RFSERR_VersionMismatch:
  258. return "RFSERR_VersionMismatch";
  259. case RFSERR_SetThrottleFailed:
  260. return "RFSERR_SetThrottleFailed";
  261. case RFSERR_MaxQueueRequests:
  262. return "RFSERR_MaxQueueRequests";
  263. case RFSERR_KeyIndexFailed:
  264. return "RFSERR_MaxQueueRequests";
  265. case RFSERR_StreamReadFailed:
  266. return "RFSERR_StreamReadFailed";
  267. case RFSERR_InternalError:
  268. return "Internal Error";
  269. }
  270. return "RFSERR_Unknown";
  271. }
  // Stringise a ThrottleClass enumerator name, e.g. ThrottleText(ThrottleStd) -> "ThrottleStd".
  #define ThrottleText(throttleClass) #throttleClass

  /* Display name of each throttle class, indexed by ThrottleClass value.
   * NOTE(review): entry order must follow the ThrottleClass enum declaration
   * (assumed ThrottleStd first, then ThrottleSlow) - confirm.
   */
  const char *ThrottleStrings[] = { ThrottleText(ThrottleStd), ThrottleText(ThrottleSlow) };
  278. // very high upper limits that configure can't exceed
  279. #define THROTTLE_MAX_LIMIT 1000000
  280. #define THROTTLE_MAX_DELAYMS 3600000
  281. #define THROTTLE_MAX_CPUTHRESHOLD 100
  282. #define THROTTLE_MAX_QUEUELIMIT 10000000
  283. static const char *getThrottleClassText(ThrottleClass throttleClass) { return ThrottleStrings[throttleClass]; }
  284. //---------------------------------------------------------------------------
  285. // TreeCopy
  286. #define TREECOPY_CACHE_SIZE 50
  287. struct CTreeCopyItem: public CInterface
  288. {
  289. StringAttr net;
  290. StringAttr mask;
  291. offset_t sz; // original size
  292. CDateTime dt; // original date
  293. RemoteFilenameArray loc; // locations for file - 0 is original
  294. Owned<IBitSet> busy;
  295. unsigned lastused;
  296. CTreeCopyItem(RemoteFilename &orig, const char *_net, const char *_mask, offset_t _sz, CDateTime &_dt)
  297. : net(_net), mask(_mask)
  298. {
  299. loc.append(orig);
  300. dt.set(_dt);
  301. sz = _sz;
  302. busy.setown(createThreadSafeBitSet());
  303. lastused = msTick();
  304. }
  305. bool equals(const RemoteFilename &orig, const char *_net, const char *_mask, offset_t _sz, CDateTime &_dt)
  306. {
  307. if (!orig.equals(loc.item(0)))
  308. return false;
  309. if (strcmp(_net,net)!=0)
  310. return false;
  311. if (strcmp(_mask,mask)!=0)
  312. return false;
  313. if (sz!=_sz)
  314. return false;
  315. return (dt.equals(_dt,false));
  316. }
  317. };
  318. static CIArrayOf<CTreeCopyItem> treeCopyArray;
  319. static CriticalSection treeCopyCrit;
  320. static unsigned treeCopyWaiting=0;
  321. static Semaphore treeCopySem;
  322. /////////////////////////
  323. //====================================================================================================
  324. class CAsyncCommandManager
  325. {
  326. class CAsyncJob: public CInterface
  327. {
  328. class cThread: public Thread
  329. {
  330. CAsyncJob *parent;
  331. public:
  332. cThread(CAsyncJob *_parent)
  333. : Thread("CAsyncJob")
  334. {
  335. parent = _parent;
  336. }
  337. int run()
  338. {
  339. int ret = -1;
  340. try {
  341. ret = parent->run();
  342. parent->setDone();
  343. }
  344. catch (IException *e)
  345. {
  346. parent->setException(e);
  347. }
  348. parent->signal();
  349. return ret;
  350. }
  351. } *thread;
  352. StringAttr uuid;
  353. CAsyncCommandManager &parent;
  354. public:
  355. CAsyncJob(CAsyncCommandManager &_parent, const char *_uuid)
  356. : uuid(_uuid), parent(_parent)
  357. {
  358. thread = new cThread(this);
  359. hash = hashc((const byte *)uuid.get(),uuid.length(),~0U);
  360. }
  361. ~CAsyncJob()
  362. {
  363. thread->join();
  364. thread->Release();
  365. }
  366. static void destroy(CAsyncJob *j)
  367. {
  368. j->Release();
  369. }
  370. void signal()
  371. {
  372. parent.signal();
  373. }
  374. void start()
  375. {
  376. parent.wait();
  377. thread->start();
  378. }
  379. void join()
  380. {
  381. thread->join();
  382. }
  383. static unsigned getHash(const char *key)
  384. {
  385. return hashc((const byte *)key,strlen(key),~0U);
  386. }
  387. static CAsyncJob* create(const char *key) { assertex(!"CAsyncJob::create not implemented"); return NULL; }
  388. unsigned hash;
  389. bool eq(const char *key)
  390. {
  391. return stricmp(key,uuid.get())==0;
  392. }
  393. virtual int run()=0;
  394. virtual void setException(IException *e)=0;
  395. virtual void setDone()=0;
  396. };
  397. class CAsyncCopySection: public CAsyncJob
  398. {
  399. Owned<IFile> src;
  400. RemoteFilename dst;
  401. offset_t toOfs;
  402. offset_t fromOfs;
  403. offset_t size;
  404. CFPmode mode; // not yet supported
  405. CriticalSection sect;
  406. offset_t done;
  407. offset_t total;
  408. Semaphore finished;
  409. AsyncCommandStatus status;
  410. Owned<IException> exc;
  411. public:
  412. CAsyncCopySection(CAsyncCommandManager &parent, const char *_uuid, const char *fromFile, const char *toFile, offset_t _toOfs, offset_t _fromOfs, offset_t _size)
  413. : CAsyncJob(parent, _uuid)
  414. {
  415. status = ACScontinue;
  416. src.setown(createIFile(fromFile));
  417. dst.setRemotePath(toFile);
  418. toOfs = _toOfs;
  419. fromOfs = _fromOfs;
  420. size = _size;
  421. mode = CFPcontinue;
  422. done = 0;
  423. total = (offset_t)-1;
  424. }
  425. AsyncCommandStatus poll(offset_t &_done, offset_t &_total,unsigned timeout)
  426. {
  427. if (timeout&&finished.wait(timeout))
  428. finished.signal(); // may need to call again
  429. CriticalBlock block(sect);
  430. if (exc)
  431. throw exc.getClear();
  432. _done = done;
  433. _total = total;
  434. return status;
  435. }
  436. int run()
  437. {
  438. class cProgress: implements ICopyFileProgress
  439. {
  440. CriticalSection &sect;
  441. CFPmode &mode;
  442. offset_t &done;
  443. offset_t &total;
  444. public:
  445. cProgress(CriticalSection &_sect,offset_t &_done,offset_t &_total,CFPmode &_mode)
  446. : sect(_sect), mode(_mode), done(_done), total(_total)
  447. {
  448. }
  449. CFPmode onProgress(offset_t sizeDone, offset_t totalSize)
  450. {
  451. CriticalBlock block(sect);
  452. done = sizeDone;
  453. total = totalSize;
  454. return mode;
  455. }
  456. } progress(sect,total,done,mode);
  457. src->copySection(dst,toOfs, fromOfs, size, &progress); // exceptions will be handled by base class
  458. return 0;
  459. }
  460. void setException(IException *e)
  461. {
  462. EXCLOG(e,"CAsyncCommandManager::CAsyncJob");
  463. CriticalBlock block(sect);
  464. if (exc.get())
  465. e->Release();
  466. else
  467. exc.setown(e);
  468. status = ACSerror;
  469. }
  470. void setDone()
  471. {
  472. CriticalBlock block(sect);
  473. finished.signal();
  474. status = ACSdone;
  475. }
  476. };
  477. CMinHashTable<CAsyncJob> jobtable;
  478. CriticalSection sect;
  479. Semaphore threadsem;
  480. unsigned limit;
  481. public:
  482. CAsyncCommandManager(unsigned _limit) : limit(_limit)
  483. {
  484. if (limit) // 0 == unbound
  485. threadsem.signal(limit); // max number of async jobs
  486. }
  487. void join()
  488. {
  489. CriticalBlock block(sect);
  490. unsigned i;
  491. CAsyncJob *j=jobtable.first(i);
  492. while (j) {
  493. j->join();
  494. j=jobtable.next(i);
  495. }
  496. }
  497. void signal()
  498. {
  499. if (limit)
  500. threadsem.signal();
  501. }
  502. void wait()
  503. {
  504. if (limit)
  505. threadsem.wait();
  506. }
  507. AsyncCommandStatus copySection(const char *uuid, const char *fromFile, const char *toFile, offset_t toOfs, offset_t fromOfs, offset_t size, offset_t &done, offset_t &total, unsigned timeout)
  508. {
  509. // return 0 if continuing, 1 if done
  510. CAsyncCopySection * job;
  511. Linked<CAsyncJob> cjob;
  512. {
  513. CriticalBlock block(sect);
  514. cjob.set(jobtable.find(uuid,false));
  515. if (cjob) {
  516. job = QUERYINTERFACE(cjob.get(),CAsyncCopySection);
  517. if (!job) {
  518. throw MakeStringException(-1,"Async job ID mismatch");
  519. }
  520. }
  521. else {
  522. job = new CAsyncCopySection(*this, uuid, fromFile, toFile, toOfs, fromOfs, size);
  523. cjob.setown(job);
  524. jobtable.add(cjob.getLink());
  525. cjob->start();
  526. }
  527. }
  528. AsyncCommandStatus ret = ACSerror;
  529. Owned<IException> rete;
  530. try {
  531. ret = job->poll(done,total,timeout);
  532. }
  533. catch (IException * e) {
  534. rete.setown(e);
  535. }
  536. if ((ret!=ACScontinue)||rete.get()) {
  537. job->join();
  538. CriticalBlock block(sect);
  539. jobtable.remove(job);
  540. if (rete.get())
  541. throw rete.getClear();
  542. }
  543. return ret;
  544. }
  545. };
  546. //====================================================================================================
  547. inline void appendErr(MemoryBuffer &reply, unsigned e)
  548. {
  549. reply.append(e).append(getRFSERRText(e));
  550. }
  551. #define MAPCOMMAND(c,p) case c: { this->p(msg, reply) ; break; }
  552. #define MAPCOMMANDCLIENT(c,p,client) case c: { this->p(msg, reply, client); break; }
  553. #define MAPCOMMANDCLIENTTESTSOCKET(c,p,client) case c: { testSocketFlag = true; this->p(msg, reply, client); break; }
  554. #define MAPCOMMANDCLIENTTHROTTLE(c,p,client,throttler) case c: { this->p(msg, reply, client, throttler); break; }
  555. #define MAPCOMMANDSTATS(c,p,stats) case c: { this->p(msg, reply, stats); break; }
  556. #define MAPCOMMANDCLIENTSTATS(c,p,client,stats) case c: { this->p(msg, reply, client, stats); break; }
  557. static unsigned ClientCount = 0;
  558. static unsigned MaxClientCount = 0;
  559. static CriticalSection ClientCountSect;
  560. #define DEFAULT_THROTTLOG_LOG_INTERVAL_SECS 60 // log total throttled delay period
  561. class CClientStats : public CInterface
  562. {
  563. public:
  564. CClientStats(const char *_client) : client(_client), count(0), bRead(0), bWritten(0) { }
  565. const char *queryFindString() const { return client; }
  566. inline void addRead(unsigned __int64 len)
  567. {
  568. bRead += len;
  569. }
  570. inline void addWrite(unsigned __int64 len)
  571. {
  572. bWritten += len;
  573. }
  574. void getStatus(StringBuffer & info) const
  575. {
  576. info.appendf("Client %s - %" I64F "d requests handled, bytes read = %" I64F "d, bytes written = % " I64F "d",
  577. client.get(), count, bRead.load(), bWritten.load()).newline();
  578. }
  579. StringAttr client;
  580. unsigned __int64 count;
  581. std::atomic<unsigned __int64> bRead;
  582. std::atomic<unsigned __int64> bWritten;
  583. };
  584. class CClientStatsTable : public OwningStringSuperHashTableOf<CClientStats>
  585. {
  586. typedef OwningStringSuperHashTableOf<CClientStats> PARENT;
  587. CriticalSection crit;
  588. unsigned cmdStats[RFCmax];
  589. static int compareElement(void* const *ll, void* const *rr)
  590. {
  591. const CClientStats *l = (const CClientStats *) *ll;
  592. const CClientStats *r = (const CClientStats *) *rr;
  593. if (l->count == r->count)
  594. return 0;
  595. else if (l->count<r->count)
  596. return 1;
  597. else
  598. return -1;
  599. }
  600. public:
  601. CClientStatsTable()
  602. {
  603. memset(&cmdStats[0], 0, sizeof(cmdStats));
  604. }
  605. ~CClientStatsTable()
  606. {
  607. _releaseAll();
  608. }
  609. CClientStats *getClientReference(RemoteFileCommandType cmd, const char *client)
  610. {
  611. CriticalBlock b(crit);
  612. CClientStats *stats = PARENT::find(client);
  613. if (!stats)
  614. {
  615. stats = new CClientStats(client);
  616. PARENT::replace(*stats);
  617. }
  618. if (cmd<RFCmax) // i.e. ignore duff command (which will be traced), but still record client connected
  619. cmdStats[cmd]++;
  620. ++stats->count;
  621. return LINK(stats);
  622. }
  623. StringBuffer &getInfo(StringBuffer &info, unsigned level=1)
  624. {
  625. CriticalBlock b(crit);
  626. unsigned __int64 totalCmds = 0;
  627. for (unsigned c=0; c<RFCmax; c++)
  628. totalCmds += cmdStats[c];
  629. unsigned totalClients = PARENT::ordinality();
  630. info.appendf("Commands processed = %" I64F "u, unique clients = %u", totalCmds, totalClients);
  631. if (totalCmds)
  632. {
  633. info.append("Command stats:").newline();
  634. for (unsigned c=0; c<RFCmax; c++)
  635. {
  636. unsigned __int64 count = cmdStats[c];
  637. if (count)
  638. info.append(getRFCText(c)).append(": ").append(count).newline();
  639. }
  640. }
  641. if (totalClients)
  642. {
  643. SuperHashIteratorOf<CClientStats> iter(*this);
  644. PointerArrayOf<CClientStats> elements;
  645. ForEach(iter)
  646. {
  647. CClientStats &elem = iter.query();
  648. elements.append(&elem);
  649. }
  650. elements.sort(&compareElement);
  651. if (level < 10)
  652. {
  653. // list up to 10 clients ordered by # of commands processed
  654. unsigned max=elements.ordinality();
  655. if (max>10)
  656. max = 10; // cap
  657. info.append("Top 10 clients:").newline();
  658. for (unsigned e=0; e<max; e++)
  659. {
  660. const CClientStats &element = *elements.item(e);
  661. element.getStatus(info);
  662. }
  663. }
  664. else // list all
  665. {
  666. info.append("All clients:").newline();
  667. ForEachItemIn(e, elements)
  668. {
  669. const CClientStats &element = *elements.item(e);
  670. element.getStatus(info);
  671. }
  672. }
  673. }
  674. return info;
  675. }
  676. void reset()
  677. {
  678. CriticalBlock b(crit);
  679. memset(&cmdStats[0], 0, sizeof(cmdStats));
  680. kill();
  681. }
  682. };
  683. interface IRemoteReadActivity;
  684. interface IRemoteWriteActivity;
  685. interface IRemoteActivity : extends IInterface
  686. {
  687. virtual unsigned __int64 queryProcessed() const = 0;
  688. virtual IOutputMetaData *queryOutputMeta() const = 0;
  689. virtual StringBuffer &getInfoStr(StringBuffer &out) const = 0;
  690. virtual void serializeCursor(MemoryBuffer &tgt) const = 0;
  691. virtual void restoreCursor(MemoryBuffer &src) = 0;
  692. virtual bool isGrouped() const = 0;
  693. virtual void flushStatistics(CClientStats &stats) = 0;
  694. virtual IRemoteReadActivity *queryIsReadActivity() { return nullptr; }
  695. virtual IRemoteWriteActivity *queryIsWriteActivity() { return nullptr; }
  696. };
  697. interface IRemoteReadActivity : extends IRemoteActivity
  698. {
  699. virtual const void *nextRow(MemoryBufferBuilder &outBuilder, size32_t &sz) = 0;
  700. virtual bool requiresPostProject() const = 0;
  701. };
  702. interface IRemoteWriteActivity : extends IRemoteActivity
  703. {
  704. virtual void write(size32_t sz, const void *row) = 0;
  705. };
  706. class CRemoteRequest : public CSimpleInterfaceOf<IInterface>
  707. {
  708. int cursorHandle;
  709. OutputFormat format;
  710. unsigned __int64 replyLimit = defaultDaFSReplyLimitKB * 1024;
  711. Linked<IRemoteActivity> activity;
  712. Linked<ICompressor> compressor;
  713. Linked<IExpander> expander;
  714. MemoryBuffer expandMb;
  715. Owned<IXmlWriterExt> responseWriter; // for xml or json response
  716. bool handleFull(MemoryBuffer &inMb, size32_t inPos, MemoryBuffer &compressMb, ICompressor *compressor, size32_t replyLimit, size32_t &totalSz)
  717. {
  718. size32_t sz = inMb.length()-inPos;
  719. if (sz < replyLimit)
  720. return false;
  721. if (!compressor)
  722. return true;
  723. // consumes data from inMb into compressor
  724. totalSz += sz;
  725. const void *data = inMb.bytes()+inPos;
  726. assertex(compressor->write(data, sz) == sz);
  727. inMb.setLength(inPos);
  728. return compressMb.capacity() > replyLimit;
  729. }
  730. void processRead(IPropertyTree *requestTree, MemoryBuffer &responseMb)
  731. {
  732. IRemoteReadActivity *readActivity = activity->queryIsReadActivity();
  733. assertex(readActivity);
  734. MemoryBuffer compressMb;
  735. IOutputMetaData *outMeta = readActivity->queryOutputMeta();
  736. bool eoi=false;
  737. bool grouped = readActivity->isGrouped();
  738. MemoryBuffer resultBuffer;
  739. MemoryBufferBuilder outBuilder(resultBuffer, outMeta->getMinRecordSize());
  740. if (outFmt_Binary == format)
  741. {
  742. if (compressor)
  743. {
  744. compressMb.setEndian(__BIG_ENDIAN);
  745. compressMb.append(responseMb);
  746. }
  747. DelayedMarker<size32_t> dataLenMarker(compressor ? compressMb : responseMb); // uncompressed data size
  748. if (compressor)
  749. {
  750. size32_t initialSz = replyLimit >= 0x10000 ? 0x10000 : replyLimit;
  751. compressor->open(compressMb, initialSz);
  752. }
  753. outBuilder.setBuffer(responseMb); // write direct to responseMb buffer for efficiency
  754. unsigned __int64 numProcessedStart = readActivity->queryProcessed();
  755. size32_t totalDataSz = 0;
  756. size32_t dataStartPos = responseMb.length();
  757. if (grouped)
  758. {
  759. bool pastFirstRow = numProcessedStart>0;
  760. do
  761. {
  762. size32_t eogPos = 0;
  763. if (pastFirstRow)
  764. {
  765. /* this is for last row output, which might have been returned in the previous request
  766. * The eog marker may change as a result of the next row (see writeDirect() call below);
  767. */
  768. eogPos = responseMb.length();
  769. responseMb.append(false);
  770. }
  771. size32_t rowSz;
  772. const void *row = readActivity->nextRow(outBuilder, rowSz);
  773. if (!row)
  774. {
  775. if (!pastFirstRow)
  776. {
  777. eoi = true;
  778. break;
  779. }
  780. else
  781. {
  782. bool eog = true;
  783. responseMb.writeDirect(eogPos, sizeof(eog), &eog);
  784. row = readActivity->nextRow(outBuilder, rowSz);
  785. if (!row)
  786. {
  787. eoi = true;
  788. break;
  789. }
  790. }
  791. }
  792. pastFirstRow = true;
  793. }
  794. while (!handleFull(responseMb, dataStartPos, compressMb, compressor, replyLimit, totalDataSz));
  795. }
  796. else
  797. {
  798. do
  799. {
  800. size32_t rowSz;
  801. const void *row = readActivity->nextRow(outBuilder, rowSz);
  802. if (!row)
  803. {
  804. eoi = true;
  805. break;
  806. }
  807. }
  808. while (!handleFull(responseMb, dataStartPos, compressMb, compressor, replyLimit, totalDataSz));
  809. }
  810. // Consume any trailing data remaining
  811. if (compressor)
  812. {
  813. size32_t sz = responseMb.length()-dataStartPos;
  814. if (sz)
  815. {
  816. // consumes data built up in responseMb buffer into compressor
  817. totalDataSz += sz;
  818. const void *data = responseMb.bytes()+dataStartPos;
  819. assertex(compressor->write(data, sz) == sz);
  820. responseMb.setLength(dataStartPos);
  821. }
  822. }
  823. // finalize responseMb
  824. dataLenMarker.write(compressor ? totalDataSz : responseMb.length()-dataStartPos);
  825. DelayedSizeMarker cursorLenMarker(responseMb); // cursor length
  826. if (!eoi)
  827. readActivity->serializeCursor(responseMb);
  828. cursorLenMarker.write();
  829. if (compressor)
  830. {
  831. // consume cursor into compressor
  832. size32_t sz = responseMb.length()-dataStartPos;
  833. const void *data = responseMb.bytes()+dataStartPos;
  834. assertex(compressor->write(data, sz) == sz);
  835. compressor->close();
  836. // now ready to swap compressed output into responseMb
  837. responseMb.swapWith(compressMb);
  838. }
  839. }
  840. else
  841. {
  842. responseWriter->outputBeginArray("Row");
  843. if (grouped)
  844. {
  845. bool pastFirstRow = readActivity->queryProcessed()>0;
  846. bool first = true;
  847. do
  848. {
  849. size32_t rowSz;
  850. const void *row = readActivity->nextRow(outBuilder, rowSz);
  851. if (!row)
  852. {
  853. if (!pastFirstRow)
  854. {
  855. eoi = true;
  856. break;
  857. }
  858. else
  859. {
  860. row = readActivity->nextRow(outBuilder, rowSz);
  861. if (!row)
  862. {
  863. eoi = true;
  864. break;
  865. }
  866. if (first) // possible if eog was 1st row on next packet
  867. responseWriter->outputBeginNested("Row", false);
  868. responseWriter->outputBool(true, "dfs:Eog"); // field name cannot clash with an ecl field name
  869. }
  870. }
  871. if (pastFirstRow)
  872. responseWriter->outputEndNested("Row"); // close last row
  873. responseWriter->outputBeginNested("Row", false);
  874. outMeta->toXML((const byte *)row, *responseWriter);
  875. resultBuffer.clear();
  876. pastFirstRow = true;
  877. first = false;
  878. }
  879. while (responseWriter->length() < replyLimit);
  880. if (pastFirstRow)
  881. responseWriter->outputEndNested("Row"); // close last row
  882. }
  883. else
  884. {
  885. do
  886. {
  887. size32_t rowSz;
  888. const void *row = readActivity->nextRow(outBuilder, rowSz);
  889. if (!row)
  890. {
  891. eoi = true;
  892. break;
  893. }
  894. responseWriter->outputBeginNested("Row", false);
  895. outMeta->toXML((const byte *)row, *responseWriter);
  896. responseWriter->outputEndNested("Row");
  897. resultBuffer.clear();
  898. }
  899. while (responseWriter->length() < replyLimit);
  900. }
  901. responseWriter->outputEndArray("Row");
  902. if (!eoi)
  903. {
  904. MemoryBuffer cursorMb;
  905. cursorMb.setEndian(__BIG_ENDIAN);
  906. readActivity->serializeCursor(cursorMb);
  907. StringBuffer cursorBinStr;
  908. JBASE64_Encode(cursorMb.toByteArray(), cursorMb.length(), cursorBinStr, true);
  909. responseWriter->outputString(cursorBinStr.length(), cursorBinStr.str(), "cursorBin");
  910. }
  911. }
  912. }
  913. void processWrite(IPropertyTree *requestTree, MemoryBuffer &rowDataMb, MemoryBuffer &responseMb)
  914. {
  915. IRemoteWriteActivity *writeActivity = activity->queryIsWriteActivity();
  916. assertex(writeActivity);
  917. /* row data is in serialized disk format already, and do not need to look at individual rows
  918. * so simply dump to disk
  919. */
  920. size32_t rowDataSz;
  921. rowDataMb.read(rowDataSz);
  922. const void *rowData;
  923. if (expander)
  924. {
  925. rowDataSz = expander->init(rowDataMb.readDirect(rowDataSz));
  926. expandMb.clear().reserve(rowDataSz);
  927. expander->expand(expandMb.bufferBase());
  928. rowData = expandMb.bufferBase();
  929. }
  930. else
  931. rowData = rowDataMb.readDirect(rowDataSz);
  932. writeActivity->write(rowDataSz, rowData);
  933. }
  934. public:
  935. CRemoteRequest(int _cursorHandle, OutputFormat _format, ICompressor *_compressor, IExpander *_expander, IRemoteActivity *_activity)
  936. : cursorHandle(_cursorHandle), format(_format), compressor(_compressor), expander(_expander), activity(_activity)
  937. {
  938. if (outFmt_Binary != format)
  939. {
  940. responseWriter.setown(createIXmlWriterExt(0, 0, nullptr, outFmt_Xml == format ? WTStandard : WTJSONObject));
  941. responseWriter->outputBeginNested("Response", true);
  942. if (outFmt_Xml == format)
  943. responseWriter->outputCString("urn:hpcc:dfs", "@xmlns:dfs");
  944. responseWriter->outputUInt(cursorHandle, sizeof(cursorHandle), "handle");
  945. }
  946. }
  947. OutputFormat queryFormat() const { return format; }
  948. unsigned __int64 queryReplyLimit() const { return replyLimit; }
  949. IRemoteActivity *queryActivity() const { return activity; }
  950. ICompressor *queryCompressor() const { return compressor; }
  951. void process(IPropertyTree *requestTree, MemoryBuffer &restMb, MemoryBuffer &responseMb, CClientStats &stats)
  952. {
  953. if (requestTree->hasProp("replyLimit"))
  954. replyLimit = requestTree->getPropInt64("replyLimit", defaultDaFSReplyLimitKB) * 1024;
  955. if (outFmt_Binary == format)
  956. responseMb.append(cursorHandle);
  957. else // outFmt_Xml || outFmt_Json
  958. responseWriter->outputUInt(cursorHandle, sizeof(cursorHandle), "handle");
  959. if (requestTree->hasProp("cursorBin")) // use handle if one provided
  960. {
  961. MemoryBuffer cursorMb;
  962. cursorMb.setEndian(__BIG_ENDIAN);
  963. JBASE64_Decode(requestTree->queryProp("cursorBin"), cursorMb);
  964. activity->restoreCursor(cursorMb);
  965. }
  966. if (activity->queryIsReadActivity())
  967. processRead(requestTree, responseMb);
  968. else if (activity->queryIsWriteActivity())
  969. processWrite(requestTree, restMb, responseMb);
  970. activity->flushStatistics(stats);
  971. if (outFmt_Binary != format)
  972. {
  973. responseWriter->outputEndNested("Response");
  974. responseWriter->finalize();
  975. PROGLOG("Response: %s", responseWriter->str());
  976. responseMb.append(responseWriter->length(), responseWriter->str());
  977. }
  978. }
  979. };
  980. enum OpenFileFlag { of_null=0x0, of_key=0x01 };
  981. struct OpenFileInfo
  982. {
  983. OpenFileInfo() { }
  984. OpenFileInfo(int _handle, IFileIO *_fileIO, StringAttrItem *_filename) : handle(_handle), fileIO(_fileIO), filename(_filename) { }
  985. OpenFileInfo(int _handle, CRemoteRequest *_remoteRequest, StringAttrItem *_filename)
  986. : handle(_handle), remoteRequest(_remoteRequest), filename(_filename) { }
  987. Linked<IFileIO> fileIO;
  988. Linked<CRemoteRequest> remoteRequest;
  989. Linked<StringAttrItem> filename; // for debug
  990. int handle = 0;
  991. unsigned flags = 0;
  992. };
  993. static IOutputMetaData *getTypeInfoOutputMetaData(IPropertyTree &actNode, const char *typePropName, bool grouped)
  994. {
  995. IPropertyTree *json = actNode.queryPropTree(typePropName);
  996. if (json)
  997. return createTypeInfoOutputMetaData(*json, grouped);
  998. else
  999. {
  1000. StringBuffer binTypePropName(typePropName);
  1001. const char *jsonBin = actNode.queryProp(binTypePropName.append("Bin"));
  1002. if (!jsonBin)
  1003. return nullptr;
  1004. MemoryBuffer mb;
  1005. JBASE64_Decode(jsonBin, mb);
  1006. return createTypeInfoOutputMetaData(mb, grouped);
  1007. }
  1008. }
  1009. class CRemoteDiskBaseActivity : public CSimpleInterfaceOf<IRemoteReadActivity>, implements IVirtualFieldCallback
  1010. {
  1011. typedef CSimpleInterfaceOf<IRemoteReadActivity> PARENT;
  1012. protected:
  1013. StringAttr fileName; // physical filename
  1014. Linked<IOutputMetaData> inMeta, outMeta;
  1015. unsigned __int64 processed = 0;
  1016. bool outputGrouped = false;
  1017. bool opened = false;
  1018. bool eofSeen = false;
  1019. const RtlRecord *record = nullptr;
  1020. RowFilter filters;
  1021. RtlDynRow *filterRow = nullptr;
  1022. // virtual field values
  1023. StringAttr logicalFilename;
  1024. unsigned numInputFields = 0;
  1025. inline bool fieldFilterMatch(const void * buffer)
  1026. {
  1027. if (filterRow)
  1028. {
  1029. filterRow->setRow(buffer, filters.getNumFieldsRequired());
  1030. return filters.matches(*filterRow);
  1031. }
  1032. else
  1033. return true;
  1034. }
  1035. public:
  1036. IMPLEMENT_IINTERFACE_USING(PARENT);
  1037. CRemoteDiskBaseActivity(IPropertyTree &config, IFileDescriptor *fileDesc)
  1038. {
  1039. fileName.set(config.queryProp("fileName"));
  1040. if (isEmptyString(fileName))
  1041. throw createDafsException(DAFSERR_cmdstream_protocol_failure, "CRemoteDiskBaseActivity: fileName missing");
  1042. logicalFilename.set(config.queryProp("virtualFields/logicalFilename"));
  1043. }
  1044. ~CRemoteDiskBaseActivity()
  1045. {
  1046. delete filterRow;
  1047. }
  1048. void setupInputMeta(const IPropertyTree &config, IOutputMetaData *_inMeta)
  1049. {
  1050. inMeta.setown(_inMeta);
  1051. record = &inMeta->queryRecordAccessor(true);
  1052. numInputFields = record->getNumFields();
  1053. if (config.hasProp("keyFilter"))
  1054. {
  1055. filterRow = new RtlDynRow(*record);
  1056. Owned<IPropertyTreeIterator> filterIter = config.getElements("keyFilter");
  1057. ForEach(*filterIter)
  1058. filters.addFilter(*record, filterIter->query().queryProp(nullptr));
  1059. }
  1060. }
  1061. // IRemoteReadActivity impl.
  1062. virtual unsigned __int64 queryProcessed() const override
  1063. {
  1064. return processed;
  1065. }
  1066. virtual IOutputMetaData *queryOutputMeta() const override
  1067. {
  1068. return outMeta;
  1069. }
  1070. virtual bool isGrouped() const override
  1071. {
  1072. return outputGrouped;
  1073. }
  1074. virtual void serializeCursor(MemoryBuffer &tgt) const override
  1075. {
  1076. throwUnexpected();
  1077. }
  1078. virtual void restoreCursor(MemoryBuffer &src) override
  1079. {
  1080. throwUnexpected();
  1081. }
  1082. virtual void flushStatistics(CClientStats &stats) override
  1083. {
  1084. throwUnexpected();
  1085. }
  1086. virtual IRemoteReadActivity *queryIsReadActivity()
  1087. {
  1088. return this;
  1089. }
  1090. virtual bool requiresPostProject() const override
  1091. {
  1092. return false;
  1093. }
  1094. //interface IVirtualFieldCallback
  1095. virtual const char * queryLogicalFilename(const void * row) override
  1096. {
  1097. return logicalFilename.str();
  1098. }
  1099. virtual unsigned __int64 getFilePosition(const void * row) override
  1100. {
  1101. throwUnexpected();
  1102. }
  1103. virtual unsigned __int64 getLocalFilePosition(const void * row) override
  1104. {
  1105. throwUnexpected();
  1106. }
  1107. virtual const byte * lookupBlob(unsigned __int64 id) override
  1108. {
  1109. throwUnexpected();
  1110. }
  1111. };
  1112. class CRemoteStreamReadBaseActivity : public CRemoteDiskBaseActivity, implements IFileSerialStreamCallback
  1113. {
  1114. typedef CRemoteDiskBaseActivity PARENT;
  1115. protected:
  1116. Owned<ISerialStream> inputStream;
  1117. Owned<IFileIO> iFileIO;
  1118. unsigned __int64 chooseN = 0;
  1119. unsigned __int64 startPos = 0;
  1120. bool compressed = false;
  1121. bool cursorDirty = false;
  1122. // virtual field values
  1123. unsigned partNum = 0;
  1124. offset_t baseFpos = 0;
  1125. unsigned __int64 bytesRead = 0;
  1126. virtual bool refreshCursor()
  1127. {
  1128. if (inputStream->tell() != startPos)
  1129. {
  1130. inputStream->reset(startPos);
  1131. return true;
  1132. }
  1133. return false;
  1134. }
  1135. bool checkOpen() // NB: returns true if this call opened file
  1136. {
  1137. if (opened)
  1138. {
  1139. if (!cursorDirty)
  1140. return false;
  1141. refreshCursor();
  1142. eofSeen = false;
  1143. cursorDirty = false;
  1144. return false;
  1145. }
  1146. cursorDirty = false;
  1147. OwnedIFile iFile = createIFile(fileName);
  1148. assertex(iFile);
  1149. iFileIO.setown(createCompressedFileReader(iFile));
  1150. if (iFileIO)
  1151. {
  1152. if (!compressed)
  1153. {
  1154. WARNLOG("meta info did not mark file '%s' as compressed, but detected file as compressed", fileName.get());
  1155. compressed = true;
  1156. }
  1157. }
  1158. else
  1159. {
  1160. iFileIO.setown(iFile->open(IFOread));
  1161. if (!iFileIO)
  1162. throw createDafsExceptionV(DAFSERR_cmdstream_protocol_failure, "Failed to open: '%s'", fileName.get());
  1163. if (compressed)
  1164. {
  1165. WARNLOG("meta info marked file '%s' as compressed, but detected file as uncompressed", fileName.get());
  1166. compressed = false;
  1167. }
  1168. }
  1169. inputStream.setown(createFileSerialStream(iFileIO, startPos, (offset_t)-1, (size32_t)-1, this));
  1170. opened = true;
  1171. eofSeen = false;
  1172. return true;
  1173. }
  1174. void close()
  1175. {
  1176. iFileIO.clear();
  1177. opened = false;
  1178. eofSeen = true;
  1179. }
  1180. // IFileSerialStreamCallback impl.
  1181. virtual void process(offset_t ofs, size32_t sz, const void *buf) override
  1182. {
  1183. bytesRead += sz;
  1184. }
  1185. public:
  1186. CRemoteStreamReadBaseActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc)
  1187. {
  1188. compressed = config.getPropBool("compressed");
  1189. chooseN = config.getPropInt64("chooseN", defaultFileStreamChooseNLimit);
  1190. partNum = config.getPropInt("virtualFields/partNum");
  1191. baseFpos = (offset_t)config.getPropInt64("virtualFields/baseFpos");
  1192. }
  1193. virtual void flushStatistics(CClientStats &stats) override
  1194. {
  1195. // NB: will be called by same thread that is reading.
  1196. stats.addRead(bytesRead);
  1197. bytesRead = 0;
  1198. }
  1199. // IVirtualFieldCallback impl.
  1200. virtual unsigned __int64 getFilePosition(const void * row) override
  1201. {
  1202. return inputStream->tell() + baseFpos;
  1203. }
  1204. virtual unsigned __int64 getLocalFilePosition(const void * row) override
  1205. {
  1206. return makeLocalFposOffset(partNum, inputStream->tell());
  1207. }
  1208. };
  1209. class CRemoteDiskReadActivity : public CRemoteStreamReadBaseActivity
  1210. {
  1211. typedef CRemoteStreamReadBaseActivity PARENT;
  1212. CThorContiguousRowBuffer prefetchBuffer;
  1213. Owned<ISourceRowPrefetcher> prefetcher;
  1214. bool inputGrouped = false;
  1215. bool eogPending = false;
  1216. bool someInGroup = false;
  1217. Owned<const IDynamicTransform> translator;
  1218. virtual bool refreshCursor() override
  1219. {
  1220. if (prefetchBuffer.tell() != startPos)
  1221. {
  1222. inputStream->reset(startPos);
  1223. prefetchBuffer.clearStream();
  1224. prefetchBuffer.setStream(inputStream);
  1225. return true;
  1226. }
  1227. return false;
  1228. }
  1229. bool checkOpen()
  1230. {
  1231. if (!PARENT::checkOpen()) // returns true if it opened file
  1232. return false;
  1233. prefetchBuffer.setStream(inputStream);
  1234. prefetcher.setown(inMeta->createDiskPrefetcher());
  1235. return true;
  1236. }
  1237. public:
  1238. CRemoteDiskReadActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc), prefetchBuffer(nullptr)
  1239. {
  1240. inputGrouped = config.getPropBool("inputGrouped", false);
  1241. setupInputMeta(config, getTypeInfoOutputMetaData(config, "input", inputGrouped));
  1242. outputGrouped = config.getPropBool("outputGrouped", false);
  1243. if (!inputGrouped && outputGrouped)
  1244. outputGrouped = false; // perhaps should fire error
  1245. outMeta.setown(getTypeInfoOutputMetaData(config, "output", outputGrouped));
  1246. if (!outMeta)
  1247. outMeta.set(inMeta);
  1248. translator.setown(createRecordTranslator(outMeta->queryRecordAccessor(true), *record));
  1249. }
  1250. // IRemoteReadActivity impl.
  1251. virtual const void *nextRow(MemoryBufferBuilder &outBuilder, size32_t &retSz) override
  1252. {
  1253. if (eogPending || eofSeen)
  1254. {
  1255. eogPending = false;
  1256. someInGroup = false;
  1257. retSz = 0;
  1258. return nullptr;
  1259. }
  1260. checkOpen();
  1261. while (!eofSeen && (processed < chooseN))
  1262. {
  1263. while (!prefetchBuffer.eos())
  1264. {
  1265. prefetcher->readAhead(prefetchBuffer);
  1266. size32_t inputRowSz = prefetchBuffer.queryRowSize();
  1267. bool eog = false;
  1268. if (inputGrouped)
  1269. {
  1270. prefetchBuffer.skip(sizeof(eog));
  1271. if (outputGrouped)
  1272. {
  1273. byte b = *(prefetchBuffer.queryRow()+inputRowSz);
  1274. memcpy(&eog, prefetchBuffer.queryRow()+inputRowSz, sizeof(eog));
  1275. }
  1276. }
  1277. const byte *next = prefetchBuffer.queryRow();
  1278. size32_t rowSz; // use local var instead of reference param for efficiency
  1279. if (fieldFilterMatch(next))
  1280. rowSz = translator->translate(outBuilder, *this, next);
  1281. else
  1282. rowSz = 0;
  1283. prefetchBuffer.finishedRow();
  1284. const void *ret = outBuilder.getSelf();
  1285. outBuilder.finishRow(rowSz);
  1286. if (rowSz)
  1287. {
  1288. processed++;
  1289. eogPending = eog;
  1290. someInGroup = true;
  1291. retSz = rowSz;
  1292. return ret;
  1293. }
  1294. else if (eog)
  1295. {
  1296. eogPending = false;
  1297. if (someInGroup)
  1298. {
  1299. someInGroup = false;
  1300. return nullptr;
  1301. }
  1302. }
  1303. }
  1304. eofSeen = true;
  1305. }
  1306. close();
  1307. retSz = 0;
  1308. return nullptr;
  1309. }
  1310. virtual void serializeCursor(MemoryBuffer &tgt) const override
  1311. {
  1312. tgt.append(prefetchBuffer.tell());
  1313. tgt.append(processed);
  1314. tgt.append(someInGroup);
  1315. tgt.append(eogPending);
  1316. }
  1317. virtual void restoreCursor(MemoryBuffer &src) override
  1318. {
  1319. cursorDirty = true;
  1320. src.read(startPos);
  1321. src.read(processed);
  1322. src.read(someInGroup);
  1323. src.read(eogPending);
  1324. }
  1325. virtual StringBuffer &getInfoStr(StringBuffer &out) const override
  1326. {
  1327. return out.appendf("diskread[%s]", fileName.get());
  1328. }
  1329. //interface IVirtualFieldCallback
  1330. virtual unsigned __int64 getFilePosition(const void * row) override
  1331. {
  1332. return prefetchBuffer.tell() + baseFpos;
  1333. }
  1334. };
  1335. class CRemoteExternalFormatReadActivity : public CRemoteStreamReadBaseActivity
  1336. {
  1337. typedef CRemoteStreamReadBaseActivity PARENT;
  1338. protected:
  1339. Owned<const IDynamicFieldValueFetcher> fieldFetcher;
  1340. Owned<const IDynamicTransform> translator;
  1341. bool postProject = false;
  1342. public:
  1343. CRemoteExternalFormatReadActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc)
  1344. {
  1345. setupInputMeta(config, getTypeInfoOutputMetaData(config, "input", false));
  1346. outMeta.setown(getTypeInfoOutputMetaData(config, "output", false));
  1347. const RtlRecord *outRecord = record;
  1348. if (filterRow)
  1349. {
  1350. if (outMeta)
  1351. postProject = true;
  1352. outMeta.set(inMeta);
  1353. }
  1354. else
  1355. {
  1356. if (outMeta)
  1357. outRecord = &outMeta->queryRecordAccessor(true);
  1358. else
  1359. outMeta.set(inMeta);
  1360. }
  1361. translator.setown(createRecordTranslatorViaCallback(*outRecord, *record, type_utf8));
  1362. }
  1363. virtual bool requiresPostProject() const override
  1364. {
  1365. return postProject;
  1366. }
  1367. };
  1368. class CNullNestedRowIterator : public CSimpleInterfaceOf<IDynamicRowIterator>
  1369. {
  1370. public:
  1371. virtual bool first() override { return false; }
  1372. virtual bool next() override { return false; }
  1373. virtual bool isValid() override { return false; }
  1374. virtual IDynamicFieldValueFetcher &query() override
  1375. {
  1376. throwUnexpected();
  1377. }
  1378. } nullNestedRowIterator;
  1379. class CRemoteCsvReadActivity : public CRemoteExternalFormatReadActivity
  1380. {
  1381. typedef CRemoteExternalFormatReadActivity PARENT;
  1382. StringBuffer csvQuote, csvSeparate, csvTerminate, csvEscape;
  1383. unsigned __int64 headerLines = 0;
  1384. unsigned __int64 maxRowSize = 0;
  1385. bool preserveWhitespace = false;
  1386. CSVSplitter csvSplitter;
  1387. class CFieldFetcher : public CSimpleInterfaceOf<IDynamicFieldValueFetcher>
  1388. {
  1389. CSVSplitter &csvSplitter;
  1390. unsigned numInputFields;
  1391. public:
  1392. CFieldFetcher(CSVSplitter &_csvSplitter, unsigned _numInputFields) : csvSplitter(_csvSplitter), numInputFields(_numInputFields)
  1393. {
  1394. }
  1395. virtual const byte *queryValue(unsigned fieldNum, size_t &sz) const override
  1396. {
  1397. dbgassertex(fieldNum < numInputFields);
  1398. sz = csvSplitter.queryLengths()[fieldNum];
  1399. return csvSplitter.queryData()[fieldNum];
  1400. }
  1401. virtual IDynamicRowIterator *getNestedIterator(unsigned fieldNum) const override
  1402. {
  1403. return LINK(&nullNestedRowIterator);
  1404. }
  1405. virtual size_t getSize(unsigned fieldNum) const override { throwUnexpected(); }
  1406. virtual size32_t getRecordSize() const override { throwUnexpected(); }
  1407. };
  1408. bool checkOpen()
  1409. {
  1410. if (!PARENT::checkOpen())
  1411. return false;
  1412. csvSplitter.init(numInputFields, maxRowSize, csvQuote, csvSeparate, csvTerminate, csvEscape, preserveWhitespace);
  1413. if (headerLines)
  1414. {
  1415. do
  1416. {
  1417. size32_t lineLength = csvSplitter.splitLine(inputStream, maxRowSize);
  1418. if (0 == lineLength)
  1419. break;
  1420. inputStream->skip(lineLength);
  1421. }
  1422. while (--headerLines);
  1423. }
  1424. if (!fieldFetcher)
  1425. fieldFetcher.setown(new CFieldFetcher(csvSplitter, numInputFields));
  1426. return true;
  1427. }
  1428. const unsigned defaultMaxCsvRowSize = 10; // MB
  1429. public:
  1430. CRemoteCsvReadActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc)
  1431. {
  1432. maxRowSize = config.getPropInt64("ActivityOptions/maxRowSize", defaultMaxCsvRowSize) * 1024 * 1024;
  1433. preserveWhitespace = config.getPropBool("ActivityOptions/preserveWhitespace");
  1434. if (!config.getProp("ActivityOptions/csvQuote", csvQuote))
  1435. {
  1436. if (!fileDesc->queryProperties().getProp("@csvQuote", csvQuote))
  1437. csvQuote.append("\"");
  1438. }
  1439. if (!config.getProp("ActivityOptions/csvSeparate", csvSeparate))
  1440. {
  1441. if (!fileDesc->queryProperties().getProp("@csvSeparate", csvSeparate))
  1442. csvSeparate.append("\\,");
  1443. }
  1444. if (!config.getProp("ActivityOptions/csvTerminate", csvTerminate))
  1445. {
  1446. if (!fileDesc->queryProperties().getProp("@csvTerminate", csvTerminate))
  1447. csvTerminate.append("\\n,\\r\\n");
  1448. }
  1449. if (!config.getProp("ActivityOptions/csvEscape", csvEscape))
  1450. fileDesc->queryProperties().getProp("@csvEscape", csvEscape);
  1451. headerLines = config.getPropInt64("ActivityOptions/headerLines"); // really this should be a published attribute too
  1452. }
  1453. virtual StringBuffer &getInfoStr(StringBuffer &out) const override
  1454. {
  1455. return out.appendf("csvread[%s]", fileName.get());
  1456. }
  1457. // IRemoteReadActivity impl.
  1458. virtual const void *nextRow(MemoryBufferBuilder &outBuilder, size32_t &retSz) override
  1459. {
  1460. if (eofSeen)
  1461. {
  1462. retSz = 0;
  1463. return nullptr;
  1464. }
  1465. checkOpen();
  1466. while (!eofSeen && (processed < chooseN))
  1467. {
  1468. size32_t lineLength = csvSplitter.splitLine(inputStream, maxRowSize);
  1469. if (!lineLength)
  1470. break;
  1471. retSz = translator->translate(outBuilder, *this, *fieldFetcher);
  1472. dbgassertex(retSz);
  1473. const void *ret = outBuilder.getSelf();
  1474. if (fieldFilterMatch(ret))
  1475. {
  1476. outBuilder.finishRow(retSz);
  1477. ++processed;
  1478. inputStream->skip(lineLength);
  1479. return ret;
  1480. }
  1481. else
  1482. outBuilder.removeBytes(retSz);
  1483. inputStream->skip(lineLength);
  1484. }
  1485. eofSeen = true;
  1486. close();
  1487. retSz = 0;
  1488. return nullptr;
  1489. }
  1490. };
  1491. class CRemoteMarkupReadActivity : public CRemoteExternalFormatReadActivity, implements IXMLSelect
  1492. {
  1493. typedef CRemoteExternalFormatReadActivity PARENT;
  1494. ThorActivityKind kind;
  1495. IXmlToRowTransformer *xmlTransformer;
  1496. Linked<IColumnProvider> lastMatch;
  1497. Owned<IXMLParse> xmlParser;
  1498. bool noRoot = false;
  1499. bool useXmlContents = false;
  1500. // JCSMORE - it would be good if these were cached/reused (can I assume anything using fetcher is single threaded?)
  1501. class CFieldFetcher : public CSimpleInterfaceOf<IDynamicFieldValueFetcher>
  1502. {
  1503. unsigned numInputFields;
  1504. const RtlRecord &recInfo;
  1505. Linked<IColumnProvider> currentMatch;
  1506. const char **compoundXPaths = nullptr;
  1507. const char *queryCompoundXPath(unsigned fieldNum) const
  1508. {
  1509. if (compoundXPaths && compoundXPaths[fieldNum])
  1510. return compoundXPaths[fieldNum];
  1511. else
  1512. return recInfo.queryXPath(fieldNum);
  1513. }
  1514. public:
  1515. CFieldFetcher(const RtlRecord &_recInfo, IColumnProvider *_currentMatch) : recInfo(_recInfo), currentMatch(_currentMatch)
  1516. {
  1517. numInputFields = recInfo.getNumFields();
  1518. // JCSMORE - should this be done (optionally) when RtlRecord is created?
  1519. for (unsigned fieldNum=0; fieldNum<numInputFields; fieldNum++)
  1520. {
  1521. if (recInfo.queryType(fieldNum)->queryChildType())
  1522. {
  1523. const char *xpath = recInfo.queryXPath(fieldNum);
  1524. dbgassertex(xpath);
  1525. const char *ptr = xpath;
  1526. char *expandedXPath = nullptr;
  1527. char *expandedXPathPtr = nullptr;
  1528. while (true)
  1529. {
  1530. if (*ptr == xpathCompoundSeparatorChar)
  1531. {
  1532. if (!compoundXPaths)
  1533. {
  1534. compoundXPaths = new const char *[numInputFields];
  1535. memset(compoundXPaths, 0, sizeof(const char *)*numInputFields);
  1536. }
  1537. size_t sz = strlen(xpath)+1;
  1538. expandedXPath = new char[sz];
  1539. expandedXPathPtr = expandedXPath;
  1540. if (ptr == xpath) // if leading char, just skip
  1541. ++ptr;
  1542. else
  1543. {
  1544. size32_t len = ptr-xpath;
  1545. memcpy(expandedXPath, xpath, len);
  1546. expandedXPathPtr = expandedXPath + len;
  1547. *expandedXPathPtr++ = '/';
  1548. ++ptr;
  1549. }
  1550. while (*ptr)
  1551. {
  1552. if (*ptr == xpathCompoundSeparatorChar)
  1553. {
  1554. *expandedXPathPtr++ = '/';
  1555. ++ptr;
  1556. }
  1557. else
  1558. *expandedXPathPtr++ = *ptr++;
  1559. }
  1560. }
  1561. else
  1562. ptr++;
  1563. if ('\0' == *ptr)
  1564. {
  1565. if (expandedXPath)
  1566. {
  1567. *expandedXPathPtr = '\0';
  1568. compoundXPaths[fieldNum] = expandedXPath;
  1569. }
  1570. break;
  1571. }
  1572. }
  1573. }
  1574. }
  1575. }
  1576. ~CFieldFetcher()
  1577. {
  1578. if (compoundXPaths)
  1579. {
  1580. for (unsigned fieldNum=0; fieldNum<numInputFields; fieldNum++)
  1581. delete [] compoundXPaths[fieldNum];
  1582. delete [] compoundXPaths;
  1583. }
  1584. }
  1585. void setCurrentMatch(IColumnProvider *_currentMatch)
  1586. {
  1587. currentMatch.set(_currentMatch);
  1588. }
  1589. // IDynamicFieldValueFetcher impl.
  1590. virtual const byte *queryValue(unsigned fieldNum, size_t &sz) const override
  1591. {
  1592. dbgassertex(fieldNum < numInputFields);
  1593. dbgassertex(currentMatch);
  1594. size32_t rawSz;
  1595. const char *ret = currentMatch->readRaw(recInfo.queryXPath(fieldNum), rawSz);
  1596. sz = rawSz;
  1597. return (const byte *)ret;
  1598. }
  1599. virtual IDynamicRowIterator *getNestedIterator(unsigned fieldNum) const override
  1600. {
  1601. dbgassertex(fieldNum < numInputFields);
  1602. dbgassertex(currentMatch);
  1603. const RtlRecord *nested = recInfo.queryNested(fieldNum);
  1604. if (!nested)
  1605. return nullptr;
  1606. class CIterator : public CSimpleInterfaceOf<IDynamicRowIterator>
  1607. {
  1608. XmlChildIterator xmlIter;
  1609. Linked<IDynamicFieldValueFetcher> curFieldValueFetcher;
  1610. Linked<IColumnProvider> parentMatch;
  1611. const RtlRecord &nestedRecInfo;
  1612. public:
  1613. CIterator(const RtlRecord &_nestedRecInfo, IColumnProvider *_parentMatch, const char *xpath) : nestedRecInfo(_nestedRecInfo), parentMatch(_parentMatch)
  1614. {
  1615. xmlIter.initOwn(parentMatch->getChildIterator(xpath));
  1616. }
  1617. virtual bool first() override
  1618. {
  1619. IColumnProvider *child = xmlIter.first();
  1620. if (!child)
  1621. {
  1622. curFieldValueFetcher.clear();
  1623. return false;
  1624. }
  1625. curFieldValueFetcher.setown(new CFieldFetcher(nestedRecInfo, child));
  1626. return true;
  1627. }
  1628. virtual bool next() override
  1629. {
  1630. IColumnProvider *child = xmlIter.next();
  1631. if (!child)
  1632. {
  1633. curFieldValueFetcher.clear();
  1634. return false;
  1635. }
  1636. curFieldValueFetcher.setown(new CFieldFetcher(nestedRecInfo, child));
  1637. return true;
  1638. }
  1639. virtual bool isValid() override
  1640. {
  1641. return nullptr != curFieldValueFetcher.get();
  1642. }
  1643. virtual IDynamicFieldValueFetcher &query() override
  1644. {
  1645. assertex(curFieldValueFetcher);
  1646. return *curFieldValueFetcher;
  1647. }
  1648. };
  1649. // JCSMORE - it would be good if these were cached/reused (can I assume anything using parent fetcher is single threaded?)
  1650. return new CIterator(*nested, currentMatch, queryCompoundXPath(fieldNum));
  1651. }
  1652. virtual size_t getSize(unsigned fieldNum) const override { throwUnexpected(); }
  1653. virtual size32_t getRecordSize() const override { throwUnexpected(); }
  1654. };
  1655. bool checkOpen()
  1656. {
  1657. if (!PARENT::checkOpen())
  1658. return false;
  1659. class CSimpleStream : public CSimpleInterfaceOf<ISimpleReadStream>
  1660. {
  1661. Linked<ISerialStream> stream;
  1662. public:
  1663. CSimpleStream(ISerialStream *_stream) : stream(_stream)
  1664. {
  1665. }
  1666. // ISimpleReadStream impl.
  1667. virtual size32_t read(size32_t max_len, void * data) override
  1668. {
  1669. size32_t got;
  1670. const void *res = stream->peek(max_len, got);
  1671. if (got)
  1672. {
  1673. if (got>max_len)
  1674. got = max_len;
  1675. memcpy(data, res, got);
  1676. stream->skip(got);
  1677. }
  1678. return got;
  1679. }
  1680. };
  1681. Owned<ISimpleReadStream> simpleStream = new CSimpleStream(inputStream);
  1682. if (kind==TAKjsonread)
  1683. xmlParser.setown(createJSONParse(*simpleStream, xpath, *this, noRoot?ptr_noRoot:ptr_none, useXmlContents));
  1684. else
  1685. xmlParser.setown(createXMLParse(*simpleStream, xpath, *this, noRoot?ptr_noRoot:ptr_none, useXmlContents));
  1686. if (!fieldFetcher)
  1687. fieldFetcher.setown(new CFieldFetcher(*record, nullptr));
  1688. return true;
  1689. }
  1690. protected:
  1691. StringBuffer xpath;
  1692. StringBuffer customRowTag;
  1693. public:
  1694. IMPLEMENT_IINTERFACE_USING(PARENT);
  1695. CRemoteMarkupReadActivity(IPropertyTree &config, IFileDescriptor *fileDesc, ThorActivityKind _kind) : PARENT(config, fileDesc), kind(_kind)
  1696. {
  1697. config.getProp("ActivityOptions/rowTag", customRowTag);
  1698. noRoot = config.getPropBool("noRoot");
  1699. }
  1700. IColumnProvider *queryMatch() const { return lastMatch; }
  1701. virtual StringBuffer &getInfoStr(StringBuffer &out) const override
  1702. {
  1703. return out.appendf("%s[%s]", getActivityText(kind), fileName.get());
  1704. }
  1705. // IRemoteReadActivity impl.
  1706. virtual const void *nextRow(MemoryBufferBuilder &outBuilder, size32_t &retSz) override
  1707. {
  1708. if (eofSeen)
  1709. {
  1710. retSz = 0;
  1711. return nullptr;
  1712. }
  1713. checkOpen();
  1714. while (xmlParser->next())
  1715. {
  1716. if (lastMatch)
  1717. {
  1718. ((CFieldFetcher *)fieldFetcher.get())->setCurrentMatch(lastMatch);
  1719. retSz = translator->translate(outBuilder, *this, *fieldFetcher);
  1720. dbgassertex(retSz);
  1721. lastMatch.clear();
  1722. const void *ret = outBuilder.getSelf();
  1723. if (fieldFilterMatch(ret))
  1724. {
  1725. outBuilder.finishRow(retSz);
  1726. ++processed;
  1727. return ret;
  1728. }
  1729. else
  1730. outBuilder.removeBytes(retSz);
  1731. }
  1732. }
  1733. eofSeen = true;
  1734. close();
  1735. retSz = 0;
  1736. return nullptr;
  1737. }
  1738. // IXMLSelect impl.
  1739. virtual void match(IColumnProvider &entry, offset_t startOffset, offset_t endOffset)
  1740. {
  1741. lastMatch.set(&entry);
  1742. }
  1743. };
  1744. class CRemoteXmlReadActivity : public CRemoteMarkupReadActivity
  1745. {
  1746. typedef CRemoteMarkupReadActivity PARENT;
  1747. public:
  1748. CRemoteXmlReadActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc, TAKxmlread)
  1749. {
  1750. xpath.set("/Dataset/");
  1751. if (customRowTag.isEmpty()) // no override
  1752. fileDesc->queryProperties().getProp("@rowTag", xpath);
  1753. else
  1754. xpath.append(customRowTag);
  1755. }
  1756. };
  1757. class CRemoteJsonReadActivity : public CRemoteMarkupReadActivity
  1758. {
  1759. typedef CRemoteMarkupReadActivity PARENT;
  1760. public:
  1761. CRemoteJsonReadActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc, TAKjsonread)
  1762. {
  1763. xpath.set("/");
  1764. if (customRowTag.isEmpty()) // no override
  1765. fileDesc->queryProperties().getProp("@rowTag", xpath);
  1766. else
  1767. xpath.append(customRowTag);
  1768. }
  1769. };
  1770. /* A IRemoteReadActivity that projects to output format
  1771. * Created if input activity requires filtering, but it must 1st translate from external format to the actual format
  1772. * NB: processor, grouped and cursor are same as input.
  1773. */
  1774. class CRemoteCompoundReadProjectActivity : public CSimpleInterfaceOf<IRemoteReadActivity>
  1775. {
  1776. Linked<IRemoteReadActivity> input;
  1777. Owned<IOutputMetaData> outMeta;
  1778. Owned<const IDynamicTransform> translator;
  1779. UnexpectedVirtualFieldCallback fieldCallback;
  1780. MemoryBuffer inputRowMb;
  1781. MemoryBufferBuilder *inputRowBuilder;
  1782. public:
  1783. CRemoteCompoundReadProjectActivity(IPropertyTree &config, IRemoteReadActivity *_input) : input(_input)
  1784. {
  1785. IOutputMetaData *inMeta = input->queryOutputMeta();
  1786. outMeta.setown(getTypeInfoOutputMetaData(config, "output", false));
  1787. dbgassertex(outMeta);
  1788. const RtlRecord &inRecord = inMeta->queryRecordAccessor(true);
  1789. const RtlRecord &outRecord = outMeta->queryRecordAccessor(true);
  1790. translator.setown(createRecordTranslator(outRecord, inRecord));
  1791. inputRowBuilder = new MemoryBufferBuilder(inputRowMb, inMeta->getMinRecordSize());
  1792. }
  1793. ~CRemoteCompoundReadProjectActivity()
  1794. {
  1795. delete inputRowBuilder;
  1796. }
  1797. virtual StringBuffer &getInfoStr(StringBuffer &out) const override
  1798. {
  1799. return input->getInfoStr(out).append(" - CompoundProject");
  1800. }
  1801. // IRemoteReadActivity impl.
  1802. virtual unsigned __int64 queryProcessed() const override
  1803. {
  1804. return input->queryProcessed();
  1805. }
  1806. virtual IOutputMetaData *queryOutputMeta() const override
  1807. {
  1808. return outMeta;
  1809. }
  1810. virtual bool isGrouped() const override
  1811. {
  1812. return input->isGrouped();
  1813. }
  1814. virtual void serializeCursor(MemoryBuffer &tgt) const override
  1815. {
  1816. input->serializeCursor(tgt);
  1817. }
  1818. virtual void restoreCursor(MemoryBuffer &src) override
  1819. {
  1820. input->restoreCursor(src);
  1821. }
  1822. virtual void flushStatistics(CClientStats &stats) override
  1823. {
  1824. input->flushStatistics(stats);
  1825. }
  1826. virtual IRemoteReadActivity *queryIsReadActivity()
  1827. {
  1828. return this;
  1829. }
  1830. virtual const void *nextRow(MemoryBufferBuilder &outBuilder, size32_t &retSz) override
  1831. {
  1832. size32_t rowSz;
  1833. const void *row = input->nextRow(*inputRowBuilder, rowSz);
  1834. if (!row)
  1835. {
  1836. retSz = 0;
  1837. return nullptr;
  1838. }
  1839. retSz = translator->translate(outBuilder, fieldCallback, (const byte *)row);
  1840. const void *ret = outBuilder.getSelf();
  1841. outBuilder.finishRow(retSz);
  1842. return ret;
  1843. }
  1844. virtual bool requiresPostProject() const override
  1845. {
  1846. return false;
  1847. }
  1848. };
  1849. class CRemoteIndexBaseActivity : public CRemoteDiskBaseActivity
  1850. {
  1851. typedef CRemoteDiskBaseActivity PARENT;
  1852. protected:
  1853. bool isTlk = false;
  1854. bool allowPreload = false;
  1855. unsigned fileCrc = 0;
  1856. Owned<IKeyIndex> keyIndex;
  1857. Owned<IKeyManager> keyManager;
  1858. void checkOpen()
  1859. {
  1860. if (opened)
  1861. return;
  1862. Owned<IFile> indexFile = createIFile(fileName);
  1863. CDateTime modTime;
  1864. indexFile->getTime(nullptr, &modTime, nullptr);
  1865. time_t modTimeTT = modTime.getSimple();
  1866. CRC32 crc32(fileCrc);
  1867. crc32.tally(sizeof(time_t), &modTimeTT);
  1868. unsigned crc = crc32.get();
  1869. keyIndex.setown(createKeyIndex(fileName, crc, isTlk, allowPreload));
  1870. keyManager.setown(createLocalKeyManager(*record, keyIndex, nullptr, true, false));
  1871. filters.createSegmentMonitors(keyManager);
  1872. keyManager->finishSegmentMonitors();
  1873. keyManager->reset();
  1874. opened = true;
  1875. }
  1876. void close()
  1877. {
  1878. keyManager.clear();
  1879. keyIndex.clear();
  1880. opened = false;
  1881. eofSeen = true;
  1882. }
  1883. public:
  1884. CRemoteIndexBaseActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc)
  1885. {
  1886. setupInputMeta(config, getTypeInfoOutputMetaData(config, "input", false));
  1887. isTlk = config.getPropBool("isTlk");
  1888. allowPreload = config.getPropBool("allowPreload");
  1889. fileCrc = config.getPropInt("crc");
  1890. }
  1891. virtual void flushStatistics(CClientStats &stats) override
  1892. {
  1893. // TBD, IKeyCursor should probably have a getStatistic(StatisticKind kind) implementation
  1894. }
  1895. };
  1896. class CRemoteIndexReadActivity : public CRemoteIndexBaseActivity
  1897. {
  1898. typedef CRemoteIndexBaseActivity PARENT;
  1899. Owned<const IDynamicTransform> translator;
  1900. unsigned __int64 chooseN = 0;
  1901. public:
  1902. CRemoteIndexReadActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc)
  1903. {
  1904. chooseN = config.getPropInt64("chooseN", defaultFileStreamChooseNLimit);
  1905. outMeta.setown(getTypeInfoOutputMetaData(config, "output", false));
  1906. if (outMeta)
  1907. translator.setown(createRecordTranslator(outMeta->queryRecordAccessor(true), *record));
  1908. else
  1909. outMeta.set(inMeta);
  1910. }
  1911. // IRemoteReadActivity impl.
  1912. virtual const void *nextRow(MemoryBufferBuilder &outBuilder, size32_t &retSz) override
  1913. {
  1914. if (eofSeen)
  1915. {
  1916. retSz = 0;
  1917. return nullptr;
  1918. }
  1919. checkOpen();
  1920. if (!eofSeen)
  1921. {
  1922. if (processed < chooseN)
  1923. {
  1924. while (keyManager->lookup(true))
  1925. {
  1926. const byte *keyRow = keyManager->queryKeyBuffer();
  1927. if (fieldFilterMatch(keyRow))
  1928. {
  1929. if (translator)
  1930. retSz = translator->translate(outBuilder, *this, keyRow);
  1931. else
  1932. {
  1933. retSz = keyManager->queryRowSize();
  1934. outBuilder.ensureCapacity(retSz, nullptr);
  1935. memcpy(outBuilder.getSelf(), keyRow, retSz);
  1936. }
  1937. dbgassertex(retSz);
  1938. const void *ret = outBuilder.getSelf();
  1939. outBuilder.finishRow(retSz);
  1940. ++processed;
  1941. return ret;
  1942. }
  1943. }
  1944. retSz = 0;
  1945. }
  1946. eofSeen = true;
  1947. }
  1948. close();
  1949. return nullptr;
  1950. }
  1951. virtual void serializeCursor(MemoryBuffer &tgt) const override
  1952. {
  1953. keyManager->serializeCursorPos(tgt);
  1954. tgt.append(processed);
  1955. /* JCSMORE (see HPCC-19640), serialize seek/scan data to client
  1956. tgt.append(keyManager->querySeeks());
  1957. tgt.append(keyManager->queryScans());
  1958. */
  1959. }
  1960. virtual void restoreCursor(MemoryBuffer &src) override
  1961. {
  1962. checkOpen();
  1963. eofSeen = false;
  1964. keyManager->deserializeCursorPos(src);
  1965. src.read(processed);
  1966. }
  1967. virtual StringBuffer &getInfoStr(StringBuffer &out) const override
  1968. {
  1969. return out.appendf("indexread[%s]", fileName.get());
  1970. }
  1971. };
  1972. class CRemoteWriteBaseActivity : public CSimpleInterfaceOf<IRemoteWriteActivity>
  1973. {
  1974. protected:
  1975. StringAttr fileName; // physical filename
  1976. Linked<IOutputMetaData> meta;
  1977. unsigned __int64 processed = 0;
  1978. unsigned __int64 bytesWritten = 0;
  1979. bool opened = false;
  1980. bool eofSeen = false;
  1981. Owned<IFileIO> iFileIO;
  1982. bool grouped = false;
  1983. void close()
  1984. {
  1985. iFileIO.clear();
  1986. opened = false;
  1987. eofSeen = true;
  1988. }
  1989. public:
  1990. CRemoteWriteBaseActivity(IPropertyTree &config, IFileDescriptor *fileDesc)
  1991. {
  1992. fileName.set(config.queryProp("fileName"));
  1993. if (isEmptyString(fileName))
  1994. throw createDafsException(DAFSERR_cmdstream_protocol_failure, "CRemoteWriteBaseActivity: fileName missing");
  1995. grouped = config.getPropBool("inputGrouped");
  1996. meta.setown(getTypeInfoOutputMetaData(config, "input", grouped));
  1997. }
  1998. ~CRemoteWriteBaseActivity()
  1999. {
  2000. }
  2001. // IRemoteWriteActivity impl.
  2002. virtual unsigned __int64 queryProcessed() const override
  2003. {
  2004. return processed;
  2005. }
  2006. virtual IOutputMetaData *queryOutputMeta() const override
  2007. {
  2008. return meta;
  2009. }
  2010. virtual bool isGrouped() const override
  2011. {
  2012. return grouped;
  2013. }
  2014. virtual void serializeCursor(MemoryBuffer &tgt) const override
  2015. {
  2016. throwUnexpected();
  2017. }
  2018. virtual void restoreCursor(MemoryBuffer &src) override
  2019. {
  2020. throwUnexpected();
  2021. }
  2022. virtual StringBuffer &getInfoStr(StringBuffer &out) const override
  2023. {
  2024. return out.appendf("diskwrite[%s]", fileName.get());
  2025. }
  2026. virtual void write(size32_t sz, const void *rowData) override
  2027. {
  2028. throwUnexpected(); // method should be implemented in derived classes.
  2029. }
  2030. virtual void flushStatistics(CClientStats &stats) override
  2031. {
  2032. // NB: will be called by same thread that is writing.
  2033. stats.addWrite(bytesWritten);
  2034. bytesWritten = 0;
  2035. }
  2036. virtual IRemoteWriteActivity *queryIsWriteActivity()
  2037. {
  2038. return this;
  2039. }
  2040. };
  2041. class CRemoteDiskWriteActivity : public CRemoteWriteBaseActivity
  2042. {
  2043. typedef CRemoteWriteBaseActivity PARENT;
  2044. unsigned compressionFormat = 0;
  2045. bool eogPending = false;
  2046. bool someInGroup = false;
  2047. size32_t recordSize = 0;
  2048. Owned<IFileIOStream> iFileIOStream;
  2049. bool append = false;
  2050. void checkOpen()
  2051. {
  2052. if (opened)
  2053. return;
  2054. if (!recursiveCreateDirectoryForFile(fileName))
  2055. throw createDafsExceptionV(DAFSERR_cmdstream_openfailure, "Failed to create dirtory for file: '%s'", fileName.get());
  2056. OwnedIFile iFile = createIFile(fileName);
  2057. assertex(iFile);
  2058. /* NB: if concurrent writers were supported, then would need mutex here, during open/create
  2059. * multiple activities, each with there own handle would be possible, with mutex during write.
  2060. * Would need mutex per physical filename active.
  2061. */
  2062. if (compressionFormat)
  2063. iFileIO.setown(createCompressedFileWriter(iFile, recordSize, append, true, nullptr, compressionFormat));
  2064. else
  2065. {
  2066. iFileIO.setown(iFile->open(append ? IFOwrite : IFOcreate));
  2067. if (!iFileIO)
  2068. throw createDafsExceptionV(DAFSERR_cmdstream_openfailure, "Failed to open: '%s' for write", fileName.get());
  2069. }
  2070. iFileIOStream.setown(createIOStream(iFileIO));
  2071. if (append)
  2072. iFileIOStream->seek(0, IFSend);
  2073. opened = true;
  2074. eofSeen = false;
  2075. }
  2076. public:
  2077. CRemoteDiskWriteActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc)
  2078. {
  2079. const char *compressed = config.queryProp("compressed"); // the compression format for the serialized rows in the transport
  2080. if (!isEmptyString(compressed))
  2081. {
  2082. // boolean or format allowed
  2083. if (strieq("true", compressed))
  2084. compressionFormat = translateToCompMethod(nullptr); // gets default
  2085. else if (strieq("false", compressed))
  2086. compressionFormat = 0;
  2087. else
  2088. compressionFormat = translateToCompMethod(compressed);
  2089. }
  2090. append = config.getPropBool("append");
  2091. }
  2092. virtual void write(size32_t sz, const void *rowData) override
  2093. {
  2094. checkOpen();
  2095. iFileIOStream->write(sz, rowData);
  2096. bytesWritten += sz;
  2097. }
  2098. virtual void serializeCursor(MemoryBuffer &tgt) const override
  2099. {
  2100. tgt.append(iFileIOStream->tell());
  2101. }
  2102. virtual void restoreCursor(MemoryBuffer &src) override
  2103. {
  2104. offset_t pos;
  2105. src.read(pos);
  2106. checkOpen();
  2107. iFileIOStream->seek(pos, IFSbegin);
  2108. }
  2109. };
  2110. // create a { unsigned8 } output meta for the count
  2111. static const RtlIntTypeInfo indexCountFieldType(type_unsigned|type_int, 8);
  2112. static const RtlFieldStrInfo indexCountField("count", nullptr, &indexCountFieldType);
  2113. static const RtlFieldInfo * const indexCountFields[2] = { &indexCountField, nullptr };
  2114. static const RtlRecordTypeInfo indexCountRecord(type_record, 2, indexCountFields);
  2115. class CRemoteIndexCountActivity : public CRemoteIndexBaseActivity
  2116. {
  2117. typedef CRemoteIndexBaseActivity PARENT;
  2118. unsigned __int64 rowLimit = 0;
  2119. public:
  2120. CRemoteIndexCountActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc)
  2121. {
  2122. rowLimit = config.getPropInt64("chooseN");
  2123. outMeta.setown(new CDynamicOutputMetaData(indexCountRecord));
  2124. }
  2125. // IRemoteReadActivity impl.
  2126. virtual const void *nextRow(MemoryBufferBuilder &outBuilder, size32_t &retSz) override
  2127. {
  2128. if (eofSeen)
  2129. {
  2130. retSz = 0;
  2131. return nullptr;
  2132. }
  2133. checkOpen();
  2134. unsigned __int64 count = 0;
  2135. if (!eofSeen)
  2136. {
  2137. if (rowLimit)
  2138. count = keyManager->checkCount(rowLimit);
  2139. else
  2140. count = keyManager->getCount();
  2141. }
  2142. void *tgt = outBuilder.ensureCapacity(sizeof(count), "count");
  2143. const void *ret = outBuilder.getSelf();
  2144. memcpy(tgt, &count, sizeof(count));
  2145. outBuilder.finishRow(sizeof(count));
  2146. close();
  2147. return ret;
  2148. }
  2149. virtual StringBuffer &getInfoStr(StringBuffer &out) const override
  2150. {
  2151. return out.appendf("indexcount[%s]", fileName.get());
  2152. }
  2153. };
  2154. void checkExpiryTime(IPropertyTree &metaInfo)
  2155. {
  2156. const char *expiryTime = metaInfo.queryProp("expiryTime");
  2157. if (isEmptyString(expiryTime))
  2158. throw createDafsException(DAFSERR_cmdstream_invalidexpiry, "createRemoteActivity: invalid expiry specification");
  2159. CDateTime expiryTimeDt;
  2160. try
  2161. {
  2162. expiryTimeDt.setString(expiryTime);
  2163. }
  2164. catch (IException *e)
  2165. {
  2166. e->Release();
  2167. throw createDafsException(DAFSERR_cmdstream_invalidexpiry, "createRemoteActivity: invalid expiry specification");
  2168. }
  2169. CDateTime nowDt;
  2170. nowDt.setNow();
  2171. if (nowDt >= expiryTimeDt)
  2172. throw createDafsException(DAFSERR_cmdstream_authexpired, "createRemoteActivity: authorization expired");
  2173. }
  2174. IFileDescriptor *verifyMetaInfo(IPropertyTree &actNode, bool authorizedOnly, const IPropertyTree *keyPairInfo)
  2175. {
  2176. if (!authorizedOnly) // if configured false, allows unencrypted meta info
  2177. {
  2178. if (actNode.hasProp("fileName"))
  2179. return nullptr;
  2180. }
  2181. StringBuffer metaInfoB64;
  2182. actNode.getProp("metaInfo", metaInfoB64);
  2183. if (0 == metaInfoB64.length())
  2184. throw createDafsException(DAFSERR_cmdstream_protocol_failure, "createRemoteActivity: missing metaInfo");
  2185. MemoryBuffer compressedMetaInfoMb;
  2186. JBASE64_Decode(metaInfoB64.str(), compressedMetaInfoMb);
  2187. MemoryBuffer decompressedMetaInfoMb;
  2188. fastLZDecompressToBuffer(decompressedMetaInfoMb, compressedMetaInfoMb);
  2189. Owned<IPropertyTree> metaInfoEnvelope = createPTree(decompressedMetaInfoMb);
  2190. Owned<IPropertyTree> metaInfo;
  2191. #if defined(_USE_OPENSSL)
  2192. MemoryBuffer metaInfoBlob;
  2193. metaInfoEnvelope->getPropBin("metaInfoBlob", metaInfoBlob);
  2194. bool isSigned = metaInfoBlob.length() != 0;
  2195. if (authorizedOnly && !isSigned)
  2196. throw createDafsException(DAFSERR_cmdstream_unauthorized, "createRemoteActivity: unathorized");
  2197. if (isSigned)
  2198. {
  2199. metaInfo.setown(createPTree(metaInfoBlob));
  2200. const char *keyPairName = metaInfo->queryProp("keyPairName");
  2201. StringBuffer metaInfoSignature;
  2202. if (!metaInfoEnvelope->getProp("signature", metaInfoSignature))
  2203. throw createDafsException(DAFSERR_cmdstream_unauthorized, "createRemoteActivity: missing signature");
  2204. VStringBuffer keyPairPath("KeyPair[@name=\"%s\"]", keyPairName);
  2205. IPropertyTree *keyPair = keyPairInfo->queryPropTree(keyPairPath);
  2206. if (!keyPair)
  2207. throw createDafsException(DAFSERR_cmdstream_unauthorized, "createRemoteActivity: missing key pair definition");
  2208. const char *publicKeyFName = keyPair->queryProp("@publicKey");
  2209. if (isEmptyString(publicKeyFName))
  2210. throw createDafsException(DAFSERR_cmdstream_unauthorized, "createRemoteActivity: missing public key definition");
  2211. Owned<CLoadedKey> publicKey = loadPublicKeyFromFile(publicKeyFName); // NB: if cared could cache loaded keys
  2212. if (!digiVerify(metaInfoSignature, metaInfoBlob.length(), metaInfoBlob.bytes(), *publicKey))
  2213. throw createDafsException(DAFSERR_cmdstream_unauthorized, "createRemoteActivity: signature verification failed");
  2214. checkExpiryTime(*metaInfo);
  2215. }
  2216. else
  2217. #endif
  2218. metaInfo.set(metaInfoEnvelope);
  2219. assertex(actNode.hasProp("filePart"));
  2220. unsigned partNum = actNode.getPropInt("filePart");
  2221. assertex(partNum);
  2222. unsigned partCopy = actNode.getPropInt("filePartCopy", 1);
  2223. Owned<IFileDescriptor> fileDesc;
  2224. unsigned metaInfoVersion = metaInfo->getPropInt("version");
  2225. switch (metaInfoVersion)
  2226. {
  2227. case 0:
  2228. // implies unsigned direct request from engines (on unsecure port)
  2229. // fall through
  2230. case 1: // legacy
  2231. {
  2232. IPropertyTree *fileInfo = metaInfo->queryPropTree("FileInfo");
  2233. assertex(fileInfo);
  2234. VStringBuffer xpath("Part[%u]/Copy[%u]/@filePath", partNum, partCopy);
  2235. StringBuffer partFileName;
  2236. fileInfo->getProp(xpath, partFileName);
  2237. if (!partFileName.length())
  2238. throw createDafsException(DAFSERR_cmdstream_protocol_failure, "createRemoteActivity: invalid file info");
  2239. actNode.setProp("fileName", partFileName.str());
  2240. break;
  2241. }
  2242. case 2: // serialized compact IFileDescriptor
  2243. {
  2244. IPropertyTree *fileInfo = metaInfo->queryPropTree("FileInfo");
  2245. fileDesc.setown(deserializeFileDescriptorTree(fileInfo));
  2246. RemoteFilename rfn;
  2247. fileDesc->getFilename(partNum-1, partCopy-1, rfn);
  2248. StringBuffer path;
  2249. rfn.getLocalPath(path);
  2250. actNode.setProp("fileName", path.str());
  2251. break;
  2252. }
  2253. default:
  2254. throwUnexpected();
  2255. }
  2256. verifyex(actNode.removeProp("metaInfo")); // no longer needed
  2257. return fileDesc.getClear();
  2258. }
  2259. template<class ActivityClass> IRemoteReadActivity *createConditionalProjectingActivity(IPropertyTree &actNode, IFileDescriptor *fileDesc)
  2260. {
  2261. Owned<IRemoteReadActivity> activity = new ActivityClass(actNode, fileDesc);
  2262. if (activity->requiresPostProject())
  2263. return new CRemoteCompoundReadProjectActivity(actNode, activity);
  2264. else
  2265. return activity.getClear();
  2266. }
  2267. IRemoteActivity *createRemoteActivity(IPropertyTree &actNode, bool authorizedOnly, const IPropertyTree *keyPairInfo)
  2268. {
  2269. Owned<IFileDescriptor> fileDesc = verifyMetaInfo(actNode, authorizedOnly, keyPairInfo);
  2270. const char *partFileName = actNode.queryProp("fileName");
  2271. const char *kindStr = actNode.queryProp("kind");
  2272. ThorActivityKind kind = TAKnone;
  2273. if (kindStr)
  2274. {
  2275. if (strieq("diskread", kindStr))
  2276. kind = TAKdiskread;
  2277. if (strieq("csvread", kindStr))
  2278. kind = TAKcsvread;
  2279. else if (strieq("xmlread", kindStr))
  2280. kind = TAKxmlread;
  2281. else if (strieq("jsonread", kindStr))
  2282. kind = TAKjsonread;
  2283. else if (strieq("indexread", kindStr))
  2284. kind = TAKindexread;
  2285. else if (strieq("indexcount", kindStr))
  2286. kind = TAKindexcount;
  2287. else if (strieq("diskwrite", kindStr))
  2288. kind = TAKdiskwrite;
  2289. else if (strieq("indexwrite", kindStr))
  2290. kind = TAKindexwrite;
  2291. // else - auto-detect
  2292. }
  2293. Owned<IRemoteActivity> activity;
  2294. switch (kind)
  2295. {
  2296. case TAKdiskread:
  2297. {
  2298. activity.setown(new CRemoteDiskReadActivity(actNode, fileDesc));
  2299. break;
  2300. }
  2301. case TAKcsvread:
  2302. {
  2303. activity.setown(createConditionalProjectingActivity<CRemoteCsvReadActivity>(actNode, fileDesc));
  2304. break;
  2305. }
  2306. case TAKxmlread:
  2307. {
  2308. activity.setown(createConditionalProjectingActivity<CRemoteXmlReadActivity>(actNode, fileDesc));
  2309. break;
  2310. }
  2311. case TAKjsonread:
  2312. {
  2313. activity.setown(createConditionalProjectingActivity<CRemoteJsonReadActivity>(actNode, fileDesc));
  2314. break;
  2315. }
  2316. case TAKindexread:
  2317. {
  2318. activity.setown(new CRemoteIndexReadActivity(actNode, fileDesc));
  2319. break;
  2320. }
  2321. case TAKindexcount:
  2322. {
  2323. activity.setown(new CRemoteIndexCountActivity(actNode, fileDesc));
  2324. break;
  2325. }
  2326. case TAKdiskwrite:
  2327. {
  2328. activity.setown(new CRemoteDiskWriteActivity(actNode, fileDesc));
  2329. break;
  2330. }
  2331. default: // in absense of type, read is assumed and file format is auto-detected.
  2332. {
  2333. const char *action = actNode.queryProp("action");
  2334. if (isIndexFile(partFileName))
  2335. {
  2336. if (!isEmptyString(action))
  2337. {
  2338. if (streq("count", action))
  2339. activity.setown(new CRemoteIndexCountActivity(actNode, fileDesc));
  2340. else
  2341. throw createDafsExceptionV(DAFSERR_cmdstream_protocol_failure, "Unknown action '%s' on index '%s'", action, partFileName);
  2342. }
  2343. else
  2344. activity.setown(new CRemoteIndexReadActivity(actNode, fileDesc));
  2345. }
  2346. else
  2347. {
  2348. if (!isEmptyString(action))
  2349. {
  2350. if (streq("count", action))
  2351. throw createDafsException(DAFSERR_cmdstream_protocol_failure, "Remote Disk Counts currently unsupported");
  2352. else
  2353. throw createDafsExceptionV(DAFSERR_cmdstream_protocol_failure, "Unknown action '%s' on flat file '%s'", action, partFileName);
  2354. }
  2355. else
  2356. {
  2357. const char *kind = queryFileKind(fileDesc);
  2358. if (isEmptyString(kind) || (streq("flat", kind)))
  2359. activity.setown(new CRemoteDiskReadActivity(actNode, fileDesc));
  2360. else if (streq("csv", kind))
  2361. activity.setown(createConditionalProjectingActivity<CRemoteCsvReadActivity>(actNode, fileDesc));
  2362. else if (streq("xml", kind))
  2363. activity.setown(createConditionalProjectingActivity<CRemoteXmlReadActivity>(actNode, fileDesc));
  2364. else if (streq("json", kind))
  2365. activity.setown(createConditionalProjectingActivity<CRemoteJsonReadActivity>(actNode, fileDesc));
  2366. else
  2367. throw createDafsExceptionV(DAFSERR_cmdstream_protocol_failure, "Unknown file kind '%s'", kind);
  2368. }
  2369. }
  2370. break;
  2371. }
  2372. }
  2373. return activity.getClear();
  2374. }
  2375. IRemoteActivity *createOutputActivity(IPropertyTree &requestTree, bool authorizedOnly, const IPropertyTree *keyPairInfo)
  2376. {
  2377. IPropertyTree *actNode = requestTree.queryPropTree("node");
  2378. assertex(actNode);
  2379. return createRemoteActivity(*actNode, authorizedOnly, keyPairInfo);
  2380. }
  #define MAX_KEYDATA_SZ (64*1024) /* 0x10000 bytes */
  2382. class CRemoteFileServer : implements IRemoteFileServer, public CInterface
  2383. {
  2384. class CThrottler;
  2385. class CRemoteClientHandler : implements ISocketSelectNotify, public CInterface
  2386. {
  2387. bool calledByRowService;
  2388. public:
  2389. CRemoteFileServer *parent;
  2390. Owned<ISocket> socket;
  2391. StringAttr peerName;
  2392. MemoryBuffer msg;
  2393. bool selecthandled;
  2394. size32_t left;
  2395. StructArrayOf<OpenFileInfo> openFiles;
  2396. Owned<IDirectoryIterator> opendir;
  2397. unsigned lasttick, lastInactiveTick;
  2398. atomic_t &globallasttick;
  2399. unsigned previdx; // for debug
  2400. IMPLEMENT_IINTERFACE;
  2401. CRemoteClientHandler(CRemoteFileServer *_parent,ISocket *_socket,atomic_t &_globallasttick, bool _calledByRowService)
  2402. : socket(_socket), globallasttick(_globallasttick), calledByRowService(_calledByRowService)
  2403. {
  2404. previdx = (unsigned)-1;
  2405. StringBuffer peerBuf;
  2406. char name[256];
  2407. name[0] = 0;
  2408. int port = socket->peer_name(name,sizeof(name)-1);
  2409. if (port>=0)
  2410. {
  2411. peerBuf.append(name);
  2412. if (port)
  2413. peerBuf.append(':').append(port);
  2414. peerName.set(peerBuf);
  2415. }
  2416. else
  2417. {
  2418. /* There's a possibility the socket closed before got here, in which case, peer name is unavailable
  2419. * May potentially be unavailable for other reasons also.
  2420. * Must be set, as used in client stats HT.
  2421. * If socket closed, the handler will start up but notice closed and quit
  2422. */
  2423. peerName.set("UNKNOWN PEER NAME");
  2424. }
  2425. {
  2426. CriticalBlock block(ClientCountSect);
  2427. if (++ClientCount>MaxClientCount)
  2428. MaxClientCount = ClientCount;
  2429. if (TF_TRACE_CLIENT_CONN)
  2430. {
  2431. StringBuffer s;
  2432. s.appendf("Connecting(%p) [%d,%d] to ",this,ClientCount,MaxClientCount);
  2433. s.append(peerName);
  2434. PROGLOG("%s", s.str());
  2435. }
  2436. }
  2437. parent = _parent;
  2438. left = 0;
  2439. msg.setEndian(__BIG_ENDIAN);
  2440. selecthandled = false;
  2441. touch();
  2442. }
  2443. ~CRemoteClientHandler()
  2444. {
  2445. {
  2446. CriticalBlock block(ClientCountSect);
  2447. ClientCount--;
  2448. if (TF_TRACE_CLIENT_CONN) {
  2449. PROGLOG("Disconnecting(%p) [%d,%d] ",this,ClientCount,MaxClientCount);
  2450. }
  2451. }
  2452. ISocket *sock = socket.getClear();
  2453. try {
  2454. sock->Release();
  2455. }
  2456. catch (IException *e) {
  2457. EXCLOG(e,"~CRemoteClientHandler");
  2458. e->Release();
  2459. }
  2460. }
  2461. bool isRowServiceClient() const { return calledByRowService; }
  2462. bool notifySelected(ISocket *sock,unsigned selected)
  2463. {
  2464. if (TF_TRACE_FULL)
  2465. PROGLOG("notifySelected(%p)",this);
  2466. if (sock!=socket)
  2467. WARNLOG("notifySelected - invalid socket passed");
  2468. size32_t avail = (size32_t)socket->avail_read();
  2469. if (avail)
  2470. touch();
  2471. else if (left)
  2472. {
  2473. WARNLOG("notifySelected: Closing mid packet, %d remaining", left);
  2474. msg.clear();
  2475. parent->notify(this, msg); // notifying of graceful close
  2476. return false;
  2477. }
  2478. if (left==0)
  2479. {
  2480. try
  2481. {
  2482. left = avail?receiveDaFsBufferSize(socket):0;
  2483. }
  2484. catch (IException *e)
  2485. {
  2486. EXCLOG(e,"notifySelected(1)");
  2487. e->Release();
  2488. left = 0;
  2489. }
  2490. if (left)
  2491. {
  2492. avail = (size32_t)socket->avail_read();
  2493. try
  2494. {
  2495. msg.ensureCapacity(left);
  2496. }
  2497. catch (IException *e)
  2498. {
  2499. EXCLOG(e,"notifySelected(2)");
  2500. e->Release();
  2501. left = 0;
  2502. // if too big then corrupted packet so read avail to try and consume
  2503. char fbuf[1024];
  2504. while (avail)
  2505. {
  2506. size32_t rd = avail>sizeof(fbuf)?sizeof(fbuf):avail;
  2507. try
  2508. {
  2509. socket->read(fbuf, rd); // don't need timeout here
  2510. avail -= rd;
  2511. }
  2512. catch (IException *e)
  2513. {
  2514. EXCLOG(e,"notifySelected(2) flush");
  2515. e->Release();
  2516. break;
  2517. }
  2518. }
  2519. avail = 0;
  2520. left = 0;
  2521. }
  2522. }
  2523. }
  2524. size32_t toread = left>avail?avail:left;
  2525. if (toread)
  2526. {
  2527. try
  2528. {
  2529. socket->read(msg.reserve(toread), toread); // don't need timeout here
  2530. }
  2531. catch (IException *e)
  2532. {
  2533. EXCLOG(e,"notifySelected(3)");
  2534. e->Release();
  2535. toread = left;
  2536. msg.clear();
  2537. }
  2538. }
  2539. if (TF_TRACE_FULL)
  2540. PROGLOG("notifySelected %d,%d",toread,left);
  2541. left -= toread;
  2542. if (left==0)
  2543. {
  2544. // DEBUG
  2545. parent->notify(this, msg); // consumes msg
  2546. }
  2547. return false;
  2548. }
  2549. void logPrevHandle()
  2550. {
  2551. if (previdx<openFiles.ordinality())
  2552. {
  2553. const OpenFileInfo &fileInfo = openFiles.item(previdx);
  2554. PROGLOG("Previous handle(%d): %s", fileInfo.handle, fileInfo.filename->text.get());
  2555. }
  2556. }
  2557. bool throttleCommand(MemoryBuffer &msg)
  2558. {
  2559. RemoteFileCommandType cmd = RFCunknown;
  2560. Owned<IException> e;
  2561. try
  2562. {
  2563. msg.read(cmd);
  2564. parent->throttleCommand(cmd, msg, this);
  2565. return true;
  2566. }
  2567. catch (IException *_e)
  2568. {
  2569. e.setown(_e);
  2570. }
  2571. /* processCommand() will handle most exception and replies,
  2572. * but if throttleCommand fails before it gets that far, this will handle
  2573. */
  2574. MemoryBuffer reply;
  2575. initSendBuffer(reply);
  2576. unsigned err = (cmd == RFCopenIO) ? RFSERR_OpenFailed : 0;
  2577. parent->formatException(reply, e, cmd, false, err, this);
  2578. sendDaFsBuffer(socket, reply);
  2579. return false;
  2580. }
  2581. void processCommand(RemoteFileCommandType cmd, MemoryBuffer &msg, CThrottler *throttler)
  2582. {
  2583. MemoryBuffer reply;
  2584. bool testSocketFlag = parent->processCommand(cmd, msg, initSendBuffer(reply), this, throttler);
  2585. sendDaFsBuffer(socket, reply, testSocketFlag);
  2586. }
  2587. bool immediateCommand() // returns false if socket closed or failure
  2588. {
  2589. MemoryBuffer msg;
  2590. msg.setEndian(__BIG_ENDIAN);
  2591. touch();
  2592. size32_t avail = (size32_t)socket->avail_read();
  2593. if (avail==0)
  2594. return false;
  2595. receiveDaFsBuffer(socket, msg, 5); // shouldn't timeout as data is available
  2596. touch();
  2597. if (msg.length()==0)
  2598. return false;
  2599. return throttleCommand(msg);
  2600. }
  2601. void process(MemoryBuffer &msg)
  2602. {
  2603. if (selecthandled)
  2604. throttleCommand(msg);
  2605. else
  2606. {
  2607. // msg only used/filled if process() has been triggered by notify()
  2608. while (parent->threadRunningCount()<=parent->targetActiveThreads) // if too many threads add to select handler
  2609. {
  2610. int w;
  2611. try
  2612. {
  2613. w = socket->wait_read(1000);
  2614. }
  2615. catch (IException *e)
  2616. {
  2617. EXCLOG(e, "CRemoteClientHandler::main wait_read error");
  2618. e->Release();
  2619. parent->onCloseSocket(this,1);
  2620. return;
  2621. }
  2622. if (w==0)
  2623. break;
  2624. if ((w<0)||!immediateCommand())
  2625. {
  2626. if (w<0)
  2627. WARNLOG("CRemoteClientHandler::main wait_read error");
  2628. parent->onCloseSocket(this,1);
  2629. return;
  2630. }
  2631. }
  2632. /* This is a bit confusing..
  2633. * The addClient below, adds this request to a selecthandler handled by another thread
  2634. * and passes ownership of 'this' (CRemoteClientHandler)
  2635. *
  2636. * When notified, the selecthandler will launch a new pool thread to handle the request
  2637. * If the pool thread limit is hit, the selecthandler will be blocked [ see comment in CRemoteFileServer::notify() ]
  2638. *
  2639. * Either way, a thread pool slot is occupied when processing a request.
  2640. * Blocked threads, will be blocked for up to 1 minute (as defined by createThreadPool call)
  2641. * IOW, if there are lots of incoming clients that can't be serviced by the CThrottler limit,
  2642. * a large number of pool threads will build up after a while.
  2643. *
  2644. * The CThrottler mechanism, imposes a further hard limit on how many concurrent request threads can be active.
  2645. * If the thread pool had an absolute limit (instead of just introducing a delay), then I don't see the point
  2646. * in this additional layer of throttling..
  2647. */
  2648. selecthandled = true;
  2649. parent->addClient(this); // add to select handler
  2650. // NB: this (CRemoteClientHandler) is now linked by the selecthandler and owned by the 'clients' list
  2651. }
  2652. }
  2653. bool timedOut()
  2654. {
  2655. return (msTick()-lasttick)>CLIENT_TIMEOUT;
  2656. }
  2657. bool inactiveTimedOut()
  2658. {
  2659. unsigned ms = msTick();
  2660. if ((ms-lastInactiveTick)>CLIENT_INACTIVEWARNING_TIMEOUT)
  2661. {
  2662. lastInactiveTick = ms;
  2663. return true;
  2664. }
  2665. return false;
  2666. }
  2667. void touch()
  2668. {
  2669. lastInactiveTick = lasttick = msTick();
  2670. atomic_set(&globallasttick,lasttick);
  2671. }
  2672. const char *queryPeerName()
  2673. {
  2674. return peerName;
  2675. }
  2676. bool getInfo(StringBuffer &str)
  2677. {
  2678. str.append("client(");
  2679. const char *name = queryPeerName();
  2680. bool ok;
  2681. if (name)
  2682. {
  2683. ok = true;
  2684. str.append(name);
  2685. }
  2686. else
  2687. ok = false;
  2688. unsigned ms = msTick();
  2689. str.appendf("): last touch %d ms ago (%d, %d)",ms-lasttick,lasttick,ms);
  2690. ForEachItemIn(i, openFiles)
  2691. {
  2692. const OpenFileInfo &fileInfo = openFiles.item(i);
  2693. str.appendf("\n %d: ", fileInfo.handle);
  2694. str.append(fileInfo.filename->text.get());
  2695. }
  2696. return ok;
  2697. }
  2698. };
  2699. class CThrottleQueueItem : public CSimpleInterface
  2700. {
  2701. public:
  2702. RemoteFileCommandType cmd;
  2703. Linked<CRemoteClientHandler> client;
  2704. MemoryBuffer msg;
  2705. CCycleTimer timer;
  2706. CThrottleQueueItem(RemoteFileCommandType _cmd, MemoryBuffer &_msg, CRemoteClientHandler *_client) : cmd(_cmd), client(_client)
  2707. {
  2708. msg.swapWith(_msg);
  2709. }
  2710. };
  2711. class CThrottler
  2712. {
  2713. Semaphore sem;
  2714. CriticalSection crit, configureCrit;
  2715. StringAttr title;
  2716. unsigned limit, delayMs, cpuThreshold, queueLimit;
  2717. unsigned disabledLimit;
  2718. unsigned __int64 totalThrottleDelay;
  2719. CCycleTimer totalThrottleDelayTimer;
  2720. QueueOf<CThrottleQueueItem, false> queue;
  2721. unsigned statsIntervalSecs;
  2722. public:
  2723. CThrottler(const char *_title) : title(_title)
  2724. {
  2725. totalThrottleDelay = 0;
  2726. limit = 0;
  2727. delayMs = DEFAULT_STDCMD_THROTTLEDELAYMS;
  2728. cpuThreshold = DEFAULT_STDCMD_THROTTLECPULIMIT;
  2729. disabledLimit = 0;
  2730. queueLimit = DEFAULT_STDCMD_THROTTLEQUEUELIMIT;
  2731. statsIntervalSecs = DEFAULT_STDCMD_THROTTLECPULIMIT;
  2732. }
  2733. ~CThrottler()
  2734. {
  2735. for (;;)
  2736. {
  2737. Owned<CThrottleQueueItem> item = queue.dequeue();
  2738. if (!item)
  2739. break;
  2740. }
  2741. }
  2742. unsigned queryLimit() const { return limit; }
  2743. unsigned queryDelayMs() const { return delayMs; };;
  2744. unsigned queryCpuThreshold() const { return cpuThreshold; }
  2745. unsigned queryQueueLimit() const { return queueLimit; }
  2746. StringBuffer &getInfoSummary(StringBuffer &info)
  2747. {
  2748. info.appendf("Throttler(%s) - limit=%u, delayMs=%u, cpuThreshold=%u, queueLimit=%u", title.get(), limit, delayMs, cpuThreshold, queueLimit).newline();
  2749. unsigned elapsedSecs = totalThrottleDelayTimer.elapsedMs()/1000;
  2750. time_t simple;
  2751. time(&simple);
  2752. simple -= elapsedSecs;
  2753. CDateTime dt;
  2754. dt.set(simple);
  2755. StringBuffer dateStr;
  2756. dt.getTimeString(dateStr, true);
  2757. info.appendf("Throttler(%s): statistics since %s", title.get(), dateStr.str()).newline();
  2758. info.appendf("Total delay of %0.2f seconds", ((double)totalThrottleDelay)/1000).newline();
  2759. info.appendf("Requests currently queued: %u", queue.ordinality());
  2760. return info;
  2761. }
  2762. void getInfo(StringBuffer &info)
  2763. {
  2764. CriticalBlock b(crit);
  2765. getInfoSummary(info).newline();
  2766. }
  2767. void configure(unsigned _limit, unsigned _delayMs, unsigned _cpuThreshold, unsigned _queueLimit)
  2768. {
  2769. if (_limit > THROTTLE_MAX_LIMIT || _delayMs > THROTTLE_MAX_DELAYMS || _cpuThreshold > THROTTLE_MAX_CPUTHRESHOLD || _queueLimit > THROTTLE_MAX_QUEUELIMIT)
  2770. throw MakeStringException(0, "Throttler(%s), rejecting configure command: limit=%u (max permitted=%u), delayMs=%u (max permitted=%u), cpuThreshold=%u (max permitted=%u), queueLimit=%u (max permitted=%u)",
  2771. title.str(), _limit, THROTTLE_MAX_LIMIT, _delayMs, THROTTLE_MAX_DELAYMS, _cpuThreshold,
  2772. THROTTLE_MAX_CPUTHRESHOLD, _queueLimit, THROTTLE_MAX_QUEUELIMIT);
  2773. CriticalBlock b(configureCrit);
  2774. int delta = 0;
  2775. if (_limit)
  2776. {
  2777. if (disabledLimit) // if transitioning from disabled to some throttling
  2778. {
  2779. assertex(0 == limit);
  2780. delta = _limit - disabledLimit; // + or -
  2781. disabledLimit = 0;
  2782. }
  2783. else
  2784. delta = _limit - limit; // + or -
  2785. }
  2786. else if (0 == disabledLimit)
  2787. {
  2788. PROGLOG("Throttler(%s): disabled, previous limit: %u", title.get(), limit);
  2789. /* disabling - set limit immediately to let all new transaction through.
  2790. * NB: the semaphore signals are not consumed in this case, because transactions could be waiting on it.
  2791. * Instead the existing 'limit' is kept in 'disabledLimit', so that if/when throttling is
  2792. * re-enabled, it is used as a basis for increasing or consuming the semaphore signal count.
  2793. */
  2794. disabledLimit = limit;
  2795. limit = 0;
  2796. }
  2797. if (delta > 0)
  2798. {
  2799. PROGLOG("Throttler(%s): Increasing limit from %u to %u", title.get(), limit, _limit);
  2800. sem.signal(delta);
  2801. limit = _limit;
  2802. // NB: If throttling was off, this doesn't effect transactions in progress, i.e. will only throttle new transactions coming in.
  2803. }
  2804. else if (delta < 0)
  2805. {
  2806. PROGLOG("Throttler(%s): Reducing limit from %u to %u", title.get(), limit, _limit);
  2807. // NB: This is not expected to take long
  2808. CCycleTimer timer;
  2809. while (delta < 0)
  2810. {
  2811. if (sem.wait(1000))
  2812. ++delta;
  2813. else
  2814. PROGLOG("Throttler(%s): Waited %0.2f seconds so far for up to a maximum of %u (previous limit) transactions to complete, %u completed", title.get(), ((double)timer.elapsedMs())/1000, limit, -delta);
  2815. }
  2816. limit = _limit;
  2817. // NB: doesn't include transactions in progress, i.e. will only throttle new transactions coming in.
  2818. }
  2819. if (_delayMs != delayMs)
  2820. {
  2821. PROGLOG("Throttler(%s): New delayMs=%u, previous: %u", title.get(), _delayMs, delayMs);
  2822. delayMs = _delayMs;
  2823. }
  2824. if (_cpuThreshold != cpuThreshold)
  2825. {
  2826. PROGLOG("Throttler(%s): New cpuThreshold=%u, previous: %u", title.get(), _cpuThreshold, cpuThreshold);
  2827. cpuThreshold = _cpuThreshold;
  2828. }
  2829. if (((unsigned)-1) != _queueLimit && _queueLimit != queueLimit)
  2830. {
  2831. PROGLOG("Throttler(%s): New queueLimit=%u%s, previous: %u", title.get(), _queueLimit, 0==_queueLimit?"(disabled)":"", queueLimit);
  2832. queueLimit = _queueLimit;
  2833. }
  2834. }
  2835. void setStatsInterval(unsigned _statsIntervalSecs)
  2836. {
  2837. if (_statsIntervalSecs != statsIntervalSecs)
  2838. {
  2839. PROGLOG("Throttler(%s): New statsIntervalSecs=%u, previous: %u", title.get(), _statsIntervalSecs, statsIntervalSecs);
  2840. statsIntervalSecs = _statsIntervalSecs;
  2841. }
  2842. }
  2843. void take(RemoteFileCommandType cmd) // cmd for info. only
  2844. {
  2845. for (;;)
  2846. {
  2847. if (sem.wait(delayMs))
  2848. return;
  2849. PROGLOG("Throttler(%s): transaction delayed [cmd=%s]", title.get(), getRFCText(cmd));
  2850. }
  2851. }
  2852. void release()
  2853. {
  2854. sem.signal();
  2855. }
  2856. StringBuffer &getStats(StringBuffer &stats, bool reset)
  2857. {
  2858. CriticalBlock b(crit);
  2859. getInfoSummary(stats);
  2860. if (reset)
  2861. {
  2862. totalThrottleDelayTimer.reset();
  2863. totalThrottleDelay = 0;
  2864. }
  2865. return stats;
  2866. }
  2867. void addCommand(RemoteFileCommandType cmd, MemoryBuffer &msg, CRemoteClientHandler *client)
  2868. {
  2869. CCycleTimer timer;
  2870. Owned<IException> exception;
  2871. bool hadSem = true;
  2872. if (!sem.wait(delayMs))
  2873. {
  2874. CriticalBlock b(crit);
  2875. if (!sem.wait(0)) // check hasn't become available
  2876. {
  2877. unsigned cpu = getLatestCPUUsage();
  2878. if (getLatestCPUUsage()<cpuThreshold)
  2879. {
  2880. /* Allow to proceed, despite hitting throttle limit because CPU < threshold
  2881. * NB: The overall number of threads is still capped by the thread pool.
  2882. */
  2883. unsigned ms = timer.elapsedMs();
  2884. totalThrottleDelay += ms;
  2885. PROGLOG("Throttler(%s): transaction delayed [cmd=%s] for : %u milliseconds, proceeding as cpu(%u)<throttleCPULimit(%u)", title.get(), getRFCText(cmd), cpu, ms, cpuThreshold);
  2886. hadSem = false;
  2887. }
  2888. else
  2889. {
  2890. if (queueLimit && queue.ordinality()>=queueLimit)
  2891. throw MakeStringException(0, "Throttler(%s), the maxiumum number of items are queued (%u), rejecting new command[%s]", title.str(), queue.ordinality(), getRFCText(cmd));
  2892. queue.enqueue(new CThrottleQueueItem(cmd, msg, client)); // NB: takes over ownership of 'client' from running thread
  2893. PROGLOG("Throttler(%s): transaction delayed [cmd=%s], queuing (%u queueud), [client=%p, sock=%u]", title.get(), getRFCText(cmd), queue.ordinality(), client, client->socket->OShandle());
  2894. return;
  2895. }
  2896. }
  2897. }
  2898. /* Guarantee that sem is released.
  2899. * Should normally release on clean exit when queue is empty.
  2900. */
  2901. struct ReleaseSem
  2902. {
  2903. Semaphore *sem;
  2904. ReleaseSem(Semaphore *_sem) { sem = _sem; }
  2905. ~ReleaseSem() { if (sem) sem->signal(); }
  2906. } releaseSem(hadSem?&sem:NULL);
  2907. /* Whilst holding on this throttle slot (i.e. before signalling semaphore back), process
  2908. * queued items. NB: other threads that are finishing will do also.
  2909. * Queued items are processed 1st, then the current request, then anything that was queued when handling current request
  2910. * Throttle slot (semaphore) is only given back when no more to do.
  2911. */
  2912. Linked<CRemoteClientHandler> currentClient;
  2913. MemoryBuffer currentMsg;
  2914. unsigned ms;
  2915. for (;;)
  2916. {
  2917. RemoteFileCommandType currentCmd;
  2918. {
  2919. CriticalBlock b(crit);
  2920. Owned<CThrottleQueueItem> item = queue.dequeue();
  2921. if (item)
  2922. {
  2923. currentCmd = item->cmd;
  2924. currentClient.setown(item->client.getClear());
  2925. currentMsg.swapWith(item->msg);
  2926. ms = item->timer.elapsedMs();
  2927. }
  2928. else
  2929. {
  2930. if (NULL == client) // previously handled and queue empty
  2931. {
  2932. /* Commands are only queued if semaphore is exhaused (checked inside crit)
  2933. * so only signal the semaphore inside the crit, after checking if there are no queued items
  2934. */
  2935. if (hadSem)
  2936. {
  2937. releaseSem.sem = NULL;
  2938. sem.signal();
  2939. }
  2940. break;
  2941. }
  2942. currentCmd = cmd;
  2943. currentClient.set(client); // process current request after dealing with queue
  2944. currentMsg.swapWith(msg);
  2945. ms = timer.elapsedMs();
  2946. client = NULL;
  2947. }
  2948. }
  2949. if (ms >= 1000)
  2950. {
  2951. if (ms>delayMs)
  2952. PROGLOG("Throttler(%s): transaction delayed [cmd=%s] for : %u seconds", title.get(), getRFCText(currentCmd), ms/1000);
  2953. }
  2954. {
  2955. CriticalBlock b(crit);
  2956. totalThrottleDelay += ms;
  2957. }
  2958. try
  2959. {
  2960. currentClient->processCommand(currentCmd, currentMsg, this);
  2961. }
  2962. catch (IException *e)
  2963. {
  2964. EXCLOG(e, "addCommand: processCommand failed");
  2965. e->Release();
  2966. }
  2967. }
  2968. }
  2969. };
  2970. // temporarily release a throttler slot
  2971. class CThrottleReleaseBlock
  2972. {
  2973. CThrottler &throttler;
  2974. RemoteFileCommandType cmd;
  2975. public:
  2976. CThrottleReleaseBlock(CThrottler &_throttler, RemoteFileCommandType _cmd) : throttler(_throttler), cmd(_cmd)
  2977. {
  2978. throttler.release();
  2979. }
  2980. ~CThrottleReleaseBlock()
  2981. {
  2982. throttler.take(cmd);
  2983. }
  2984. };
  // --- CRemoteFileServer state (NB: declaration order is construction order) ---
  int lasthandle; // NOTE(review): presumably the last handle value issued by getNextHandle() — confirm
  CriticalSection sect; // getNextHandle() documents that it is called inside this critical section
  Owned<ISocket> acceptsock;
  Owned<ISocket> securesock;
  Owned<ISocket> rowServiceSock;
  bool rowServiceOnStdPort = true; // should row service commands be processed on std. service port
  bool rowServiceSSL = false;
  Owned<ISocketSelectHandler> selecthandler;
  Owned<IThreadPool> threads; // for commands
  bool stopping;
  unsigned clientcounttick;
  unsigned closedclients;
  CAsyncCommandManager asyncCommandManager;
  CThrottler stdCmdThrottler, slowCmdThrottler; // NOTE(review): presumably separate throttles for standard vs slow commands — confirm
  CClientStatsTable clientStatsTable;
  atomic_t globallasttick; // NOTE(review): presumably the counter passed to each CRemoteClientHandler and updated by its touch()
  unsigned targetActiveThreads; // CRemoteClientHandler::process() compares threadRunningCount() against this before handing clients to the select handler
  Linked<IPropertyTree> keyPairInfo; // NOTE(review): presumably the KeyPair configuration passed to createRemoteActivity() for signature checks
  3003. class CHandleTracer
  3004. {
  3005. CTimeMon timer;
  3006. CriticalSection crit;
  3007. Owned<IFile> stdIOIFile;
  3008. std::vector<Owned<IFileIO>> reservedHandles;
  3009. unsigned handlesToReserve = 3; // need a few for pipe process to succeed
  3010. void reserveHandles()
  3011. {
  3012. if (stdIOIFile)
  3013. {
  3014. for (unsigned r=0; r<handlesToReserve; r++)
  3015. {
  3016. IFileIO *iFileIO = stdIOIFile->open(IFOread);
  3017. if (iFileIO)
  3018. reservedHandles.push_back(iFileIO);
  3019. }
  3020. }
  3021. }
  3022. void releaseHandles()
  3023. {
  3024. reservedHandles.clear();
  3025. }
  3026. public:
  3027. CHandleTracer()
  3028. {
  3029. /* Reserve handles, so that when we run out, we hope to release them
  3030. * and thereby have enough to use when reading current state.
  3031. */
  3032. stdIOIFile.setown(createIFile("stdout:"));
  3033. timer.reset(0);
  3034. reserveHandles();
  3035. }
  3036. void traceIfReady()
  3037. {
  3038. CriticalBlock b(crit);
  3039. if (timer.timedout())
  3040. {
  3041. DBGLOG("Open handles:");
  3042. releaseHandles();
  3043. /* NB: can't guarantee that handles will be available after releaseHandles(), if other threads have allocated them.
  3044. * If printLsOf fails, mark timer to retry again on next event in shorter time period.
  3045. */
  3046. if (!printLsOf())
  3047. {
  3048. DBGLOG("Failed to run lsof");
  3049. timer.reset(1000); // next attempt in >=1 second
  3050. }
  3051. else
  3052. timer.reset(60*1000); // next trace in >=1 minute
  3053. reserveHandles();
  3054. }
  3055. }
  3056. } handleTracer;
  3057. int getNextHandle()
  3058. {
  3059. // called in sect critical block
  3060. for (;;) {
  3061. if (lasthandle==INT_MAX)
  3062. lasthandle = 1;
  3063. else
  3064. lasthandle++;
  3065. unsigned idx1;
  3066. unsigned idx2;
  3067. if (!findHandle(lasthandle,idx1,idx2))
  3068. return lasthandle;
  3069. }
  3070. }
  3071. bool findHandle(int handle,unsigned &clientidx,unsigned &handleidx)
  3072. {
  3073. // called in sect critical block
  3074. clientidx = (unsigned)-1;
  3075. handleidx = (unsigned)-1;
  3076. ForEachItemIn(i,clients) {
  3077. CRemoteClientHandler &client = clients.item(i);
  3078. ForEachItemIn(j, client.openFiles)
  3079. {
  3080. if (client.openFiles.item(j).handle==handle)
  3081. {
  3082. handleidx = j;
  3083. clientidx = i;
  3084. return true;
  3085. }
  3086. }
  3087. }
  3088. return false;
  3089. }
  3090. unsigned readKeyData(IKeyManager *keyManager, unsigned maxRecs, MemoryBuffer &reply, bool &maxHit)
  3091. {
  3092. DelayedSizeMarker keyDataSzReturned(reply);
  3093. unsigned numRecs = 0;
  3094. maxHit = false;
  3095. unsigned pos = reply.length();
  3096. while (keyManager->lookup(true))
  3097. {
  3098. unsigned size = keyManager->queryRowSize();
  3099. const byte *result = keyManager->queryKeyBuffer();
  3100. reply.append(size);
  3101. reply.append(size, result);
  3102. ++numRecs;
  3103. if (maxRecs && (0 == --maxRecs))
  3104. {
  3105. maxHit = true;
  3106. break;
  3107. }
  3108. if (reply.length()-pos >= MAX_KEYDATA_SZ)
  3109. {
  3110. maxHit = true;
  3111. break;
  3112. }
  3113. }
  3114. keyDataSzReturned.write();
  3115. return numRecs;
  3116. }
  3117. class cCommandProcessor: public CInterface, implements IPooledThread
  3118. {
  3119. Owned<CRemoteClientHandler> client;
  3120. MemoryBuffer msg;
  3121. public:
  3122. IMPLEMENT_IINTERFACE;
  3123. struct cCommandProcessorParams
  3124. {
  3125. cCommandProcessorParams() { msg.setEndian(__BIG_ENDIAN); }
  3126. CRemoteClientHandler *client;
  3127. MemoryBuffer msg;
  3128. };
  3129. virtual void init(void *_params) override
  3130. {
  3131. cCommandProcessorParams &params = *(cCommandProcessorParams *)_params;
  3132. client.setown(params.client);
  3133. msg.swapWith(params.msg);
  3134. }
  3135. virtual void threadmain() override
  3136. {
  3137. // idea is that initially we process commands inline then pass over to select handler
  3138. try
  3139. {
  3140. client->process(msg);
  3141. }
  3142. catch (IException *e)
  3143. {
  3144. // suppress some errors
  3145. EXCLOG(e,"cCommandProcessor::threadmain");
  3146. e->Release();
  3147. }
  3148. try
  3149. {
  3150. client.clear();
  3151. }
  3152. catch (IException *e)
  3153. {
  3154. // suppress some more errors clearing client
  3155. EXCLOG(e,"cCommandProcessor::threadmain(2)");
  3156. e->Release();
  3157. }
  3158. }
  3159. virtual bool stop() override
  3160. {
  3161. return true;
  3162. }
  3163. virtual bool canReuse() const override
  3164. {
  3165. return false; // want to free owned socket
  3166. }
  3167. };
  /* Every connected client handler; each holds its own openFiles list.
   * findHandle()/getNextHandle() expect sect to be held, and onCloseSocket() removes entries under sect.
   */
  IArrayOf<CRemoteClientHandler> clients;
  3169. void validateSSLSetup()
  3170. {
  3171. if (!securitySettings.certificate)
  3172. throw createDafsException(DAFSERR_serverinit_failed, "SSL Certificate information not found in environment.conf");
  3173. if (!checkFileExists(securitySettings.certificate))
  3174. throw createDafsException(DAFSERR_serverinit_failed, "SSL Certificate File not found in environment.conf");
  3175. if (!securitySettings.privateKey)
  3176. throw createDafsException(DAFSERR_serverinit_failed, "SSL Key information not found in environment.conf");
  3177. if (!checkFileExists(securitySettings.privateKey))
  3178. throw createDafsException(DAFSERR_serverinit_failed, "SSL Key File not found in environment.conf");
  3179. }
  3180. public:
  3181. IMPLEMENT_IINTERFACE
  3182. CRemoteFileServer(unsigned maxThreads, unsigned maxThreadsDelayMs, unsigned maxAsyncCopy, IPropertyTree *_keyPairInfo)
  3183. : asyncCommandManager(maxAsyncCopy), stdCmdThrottler("stdCmdThrotlter"), slowCmdThrottler("slowCmdThrotlter"), keyPairInfo(_keyPairInfo)
  3184. {
  3185. lasthandle = 0;
  3186. selecthandler.setown(createSocketSelectHandler(NULL));
  3187. stdCmdThrottler.configure(DEFAULT_STDCMD_PARALLELREQUESTLIMIT, DEFAULT_STDCMD_THROTTLEDELAYMS, DEFAULT_STDCMD_THROTTLECPULIMIT, DEFAULT_STDCMD_THROTTLEQUEUELIMIT);
  3188. slowCmdThrottler.configure(DEFAULT_SLOWCMD_PARALLELREQUESTLIMIT, DEFAULT_SLOWCMD_THROTTLEDELAYMS, DEFAULT_SLOWCMD_THROTTLECPULIMIT, DEFAULT_SLOWCMD_THROTTLEQUEUELIMIT);
  3189. unsigned targetMinThreads=maxThreads*20/100; // 20%
  3190. if (0 == targetMinThreads) targetMinThreads = 1;
  3191. targetActiveThreads=maxThreads*80/100; // 80%
  3192. if (0 == targetActiveThreads) targetActiveThreads = 1;
  3193. class CCommandFactory : public CSimpleInterfaceOf<IThreadFactory>
  3194. {
  3195. CRemoteFileServer &parent;
  3196. public:
  3197. CCommandFactory(CRemoteFileServer &_parent) : parent(_parent) { }
  3198. virtual IPooledThread *createNew()
  3199. {
  3200. return parent.createCommandProcessor();
  3201. }
  3202. };
  3203. Owned<IThreadFactory> factory = new CCommandFactory(*this); // NB: pool links factory, so takes ownership
  3204. threads.setown(createThreadPool("CRemoteFileServerPool", factory, NULL, maxThreads, maxThreadsDelayMs,
  3205. #ifdef __64BIT__
  3206. 0, // Unlimited stack size
  3207. #else
  3208. 0x10000,
  3209. #endif
  3210. INFINITE,targetMinThreads));
  3211. threads->setStartDelayTracing(60); // trace amount delayed every minute.
  3212. PROGLOG("CRemoteFileServer: maxThreads = %u, maxThreadsDelayMs = %u, maxAsyncCopy = %u", maxThreads, maxThreadsDelayMs, maxAsyncCopy);
  3213. stopping = false;
  3214. clientcounttick = msTick();
  3215. closedclients = 0;
  3216. atomic_set(&globallasttick,msTick());
  3217. }
  3218. ~CRemoteFileServer()
  3219. {
  3220. #ifdef _DEBUG
  3221. PROGLOG("Exiting CRemoteFileServer");
  3222. #endif
  3223. asyncCommandManager.join();
  3224. clients.kill();
  3225. #ifdef _DEBUG
  3226. PROGLOG("Exited CRemoteFileServer");
  3227. #endif
  3228. }
  3229. bool lookupFileIOHandle(int handle, OpenFileInfo &fileInfo, unsigned newFlags=0)
  3230. {
  3231. if (handle<=0)
  3232. return false;
  3233. CriticalBlock block(sect);
  3234. unsigned clientidx;
  3235. unsigned handleidx;
  3236. if (!findHandle(handle,clientidx,handleidx))
  3237. return false;
  3238. CRemoteClientHandler &client = clients.item(clientidx);
  3239. OpenFileInfo &openFileInfo = client.openFiles.element(handleidx); // NB: links members
  3240. openFileInfo.flags |= newFlags;
  3241. fileInfo = openFileInfo;
  3242. client.previdx = handleidx;
  3243. return true;
  3244. }
  3245. //MORE: The file handles should timeout after a while, and accessing an old (invalid handle)
  3246. // should throw a different exception
  3247. bool checkFileIOHandle(int handle, IFileIO *&fileio, bool del=false)
  3248. {
  3249. fileio = NULL;
  3250. if (handle<=0)
  3251. return false;
  3252. CriticalBlock block(sect);
  3253. unsigned clientidx;
  3254. unsigned handleidx;
  3255. if (findHandle(handle,clientidx,handleidx))
  3256. {
  3257. CRemoteClientHandler &client = clients.item(clientidx);
  3258. const OpenFileInfo &fileInfo = client.openFiles.item(handleidx);
  3259. if (del)
  3260. {
  3261. if (fileInfo.flags & of_key)
  3262. clearKeyStoreCacheEntry(fileInfo.fileIO);
  3263. client.openFiles.remove(handleidx);
  3264. client.previdx = (unsigned)-1;
  3265. }
  3266. else
  3267. {
  3268. fileio = client.openFiles.item(handleidx).fileIO;
  3269. client.previdx = handleidx;
  3270. }
  3271. return true;
  3272. }
  3273. return false;
  3274. }
  3275. void checkFileIOHandle(MemoryBuffer &reply, int handle, IFileIO *&fileio, bool del=false)
  3276. {
  3277. if (!checkFileIOHandle(handle, fileio, del))
  3278. throw createDafsException(RFSERR_InvalidFileIOHandle, nullptr);
  3279. }
  3280. void onCloseSocket(CRemoteClientHandler *client, int which)
  3281. {
  3282. if (!client)
  3283. return;
  3284. CriticalBlock block(sect);
  3285. #ifdef _DEBUG
  3286. StringBuffer s(client->queryPeerName());
  3287. PROGLOG("onCloseSocket(%d) %s",which,s.str());
  3288. #endif
  3289. if (client->socket)
  3290. {
  3291. try
  3292. {
  3293. /* JCSMORE - shouldn't this really be dependent on whether selecthandled=true
  3294. * It has not been added to the selecthandler
  3295. * Harmless, but wasteful if so.
  3296. */
  3297. selecthandler->remove(client->socket);
  3298. }
  3299. catch (IException *e) {
  3300. EXCLOG(e,"CRemoteFileServer::onCloseSocket.1");
  3301. e->Release();
  3302. }
  3303. }
  3304. try {
  3305. clients.zap(*client);
  3306. }
  3307. catch (IException *e) {
  3308. EXCLOG(e,"CRemoteFileServer::onCloseSocket.2");
  3309. e->Release();
  3310. }
  3311. }
  3312. bool cmdOpenFileIO(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
  3313. {
  3314. Owned<StringAttrItem> name = new StringAttrItem;
  3315. byte mode;
  3316. byte share;
  3317. msg.read(name->text).read(mode).read(share);
  3318. // also try to recv extra byte
  3319. byte extra = 0;
  3320. unsigned short sMode = IFUnone;
  3321. unsigned short cFlags = IFUnone;
  3322. if (msg.remaining() >= sizeof(byte))
  3323. {
  3324. msg.read(extra);
  3325. // and then try to recv extra sMode, cFlags (always sent together)
  3326. if (msg.remaining() >= (sizeof(sMode) + sizeof(cFlags)))
  3327. msg.read(sMode).read(cFlags);
  3328. }
  3329. IFEflags extraFlags = (IFEflags)extra;
  3330. // none => nocache for remote (hint)
  3331. // can revert to previous behavior with conf file setting "allow_pgcache_flush=false"
  3332. if (extraFlags == IFEnone)
  3333. extraFlags = IFEnocache;
  3334. Owned<IFile> file = createIFile(name->text);
  3335. switch ((compatIFSHmode)share) {
  3336. case compatIFSHnone:
  3337. file->setCreateFlags(S_IRUSR|S_IWUSR);
  3338. file->setShareMode(IFSHnone);
  3339. break;
  3340. case compatIFSHread:
  3341. file->setShareMode(IFSHread);
  3342. break;
  3343. case compatIFSHwrite:
  3344. file->setShareMode(IFSHfull);
  3345. break;
  3346. case compatIFSHexec:
  3347. file->setCreateFlags(S_IRUSR|S_IWUSR|S_IXUSR|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH);
  3348. break;
  3349. case compatIFSHall:
  3350. file->setCreateFlags(S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH); // bit excessive
  3351. file->setShareMode(IFSHfull);
  3352. break;
  3353. }
  3354. // use sMode, cFlags if sent
  3355. if (sMode != IFUnone && cFlags != IFUnone)
  3356. {
  3357. file->setCreateFlags(cFlags);
  3358. file->setShareMode((IFSHmode)sMode);
  3359. }
  3360. if (TF_TRACE_PRE_IO)
  3361. PROGLOG("before open file '%s', (%d,%d,%d,%d,0%o)",name->text.get(),(int)mode,(int)share,extraFlags,sMode,cFlags);
  3362. Owned<IFileIO> fileio = file->open((IFOmode)mode,extraFlags);
  3363. int handle;
  3364. if (fileio)
  3365. {
  3366. CriticalBlock block(sect);
  3367. handle = getNextHandle();
  3368. client.previdx = client.openFiles.ordinality();
  3369. client.openFiles.append(OpenFileInfo(handle, fileio, name));
  3370. }
  3371. else
  3372. handle = 0;
  3373. reply.append(RFEnoerror);
  3374. reply.append(handle);
  3375. if (TF_TRACE)
  3376. PROGLOG("open file '%s', (%d,%d) handle = %d",name->text.get(),(int)mode,(int)share,handle);
  3377. return true;
  3378. }
  3379. bool cmdCloseFileIO(MemoryBuffer & msg, MemoryBuffer & reply)
  3380. {
  3381. int handle;
  3382. msg.read(handle);
  3383. IFileIO *fileio;
  3384. checkFileIOHandle(reply, handle, fileio, true);
  3385. if (TF_TRACE)
  3386. PROGLOG("close file, handle = %d",handle);
  3387. reply.append(RFEnoerror);
  3388. return true;
  3389. }
  3390. void cmdRead(MemoryBuffer & msg, MemoryBuffer & reply, CClientStats &stats)
  3391. {
  3392. int handle;
  3393. __int64 pos;
  3394. size32_t len;
  3395. msg.read(handle).read(pos).read(len);
  3396. IFileIO *fileio;
  3397. checkFileIOHandle(reply, handle, fileio);
  3398. //arrange it so we read directly into the reply buffer...
  3399. unsigned posOfErr = reply.length();
  3400. reply.append((unsigned)RFEnoerror);
  3401. size32_t numRead;
  3402. unsigned posOfLength = reply.length();
  3403. if (TF_TRACE_PRE_IO)
  3404. PROGLOG("before read file, handle = %d, toread = %d",handle,len);
  3405. reply.reserve(sizeof(numRead));
  3406. void *data = reply.reserve(len);
  3407. numRead = fileio->read(pos,len,data);
  3408. stats.addRead(len);
  3409. if (TF_TRACE)
  3410. PROGLOG("read file, handle = %d, pos = %" I64F "d, toread = %d, read = %d",handle,pos,len,numRead);
  3411. reply.setLength(posOfLength + sizeof(numRead) + numRead);
  3412. reply.writeEndianDirect(posOfLength,sizeof(numRead),&numRead);
  3413. }
  3414. void cmdSize(MemoryBuffer & msg, MemoryBuffer & reply)
  3415. {
  3416. int handle;
  3417. msg.read(handle);
  3418. IFileIO *fileio;
  3419. checkFileIOHandle(reply, handle, fileio);
  3420. __int64 size = fileio->size();
  3421. reply.append((unsigned)RFEnoerror).append(size);
  3422. if (TF_TRACE)
  3423. PROGLOG("size file, handle = %d, size = %" I64F "d",handle,size);
  3424. }
  3425. void cmdSetSize(MemoryBuffer & msg, MemoryBuffer & reply)
  3426. {
  3427. int handle;
  3428. offset_t size;
  3429. msg.read(handle).read(size);
  3430. IFileIO *fileio;
  3431. if (TF_TRACE)
  3432. PROGLOG("set size file, handle = %d, size = %" I64F "d",handle,size);
  3433. checkFileIOHandle(reply, handle, fileio);
  3434. fileio->setSize(size);
  3435. reply.append((unsigned)RFEnoerror);
  3436. }
  3437. void cmdWrite(MemoryBuffer & msg, MemoryBuffer & reply, CClientStats &stats)
  3438. {
  3439. int handle;
  3440. __int64 pos;
  3441. size32_t len;
  3442. msg.read(handle).read(pos).read(len);
  3443. IFileIO *fileio;
  3444. checkFileIOHandle(reply, handle, fileio);
  3445. const byte *data = (const byte *)msg.readDirect(len);
  3446. if (TF_TRACE_PRE_IO)
  3447. PROGLOG("before write file, handle = %d, towrite = %d",handle,len);
  3448. size32_t numWritten = fileio->write(pos,len,data);
  3449. stats.addWrite(numWritten);
  3450. if (TF_TRACE)
  3451. PROGLOG("write file, handle = %d, towrite = %d, written = %d",handle,len,numWritten);
  3452. reply.append((unsigned)RFEnoerror).append(numWritten);
  3453. }
  3454. void cmdExists(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
  3455. {
  3456. StringAttr name;
  3457. msg.read(name);
  3458. if (TF_TRACE)
  3459. PROGLOG("exists, '%s'",name.get());
  3460. Owned<IFile> file=createIFile(name);
  3461. bool e = file->exists();
  3462. reply.append((unsigned)RFEnoerror).append(e);
  3463. }
  3464. void cmdRemove(MemoryBuffer & msg, MemoryBuffer & reply,CRemoteClientHandler &client)
  3465. {
  3466. StringAttr name;
  3467. msg.read(name);
  3468. if (TF_TRACE)
  3469. PROGLOG("remove, '%s'",name.get());
  3470. Owned<IFile> file=createIFile(name);
  3471. bool e = file->remove();
  3472. reply.append((unsigned)RFEnoerror).append(e);
  3473. }
  3474. void cmdGetVer(MemoryBuffer & msg, MemoryBuffer & reply)
  3475. {
  3476. if (TF_TRACE)
  3477. PROGLOG("getVer");
  3478. /* weird backward compatibility convention,
  3479. * newer clients will send another unsigned to denote
  3480. * and result in the numeric DAFILESRV_VERSION being returned
  3481. * Ancient clients will get back the string form only (SERVER_VERSTRING)
  3482. */
  3483. if (msg.getPos()+sizeof(unsigned)>msg.length())
  3484. reply.append((unsigned)RFEnoerror);
  3485. else
  3486. reply.append((unsigned)DAFILESRV_VERSION+0x10000);
  3487. reply.append(DAFILESRV_VERSIONSTRING);
  3488. }
  3489. void cmdRename(MemoryBuffer & msg, MemoryBuffer & reply,CRemoteClientHandler &client)
  3490. {
  3491. StringAttr fromname;
  3492. msg.read(fromname);
  3493. StringAttr toname;
  3494. msg.read(toname);
  3495. if (TF_TRACE)
  3496. PROGLOG("rename, '%s' to '%s'",fromname.get(),toname.get());
  3497. Owned<IFile> file=createIFile(fromname);
  3498. file->rename(toname);
  3499. reply.append((unsigned)RFEnoerror);
  3500. }
  3501. void cmdMove(MemoryBuffer & msg, MemoryBuffer & reply,CRemoteClientHandler &client)
  3502. {
  3503. StringAttr fromname;
  3504. msg.read(fromname);
  3505. StringAttr toname;
  3506. msg.read(toname);
  3507. if (TF_TRACE)
  3508. PROGLOG("move, '%s' to '%s'",fromname.get(),toname.get());
  3509. Owned<IFile> file=createIFile(fromname);
  3510. file->move(toname);
  3511. reply.append((unsigned)RFEnoerror);
  3512. }
  3513. void cmdCopy(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
  3514. {
  3515. StringAttr fromname;
  3516. msg.read(fromname);
  3517. StringAttr toname;
  3518. msg.read(toname);
  3519. if (TF_TRACE)
  3520. PROGLOG("copy, '%s' to '%s'",fromname.get(),toname.get());
  3521. copyFile(toname, fromname);
  3522. reply.append((unsigned)RFEnoerror);
  3523. }
  3524. void cmdAppend(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client, CClientStats &stats)
  3525. {
  3526. int handle;
  3527. __int64 pos;
  3528. __int64 len;
  3529. StringAttr srcname;
  3530. msg.read(handle).read(srcname).read(pos).read(len);
  3531. IFileIO *fileio;
  3532. checkFileIOHandle(reply, handle, fileio);
  3533. Owned<IFile> file = createIFile(srcname.get());
  3534. __int64 written = fileio->appendFile(file,pos,len);
  3535. stats.addWrite(written);
  3536. if (TF_TRACE)
  3537. PROGLOG("append file, handle = %d, file=%s, pos = %" I64F "d len = %" I64F "d written = %" I64F "d",handle,srcname.get(),pos,len,written);
  3538. reply.append((unsigned)RFEnoerror).append(written);
  3539. }
  3540. void cmdIsFile(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3541. {
  3542. StringAttr name;
  3543. msg.read(name);
  3544. if (TF_TRACE)
  3545. PROGLOG("isFile, '%s'",name.get());
  3546. Owned<IFile> file=createIFile(name);
  3547. unsigned ret = (unsigned)file->isFile();
  3548. reply.append((unsigned)RFEnoerror).append(ret);
  3549. }
  3550. void cmdIsDir(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3551. {
  3552. StringAttr name;
  3553. msg.read(name);
  3554. if (TF_TRACE)
  3555. PROGLOG("isDir, '%s'",name.get());
  3556. Owned<IFile> file=createIFile(name);
  3557. unsigned ret = (unsigned)file->isDirectory();
  3558. reply.append((unsigned)RFEnoerror).append(ret);
  3559. }
  3560. void cmdIsReadOnly(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3561. {
  3562. StringAttr name;
  3563. msg.read(name);
  3564. if (TF_TRACE)
  3565. PROGLOG("isReadOnly, '%s'",name.get());
  3566. Owned<IFile> file=createIFile(name);
  3567. unsigned ret = (unsigned)file->isReadOnly();
  3568. reply.append((unsigned)RFEnoerror).append(ret);
  3569. }
  3570. void cmdSetReadOnly(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3571. {
  3572. StringAttr name;
  3573. bool set;
  3574. msg.read(name).read(set);
  3575. if (TF_TRACE)
  3576. PROGLOG("setReadOnly, '%s' %d",name.get(),(int)set);
  3577. Owned<IFile> file=createIFile(name);
  3578. file->setReadOnly(set);
  3579. reply.append((unsigned)RFEnoerror);
  3580. }
  3581. void cmdSetFilePerms(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3582. {
  3583. StringAttr name;
  3584. unsigned fPerms;
  3585. msg.read(name).read(fPerms);
  3586. if (TF_TRACE)
  3587. PROGLOG("setFilePerms, '%s' 0%o",name.get(),fPerms);
  3588. Owned<IFile> file=createIFile(name);
  3589. file->setFilePermissions(fPerms);
  3590. reply.append((unsigned)RFEnoerror);
  3591. }
  3592. void cmdGetTime(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3593. {
  3594. StringAttr name;
  3595. msg.read(name);
  3596. if (TF_TRACE)
  3597. PROGLOG("getTime, '%s'",name.get());
  3598. Owned<IFile> file=createIFile(name);
  3599. CDateTime createTime;
  3600. CDateTime modifiedTime;
  3601. CDateTime accessedTime;
  3602. bool ret = file->getTime(&createTime,&modifiedTime,&accessedTime);
  3603. reply.append((unsigned)RFEnoerror).append(ret);
  3604. if (ret)
  3605. {
  3606. createTime.serialize(reply);
  3607. modifiedTime.serialize(reply);
  3608. accessedTime.serialize(reply);
  3609. }
  3610. }
  3611. void cmdSetTime(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3612. {
  3613. StringAttr name;
  3614. bool creategot;
  3615. CDateTime createTime;
  3616. bool modifiedgot;
  3617. CDateTime modifiedTime;
  3618. bool accessedgot;
  3619. CDateTime accessedTime;
  3620. msg.read(name);
  3621. msg.read(creategot);
  3622. if (creategot)
  3623. createTime.deserialize(msg);
  3624. msg.read(modifiedgot);
  3625. if (modifiedgot)
  3626. modifiedTime.deserialize(msg);
  3627. msg.read(accessedgot);
  3628. if (accessedgot)
  3629. accessedTime.deserialize(msg);
  3630. if (TF_TRACE)
  3631. PROGLOG("setTime, '%s'",name.get());
  3632. Owned<IFile> file=createIFile(name);
  3633. bool ret = file->setTime(creategot?&createTime:NULL,modifiedgot?&modifiedTime:NULL,accessedgot?&accessedTime:NULL);
  3634. reply.append((unsigned)RFEnoerror).append(ret);
  3635. }
  3636. void cmdCreateDir(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3637. {
  3638. StringAttr name;
  3639. msg.read(name);
  3640. if (TF_TRACE)
  3641. PROGLOG("CreateDir, '%s'",name.get());
  3642. Owned<IFile> dir=createIFile(name);
  3643. bool ret = dir->createDirectory();
  3644. reply.append((unsigned)RFEnoerror).append(ret);
  3645. }
  3646. void cmdGetDir(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3647. {
  3648. StringAttr name;
  3649. StringAttr mask;
  3650. bool includedir;
  3651. bool sub;
  3652. byte stream = 0;
  3653. msg.read(name).read(mask).read(includedir).read(sub);
  3654. if (msg.remaining()>=sizeof(byte))
  3655. {
  3656. msg.read(stream);
  3657. if (stream==1)
  3658. client.opendir.clear();
  3659. }
  3660. if (TF_TRACE)
  3661. PROGLOG("GetDir, '%s', '%s', stream='%u'",name.get(),mask.get(),stream);
  3662. if (!stream && !containsFileWildcard(mask))
  3663. {
  3664. // if no streaming, and mask contains no wildcard, it is much more efficient to get the info without a directory iterator!
  3665. StringBuffer fullFilename(name);
  3666. addPathSepChar(fullFilename).append(mask);
  3667. Owned<IFile> iFile = createIFile(fullFilename);
  3668. // NB: This must preserve same serialization format as CRemoteDirectoryIterator::serialize produces for <=1 file.
  3669. reply.append((unsigned)RFEnoerror);
  3670. if (!iFile->exists())
  3671. reply.append((byte)0);
  3672. else
  3673. {
  3674. byte b=1;
  3675. reply.append(b);
  3676. bool isDir = fileBool::foundYes == iFile->isDirectory();
  3677. reply.append(isDir);
  3678. reply.append(isDir ? 0 : iFile->size());
  3679. CDateTime dt;
  3680. iFile->getTime(nullptr, &dt, nullptr);
  3681. dt.serialize(reply);
  3682. reply.append(mask);
  3683. b = 0;
  3684. reply.append(b);
  3685. }
  3686. }
  3687. else
  3688. {
  3689. Owned<IFile> dir=createIFile(name);
  3690. Owned<IDirectoryIterator> iter;
  3691. if (stream>1)
  3692. iter.set(client.opendir);
  3693. else
  3694. {
  3695. iter.setown(dir->directoryFiles(mask.length()?mask.get():NULL,sub,includedir));
  3696. if (stream != 0)
  3697. client.opendir.set(iter);
  3698. }
  3699. if (!iter)
  3700. throw createDafsException(RFSERR_GetDirFailed, nullptr);
  3701. reply.append((unsigned)RFEnoerror);
  3702. if (serializeRemoteDirectoryIterator(reply,iter,stream?0x100000:0,stream<2))
  3703. {
  3704. if (stream != 0)
  3705. client.opendir.clear();
  3706. }
  3707. else
  3708. {
  3709. bool cont=true;
  3710. reply.append(cont);
  3711. }
  3712. }
  3713. }
  3714. void cmdMonitorDir(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3715. {
  3716. StringAttr name;
  3717. StringAttr mask;
  3718. bool includedir;
  3719. bool sub;
  3720. unsigned checkinterval;
  3721. unsigned timeout;
  3722. __int64 cancelid; // not yet used
  3723. msg.read(name).read(mask).read(includedir).read(sub).read(checkinterval).read(timeout).read(cancelid);
  3724. byte isprev;
  3725. msg.read(isprev);
  3726. Owned<IDirectoryIterator> prev;
  3727. if (isprev==1)
  3728. {
  3729. SocketEndpoint ep;
  3730. prev.setown(createRemoteDirectorIterator(ep, name, msg));
  3731. }
  3732. if (TF_TRACE)
  3733. PROGLOG("MonitorDir, '%s' '%s'",name.get(),mask.get());
  3734. Owned<IFile> dir=createIFile(name);
  3735. Owned<IDirectoryDifferenceIterator> iter=dir->monitorDirectory(prev,mask.length()?mask.get():NULL,sub,includedir,checkinterval,timeout);
  3736. reply.append((unsigned)RFEnoerror);
  3737. byte state = (iter.get()==NULL)?0:1;
  3738. reply.append(state);
  3739. if (state==1)
  3740. serializeRemoteDirectoryDiff(reply, iter);
  3741. }
  3742. void cmdCopySection(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3743. {
  3744. StringAttr uuid;
  3745. StringAttr fromFile;
  3746. StringAttr toFile;
  3747. offset_t toOfs;
  3748. offset_t fromOfs;
  3749. offset_t size;
  3750. offset_t sizeDone=0;
  3751. offset_t totalSize=(offset_t)-1;
  3752. unsigned timeout;
  3753. msg.read(uuid).read(fromFile).read(toFile).read(toOfs).read(fromOfs).read(size).read(timeout);
  3754. AsyncCommandStatus status = asyncCommandManager.copySection(uuid,fromFile,toFile,toOfs,fromOfs,size,sizeDone,totalSize,timeout);
  3755. reply.append((unsigned)RFEnoerror).append((unsigned)status).append(sizeDone).append(totalSize);
  3756. }
  3757. static void treeCopyFile(RemoteFilename &srcfn, RemoteFilename &dstfn, const char *net, const char *mask, IpAddress &ip, bool usetmp, CThrottler *throttler, CFflags copyFlags=CFnone)
  3758. {
  3759. unsigned start = msTick();
  3760. Owned<IFile> dstfile = createIFile(dstfn);
  3761. // the following is really to check the dest node is up and working (otherwise not much point in continuing!)
  3762. if (dstfile->exists())
  3763. PROGLOG("TREECOPY overwriting '%s'",dstfile->queryFilename());
  3764. Owned<IFile> srcfile = createIFile(srcfn);
  3765. unsigned lastmin = 0;
  3766. if (!srcfn.queryIP().ipequals(dstfn.queryIP())) {
  3767. CriticalBlock block(treeCopyCrit);
  3768. for (;;) {
  3769. CDateTime dt;
  3770. offset_t sz;
  3771. try {
  3772. sz = srcfile->size();
  3773. if (sz==(offset_t)-1) {
  3774. if (TF_TRACE_TREE_COPY)
  3775. PROGLOG("TREECOPY source not found '%s'",srcfile->queryFilename());
  3776. break;
  3777. }
  3778. srcfile->getTime(NULL,&dt,NULL);
  3779. }
  3780. catch (IException *e) {
  3781. EXCLOG(e,"treeCopyFile(1)");
  3782. e->Release();
  3783. break;
  3784. }
  3785. Linked<CTreeCopyItem> tc;
  3786. unsigned now = msTick();
  3787. ForEachItemInRev(i1,treeCopyArray) {
  3788. CTreeCopyItem &item = treeCopyArray.item(i1);
  3789. // prune old entries (not strictly needed buf I think better)
  3790. if (now-item.lastused>TREECOPYPRUNETIME)
  3791. treeCopyArray.remove(i1);
  3792. else if (!tc.get()&&item.equals(srcfn,net,mask,sz,dt)) {
  3793. tc.set(&item);
  3794. item.lastused = now;
  3795. }
  3796. }
  3797. if (!tc.get()) {
  3798. if (treeCopyArray.ordinality()>=TREECOPY_CACHE_SIZE)
  3799. treeCopyArray.remove(0);
  3800. tc.setown(new CTreeCopyItem(srcfn,net,mask,sz,dt));
  3801. treeCopyArray.append(*tc.getLink());
  3802. }
  3803. ForEachItemInRev(cand,tc->loc) { // rev to choose copied locations first (maybe optional?)
  3804. if (!tc->busy->testSet(cand)) {
  3805. // check file accessible and matches
  3806. if (!cand&&dstfn.equals(tc->loc.item(cand))) // hmm trying to overwrite existing, better humor
  3807. continue;
  3808. bool ok = true;
  3809. Owned<IFile> rmtfile = createIFile(tc->loc.item(cand));
  3810. if (cand) { // only need to check if remote
  3811. try {
  3812. if (rmtfile->size()!=sz)
  3813. ok = false;
  3814. else {
  3815. CDateTime fdt;
  3816. rmtfile->getTime(NULL,&fdt,NULL);
  3817. ok = fdt.equals(dt);
  3818. }
  3819. }
  3820. catch (IException *e) {
  3821. EXCLOG(e,"treeCopyFile(2)");
  3822. e->Release();
  3823. ok = false;
  3824. }
  3825. }
  3826. if (ok) { // if not ok leave 'busy'
  3827. // finally lets try and copy!
  3828. try {
  3829. if (TF_TRACE_TREE_COPY)
  3830. PROGLOG("TREECOPY(started) %s to %s",rmtfile->queryFilename(),dstfile->queryFilename());
  3831. {
  3832. CriticalUnblock unblock(treeCopyCrit); // note we have tc linked
  3833. rmtfile->copyTo(dstfile,DEFAULT_COPY_BLKSIZE,NULL,usetmp,copyFlags);
  3834. }
  3835. if (TF_TRACE_TREE_COPY)
  3836. PROGLOG("TREECOPY(done) %s to %s",rmtfile->queryFilename(),dstfile->queryFilename());
  3837. tc->busy->set(cand,false);
  3838. if (treeCopyWaiting)
  3839. treeCopySem.signal((treeCopyWaiting>1)?2:1);
  3840. // add to known locations
  3841. tc->busy->set(tc->loc.ordinality(),false); // prob already is clear
  3842. tc->loc.append(dstfn);
  3843. ip.ipset(tc->loc.item(cand).queryIP());
  3844. return;
  3845. }
  3846. catch (IException *e) {
  3847. if (cand==0) {
  3848. tc->busy->set(0,false); // don't leave busy
  3849. if (treeCopyWaiting)
  3850. treeCopySem.signal();
  3851. throw; // what more can we do!
  3852. }
  3853. EXCLOG(e,"treeCopyFile(3)");
  3854. e->Release();
  3855. }
  3856. }
  3857. }
  3858. }
  3859. // all locations busy
  3860. if (msTick()-start>TREECOPYTIMEOUT) {
  3861. WARNLOG("Treecopy %s wait timed out", srcfile->queryFilename());
  3862. break;
  3863. }
  3864. treeCopyWaiting++; // note this isn't precise - just indication
  3865. {
  3866. CriticalUnblock unblock(treeCopyCrit);
  3867. if (throttler)
  3868. {
  3869. CThrottleReleaseBlock block(*throttler, RFCtreecopy);
  3870. treeCopySem.wait(TREECOPYPOLLTIME);
  3871. }
  3872. else
  3873. treeCopySem.wait(TREECOPYPOLLTIME);
  3874. }
  3875. treeCopyWaiting--;
  3876. if ((msTick()-start)/10*1000!=lastmin) {
  3877. lastmin = (msTick()-start)/10*1000;
  3878. PROGLOG("treeCopyFile delayed: %s to %s",srcfile->queryFilename(),dstfile->queryFilename());
  3879. }
  3880. }
  3881. }
  3882. else if (TF_TRACE_TREE_COPY)
  3883. PROGLOG("TREECOPY source on same node as destination");
  3884. if (TF_TRACE_TREE_COPY)
  3885. PROGLOG("TREECOPY(started,fallback) %s to %s",srcfile->queryFilename(),dstfile->queryFilename());
  3886. try {
  3887. GetHostIp(ip);
  3888. srcfile->copyTo(dstfile,DEFAULT_COPY_BLKSIZE,NULL,usetmp,copyFlags);
  3889. }
  3890. catch (IException *e) {
  3891. EXCLOG(e,"TREECOPY(done,fallback)");
  3892. throw;
  3893. }
  3894. if (TF_TRACE_TREE_COPY)
  3895. PROGLOG("TREECOPY(done,fallback) %s to %s",srcfile->queryFilename(),dstfile->queryFilename());
  3896. }
  3897. void cmdTreeCopy(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client, CThrottler *throttler, bool usetmp=false)
  3898. {
  3899. RemoteFilename src;
  3900. src.deserialize(msg);
  3901. RemoteFilename dst;
  3902. dst.deserialize(msg);
  3903. StringAttr net;
  3904. StringAttr mask;
  3905. msg.read(net).read(mask);
  3906. IpAddress ip;
  3907. treeCopyFile(src,dst,net,mask,ip,usetmp,throttler);
  3908. unsigned status = 0;
  3909. reply.append((unsigned)RFEnoerror).append((unsigned)status);
  3910. ip.ipserialize(reply);
  3911. }
  3912. void cmdTreeCopyTmp(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client, CThrottler *throttler)
  3913. {
  3914. cmdTreeCopy(msg, reply, client, throttler, true);
  3915. }
  3916. void cmdGetCRC(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3917. {
  3918. StringAttr name;
  3919. msg.read(name);
  3920. if (TF_TRACE)
  3921. PROGLOG("getCRC, '%s'",name.get());
  3922. Owned<IFile> file=createIFile(name);
  3923. unsigned ret = file->getCRC();
  3924. reply.append((unsigned)RFEnoerror).append(ret);
  3925. }
  3926. void cmdStop(MemoryBuffer &msg, MemoryBuffer &reply)
  3927. {
  3928. PROGLOG("Abort request received");
  3929. stopping = true;
  3930. if (acceptsock)
  3931. acceptsock->cancel_accept();
  3932. if (securesock)
  3933. securesock->cancel_accept();
  3934. if (rowServiceSock)
  3935. rowServiceSock->cancel_accept();
  3936. reply.append((unsigned)RFEnoerror);
  3937. }
  3938. void cmdExec(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3939. {
  3940. StringAttr cmdLine;
  3941. msg.read(cmdLine);
  3942. // NB: legacy remoteExec used to simply pass error code and buffer back to caller.
  3943. VStringBuffer errMsg("Remote command execution no longer supported. Trying to execute cmdline=%s", cmdLine.get());
  3944. WARNLOG("%s", errMsg.str());
  3945. size32_t outSz = errMsg.length()+1; // reply with null terminated string
  3946. // reply with error code -1
  3947. reply.append((unsigned)-1).append((unsigned)0).append(outSz).append(outSz, errMsg.str());
  3948. }
  3949. void cmdSetTrace(MemoryBuffer &msg, MemoryBuffer &reply)
  3950. {
  3951. byte flags;
  3952. msg.read(flags);
  3953. int retcode=-1;
  3954. if (flags!=255) // escape
  3955. {
  3956. retcode = traceFlags;
  3957. traceFlags = flags;
  3958. }
  3959. reply.append(retcode);
  3960. }
  3961. void cmdGetInfo(MemoryBuffer &msg, MemoryBuffer &reply)
  3962. {
  3963. unsigned level=1;
  3964. if (msg.remaining() >= sizeof(unsigned))
  3965. msg.read(level);
  3966. StringBuffer retstr;
  3967. getInfo(retstr, level);
  3968. reply.append(RFEnoerror).append(retstr.str());
  3969. }
  3970. void cmdFirewall(MemoryBuffer &msg, MemoryBuffer &reply)
  3971. {
  3972. // TBD
  3973. StringBuffer retstr;
  3974. getInfo(retstr);
  3975. reply.append(RFEnoerror).append(retstr.str());
  3976. }
  3977. void cmdExtractBlobElements(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3978. {
  3979. StringAttr prefix;
  3980. StringAttr filename;
  3981. msg.read(prefix).read(filename);
  3982. RemoteFilename rfn;
  3983. rfn.setLocalPath(filename);
  3984. ExtractedBlobArray extracted;
  3985. extractBlobElements(prefix, rfn, extracted);
  3986. unsigned n = extracted.ordinality();
  3987. reply.append((unsigned)RFEnoerror).append(n);
  3988. for (unsigned i=0;i<n;i++)
  3989. extracted.item(i).serialize(reply);
  3990. }
  3991. void cmdStreamGeneral(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client, CClientStats &stats)
  3992. {
  3993. size32_t jsonSz;
  3994. msg.read(jsonSz);
  3995. Owned<IPropertyTree> requestTree = createPTreeFromJSONString(jsonSz, (const char *)msg.readDirect(jsonSz));
  3996. cmdStreamCommon(requestTree, msg, reply, client, stats);
  3997. }
  3998. /* Notes on protocol:
  3999. *
  4000. * A JSON request with these top-level fields:
  4001. * "format" - the format of the reply. Supported formats = "binary", "xml", "json"
   * "handle" - the handle of a file session that was previously opened (for continuation)
  4003. * "commCompression" - compression format of the communication protocol. Supports "LZ4", "LZW", "FLZ" (TBD: "ZLIB")
   * "replyLimit" - Maximum size of each reply, in KB. (default 1024)
  4005. * "node" - contains all 'activity' properties below:
  4006. *
  4007. * For a secured dafilesrv (streaming protocol), requests will only be accepted if the meta blob ("metaInfo") has a matching signature.
  4008. * The request must specify "filePart" (1 based) to denote the partition # being read from or written to.
  4009. *
  4010. * "filePartCopy" (1 based) defaults to 1
  4011. *
  4012. * "kind" - supported kinds = "diskread", "diskwrite", "indexread", "indexcount" (TBD: "diskcount", "indexwrite", "disklookup")
  4013. * NB: disk vs index will be auto detected if "kind" is absent.
  4014. *
  4015. * "action" - supported actions = "count" (used if "kind" is auto-detected to specify count should be performed instead of read)
  4016. *
  4017. * "keyFilter" - filter the results by this expression (See: HPCC-18474 for more details).
  4018. *
  4019. * "chooseN" - maximum # of results to return
  4020. *
  4021. * "compressed" - specifies whether input file is compressed. NB: not relevant to "index" types. Default = false. Auto-detected.
  4022. *
  4023. * "input" - specifies layout on disk of the file being read.
  4024. *
   * "output" - where relevant, specifies the output format to be returned
  4026. *
   * "fileName" is only used for unsecured, unsigned connections (normally forbidden), and specifies the fully qualified path to a physical file.
  4028. *
  4029. */
  4030. void cmdStreamCommon(IPropertyTree *requestTree, MemoryBuffer &rest, MemoryBuffer &reply, CRemoteClientHandler &client, CClientStats &stats)
  4031. {
  4032. /* Example JSON request:
  4033. *
  4034. * {
  4035. * "format" : "binary",
  4036. * "handle" : "1234",
  4037. * "replyLimit" : "64",
  4038. * "commCompression" : "LZ4",
  4039. * "node" : {
  4040. * "metaInfo" : "",
  4041. * "filePart" : 2,
  4042. * "filePartCopy" : 1,
  4043. * "kind" : "diskread",
  4044. * "fileName": "examplefilename",
  4045. * "keyFilter" : "f1='1 '",
  4046. * "chooseN" : 5,
  4047. * "compressed" : "false"
  4048. * "input" : {
  4049. * "f1" : "string5",
  4050. * "f2" : "string5"
  4051. * },
  4052. * "output" : {
  4053. * "f2" : "string",
  4054. * "f1" : "real"
  4055. * }
  4056. * }
  4057. * }
  4058. * OR
  4059. * {
  4060. * "format" : "binary",
  4061. * "handle" : "1234",
  4062. * "replyLimit" : "64",
  4063. * "commCompression" : "LZ4",
  4064. * "node" : {
  4065. * "kind" : "diskread",
  4066. * "fileName": "examplefilename",
  4067. * "keyFilter" : "f1='1 '",
  4068. * "chooseN" : 5,
  4069. * "compressed" : "false"
  4070. * "input" : {
  4071. * "f1" : "string5",
  4072. * "f2" : "string5"
  4073. * },
  4074. * "output" : {
  4075. * "f2" : "string",
  4076. * "f1" : "real"
  4077. * }
  4078. * }
  4079. * }
  4080. * OR
  4081. * {
  4082. * "format" : "xml",
  4083. * "handle" : "1234",
  4084. * "replyLimit" : "64",
  4085. * "node" : {
  4086. * "kind" : "diskread",
  4087. * "fileName": "examplefilename",
  4088. * "keyFilter" : "f1='1 '",
  4089. * "chooseN" : 5,
  4090. * "compressed" : "false"
  4091. * "input" : {
  4092. * "f1" : "string5",
  4093. * "f2" : "string5"
  4094. * },
  4095. * "output" : {
  4096. * "f2" : "string",
  4097. * "f1" : "real"
  4098. * }
  4099. * }
  4100. * }
  4101. * OR
  4102. * {
  4103. * "format" : "xml",
  4104. * "handle" : "1234",
  4105. * "node" : {
  4106. * "kind" : "indexread",
  4107. * "fileName": "examplefilename",
  4108. * "keyFilter" : "f1='1 '",
  4109. * "input" : {
  4110. * "f1" : "string5",
  4111. * "f2" : "string5"
  4112. * },
  4113. * "output" : {
  4114. * "f2" : "string",
  4115. * "f1" : "real"
  4116. * }
  4117. * }
  4118. * OR
  4119. * {
  4120. * "format" : "xml",
  4121. * "node" : {
  4122. * "kind" : "xmlread",
  4123. * "fileName": "examplefilename",
  4124. * "keyFilter" : "f1='1 '",
  4125. * "input" : {
  4126. * "f1" : "string5",
  4127. * "f2" : "string5"
  4128. * },
  4129. * "output" : {
  4130. * "f2" : "string",
  4131. * "f1" : "real"
  4132. * }
  4133. * "ActivityOptions" : { // usually not required, options here may override file meta info.
  4134. * "rowTag" : "/Dataset/OtherRow"
  4135. * }
  4136. * }
  4137. * OR
  4138. * {
  4139. * "format" : "xml",
  4140. * "node" : {
  4141. * "kind" : "csvread",
  4142. * "fileName": "examplefilename",
  4143. * "keyFilter" : "f1='1 '",
  4144. * "input" : {
  4145. * "f1" : "string5",
  4146. * "f2" : "string5"
  4147. * },
  4148. * "output" : {
  4149. * "f2" : "string",
  4150. * "f1" : "real"
  4151. * }
  4152. * "ActivityOptions" : { // usually not required, options here may override file meta info.
  4153. * "csvQuote" : "\"",
  4154. * "csvSeparate" : ","
  4155. * "csvTerminate" : "\\n,\\r\\n",
  4156. * }
  4157. * }
  4158. * OR
  4159. * {
  4160. * "format" : "xml",
  4161. * "node" : {
  4162. * "action" : "count", // if present performs count with/without filter and returns count
  4163. * "fileName": "examplefilename", // can be either index or flat file
  4164. * "keyFilter" : "f1='1 '",
  4165. * "input" : {
  4166. * "f1" : "string5",
  4167. * "f2" : "string5"
  4168. * },
  4169. * }
  4170. * }
  4171. * OR
  4172. * {
  4173. * "format" : "binary",
  4174. * "handle" : "1234",
  4175. * "replyLimit" : "64",
  4176. * "commCompression" : "LZ4",
  4177. * "node" : {
  4178. * "kind" : "diskwrite",
  4179. * "fileName": "examplefilename",
  4180. * "compressed" : "false" (or "LZ4", "FLZ", "LZW")
  4181. * "input" : {
  4182. * "f1" : "string5",
  4183. * "f2" : "string5"
  4184. * }
  4185. * }
  4186. * }
  4187. * OR
  4188. * {
  4189. * "format" : "binary",
  4190. * "handle" : "1234",
  4191. * "replyLimit" : "64",
  4192. * "node" : {
  4193. * "kind" : "indexwrite",
  4194. * "fileName": "examplefilename",
  4195. * "input" : {
  4196. * "f1" : "string5",
  4197. * "f2" : "string5"
  4198. * }
  4199. * }
  4200. * }
  4201. *
  4202. */
  4203. int cursorHandle = requestTree->getPropInt("handle");
  4204. OutputFormat outputFormat = outFmt_Xml;
  4205. Owned<ICompressor> compressor;
  4206. Owned<IExpander> expander;
  4207. Owned<CRemoteRequest> remoteRequest;
  4208. Owned<IRemoteActivity> outputActivity;
  4209. OpenFileInfo fileInfo;
  4210. if (!cursorHandle)
  4211. {
  4212. const char *outputFmtStr = requestTree->queryProp("format");
  4213. if (nullptr == outputFmtStr)
  4214. outputFormat = outFmt_Xml; // default
  4215. else if (strieq("xml", outputFmtStr))
  4216. outputFormat = outFmt_Xml;
  4217. else if (strieq("json", outputFmtStr))
  4218. outputFormat = outFmt_Json;
  4219. else if (strieq("binary", outputFmtStr))
  4220. outputFormat = outFmt_Binary;
  4221. else
  4222. throw MakeStringException(0, "Unrecognised output format: %s", outputFmtStr);
  4223. /* pre-version 2.4, "outputCompression" denoted data was compressed in communication protocol and only applied to reply row data
  4224. * Since 2.5 "commCompression" replaces "outputCompression", and applies to both incoming row data (write) and outgoing row data (read).
  4225. * But "outputCompression" is checked for backward compatibility.
  4226. */
  4227. if (requestTree->hasProp("outputCompression") || requestTree->hasProp("commCompression"))
  4228. {
  4229. const char *commCompressionType = requestTree->queryProp("commCompression");
  4230. if (isEmptyString(commCompressionType))
  4231. commCompressionType = requestTree->queryProp("outputCompression");
  4232. if (isEmptyString(commCompressionType))
  4233. {
  4234. compressor.setown(queryDefaultCompressHandler()->getCompressor());
  4235. expander.setown(queryDefaultCompressHandler()->getExpander());
  4236. }
  4237. else if (outFmt_Binary == outputFormat)
  4238. {
  4239. compressor.setown(getCompressor(commCompressionType));
  4240. expander.setown(getExpander(commCompressionType));
  4241. if (!compressor)
  4242. WARNLOG("Unknown compressor type specified: %s", commCompressionType);
  4243. }
  4244. else
  4245. WARNLOG("Communication protocol compression not supported for format: %s", outputFmtStr);
  4246. }
  4247. /* NB: unless client call is on dedicated service, allow non-authorized requests through, e.g. from engines talking to unsecured port
  4248. * In a secure setup, this service will be configured on a dedicated port, and the std. insecure dafilesrv will be unreachable.
  4249. */
  4250. bool authorizedOnly = rowServiceSock && client.isRowServiceClient();
  4251. // In future this may be passed the request and build a chain of activities and return sink.
  4252. outputActivity.setown(createOutputActivity(*requestTree, authorizedOnly, keyPairInfo));
  4253. {
  4254. CriticalBlock block(sect);
  4255. cursorHandle = getNextHandle();
  4256. }
  4257. remoteRequest.setown(new CRemoteRequest(cursorHandle, outputFormat, compressor, expander, outputActivity));
  4258. StringBuffer requestStr("jsonrequest:");
  4259. outputActivity->getInfoStr(requestStr);
  4260. Owned<StringAttrItem> name = new StringAttrItem(requestStr);
  4261. CriticalBlock block(sect);
  4262. client.previdx = client.openFiles.ordinality();
  4263. client.openFiles.append(OpenFileInfo(cursorHandle, remoteRequest, name));
  4264. }
  4265. else if (!lookupFileIOHandle(cursorHandle, fileInfo))
  4266. cursorHandle = 0; // challenge response ..
  4267. else // known handle, continuation
  4268. {
  4269. remoteRequest.set(fileInfo.remoteRequest);
  4270. outputFormat = fileInfo.remoteRequest->queryFormat();
  4271. }
  4272. if (cursorHandle)
  4273. remoteRequest->process(requestTree, rest, reply, stats);
  4274. else
  4275. {
  4276. const char *outputFmtStr = requestTree->queryProp("format");
  4277. if (nullptr == outputFmtStr)
  4278. outputFormat = outFmt_Xml; // default
  4279. else if (strieq("xml", outputFmtStr))
  4280. outputFormat = outFmt_Xml;
  4281. else if (strieq("json", outputFmtStr))
  4282. outputFormat = outFmt_Json;
  4283. else if (strieq("binary", outputFmtStr))
  4284. outputFormat = outFmt_Binary;
  4285. else
  4286. throw MakeStringException(0, "Unrecognised output format: %s", outputFmtStr);
  4287. if (outFmt_Binary == outputFormat)
  4288. reply.append(cursorHandle);
  4289. else // outFmt_Xml || outFmt_Json
  4290. {
  4291. Owned<IXmlWriterExt> responseWriter = createIXmlWriterExt(0, 0, nullptr, outFmt_Xml == outputFormat ? WTStandard : WTJSONObject);
  4292. responseWriter->outputBeginNested("Response", true);
  4293. if (outFmt_Xml == outputFormat)
  4294. responseWriter->outputCString("urn:hpcc:dfs", "@xmlns:dfs");
  4295. responseWriter->outputUInt(cursorHandle, sizeof(cursorHandle), "handle");
  4296. responseWriter->outputEndNested("Response");
  4297. responseWriter->finalize();
  4298. reply.append(responseWriter->length(), responseWriter->str());
  4299. }
  4300. }
  4301. }
  4302. void cmdStreamReadCommon(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client, CClientStats &stats)
  4303. {
  4304. size32_t jsonSz = msg.remaining();
  4305. Owned<IPropertyTree> requestTree = createPTreeFromJSONString(jsonSz, (const char *)msg.readDirect(jsonSz));
  4306. cmdStreamCommon(requestTree, msg, reply, client, stats);
  4307. }
  4308. // NB: JSON header to message, for some requests (e.g. write), there will be trailing raw data (e.g. row data)
  4309. void cmdStreamReadStd(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client, CClientStats &stats)
  4310. {
  4311. reply.append(RFEnoerror); // gets patched if there is a follow on error
  4312. cmdStreamReadCommon(msg, reply, client, stats);
  4313. }
  4314. void cmdStreamReadJSON(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client, CClientStats &stats)
  4315. {
  4316. /* NB: exactly the same handling as cmdStreamReadStd(RFCStreamRead) for now,
  4317. * may want to differentiate later
  4318. * i.e. return format is { len[unsigned4-bigendian], errorcode[unsigned4-bigendian], result } - where result format depends on request output type.
  4319. * errorcode = 0 means no error
  4320. */
  4321. reply.append(RFEnoerror); // gets patched if there is a follow on error
  4322. cmdStreamReadCommon(msg, reply, client, stats);
  4323. }
  4324. void cmdStreamReadTestSocket(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client, CClientStats &stats)
  4325. {
  4326. reply.append('J');
  4327. cmdStreamReadCommon(msg, reply, client, stats);
  4328. }
  4329. // legacy version
  4330. void cmdSetThrottle(MemoryBuffer & msg, MemoryBuffer & reply)
  4331. {
  4332. unsigned limit, delayMs, cpuThreshold;
  4333. msg.read(limit);
  4334. msg.read(delayMs);
  4335. msg.read(cpuThreshold);
  4336. stdCmdThrottler.configure(limit, delayMs, cpuThreshold, (unsigned)-1);
  4337. reply.append((unsigned)RFEnoerror);
  4338. }
  4339. void cmdSetThrottle2(MemoryBuffer & msg, MemoryBuffer & reply)
  4340. {
  4341. unsigned throttleClass, limit, delayMs, cpuThreshold, queueLimit;
  4342. msg.read(throttleClass);
  4343. msg.read(limit);
  4344. msg.read(delayMs);
  4345. msg.read(cpuThreshold);
  4346. msg.read(queueLimit);
  4347. setThrottle((ThrottleClass)throttleClass, limit, delayMs, cpuThreshold, queueLimit);
  4348. reply.append((unsigned)RFEnoerror);
  4349. }
  4350. void formatException(MemoryBuffer &reply, IException *e, RemoteFileCommandType cmd, bool testSocketFlag, unsigned _dfsErrorCode, CRemoteClientHandler *client)
  4351. {
  4352. unsigned dfsErrorCode = _dfsErrorCode;
  4353. if (!dfsErrorCode)
  4354. {
  4355. if (e)
  4356. dfsErrorCode = (QUERYINTERFACE(e, IDAFS_Exception)) ? e->errorCode() : RFSERR_InternalError;
  4357. else
  4358. dfsErrorCode = RFSERR_InternalError;
  4359. }
  4360. VStringBuffer errMsg("ERROR: cmd=%s, error=%s", getRFCText(cmd), getRFSERRText(dfsErrorCode));
  4361. if (e)
  4362. {
  4363. errMsg.appendf(" (%u, ", e->errorCode());
  4364. unsigned len = errMsg.length();
  4365. e->errorMessage(errMsg);
  4366. if (len == errMsg.length())
  4367. errMsg.setLength(len-2); // strip off ", " if no message in exception
  4368. errMsg.append(")");
  4369. }
  4370. if (testSocketFlag)
  4371. reply.append('-');
  4372. else
  4373. reply.append(dfsErrorCode);
  4374. reply.append(errMsg.str());
  4375. if (client && cmd!=RFCunlock)
  4376. {
  4377. const char *peer = client->queryPeerName();
  4378. if (peer)
  4379. {
  4380. VStringBuffer err("%s. Client: %s", errMsg.str(), peer);
  4381. PROGLOG("%s", err.str());
  4382. }
  4383. client->logPrevHandle();
  4384. }
  4385. }
  4386. void throttleCommand(RemoteFileCommandType cmd, MemoryBuffer &msg, CRemoteClientHandler *client)
  4387. {
  4388. switch (cmd)
  4389. {
  4390. case RFCexec:
  4391. case RFCgetcrc:
  4392. case RFCcopy:
  4393. case RFCappend:
  4394. case RFCtreecopy:
  4395. case RFCtreecopytmp:
  4396. case RFCremove:
  4397. case RFCcopysection:
  4398. slowCmdThrottler.addCommand(cmd, msg, client);
  4399. return;
  4400. case RFCcloseIO:
  4401. case RFCopenIO:
  4402. case RFCread:
  4403. case RFCsize:
  4404. case RFCwrite:
  4405. case RFCexists:
  4406. case RFCrename:
  4407. case RFCgetver:
  4408. case RFCisfile:
  4409. case RFCisdirectory:
  4410. case RFCisreadonly:
  4411. case RFCsetreadonly:
  4412. case RFCsetfileperms:
  4413. case RFCreadfilteredindex:
  4414. case RFCreadfilteredindexcount:
  4415. case RFCreadfilteredindexblob:
  4416. case RFCgettime:
  4417. case RFCsettime:
  4418. case RFCcreatedir:
  4419. case RFCgetdir:
  4420. case RFCmonitordir:
  4421. case RFCstop:
  4422. case RFCextractblobelements:
  4423. case RFCredeploy:
  4424. case RFCmove:
  4425. case RFCsetsize:
  4426. case RFCsettrace:
  4427. case RFCgetinfo:
  4428. case RFCfirewall:
  4429. case RFCStreamRead:
  4430. case RFCStreamReadTestSocket:
  4431. case RFCStreamReadJSON:
  4432. stdCmdThrottler.addCommand(cmd, msg, client);
  4433. return;
  4434. // NB: The following commands are still bound by the the thread pool
  4435. case RFCsetthrottle: // legacy version
  4436. case RFCsetthrottle2:
  4437. default:
  4438. {
  4439. client->processCommand(cmd, msg, NULL);
  4440. break;
  4441. }
  4442. }
  4443. }
  4444. void checkAuthorizedStreamCommand(CRemoteClientHandler &client)
  4445. {
  4446. if (!rowServiceOnStdPort && !client.isRowServiceClient())
  4447. throw createDafsException(DAFSERR_cmdstream_unauthorized, "Unauthorized command");
  4448. }
  /* Executes a single command for 'client' and appends its reply to 'reply'.
   * Per-command/per-peer statistics are obtained from clientStatsTable and passed to the handlers that take them.
   * Any IException from a handler is caught: checkOutOfHandles() is consulted, the reply is rewound to its
   * length on entry, and an error reply is written by formatException().
   * Returns testSocketFlag, which is set to true only for RFCStreamReadTestSocket.
   * NOTE(review): 'throttler' is not referenced in this body; the tree-copy commands use &slowCmdThrottler directly.
   */
  bool processCommand(RemoteFileCommandType cmd, MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler *client, CThrottler *throttler)
  {
      Owned<CClientStats> stats = clientStatsTable.getClientReference(cmd, client->queryPeerName());
      bool testSocketFlag = false;
      // Remember where this command's reply starts so it can be discarded on error
      unsigned posOfErr = reply.length();
      try
      {
          // MAPCOMMAND* macros (defined elsewhere) map each command id to its cmdXxx handler
          switch(cmd)
          {
              MAPCOMMANDSTATS(RFCread, cmdRead, *stats);
              MAPCOMMANDSTATS(RFCwrite, cmdWrite, *stats);
              MAPCOMMANDCLIENTSTATS(RFCappend, cmdAppend, *client, *stats);
              MAPCOMMAND(RFCcloseIO, cmdCloseFileIO);
              MAPCOMMANDCLIENT(RFCopenIO, cmdOpenFileIO, *client);
              MAPCOMMAND(RFCsize, cmdSize);
              MAPCOMMANDCLIENT(RFCexists, cmdExists, *client);
              MAPCOMMANDCLIENT(RFCremove, cmdRemove, *client);
              MAPCOMMANDCLIENT(RFCrename, cmdRename, *client);
              MAPCOMMAND(RFCgetver, cmdGetVer);
              MAPCOMMANDCLIENT(RFCisfile, cmdIsFile, *client);
              MAPCOMMANDCLIENT(RFCisdirectory, cmdIsDir, *client);
              MAPCOMMANDCLIENT(RFCisreadonly, cmdIsReadOnly, *client);
              MAPCOMMANDCLIENT(RFCsetreadonly, cmdSetReadOnly, *client);
              MAPCOMMANDCLIENT(RFCsetfileperms, cmdSetFilePerms, *client);
              MAPCOMMANDCLIENT(RFCgettime, cmdGetTime, *client);
              MAPCOMMANDCLIENT(RFCsettime, cmdSetTime, *client);
              MAPCOMMANDCLIENT(RFCcreatedir, cmdCreateDir, *client);
              MAPCOMMANDCLIENT(RFCgetdir, cmdGetDir, *client);
              MAPCOMMANDCLIENT(RFCmonitordir, cmdMonitorDir, *client);
              MAPCOMMAND(RFCstop, cmdStop);
              MAPCOMMANDCLIENT(RFCexec, cmdExec, *client);
              MAPCOMMANDCLIENT(RFCextractblobelements, cmdExtractBlobElements, *client);
              MAPCOMMANDCLIENT(RFCgetcrc, cmdGetCRC, *client);
              MAPCOMMANDCLIENT(RFCmove, cmdMove, *client);
              MAPCOMMANDCLIENT(RFCcopy, cmdCopy, *client);
              MAPCOMMAND(RFCsetsize, cmdSetSize);
              MAPCOMMAND(RFCsettrace, cmdSetTrace);
              MAPCOMMAND(RFCgetinfo, cmdGetInfo);
              MAPCOMMAND(RFCfirewall, cmdFirewall);
              MAPCOMMANDCLIENT(RFCcopysection, cmdCopySection, *client);
              MAPCOMMANDCLIENTTHROTTLE(RFCtreecopy, cmdTreeCopy, *client, &slowCmdThrottler);
              MAPCOMMANDCLIENTTHROTTLE(RFCtreecopytmp, cmdTreeCopyTmp, *client, &slowCmdThrottler);
              MAPCOMMAND(RFCsetthrottle, cmdSetThrottle); // legacy version
              MAPCOMMAND(RFCsetthrottle2, cmdSetThrottle2);
              // row service commands
              // Each one is checked by checkAuthorizedStreamCommand() before its handler runs
              case RFCStreamGeneral:
              {
                  checkAuthorizedStreamCommand(*client);
                  reply.append(RFEnoerror); // gets patched if there is a follow on error
                  cmdStreamGeneral(msg, reply, *client, *stats);
                  break;
              }
              case RFCStreamRead:
              {
                  checkAuthorizedStreamCommand(*client);
                  cmdStreamReadStd(msg, reply, *client, *stats);
                  break;
              }
              case RFCStreamReadJSON:
              {
                  checkAuthorizedStreamCommand(*client);
                  cmdStreamReadJSON(msg, reply, *client, *stats);
                  break;
              }
              case RFCStreamReadTestSocket:
              {
                  // Set before the authorization check so an unauthorized reply also uses the test-socket error format
                  testSocketFlag = true;
                  checkAuthorizedStreamCommand(*client);
                  cmdStreamReadTestSocket(msg, reply, *client, *stats);
                  break;
              }
              default:
                  formatException(reply, nullptr, cmd, false, RFSERR_InvalidCommand, client);
                  break;
          }
      }
      catch (IException *e)
      {
          checkOutOfHandles(e);
          // Discard any partial reply written by the failed handler
          reply.setWritePos(posOfErr);
          formatException(reply, e, cmd, testSocketFlag, 0, client);
          e->Release();
      }
      return testSocketFlag;
  }
  4534. IPooledThread *createCommandProcessor()
  4535. {
  4536. return new cCommandProcessor();
  4537. }
  4538. void checkOutOfHandles(IException *exception)
  4539. {
  4540. if (EMFILE == exception->errorCode())
  4541. handleTracer.traceIfReady();
  4542. }
  4543. virtual void run(DAFSConnectCfg _connectMethod, const SocketEndpoint &listenep, unsigned sslPort, const SocketEndpoint *rowServiceEp, bool _rowServiceSSL, bool _rowServiceOnStdPort) override
  4544. {
  4545. SocketEndpoint sslep(listenep);
  4546. if (sslPort)
  4547. sslep.port = sslPort;
  4548. else
  4549. sslep.port = securitySettings.daFileSrvSSLPort;
  4550. Owned<ISocket> acceptSock, secureSock, rowServiceSock;
  4551. if (_connectMethod != SSLOnly)
  4552. {
  4553. if (listenep.port == 0)
  4554. throw createDafsException(DAFSERR_serverinit_failed, "dafilesrv port not specified");
  4555. if (listenep.isNull())
  4556. acceptSock.setown(ISocket::create(listenep.port));
  4557. else
  4558. {
  4559. StringBuffer ips;
  4560. listenep.getIpText(ips);
  4561. acceptSock.setown(ISocket::create_ip(listenep.port,ips.str()));
  4562. }
  4563. }
  4564. if (_connectMethod == SSLOnly || _connectMethod == SSLFirst || _connectMethod == UnsecureFirst)
  4565. {
  4566. if (sslep.port == 0)
  4567. throw createDafsException(DAFSERR_serverinit_failed, "Secure dafilesrv port not specified");
  4568. if (_connectMethod == UnsecureFirst)
  4569. {
  4570. // don't fail, but warn - this allows for fast SSL client rejections
  4571. if (!securitySettings.certificate)
  4572. WARNLOG("SSL Certificate information not found in environment.conf, cannot accept SSL connections");
  4573. else if ( !checkFileExists(securitySettings.certificate) )
  4574. {
  4575. WARNLOG("SSL Certificate File not found in environment.conf, cannot accept SSL connections");
  4576. securitySettings.certificate = nullptr;
  4577. }
  4578. if (!securitySettings.privateKey)
  4579. WARNLOG("SSL Key information not found in environment.conf, cannot accept SSL connections");
  4580. else if ( !checkFileExists(securitySettings.privateKey) )
  4581. {
  4582. WARNLOG("SSL Key File not found in environment.conf, cannot accept SSL connections");
  4583. securitySettings.privateKey = nullptr;
  4584. }
  4585. }
  4586. else
  4587. validateSSLSetup();
  4588. if (sslep.isNull())
  4589. secureSock.setown(ISocket::create(sslep.port));
  4590. else
  4591. {
  4592. StringBuffer ips;
  4593. sslep.getIpText(ips);
  4594. secureSock.setown(ISocket::create_ip(sslep.port,ips.str()));
  4595. }
  4596. }
  4597. if (rowServiceEp)
  4598. {
  4599. rowServiceSSL = _rowServiceSSL;
  4600. rowServiceOnStdPort = _rowServiceOnStdPort;
  4601. if (rowServiceEp->isNull())
  4602. rowServiceSock.setown(ISocket::create(rowServiceEp->port));
  4603. else
  4604. {
  4605. StringBuffer ips;
  4606. rowServiceEp->getIpText(ips);
  4607. rowServiceSock.setown(ISocket::create_ip(rowServiceEp->port, ips.str()));
  4608. }
  4609. #ifdef _USE_OPENSSL
  4610. if (rowServiceSSL)
  4611. validateSSLSetup();
  4612. #else
  4613. rowServiceSSL = false;
  4614. #endif
  4615. }
  4616. run(_connectMethod, acceptSock.getClear(), secureSock.getClear(), rowServiceSock.getClear());
  4617. }
  4618. virtual void run(DAFSConnectCfg _connectMethod, ISocket *_acceptSock, ISocket *_secureSock, ISocket *_rowServiceSock) override
  4619. {
  4620. acceptsock.setown(_acceptSock);
  4621. securesock.setown(_secureSock);
  4622. rowServiceSock.setown(_rowServiceSock);
  4623. if (_connectMethod != SSLOnly)
  4624. {
  4625. if (!acceptsock)
  4626. throw createDafsException(DAFSERR_serverinit_failed, "Invalid non-secure socket");
  4627. }
  4628. if (_connectMethod == SSLOnly || _connectMethod == SSLFirst || _connectMethod == UnsecureFirst)
  4629. {
  4630. if (!securesock)
  4631. throw createDafsException(DAFSERR_serverinit_failed, "Invalid secure socket");
  4632. }
  4633. selecthandler->start();
  4634. Owned<IException> exception;
  4635. for (;;)
  4636. {
  4637. Owned<ISocket> sock;
  4638. Owned<ISocket> sockSSL;
  4639. Owned<ISocket> acceptedRSSock;
  4640. bool sockavail = false;
  4641. bool securesockavail = false;
  4642. bool rowServiceSockAvail = false;
  4643. if (_connectMethod == SSLNone && (nullptr == rowServiceSock.get()))
  4644. sockavail = acceptsock->wait_read(1000*60*1)!=0;
  4645. else if (_connectMethod == SSLOnly && (nullptr == rowServiceSock.get()))
  4646. securesockavail = securesock->wait_read(1000*60*1)!=0;
  4647. else
  4648. {
  4649. UnsignedArray readSocks;
  4650. UnsignedArray waitingSocks;
  4651. if (acceptsock)
  4652. readSocks.append(acceptsock->OShandle());
  4653. if (securesock)
  4654. readSocks.append(securesock->OShandle());
  4655. if (rowServiceSock)
  4656. readSocks.append(rowServiceSock->OShandle());
  4657. int numReady = wait_read_multiple(readSocks, 1000*60*1, waitingSocks);
  4658. if (numReady > 0)
  4659. {
  4660. for (int idx = 0; idx < numReady; idx++)
  4661. {
  4662. unsigned waitingSock = waitingSocks.item(idx);
  4663. if (acceptsock && (waitingSock == acceptsock->OShandle()))
  4664. sockavail = true;
  4665. else if (securesock && (waitingSock == securesock->OShandle()))
  4666. securesockavail = true;
  4667. else if (rowServiceSock && (waitingSock == rowServiceSock->OShandle()))
  4668. rowServiceSockAvail = true;
  4669. }
  4670. }
  4671. }
  4672. #if 0
  4673. if (!sockavail && !securesockavail && !rowServiceSockAvail)
  4674. {
  4675. JSocketStatistics stats;
  4676. getSocketStatistics(stats);
  4677. StringBuffer s;
  4678. getSocketStatisticsString(stats,s);
  4679. PROGLOG( "Socket statistics : \n%s\n",s.str());
  4680. }
  4681. #endif
  4682. if (stopping)
  4683. break;
  4684. if (sockavail || securesockavail || rowServiceSockAvail)
  4685. {
  4686. if (sockavail)
  4687. {
  4688. try
  4689. {
  4690. sock.setown(acceptsock->accept(true));
  4691. if (!sock||stopping)
  4692. break;
  4693. }
  4694. catch (IException *e)
  4695. {
  4696. exception.setown(e);
  4697. }
  4698. if (exception)
  4699. {
  4700. EXCLOG(exception, "CRemoteFileServer");
  4701. checkOutOfHandles(exception);
  4702. exception.clear();
  4703. sockavail = false;
  4704. }
  4705. }
  4706. if (securesockavail)
  4707. {
  4708. Owned<ISecureSocket> ssock;
  4709. try
  4710. {
  4711. sockSSL.setown(securesock->accept(true));
  4712. if (!sockSSL||stopping)
  4713. break;
  4714. if ( (_connectMethod == UnsecureFirst) && (!securitySettings.certificate || !securitySettings.privateKey) )
  4715. {
  4716. // for client secure_connect() to fail quickly ...
  4717. cleanupDaFsSocket(sockSSL);
  4718. sockSSL.clear();
  4719. securesockavail = false;
  4720. }
  4721. else
  4722. {
  4723. ssock.setown(createSecureSocket(sockSSL.getClear(), ServerSocket));
  4724. int status = ssock->secure_accept();
  4725. if (status < 0)
  4726. throw createDafsException(DAFSERR_serveraccept_failed,"Failure to establish secure connection");
  4727. sockSSL.setown(ssock.getLink());
  4728. }
  4729. }
  4730. catch (IException *e)
  4731. {
  4732. exception.setown(e);
  4733. }
  4734. if (exception)
  4735. {
  4736. EXCLOG(exception, "CRemoteFileServer (secure)");
  4737. cleanupDaFsSocket(sockSSL);
  4738. sockSSL.clear();
  4739. cleanupDaFsSocket(ssock);
  4740. ssock.clear();
  4741. checkOutOfHandles(exception);
  4742. exception.clear();
  4743. securesockavail = false;
  4744. }
  4745. }
  4746. if (rowServiceSockAvail)
  4747. {
  4748. Owned<ISecureSocket> ssock;
  4749. try
  4750. {
  4751. acceptedRSSock.setown(rowServiceSock->accept(true));
  4752. if (!acceptedRSSock||stopping)
  4753. break;
  4754. if (rowServiceSSL) // NB: will be disabled if !_USE_OPENSLL
  4755. {
  4756. ssock.setown(createSecureSocket(acceptedRSSock.getClear(), ServerSocket));
  4757. int status = ssock->secure_accept();
  4758. if (status < 0)
  4759. throw createDafsException(DAFSERR_serveraccept_failed,"Failure to establish SSL row service connection");
  4760. acceptedRSSock.setown(ssock.getLink());
  4761. }
  4762. }
  4763. catch (IException *e)
  4764. {
  4765. exception.setown(e);
  4766. }
  4767. if (exception)
  4768. {
  4769. EXCLOG(exception, "CRemoteFileServer (row service)");
  4770. cleanupDaFsSocket(acceptedRSSock);
  4771. acceptedRSSock.clear();
  4772. cleanupDaFsSocket(ssock);
  4773. ssock.clear();
  4774. checkOutOfHandles(exception);
  4775. exception.clear();
  4776. rowServiceSockAvail = false;
  4777. }
  4778. }
  4779. #ifdef _DEBUG
  4780. SocketEndpoint eps;
  4781. StringBuffer peerURL;
  4782. #endif
  4783. if (sockavail)
  4784. {
  4785. #ifdef _DEBUG
  4786. sock->getPeerEndpoint(eps);
  4787. eps.getUrlStr(peerURL);
  4788. PROGLOG("Server accepting from %s", peerURL.str());
  4789. #endif
  4790. runClient(sock.getClear(), false);
  4791. }
  4792. if (securesockavail)
  4793. {
  4794. #ifdef _DEBUG
  4795. sockSSL->getPeerEndpoint(eps);
  4796. eps.getUrlStr(peerURL.clear());
  4797. PROGLOG("Server accepting SECURE from %s", peerURL.str());
  4798. #endif
  4799. runClient(sockSSL.getClear(), false);
  4800. }
  4801. if (rowServiceSockAvail)
  4802. {
  4803. #ifdef _DEBUG
  4804. acceptedRSSock->getPeerEndpoint(eps);
  4805. eps.getUrlStr(peerURL.clear());
  4806. PROGLOG("Server accepting row service socket from %s", peerURL.str());
  4807. #endif
  4808. runClient(acceptedRSSock.getClear(), true);
  4809. }
  4810. }
  4811. else
  4812. checkTimeout();
  4813. }
  4814. if (TF_TRACE_CLIENT_STATS)
  4815. PROGLOG("CRemoteFileServer:run exiting");
  4816. selecthandler->stop(true);
  4817. }
  4818. void processUnauthenticatedCommand(RemoteFileCommandType cmd, ISocket *socket, MemoryBuffer &msg)
  4819. {
  4820. // these are unauthenticated commands
  4821. if (cmd != RFCgetver)
  4822. cmd = RFCinvalid;
  4823. MemoryBuffer reply;
  4824. bool testSocketFlag = processCommand(cmd, msg, initSendBuffer(reply), NULL, NULL);
  4825. sendDaFsBuffer(socket, reply, testSocketFlag);
  4826. }
  4827. void runClient(ISocket *sock, bool rowService) // rowService used to distinguish client calls
  4828. {
  4829. cCommandProcessor::cCommandProcessorParams params;
  4830. params.client = new CRemoteClientHandler(this, sock, globallasttick, rowService);
  4831. {
  4832. CriticalBlock block(sect);
  4833. clients.append(*LINK(params.client));
  4834. }
  4835. // NB: This could be blocked, by thread pool limit
  4836. threads->start(&params);
  4837. }
  4838. void stop()
  4839. {
  4840. // stop accept loop
  4841. if (TF_TRACE_CLIENT_STATS)
  4842. PROGLOG("CRemoteFileServer::stop");
  4843. if (acceptsock)
  4844. acceptsock->cancel_accept();
  4845. if (securesock)
  4846. securesock->cancel_accept();
  4847. threads->stopAll();
  4848. threads->joinAll(true,60*1000);
  4849. }
  4850. bool notify(CRemoteClientHandler *_client, MemoryBuffer &msg)
  4851. {
  4852. Linked<CRemoteClientHandler> client;
  4853. client.set(_client);
  4854. if (TF_TRACE_FULL)
  4855. PROGLOG("notify %d", msg.length());
  4856. if (msg.length())
  4857. {
  4858. if (TF_TRACE_FULL)
  4859. PROGLOG("notify CRemoteClientHandler(%p), msg length=%u", _client, msg.length());
  4860. cCommandProcessor::cCommandProcessorParams params;
  4861. params.client = client.getClear();
  4862. params.msg.swapWith(msg);
  4863. /* This can block because the thread pool is full and therefore block the selecthandler
  4864. * This is akin to the main server blocking post accept() for the same reason.
  4865. */
  4866. threads->start(&params);
  4867. }
  4868. else
  4869. onCloseSocket(client,3); // removes owned handles
  4870. return false;
  4871. }
  4872. void addClient(CRemoteClientHandler *client)
  4873. {
  4874. if (client&&client->socket)
  4875. selecthandler->add(client->socket,SELECTMODE_READ,client);
  4876. }
  4877. void checkTimeout()
  4878. {
  4879. if (msTick()-clientcounttick>1000*60*60)
  4880. {
  4881. CriticalBlock block(ClientCountSect);
  4882. if (TF_TRACE_CLIENT_STATS && (ClientCount || MaxClientCount))
  4883. PROGLOG("Client count = %d, max = %d", ClientCount, MaxClientCount);
  4884. clientcounttick = msTick();
  4885. MaxClientCount = ClientCount;
  4886. if (closedclients)
  4887. {
  4888. if (TF_TRACE_CLIENT_STATS)
  4889. PROGLOG("Closed client count = %d",closedclients);
  4890. closedclients = 0;
  4891. }
  4892. }
  4893. CriticalBlock block(sect);
  4894. ForEachItemInRev(i,clients)
  4895. {
  4896. CRemoteClientHandler &client = clients.item(i);
  4897. if (client.timedOut())
  4898. {
  4899. StringBuffer s;
  4900. bool ok = client.getInfo(s); // will spot duff sockets
  4901. if (ok&&(client.openFiles.ordinality()!=0))
  4902. {
  4903. if (TF_TRACE_CLIENT_CONN && client.inactiveTimedOut())
  4904. WARNLOG("Inactive %s",s.str());
  4905. }
  4906. else
  4907. {
  4908. #ifndef _DEBUG
  4909. if (TF_TRACE_CLIENT_CONN)
  4910. #endif
  4911. PROGLOG("Timing out %s",s.str());
  4912. closedclients++;
  4913. onCloseSocket(&client,4); // removes owned handles
  4914. }
  4915. }
  4916. }
  4917. }
  4918. void getInfo(StringBuffer &info, unsigned level=1)
  4919. {
  4920. {
  4921. CriticalBlock block(ClientCountSect);
  4922. info.append(DAFILESRV_VERSIONSTRING).append('\n');
  4923. info.appendf("Client count = %d\n",ClientCount);
  4924. info.appendf("Max client count = %d",MaxClientCount);
  4925. }
  4926. CriticalBlock block(sect);
  4927. ForEachItemIn(i,clients)
  4928. {
  4929. info.newline().append(i).append(": ");
  4930. clients.item(i).getInfo(info);
  4931. }
  4932. info.newline().appendf("Running threads: %u", threadRunningCount());
  4933. info.newline();
  4934. stdCmdThrottler.getInfo(info);
  4935. info.newline();
  4936. slowCmdThrottler.getInfo(info);
  4937. clientStatsTable.getInfo(info, level);
  4938. }
  4939. unsigned threadRunningCount()
  4940. {
  4941. if (!threads)
  4942. return 0;
  4943. return threads->runningCount();
  4944. }
  4945. unsigned idleTime()
  4946. {
  4947. unsigned t = (unsigned)atomic_read(&globallasttick);
  4948. return msTick()-t;
  4949. }
  4950. void setThrottle(ThrottleClass throttleClass, unsigned limit, unsigned delayMs, unsigned cpuThreshold, unsigned queueLimit)
  4951. {
  4952. switch (throttleClass)
  4953. {
  4954. case ThrottleStd:
  4955. stdCmdThrottler.configure(limit, delayMs, cpuThreshold, queueLimit);
  4956. break;
  4957. case ThrottleSlow:
  4958. slowCmdThrottler.configure(limit, delayMs, cpuThreshold, queueLimit);
  4959. break;
  4960. default:
  4961. {
  4962. StringBuffer availableClasses("{ ");
  4963. for (unsigned c=0; c<ThrottleClassMax; c++)
  4964. {
  4965. availableClasses.append(c).append(" = ").append(getThrottleClassText((ThrottleClass)c));
  4966. if (c+1<ThrottleClassMax)
  4967. availableClasses.append(", ");
  4968. }
  4969. availableClasses.append(" }");
  4970. throw MakeStringException(0, "Unknown throttle class: %u, available classes are: %s", (unsigned)throttleClass, availableClasses.str());
  4971. }
  4972. }
  4973. }
  4974. StringBuffer &getStats(StringBuffer &stats, bool reset)
  4975. {
  4976. CriticalBlock block(sect);
  4977. stdCmdThrottler.getStats(stats, reset).newline();
  4978. slowCmdThrottler.getStats(stats, reset);
  4979. if (reset)
  4980. clientStatsTable.reset();
  4981. return stats;
  4982. }
  4983. };
  4984. IRemoteFileServer * createRemoteFileServer(unsigned maxThreads, unsigned maxThreadsDelayMs, unsigned maxAsyncCopy, IPropertyTree *keyPairInfo)
  4985. {
  4986. return new CRemoteFileServer(maxThreads, maxThreadsDelayMs, maxAsyncCopy, keyPairInfo);
  4987. }
  4988. int setDaliServerTrace(byte flags)
  4989. {
  4990. byte ret = traceFlags;
  4991. traceFlags = flags;
  4992. return ret;
  4993. }
  4994. #ifdef _USE_CPPUNIT
  4995. #include "unittests.hpp"
  4996. #include "rmtfile.hpp"
  4997. /* MP_START_PORT -> MP_END_PORT is the MP reserved dynamic port range, and is used here for convenience.
  4998. * MP_START_PORT is used as starting point to find an available port for the temporary dafilesrv service in these unittests.
  4999. * All (MP) components using this range always check and find an unused port.
  5000. */
  5001. static unsigned serverPort = MP_START_PORT;
  5002. static StringBuffer basePath;
  5003. static Owned<CSimpleInterface> serverThread;
  5004. class RemoteFileSlowTest : public CppUnit::TestFixture
  5005. {
  5006. CPPUNIT_TEST_SUITE(RemoteFileSlowTest);
  5007. CPPUNIT_TEST(testRemoteFilename);
  5008. CPPUNIT_TEST(testStartServer);
  5009. CPPUNIT_TEST(testBasicFunctionality);
  5010. CPPUNIT_TEST(testCopy);
  5011. CPPUNIT_TEST(testOther);
  5012. CPPUNIT_TEST(testConfiguration);
  5013. CPPUNIT_TEST(testDirectoryMonitoring);
  5014. CPPUNIT_TEST(testFinish);
  5015. CPPUNIT_TEST_SUITE_END();
  5016. size32_t testLen = 1024;
  5017. protected:
  5018. void testRemoteFilename()
  5019. {
  5020. const char *rfns = "//1.2.3.4/dir1/file1|//1.2.3.4:7100/dir1/file1,"
  5021. "//1.2.3.4:7100/dir1/file1|//1.2.3.4:7100/dir1/file1,"
  5022. "//1.2.3.4/c$/dir1/file1|//1.2.3.4:7100/c$/dir1/file1,"
  5023. "//1.2.3.4:7100/c$/dir1/file1|//1.2.3.4:7100/c$/dir1/file1,"
  5024. "//1.2.3.4:7100/d$/dir1/file1|//1.2.3.4:7100/d$/dir1/file1";
  5025. StringArray tests;
  5026. tests.appendList(rfns, ",");
  5027. ForEachItemIn(i, tests)
  5028. {
  5029. StringArray inOut;
  5030. const char *pair = tests.item(i);
  5031. inOut.appendList(pair, "|");
  5032. const char *rfn = inOut.item(0);
  5033. const char *expected = inOut.item(1);
  5034. Owned<IFile> iFile = createIFile(rfn);
  5035. const char *res = iFile->queryFilename();
  5036. if (!streq(expected, res))
  5037. {
  5038. StringBuffer errMsg("testRemoteFilename MISMATCH");
  5039. errMsg.newline().append("Expected: ").append(expected);
  5040. errMsg.newline().append("Got: ").append(res);
  5041. PROGLOG("%s", errMsg.str());
  5042. CPPUNIT_ASSERT_MESSAGE(errMsg.str(), 0);
  5043. }
  5044. else
  5045. PROGLOG("MATCH: %s", res);
  5046. }
  5047. }
  5048. void testStartServer()
  5049. {
  5050. Owned<ISocket> socket;
  5051. unsigned endPort = MP_END_PORT;
  5052. while (1)
  5053. {
  5054. try
  5055. {
  5056. socket.setown(ISocket::create(serverPort));
  5057. break;
  5058. }
  5059. catch (IJSOCK_Exception *e)
  5060. {
  5061. if (e->errorCode() != JSOCKERR_port_in_use)
  5062. {
  5063. StringBuffer eStr;
  5064. e->errorMessage(eStr);
  5065. e->Release();
  5066. CPPUNIT_ASSERT_MESSAGE(eStr.str(), 0);
  5067. }
  5068. else if (serverPort == endPort)
  5069. {
  5070. e->Release();
  5071. CPPUNIT_ASSERT_MESSAGE("Could not find a free port to use for remote file server", 0);
  5072. }
  5073. }
  5074. ++serverPort;
  5075. }
  5076. basePath.append("//");
  5077. SocketEndpoint ep(serverPort);
  5078. ep.getUrlStr(basePath);
  5079. char cpath[_MAX_DIR];
  5080. if (!GetCurrentDirectory(_MAX_DIR, cpath))
  5081. CPPUNIT_ASSERT_MESSAGE("Current directory path too big", 0);
  5082. else
  5083. basePath.append(cpath);
  5084. addPathSepChar(basePath);
  5085. PROGLOG("basePath = %s", basePath.str());
  5086. class CServerThread : public CSimpleInterface, implements IThreaded
  5087. {
  5088. CThreaded threaded;
  5089. Owned<CRemoteFileServer> server;
  5090. Linked<ISocket> socket;
  5091. public:
  5092. CServerThread(CRemoteFileServer *_server, ISocket *_socket) : server(_server), socket(_socket), threaded("CServerThread")
  5093. {
  5094. threaded.init(this);
  5095. }
  5096. ~CServerThread()
  5097. {
  5098. threaded.join();
  5099. }
  5100. // IThreaded
  5101. virtual void threadmain() override
  5102. {
  5103. DAFSConnectCfg sslCfg = SSLNone;
  5104. server->run(sslCfg, socket, nullptr, nullptr);
  5105. }
  5106. };
  5107. Owned<IRemoteFileServer> server = createRemoteFileServer();
  5108. serverThread.setown(new CServerThread(QUERYINTERFACE(server.getClear(), CRemoteFileServer), socket.getClear()));
  5109. }
  5110. void testBasicFunctionality()
  5111. {
  5112. VStringBuffer filePath("%s%s", basePath.str(), "file1");
  5113. // create file
  5114. Owned<IFile> iFile = createIFile(filePath);
  5115. CPPUNIT_ASSERT(iFile);
  5116. Owned<IFileIO> iFileIO = iFile->open(IFOcreate);
  5117. CPPUNIT_ASSERT(iFileIO);
  5118. // write out 1k of random data and crc
  5119. MemoryBuffer mb;
  5120. char *buf = (char *)mb.reserveTruncate(testLen);
  5121. for (unsigned b=0; b<1024; b++)
  5122. buf[b] = getRandom()%256;
  5123. CRC32 crc;
  5124. crc.tally(testLen, buf);
  5125. unsigned writeCrc = crc.get();
  5126. size32_t sz = iFileIO->write(0, testLen, buf);
  5127. CPPUNIT_ASSERT(sz == testLen);
  5128. // close file
  5129. iFileIO.clear();
  5130. // validate remote crc
  5131. CPPUNIT_ASSERT(writeCrc == iFile->getCRC());
  5132. // exists
  5133. CPPUNIT_ASSERT(iFile->exists());
  5134. // validate size
  5135. CPPUNIT_ASSERT(iFile->size() == testLen);
  5136. // read back and validate read data's crc against written
  5137. iFileIO.setown(iFile->open(IFOread));
  5138. CPPUNIT_ASSERT(iFileIO);
  5139. sz = iFileIO->read(0, testLen, buf);
  5140. iFileIO.clear();
  5141. CPPUNIT_ASSERT(sz == testLen);
  5142. crc.reset();
  5143. crc.tally(testLen, buf);
  5144. CPPUNIT_ASSERT(writeCrc == crc.get());
  5145. }
  5146. void testCopy()
  5147. {
  5148. VStringBuffer filePath("%s%s", basePath.str(), "file1");
  5149. Owned<IFile> iFile = createIFile(filePath);
  5150. // test file copy
  5151. VStringBuffer filePathCopy("%s%s", basePath.str(), "file1copy");
  5152. Owned<IFile> iFile1Copy = createIFile(filePathCopy);
  5153. iFile->copyTo(iFile1Copy);
  5154. // read back copy and validate read data's crc against written
  5155. Owned<IFileIO> iFileIO = iFile1Copy->open(IFOreadwrite); // open read/write for appendFile in next step.
  5156. CPPUNIT_ASSERT(iFileIO);
  5157. MemoryBuffer mb;
  5158. char *buf = (char *)mb.reserveTruncate(testLen);
  5159. size32_t sz = iFileIO->read(0, testLen, buf);
  5160. CPPUNIT_ASSERT(sz == testLen);
  5161. CRC32 crc;
  5162. crc.tally(testLen, buf);
  5163. CPPUNIT_ASSERT(iFile->getCRC() == crc.get());
  5164. // check appendFile functionality. NB after this "file1copy" should be 2*testLen
  5165. CPPUNIT_ASSERT(testLen == iFileIO->appendFile(iFile));
  5166. iFileIO.clear();
  5167. // validate new size
  5168. CPPUNIT_ASSERT(iFile1Copy->size() == 2 * testLen);
  5169. // setSize test, truncate copy to original size
  5170. iFileIO.setown(iFile1Copy->open(IFOreadwrite));
  5171. iFileIO->setSize(testLen);
  5172. // validate new size
  5173. CPPUNIT_ASSERT(iFile1Copy->size() == testLen);
  5174. }
  5175. void testOther()
  5176. {
  5177. VStringBuffer filePath("%s%s", basePath.str(), "file1");
  5178. Owned<IFile> iFile = createIFile(filePath);
  5179. // rename
  5180. iFile->rename("file2");
  5181. // create a directory
  5182. VStringBuffer subDirPath("%s%s", basePath.str(), "subdir1");
  5183. Owned<IFile> subDirIFile = createIFile(subDirPath);
  5184. subDirIFile->createDirectory();
  5185. // check isDirectory result
  5186. CPPUNIT_ASSERT(subDirIFile->isDirectory()==fileBool::foundYes);
  5187. // move previous created and renamed file into new sub-directory
  5188. // ensure not present before move
  5189. VStringBuffer subDirFilePath("%s/%s", subDirPath.str(), "file2");
  5190. Owned<IFile> iFile2 = createIFile(subDirFilePath);
  5191. iFile2->remove();
  5192. iFile->move(subDirFilePath);
  5193. // open sub-directory file2 explicitly
  5194. RemoteFilename rfn;
  5195. rfn.setRemotePath(subDirPath.str());
  5196. Owned<IFile> dir = createIFile(rfn);
  5197. Owned<IDirectoryIterator> diriter = dir->directoryFiles("file2");
  5198. if (!diriter->first())
  5199. {
  5200. CPPUNIT_ASSERT_MESSAGE("Error, file2 diriter->first() is null", 0);
  5201. }
  5202. Linked<IFile> iFile3 = &diriter->query();
  5203. diriter.clear();
  5204. dir.clear();
  5205. OwnedIFileIO iFile3IO = iFile3->openShared(IFOread, IFSHfull);
  5206. if (!iFile3IO)
  5207. {
  5208. CPPUNIT_ASSERT_MESSAGE("Error, file2 openShared() failed", 0);
  5209. }
  5210. iFile3IO->close();
  5211. // count sub-directory files with a wildcard
  5212. unsigned count=0;
  5213. Owned<IDirectoryIterator> iter = subDirIFile->directoryFiles("*2");
  5214. ForEach(*iter)
  5215. ++count;
  5216. CPPUNIT_ASSERT(1 == count);
  5217. // check isFile result
  5218. CPPUNIT_ASSERT(iFile2->isFile()==fileBool::foundYes);
  5219. // validate isReadOnly before after setting
  5220. CPPUNIT_ASSERT(iFile2->isReadOnly()==fileBool::foundNo);
  5221. iFile2->setReadOnly(true);
  5222. CPPUNIT_ASSERT(iFile2->isReadOnly()==fileBool::foundYes);
  5223. // get/set Time and validate result
  5224. CDateTime createTime, modifiedTime, accessedTime;
  5225. CPPUNIT_ASSERT(subDirIFile->getTime(&createTime, &modifiedTime, &accessedTime));
  5226. CDateTime newModifiedTime = modifiedTime;
  5227. newModifiedTime.adjustTime(-86400); // -1 day
  5228. CPPUNIT_ASSERT(subDirIFile->setTime(&createTime, &newModifiedTime, &accessedTime));
  5229. CPPUNIT_ASSERT(subDirIFile->getTime(&createTime, &modifiedTime, &accessedTime));
  5230. CPPUNIT_ASSERT(modifiedTime == newModifiedTime);
  5231. // test set file permissions
  5232. try
  5233. {
  5234. iFile2->setFilePermissions(0777);
  5235. }
  5236. catch (...)
  5237. {
  5238. CPPUNIT_ASSERT_MESSAGE("iFile2->setFilePermissions() exception", 0);
  5239. }
  5240. }
  5241. void testConfiguration()
  5242. {
  5243. SocketEndpoint ep(serverPort); // test trace open connections
  5244. CPPUNIT_ASSERT(setDafileSvrTraceFlags(ep, 0x08));
  5245. StringBuffer infoStr;
  5246. CPPUNIT_ASSERT(RFEnoerror == getDafileSvrInfo(ep, 10, infoStr));
  5247. CPPUNIT_ASSERT(RFEnoerror == setDafileSvrThrottleLimit(ep, ThrottleStd, DEFAULT_STDCMD_PARALLELREQUESTLIMIT+1, DEFAULT_STDCMD_THROTTLEDELAYMS+1, DEFAULT_STDCMD_THROTTLECPULIMIT+1, DEFAULT_STDCMD_THROTTLEQUEUELIMIT+1));
  5248. }
  5249. void testDirectoryMonitoring()
  5250. {
  5251. VStringBuffer subDirPath("%s%s", basePath.str(), "subdir1");
  5252. Owned<IFile> subDirIFile = createIFile(subDirPath);
  5253. subDirIFile->createDirectory();
  5254. VStringBuffer filePath("%s/%s", subDirPath.str(), "file1");
  5255. class CDelayedFileCreate : implements IThreaded
  5256. {
  5257. CThreaded threaded;
  5258. StringAttr filePath;
  5259. Semaphore doneSem;
  5260. public:
  5261. CDelayedFileCreate(const char *_filePath) : filePath(_filePath), threaded("CDelayedFileCreate")
  5262. {
  5263. threaded.init(this);
  5264. }
  5265. ~CDelayedFileCreate()
  5266. {
  5267. stop();
  5268. }
  5269. void stop()
  5270. {
  5271. doneSem.signal();
  5272. threaded.join();
  5273. }
  5274. // IThreaded impl.
  5275. virtual void threadmain() override
  5276. {
  5277. MilliSleep(1000); // give monitorDirectory a chance to be monitoring
  5278. // create file
  5279. Owned<IFile> iFile = createIFile(filePath);
  5280. CPPUNIT_ASSERT(iFile);
  5281. Owned<IFileIO> iFileIO = iFile->open(IFOcreate);
  5282. CPPUNIT_ASSERT(iFileIO);
  5283. iFileIO.clear();
  5284. doneSem.wait(60 * 1000);
  5285. CPPUNIT_ASSERT(iFile->remove());
  5286. }
  5287. } delayedFileCreate(filePath);
  5288. Owned<IDirectoryDifferenceIterator> iter = subDirIFile->monitorDirectory(nullptr, nullptr, false, false, 2000, 60 * 1000);
  5289. ForEach(*iter)
  5290. {
  5291. StringBuffer fname;
  5292. iter->getName(fname);
  5293. PROGLOG("fname = %s", fname.str());
  5294. }
  5295. delayedFileCreate.stop();
  5296. }
  5297. void testFinish()
  5298. {
  5299. // clearup
  5300. VStringBuffer filePathCopy("%s%s", basePath.str(), "file1copy");
  5301. Owned<IFile> iFile1Copy = createIFile(filePathCopy);
  5302. CPPUNIT_ASSERT(iFile1Copy->remove());
  5303. VStringBuffer subDirPath("%s%s", basePath.str(), "subdir1");
  5304. VStringBuffer subDirFilePath("%s/%s", subDirPath.str(), "file2");
  5305. Owned<IFile> iFile2 = createIFile(subDirFilePath);
  5306. CPPUNIT_ASSERT(iFile2->remove());
  5307. Owned<IFile> subDirIFile = createIFile(subDirPath);
  5308. CPPUNIT_ASSERT(subDirIFile->remove());
  5309. SocketEndpoint ep(serverPort);
  5310. Owned<ISocket> sock = ISocket::connect_timeout(ep, 60 * 1000);
  5311. CPPUNIT_ASSERT(RFEnoerror == stopRemoteServer(sock));
  5312. serverThread.clear();
  5313. }
  5314. };
  5315. CPPUNIT_TEST_SUITE_REGISTRATION( RemoteFileSlowTest );
  5316. CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( RemoteFileSlowTest, "RemoteFileSlowTests" );
  5317. #endif // _USE_CPPUNIT