12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
73778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252425342544255425642574258425942604261426242634264426542664267426842694270427142724273427442754276427
74278427942804281428242834284428542864287428842894290429142924293429442954296429742984299430043014302430343044305430643074308430943104311431243134314431543164317431843194320432143224323432443254326432743284329433043314332433343344335433643374338433943404341434243434344434543464347434843494350435143524353435443554356435743584359436043614362436343644365436643674368436943704371437243734374437543764377437843794380438143824383438443854386438743884389439043914392439343944395439643974398439944004401440244034404440544064407440844094410441144124413441444154416441744184419442044214422442344244425442644274428442944304431443244334434443544364437443844394440444144424443444444454446444744484449445044514452445344544455445644574458445944604461446244634464446544664467446844694470447144724473447444754476447744784479448044814482448344844485448644874488448944904491449244934494449544964497449844994500450145024503450445054506450745084509451045114512451345144515451645174518451945204521452245234524452545264527452845294530453145324533453445354536453745384539454045414542454345444545454645474548454945504551455245534554455545564557455845594560456145624563456445654566456745684569457045714572457345744575457645774578457945804581458245834584458545864587458845894590459145924593459445954596459745984599460046014602460346044605460646074608460946104611461246134614461546164617461846194620462146224623462446254626462746284629463046314632463346344635463646374638463946404641464246434644464546464647464846494650465146524653465446554656465746584659466046614662466346644665466646674668466946704671467246734674467546764677467846794680468146824683468446854686468746884689469046914692469346944695469646974698469947004701470247034704470547064707470847094710471147124713471447154716471747184719472047214722472347244725472647274728472947304731473247334734473547364737473847394740474147424743474447454746474747484749475047514752475347544755475647574758475947604761476247634764476547664767476847694770477147724773477447754776477
74778477947804781478247834784478547864787478847894790479147924793479447954796479747984799480048014802480348044805480648074808480948104811481248134814481548164817481848194820482148224823482448254826482748284829483048314832483348344835483648374838483948404841484248434844484548464847484848494850485148524853485448554856485748584859486048614862486348644865486648674868486948704871487248734874487548764877487848794880488148824883488448854886488748884889489048914892489348944895489648974898489949004901490249034904490549064907490849094910491149124913491449154916491749184919492049214922492349244925492649274928492949304931493249334934493549364937493849394940494149424943494449454946494749484949495049514952495349544955495649574958495949604961496249634964496549664967496849694970497149724973497449754976497749784979498049814982498349844985498649874988498949904991499249934994499549964997499849995000500150025003500450055006500750085009501050115012501350145015501650175018501950205021502250235024502550265027502850295030503150325033503450355036503750385039504050415042504350445045504650475048504950505051505250535054505550565057505850595060506150625063506450655066506750685069507050715072507350745075507650775078507950805081508250835084508550865087508850895090509150925093509450955096509750985099510051015102510351045105510651075108510951105111511251135114511551165117511851195120512151225123512451255126512751285129513051315132513351345135513651375138513951405141514251435144514551465147514851495150515151525153515451555156515751585159516051615162516351645165516651675168516951705171517251735174517551765177517851795180518151825183518451855186518751885189519051915192519351945195519651975198519952005201520252035204520552065207520852095210521152125213521452155216521752185219522052215222522352245225522652275228522952305231523252335234523552365237523852395240524152425243524452455246524752485249525052515252525352545255525652575258525952605261526252635264526552665267526852695270527152725273527452755276527
75278527952805281528252835284528552865287528852895290529152925293529452955296529752985299530053015302530353045305530653075308530953105311531253135314531553165317531853195320532153225323532453255326532753285329533053315332533353345335533653375338533953405341534253435344534553465347534853495350535153525353535453555356535753585359536053615362536353645365536653675368536953705371537253735374537553765377537853795380538153825383538453855386538753885389539053915392539353945395539653975398539954005401540254035404540554065407540854095410541154125413541454155416541754185419542054215422542354245425542654275428542954305431543254335434543554365437543854395440544154425443544454455446544754485449545054515452545354545455545654575458545954605461546254635464546554665467546854695470547154725473547454755476547754785479548054815482548354845485548654875488548954905491549254935494549554965497549854995500550155025503550455055506550755085509551055115512551355145515551655175518551955205521552255235524552555265527552855295530553155325533553455355536553755385539554055415542554355445545554655475548554955505551555255535554555555565557555855595560556155625563556455655566556755685569557055715572557355745575557655775578557955805581558255835584558555865587558855895590559155925593559455955596559755985599560056015602560356045605560656075608560956105611561256135614561556165617561856195620562156225623562456255626562756285629563056315632563356345635563656375638563956405641 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- // todo look at IRemoteFileServer stop
- #include "platform.h"
- #include "limits.h"
- #include "jlib.hpp"
- #include "jio.hpp"
- #include "jmutex.hpp"
- #include "jfile.hpp"
- #include "jmisc.hpp"
- #include "jthread.hpp"
- #include "jqueue.tpp"
- #include "securesocket.hpp"
- #include "portlist.h"
- #include "jsocket.hpp"
- #include "jencrypt.hpp"
- #include "jlzw.hpp"
- #include "jset.hpp"
- #include "jhtree.hpp"
- #include "dadfs.hpp"
- #include "remoteerr.hpp"
- #include <atomic>
- #include <string>
- #include <unordered_map>
- #include "rtldynfield.hpp"
- #include "rtlds_imp.hpp"
- #include "rtlread_imp.hpp"
- #include "rtlrecord.hpp"
- #include "eclhelper_dyn.hpp"
- #include "rtlcommon.hpp"
- #include "rtlformat.hpp"
- #include "jflz.hpp"
- #include "digisign.hpp"
- #include "dafdesc.hpp"
- #include "thorcommon.hpp"
- #include "csvsplitter.hpp"
- #include "thorxmlread.hpp"
- #include "dafscommon.hpp"
- #include "rmtfile.hpp"
- #include "rmtclient_impl.hpp"
- #include "dafsserver.hpp"
- using namespace cryptohelper;
// presumably the maximum number of cached sockets - the consumer is outside this section
#define SOCKET_CACHE_MAX 500

// Tree-copy timings (milliseconds)
#define TREECOPYTIMEOUT   (60*60*1000)     // 1 hour (a big file could take longer, but this at least staggers)
#define TREECOPYPOLLTIME  (60*1000*5)      // 5 minutes, polling interval for tracing that a copy is delayed
#define TREECOPYPRUNETIME (24*60*60*1000)  // 1 day
- static const unsigned __int64 defaultFileStreamChooseNLimit = I64C(0x7fffffffffffffff); // constant should be move to common place (see eclhelper.hpp)
- static const unsigned __int64 defaultFileStreamSkipN = 0;
- static const unsigned defaultDaFSReplyLimitKB = 1024; // 1MB
- enum OutputFormat:byte { outFmt_Binary, outFmt_Xml, outFmt_Json };
- ///////////////////////////
- static unsigned maxConnectTime = 0;
- static unsigned maxReceiveTime = 0;
- //Security and default port attributes
- static class _securitySettings
- {
- public:
- DAFSConnectCfg connectMethod;
- unsigned short daFileSrvPort;
- unsigned short daFileSrvSSLPort;
- const char * certificate;
- const char * privateKey;
- const char * passPhrase;
- _securitySettings()
- {
- queryDafsSecSettings(&connectMethod, &daFileSrvPort, &daFileSrvSSLPort, &certificate, &privateKey, &passPhrase);
- }
- } securitySettings;
- static CriticalSection secureContextCrit;
- static Owned<ISecureSocketContext> secureContextServer;
- static Owned<ISecureSocketContext> secureContextClient;
- #ifdef _USE_OPENSSL
- static ISecureSocket *createSecureSocket(ISocket *sock, SecureSocketType type)
- {
- {
- CriticalBlock b(secureContextCrit);
- if (type == ServerSocket)
- {
- if (!secureContextServer)
- secureContextServer.setown(createSecureSocketContextEx(securitySettings.certificate, securitySettings.privateKey, securitySettings.passPhrase, type));
- }
- else if (!secureContextClient)
- secureContextClient.setown(createSecureSocketContext(type));
- }
- int loglevel = SSLogNormal;
- #ifdef _DEBUG
- loglevel = SSLogMax;
- #endif
- if (type == ServerSocket)
- return secureContextServer->createSecureSocket(sock, loglevel);
- else
- return secureContextClient->createSecureSocket(sock, loglevel);
- }
- #else
- static ISecureSocket *createSecureSocket(ISocket *sock, SecureSocketType type)
- {
- throwUnexpected();
- }
- #endif
- struct sRFTM
- {
- CTimeMon *timemon;
- sRFTM(unsigned limit) { timemon = limit ? new CTimeMon(limit) : NULL; }
- ~sRFTM() { delete timemon; }
- };
- const char *remoteServerVersionString() { return DAFILESRV_VERSIONSTRING; }
// Timeouts (milliseconds)
#define CLIENT_TIMEOUT                  (1000*60*60*12)  // 12 hours: long timeout in case of zombies
#define CLIENT_INACTIVEWARNING_TIMEOUT  (1000*60*60*12)  // 12 hours: time between logging inactive clients
#define SERVER_TIMEOUT                  (1000*60*5)      // 5 minutes: timeout when waiting for dafilesrv to reply after a command
                                                         // (increased when waiting for a large block)
// Turns a command identifier into its spelling as a string literal.
#define RFCText(cmd) #cmd

// Command names, looked up by numeric command value in getRFCText().
// NB: entry order is significant - presumably it mirrors the RemoteFileCommandType
// numbering (declared elsewhere), so new names must be added in the same position.
const char *RFCStrings[] =
{
    // 0..9
    RFCText(RFCopenIO),
    RFCText(RFCcloseIO),
    RFCText(RFCread),
    RFCText(RFCwrite),
    RFCText(RFCsize),
    RFCText(RFCexists),
    RFCText(RFCremove),
    RFCText(RFCrename),
    RFCText(RFCgetver),
    RFCText(RFCisfile),
    // 10..19
    RFCText(RFCisdirectory),
    RFCText(RFCisreadonly),
    RFCText(RFCsetreadonly),
    RFCText(RFCgettime),
    RFCText(RFCsettime),
    RFCText(RFCcreatedir),
    RFCText(RFCgetdir),
    RFCText(RFCstop),
    RFCText(RFCexec),
    RFCText(RFCdummy1),
    // 20..29
    RFCText(RFCredeploy),
    RFCText(RFCgetcrc),
    RFCText(RFCmove),
    RFCText(RFCsetsize),
    RFCText(RFCextractblobelements),
    RFCText(RFCcopy),
    RFCText(RFCappend),
    RFCText(RFCmonitordir),
    RFCText(RFCsettrace),
    RFCText(RFCgetinfo),
    // 30..39
    RFCText(RFCfirewall),
    RFCText(RFCunlock),
    RFCText(RFCunlockreply),
    RFCText(RFCinvalid),
    RFCText(RFCcopysection),
    RFCText(RFCtreecopy),
    RFCText(RFCtreecopytmp),
    RFCText(RFCsetthrottle), // legacy version
    RFCText(RFCsetthrottle2),
    RFCText(RFCsetfileperms),
    // 40..44
    RFCText(RFCreadfilteredindex),
    RFCText(RFCreadfilteredcount),
    RFCText(RFCreadfilteredblob),
    RFCText(RFCStreamRead),
    RFCText(RFCStreamReadTestSocket),
};
- static const char *getRFCText(RemoteFileCommandType cmd)
- {
- if (cmd==RFCStreamReadJSON)
- return "RFCStreamReadJSON";
- else
- {
- unsigned elems = sizeof(RFCStrings) / sizeof(RFCStrings[0]);
- if (cmd >= elems)
- return "RFCunknown";
- return RFCStrings[cmd];
- }
- }
- static const char *getRFSERRText(unsigned err)
- {
- switch (err)
- {
- case RFSERR_InvalidCommand:
- return "RFSERR_InvalidCommand";
- case RFSERR_NullFileIOHandle:
- return "RFSERR_NullFileIOHandle";
- case RFSERR_InvalidFileIOHandle:
- return "RFSERR_InvalidFileIOHandle";
- case RFSERR_TimeoutFileIOHandle:
- return "RFSERR_TimeoutFileIOHandle";
- case RFSERR_OpenFailed:
- return "RFSERR_OpenFailed";
- case RFSERR_ReadFailed:
- return "RFSERR_ReadFailed";
- case RFSERR_WriteFailed:
- return "RFSERR_WriteFailed";
- case RFSERR_RenameFailed:
- return "RFSERR_RenameFailed";
- case RFSERR_ExistsFailed:
- return "RFSERR_ExistsFailed";
- case RFSERR_RemoveFailed:
- return "RFSERR_RemoveFailed";
- case RFSERR_CloseFailed:
- return "RFSERR_CloseFailed";
- case RFSERR_IsFileFailed:
- return "RFSERR_IsFileFailed";
- case RFSERR_IsDirectoryFailed:
- return "RFSERR_IsDirectoryFailed";
- case RFSERR_IsReadOnlyFailed:
- return "RFSERR_IsReadOnlyFailed";
- case RFSERR_SetReadOnlyFailed:
- return "RFSERR_SetReadOnlyFailed";
- case RFSERR_GetTimeFailed:
- return "RFSERR_GetTimeFailed";
- case RFSERR_SetTimeFailed:
- return "RFSERR_SetTimeFailed";
- case RFSERR_CreateDirFailed:
- return "RFSERR_CreateDirFailed";
- case RFSERR_GetDirFailed:
- return "RFSERR_GetDirFailed";
- case RFSERR_GetCrcFailed:
- return "RFSERR_GetCrcFailed";
- case RFSERR_MoveFailed:
- return "RFSERR_MoveFailed";
- case RFSERR_ExtractBlobElementsFailed:
- return "RFSERR_ExtractBlobElementsFailed";
- case RFSERR_CopyFailed:
- return "RFSERR_CopyFailed";
- case RFSERR_AppendFailed:
- return "RFSERR_AppendFailed";
- case RFSERR_AuthenticateFailed:
- return "RFSERR_AuthenticateFailed";
- case RFSERR_CopySectionFailed:
- return "RFSERR_CopySectionFailed";
- case RFSERR_TreeCopyFailed:
- return "RFSERR_TreeCopyFailed";
- case RAERR_InvalidUsernamePassword:
- return "RAERR_InvalidUsernamePassword";
- case RFSERR_MasterSeemsToHaveDied:
- return "RFSERR_MasterSeemsToHaveDied";
- case RFSERR_TimeoutWaitSlave:
- return "RFSERR_TimeoutWaitSlave";
- case RFSERR_TimeoutWaitConnect:
- return "RFSERR_TimeoutWaitConnect";
- case RFSERR_TimeoutWaitMaster:
- return "RFSERR_TimeoutWaitMaster";
- case RFSERR_NoConnectSlave:
- return "RFSERR_NoConnectSlave";
- case RFSERR_NoConnectSlaveXY:
- return "RFSERR_NoConnectSlaveXY";
- case RFSERR_VersionMismatch:
- return "RFSERR_VersionMismatch";
- case RFSERR_SetThrottleFailed:
- return "RFSERR_SetThrottleFailed";
- case RFSERR_MaxQueueRequests:
- return "RFSERR_MaxQueueRequests";
- case RFSERR_KeyIndexFailed:
- return "RFSERR_MaxQueueRequests";
- case RFSERR_StreamReadFailed:
- return "RFSERR_StreamReadFailed";
- case RFSERR_InternalError:
- return "Internal Error";
- }
- return "RFSERR_Unknown";
- }
// Turns a throttle class identifier into its spelling as a string literal.
#define ThrottleText(throttleClass) #throttleClass

// Throttle class names, indexed by throttle class value (see getThrottleClassText).
const char *ThrottleStrings[] =
{
    ThrottleText(ThrottleStd),   // [0]
    ThrottleText(ThrottleSlow),  // [1]
};
// Hard ceilings for throttle settings: deliberately very high, configuration cannot exceed them.
#define THROTTLE_MAX_LIMIT          1000000
#define THROTTLE_MAX_DELAYMS        3600000
#define THROTTLE_MAX_CPUTHRESHOLD   100
#define THROTTLE_MAX_QUEUELIMIT     10000000
- static const char *getThrottleClassText(ThrottleClass throttleClass) { return ThrottleStrings[throttleClass]; }
- //---------------------------------------------------------------------------
- // TreeCopy
- #define TREECOPY_CACHE_SIZE 50
- struct CTreeCopyItem: public CInterface
- {
- StringAttr net;
- StringAttr mask;
- offset_t sz; // original size
- CDateTime dt; // original date
- RemoteFilenameArray loc; // locations for file - 0 is original
- Owned<IBitSet> busy;
- unsigned lastused;
- CTreeCopyItem(RemoteFilename &orig, const char *_net, const char *_mask, offset_t _sz, CDateTime &_dt)
- : net(_net), mask(_mask)
- {
- loc.append(orig);
- dt.set(_dt);
- sz = _sz;
- busy.setown(createThreadSafeBitSet());
- lastused = msTick();
- }
- bool equals(const RemoteFilename &orig, const char *_net, const char *_mask, offset_t _sz, CDateTime &_dt)
- {
- if (!orig.equals(loc.item(0)))
- return false;
- if (strcmp(_net,net)!=0)
- return false;
- if (strcmp(_mask,mask)!=0)
- return false;
- if (sz!=_sz)
- return false;
- return (dt.equals(_dt,false));
- }
- };
- static CIArrayOf<CTreeCopyItem> treeCopyArray;
- static CriticalSection treeCopyCrit;
- static unsigned treeCopyWaiting=0;
- static Semaphore treeCopySem;
- /////////////////////////
- //====================================================================================================
- class CAsyncCommandManager
- {
- class CAsyncJob: public CInterface
- {
- class cThread: public Thread
- {
- CAsyncJob *parent;
- public:
- cThread(CAsyncJob *_parent)
- : Thread("CAsyncJob")
- {
- parent = _parent;
- }
- int run()
- {
- int ret = -1;
- try {
- ret = parent->run();
- parent->setDone();
- }
- catch (IException *e)
- {
- parent->setException(e);
- }
- parent->signal();
- return ret;
- }
- } *thread;
- StringAttr uuid;
- CAsyncCommandManager &parent;
- public:
- CAsyncJob(CAsyncCommandManager &_parent, const char *_uuid)
- : uuid(_uuid), parent(_parent)
- {
- thread = new cThread(this);
- hash = hashc((const byte *)uuid.get(),uuid.length(),~0U);
- }
- ~CAsyncJob()
- {
- thread->join();
- thread->Release();
- }
- static void destroy(CAsyncJob *j)
- {
- j->Release();
- }
- void signal()
- {
- parent.signal();
- }
- void start()
- {
- parent.wait();
- thread->start();
- }
- void join()
- {
- thread->join();
- }
- static unsigned getHash(const char *key)
- {
- return hashc((const byte *)key,strlen(key),~0U);
- }
- static CAsyncJob* create(const char *key) { assertex(!"CAsyncJob::create not implemented"); return NULL; }
- unsigned hash;
- bool eq(const char *key)
- {
- return stricmp(key,uuid.get())==0;
- }
- virtual int run()=0;
- virtual void setException(IException *e)=0;
- virtual void setDone()=0;
- };
- class CAsyncCopySection: public CAsyncJob
- {
- Owned<IFile> src;
- RemoteFilename dst;
- offset_t toOfs;
- offset_t fromOfs;
- offset_t size;
- CFPmode mode; // not yet supported
- CriticalSection sect;
- offset_t done;
- offset_t total;
- Semaphore finished;
- AsyncCommandStatus status;
- Owned<IException> exc;
- public:
- CAsyncCopySection(CAsyncCommandManager &parent, const char *_uuid, const char *fromFile, const char *toFile, offset_t _toOfs, offset_t _fromOfs, offset_t _size)
- : CAsyncJob(parent, _uuid)
- {
- status = ACScontinue;
- src.setown(createIFile(fromFile));
- dst.setRemotePath(toFile);
- toOfs = _toOfs;
- fromOfs = _fromOfs;
- size = _size;
- mode = CFPcontinue;
- done = 0;
- total = (offset_t)-1;
- }
- AsyncCommandStatus poll(offset_t &_done, offset_t &_total,unsigned timeout)
- {
- if (timeout&&finished.wait(timeout))
- finished.signal(); // may need to call again
- CriticalBlock block(sect);
- if (exc)
- throw exc.getClear();
- _done = done;
- _total = total;
- return status;
- }
- int run()
- {
- class cProgress: implements ICopyFileProgress
- {
- CriticalSection §
- CFPmode &mode;
- offset_t &done;
- offset_t &total;
- public:
- cProgress(CriticalSection &_sect,offset_t &_done,offset_t &_total,CFPmode &_mode)
- : sect(_sect), mode(_mode), done(_done), total(_total)
- {
- }
- CFPmode onProgress(offset_t sizeDone, offset_t totalSize)
- {
- CriticalBlock block(sect);
- done = sizeDone;
- total = totalSize;
- return mode;
- }
- } progress(sect,total,done,mode);
- src->copySection(dst,toOfs, fromOfs, size, &progress); // exceptions will be handled by base class
- return 0;
- }
- void setException(IException *e)
- {
- EXCLOG(e,"CAsyncCommandManager::CAsyncJob");
- CriticalBlock block(sect);
- if (exc.get())
- e->Release();
- else
- exc.setown(e);
- status = ACSerror;
- }
- void setDone()
- {
- CriticalBlock block(sect);
- finished.signal();
- status = ACSdone;
- }
- };
- CMinHashTable<CAsyncJob> jobtable;
- CriticalSection sect;
- Semaphore threadsem;
- unsigned limit;
- public:
- CAsyncCommandManager(unsigned _limit) : limit(_limit)
- {
- if (limit) // 0 == unbound
- threadsem.signal(limit); // max number of async jobs
- }
- void join()
- {
- CriticalBlock block(sect);
- unsigned i;
- CAsyncJob *j=jobtable.first(i);
- while (j) {
- j->join();
- j=jobtable.next(i);
- }
- }
- void signal()
- {
- if (limit)
- threadsem.signal();
- }
- void wait()
- {
- if (limit)
- threadsem.wait();
- }
- AsyncCommandStatus copySection(const char *uuid, const char *fromFile, const char *toFile, offset_t toOfs, offset_t fromOfs, offset_t size, offset_t &done, offset_t &total, unsigned timeout)
- {
- // return 0 if continuing, 1 if done
- CAsyncCopySection * job;
- Linked<CAsyncJob> cjob;
- {
- CriticalBlock block(sect);
- cjob.set(jobtable.find(uuid,false));
- if (cjob) {
- job = QUERYINTERFACE(cjob.get(),CAsyncCopySection);
- if (!job) {
- throw MakeStringException(-1,"Async job ID mismatch");
- }
- }
- else {
- job = new CAsyncCopySection(*this, uuid, fromFile, toFile, toOfs, fromOfs, size);
- cjob.setown(job);
- jobtable.add(cjob.getLink());
- cjob->start();
- }
- }
- AsyncCommandStatus ret = ACSerror;
- Owned<IException> rete;
- try {
- ret = job->poll(done,total,timeout);
- }
- catch (IException * e) {
- rete.setown(e);
- }
- if ((ret!=ACScontinue)||rete.get()) {
- job->join();
- CriticalBlock block(sect);
- jobtable.remove(job);
- if (rete.get())
- throw rete.getClear();
- }
- return ret;
- }
- };
- //====================================================================================================
- inline void appendErr(MemoryBuffer &reply, unsigned e)
- {
- reply.append(e).append(getRFSERRText(e));
- }
- #define MAPCOMMAND(c,p) case c: { this->p(msg, reply) ; break; }
- #define MAPCOMMANDCLIENT(c,p,client) case c: { this->p(msg, reply, client); break; }
- #define MAPCOMMANDCLIENTTESTSOCKET(c,p,client) case c: { testSocketFlag = true; this->p(msg, reply, client); break; }
- #define MAPCOMMANDCLIENTTHROTTLE(c,p,client,throttler) case c: { this->p(msg, reply, client, throttler); break; }
- #define MAPCOMMANDSTATS(c,p,stats) case c: { this->p(msg, reply, stats); break; }
- #define MAPCOMMANDCLIENTSTATS(c,p,client,stats) case c: { this->p(msg, reply, client, stats); break; }
- static unsigned ClientCount = 0;
- static unsigned MaxClientCount = 0;
- static CriticalSection ClientCountSect;
- #define DEFAULT_THROTTLOG_LOG_INTERVAL_SECS 60 // log total throttled delay period
- class CClientStats : public CInterface
- {
- public:
- CClientStats(const char *_client) : client(_client), count(0), bRead(0), bWritten(0) { }
- const char *queryFindString() const { return client; }
- inline void addRead(unsigned __int64 len)
- {
- bRead += len;
- }
- inline void addWrite(unsigned __int64 len)
- {
- bWritten += len;
- }
- void getStatus(StringBuffer & info) const
- {
- info.appendf("Client %s - %" I64F "d requests handled, bytes read = %" I64F "d, bytes written = % " I64F "d",
- client.get(), count, bRead.load(), bWritten.load()).newline();
- }
- StringAttr client;
- unsigned __int64 count;
- std::atomic<unsigned __int64> bRead;
- std::atomic<unsigned __int64> bWritten;
- };
- class CClientStatsTable : public OwningStringSuperHashTableOf<CClientStats>
- {
- typedef OwningStringSuperHashTableOf<CClientStats> PARENT;
- CriticalSection crit;
- unsigned cmdStats[RFCmax];
- static int compareElement(void* const *ll, void* const *rr)
- {
- const CClientStats *l = (const CClientStats *) *ll;
- const CClientStats *r = (const CClientStats *) *rr;
- if (l->count == r->count)
- return 0;
- else if (l->count<r->count)
- return 1;
- else
- return -1;
- }
- public:
- CClientStatsTable()
- {
- memset(&cmdStats[0], 0, sizeof(cmdStats));
- }
- ~CClientStatsTable()
- {
- _releaseAll();
- }
- CClientStats *getClientReference(RemoteFileCommandType cmd, const char *client)
- {
- CriticalBlock b(crit);
- CClientStats *stats = PARENT::find(client);
- if (!stats)
- {
- stats = new CClientStats(client);
- PARENT::replace(*stats);
- }
- if (cmd<RFCmax) // i.e. ignore duff command (which will be traced), but still record client connected
- cmdStats[cmd]++;
- ++stats->count;
- return LINK(stats);
- }
- StringBuffer &getInfo(StringBuffer &info, unsigned level=1)
- {
- CriticalBlock b(crit);
- unsigned __int64 totalCmds = 0;
- for (unsigned c=0; c<RFCmax; c++)
- totalCmds += cmdStats[c];
- unsigned totalClients = PARENT::ordinality();
- info.appendf("Commands processed = %" I64F "u, unique clients = %u", totalCmds, totalClients);
- if (totalCmds)
- {
- info.append("Command stats:").newline();
- for (unsigned c=0; c<RFCmax; c++)
- {
- unsigned __int64 count = cmdStats[c];
- if (count)
- info.append(getRFCText(c)).append(": ").append(count).newline();
- }
- }
- if (totalClients)
- {
- SuperHashIteratorOf<CClientStats> iter(*this);
- PointerArrayOf<CClientStats> elements;
- ForEach(iter)
- {
- CClientStats &elem = iter.query();
- elements.append(&elem);
- }
- elements.sort(&compareElement);
- if (level < 10)
- {
- // list up to 10 clients ordered by # of commands processed
- unsigned max=elements.ordinality();
- if (max>10)
- max = 10; // cap
- info.append("Top 10 clients:").newline();
- for (unsigned e=0; e<max; e++)
- {
- const CClientStats &element = *elements.item(e);
- element.getStatus(info);
- }
- }
- else // list all
- {
- info.append("All clients:").newline();
- ForEachItemIn(e, elements)
- {
- const CClientStats &element = *elements.item(e);
- element.getStatus(info);
- }
- }
- }
- return info;
- }
- void reset()
- {
- CriticalBlock b(crit);
- memset(&cmdStats[0], 0, sizeof(cmdStats));
- kill();
- }
- };
- interface IRemoteReadActivity;
- interface IRemoteWriteActivity;
- interface IRemoteActivity : extends IInterface
- {
- virtual unsigned __int64 queryProcessed() const = 0;
- virtual IOutputMetaData *queryOutputMeta() const = 0;
- virtual StringBuffer &getInfoStr(StringBuffer &out) const = 0;
- virtual void serializeCursor(MemoryBuffer &tgt) const = 0;
- virtual void restoreCursor(MemoryBuffer &src) = 0;
- virtual bool isGrouped() const = 0;
- virtual IRemoteReadActivity *queryIsReadActivity() { return nullptr; }
- virtual IRemoteWriteActivity *queryIsWriteActivity() { return nullptr; }
- };
- interface IRemoteReadActivity : extends IRemoteActivity
- {
- virtual const void *nextRow(MemoryBufferBuilder &outBuilder, size32_t &sz) = 0;
- virtual bool requiresPostProject() const = 0;
- };
- interface IRemoteWriteActivity : extends IRemoteActivity
- {
- virtual void write(size32_t sz, const void *row) = 0;
- };
- class CRemoteRequest : public CSimpleInterfaceOf<IInterface>
- {
- int cursorHandle;
- OutputFormat format;
- unsigned __int64 replyLimit = defaultDaFSReplyLimitKB * 1024;
- Linked<IRemoteActivity> activity;
- Linked<ICompressor> compressor;
- Linked<IExpander> expander;
- MemoryBuffer expandMb;
- Owned<IXmlWriterExt> responseWriter; // for xml or json response
- bool handleFull(MemoryBuffer &inMb, size32_t inPos, MemoryBuffer &compressMb, ICompressor *compressor, size32_t replyLimit, size32_t &totalSz)
- {
- size32_t sz = inMb.length()-inPos;
- if (sz < replyLimit)
- return false;
- if (!compressor)
- return true;
- // consumes data from inMb into compressor
- totalSz += sz;
- const void *data = inMb.bytes()+inPos;
- assertex(compressor->write(data, sz) == sz);
- inMb.setLength(inPos);
- return compressMb.capacity() > replyLimit;
- }
- void processRead(IPropertyTree *requestTree, MemoryBuffer &responseMb)
- {
- IRemoteReadActivity *readActivity = activity->queryIsReadActivity();
- assertex(readActivity);
- MemoryBuffer compressMb;
- IOutputMetaData *outMeta = readActivity->queryOutputMeta();
- bool eoi=false;
- bool grouped = readActivity->isGrouped();
- MemoryBuffer resultBuffer;
- MemoryBufferBuilder outBuilder(resultBuffer, outMeta->getMinRecordSize());
- if (outFmt_Binary == format)
- {
- if (compressor)
- {
- compressMb.setEndian(__BIG_ENDIAN);
- compressMb.append(responseMb);
- }
- DelayedMarker<size32_t> dataLenMarker(compressor ? compressMb : responseMb); // uncompressed data size
- if (compressor)
- {
- size32_t initialSz = replyLimit >= 0x10000 ? 0x10000 : replyLimit;
- compressor->open(compressMb, initialSz);
- }
- outBuilder.setBuffer(responseMb); // write direct to responseMb buffer for efficiency
- unsigned __int64 numProcessedStart = readActivity->queryProcessed();
- size32_t totalDataSz = 0;
- size32_t dataStartPos = responseMb.length();
- if (grouped)
- {
- bool pastFirstRow = numProcessedStart>0;
- do
- {
- size32_t eogPos = 0;
- if (pastFirstRow)
- {
- /* this is for last row output, which might have been returned in the previous request
- * The eog marker may change as a result of the next row (see writeDirect() call below);
- */
- eogPos = responseMb.length();
- responseMb.append(false);
- }
- size32_t rowSz;
- const void *row = readActivity->nextRow(outBuilder, rowSz);
- if (!row)
- {
- if (!pastFirstRow)
- {
- eoi = true;
- break;
- }
- else
- {
- bool eog = true;
- responseMb.writeDirect(eogPos, sizeof(eog), &eog);
- row = readActivity->nextRow(outBuilder, rowSz);
- if (!row)
- {
- eoi = true;
- break;
- }
- }
- }
- pastFirstRow = true;
- }
- while (!handleFull(responseMb, dataStartPos, compressMb, compressor, replyLimit, totalDataSz));
- }
- else
- {
- do
- {
- size32_t rowSz;
- const void *row = readActivity->nextRow(outBuilder, rowSz);
- if (!row)
- {
- eoi = true;
- break;
- }
- }
- while (!handleFull(responseMb, dataStartPos, compressMb, compressor, replyLimit, totalDataSz));
- }
- // Consume any trailing data remaining
- if (compressor)
- {
- size32_t sz = responseMb.length()-dataStartPos;
- if (sz)
- {
- // consumes data built up in responseMb buffer into compressor
- totalDataSz += sz;
- const void *data = responseMb.bytes()+dataStartPos;
- assertex(compressor->write(data, sz) == sz);
- responseMb.setLength(dataStartPos);
- }
- }
- // finalize responseMb
- dataLenMarker.write(compressor ? totalDataSz : responseMb.length()-dataStartPos);
- DelayedSizeMarker cursorLenMarker(responseMb); // cursor length
- if (!eoi)
- readActivity->serializeCursor(responseMb);
- cursorLenMarker.write();
- if (compressor)
- {
- // consume cursor into compressor
- size32_t sz = responseMb.length()-dataStartPos;
- const void *data = responseMb.bytes()+dataStartPos;
- assertex(compressor->write(data, sz) == sz);
- compressor->close();
- // now ready to swap compressed output into responseMb
- responseMb.swapWith(compressMb);
- }
- }
- else
- {
- responseWriter->outputBeginArray("Row");
- if (grouped)
- {
- bool pastFirstRow = readActivity->queryProcessed()>0;
- bool first = true;
- do
- {
- size32_t rowSz;
- const void *row = readActivity->nextRow(outBuilder, rowSz);
- if (!row)
- {
- if (!pastFirstRow)
- {
- eoi = true;
- break;
- }
- else
- {
- row = readActivity->nextRow(outBuilder, rowSz);
- if (!row)
- {
- eoi = true;
- break;
- }
- if (first) // possible if eog was 1st row on next packet
- responseWriter->outputBeginNested("Row", false);
- responseWriter->outputBool(true, "dfs:Eog"); // field name cannot clash with an ecl field name
- }
- }
- if (pastFirstRow)
- responseWriter->outputEndNested("Row"); // close last row
- responseWriter->outputBeginNested("Row", false);
- outMeta->toXML((const byte *)row, *responseWriter);
- resultBuffer.clear();
- pastFirstRow = true;
- first = false;
- }
- while (responseWriter->length() < replyLimit);
- if (pastFirstRow)
- responseWriter->outputEndNested("Row"); // close last row
- }
- else
- {
- do
- {
- size32_t rowSz;
- const void *row = readActivity->nextRow(outBuilder, rowSz);
- if (!row)
- {
- eoi = true;
- break;
- }
- responseWriter->outputBeginNested("Row", false);
- outMeta->toXML((const byte *)row, *responseWriter);
- responseWriter->outputEndNested("Row");
- resultBuffer.clear();
- }
- while (responseWriter->length() < replyLimit);
- }
- responseWriter->outputEndArray("Row");
- if (!eoi)
- {
- MemoryBuffer cursorMb;
- cursorMb.setEndian(__BIG_ENDIAN);
- readActivity->serializeCursor(cursorMb);
- StringBuffer cursorBinStr;
- JBASE64_Encode(cursorMb.toByteArray(), cursorMb.length(), cursorBinStr);
- responseWriter->outputString(cursorBinStr.length(), cursorBinStr.str(), "cursorBin");
- }
- }
- }
- void processWrite(IPropertyTree *requestTree, MemoryBuffer &rowDataMb, MemoryBuffer &responseMb)
- {
- IRemoteWriteActivity *writeActivity = activity->queryIsWriteActivity();
- assertex(writeActivity);
- /* row data is in serialized disk format already, and do not need to look at individual rows
- * so simply dump to disk
- */
- size32_t rowDataSz;
- rowDataMb.read(rowDataSz);
- const void *rowData;
- if (expander)
- {
- rowDataSz = expander->init(rowDataMb.readDirect(rowDataSz));
- expandMb.clear().reserve(rowDataSz);
- expander->expand(expandMb.bufferBase());
- rowData = expandMb.bufferBase();
- }
- else
- rowData = rowDataMb.readDirect(rowDataSz);
- writeActivity->write(rowDataSz, rowData);
- }
- public:
- CRemoteRequest(int _cursorHandle, OutputFormat _format, ICompressor *_compressor, IExpander *_expander, IRemoteActivity *_activity)
- : cursorHandle(_cursorHandle), format(_format), compressor(_compressor), expander(_expander), activity(_activity)
- {
- if (outFmt_Binary != format)
- {
- responseWriter.setown(createIXmlWriterExt(0, 0, nullptr, outFmt_Xml == format ? WTStandard : WTJSONObject));
- responseWriter->outputBeginNested("Response", true);
- if (outFmt_Xml == format)
- responseWriter->outputCString("urn:hpcc:dfs", "@xmlns:dfs");
- responseWriter->outputUInt(cursorHandle, sizeof(cursorHandle), "handle");
- }
- }
- OutputFormat queryFormat() const { return format; }
- unsigned __int64 queryReplyLimit() const { return replyLimit; }
- IRemoteActivity *queryActivity() const { return activity; }
- ICompressor *queryCompressor() const { return compressor; }
- void process(IPropertyTree *requestTree, MemoryBuffer &restMb, MemoryBuffer &responseMb)
- {
- if (requestTree->hasProp("replyLimit"))
- replyLimit = requestTree->getPropInt64("replyLimit", defaultDaFSReplyLimitKB) * 1024;
- if (outFmt_Binary == format)
- responseMb.append(cursorHandle);
- else // outFmt_Xml || outFmt_Json
- responseWriter->outputUInt(cursorHandle, sizeof(cursorHandle), "handle");
- if (requestTree->hasProp("cursorBin")) // use handle if one provided
- {
- MemoryBuffer cursorMb;
- cursorMb.setEndian(__BIG_ENDIAN);
- JBASE64_Decode(requestTree->queryProp("cursorBin"), cursorMb);
- activity->restoreCursor(cursorMb);
- }
- if (activity->queryIsReadActivity())
- processRead(requestTree, responseMb);
- else if (activity->queryIsWriteActivity())
- processWrite(requestTree, restMb, responseMb);
- if (outFmt_Binary != format)
- {
- responseWriter->outputEndNested("Response");
- responseWriter->finalize();
- PROGLOG("Response: %s", responseWriter->str());
- responseMb.append(responseWriter->length(), responseWriter->str());
- }
- }
- };
- enum OpenFileFlag { of_null=0x0, of_key=0x01 };
- struct OpenFileInfo
- {
- OpenFileInfo() { }
- OpenFileInfo(int _handle, IFileIO *_fileIO, StringAttrItem *_filename) : handle(_handle), fileIO(_fileIO), filename(_filename) { }
- OpenFileInfo(int _handle, CRemoteRequest *_remoteRequest, StringAttrItem *_filename)
- : handle(_handle), remoteRequest(_remoteRequest), filename(_filename) { }
- Linked<IFileIO> fileIO;
- Linked<CRemoteRequest> remoteRequest;
- Linked<StringAttrItem> filename; // for debug
- int handle = 0;
- unsigned flags = 0;
- };
- static IOutputMetaData *getTypeInfoOutputMetaData(IPropertyTree &actNode, const char *typePropName, bool grouped)
- {
- IPropertyTree *json = actNode.queryPropTree(typePropName);
- if (json)
- return createTypeInfoOutputMetaData(*json, grouped);
- else
- {
- StringBuffer binTypePropName(typePropName);
- const char *jsonBin = actNode.queryProp(binTypePropName.append("Bin"));
- if (!jsonBin)
- return nullptr;
- MemoryBuffer mb;
- JBASE64_Decode(jsonBin, mb);
- return createTypeInfoOutputMetaData(mb, grouped);
- }
- }
- class CRemoteDiskBaseActivity : public CSimpleInterfaceOf<IRemoteReadActivity>, implements IVirtualFieldCallback
- {
- typedef CSimpleInterfaceOf<IRemoteReadActivity> PARENT;
- protected:
- StringAttr fileName; // physical filename
- Linked<IOutputMetaData> inMeta, outMeta;
- unsigned __int64 processed = 0;
- bool outputGrouped = false;
- bool opened = false;
- bool eofSeen = false;
- const RtlRecord *record = nullptr;
- RowFilter filters;
- RtlDynRow *filterRow = nullptr;
- // virtual field values
- StringAttr logicalFilename;
- unsigned numInputFields = 0;
- inline bool fieldFilterMatch(const void * buffer)
- {
- if (filterRow)
- {
- filterRow->setRow(buffer, filters.getNumFieldsRequired());
- return filters.matches(*filterRow);
- }
- else
- return true;
- }
- public:
- IMPLEMENT_IINTERFACE_USING(PARENT);
- CRemoteDiskBaseActivity(IPropertyTree &config, IFileDescriptor *fileDesc)
- {
- fileName.set(config.queryProp("fileName"));
- if (isEmptyString(fileName))
- throw createDafsException(DAFSERR_cmdstream_protocol_failure, "CRemoteDiskBaseActivity: fileName missing");
- logicalFilename.set(config.queryProp("virtualFields/logicalFilename"));
- }
- ~CRemoteDiskBaseActivity()
- {
- delete filterRow;
- }
- void setupInputMeta(const IPropertyTree &config, IOutputMetaData *_inMeta)
- {
- inMeta.setown(_inMeta);
- record = &inMeta->queryRecordAccessor(true);
- numInputFields = record->getNumFields();
- if (config.hasProp("keyFilter"))
- {
- filterRow = new RtlDynRow(*record);
- Owned<IPropertyTreeIterator> filterIter = config.getElements("keyFilter");
- ForEach(*filterIter)
- filters.addFilter(*record, filterIter->query().queryProp(nullptr));
- }
- }
- // IRemoteReadActivity impl.
- virtual unsigned __int64 queryProcessed() const override
- {
- return processed;
- }
- virtual IOutputMetaData *queryOutputMeta() const override
- {
- return outMeta;
- }
- virtual bool isGrouped() const override
- {
- return outputGrouped;
- }
- virtual void serializeCursor(MemoryBuffer &tgt) const override
- {
- throwUnexpected();
- }
- virtual void restoreCursor(MemoryBuffer &src) override
- {
- throwUnexpected();
- }
- virtual IRemoteReadActivity *queryIsReadActivity()
- {
- return this;
- }
- virtual bool requiresPostProject() const override
- {
- return false;
- }
- //interface IVirtualFieldCallback
- virtual const char * queryLogicalFilename(const void * row) override
- {
- return logicalFilename.str();
- }
- virtual unsigned __int64 getFilePosition(const void * row) override
- {
- throwUnexpected();
- }
- virtual unsigned __int64 getLocalFilePosition(const void * row) override
- {
- throwUnexpected();
- }
- virtual const byte * lookupBlob(unsigned __int64 id) override
- {
- throwUnexpected();
- }
- };
- class CRemoteStreamReadBaseActivity : public CRemoteDiskBaseActivity
- {
- typedef CRemoteDiskBaseActivity PARENT;
- protected:
- Owned<ISerialStream> inputStream;
- Owned<IFileIO> iFileIO;
- unsigned __int64 chooseN = 0;
- unsigned __int64 startPos = 0;
- bool compressed = false;
- bool cursorDirty = false;
- // virtual field values
- unsigned partNum = 0;
- offset_t baseFpos = 0;
- virtual bool refreshCursor()
- {
- if (inputStream->tell() != startPos)
- {
- inputStream->reset(startPos);
- return true;
- }
- return false;
- }
- bool checkOpen() // NB: returns true if this call opened file
- {
- if (opened)
- {
- if (!cursorDirty)
- return false;
- refreshCursor();
- eofSeen = false;
- cursorDirty = false;
- return false;
- }
- cursorDirty = false;
- OwnedIFile iFile = createIFile(fileName);
- assertex(iFile);
- iFileIO.setown(createCompressedFileReader(iFile));
- if (iFileIO)
- {
- if (!compressed)
- {
- WARNLOG("meta info did not mark file '%s' as compressed, but detected file as compressed", fileName.get());
- compressed = true;
- }
- }
- else
- {
- iFileIO.setown(iFile->open(IFOread));
- if (!iFileIO)
- throw createDafsExceptionV(DAFSERR_cmdstream_protocol_failure, "Failed to open: '%s'", fileName.get());
- if (compressed)
- {
- WARNLOG("meta info marked file '%s' as compressed, but detected file as uncompressed", fileName.get());
- compressed = false;
- }
- }
- inputStream.setown(createFileSerialStream(iFileIO, startPos));
- opened = true;
- eofSeen = false;
- return true;
- }
- void close()
- {
- iFileIO.clear();
- opened = false;
- eofSeen = true;
- }
- public:
- CRemoteStreamReadBaseActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc)
- {
- compressed = config.getPropBool("compressed");
- chooseN = config.getPropInt64("chooseN", defaultFileStreamChooseNLimit);
- partNum = config.getPropInt("virtualFields/partNum");
- baseFpos = (offset_t)config.getPropInt64("virtualFields/baseFpos");
- }
- // IVirtualFieldCallback impl.
- virtual unsigned __int64 getFilePosition(const void * row) override
- {
- return inputStream->tell() + baseFpos;
- }
- virtual unsigned __int64 getLocalFilePosition(const void * row) override
- {
- return makeLocalFposOffset(partNum, inputStream->tell());
- }
- };
- class CRemoteDiskReadActivity : public CRemoteStreamReadBaseActivity
- {
- typedef CRemoteStreamReadBaseActivity PARENT;
- CThorContiguousRowBuffer prefetchBuffer;
- Owned<ISourceRowPrefetcher> prefetcher;
- bool inputGrouped = false;
- bool eogPending = false;
- bool someInGroup = false;
- Owned<const IDynamicTransform> translator;
- virtual bool refreshCursor() override
- {
- if (prefetchBuffer.tell() != startPos)
- {
- inputStream->reset(startPos);
- prefetchBuffer.clearStream();
- prefetchBuffer.setStream(inputStream);
- return true;
- }
- return false;
- }
- bool checkOpen()
- {
- if (!PARENT::checkOpen()) // returns true if it opened file
- return false;
- prefetchBuffer.setStream(inputStream);
- prefetcher.setown(inMeta->createDiskPrefetcher());
- return true;
- }
- public:
- CRemoteDiskReadActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc), prefetchBuffer(nullptr)
- {
- inputGrouped = config.getPropBool("inputGrouped", false);
- setupInputMeta(config, getTypeInfoOutputMetaData(config, "input", inputGrouped));
- outputGrouped = config.getPropBool("outputGrouped", false);
- if (!inputGrouped && outputGrouped)
- outputGrouped = false; // perhaps should fire error
- outMeta.setown(getTypeInfoOutputMetaData(config, "output", outputGrouped));
- if (!outMeta)
- outMeta.set(inMeta);
- translator.setown(createRecordTranslator(outMeta->queryRecordAccessor(true), *record));
- }
- // IRemoteReadActivity impl.
- virtual const void *nextRow(MemoryBufferBuilder &outBuilder, size32_t &retSz) override
- {
- if (eogPending || eofSeen)
- {
- eogPending = false;
- someInGroup = false;
- retSz = 0;
- return nullptr;
- }
- checkOpen();
- while (!eofSeen && (processed < chooseN))
- {
- while (!prefetchBuffer.eos())
- {
- prefetcher->readAhead(prefetchBuffer);
- size32_t inputRowSz = prefetchBuffer.queryRowSize();
- bool eog = false;
- if (inputGrouped)
- {
- prefetchBuffer.skip(sizeof(eog));
- if (outputGrouped)
- {
- byte b = *(prefetchBuffer.queryRow()+inputRowSz);
- memcpy(&eog, prefetchBuffer.queryRow()+inputRowSz, sizeof(eog));
- }
- }
- const byte *next = prefetchBuffer.queryRow();
- size32_t rowSz; // use local var instead of reference param for efficiency
- if (fieldFilterMatch(next))
- rowSz = translator->translate(outBuilder, *this, next);
- else
- rowSz = 0;
- prefetchBuffer.finishedRow();
- const void *ret = outBuilder.getSelf();
- outBuilder.finishRow(rowSz);
- if (rowSz)
- {
- processed++;
- eogPending = eog;
- someInGroup = true;
- retSz = rowSz;
- return ret;
- }
- else if (eog)
- {
- eogPending = false;
- if (someInGroup)
- {
- someInGroup = false;
- return nullptr;
- }
- }
- }
- eofSeen = true;
- }
- close();
- retSz = 0;
- return nullptr;
- }
- virtual void serializeCursor(MemoryBuffer &tgt) const override
- {
- tgt.append(prefetchBuffer.tell());
- tgt.append(processed);
- tgt.append(someInGroup);
- tgt.append(eogPending);
- }
- virtual void restoreCursor(MemoryBuffer &src) override
- {
- cursorDirty = true;
- src.read(startPos);
- src.read(processed);
- src.read(someInGroup);
- src.read(eogPending);
- }
- virtual StringBuffer &getInfoStr(StringBuffer &out) const override
- {
- return out.appendf("diskread[%s]", fileName.get());
- }
- //interface IVirtualFieldCallback
- virtual unsigned __int64 getFilePosition(const void * row) override
- {
- return prefetchBuffer.tell() + baseFpos;
- }
- };
- class CRemoteExternalFormatReadActivity : public CRemoteStreamReadBaseActivity
- {
- typedef CRemoteStreamReadBaseActivity PARENT;
- protected:
- Owned<const IDynamicFieldValueFetcher> fieldFetcher;
- Owned<const IDynamicTransform> translator;
- bool postProject = false;
- public:
- CRemoteExternalFormatReadActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc)
- {
- setupInputMeta(config, getTypeInfoOutputMetaData(config, "input", false));
- outMeta.setown(getTypeInfoOutputMetaData(config, "output", false));
- const RtlRecord *outRecord = record;
- if (filterRow)
- {
- if (outMeta)
- postProject = true;
- outMeta.set(inMeta);
- }
- else
- {
- if (outMeta)
- outRecord = &outMeta->queryRecordAccessor(true);
- else
- outMeta.set(inMeta);
- }
- translator.setown(createRecordTranslatorViaCallback(*outRecord, *record));
- }
- virtual bool requiresPostProject() const override
- {
- return postProject;
- }
- };
- class CNullNestedRowIterator : public CSimpleInterfaceOf<IDynamicRowIterator>
- {
- public:
- virtual bool first() override { return false; }
- virtual bool next() override { return false; }
- virtual bool isValid() override { return false; }
- virtual IDynamicFieldValueFetcher &query() override
- {
- throwUnexpected();
- }
- } nullNestedRowIterator;
- class CRemoteCsvReadActivity : public CRemoteExternalFormatReadActivity
- {
- typedef CRemoteExternalFormatReadActivity PARENT;
- StringBuffer csvQuote, csvSeparate, csvTerminate, csvEscape;
- unsigned __int64 headerLines = 0;
- unsigned __int64 maxRowSize = 0;
- bool preserveWhitespace = false;
- CSVSplitter csvSplitter;
- class CFieldFetcher : public CSimpleInterfaceOf<IDynamicFieldValueFetcher>
- {
- CSVSplitter &csvSplitter;
- unsigned numInputFields;
- public:
- CFieldFetcher(CSVSplitter &_csvSplitter, unsigned _numInputFields) : csvSplitter(_csvSplitter), numInputFields(_numInputFields)
- {
- }
- virtual const byte *queryValue(unsigned fieldNum, size_t &sz) const override
- {
- dbgassertex(fieldNum < numInputFields);
- sz = csvSplitter.queryLengths()[fieldNum];
- return csvSplitter.queryData()[fieldNum];
- }
- virtual IDynamicRowIterator *getNestedIterator(unsigned fieldNum) const override
- {
- return LINK(&nullNestedRowIterator);
- }
- virtual size_t getSize(unsigned fieldNum) const override { throwUnexpected(); }
- virtual size32_t getRecordSize() const override { throwUnexpected(); }
- };
- bool checkOpen()
- {
- if (!PARENT::checkOpen())
- return false;
- csvSplitter.init(numInputFields, maxRowSize, csvQuote, csvSeparate, csvTerminate, csvEscape, preserveWhitespace);
- if (headerLines)
- {
- do
- {
- size32_t lineLength = csvSplitter.splitLine(inputStream, maxRowSize);
- if (0 == lineLength)
- break;
- inputStream->skip(lineLength);
- }
- while (--headerLines);
- }
- if (!fieldFetcher)
- fieldFetcher.setown(new CFieldFetcher(csvSplitter, numInputFields));
- return true;
- }
- const unsigned defaultMaxCsvRowSize = 10; // MB
- public:
- CRemoteCsvReadActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc)
- {
- maxRowSize = config.getPropInt64("ActivityOptions/maxRowSize", defaultMaxCsvRowSize) * 1024 * 1024;
- preserveWhitespace = config.getPropBool("ActivityOptions/preserveWhitespace");
- if (!config.getProp("ActivityOptions/csvQuote", csvQuote))
- {
- if (!fileDesc->queryProperties().getProp("@csvQuote", csvQuote))
- csvQuote.append("\"");
- }
- if (!config.getProp("ActivityOptions/csvSeparate", csvSeparate))
- {
- if (!fileDesc->queryProperties().getProp("@csvSeparate", csvSeparate))
- csvSeparate.append("\\,");
- }
- if (!config.getProp("ActivityOptions/csvTerminate", csvTerminate))
- {
- if (!fileDesc->queryProperties().getProp("@csvTerminate", csvTerminate))
- csvTerminate.append("\\n,\\r\\n");
- }
- if (!config.getProp("ActivityOptions/csvEscape", csvEscape))
- fileDesc->queryProperties().getProp("@csvEscape", csvEscape);
- headerLines = config.getPropInt64("ActivityOptions/headerLines"); // really this should be a published attribute too
- }
- virtual StringBuffer &getInfoStr(StringBuffer &out) const override
- {
- return out.appendf("csvread[%s]", fileName.get());
- }
- // IRemoteReadActivity impl.
- /* Returns the next CSV line that passes the field filter, translated into 'outBuilder'.
-  * On success 'retSz' is the size of the translated row and the returned pointer is the row.
-  * Returns nullptr with 'retSz' set to 0 once no further line can be split from the input
-  * or 'chooseN' rows have been returned; from then on every call returns nullptr.
-  */
- virtual const void *nextRow(MemoryBufferBuilder &outBuilder, size32_t &retSz) override
- {
-     if (!eofSeen)
-     {
-         checkOpen();
-         while (!eofSeen && (processed < chooseN))
-         {
-             const size32_t consumed = csvSplitter.splitLine(inputStream, maxRowSize);
-             if (0 == consumed)
-                 break; // nothing further could be split from the input
-             retSz = translator->translate(outBuilder, *this, *fieldFetcher);
-             dbgassertex(retSz);
-             const void *candidate = outBuilder.getSelf();
-             const bool accepted = fieldFilterMatch(candidate);
-             if (accepted)
-             {
-                 outBuilder.finishRow(retSz);
-                 ++processed;
-             }
-             else
-                 outBuilder.removeBytes(retSz); // filtered out, discard what was just built
-             inputStream->skip(consumed); // this line has been dealt with either way
-             if (accepted)
-                 return candidate;
-         }
-         eofSeen = true;
-         close();
-     }
-     retSz = 0;
-     return nullptr;
- }
- };
- /* Common implementation of the remote XML and JSON read activities.
-  * The input stream is fed to a pull parser (createXMLParse/createJSONParse, chosen by 'kind').
-  * Each element the parser matches against 'xpath' is handed back through IXMLSelect::match(),
-  * remembered in 'lastMatch', and then translated into the output row format by nextRow() using
-  * a CFieldFetcher that reads the field values out of the matched element.
-  * Derived classes are expected to set up 'xpath' (see 'customRowTag').
-  */
- class CRemoteMarkupReadActivity : public CRemoteExternalFormatReadActivity, implements IXMLSelect
- {
-     typedef CRemoteExternalFormatReadActivity PARENT;
-     ThorActivityKind kind;
-     IXmlToRowTransformer *xmlTransformer; // not referenced anywhere in this class
-     Linked<IColumnProvider> lastMatch;    // set by match(), consumed and cleared by nextRow()
-     Owned<IXMLParse> xmlParser;
-     bool noRoot = false;
-     bool useXmlContents = false;
-     // JCSMORE - it would be good if these were cached/reused (can I assume anything using fetcher is single threaded?)
-     /* Fetches field values for one record layout out of the currently matched markup element.
-      * For fields that have a child type, an xpath containing xpathCompoundSeparatorChar is
-      * expanded once, at construction, into a '/' separated path (see queryCompoundXPath).
-      */
-     class CFieldFetcher : public CSimpleInterfaceOf<IDynamicFieldValueFetcher>
-     {
-         unsigned numInputFields;
-         const RtlRecord &recInfo;
-         Linked<IColumnProvider> currentMatch;
-         const char **compoundXPaths = nullptr; // lazily allocated, one (possibly null) owned string per field
-         // Returns the expanded xpath for the field if one was built, otherwise the record's own xpath.
-         const char *queryCompoundXPath(unsigned fieldNum) const
-         {
-             if (compoundXPaths && compoundXPaths[fieldNum])
-                 return compoundXPaths[fieldNum];
-             else
-                 return recInfo.queryXPath(fieldNum);
-         }
-     public:
-         CFieldFetcher(const RtlRecord &_recInfo, IColumnProvider *_currentMatch) : recInfo(_recInfo), currentMatch(_currentMatch)
-         {
-             numInputFields = recInfo.getNumFields();
-             // JCSMORE - should this be done (optionally) when RtlRecord is created?
-             for (unsigned fieldNum=0; fieldNum<numInputFields; fieldNum++)
-             {
-                 if (!recInfo.queryType(fieldNum)->queryChildType())
-                     continue;
-                 const char *xpath = recInfo.queryXPath(fieldNum);
-                 dbgassertex(xpath);
-                 if (!xpath)
-                     continue; // release builds: nothing to expand rather than dereferencing null
-                 // locate the first separator; stops at the terminator, so an empty xpath is never over-read
-                 const char *firstSep = xpath;
-                 while (*firstSep && (*firstSep != xpathCompoundSeparatorChar))
-                     ++firstSep;
-                 if ('\0' == *firstSep)
-                     continue; // no separator, the record's own xpath is used as is
-                 if (!compoundXPaths)
-                 {
-                     compoundXPaths = new const char *[numInputFields];
-                     memset(compoundXPaths, 0, sizeof(const char *)*numInputFields);
-                 }
-                 // the expanded form is never longer than the original, so strlen+1 is always enough
-                 char *expandedXPath = new char[strlen(xpath)+1];
-                 char *expandedXPathPtr = expandedXPath;
-                 const char *ptr = xpath;
-                 if (firstSep == xpath) // if leading char, just skip
-                     ++ptr;
-                 for (; *ptr; ++ptr)
-                     *expandedXPathPtr++ = (*ptr == xpathCompoundSeparatorChar) ? '/' : *ptr;
-                 *expandedXPathPtr = '\0';
-                 compoundXPaths[fieldNum] = expandedXPath;
-             }
-         }
-         ~CFieldFetcher()
-         {
-             if (compoundXPaths)
-             {
-                 for (unsigned fieldNum=0; fieldNum<numInputFields; fieldNum++)
-                     delete [] compoundXPaths[fieldNum];
-                 delete [] compoundXPaths;
-             }
-         }
-         // Switches the element that subsequent queryValue/getNestedIterator calls read from.
-         void setCurrentMatch(IColumnProvider *_currentMatch)
-         {
-             currentMatch.set(_currentMatch);
-         }
-         // IDynamicFieldValueFetcher impl.
-         // Reads the raw value at the field's xpath from the current match; 'sz' receives its size.
-         virtual const byte *queryValue(unsigned fieldNum, size_t &sz) const override
-         {
-             dbgassertex(fieldNum < numInputFields);
-             dbgassertex(currentMatch);
-             size32_t rawSz;
-             const char *ret = currentMatch->readRaw(recInfo.queryXPath(fieldNum), rawSz);
-             sz = rawSz;
-             return (const byte *)ret;
-         }
-         // Returns a new iterator over the child elements of a nested-record field, or nullptr if the field has no nested record.
-         virtual IDynamicRowIterator *getNestedIterator(unsigned fieldNum) const override
-         {
-             dbgassertex(fieldNum < numInputFields);
-             dbgassertex(currentMatch);
-             const RtlRecord *nested = recInfo.queryNested(fieldNum);
-             if (!nested)
-                 return nullptr;
-             class CIterator : public CSimpleInterfaceOf<IDynamicRowIterator>
-             {
-                 XmlChildIterator xmlIter;
-                 Linked<IDynamicFieldValueFetcher> curFieldValueFetcher;
-                 Linked<IColumnProvider> parentMatch;
-                 const RtlRecord &nestedRecInfo;
-                 // Installs a fetcher for 'child', or clears the current one when iteration has run out.
-                 bool setCurrentChild(IColumnProvider *child)
-                 {
-                     if (!child)
-                     {
-                         curFieldValueFetcher.clear();
-                         return false;
-                     }
-                     curFieldValueFetcher.setown(new CFieldFetcher(nestedRecInfo, child));
-                     return true;
-                 }
-             public:
-                 CIterator(const RtlRecord &_nestedRecInfo, IColumnProvider *_parentMatch, const char *xpath) : nestedRecInfo(_nestedRecInfo), parentMatch(_parentMatch)
-                 {
-                     xmlIter.initOwn(parentMatch->getChildIterator(xpath));
-                 }
-                 virtual bool first() override
-                 {
-                     return setCurrentChild(xmlIter.first());
-                 }
-                 virtual bool next() override
-                 {
-                     return setCurrentChild(xmlIter.next());
-                 }
-                 virtual bool isValid() override
-                 {
-                     return nullptr != curFieldValueFetcher.get();
-                 }
-                 virtual IDynamicFieldValueFetcher &query() override
-                 {
-                     assertex(curFieldValueFetcher);
-                     return *curFieldValueFetcher;
-                 }
-             };
-             // JCSMORE - it would be good if these were cached/reused (can I assume anything using parent fetcher is single threaded?)
-             return new CIterator(*nested, currentMatch, queryCompoundXPath(fieldNum));
-         }
-         virtual size_t getSize(unsigned fieldNum) const override { throwUnexpected(); }
-         virtual size32_t getRecordSize() const override { throwUnexpected(); }
-     };
-     /* Calls PARENT::checkOpen() and, if that returns true, creates the parser over the input
-      * stream and (once only) the field fetcher. Returns false if PARENT::checkOpen() did.
-      */
-     bool checkOpen()
-     {
-         if (!PARENT::checkOpen())
-             return false;
-         // Adapts the ISerialStream peek/skip interface to the ISimpleReadStream the parsers consume.
-         class CSimpleStream : public CSimpleInterfaceOf<ISimpleReadStream>
-         {
-             Linked<ISerialStream> stream;
-         public:
-             CSimpleStream(ISerialStream *_stream) : stream(_stream)
-             {
-             }
-             // ISimpleReadStream impl.
-             virtual size32_t read(size32_t max_len, void * data) override
-             {
-                 size32_t got;
-                 const void *res = stream->peek(max_len, got);
-                 if (got)
-                 {
-                     if (got>max_len) // never copy more than the caller asked for
-                         got = max_len;
-                     memcpy(data, res, got);
-                     stream->skip(got);
-                 }
-                 return got;
-             }
-         };
-         Owned<ISimpleReadStream> simpleStream = new CSimpleStream(inputStream);
-         if (kind==TAKjsonread)
-             xmlParser.setown(createJSONParse(*simpleStream, xpath, *this, noRoot?ptr_noRoot:ptr_none, useXmlContents));
-         else
-             xmlParser.setown(createXMLParse(*simpleStream, xpath, *this, noRoot?ptr_noRoot:ptr_none, useXmlContents));
-         if (!fieldFetcher)
-             fieldFetcher.setown(new CFieldFetcher(*record, nullptr));
-         return true;
-     }
- protected:
-     StringBuffer xpath;        // path the parser matches rows against
-     StringBuffer customRowTag; // "ActivityOptions/rowTag" from the request, empty if not supplied
- public:
-     IMPLEMENT_IINTERFACE_USING(PARENT);
-     CRemoteMarkupReadActivity(IPropertyTree &config, IFileDescriptor *fileDesc, ThorActivityKind _kind) : PARENT(config, fileDesc), kind(_kind)
-     {
-         config.getProp("ActivityOptions/rowTag", customRowTag);
-         noRoot = config.getPropBool("noRoot");
-     }
-     IColumnProvider *queryMatch() const { return lastMatch; }
-     virtual StringBuffer &getInfoStr(StringBuffer &out) const override
-     {
-         return out.appendf("%s[%s]", getActivityText(kind), fileName.get());
-     }
-     // IRemoteReadActivity impl.
-     /* Advances the parser until an element has been matched that also passes the field filter,
-      * and returns it translated into 'outBuilder' ('retSz' = translated size).
-      * Returns nullptr with 'retSz' 0 when the parser has nothing more; subsequent calls keep returning nullptr.
-      * NOTE(review): unlike the CSV reader's nextRow, no 'chooseN' limit is applied here - confirm this is intended.
-      */
-     virtual const void *nextRow(MemoryBufferBuilder &outBuilder, size32_t &retSz) override
-     {
-         if (eofSeen)
-         {
-             retSz = 0;
-             return nullptr;
-         }
-         checkOpen();
-         while (xmlParser->next())
-         {
-             if (lastMatch)
-             {
-                 ((CFieldFetcher *)fieldFetcher.get())->setCurrentMatch(lastMatch);
-                 retSz = translator->translate(outBuilder, *this, *fieldFetcher);
-                 dbgassertex(retSz);
-                 lastMatch.clear();
-                 const void *ret = outBuilder.getSelf();
-                 if (fieldFilterMatch(ret))
-                 {
-                     outBuilder.finishRow(retSz);
-                     ++processed;
-                     return ret;
-                 }
-                 else
-                     outBuilder.removeBytes(retSz);
-             }
-         }
-         eofSeen = true;
-         close();
-         retSz = 0;
-         return nullptr;
-     }
-     // IXMLSelect impl.
-     // Parser callback: remembers the matched element for nextRow() to translate.
-     virtual void match(IColumnProvider &entry, offset_t startOffset, offset_t endOffset)
-     {
-         lastMatch.set(&entry);
-     }
- };
- /* XML flavour of the markup read activity.
-  * The row xpath is "/Dataset/" followed by the row tag: the request's custom row tag if one
-  * was supplied, otherwise the "@rowTag" attribute of the file descriptor's properties.
-  * NB: 'fileDesc' may be null (createRemoteActivity passes on whatever verifyMetaInfo returned);
-  * it is then treated like a descriptor without a "@rowTag", i.e. nothing is appended.
-  */
- class CRemoteXmlReadActivity : public CRemoteMarkupReadActivity
- {
-     typedef CRemoteMarkupReadActivity PARENT;
- public:
-     CRemoteXmlReadActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc, TAKxmlread)
-     {
-         xpath.set("/Dataset/");
-         if (customRowTag.isEmpty()) // no override
-         {
-             if (fileDesc)
-                 fileDesc->queryProperties().getProp("@rowTag", xpath);
-         }
-         else
-             xpath.append(customRowTag);
-     }
- };
- /* JSON flavour of the markup read activity.
-  * The row xpath is "/" followed by the row tag: the request's custom row tag if one was
-  * supplied, otherwise the "@rowTag" attribute of the file descriptor's properties.
-  * NB: 'fileDesc' may be null (createRemoteActivity passes on whatever verifyMetaInfo returned);
-  * it is then treated like a descriptor without a "@rowTag", i.e. nothing is appended.
-  */
- class CRemoteJsonReadActivity : public CRemoteMarkupReadActivity
- {
-     typedef CRemoteMarkupReadActivity PARENT;
- public:
-     CRemoteJsonReadActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc, TAKjsonread)
-     {
-         xpath.set("/");
-         if (customRowTag.isEmpty()) // no override
-         {
-             if (fileDesc)
-                 fileDesc->queryProperties().getProp("@rowTag", xpath);
-         }
-         else
-             xpath.append(customRowTag);
-     }
- };
- /* An IRemoteReadActivity that wraps another read activity and projects its rows to the output format.
-  * Created when the input activity needs to filter, but must first translate from the external
-  * format to the actual format.
-  * NB: processed count, grouping and cursor handling are all delegated to the input.
-  */
- class CRemoteCompoundReadProjectActivity : public CSimpleInterfaceOf<IRemoteReadActivity>
- {
-     Linked<IRemoteReadActivity> input;
-     Owned<IOutputMetaData> outMeta;
-     Owned<const IDynamicTransform> translator;
-     UnexpectedVirtualFieldCallback fieldCallback;
-     MemoryBuffer inputRowMb;               // backing store for rows pulled from the input
-     MemoryBufferBuilder *inputRowBuilder;  // owned, builds input rows into inputRowMb
- public:
-     CRemoteCompoundReadProjectActivity(IPropertyTree &config, IRemoteReadActivity *_input) : input(_input)
-     {
-         IOutputMetaData *sourceMeta = input->queryOutputMeta();
-         outMeta.setown(getTypeInfoOutputMetaData(config, "output", false));
-         dbgassertex(outMeta);
-         const RtlRecord &sourceRecord = sourceMeta->queryRecordAccessor(true);
-         const RtlRecord &targetRecord = outMeta->queryRecordAccessor(true);
-         // translates from the input activity's row layout to the requested output layout
-         translator.setown(createRecordTranslator(targetRecord, sourceRecord));
-         inputRowBuilder = new MemoryBufferBuilder(inputRowMb, sourceMeta->getMinRecordSize());
-     }
-     ~CRemoteCompoundReadProjectActivity()
-     {
-         delete inputRowBuilder;
-     }
-     // Describes this activity as the input's description followed by " - CompoundProject".
-     virtual StringBuffer &getInfoStr(StringBuffer &out) const override
-     {
-         StringBuffer &inputInfo = input->getInfoStr(out);
-         return inputInfo.append(" - CompoundProject");
-     }
-     // IRemoteReadActivity impl.
-     virtual unsigned __int64 queryProcessed() const override { return input->queryProcessed(); }
-     virtual IOutputMetaData *queryOutputMeta() const override { return outMeta; }
-     virtual bool isGrouped() const override { return input->isGrouped(); }
-     virtual void serializeCursor(MemoryBuffer &tgt) const override { input->serializeCursor(tgt); }
-     virtual void restoreCursor(MemoryBuffer &src) override { input->restoreCursor(src); }
-     virtual IRemoteReadActivity *queryIsReadActivity() { return this; }
-     /* Pulls the next row from the input and translates it into 'outBuilder'.
-      * Returns the translated row with 'retSz' set to its size, or nullptr with 'retSz' 0 when the input has no more rows.
-      */
-     virtual const void *nextRow(MemoryBufferBuilder &outBuilder, size32_t &retSz) override
-     {
-         size32_t inputSz;
-         const void *inputRow = input->nextRow(*inputRowBuilder, inputSz);
-         if (inputRow)
-         {
-             retSz = translator->translate(outBuilder, fieldCallback, (const byte *)inputRow);
-             const void *projected = outBuilder.getSelf();
-             outBuilder.finishRow(retSz);
-             return projected;
-         }
-         retSz = 0;
-         return nullptr;
-     }
-     // This activity is itself the projection step, so it never asks for another one.
-     virtual bool requiresPostProject() const override { return false; }
- };
- /* Shared base of the remote index activities: owns the key index and key manager and
-  * provides the lazy open / close logic for them.
-  */
- class CRemoteIndexBaseActivity : public CRemoteDiskBaseActivity
- {
-     typedef CRemoteDiskBaseActivity PARENT;
- protected:
-     bool isTlk = false;        // "isTlk" from the request
-     bool allowPreload = false; // "allowPreload" from the request
-     unsigned fileCrc = 0;      // "crc" from the request, seed for the crc passed to createKeyIndex
-     Owned<IKeyIndex> keyIndex;
-     Owned<IKeyManager> keyManager;
-     // Opens the index on first use: creates the key index and key manager and installs the filters. No-op if already open.
-     void checkOpen()
-     {
-         if (!opened)
-         {
-             Owned<IFile> keyFile = createIFile(fileName);
-             CDateTime modified;
-             keyFile->getTime(nullptr, &modified, nullptr);
-             time_t modifiedSimple = modified.getSimple();
-             // fold the file's modification time into the crc supplied with the request
-             CRC32 crcCalc(fileCrc);
-             crcCalc.tally(sizeof(time_t), &modifiedSimple);
-             unsigned keyCrc = crcCalc.get();
-             keyIndex.setown(createKeyIndex(fileName, keyCrc, isTlk, allowPreload));
-             keyManager.setown(createLocalKeyManager(*record, keyIndex, nullptr, true, false));
-             filters.createSegmentMonitors(keyManager);
-             keyManager->finishSegmentMonitors();
-             keyManager->reset();
-             opened = true;
-         }
-     }
-     // Releases the key manager and index and marks the activity as closed and at end of input.
-     void close()
-     {
-         keyManager.clear();
-         keyIndex.clear();
-         eofSeen = true;
-         opened = false;
-     }
- public:
-     CRemoteIndexBaseActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc)
-     {
-         setupInputMeta(config, getTypeInfoOutputMetaData(config, "input", false));
-         isTlk = config.getPropBool("isTlk");
-         allowPreload = config.getPropBool("allowPreload");
-         fileCrc = config.getPropInt("crc");
-     }
- };
- /* Remote index read: returns the index rows that pass the field filter, optionally translated
-  * to the requested output format, up to 'chooseN' rows.
-  */
- class CRemoteIndexReadActivity : public CRemoteIndexBaseActivity
- {
-     typedef CRemoteIndexBaseActivity PARENT;
-     Owned<const IDynamicTransform> translator; // only set when the request supplied an output format
-     unsigned __int64 chooseN = 0;
- public:
-     CRemoteIndexReadActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc)
-     {
-         chooseN = config.getPropInt64("chooseN", defaultFileStreamChooseNLimit);
-         outMeta.setown(getTypeInfoOutputMetaData(config, "output", false));
-         if (outMeta)
-             translator.setown(createRecordTranslator(outMeta->queryRecordAccessor(true), *record));
-         else
-             outMeta.set(inMeta); // no output format requested, rows are returned in the input format
-     }
-     // IRemoteReadActivity impl.
-     /* Returns the next matching index row built into 'outBuilder', with 'retSz' set to its size.
-      * Returns nullptr when the index is exhausted or 'chooseN' rows have already been returned;
-      * 'retSz' is set to 0 on every nullptr return (previously it was left unset when the
-      * 'chooseN' limit had been reached).
-      */
-     virtual const void *nextRow(MemoryBufferBuilder &outBuilder, size32_t &retSz) override
-     {
-         if (eofSeen)
-         {
-             retSz = 0;
-             return nullptr;
-         }
-         checkOpen();
-         if (processed < chooseN)
-         {
-             while (keyManager->lookup(true))
-             {
-                 const byte *keyRow = keyManager->queryKeyBuffer();
-                 if (fieldFilterMatch(keyRow))
-                 {
-                     if (translator)
-                         retSz = translator->translate(outBuilder, *this, keyRow);
-                     else
-                     {
-                         // no translation, copy the key row as is
-                         retSz = keyManager->queryRowSize();
-                         outBuilder.ensureCapacity(retSz, nullptr);
-                         memcpy(outBuilder.getSelf(), keyRow, retSz);
-                     }
-                     dbgassertex(retSz);
-                     const void *ret = outBuilder.getSelf();
-                     outBuilder.finishRow(retSz);
-                     ++processed;
-                     return ret;
-                 }
-             }
-         }
-         eofSeen = true;
-         close();
-         retSz = 0;
-         return nullptr;
-     }
-     // NOTE(review): dereferences keyManager, which is null before the first open and after close() - confirm callers never serialize in those states.
-     virtual void serializeCursor(MemoryBuffer &tgt) const override
-     {
-         keyManager->serializeCursorPos(tgt);
-         tgt.append(processed);
-         /* JCSMORE (see HPCC-19640), serialize seek/scan data to client
-         tgt.append(keyManager->querySeeks());
-         tgt.append(keyManager->queryScans());
-         */
-     }
-     // Re-opens the index if necessary and restores the position and processed count written by serializeCursor.
-     virtual void restoreCursor(MemoryBuffer &src) override
-     {
-         checkOpen();
-         eofSeen = false;
-         keyManager->deserializeCursorPos(src);
-         src.read(processed);
-     }
-     virtual StringBuffer &getInfoStr(StringBuffer &out) const override
-     {
-         return out.appendf("indexread[%s]", fileName.get());
-     }
- };
- /* Base class of the remote write activities.
-  * Holds the target file name, the row meta built from the request's "input" type info and the
-  * common open/eof state. Cursor handling and write() throw unless a derived class overrides them.
-  */
- class CRemoteWriteBaseActivity : public CSimpleInterfaceOf<IRemoteWriteActivity>
- {
- protected:
-     StringAttr fileName; // physical filename
-     Linked<IOutputMetaData> meta;
-     unsigned __int64 processed = 0;
-     bool opened = false;
-     bool eofSeen = false;
-     Owned<IFileIO> iFileIO;
-     bool grouped = false;
-     // Releases the file handle and marks the activity closed and finished.
-     void close()
-     {
-         iFileIO.clear();
-         eofSeen = true;
-         opened = false;
-     }
- public:
-     // Throws a protocol failure exception if the request carries no "fileName".
-     CRemoteWriteBaseActivity(IPropertyTree &config, IFileDescriptor *fileDesc)
-     {
-         fileName.set(config.queryProp("fileName"));
-         if (isEmptyString(fileName))
-             throw createDafsException(DAFSERR_cmdstream_protocol_failure, "CRemoteWriteBaseActivity: fileName missing");
-         grouped = config.getPropBool("inputGrouped");
-         meta.setown(getTypeInfoOutputMetaData(config, "input", grouped));
-     }
-     ~CRemoteWriteBaseActivity()
-     {
-     }
-     // IRemoteWriteActivity impl.
-     virtual unsigned __int64 queryProcessed() const override { return processed; }
-     virtual IOutputMetaData *queryOutputMeta() const override { return meta; }
-     virtual bool isGrouped() const override { return grouped; }
-     virtual void serializeCursor(MemoryBuffer &tgt) const override { throwUnexpected(); }
-     virtual void restoreCursor(MemoryBuffer &src) override { throwUnexpected(); }
-     virtual StringBuffer &getInfoStr(StringBuffer &out) const override
-     {
-         StringBuffer &described = out.appendf("diskwrite[%s]", fileName.get());
-         return described;
-     }
-     virtual void write(size32_t sz, const void *rowData) override
-     {
-         throwUnexpected(); // method should be implemented in derived classes.
-     }
-     virtual IRemoteWriteActivity *queryIsWriteActivity() { return this; }
- };
- /* Remote disk write: writes the rows it is given to a physical file, optionally compressed
-  * and optionally appending to an existing file. The file is opened lazily on first use.
-  */
- class CRemoteDiskWriteActivity : public CRemoteWriteBaseActivity
- {
-     typedef CRemoteWriteBaseActivity PARENT;
-     unsigned compressionFormat = 0; // 0 = write uncompressed
-     bool eogPending = false;        // not referenced anywhere in this class
-     bool someInGroup = false;       // not referenced anywhere in this class
-     size32_t recordSize = 0;
-     Owned<IFileIOStream> iFileIOStream;
-     bool append = false;
-     /* Opens the target file if it is not already open, creating its directory first.
-      * Throws DAFSERR_cmdstream_openfailure if the directory cannot be created or the file
-      * cannot be opened - the latter is now also checked for the compressed writer, whose
-      * result was previously passed on to createIOStream unchecked.
-      */
-     void checkOpen()
-     {
-         if (opened)
-             return;
-         if (!recursiveCreateDirectoryForFile(fileName))
-             throw createDafsExceptionV(DAFSERR_cmdstream_openfailure, "Failed to create directory for file: '%s'", fileName.get());
-         OwnedIFile iFile = createIFile(fileName);
-         assertex(iFile);
-         /* NB: if concurrent writers were supported, then would need mutex here, during open/create
-          * multiple activities, each with their own handle would be possible, with mutex during write.
-          * Would need mutex per physical filename active.
-          */
-         if (compressionFormat)
-             iFileIO.setown(createCompressedFileWriter(iFile, recordSize, append, true, nullptr, compressionFormat));
-         else
-             iFileIO.setown(iFile->open(append ? IFOwrite : IFOcreate));
-         if (!iFileIO)
-             throw createDafsExceptionV(DAFSERR_cmdstream_openfailure, "Failed to open: '%s' for write", fileName.get());
-         iFileIOStream.setown(createIOStream(iFileIO));
-         if (append)
-             iFileIOStream->seek(0, IFSend);
-         opened = true;
-         eofSeen = false;
-     }
- public:
-     CRemoteDiskWriteActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc)
-     {
-         const char *compressed = config.queryProp("compressed"); // the compression format for the serialized rows in the transport
-         if (!isEmptyString(compressed))
-         {
-             // boolean or format allowed
-             if (strieq("true", compressed))
-                 compressionFormat = translateToCompMethod(nullptr); // gets default
-             else if (strieq("false", compressed))
-                 compressionFormat = 0;
-             else
-                 compressionFormat = translateToCompMethod(compressed);
-         }
-         append = config.getPropBool("append");
-     }
-     // Writes 'sz' bytes of row data to the file, opening it first if necessary.
-     virtual void write(size32_t sz, const void *rowData) override
-     {
-         checkOpen();
-         iFileIOStream->write(sz, rowData);
-     }
-     // NOTE(review): dereferences iFileIOStream, which is null until the file has been opened - confirm callers only serialize after a write/restore.
-     virtual void serializeCursor(MemoryBuffer &tgt) const override
-     {
-         tgt.append(iFileIOStream->tell());
-     }
-     // Reads the position written by serializeCursor and seeks the (re)opened file to it.
-     virtual void restoreCursor(MemoryBuffer &src) override
-     {
-         offset_t pos;
-         src.read(pos);
-         checkOpen();
-         iFileIOStream->seek(pos, IFSbegin);
-     }
- };
- // Static type info for a { unsigned8 count } record; indexCountRecord is used to build the output meta of CRemoteIndexCountActivity
- static const RtlIntTypeInfo indexCountFieldType(type_unsigned|type_int, 8); // type of the single field
- static const RtlFieldStrInfo indexCountField("count", nullptr, &indexCountFieldType); // the single field, named "count"
- static const RtlFieldInfo * const indexCountFields[2] = { &indexCountField, nullptr }; // field list, terminated by a null entry
- static const RtlRecordTypeInfo indexCountRecord(type_record, 2, indexCountFields);
- /* Remote index count: produces exactly one row, { unsigned8 count }, holding the number of
-  * index entries that match the filters (limited by the request's "chooseN" if that is non-zero).
-  */
- class CRemoteIndexCountActivity : public CRemoteIndexBaseActivity
- {
-     typedef CRemoteIndexBaseActivity PARENT;
-     unsigned __int64 rowLimit = 0; // 0 = no limit, use getCount() rather than checkCount()
- public:
-     CRemoteIndexCountActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc)
-     {
-         rowLimit = config.getPropInt64("chooseN");
-         outMeta.setown(new CDynamicOutputMetaData(indexCountRecord));
-     }
-     // IRemoteReadActivity impl.
-     /* First call: returns the count row built into 'outBuilder' with 'retSz' set to its size
-      * (previously 'retSz' was never assigned on this path), then closes the index.
-      * Later calls: return nullptr with 'retSz' 0.
-      */
-     virtual const void *nextRow(MemoryBufferBuilder &outBuilder, size32_t &retSz) override
-     {
-         if (eofSeen)
-         {
-             retSz = 0;
-             return nullptr;
-         }
-         checkOpen();
-         unsigned __int64 count = 0;
-         if (!eofSeen)
-         {
-             if (rowLimit)
-                 count = keyManager->checkCount(rowLimit);
-             else
-                 count = keyManager->getCount();
-         }
-         void *tgt = outBuilder.ensureCapacity(sizeof(count), "count");
-         const void *ret = outBuilder.getSelf();
-         memcpy(tgt, &count, sizeof(count));
-         retSz = sizeof(count);
-         outBuilder.finishRow(retSz);
-         close(); // marks eof, so the next call returns nullptr
-         return ret;
-     }
-     virtual StringBuffer &getInfoStr(StringBuffer &out) const override
-     {
-         return out.appendf("indexcount[%s]", fileName.get());
-     }
- };
- /* Validates the "expiryTime" property of 'metaInfo'.
-  * Throws DAFSERR_cmdstream_invalidexpiry if it is missing, empty or cannot be parsed,
-  * and DAFSERR_cmdstream_authexpired if the current time is at or past it.
-  */
- void checkExpiryTime(IPropertyTree &metaInfo)
- {
-     const char *expirySpec = metaInfo.queryProp("expiryTime");
-     if (isEmptyString(expirySpec))
-         throw createDafsException(DAFSERR_cmdstream_invalidexpiry, "createRemoteActivity: invalid expiry specification");
-     CDateTime expiresAt;
-     try
-     {
-         expiresAt.setString(expirySpec);
-     }
-     catch (IException *parseFailure)
-     {
-         // the parse error itself is not reported, only that the specification is invalid
-         parseFailure->Release();
-         throw createDafsException(DAFSERR_cmdstream_invalidexpiry, "createRemoteActivity: invalid expiry specification");
-     }
-     CDateTime currentTime;
-     currentTime.setNow();
-     if (currentTime >= expiresAt)
-         throw createDafsException(DAFSERR_cmdstream_authexpired, "createRemoteActivity: authorization expired");
- }
- /* Validates the "metaInfo" blob of an activity request and derives the physical "fileName" from it.
-  *
-  * - If 'authorizedOnly' is false and the request already carries a "fileName", nothing is
-  *   checked and nullptr is returned.
-  * - Otherwise "metaInfo" is base64 decoded, decompressed and deserialized. When built with
-  *   _USE_OPENSSL, a signed blob has its signature verified against the public key of the key
-  *   pair it names (looked up in 'keyPairInfo') and its expiry time checked; an unsigned blob
-  *   is rejected if 'authorizedOnly' is set.
-  * - "fileName" is then set on 'actNode' from the file info, for the part/copy the request
-  *   names, and "metaInfo" is removed from 'actNode'.
-  *
-  * Returns the deserialized file descriptor for version 2 meta info, otherwise nullptr (caller owns the result).
-  * Throws a dafs exception on missing or invalid meta info and on any authorization failure.
-  */
- IFileDescriptor *verifyMetaInfo(IPropertyTree &actNode, bool authorizedOnly, const IPropertyTree *keyPairInfo)
- {
-     if (!authorizedOnly) // if configured false, allows unencrypted meta info
-     {
-         if (actNode.hasProp("fileName"))
-             return nullptr;
-     }
-     StringBuffer metaInfoB64;
-     actNode.getProp("metaInfo", metaInfoB64);
-     if (0 == metaInfoB64.length())
-         throw createDafsException(DAFSERR_cmdstream_protocol_failure, "createRemoteActivity: missing metaInfo");
-     MemoryBuffer compressedMetaInfoMb;
-     JBASE64_Decode(metaInfoB64.str(), compressedMetaInfoMb);
-     MemoryBuffer decompressedMetaInfoMb;
-     fastLZDecompressToBuffer(decompressedMetaInfoMb, compressedMetaInfoMb);
-     Owned<IPropertyTree> metaInfoEnvelope = createPTree(decompressedMetaInfoMb);
-     Owned<IPropertyTree> metaInfo;
- #if defined(_USE_OPENSSL)
-     MemoryBuffer metaInfoBlob;
-     metaInfoEnvelope->getPropBin("metaInfoBlob", metaInfoBlob);
-     bool isSigned = metaInfoBlob.length() != 0;
-     if (authorizedOnly && !isSigned)
-         throw createDafsException(DAFSERR_cmdstream_unauthorized, "createRemoteActivity: unauthorized");
-     if (isSigned)
-     {
-         metaInfo.setown(createPTree(metaInfoBlob));
-         const char *keyPairName = metaInfo->queryProp("keyPairName");
-         StringBuffer metaInfoSignature;
-         if (!metaInfoEnvelope->getProp("signature", metaInfoSignature))
-             throw createDafsException(DAFSERR_cmdstream_unauthorized, "createRemoteActivity: missing signature");
-         // without a key pair name or any key pair configuration there is nothing to verify against
-         if (isEmptyString(keyPairName) || !keyPairInfo)
-             throw createDafsException(DAFSERR_cmdstream_unauthorized, "createRemoteActivity: missing key pair definition");
-         VStringBuffer keyPairPath("KeyPair[@name=\"%s\"]", keyPairName);
-         IPropertyTree *keyPair = keyPairInfo->queryPropTree(keyPairPath);
-         if (!keyPair)
-             throw createDafsException(DAFSERR_cmdstream_unauthorized, "createRemoteActivity: missing key pair definition");
-         const char *publicKeyFName = keyPair->queryProp("@publicKey");
-         if (isEmptyString(publicKeyFName))
-             throw createDafsException(DAFSERR_cmdstream_unauthorized, "createRemoteActivity: missing public key definition");
-         Owned<CLoadedKey> publicKey = loadPublicKeyFromFile(publicKeyFName, nullptr); // NB: if cared could cache loaded keys
-         if (!digiVerify(metaInfoSignature, metaInfoBlob.length(), metaInfoBlob.bytes(), *publicKey))
-             throw createDafsException(DAFSERR_cmdstream_unauthorized, "createRemoteActivity: signature verification failed");
-         checkExpiryTime(*metaInfo);
-     }
-     else
- #endif
-         metaInfo.set(metaInfoEnvelope);
-     assertex(actNode.hasProp("filePart"));
-     unsigned partNum = actNode.getPropInt("filePart");
-     assertex(partNum);
-     unsigned partCopy = actNode.getPropInt("filePartCopy", 1);
-     Owned<IFileDescriptor> fileDesc;
-     unsigned metaInfoVersion = metaInfo->getPropInt("version");
-     switch (metaInfoVersion)
-     {
-         case 0:
-             // implies unsigned direct request from engines (on unsecure port)
-             // fall through
-         case 1: // legacy
-         {
-             IPropertyTree *fileInfo = metaInfo->queryPropTree("FileInfo");
-             assertex(fileInfo);
-             VStringBuffer xpath("Part[%u]/Copy[%u]/@filePath", partNum, partCopy);
-             StringBuffer partFileName;
-             fileInfo->getProp(xpath, partFileName);
-             if (!partFileName.length())
-                 throw createDafsException(DAFSERR_cmdstream_protocol_failure, "createRemoteActivity: invalid file info");
-             actNode.setProp("fileName", partFileName.str());
-             break;
-         }
-         case 2: // serialized compact IFileDescriptor
-         {
-             IPropertyTree *fileInfo = metaInfo->queryPropTree("FileInfo");
-             assertex(fileInfo);
-             assertex(partCopy); // part and copy are 1-based in the request, guard the -1 below
-             fileDesc.setown(deserializeFileDescriptorTree(fileInfo));
-             RemoteFilename rfn;
-             fileDesc->getFilename(partNum-1, partCopy-1, rfn);
-             StringBuffer path;
-             rfn.getLocalPath(path);
-             actNode.setProp("fileName", path.str());
-             break;
-         }
-         default:
-             throwUnexpected();
-     }
-     verifyex(actNode.removeProp("metaInfo")); // no longer needed
-     return fileDesc.getClear();
- }
- /* Creates an ActivityClass read activity and, if it reports that it requires a post-project
-  * step, wraps it in a CRemoteCompoundReadProjectActivity. The caller owns the returned activity.
-  */
- template<class ActivityClass> IRemoteReadActivity *createConditionalProjectingActivity(IPropertyTree &actNode, IFileDescriptor *fileDesc)
- {
-     Owned<IRemoteReadActivity> reader = new ActivityClass(actNode, fileDesc);
-     if (!reader->requiresPostProject())
-         return reader.getClear();
-     return new CRemoteCompoundReadProjectActivity(actNode, reader);
- }
- /* Creates the activity described by the request node 'actNode'.
-  * The meta info is verified first (see verifyMetaInfo), which also establishes "fileName".
-  * If the request names a "kind", the matching activity is created; otherwise (or if the kind
-  * has no dedicated case below) a read is assumed and the activity is chosen by auto-detection:
-  * index files become index read/count activities, other files are chosen by file kind.
-  * Throws a protocol failure exception for unknown actions or file kinds.
-  * The caller owns the returned activity.
-  */
- IRemoteActivity *createRemoteActivity(IPropertyTree &actNode, bool authorizedOnly, const IPropertyTree *keyPairInfo)
- {
-     Owned<IFileDescriptor> fileDesc = verifyMetaInfo(actNode, authorizedOnly, keyPairInfo);
-     const char *partFileName = actNode.queryProp("fileName");
-     const char *kindStr = actNode.queryProp("kind");
-     ThorActivityKind kind = TAKnone;
-     if (kindStr)
-     {
-         if (strieq("diskread", kindStr))
-             kind = TAKdiskread;
-         else if (strieq("csvread", kindStr))
-             kind = TAKcsvread;
-         else if (strieq("xmlread", kindStr))
-             kind = TAKxmlread;
-         else if (strieq("jsonread", kindStr))
-             kind = TAKjsonread;
-         else if (strieq("indexread", kindStr))
-             kind = TAKindexread;
-         else if (strieq("indexcount", kindStr))
-             kind = TAKindexcount;
-         else if (strieq("diskwrite", kindStr))
-             kind = TAKdiskwrite;
-         else if (strieq("indexwrite", kindStr))
-             kind = TAKindexwrite; // NOTE(review): no case below handles this, so it ends up in the auto-detect read path - confirm intended
-         // else - auto-detect
-     }
-     Owned<IRemoteActivity> activity;
-     switch (kind)
-     {
-         case TAKdiskread:
-         {
-             activity.setown(new CRemoteDiskReadActivity(actNode, fileDesc));
-             break;
-         }
-         case TAKcsvread:
-         {
-             activity.setown(createConditionalProjectingActivity<CRemoteCsvReadActivity>(actNode, fileDesc));
-             break;
-         }
-         case TAKxmlread:
-         {
-             activity.setown(createConditionalProjectingActivity<CRemoteXmlReadActivity>(actNode, fileDesc));
-             break;
-         }
-         case TAKjsonread:
-         {
-             activity.setown(createConditionalProjectingActivity<CRemoteJsonReadActivity>(actNode, fileDesc));
-             break;
-         }
-         case TAKindexread:
-         {
-             activity.setown(new CRemoteIndexReadActivity(actNode, fileDesc));
-             break;
-         }
-         case TAKindexcount:
-         {
-             activity.setown(new CRemoteIndexCountActivity(actNode, fileDesc));
-             break;
-         }
-         case TAKdiskwrite:
-         {
-             activity.setown(new CRemoteDiskWriteActivity(actNode, fileDesc));
-             break;
-         }
-         default: // in absence of type, read is assumed and file format is auto-detected.
-         {
-             const char *action = actNode.queryProp("action");
-             if (isIndexFile(partFileName))
-             {
-                 if (!isEmptyString(action))
-                 {
-                     if (streq("count", action))
-                         activity.setown(new CRemoteIndexCountActivity(actNode, fileDesc));
-                     else
-                         throw createDafsExceptionV(DAFSERR_cmdstream_protocol_failure, "Unknown action '%s' on index '%s'", action, partFileName);
-                 }
-                 else
-                     activity.setown(new CRemoteIndexReadActivity(actNode, fileDesc));
-             }
-             else
-             {
-                 if (!isEmptyString(action))
-                 {
-                     if (streq("count", action))
-                         throw createDafsException(DAFSERR_cmdstream_protocol_failure, "Remote Disk Counts currently unsupported");
-                     else
-                         throw createDafsExceptionV(DAFSERR_cmdstream_protocol_failure, "Unknown action '%s' on flat file '%s'", action, partFileName);
-                 }
-                 else
-                 {
-                     // verifyMetaInfo can return no descriptor; with nothing to consult the file is treated as flat
-                     const char *fileKind = fileDesc ? queryFileKind(fileDesc) : nullptr;
-                     if (isEmptyString(fileKind) || (streq("flat", fileKind)))
-                         activity.setown(new CRemoteDiskReadActivity(actNode, fileDesc));
-                     else if (streq("csv", fileKind))
-                         activity.setown(createConditionalProjectingActivity<CRemoteCsvReadActivity>(actNode, fileDesc));
-                     else if (streq("xml", fileKind))
-                         activity.setown(createConditionalProjectingActivity<CRemoteXmlReadActivity>(actNode, fileDesc));
-                     else if (streq("json", fileKind))
-                         activity.setown(createConditionalProjectingActivity<CRemoteJsonReadActivity>(actNode, fileDesc));
-                     else
-                         throw createDafsExceptionV(DAFSERR_cmdstream_protocol_failure, "Unknown file kind '%s'", fileKind);
-                 }
-             }
-             break;
-         }
-     }
-     return activity.getClear();
- }
- // Creates the activity described by the "node" branch of 'requestTree' (which must be present); the caller owns the result.
- IRemoteActivity *createOutputActivity(IPropertyTree &requestTree, bool authorizedOnly, const IPropertyTree *keyPairInfo)
- {
-     IPropertyTree *actNode = requestTree.queryPropTree("node");
-     assertex(actNode);
-     IPropertyTree &activityRequest = *actNode;
-     return createRemoteActivity(activityRequest, authorizedOnly, keyPairInfo);
- }
- #define MAX_KEYDATA_SZ 0x10000
- class CRemoteFileServer : implements IRemoteFileServer, public CInterface
- {
- class CThrottler;
- class CRemoteClientHandler : implements ISocketSelectNotify, public CInterface
- {
- bool calledByRowService;
- public:
- CRemoteFileServer *parent;
- Owned<ISocket> socket;
- StringAttr peerName;
- MemoryBuffer msg;
- bool selecthandled;
- size32_t left;
- StructArrayOf<OpenFileInfo> openFiles;
- Owned<IDirectoryIterator> opendir;
- unsigned lasttick, lastInactiveTick;
- atomic_t &globallasttick;
- unsigned previdx; // for debug
- IMPLEMENT_IINTERFACE;
- /* Sets up the handler for a newly accepted client socket.
-  * Records the peer name (or a placeholder if it is unavailable), bumps the global client
-  * counters under ClientCountSect, and initialises the packet-assembly state.
-  */
- CRemoteClientHandler(CRemoteFileServer *_parent,ISocket *_socket,atomic_t &_globallasttick, bool _calledByRowService)
-     : calledByRowService(_calledByRowService), parent(_parent), socket(_socket), globallasttick(_globallasttick)
- {
-     previdx = (unsigned)-1;
-     char name[256];
-     name[0] = 0;
-     int port = socket->peer_name(name,sizeof(name)-1);
-     if (port<0)
-     {
-         /* There's a possibility the socket closed before got here, in which case, peer name is unavailable
-          * May potentially be unavailable for other reasons also.
-          * Must be set, as used in client stats HT.
-          * If socket closed, the handler will start up but notice closed and quit
-          */
-         peerName.set("UNKNOWN PEER NAME");
-     }
-     else
-     {
-         StringBuffer peerBuf;
-         peerBuf.append(name);
-         if (port)
-             peerBuf.append(':').append(port);
-         peerName.set(peerBuf);
-     }
-     {
-         CriticalBlock block(ClientCountSect);
-         ++ClientCount;
-         if (ClientCount>MaxClientCount)
-             MaxClientCount = ClientCount;
-         if (TF_TRACE_CLIENT_CONN)
-         {
-             StringBuffer s;
-             s.appendf("Connecting(%p) [%d,%d] to ",this,ClientCount,MaxClientCount);
-             s.append(peerName);
-             PROGLOG("%s", s.str());
-         }
-     }
-     left = 0;
-     selecthandled = false;
-     msg.setEndian(__BIG_ENDIAN);
-     touch();
- }
- // Drops the global client count (under ClientCountSect) and releases the socket, logging rather than propagating any exception the release raises.
- ~CRemoteClientHandler()
- {
-     {
-         CriticalBlock block(ClientCountSect);
-         --ClientCount;
-         if (TF_TRACE_CLIENT_CONN)
-         {
-             PROGLOG("Disconnecting(%p) [%d,%d] ",this,ClientCount,MaxClientCount);
-         }
-     }
-     ISocket *sock = socket.getClear();
-     try
-     {
-         sock->Release();
-     }
-     catch (IException *e)
-     {
-         EXCLOG(e,"~CRemoteClientHandler");
-         e->Release();
-     }
- }
- bool isRowServiceClient() const { return calledByRowService; } // returns the _calledByRowService flag this handler was constructed with
- bool notifySelected(ISocket *sock,unsigned selected) // Socket-readable callback: assembles one packet into 'msg' across calls and passes it to parent->notify() once complete; always returns false
- { // 'left' is the number of bytes of the current packet still to be read; 0 means a new packet starts here
- if (TF_TRACE_FULL)
- PROGLOG("notifySelected(%p)",this);
- if (sock!=socket)
- WARNLOG("notifySelected - invalid socket passed"); // only warns, processing continues on our own socket
- size32_t avail = (size32_t)socket->avail_read();
- if (avail)
- touch();
- else if (left) // nothing readable although part of a packet is still outstanding
- {
- WARNLOG("notifySelected: Closing mid packet, %d remaining", left);
- msg.clear();
- parent->notify(this, msg); // notifying of graceful close
- return false;
- }
- if (left==0)
- {
- try
- {
- left = avail?receiveDaFsBufferSize(socket):0; // size of the next packet, or 0 if nothing is available
- }
- catch (IException *e)
- {
- EXCLOG(e,"notifySelected(1)");
- e->Release();
- left = 0; // treated the same as no packet
- }
- if (left)
- {
- avail = (size32_t)socket->avail_read(); // re-read, as receiving the size has consumed some of what was available
- try
- {
- msg.ensureCapacity(left);
- }
- catch (IException *e)
- {
- EXCLOG(e,"notifySelected(2)");
- e->Release();
- left = 0;
- // if too big then corrupted packet so read avail to try and consume
- char fbuf[1024];
- while (avail)
- {
- size32_t rd = avail>sizeof(fbuf)?sizeof(fbuf):avail; // at most one scratch buffer's worth per read
- try
- {
- socket->read(fbuf, rd); // don't need timeout here
- avail -= rd;
- }
- catch (IException *e)
- {
- EXCLOG(e,"notifySelected(2) flush");
- e->Release();
- break;
- }
- }
- avail = 0;
- left = 0;
- }
- }
- }
- size32_t toread = left>avail?avail:left; // read no more than is outstanding and no more than is available
- if (toread)
- {
- try
- {
- socket->read(msg.reserve(toread), toread); // don't need timeout here
- }
- catch (IException *e)
- {
- EXCLOG(e,"notifySelected(3)");
- e->Release();
- toread = left; // makes 'left' reach 0 below, so the parent is notified with the now empty msg
- msg.clear();
- }
- }
- if (TF_TRACE_FULL)
- PROGLOG("notifySelected %d,%d",toread,left);
- left -= toread;
- if (left==0) // packet complete (or nothing/failed read, in which case msg is empty)
- {
- // DEBUG
- parent->notify(this, msg); // consumes msg
- }
- return false;
- }
- // Logs the handle and file name of the open file at index 'previdx', if that index is within the open file list.
- void logPrevHandle()
- {
-     if (previdx>=openFiles.ordinality())
-         return;
-     const OpenFileInfo &prevFile = openFiles.item(previdx);
-     PROGLOG("Previous handle(%d): %s", prevFile.handle, prevFile.filename->text.get());
- }
- /* Reads the command code from 'msg' and passes the command to the parent's throttleCommand.
-  * Returns true if that completed without an exception. If an IException escapes, an error
-  * reply is formatted and sent back on the socket and false is returned.
-  */
- bool throttleCommand(MemoryBuffer &msg)
- {
-     RemoteFileCommandType cmd = RFCunknown;
-     Owned<IException> failure;
-     try
-     {
-         msg.read(cmd);
-         parent->throttleCommand(cmd, msg, this);
-         return true;
-     }
-     catch (IException *caught)
-     {
-         failure.setown(caught);
-     }
-     /* processCommand() will handle most exception and replies,
-      * but if throttleCommand fails before it gets that far, this will handle
-      */
-     unsigned err = 0;
-     if (cmd == RFCopenIO)
-         err = RFSERR_OpenFailed;
-     MemoryBuffer reply;
-     initSendBuffer(reply);
-     parent->formatException(reply, failure, cmd, false, err, this);
-     sendDaFsBuffer(socket, reply);
-     return false;
- }
- // Let the parent process cmd, building the response in a fresh send buffer, then send that response on the socket.
- void processCommand(RemoteFileCommandType cmd, MemoryBuffer &msg, CThrottler *throttler)
- {
-     MemoryBuffer response;
-     const bool testSocket = parent->processCommand(cmd, msg, initSendBuffer(response), this, throttler);
-     sendDaFsBuffer(socket, response, testSocket);
- }
- /* Receive one request from the socket and pass it to throttleCommand().
-  * returns false if socket closed or failure
-  */
- bool immediateCommand()
- {
-     MemoryBuffer request;
-     request.setEndian(__BIG_ENDIAN);
-     touch();
-     if (0 == (size32_t)socket->avail_read())
-         return false;
-     receiveDaFsBuffer(socket, request, 5); // shouldn't timeout as data is available
-     touch();
-     return (0 != request.length()) && throttleCommand(request);
- }
- /* Entry point used by the command processor thread.
-  * If this client is already select-handled, msg holds the request and is passed to throttleCommand().
-  * Otherwise requests are read and handled inline for a while, after which the client is
-  * handed over to the select handler.
-  */
- void process(MemoryBuffer &msg)
- {
-     if (selecthandled)
-     {
-         throttleCommand(msg);
-         return;
-     }
-     // msg only used/filled if process() has been triggered by notify()
-     for (;;)
-     {
-         // if too many threads add to select handler
-         if (parent->threadRunningCount() > parent->targetActiveThreads)
-             break;
-         int ready;
-         try
-         {
-             ready = socket->wait_read(1000);
-         }
-         catch (IException *e)
-         {
-             EXCLOG(e, "CRemoteClientHandler::main wait_read error");
-             e->Release();
-             parent->onCloseSocket(this,1);
-             return;
-         }
-         if (0 == ready)
-             break; // 0 => stop handling inline and hand over to the select handler
-         if (ready < 0)
-         {
-             WARNLOG("CRemoteClientHandler::main wait_read error");
-             parent->onCloseSocket(this,1);
-             return;
-         }
-         if (!immediateCommand())
-         {
-             parent->onCloseSocket(this,1);
-             return;
-         }
-     }
-     /* This is a bit confusing..
-      * The addClient below, adds this request to a selecthandler handled by another thread
-      * and passes ownership of 'this' (CRemoteClientHandler)
-      *
-      * When notified, the selecthandler will launch a new pool thread to handle the request
-      * If the pool thread limit is hit, the selecthandler will be blocked [ see comment in CRemoteFileServer::notify() ]
-      *
-      * Either way, a thread pool slot is occupied when processing a request.
-      * Blocked threads, will be blocked for up to 1 minute (as defined by createThreadPool call)
-      * IOW, if there are lots of incoming clients that can't be serviced by the CThrottler limit,
-      * a large number of pool threads will build up after a while.
-      *
-      * The CThrottler mechanism, imposes a further hard limit on how many concurrent request threads can be active.
-      * If the thread pool had an absolute limit (instead of just introducing a delay), then I don't see the point
-      * in this additional layer of throttling..
-      */
-     selecthandled = true;
-     parent->addClient(this); // add to select handler
-     // NB: this (CRemoteClientHandler) is now linked by the selecthandler and owned by the 'clients' list
- }
- // True when more than CLIENT_TIMEOUT ms have elapsed since lasttick was last set.
- bool timedOut()
- {
-     return CLIENT_TIMEOUT < (msTick()-lasttick);
- }
- /* True when more than CLIENT_INACTIVEWARNING_TIMEOUT ms have elapsed since lastInactiveTick.
-  * When it fires, lastInactiveTick is reset to now, so the next true result is a full interval away.
-  */
- bool inactiveTimedOut()
- {
-     const unsigned now = msTick();
-     const bool expired = (now-lastInactiveTick) > CLIENT_INACTIVEWARNING_TIMEOUT;
-     if (expired)
-         lastInactiveTick = now;
-     return expired;
- }
- // Record activity now: resets both lasttick and lastInactiveTick, and publishes the tick to globallasttick.
- void touch()
- {
-     lasttick = msTick();
-     lastInactiveTick = lasttick;
-     atomic_set(&globallasttick,lasttick);
- }
- // Accessor for peerName.
- const char *queryPeerName() { return peerName; }
- /* Append a description of this client to str: peer name (if any), time since last touch,
-  * and one line per open file (handle and filename).
-  * Returns true if a peer name was available.
-  */
- bool getInfo(StringBuffer &str)
- {
-     const char *peer = queryPeerName();
-     const bool named = (NULL != peer);
-     str.append("client(");
-     if (named)
-         str.append(peer);
-     unsigned now = msTick();
-     str.appendf("): last touch %d ms ago (%d, %d)",now-lasttick,lasttick,now);
-     ForEachItemIn(f, openFiles)
-     {
-         const OpenFileInfo &entry = openFiles.item(f);
-         str.appendf("\n %d: ", entry.handle).append(entry.filename->text.get());
-     }
-     return named;
- }
- };
- /* A request parked in a CThrottler queue.
-  * The timer member is constructed with the item; CThrottler reads its elapsedMs() on dequeue.
-  */
- class CThrottleQueueItem : public CSimpleInterface
- {
- public:
-     RemoteFileCommandType cmd;
-     Linked<CRemoteClientHandler> client;
-     MemoryBuffer msg;
-     CCycleTimer timer;
-     // NB: the contents of _msg are swapped into this item, the caller is left holding this item's (empty) buffer
-     CThrottleQueueItem(RemoteFileCommandType _cmd, MemoryBuffer &_msg, CRemoteClientHandler *_client)
-         : cmd(_cmd), client(_client)
-     {
-         msg.swapWith(_msg);
-     }
- };
- /* Throttles command processing with a semaphore holding 'limit' slots.
-  * A request that cannot obtain a slot within delayMs either proceeds anyway (if the latest CPU
-  * usage is below cpuThreshold) or is queued; queued requests are run by threads that hold a slot,
-  * before they give the slot back.
-  */
- class CThrottler
- {
-     Semaphore sem;
-     CriticalSection crit, configureCrit; // crit: guards queue and delay statistics, configureCrit: serializes configure()
-     StringAttr title;
-     unsigned limit, delayMs, cpuThreshold, queueLimit;
-     unsigned disabledLimit; // limit that was in force when throttling was disabled, 0 otherwise
-     unsigned __int64 totalThrottleDelay; // accumulated in milliseconds
-     CCycleTimer totalThrottleDelayTimer;
-     QueueOf<CThrottleQueueItem, false> queue;
-     unsigned statsIntervalSecs;
- public:
-     CThrottler(const char *_title) : title(_title)
-     {
-         totalThrottleDelay = 0;
-         limit = 0;
-         delayMs = DEFAULT_STDCMD_THROTTLEDELAYMS;
-         cpuThreshold = DEFAULT_STDCMD_THROTTLECPULIMIT;
-         disabledLimit = 0;
-         queueLimit = DEFAULT_STDCMD_THROTTLEQUEUELIMIT;
-         // NOTE(review): initialised from the CPU limit default - looks like a copy/paste, confirm the intended default
-         statsIntervalSecs = DEFAULT_STDCMD_THROTTLECPULIMIT;
-     }
-     ~CThrottler()
-     {
-         // release anything still queued
-         for (;;)
-         {
-             Owned<CThrottleQueueItem> item = queue.dequeue();
-             if (!item)
-                 break;
-         }
-     }
-     unsigned queryLimit() const { return limit; }
-     unsigned queryDelayMs() const { return delayMs; }
-     unsigned queryCpuThreshold() const { return cpuThreshold; }
-     unsigned queryQueueLimit() const { return queueLimit; }
-     // Append the current settings and statistics to info. Takes no lock itself; getInfo()/getStats() call it with crit held.
-     StringBuffer &getInfoSummary(StringBuffer &info)
-     {
-         info.appendf("Throttler(%s) - limit=%u, delayMs=%u, cpuThreshold=%u, queueLimit=%u", title.get(), limit, delayMs, cpuThreshold, queueLimit).newline();
-         unsigned elapsedSecs = totalThrottleDelayTimer.elapsedMs()/1000;
-         // work back from the current time to the moment the statistics timer was last reset
-         time_t simple;
-         time(&simple);
-         simple -= elapsedSecs;
-         CDateTime dt;
-         dt.set(simple);
-         StringBuffer dateStr;
-         dt.getTimeString(dateStr, true);
-         info.appendf("Throttler(%s): statistics since %s", title.get(), dateStr.str()).newline();
-         info.appendf("Total delay of %0.2f seconds", ((double)totalThrottleDelay)/1000).newline();
-         info.appendf("Requests currently queued: %u", queue.ordinality());
-         return info;
-     }
-     void getInfo(StringBuffer &info)
-     {
-         CriticalBlock b(crit);
-         getInfoSummary(info).newline();
-     }
-     /* Apply new settings. Throws if any value exceeds its THROTTLE_MAX_* bound.
-      * _limit==0 disables throttling. A _queueLimit of (unsigned)-1 leaves queueLimit unchanged.
-      * Reducing the limit blocks until enough semaphore signals have been consumed.
-      */
-     void configure(unsigned _limit, unsigned _delayMs, unsigned _cpuThreshold, unsigned _queueLimit)
-     {
-         if (_limit > THROTTLE_MAX_LIMIT || _delayMs > THROTTLE_MAX_DELAYMS || _cpuThreshold > THROTTLE_MAX_CPUTHRESHOLD || _queueLimit > THROTTLE_MAX_QUEUELIMIT)
-             throw MakeStringException(0, "Throttler(%s), rejecting configure command: limit=%u (max permitted=%u), delayMs=%u (max permitted=%u), cpuThreshold=%u (max permitted=%u), queueLimit=%u (max permitted=%u)",
-                                       title.str(), _limit, THROTTLE_MAX_LIMIT, _delayMs, THROTTLE_MAX_DELAYMS, _cpuThreshold,
-                                       THROTTLE_MAX_CPUTHRESHOLD, _queueLimit, THROTTLE_MAX_QUEUELIMIT);
-         CriticalBlock b(configureCrit);
-         int delta = 0;
-         if (_limit)
-         {
-             if (disabledLimit) // if transitioning from disabled to some throttling
-             {
-                 assertex(0 == limit);
-                 delta = _limit - disabledLimit; // + or -
-                 disabledLimit = 0;
-             }
-             else
-                 delta = _limit - limit; // + or -
-         }
-         else if (0 == disabledLimit)
-         {
-             PROGLOG("Throttler(%s): disabled, previous limit: %u", title.get(), limit);
-             /* disabling - set limit immediately to let all new transaction through.
-              * NB: the semaphore signals are not consumed in this case, because transactions could be waiting on it.
-              * Instead the existing 'limit' is kept in 'disabledLimit', so that if/when throttling is
-              * re-enabled, it is used as a basis for increasing or consuming the semaphore signal count.
-              */
-             disabledLimit = limit;
-             limit = 0;
-         }
-         if (delta > 0)
-         {
-             PROGLOG("Throttler(%s): Increasing limit from %u to %u", title.get(), limit, _limit);
-             sem.signal(delta);
-             limit = _limit;
-             // NB: If throttling was off, this doesn't effect transactions in progress, i.e. will only throttle new transactions coming in.
-         }
-         else if (delta < 0)
-         {
-             PROGLOG("Throttler(%s): Reducing limit from %u to %u", title.get(), limit, _limit);
-             // NB: This is not expected to take long
-             CCycleTimer timer;
-             while (delta < 0)
-             {
-                 if (sem.wait(1000))
-                     ++delta;
-                 else // NOTE(review): -delta is the number of signals still to be consumed, although the message labels it "completed"
-                     PROGLOG("Throttler(%s): Waited %0.2f seconds so far for up to a maximum of %u (previous limit) transactions to complete, %u completed", title.get(), ((double)timer.elapsedMs())/1000, limit, -delta);
-             }
-             limit = _limit;
-             // NB: doesn't include transactions in progress, i.e. will only throttle new transactions coming in.
-         }
-         else if (_limit)
-         {
-             /* No semaphore adjustment needed. However, if throttling has just been re-enabled with the same
-              * limit it had before it was disabled, 'limit' is still 0 at this point and must be restored,
-              * otherwise queryLimit() reports 0 and later reconfigurations compute delta from the wrong base
-              * (over-signalling the semaphore).
-              */
-             limit = _limit;
-         }
-         if (_delayMs != delayMs)
-         {
-             PROGLOG("Throttler(%s): New delayMs=%u, previous: %u", title.get(), _delayMs, delayMs);
-             delayMs = _delayMs;
-         }
-         if (_cpuThreshold != cpuThreshold)
-         {
-             PROGLOG("Throttler(%s): New cpuThreshold=%u, previous: %u", title.get(), _cpuThreshold, cpuThreshold);
-             cpuThreshold = _cpuThreshold;
-         }
-         if (((unsigned)-1) != _queueLimit && _queueLimit != queueLimit)
-         {
-             PROGLOG("Throttler(%s): New queueLimit=%u%s, previous: %u", title.get(), _queueLimit, 0==_queueLimit?"(disabled)":"", queueLimit);
-             queueLimit = _queueLimit;
-         }
-     }
-     void setStatsInterval(unsigned _statsIntervalSecs)
-     {
-         if (_statsIntervalSecs != statsIntervalSecs)
-         {
-             PROGLOG("Throttler(%s): New statsIntervalSecs=%u, previous: %u", title.get(), _statsIntervalSecs, statsIntervalSecs);
-             statsIntervalSecs = _statsIntervalSecs;
-         }
-     }
-     // Block until a slot is obtained, tracing each time a delayMs wait expires without one.
-     void take(RemoteFileCommandType cmd) // cmd for info. only
-     {
-         for (;;)
-         {
-             if (sem.wait(delayMs))
-                 return;
-             PROGLOG("Throttler(%s): transaction delayed [cmd=%s]", title.get(), getRFCText(cmd));
-         }
-     }
-     void release()
-     {
-         sem.signal();
-     }
-     // Append the summary to stats and, if reset is set, restart the delay statistics.
-     StringBuffer &getStats(StringBuffer &stats, bool reset)
-     {
-         CriticalBlock b(crit);
-         getInfoSummary(stats);
-         if (reset)
-         {
-             totalThrottleDelayTimer.reset();
-             totalThrottleDelay = 0;
-         }
-         return stats;
-     }
-     /* Run cmd for client, subject to throttling.
-      * If no slot is available after delayMs the request is either run regardless (CPU below cpuThreshold),
-      * or queued (in which case this returns without processing it), or rejected by throwing if the queue is full.
-      * A thread that does proceed also drains the queue before returning.
-      */
-     void addCommand(RemoteFileCommandType cmd, MemoryBuffer &msg, CRemoteClientHandler *client)
-     {
-         CCycleTimer timer;
-         Owned<IException> exception;
-         bool hadSem = true;
-         if (!sem.wait(delayMs))
-         {
-             CriticalBlock b(crit);
-             if (!sem.wait(0)) // check hasn't become available
-             {
-                 // sample once, so that the value compared is also the value logged
-                 unsigned cpu = getLatestCPUUsage();
-                 if (cpu<cpuThreshold)
-                 {
-                     /* Allow to proceed, despite hitting throttle limit because CPU < threshold
-                      * NB: The overall number of threads is still capped by the thread pool.
-                      */
-                     unsigned ms = timer.elapsedMs();
-                     totalThrottleDelay += ms;
-                     PROGLOG("Throttler(%s): transaction delayed [cmd=%s] for : %u milliseconds, proceeding as cpu(%u)<throttleCPULimit(%u)", title.get(), getRFCText(cmd), ms, cpu, cpuThreshold);
-                     hadSem = false;
-                 }
-                 else
-                 {
-                     if (queueLimit && queue.ordinality()>=queueLimit)
-                         throw MakeStringException(0, "Throttler(%s), the maxiumum number of items are queued (%u), rejecting new command[%s]", title.str(), queue.ordinality(), getRFCText(cmd));
-                     queue.enqueue(new CThrottleQueueItem(cmd, msg, client)); // NB: takes over ownership of 'client' from running thread
-                     PROGLOG("Throttler(%s): transaction delayed [cmd=%s], queuing (%u queueud), [client=%p, sock=%u]", title.get(), getRFCText(cmd), queue.ordinality(), client, client->socket->OShandle());
-                     return;
-                 }
-             }
-         }
-         /* Guarantee that sem is released.
-          * Should normally release on clean exit when queue is empty.
-          */
-         struct ReleaseSem
-         {
-             Semaphore *sem;
-             ReleaseSem(Semaphore *_sem) { sem = _sem; }
-             ~ReleaseSem() { if (sem) sem->signal(); }
-         } releaseSem(hadSem?&sem:NULL);
-         /* Whilst holding on this throttle slot (i.e. before signalling semaphore back), process
-          * queued items. NB: other threads that are finishing will do also.
-          * Queued items are processed 1st, then the current request, then anything that was queued when handling current request
-          * Throttle slot (semaphore) is only given back when no more to do.
-          */
-         Linked<CRemoteClientHandler> currentClient;
-         MemoryBuffer currentMsg;
-         unsigned ms;
-         for (;;)
-         {
-             RemoteFileCommandType currentCmd;
-             {
-                 CriticalBlock b(crit);
-                 Owned<CThrottleQueueItem> item = queue.dequeue();
-                 if (item)
-                 {
-                     currentCmd = item->cmd;
-                     currentClient.setown(item->client.getClear());
-                     currentMsg.swapWith(item->msg);
-                     ms = item->timer.elapsedMs();
-                 }
-                 else
-                 {
-                     if (NULL == client) // previously handled and queue empty
-                     {
-                         /* Commands are only queued if semaphore is exhaused (checked inside crit)
-                          * so only signal the semaphore inside the crit, after checking if there are no queued items
-                          */
-                         if (hadSem)
-                         {
-                             releaseSem.sem = NULL;
-                             sem.signal();
-                         }
-                         break;
-                     }
-                     currentCmd = cmd;
-                     currentClient.set(client); // process current request after dealing with queue
-                     currentMsg.swapWith(msg);
-                     ms = timer.elapsedMs();
-                     client = NULL;
-                 }
-             }
-             if (ms >= 1000)
-             {
-                 if (ms>delayMs)
-                     PROGLOG("Throttler(%s): transaction delayed [cmd=%s] for : %u seconds", title.get(), getRFCText(currentCmd), ms/1000);
-             }
-             {
-                 CriticalBlock b(crit);
-                 totalThrottleDelay += ms;
-             }
-             try
-             {
-                 currentClient->processCommand(currentCmd, currentMsg, this);
-             }
-             catch (IException *e)
-             {
-                 EXCLOG(e, "addCommand: processCommand failed");
-                 e->Release();
-             }
-         }
-     }
- };
- /* temporarily release a throttler slot:
-  * the slot is released on construction and taken again (blocking) on destruction.
-  */
- class CThrottleReleaseBlock
- {
-     CThrottler &throttler;
-     RemoteFileCommandType cmd; // only passed on to take()
- public:
-     CThrottleReleaseBlock(CThrottler &_throttler, RemoteFileCommandType _cmd)
-         : throttler(_throttler), cmd(_cmd)
-     {
-         throttler.release();
-     }
-     ~CThrottleReleaseBlock() { throttler.take(cmd); }
- };
- int lasthandle; // last file handle value handed out, advanced by getNextHandle()
- CriticalSection sect; // held while the clients list / open file handles are searched or changed
- Owned<ISocket> acceptsock;
- Owned<ISocket> securesock;
- Owned<ISocket> rowServiceSock;
- bool rowServiceOnStdPort = true; // should row service commands be processed on std. service port
- bool rowServiceSSL = false;
- Owned<ISocketSelectHandler> selecthandler; // created in the constructor
- Owned<IThreadPool> threads; // for commands
- bool stopping; // false on construction
- unsigned clientcounttick; // set to msTick() on construction
- unsigned closedclients; // zero on construction
- CAsyncCommandManager asyncCommandManager;
- CThrottler stdCmdThrottler, slowCmdThrottler; // configured in the constructor from the DEFAULT_STDCMD_* / DEFAULT_SLOWCMD_* values
- CClientStatsTable clientStatsTable;
- atomic_t globallasttick; // set to msTick() on construction
- unsigned targetActiveThreads; // 80% of maxThreads (minimum 1); process() stops handling inline once the running thread count exceeds it
- Linked<IPropertyTree> keyPairInfo; // as passed to the constructor
- /* Advance lasthandle (wrapping from INT_MAX back to 1) until it is a value that
-  * no client currently has open, and return it.
-  */
- int getNextHandle()
- {
-     // called in sect critical block
-     unsigned clientPos, filePos; // outputs required by findHandle(), not used here
-     do
-     {
-         lasthandle = (INT_MAX == lasthandle) ? 1 : lasthandle+1;
-     }
-     while (findHandle(lasthandle, clientPos, filePos));
-     return lasthandle;
- }
- /* Search every client's open files for handle.
-  * On success returns true with clientidx/handleidx set to the positions found,
-  * otherwise returns false with both set to (unsigned)-1.
-  */
- bool findHandle(int handle,unsigned &clientidx,unsigned &handleidx)
- {
-     // called in sect critical block
-     ForEachItemIn(c, clients)
-     {
-         CRemoteClientHandler &candidate = clients.item(c);
-         ForEachItemIn(f, candidate.openFiles)
-         {
-             if (handle != candidate.openFiles.item(f).handle)
-                 continue;
-             clientidx = c;
-             handleidx = f;
-             return true;
-         }
-     }
-     clientidx = (unsigned)-1;
-     handleidx = (unsigned)-1;
-     return false;
- }
- /* Append rows from keyManager to reply, each as <size><row bytes>, preceded by a size marker
-  * that is written once the loop finishes.
-  * Stops when lookup() fails, when maxRecs (if non-zero) rows have been added, or when the data
-  * added reaches MAX_KEYDATA_SZ; maxHit reports whether one of the two limits ended the loop.
-  * Returns the number of rows appended.
-  */
- unsigned readKeyData(IKeyManager *keyManager, unsigned maxRecs, MemoryBuffer &reply, bool &maxHit)
- {
-     DelayedSizeMarker keyDataSzReturned(reply);
-     unsigned rowsAdded = 0;
-     maxHit = false;
-     const unsigned startLen = reply.length();
-     while (keyManager->lookup(true))
-     {
-         unsigned rowSize = keyManager->queryRowSize();
-         const byte *row = keyManager->queryKeyBuffer();
-         reply.append(rowSize);
-         reply.append(rowSize, row);
-         ++rowsAdded;
-         const bool rowLimitHit = maxRecs && (0 == --maxRecs);
-         if (rowLimitHit || (reply.length()-startLen >= MAX_KEYDATA_SZ))
-         {
-             maxHit = true;
-             break;
-         }
-     }
-     keyDataSzReturned.write();
-     return rowsAdded;
- }
- /* Pooled thread that services one client.
-  * init() takes over the client and message from a cCommandProcessorParams,
-  * threadmain() runs the client's process() and then drops the client.
-  */
- class cCommandProcessor: public CInterface, implements IPooledThread
- {
-     Owned<CRemoteClientHandler> client;
-     MemoryBuffer msg;
- public:
-     IMPLEMENT_IINTERFACE;
-     // Parameter block handed to init()
-     struct cCommandProcessorParams
-     {
-         cCommandProcessorParams() { msg.setEndian(__BIG_ENDIAN); }
-         CRemoteClientHandler *client;
-         MemoryBuffer msg;
-     };
-     virtual void init(void *_params) override
-     {
-         cCommandProcessorParams &params = *(cCommandProcessorParams *)_params;
-         client.setown(params.client); // NB: takes ownership of the caller's reference
-         msg.swapWith(params.msg);
-     }
-     virtual void threadmain() override
-     {
-         // idea is that initially we process commands inline then pass over to select handler
-         try
-         {
-             client->process(msg);
-         }
-         catch (IException *e)
-         {
-             // suppress some errors
-             EXCLOG(e,"cCommandProcessor::threadmain");
-             e->Release();
-         }
-         try
-         {
-             client.clear();
-         }
-         catch (IException *e)
-         {
-             // suppress some more errors clearing client
-             EXCLOG(e,"cCommandProcessor::threadmain(2)");
-             e->Release();
-         }
-     }
-     virtual bool stop() override
-     {
-         return true;
-     }
-     virtual bool canReuse() const override
-     {
-         return false; // want to free owned socket
-     }
- };
- IArrayOf<CRemoteClientHandler> clients; // client handlers; searched by findHandle(), entries removed by onCloseSocket()
- /* Check that securitySettings names a certificate and a private key and that both files exist.
-  * Throws a DAFSERR_serverinit_failed exception describing the first problem found.
-  */
- void validateSSLSetup()
- {
-     const char *problem = nullptr;
-     if (!securitySettings.certificate)
-         problem = "SSL Certificate information not found in environment.conf";
-     else if (!checkFileExists(securitySettings.certificate))
-         problem = "SSL Certificate File not found in environment.conf";
-     else if (!securitySettings.privateKey)
-         problem = "SSL Key information not found in environment.conf";
-     else if (!checkFileExists(securitySettings.privateKey))
-         problem = "SSL Key File not found in environment.conf";
-     if (problem)
-         throw createDafsException(DAFSERR_serverinit_failed, problem);
- }
- public:
- IMPLEMENT_IINTERFACE
- /* Creates the select handler, applies the default settings to both throttlers and creates the command thread pool.
-  * maxThreads / maxThreadsDelayMs are passed to the pool; 20% of maxThreads (minimum 1) is passed as
-  * the pool's final argument and 80% (minimum 1) is kept in targetActiveThreads.
-  */
- CRemoteFileServer(unsigned maxThreads, unsigned maxThreadsDelayMs, unsigned maxAsyncCopy, IPropertyTree *_keyPairInfo)
-     : asyncCommandManager(maxAsyncCopy), stdCmdThrottler("stdCmdThrotlter"), slowCmdThrottler("slowCmdThrotlter"), keyPairInfo(_keyPairInfo)
- {
-     lasthandle = 0;
-     selecthandler.setown(createSocketSelectHandler(NULL));
-     stdCmdThrottler.configure(DEFAULT_STDCMD_PARALLELREQUESTLIMIT, DEFAULT_STDCMD_THROTTLEDELAYMS, DEFAULT_STDCMD_THROTTLECPULIMIT, DEFAULT_STDCMD_THROTTLEQUEUELIMIT);
-     slowCmdThrottler.configure(DEFAULT_SLOWCMD_PARALLELREQUESTLIMIT, DEFAULT_SLOWCMD_THROTTLEDELAYMS, DEFAULT_SLOWCMD_THROTTLECPULIMIT, DEFAULT_SLOWCMD_THROTTLEQUEUELIMIT);
-     unsigned minPoolThreads = maxThreads*20/100; // 20%
-     if (!minPoolThreads)
-         minPoolThreads = 1;
-     targetActiveThreads = maxThreads*80/100; // 80%
-     if (!targetActiveThreads)
-         targetActiveThreads = 1;
-     // factory used by the pool to create its threads
-     class CCommandFactory : public CSimpleInterfaceOf<IThreadFactory>
-     {
-         CRemoteFileServer &parent;
-     public:
-         CCommandFactory(CRemoteFileServer &_parent) : parent(_parent) { }
-         virtual IPooledThread *createNew()
-         {
-             return parent.createCommandProcessor();
-         }
-     };
-     Owned<IThreadFactory> factory = new CCommandFactory(*this); // NB: pool links factory, so takes ownership
-     threads.setown(createThreadPool("CRemoteFileServerPool", factory, NULL, maxThreads, maxThreadsDelayMs,
- #ifdef __64BIT__
-         0, // Unlimited stack size
- #else
-         0x10000,
- #endif
-         INFINITE,minPoolThreads));
-     threads->setStartDelayTracing(60); // trace amount delayed every minute.
-     PROGLOG("CRemoteFileServer: maxThreads = %u, maxThreadsDelayMs = %u, maxAsyncCopy = %u", maxThreads, maxThreadsDelayMs, maxAsyncCopy);
-     stopping = false;
-     clientcounttick = msTick();
-     closedclients = 0;
-     atomic_set(&globallasttick,msTick());
- }
- // Joins the async command manager, then discards all client handlers (traced in debug builds).
- ~CRemoteFileServer()
- {
- #ifdef _DEBUG
-     PROGLOG("Exiting CRemoteFileServer");
- #endif
-     asyncCommandManager.join();
-     clients.kill();
- #ifdef _DEBUG
-     PROGLOG("Exited CRemoteFileServer");
- #endif
- }
- /* Find the open file registered under handle.
-  * If found, newFlags are OR'ed into its stored flags, the entry is copied to fileInfo,
-  * the owning client's previdx is updated and true is returned. Otherwise returns false.
-  */
- bool lookupFileIOHandle(int handle, OpenFileInfo &fileInfo, unsigned newFlags=0)
- {
-     if (handle > 0)
-     {
-         CriticalBlock block(sect);
-         unsigned clientPos;
-         unsigned filePos;
-         if (findHandle(handle,clientPos,filePos))
-         {
-             CRemoteClientHandler &owner = clients.item(clientPos);
-             OpenFileInfo &stored = owner.openFiles.element(filePos); // NB: links members
-             stored.flags |= newFlags;
-             fileInfo = stored;
-             owner.previdx = filePos;
-             return true;
-         }
-     }
-     return false;
- }
- //MORE: The file handles should timeout after a while, and accessing an old (invalid handle)
- // should throw a different exception
- /* Look up handle. Returns false (fileio NULL) if it is not known.
-  * del=false: fileio is set to the file's IFileIO and the client's previdx is updated.
-  * del=true : the entry is removed from the client (fileio stays NULL); for of_key files the
-  *            key store cache entry is cleared first.
-  */
- bool checkFileIOHandle(int handle, IFileIO *&fileio, bool del=false)
- {
-     fileio = NULL;
-     if (handle<=0)
-         return false;
-     CriticalBlock block(sect);
-     unsigned clientPos;
-     unsigned filePos;
-     if (!findHandle(handle,clientPos,filePos))
-         return false;
-     CRemoteClientHandler &owner = clients.item(clientPos);
-     const OpenFileInfo &entry = owner.openFiles.item(filePos);
-     if (!del)
-     {
-         fileio = owner.openFiles.item(filePos).fileIO;
-         owner.previdx = filePos;
-         return true;
-     }
-     if (entry.flags & of_key)
-         clearKeyStoreCacheEntry(entry.fileIO);
-     owner.openFiles.remove(filePos);
-     owner.previdx = (unsigned)-1;
-     return true;
- }
- // Throwing form of checkFileIOHandle(): raises RFSERR_InvalidFileIOHandle if handle is not known. reply is not used.
- void checkFileIOHandle(MemoryBuffer &reply, int handle, IFileIO *&fileio, bool del=false)
- {
-     const bool known = checkFileIOHandle(handle, fileio, del);
-     if (known)
-         return;
-     throw createDafsException(RFSERR_InvalidFileIOHandle, nullptr);
- }
- /* Detach client: its socket (if any) is removed from the select handler and the client is
-  * removed from the clients list. Exceptions from either step are logged and swallowed.
-  * 'which' is only used for debug tracing.
-  */
- void onCloseSocket(CRemoteClientHandler *client, int which)
- {
-     if (NULL == client)
-         return;
-     CriticalBlock block(sect);
- #ifdef _DEBUG
-     StringBuffer s(client->queryPeerName());
-     PROGLOG("onCloseSocket(%d) %s",which,s.str());
- #endif
-     if (client->socket)
-     {
-         try
-         {
-             /* JCSMORE - shouldn't this really be dependent on whether selecthandled=true
-              * It has not been added to the selecthandler
-              * Harmless, but wasteful if so.
-              */
-             selecthandler->remove(client->socket);
-         }
-         catch (IException *e)
-         {
-             EXCLOG(e,"CRemoteFileServer::onCloseSocket.1");
-             e->Release();
-         }
-     }
-     try
-     {
-         clients.zap(*client);
-     }
-     catch (IException *e)
-     {
-         EXCLOG(e,"CRemoteFileServer::onCloseSocket.2");
-         e->Release();
-     }
- }
- /* Open-file request.
-  * Message: filename, mode byte, share byte, then optionally an extra-flags byte, optionally
-  * followed by sMode and cFlags (always sent together).
-  * Reply: RFEnoerror followed by the new handle, or handle 0 if the open returned no IFileIO.
-  * A successful open is recorded in client.openFiles. Always returns true.
-  */
- bool cmdOpenFileIO(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
- {
-     Owned<StringAttrItem> name = new StringAttrItem;
-     byte mode;
-     byte share;
-     msg.read(name->text).read(mode).read(share);
-     // optional trailing fields
-     byte extra = 0;
-     unsigned short sMode = IFUnone;
-     unsigned short cFlags = IFUnone;
-     const bool haveExtra = msg.remaining() >= sizeof(byte);
-     if (haveExtra)
-         msg.read(extra);
-     if (haveExtra && (msg.remaining() >= (sizeof(sMode) + sizeof(cFlags))))
-         msg.read(sMode).read(cFlags);
-     // none => nocache for remote (hint)
-     // can revert to previous behavior with conf file setting "allow_pgcache_flush=false"
-     IFEflags extraFlags = (IFEnone == (IFEflags)extra) ? IFEnocache : (IFEflags)extra;
-     Owned<IFile> file = createIFile(name->text);
-     switch ((compatIFSHmode)share)
-     {
-         case compatIFSHnone:
-             file->setCreateFlags(S_IRUSR|S_IWUSR);
-             file->setShareMode(IFSHnone);
-             break;
-         case compatIFSHread:
-             file->setShareMode(IFSHread);
-             break;
-         case compatIFSHwrite:
-             file->setShareMode(IFSHfull);
-             break;
-         case compatIFSHexec:
-             file->setCreateFlags(S_IRUSR|S_IWUSR|S_IXUSR|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH);
-             break;
-         case compatIFSHall:
-             file->setCreateFlags(S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH); // bit excessive
-             file->setShareMode(IFSHfull);
-             break;
-     }
-     // sMode/cFlags, if both were sent, take precedence over the compat share byte
-     if (sMode != IFUnone && cFlags != IFUnone)
-     {
-         file->setCreateFlags(cFlags);
-         file->setShareMode((IFSHmode)sMode);
-     }
-     if (TF_TRACE_PRE_IO)
-         PROGLOG("before open file '%s', (%d,%d,%d,%d,0%o)",name->text.get(),(int)mode,(int)share,extraFlags,sMode,cFlags);
-     Owned<IFileIO> fileio = file->open((IFOmode)mode,extraFlags);
-     int handle = 0;
-     if (fileio)
-     {
-         CriticalBlock block(sect);
-         handle = getNextHandle();
-         client.previdx = client.openFiles.ordinality();
-         client.openFiles.append(OpenFileInfo(handle, fileio, name));
-     }
-     reply.append(RFEnoerror);
-     reply.append(handle);
-     if (TF_TRACE)
-         PROGLOG("open file '%s', (%d,%d) handle = %d",name->text.get(),(int)mode,(int)share,handle);
-     return true;
- }
- /* Close-file request. Message: handle. Reply: RFEnoerror.
-  * Throws (via checkFileIOHandle) if the handle is not known. Always returns true.
-  */
- bool cmdCloseFileIO(MemoryBuffer & msg, MemoryBuffer & reply)
- {
-     int handle;
-     msg.read(handle);
-     IFileIO *unused; // output of checkFileIOHandle, left NULL when deleting
-     checkFileIOHandle(reply, handle, unused, true);
-     if (TF_TRACE)
-         PROGLOG("close file, handle = %d",handle);
-     reply.append(RFEnoerror);
-     return true;
- }
- /* Read request. Message: handle, file offset, byte count.
-  * The data is read directly into the reply buffer, which ends up as:
-  * RFEnoerror, numRead, <numRead bytes of data>.
-  * Throws (via checkFileIOHandle) if the handle is not known.
-  */
- void cmdRead(MemoryBuffer & msg, MemoryBuffer & reply, CClientStats &stats)
- {
-     int handle;
-     __int64 pos;
-     size32_t len;
-     msg.read(handle).read(pos).read(len);
-     IFileIO *fileio;
-     checkFileIOHandle(reply, handle, fileio);
-     //arrange it so we read directly into the reply buffer...
-     reply.append((unsigned)RFEnoerror);
-     size32_t numRead;
-     unsigned posOfLength = reply.length();
-     if (TF_TRACE_PRE_IO)
-         PROGLOG("before read file, handle = %d, toread = %d",handle,len);
-     reply.reserve(sizeof(numRead)); // placeholder for the length, filled in once known
-     void *data = reply.reserve(len);
-     numRead = fileio->read(pos,len,data);
-     stats.addRead(numRead); // bytes actually read (may be fewer than requested), as cmdWrite does with numWritten
-     if (TF_TRACE)
-         PROGLOG("read file, handle = %d, pos = %" I64F "d, toread = %d, read = %d",handle,pos,len,numRead);
-     reply.setLength(posOfLength + sizeof(numRead) + numRead); // trim off the unread part of the reservation
-     reply.writeEndianDirect(posOfLength,sizeof(numRead),&numRead);
- }
- /* Size request. Message: handle. Reply: RFEnoerror, size of the file.
-  * Throws (via checkFileIOHandle) if the handle is not known.
-  */
- void cmdSize(MemoryBuffer & msg, MemoryBuffer & reply)
- {
-     int handle;
-     msg.read(handle);
-     IFileIO *fileio;
-     checkFileIOHandle(reply, handle, fileio);
-     __int64 fileSize = fileio->size();
-     reply.append((unsigned)RFEnoerror);
-     reply.append(fileSize);
-     if (TF_TRACE)
-         PROGLOG("size file, handle = %d, size = %" I64F "d",handle,fileSize);
- }
- /* Set-size request. Message: handle, new size. Reply: RFEnoerror.
-  * Throws (via checkFileIOHandle) if the handle is not known.
-  */
- void cmdSetSize(MemoryBuffer & msg, MemoryBuffer & reply)
- {
-     int handle;
-     offset_t newSize;
-     msg.read(handle).read(newSize);
-     // NB: traced before the handle is validated
-     if (TF_TRACE)
-         PROGLOG("set size file, handle = %d, size = %" I64F "d",handle,newSize);
-     IFileIO *fileio;
-     checkFileIOHandle(reply, handle, fileio);
-     fileio->setSize(newSize);
-     reply.append((unsigned)RFEnoerror);
- }
- /* Write request. Message: handle, file offset, byte count, then the data itself.
-  * Reply: RFEnoerror, number of bytes written (which is also added to stats).
-  * Throws (via checkFileIOHandle) if the handle is not known.
-  */
- void cmdWrite(MemoryBuffer & msg, MemoryBuffer & reply, CClientStats &stats)
- {
-     int handle;
-     __int64 pos;
-     size32_t len;
-     msg.read(handle).read(pos).read(len);
-     IFileIO *fileio;
-     checkFileIOHandle(reply, handle, fileio);
-     // written straight from the message buffer
-     const byte *payload = (const byte *)msg.readDirect(len);
-     if (TF_TRACE_PRE_IO)
-         PROGLOG("before write file, handle = %d, towrite = %d",handle,len);
-     size32_t numWritten = fileio->write(pos,len,payload);
-     stats.addWrite(numWritten);
-     if (TF_TRACE)
-         PROGLOG("write file, handle = %d, towrite = %d, written = %d",handle,len,numWritten);
-     reply.append((unsigned)RFEnoerror);
-     reply.append(numWritten);
- }
- // Exists request. Message: filename. Reply: RFEnoerror, bool result of IFile::exists().
- void cmdExists(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
- {
-     StringAttr name;
-     msg.read(name);
-     if (TF_TRACE)
-         PROGLOG("exists, '%s'",name.get());
-     Owned<IFile> file = createIFile(name);
-     bool found = file->exists();
-     reply.append((unsigned)RFEnoerror);
-     reply.append(found);
- }
- // Remove request. Message: filename. Reply: RFEnoerror, bool result of IFile::remove().
- void cmdRemove(MemoryBuffer & msg, MemoryBuffer & reply,CRemoteClientHandler &client)
- {
-     StringAttr name;
-     msg.read(name);
-     if (TF_TRACE)
-         PROGLOG("remove, '%s'",name.get());
-     Owned<IFile> file = createIFile(name);
-     bool removed = file->remove();
-     reply.append((unsigned)RFEnoerror);
-     reply.append(removed);
- }
- /* Version request. Reply: an unsigned followed by DAFILESRV_VERSIONSTRING.
-  * weird backward compatibility convention,
-  * newer clients will send another unsigned to denote
-  * and result in the numeric DAFILESRV_VERSION being returned
-  * Ancient clients will get back the string form only (SERVER_VERSTRING)
-  */
- void cmdGetVer(MemoryBuffer & msg, MemoryBuffer & reply)
- {
-     if (TF_TRACE)
-         PROGLOG("getVer");
-     // no further unsigned left in the message => old style request
-     const bool oldStyle = msg.getPos()+sizeof(unsigned)>msg.length();
-     const unsigned code = oldStyle ? (unsigned)RFEnoerror : (unsigned)DAFILESRV_VERSION+0x10000;
-     reply.append(code);
-     reply.append(DAFILESRV_VERSIONSTRING);
- }
- // Rename request. Message: source filename, new name. Reply: RFEnoerror.
- void cmdRename(MemoryBuffer & msg, MemoryBuffer & reply,CRemoteClientHandler &client)
- {
-     StringAttr fromname, toname;
-     msg.read(fromname);
-     msg.read(toname);
-     if (TF_TRACE)
-         PROGLOG("rename, '%s' to '%s'",fromname.get(),toname.get());
-     Owned<IFile> source = createIFile(fromname);
-     source->rename(toname);
-     reply.append((unsigned)RFEnoerror);
- }
- // Move request. Message: source filename, destination filename. Reply: RFEnoerror.
- void cmdMove(MemoryBuffer & msg, MemoryBuffer & reply,CRemoteClientHandler &client)
- {
-     StringAttr fromname, toname;
-     msg.read(fromname);
-     msg.read(toname);
-     if (TF_TRACE)
-         PROGLOG("move, '%s' to '%s'",fromname.get(),toname.get());
-     Owned<IFile> source = createIFile(fromname);
-     source->move(toname);
-     reply.append((unsigned)RFEnoerror);
- }
- // Copy request. Message: source filename, destination filename. Reply: RFEnoerror.
- void cmdCopy(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
- {
-     StringAttr fromname, toname;
-     msg.read(fromname);
-     msg.read(toname);
-     if (TF_TRACE)
-         PROGLOG("copy, '%s' to '%s'",fromname.get(),toname.get());
-     copyFile(toname, fromname); // NB: destination is the first argument
-     reply.append((unsigned)RFEnoerror);
- }
- /* Append-file request. Message: handle of the target, source filename, pos, len.
-  * Reply: RFEnoerror, value returned by IFileIO::appendFile() (which is also added to the write stats).
-  * Throws (via checkFileIOHandle) if the handle is not known.
-  */
- void cmdAppend(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client, CClientStats &stats)
- {
-     int handle;
-     __int64 pos;
-     __int64 len;
-     StringAttr srcname;
-     msg.read(handle).read(srcname).read(pos).read(len);
-     IFileIO *target;
-     checkFileIOHandle(reply, handle, target);
-     Owned<IFile> source = createIFile(srcname.get());
-     __int64 written = target->appendFile(source,pos,len);
-     stats.addWrite(written);
-     if (TF_TRACE)
-         PROGLOG("append file, handle = %d, file=%s, pos = %" I64F "d len = %" I64F "d written = %" I64F "d",handle,srcname.get(),pos,len,written);
-     reply.append((unsigned)RFEnoerror);
-     reply.append(written);
- }
- // isFile request. Message: filename. Reply: RFEnoerror, result of IFile::isFile() as an unsigned.
- void cmdIsFile(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
- {
-     StringAttr name;
-     msg.read(name);
-     if (TF_TRACE)
-         PROGLOG("isFile, '%s'",name.get());
-     Owned<IFile> file = createIFile(name);
-     unsigned answer = (unsigned)file->isFile();
-     reply.append((unsigned)RFEnoerror);
-     reply.append(answer);
- }
- // isDir request. Message: filename. Reply: RFEnoerror, result of IFile::isDirectory() as an unsigned.
- void cmdIsDir(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
- {
-     StringAttr name;
-     msg.read(name);
-     if (TF_TRACE)
-         PROGLOG("isDir, '%s'",name.get());
-     Owned<IFile> file = createIFile(name);
-     unsigned answer = (unsigned)file->isDirectory();
-     reply.append((unsigned)RFEnoerror);
-     reply.append(answer);
- }
- // isReadOnly request. Message: filename. Reply: RFEnoerror, result of IFile::isReadOnly() as an unsigned.
- void cmdIsReadOnly(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
- {
-     StringAttr name;
-     msg.read(name);
-     if (TF_TRACE)
-         PROGLOG("isReadOnly, '%s'",name.get());
-     Owned<IFile> file = createIFile(name);
-     unsigned answer = (unsigned)file->isReadOnly();
-     reply.append((unsigned)RFEnoerror);
-     reply.append(answer);
- }
- // setReadOnly request. Message: filename, bool flag. Reply: RFEnoerror.
- void cmdSetReadOnly(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
- {
-     StringAttr name;
-     bool readOnly;
-     msg.read(name).read(readOnly);
-     if (TF_TRACE)
-         PROGLOG("setReadOnly, '%s' %d",name.get(),(int)readOnly);
-     Owned<IFile> file = createIFile(name);
-     file->setReadOnly(readOnly);
-     reply.append((unsigned)RFEnoerror);
- }
- // setFilePerms request. Message: filename, unsigned permissions. Reply: RFEnoerror.
- void cmdSetFilePerms(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
- {
-     StringAttr name;
-     unsigned perms;
-     msg.read(name).read(perms);
-     if (TF_TRACE)
-         PROGLOG("setFilePerms, '%s' 0%o",name.get(),perms);
-     Owned<IFile> file = createIFile(name);
-     file->setFilePermissions(perms);
-     reply.append((unsigned)RFEnoerror);
- }
- /* getTime request. Message: filename.
-  * Reply: RFEnoerror, bool result of IFile::getTime() and, only if that is true,
-  * the serialized create, modified and accessed times (in that order).
-  */
- void cmdGetTime(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
- {
-     StringAttr name;
-     msg.read(name);
-     if (TF_TRACE)
-         PROGLOG("getTime, '%s'",name.get());
-     Owned<IFile> file = createIFile(name);
-     CDateTime created, modified, accessed;
-     bool ok = file->getTime(&created,&modified,&accessed);
-     reply.append((unsigned)RFEnoerror);
-     reply.append(ok);
-     if (!ok)
-         return;
-     created.serialize(reply);
-     modified.serialize(reply);
-     accessed.serialize(reply);
- }
- /* setTime request. Message: filename, then for each of create/modified/accessed a bool
-  * followed (only if the bool is true) by the serialized time.
-  * Reply: RFEnoerror, bool result of IFile::setTime(). Times not supplied are passed as NULL.
-  */
- void cmdSetTime(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
- {
-     StringAttr name;
-     msg.read(name);
-     bool haveCreate;
-     CDateTime createTime;
-     msg.read(haveCreate);
-     if (haveCreate)
-         createTime.deserialize(msg);
-     bool haveModified;
-     CDateTime modifiedTime;
-     msg.read(haveModified);
-     if (haveModified)
-         modifiedTime.deserialize(msg);
-     bool haveAccessed;
-     CDateTime accessedTime;
-     msg.read(haveAccessed);
-     if (haveAccessed)
-         accessedTime.deserialize(msg);
-     if (TF_TRACE)
-         PROGLOG("setTime, '%s'",name.get());
-     Owned<IFile> file = createIFile(name);
-     const CDateTime *createArg = haveCreate ? &createTime : NULL;
-     const CDateTime *modifiedArg = haveModified ? &modifiedTime : NULL;
-     const CDateTime *accessedArg = haveAccessed ? &accessedTime : NULL;
-     bool ok = file->setTime(createArg,modifiedArg,accessedArg);
-     reply.append((unsigned)RFEnoerror);
-     reply.append(ok);
- }
- // CreateDir request. Message: directory name. Reply: RFEnoerror, bool result of IFile::createDirectory().
- void cmdCreateDir(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
- {
-     StringAttr name;
-     msg.read(name);
-     if (TF_TRACE)
-         PROGLOG("CreateDir, '%s'",name.get());
-     Owned<IFile> dir = createIFile(name);
-     bool created = dir->createDirectory();
-     reply.append((unsigned)RFEnoerror);
-     reply.append(created);
- }
- /* Directory listing request.
-  * Message: directory name, mask, includedir, sub, and optionally a 'stream' byte:
-  *   0 (or absent) - whole listing in one reply
-  *   1             - start a streamed listing (any iterator already held for the client is dropped first)
-  *   >1            - continue with the iterator held in client.opendir
-  * Reply: RFEnoerror followed by the serialized entries. When the serializer reports the listing as
-  * not finished, a true 'continue' flag is appended.
-  * Throws RFSERR_GetDirFailed if there is no iterator to read from.
-  */
- void cmdGetDir(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
- {
-     StringAttr name;
-     StringAttr mask;
-     bool includedir;
-     bool sub;
-     byte stream = 0;
-     msg.read(name).read(mask).read(includedir).read(sub);
-     if (msg.remaining()>=sizeof(byte))
-     {
-         msg.read(stream);
-         if (stream==1)
-             client.opendir.clear();
-     }
-     if (TF_TRACE)
-         PROGLOG("GetDir, '%s', '%s', stream='%u'",name.get(),mask.get(),stream);
-     if (!stream && !containsFileWildcard(mask))
-     {
-         // if no streaming, and mask contains no wildcard, it is much more efficient to get the info without a directory iterator!
-         StringBuffer fullFilename(name);
-         addPathSepChar(fullFilename).append(mask);
-         Owned<IFile> iFile = createIFile(fullFilename);
-         // NB: This must preserve same serialization format as CRemoteDirectoryIterator::serialize produces for <=1 file.
-         reply.append((unsigned)RFEnoerror);
-         if (!iFile->exists())
-         {
-             reply.append((byte)0);
-             return;
-         }
-         reply.append((byte)1);
-         bool isDir = foundYes == iFile->isDirectory();
-         reply.append(isDir);
-         reply.append(isDir ? 0 : iFile->size());
-         CDateTime modified;
-         iFile->getTime(nullptr, &modified, nullptr);
-         modified.serialize(reply);
-         reply.append(mask);
-         reply.append((byte)0);
-         return;
-     }
-     Owned<IFile> dir = createIFile(name);
-     Owned<IDirectoryIterator> iter;
-     if (stream>1)
-         iter.set(client.opendir);
-     else
-     {
-         iter.setown(dir->directoryFiles(mask.length()?mask.get():NULL,sub,includedir));
-         if (stream != 0)
-             client.opendir.set(iter); // kept for the follow-on requests
-     }
-     if (!iter)
-         throw createDafsException(RFSERR_GetDirFailed, nullptr);
-     reply.append((unsigned)RFEnoerror);
-     const bool finished = serializeRemoteDirectoryIterator(reply,iter,stream?0x100000:0,stream<2);
-     if (!finished)
-     {
-         bool cont=true;
-         reply.append(cont);
-     }
-     else if (stream != 0)
-         client.opendir.clear();
- }
- void cmdMonitorDir(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
- {
- StringAttr name;
- StringAttr mask;
- bool includedir;
- bool sub;
- unsigned checkinterval;
- unsigned timeout;
- __int64 cancelid; // not yet used
- msg.read(name).read(mask).read(includedir).read(sub).read(checkinterval).read(timeout).read(cancelid);
- byte isprev;
- msg.read(isprev);
- Owned<IDirectoryIterator> prev;
- if (isprev==1)
- {
- SocketEndpoint ep;
- prev.setown(createRemoteDirectorIterator(ep, name, msg));
- }
- if (TF_TRACE)
- PROGLOG("MonitorDir, '%s' '%s'",name.get(),mask.get());
- Owned<IFile> dir=createIFile(name);
- Owned<IDirectoryDifferenceIterator> iter=dir->monitorDirectory(prev,mask.length()?mask.get():NULL,sub,includedir,checkinterval,timeout);
- reply.append((unsigned)RFEnoerror);
- byte state = (iter.get()==NULL)?0:1;
- reply.append(state);
- if (state==1)
- serializeRemoteDirectoryDiff(reply, iter);
- }
- void cmdCopySection(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
- {
- StringAttr uuid;
- StringAttr fromFile;
- StringAttr toFile;
- offset_t toOfs;
- offset_t fromOfs;
- offset_t size;
- offset_t sizeDone=0;
- offset_t totalSize=(offset_t)-1;
- unsigned timeout;
- msg.read(uuid).read(fromFile).read(toFile).read(toOfs).read(fromOfs).read(size).read(timeout);
- AsyncCommandStatus status = asyncCommandManager.copySection(uuid,fromFile,toFile,toOfs,fromOfs,size,sizeDone,totalSize,timeout);
- reply.append((unsigned)RFEnoerror).append((unsigned)status).append(sizeDone).append(totalSize);
- }
- static void treeCopyFile(RemoteFilename &srcfn, RemoteFilename &dstfn, const char *net, const char *mask, IpAddress &ip, bool usetmp, CThrottler *throttler, CFflags copyFlags=CFnone)
- {
- unsigned start = msTick();
- Owned<IFile> dstfile = createIFile(dstfn);
- // the following is really to check the dest node is up and working (otherwise not much point in continuing!)
- if (dstfile->exists())
- PROGLOG("TREECOPY overwriting '%s'",dstfile->queryFilename());
- Owned<IFile> srcfile = createIFile(srcfn);
- unsigned lastmin = 0;
- if (!srcfn.queryIP().ipequals(dstfn.queryIP())) {
- CriticalBlock block(treeCopyCrit);
- for (;;) {
- CDateTime dt;
- offset_t sz;
- try {
- sz = srcfile->size();
- if (sz==(offset_t)-1) {
- if (TF_TRACE_TREE_COPY)
- PROGLOG("TREECOPY source not found '%s'",srcfile->queryFilename());
- break;
- }
- srcfile->getTime(NULL,&dt,NULL);
- }
- catch (IException *e) {
- EXCLOG(e,"treeCopyFile(1)");
- e->Release();
- break;
- }
- Linked<CTreeCopyItem> tc;
- unsigned now = msTick();
- ForEachItemInRev(i1,treeCopyArray) {
- CTreeCopyItem &item = treeCopyArray.item(i1);
- // prune old entries (not strictly needed buf I think better)
- if (now-item.lastused>TREECOPYPRUNETIME)
- treeCopyArray.remove(i1);
- else if (!tc.get()&&item.equals(srcfn,net,mask,sz,dt)) {
- tc.set(&item);
- item.lastused = now;
- }
- }
- if (!tc.get()) {
- if (treeCopyArray.ordinality()>=TREECOPY_CACHE_SIZE)
- treeCopyArray.remove(0);
- tc.setown(new CTreeCopyItem(srcfn,net,mask,sz,dt));
- treeCopyArray.append(*tc.getLink());
- }
- ForEachItemInRev(cand,tc->loc) { // rev to choose copied locations first (maybe optional?)
- if (!tc->busy->testSet(cand)) {
- // check file accessible and matches
- if (!cand&&dstfn.equals(tc->loc.item(cand))) // hmm trying to overwrite existing, better humor
- continue;
- bool ok = true;
- Owned<IFile> rmtfile = createIFile(tc->loc.item(cand));
- if (cand) { // only need to check if remote
- try {
- if (rmtfile->size()!=sz)
- ok = false;
- else {
- CDateTime fdt;
- rmtfile->getTime(NULL,&fdt,NULL);
- ok = fdt.equals(dt);
- }
- }
- catch (IException *e) {
- EXCLOG(e,"treeCopyFile(2)");
- e->Release();
- ok = false;
- }
- }
- if (ok) { // if not ok leave 'busy'
- // finally lets try and copy!
- try {
- if (TF_TRACE_TREE_COPY)
- PROGLOG("TREECOPY(started) %s to %s",rmtfile->queryFilename(),dstfile->queryFilename());
- {
- CriticalUnblock unblock(treeCopyCrit); // note we have tc linked
- rmtfile->copyTo(dstfile,DEFAULT_COPY_BLKSIZE,NULL,usetmp,copyFlags);
- }
- if (TF_TRACE_TREE_COPY)
- PROGLOG("TREECOPY(done) %s to %s",rmtfile->queryFilename(),dstfile->queryFilename());
- tc->busy->set(cand,false);
- if (treeCopyWaiting)
- treeCopySem.signal((treeCopyWaiting>1)?2:1);
- // add to known locations
- tc->busy->set(tc->loc.ordinality(),false); // prob already is clear
- tc->loc.append(dstfn);
- ip.ipset(tc->loc.item(cand).queryIP());
- return;
- }
- catch (IException *e) {
- if (cand==0) {
- tc->busy->set(0,false); // don't leave busy
- if (treeCopyWaiting)
- treeCopySem.signal();
- throw; // what more can we do!
- }
- EXCLOG(e,"treeCopyFile(3)");
- e->Release();
- }
- }
- }
- }
- // all locations busy
- if (msTick()-start>TREECOPYTIMEOUT) {
- WARNLOG("Treecopy %s wait timed out", srcfile->queryFilename());
- break;
- }
- treeCopyWaiting++; // note this isn't precise - just indication
- {
- CriticalUnblock unblock(treeCopyCrit);
- if (throttler)
- {
- CThrottleReleaseBlock block(*throttler, RFCtreecopy);
- treeCopySem.wait(TREECOPYPOLLTIME);
- }
- else
- treeCopySem.wait(TREECOPYPOLLTIME);
- }
- treeCopyWaiting--;
- if ((msTick()-start)/10*1000!=lastmin) {
- lastmin = (msTick()-start)/10*1000;
- PROGLOG("treeCopyFile delayed: %s to %s",srcfile->queryFilename(),dstfile->queryFilename());
- }
- }
- }
- else if (TF_TRACE_TREE_COPY)
- PROGLOG("TREECOPY source on same node as destination");
- if (TF_TRACE_TREE_COPY)
- PROGLOG("TREECOPY(started,fallback) %s to %s",srcfile->queryFilename(),dstfile->queryFilename());
- try {
- GetHostIp(ip);
- srcfile->copyTo(dstfile,DEFAULT_COPY_BLKSIZE,NULL,usetmp,copyFlags);
- }
- catch (IException *e) {
- EXCLOG(e,"TREECOPY(done,fallback)");
- throw;
- }
- if (TF_TRACE_TREE_COPY)
- PROGLOG("TREECOPY(done,fallback) %s to %s",srcfile->queryFilename(),dstfile->queryFilename());
- }
- void cmdTreeCopy(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client, CThrottler *throttler, bool usetmp=false)
- {
- RemoteFilename src;
- src.deserialize(msg);
- RemoteFilename dst;
- dst.deserialize(msg);
- StringAttr net;
- StringAttr mask;
- msg.read(net).read(mask);
- IpAddress ip;
- treeCopyFile(src,dst,net,mask,ip,usetmp,throttler);
- unsigned status = 0;
- reply.append((unsigned)RFEnoerror).append((unsigned)status);
- ip.ipserialize(reply);
- }
- void cmdTreeCopyTmp(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client, CThrottler *throttler)
- {
- cmdTreeCopy(msg, reply, client, throttler, true);
- }
- void cmdGetCRC(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
- {
- StringAttr name;
- msg.read(name);
- if (TF_TRACE)
- PROGLOG("getCRC, '%s'",name.get());
- Owned<IFile> file=createIFile(name);
- unsigned ret = file->getCRC();
- reply.append((unsigned)RFEnoerror).append(ret);
- }
- void cmdStop(MemoryBuffer &msg, MemoryBuffer &reply)
- {
- PROGLOG("Abort request received");
- stopping = true;
- if (acceptsock)
- acceptsock->cancel_accept();
- if (securesock)
- securesock->cancel_accept();
- if (rowServiceSock)
- rowServiceSock->cancel_accept();
- reply.append((unsigned)RFEnoerror);
- }
- void cmdExec(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
- {
- StringAttr cmdLine;
- msg.read(cmdLine);
- // NB: legacy remoteExec used to simply pass error code and buffer back to caller.
- VStringBuffer errMsg("Remote command execution no longer supported. Trying to execute cmdline=%s", cmdLine.get());
- WARNLOG("%s", errMsg.str());
- size32_t outSz = errMsg.length()+1; // reply with null terminated string
- // reply with error code -1
- reply.append((unsigned)-1).append((unsigned)0).append(outSz).append(outSz, errMsg.str());
- }
- void cmdSetTrace(MemoryBuffer &msg, MemoryBuffer &reply)
- {
- byte flags;
- msg.read(flags);
- int retcode=-1;
- if (flags!=255) // escape
- {
- retcode = traceFlags;
- traceFlags = flags;
- }
- reply.append(retcode);
- }
- void cmdGetInfo(MemoryBuffer &msg, MemoryBuffer &reply)
- {
- unsigned level=1;
- if (msg.remaining() >= sizeof(unsigned))
- msg.read(level);
- StringBuffer retstr;
- getInfo(retstr, level);
- reply.append(RFEnoerror).append(retstr.str());
- }
- void cmdFirewall(MemoryBuffer &msg, MemoryBuffer &reply)
- {
- // TBD
- StringBuffer retstr;
- getInfo(retstr);
- reply.append(RFEnoerror).append(retstr.str());
- }
- void cmdExtractBlobElements(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
- {
- StringAttr prefix;
- StringAttr filename;
- msg.read(prefix).read(filename);
- RemoteFilename rfn;
- rfn.setLocalPath(filename);
- ExtractedBlobArray extracted;
- extractBlobElements(prefix, rfn, extracted);
- unsigned n = extracted.ordinality();
- reply.append((unsigned)RFEnoerror).append(n);
- for (unsigned i=0;i<n;i++)
- extracted.item(i).serialize(reply);
- }
- void cmdStreamGeneral(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
- {
- size32_t jsonSz;
- msg.read(jsonSz);
- Owned<IPropertyTree> requestTree = createPTreeFromJSONString(jsonSz, (const char *)msg.readDirect(jsonSz));
- cmdStreamCommon(requestTree, msg, reply, client);
- }
- /* Notes on protocol:
- *
- * A JSON request with these top-level fields:
- * "format" - the format of the reply. Supported formats = "binary", "xml", "json"
- * "handle" - the handle of for a file session that was previously open (for continuation)
- * "commCompression" - compression format of the communication protocol. Supports "LZ4", "LZW", "FLZ" (TBD: "ZLIB")
- * "replyLimit" - Number of K to limit each reply size to. (default 1024)
- * "node" - contains all 'activity' properties below:
- *
- * For a secured dafilesrv (streaming protocol), requests will only be accepted if the meta blob ("metaInfo") has a matching signature.
- * The request must specify "filePart" (1 based) to denote the partition # being read from or written to.
- *
- * "filePartCopy" (1 based) defaults to 1
- *
- * "kind" - supported kinds = "diskread", "diskwrite", "indexread", "indexcount" (TBD: "diskcount", "indexwrite", "disklookup")
- * NB: disk vs index will be auto detected if "kind" is absent.
- *
- * "action" - supported actions = "count" (used if "kind" is auto-detected to specify count should be performed instead of read)
- *
- * "keyFilter" - filter the results by this expression (See: HPCC-18474 for more details).
- *
- * "chooseN" - maximum # of results to return
- *
- * "compressed" - specifies whether input file is compressed. NB: not relevant to "index" types. Default = false. Auto-detected.
- *
- * "input" - specifies layout on disk of the file being read.
- *
- * "output" - where relavant, specifies the output format to be returned
- *
- * "fileName" is only used for unsecured non signed connections (normally forbidden), and specifies the fully qualified path to a physical file.
- *
- */
- void cmdStreamCommon(IPropertyTree *requestTree, MemoryBuffer &rest, MemoryBuffer &reply, CRemoteClientHandler &client)
- {
- /* Example JSON request:
- *
- * {
- * "format" : "binary",
- * "handle" : "1234",
- * "replyLimit" : "64",
- * "commCompression" : "LZ4",
- * "node" : {
- * "metaInfo" : "",
- * "filePart" : 2,
- * "filePartCopy" : 1,
- * "kind" : "diskread",
- * "fileName": "examplefilename",
- * "keyFilter" : "f1='1 '",
- * "chooseN" : 5,
- * "compressed" : "false"
- * "input" : {
- * "f1" : "string5",
- * "f2" : "string5"
- * },
- * "output" : {
- * "f2" : "string",
- * "f1" : "real"
- * }
- * }
- * }
- * OR
- * {
- * "format" : "binary",
- * "handle" : "1234",
- * "replyLimit" : "64",
- * "commCompression" : "LZ4",
- * "node" : {
- * "kind" : "diskread",
- * "fileName": "examplefilename",
- * "keyFilter" : "f1='1 '",
- * "chooseN" : 5,
- * "compressed" : "false"
- * "input" : {
- * "f1" : "string5",
- * "f2" : "string5"
- * },
- * "output" : {
- * "f2" : "string",
- * "f1" : "real"
- * }
- * }
- * }
- * OR
- * {
- * "format" : "xml",
- * "handle" : "1234",
- * "replyLimit" : "64",
- * "node" : {
- * "kind" : "diskread",
- * "fileName": "examplefilename",
- * "keyFilter" : "f1='1 '",
- * "chooseN" : 5,
- * "compressed" : "false"
- * "input" : {
- * "f1" : "string5",
- * "f2" : "string5"
- * },
- * "output" : {
- * "f2" : "string",
- * "f1" : "real"
- * }
- * }
- * }
- * OR
- * {
- * "format" : "xml",
- * "handle" : "1234",
- * "node" : {
- * "kind" : "indexread",
- * "fileName": "examplefilename",
- * "keyFilter" : "f1='1 '",
- * "input" : {
- * "f1" : "string5",
- * "f2" : "string5"
- * },
- * "output" : {
- * "f2" : "string",
- * "f1" : "real"
- * }
- * }
- * OR
- * {
- * "format" : "xml",
- * "node" : {
- * "kind" : "xmlread",
- * "fileName": "examplefilename",
- * "keyFilter" : "f1='1 '",
- * "input" : {
- * "f1" : "string5",
- * "f2" : "string5"
- * },
- * "output" : {
- * "f2" : "string",
- * "f1" : "real"
- * }
- * "ActivityOptions" : { // usually not required, options here may override file meta info.
- * "rowTag" : "/Dataset/OtherRow"
- * }
- * }
- * OR
- * {
- * "format" : "xml",
- * "node" : {
- * "kind" : "csvread",
- * "fileName": "examplefilename",
- * "keyFilter" : "f1='1 '",
- * "input" : {
- * "f1" : "string5",
- * "f2" : "string5"
- * },
- * "output" : {
- * "f2" : "string",
- * "f1" : "real"
- * }
- * "ActivityOptions" : { // usually not required, options here may override file meta info.
- * "csvQuote" : "\"",
- * "csvSeparate" : ","
- * "csvTerminate" : "\\n,\\r\\n",
- * }
- * }
- * OR
- * {
- * "format" : "xml",
- * "node" : {
- * "action" : "count", // if present performs count with/without filter and returns count
- * "fileName": "examplefilename", // can be either index or flat file
- * "keyFilter" : "f1='1 '",
- * "input" : {
- * "f1" : "string5",
- * "f2" : "string5"
- * },
- * }
- * }
- * OR
- * {
- * "format" : "binary",
- * "handle" : "1234",
- * "replyLimit" : "64",
- * "commCompression" : "LZ4",
- * "node" : {
- * "kind" : "diskwrite",
- * "fileName": "examplefilename",
- * "compressed" : "false" (or "LZ4", "FLZ", "LZW")
- * "input" : {
- * "f1" : "string5",
- * "f2" : "string5"
- * }
- * }
- * }
- * OR
- * {
- * "format" : "binary",
- * "handle" : "1234",
- * "replyLimit" : "64",
- * "node" : {
- * "kind" : "indexwrite",
- * "fileName": "examplefilename",
- * "input" : {
- * "f1" : "string5",
- * "f2" : "string5"
- * }
- * }
- * }
- *
- */
- int cursorHandle = requestTree->getPropInt("handle");
- OutputFormat outputFormat = outFmt_Xml;
- Owned<ICompressor> compressor;
- Owned<IExpander> expander;
- Owned<CRemoteRequest> remoteRequest;
- Owned<IRemoteActivity> outputActivity;
- OpenFileInfo fileInfo;
- if (!cursorHandle)
- {
- const char *outputFmtStr = requestTree->queryProp("format");
- if (nullptr == outputFmtStr)
- outputFormat = outFmt_Xml; // default
- else if (strieq("xml", outputFmtStr))
- outputFormat = outFmt_Xml;
- else if (strieq("json", outputFmtStr))
- outputFormat = outFmt_Json;
- else if (strieq("binary", outputFmtStr))
- outputFormat = outFmt_Binary;
- else
- throw MakeStringException(0, "Unrecognised output format: %s", outputFmtStr);
- /* pre-version 2.4, "outputCompression" denoted data was compressed in communication protocol and only applied to reply row data
- * Since 2.5 "commCompression" replaces "outputCompression", and applies to both incoming row data (write) and outgoing row data (read).
- * But "outputCompression" is checked for backward compatibility.
- */
- if (requestTree->hasProp("outputCompression") || requestTree->hasProp("commCompression"))
- {
- const char *commCompressionType = requestTree->queryProp("commCompression");
- if (isEmptyString(commCompressionType))
- commCompressionType = requestTree->queryProp("outputCompression");
- if (isEmptyString(commCompressionType))
- {
- compressor.setown(queryDefaultCompressHandler()->getCompressor());
- expander.setown(queryDefaultCompressHandler()->getExpander());
- }
- else if (outFmt_Binary == outputFormat)
- {
- compressor.setown(getCompressor(commCompressionType));
- expander.setown(getExpander(commCompressionType));
- if (!compressor)
- WARNLOG("Unknown compressor type specified: %s", commCompressionType);
- }
- else
- WARNLOG("Communication protocol compression not supported for format: %s", outputFmtStr);
- }
- /* NB: unless client call is on dedicated service, allow non-authorized requests through, e.g. from engines talking to unsecured port
- * In a secure setup, this service will be configured on a dedicated port, and the std. insecure dafilesrv will be unreachable.
- */
- bool authorizedOnly = rowServiceSock && client.isRowServiceClient();
- // In future this may be passed the request and build a chain of activities and return sink.
- outputActivity.setown(createOutputActivity(*requestTree, authorizedOnly, keyPairInfo));
- {
- CriticalBlock block(sect);
- cursorHandle = getNextHandle();
- }
- remoteRequest.setown(new CRemoteRequest(cursorHandle, outputFormat, compressor, expander, outputActivity));
- StringBuffer requestStr("jsonrequest:");
- outputActivity->getInfoStr(requestStr);
- Owned<StringAttrItem> name = new StringAttrItem(requestStr);
- CriticalBlock block(sect);
- client.previdx = client.openFiles.ordinality();
- client.openFiles.append(OpenFileInfo(cursorHandle, remoteRequest, name));
- }
- else if (!lookupFileIOHandle(cursorHandle, fileInfo))
- cursorHandle = 0; // challenge response ..
- else // known handle, continuation
- {
- remoteRequest.set(fileInfo.remoteRequest);
- outputFormat = fileInfo.remoteRequest->queryFormat();
- }
- if (cursorHandle)
- remoteRequest->process(requestTree, rest, reply);
- else
- {
- const char *outputFmtStr = requestTree->queryProp("format");
- if (nullptr == outputFmtStr)
- outputFormat = outFmt_Xml; // default
- else if (strieq("xml", outputFmtStr))
- outputFormat = outFmt_Xml;
- else if (strieq("json", outputFmtStr))
- outputFormat = outFmt_Json;
- else if (strieq("binary", outputFmtStr))
- outputFormat = outFmt_Binary;
- else
- throw MakeStringException(0, "Unrecognised output format: %s", outputFmtStr);
- if (outFmt_Binary == outputFormat)
- reply.append(cursorHandle);
- else // outFmt_Xml || outFmt_Json
- {
- Owned<IXmlWriterExt> responseWriter = createIXmlWriterExt(0, 0, nullptr, outFmt_Xml == outputFormat ? WTStandard : WTJSONObject);
- responseWriter->outputBeginNested("Response", true);
- if (outFmt_Xml == outputFormat)
- responseWriter->outputCString("urn:hpcc:dfs", "@xmlns:dfs");
- responseWriter->outputUInt(cursorHandle, sizeof(cursorHandle), "handle");
- responseWriter->outputEndNested("Response");
- responseWriter->finalize();
- reply.append(responseWriter->length(), responseWriter->str());
- }
- }
- }
- void cmdStreamReadCommon(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
- {
- size32_t jsonSz = msg.remaining();
- Owned<IPropertyTree> requestTree = createPTreeFromJSONString(jsonSz, (const char *)msg.readDirect(jsonSz));
- cmdStreamCommon(requestTree, msg, reply, client);
- }
- // NB: JSON header to message, for some requests (e.g. write), there will be trailing raw data (e.g. row data)
- void cmdStreamReadStd(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
- {
- reply.append(RFEnoerror); // gets patched if there is a follow on error
- cmdStreamReadCommon(msg, reply, client);
- }
- void cmdStreamReadJSON(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
- {
- /* NB: exactly the same handling as cmdStreamReadStd(RFCStreamRead) for now,
- * may want to differentiate later
- * i.e. return format is { len[unsigned4-bigendian], errorcode[unsigned4-bigendian], result } - where result format depends on request output type.
- * errorcode = 0 means no error
- */
- reply.append(RFEnoerror); // gets patched if there is a follow on error
- cmdStreamReadCommon(msg, reply, client);
- }
- void cmdStreamReadTestSocket(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
- {
- reply.append('J');
- cmdStreamReadCommon(msg, reply, client);
- }
- // legacy version
- void cmdSetThrottle(MemoryBuffer & msg, MemoryBuffer & reply)
- {
- unsigned limit, delayMs, cpuThreshold;
- msg.read(limit);
- msg.read(delayMs);
- msg.read(cpuThreshold);
- stdCmdThrottler.configure(limit, delayMs, cpuThreshold, (unsigned)-1);
- reply.append((unsigned)RFEnoerror);
- }
- void cmdSetThrottle2(MemoryBuffer & msg, MemoryBuffer & reply)
- {
- unsigned throttleClass, limit, delayMs, cpuThreshold, queueLimit;
- msg.read(throttleClass);
- msg.read(limit);
- msg.read(delayMs);
- msg.read(cpuThreshold);
- msg.read(queueLimit);
- setThrottle((ThrottleClass)throttleClass, limit, delayMs, cpuThreshold, queueLimit);
- reply.append((unsigned)RFEnoerror);
- }
- void formatException(MemoryBuffer &reply, IException *e, RemoteFileCommandType cmd, bool testSocketFlag, unsigned _dfsErrorCode, CRemoteClientHandler *client)
- {
- unsigned dfsErrorCode = _dfsErrorCode;
- if (!dfsErrorCode)
- {
- if (e)
- dfsErrorCode = (QUERYINTERFACE(e, IDAFS_Exception)) ? e->errorCode() : RFSERR_InternalError;
- else
- dfsErrorCode = RFSERR_InternalError;
- }
- VStringBuffer errMsg("ERROR: cmd=%s, error=%s", getRFCText(cmd), getRFSERRText(dfsErrorCode));
- if (e)
- {
- errMsg.appendf(" (%u, ", e->errorCode());
- unsigned len = errMsg.length();
- e->errorMessage(errMsg);
- if (len == errMsg.length())
- errMsg.setLength(len-2); // strip off ", " if no message in exception
- errMsg.append(")");
- }
- if (testSocketFlag)
- reply.append('-');
- else
- reply.append(dfsErrorCode);
- reply.append(errMsg.str());
- if (client && cmd!=RFCunlock)
- {
- const char *peer = client->queryPeerName();
- if (peer)
- {
- VStringBuffer err("%s. Client: %s", errMsg.str(), peer);
- PROGLOG("%s", err.str());
- }
- client->logPrevHandle();
- }
- }
- void throttleCommand(RemoteFileCommandType cmd, MemoryBuffer &msg, CRemoteClientHandler *client)
- {
- switch (cmd)
- {
- case RFCexec:
- case RFCgetcrc:
- case RFCcopy:
- case RFCappend:
- case RFCtreecopy:
- case RFCtreecopytmp:
- slowCmdThrottler.addCommand(cmd, msg, client);
- return;
- case RFCcloseIO:
- case RFCopenIO:
- case RFCread:
- case RFCsize:
- case RFCwrite:
- case RFCexists:
- case RFCremove:
- case RFCrename:
- case RFCgetver:
- case RFCisfile:
- case RFCisdirectory:
- case RFCisreadonly:
- case RFCsetreadonly:
- case RFCsetfileperms:
- case RFCreadfilteredindex:
- case RFCreadfilteredindexcount:
- case RFCreadfilteredindexblob:
- case RFCgettime:
- case RFCsettime:
- case RFCcreatedir:
- case RFCgetdir:
- case RFCmonitordir:
- case RFCstop:
- case RFCextractblobelements:
- case RFCredeploy:
- case RFCmove:
- case RFCsetsize:
- case RFCsettrace:
- case RFCgetinfo:
- case RFCfirewall:
- case RFCStreamRead:
- case RFCStreamReadTestSocket:
- case RFCStreamReadJSON:
- stdCmdThrottler.addCommand(cmd, msg, client);
- return;
- // NB: The following commands are still bound by the the thread pool
- case RFCsetthrottle: // legacy version
- case RFCsetthrottle2:
- case RFCcopysection: // slightly odd, but has it's own limit
- default:
- {
- client->processCommand(cmd, msg, NULL);
- break;
- }
- }
- }
- void checkAuthorizedStreamCommand(CRemoteClientHandler &client)
- {
- if (!rowServiceOnStdPort && !client.isRowServiceClient())
- throw createDafsException(DAFSERR_cmdstream_unauthorized, "Unauthorized command");
- }
- bool processCommand(RemoteFileCommandType cmd, MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler *client, CThrottler *throttler)
- {
- Owned<CClientStats> stats = clientStatsTable.getClientReference(cmd, client->queryPeerName());
- bool testSocketFlag = false;
- unsigned posOfErr = reply.length();
- try
- {
- switch(cmd)
- {
- MAPCOMMANDSTATS(RFCread, cmdRead, *stats);
- MAPCOMMANDSTATS(RFCwrite, cmdWrite, *stats);
- MAPCOMMANDCLIENTSTATS(RFCappend, cmdAppend, *client, *stats);
- MAPCOMMAND(RFCcloseIO, cmdCloseFileIO);
- MAPCOMMANDCLIENT(RFCopenIO, cmdOpenFileIO, *client);
- MAPCOMMAND(RFCsize, cmdSize);
- MAPCOMMANDCLIENT(RFCexists, cmdExists, *client);
- MAPCOMMANDCLIENT(RFCremove, cmdRemove, *client);
- MAPCOMMANDCLIENT(RFCrename, cmdRename, *client);
- MAPCOMMAND(RFCgetver, cmdGetVer);
- MAPCOMMANDCLIENT(RFCisfile, cmdIsFile, *client);
- MAPCOMMANDCLIENT(RFCisdirectory, cmdIsDir, *client);
- MAPCOMMANDCLIENT(RFCisreadonly, cmdIsReadOnly, *client);
- MAPCOMMANDCLIENT(RFCsetreadonly, cmdSetReadOnly, *client);
- MAPCOMMANDCLIENT(RFCsetfileperms, cmdSetFilePerms, *client);
- MAPCOMMANDCLIENT(RFCgettime, cmdGetTime, *client);
- MAPCOMMANDCLIENT(RFCsettime, cmdSetTime, *client);
- MAPCOMMANDCLIENT(RFCcreatedir, cmdCreateDir, *client);
- MAPCOMMANDCLIENT(RFCgetdir, cmdGetDir, *client);
- MAPCOMMANDCLIENT(RFCmonitordir, cmdMonitorDir, *client);
- MAPCOMMAND(RFCstop, cmdStop);
- MAPCOMMANDCLIENT(RFCexec, cmdExec, *client);
- MAPCOMMANDCLIENT(RFCextractblobelements, cmdExtractBlobElements, *client);
- MAPCOMMANDCLIENT(RFCgetcrc, cmdGetCRC, *client);
- MAPCOMMANDCLIENT(RFCmove, cmdMove, *client);
- MAPCOMMANDCLIENT(RFCcopy, cmdCopy, *client);
- MAPCOMMAND(RFCsetsize, cmdSetSize);
- MAPCOMMAND(RFCsettrace, cmdSetTrace);
- MAPCOMMAND(RFCgetinfo, cmdGetInfo);
- MAPCOMMAND(RFCfirewall, cmdFirewall);
- MAPCOMMANDCLIENT(RFCcopysection, cmdCopySection, *client);
- MAPCOMMANDCLIENTTHROTTLE(RFCtreecopy, cmdTreeCopy, *client, &slowCmdThrottler);
- MAPCOMMANDCLIENTTHROTTLE(RFCtreecopytmp, cmdTreeCopyTmp, *client, &slowCmdThrottler);
- MAPCOMMAND(RFCsetthrottle, cmdSetThrottle); // legacy version
- MAPCOMMAND(RFCsetthrottle2, cmdSetThrottle2);
- // row service commands
- case RFCStreamGeneral:
- {
- checkAuthorizedStreamCommand(*client);
- reply.append(RFEnoerror); // gets patched if there is a follow on error
- cmdStreamGeneral(msg, reply, *client);
- break;
- }
- case RFCStreamRead:
- {
- checkAuthorizedStreamCommand(*client);
- cmdStreamReadStd(msg, reply, *client);
- break;
- }
- case RFCStreamReadJSON:
- {
- checkAuthorizedStreamCommand(*client);
- cmdStreamReadJSON(msg, reply, *client);
- break;
- }
- case RFCStreamReadTestSocket:
- {
- testSocketFlag = true;
- checkAuthorizedStreamCommand(*client);
- cmdStreamReadTestSocket(msg, reply, *client);
- break;
- }
- default:
- formatException(reply, nullptr, cmd, false, RFSERR_InvalidCommand, client);
- break;
- }
- }
- catch (IException *e)
- {
- reply.setWritePos(posOfErr);
- formatException(reply, e, cmd, testSocketFlag, 0, client);
- }
- return testSocketFlag;
- }
- IPooledThread *createCommandProcessor()
- {
- return new cCommandProcessor();
- }
- virtual void run(DAFSConnectCfg _connectMethod, const SocketEndpoint &listenep, unsigned sslPort, const SocketEndpoint *rowServiceEp, bool _rowServiceSSL, bool _rowServiceOnStdPort) override
- {
- SocketEndpoint sslep(listenep);
- if (sslPort)
- sslep.port = sslPort;
- else
- sslep.port = securitySettings.daFileSrvSSLPort;
- Owned<ISocket> acceptSock, secureSock, rowServiceSock;
- if (_connectMethod != SSLOnly)
- {
- if (listenep.port == 0)
- throw createDafsException(DAFSERR_serverinit_failed, "dafilesrv port not specified");
- if (listenep.isNull())
- acceptSock.setown(ISocket::create(listenep.port));
- else
- {
- StringBuffer ips;
- listenep.getIpText(ips);
- acceptSock.setown(ISocket::create_ip(listenep.port,ips.str()));
- }
- }
- if (_connectMethod == SSLOnly || _connectMethod == SSLFirst || _connectMethod == UnsecureFirst)
- {
- if (sslep.port == 0)
- throw createDafsException(DAFSERR_serverinit_failed, "Secure dafilesrv port not specified");
- if (_connectMethod == UnsecureFirst)
- {
- // don't fail, but warn - this allows for fast SSL client rejections
- if (!securitySettings.certificate)
- WARNLOG("SSL Certificate information not found in environment.conf, cannot accept SSL connections");
- else if ( !checkFileExists(securitySettings.certificate) )
- {
- WARNLOG("SSL Certificate File not found in environment.conf, cannot accept SSL connections");
- securitySettings.certificate = nullptr;
- }
- if (!securitySettings.privateKey)
- WARNLOG("SSL Key information not found in environment.conf, cannot accept SSL connections");
- else if ( !checkFileExists(securitySettings.privateKey) )
- {
- WARNLOG("SSL Key File not found in environment.conf, cannot accept SSL connections");
- securitySettings.privateKey = nullptr;
- }
- }
- else
- validateSSLSetup();
- if (sslep.isNull())
- secureSock.setown(ISocket::create(sslep.port));
- else
- {
- StringBuffer ips;
- sslep.getIpText(ips);
- secureSock.setown(ISocket::create_ip(sslep.port,ips.str()));
- }
- }
- if (rowServiceEp)
- {
- rowServiceSSL = _rowServiceSSL;
- rowServiceOnStdPort = _rowServiceOnStdPort;
- if (rowServiceEp->isNull())
- rowServiceSock.setown(ISocket::create(rowServiceEp->port));
- else
- {
- StringBuffer ips;
- rowServiceEp->getIpText(ips);
- rowServiceSock.setown(ISocket::create_ip(rowServiceEp->port, ips.str()));
- }
- #ifdef _USE_OPENSSL
- if (rowServiceSSL)
- validateSSLSetup();
- #else
- rowServiceSSL = false;
- #endif
- }
- run(_connectMethod, acceptSock.getClear(), secureSock.getClear(), rowServiceSock.getClear());
- }
- virtual void run(DAFSConnectCfg _connectMethod, ISocket *_acceptSock, ISocket *_secureSock, ISocket *_rowServiceSock) override
- {
- acceptsock.setown(_acceptSock);
- securesock.setown(_secureSock);
- rowServiceSock.setown(_rowServiceSock);
- if (_connectMethod != SSLOnly)
- {
- if (!acceptsock)
- throw createDafsException(DAFSERR_serverinit_failed, "Invalid non-secure socket");
- }
- if (_connectMethod == SSLOnly || _connectMethod == SSLFirst || _connectMethod == UnsecureFirst)
- {
- if (!securesock)
- throw createDafsException(DAFSERR_serverinit_failed, "Invalid secure socket");
- }
- selecthandler->start();
- for (;;)
- {
- Owned<ISocket> sock;
- Owned<ISocket> sockSSL;
- Owned<ISocket> acceptedRSSock;
- bool sockavail = false;
- bool securesockavail = false;
- bool rowServiceSockAvail = false;
- if (_connectMethod == SSLNone && (nullptr == rowServiceSock.get()))
- sockavail = acceptsock->wait_read(1000*60*1)!=0;
- else if (_connectMethod == SSLOnly && (nullptr == rowServiceSock.get()))
- securesockavail = securesock->wait_read(1000*60*1)!=0;
- else
- {
- UnsignedArray readSocks;
- UnsignedArray waitingSocks;
- if (acceptsock)
- readSocks.append(acceptsock->OShandle());
- if (securesock)
- readSocks.append(securesock->OShandle());
- if (rowServiceSock)
- readSocks.append(rowServiceSock->OShandle());
- int numReady = wait_read_multiple(readSocks, 1000*60*1, waitingSocks);
- if (numReady > 0)
- {
- for (int idx = 0; idx < numReady; idx++)
- {
- unsigned waitingSock = waitingSocks.item(idx);
- if (acceptsock && (waitingSock == acceptsock->OShandle()))
- sockavail = true;
- else if (securesock && (waitingSock == securesock->OShandle()))
- securesockavail = true;
- else if (rowServiceSock && (waitingSock == rowServiceSock->OShandle()))
- rowServiceSockAvail = true;
- }
- }
- }
- #if 0
- if (!sockavail && !securesockavail && !rowServiceSockAvail)
- {
- JSocketStatistics stats;
- getSocketStatistics(stats);
- StringBuffer s;
- getSocketStatisticsString(stats,s);
- PROGLOG( "Socket statistics : \n%s\n",s.str());
- }
- #endif
- if (stopping)
- break;
- if (sockavail || securesockavail || rowServiceSockAvail)
- {
- if (sockavail)
- {
- try
- {
- sock.setown(acceptsock->accept(true));
- if (!sock||stopping)
- break;
- }
- catch (IException *e)
- {
- EXCLOG(e,"CRemoteFileServer");
- e->Release();
- continue;
- }
- }
- if (securesockavail)
- {
- Owned<ISecureSocket> ssock;
- try
- {
- sockSSL.setown(securesock->accept(true));
- if (!sockSSL||stopping)
- break;
- if ( (_connectMethod == UnsecureFirst) && (!securitySettings.certificate || !securitySettings.privateKey) )
- {
- // for client secure_connect() to fail quickly ...
- cleanupDaFsSocket(sockSSL);
- sockSSL.clear();
- securesockavail = false;
- }
- else
- {
- ssock.setown(createSecureSocket(sockSSL.getClear(), ServerSocket));
- int status = ssock->secure_accept();
- if (status < 0)
- throw createDafsException(DAFSERR_serveraccept_failed,"Failure to establish secure connection");
- sockSSL.setown(ssock.getLink());
- }
- }
- catch (IJSOCK_Exception *e)
- {
- // accept failed ...
- EXCLOG(e,"CRemoteFileServer (secure)");
- e->Release();
- break;
- }
- catch (IException *e) // IDAFS_Exception also ...
- {
- EXCLOG(e,"CRemoteFileServer1 (secure)");
- e->Release();
- cleanupDaFsSocket(sockSSL);
- sockSSL.clear();
- cleanupDaFsSocket(ssock);
- ssock.clear();
- securesockavail = false;
- }
- }
- if (rowServiceSockAvail)
- {
- Owned<ISecureSocket> ssock;
- try
- {
- acceptedRSSock.setown(rowServiceSock->accept(true));
- if (!acceptedRSSock||stopping)
- break;
- if (rowServiceSSL) // NB: will be disabled if !_USE_OPENSLL
- {
- ssock.setown(createSecureSocket(acceptedRSSock.getClear(), ServerSocket));
- int status = ssock->secure_accept();
- if (status < 0)
- throw createDafsException(DAFSERR_serveraccept_failed,"Failure to establish SSL row service connection");
- acceptedRSSock.setown(ssock.getLink());
- }
- }
- catch (IJSOCK_Exception *e)
- {
- // accept failed ...
- EXCLOG(e,"CRemoteFileServer (row service)");
- e->Release();
- break;
- }
- catch (IException *e) // IDAFS_Exception also ...
- {
- EXCLOG(e,"CRemoteFileServer1 (row service)");
- e->Release();
- cleanupDaFsSocket(acceptedRSSock);
- sockSSL.clear();
- cleanupDaFsSocket(ssock);
- ssock.clear();
- rowServiceSockAvail = false;
- }
- }
- #ifdef _DEBUG
- SocketEndpoint eps;
- StringBuffer peerURL;
- #endif
- if (sockavail)
- {
- #ifdef _DEBUG
- sock->getPeerEndpoint(eps);
- eps.getUrlStr(peerURL);
- PROGLOG("Server accepting from %s", peerURL.str());
- #endif
- runClient(sock.getClear(), false);
- }
- if (securesockavail)
- {
- #ifdef _DEBUG
- sockSSL->getPeerEndpoint(eps);
- eps.getUrlStr(peerURL.clear());
- PROGLOG("Server accepting SECURE from %s", peerURL.str());
- #endif
- runClient(sockSSL.getClear(), false);
- }
- if (rowServiceSockAvail)
- {
- #ifdef _DEBUG
- acceptedRSSock->getPeerEndpoint(eps);
- eps.getUrlStr(peerURL.clear());
- PROGLOG("Server accepting row service socket from %s", peerURL.str());
- #endif
- runClient(acceptedRSSock.getClear(), true);
- }
- }
- else
- checkTimeout();
- }
- if (TF_TRACE_CLIENT_STATS)
- PROGLOG("CRemoteFileServer:run exiting");
- selecthandler->stop(true);
- }
- void processUnauthenticatedCommand(RemoteFileCommandType cmd, ISocket *socket, MemoryBuffer &msg)
- {
- // these are unauthenticated commands
- if (cmd != RFCgetver)
- cmd = RFCinvalid;
- MemoryBuffer reply;
- bool testSocketFlag = processCommand(cmd, msg, initSendBuffer(reply), NULL, NULL);
- sendDaFsBuffer(socket, reply, testSocketFlag);
- }
- void runClient(ISocket *sock, bool rowService) // rowService used to distinguish client calls
- {
- cCommandProcessor::cCommandProcessorParams params;
- params.client = new CRemoteClientHandler(this, sock, globallasttick, rowService);
- {
- CriticalBlock block(sect);
- clients.append(*LINK(params.client));
- }
- // NB: This could be blocked, by thread pool limit
- threads->start(¶ms);
- }
- void stop()
- {
- // stop accept loop
- if (TF_TRACE_CLIENT_STATS)
- PROGLOG("CRemoteFileServer::stop");
- if (acceptsock)
- acceptsock->cancel_accept();
- if (securesock)
- securesock->cancel_accept();
- threads->stopAll();
- threads->joinAll(true,60*1000);
- }
- bool notify(CRemoteClientHandler *_client, MemoryBuffer &msg)
- {
- Linked<CRemoteClientHandler> client;
- client.set(_client);
- if (TF_TRACE_FULL)
- PROGLOG("notify %d", msg.length());
- if (msg.length())
- {
- if (TF_TRACE_FULL)
- PROGLOG("notify CRemoteClientHandler(%p), msg length=%u", _client, msg.length());
- cCommandProcessor::cCommandProcessorParams params;
- params.client = client.getClear();
- params.msg.swapWith(msg);
- /* This can block because the thread pool is full and therefore block the selecthandler
- * This is akin to the main server blocking post accept() for the same reason.
- */
- threads->start(¶ms);
- }
- else
- onCloseSocket(client,3); // removes owned handles
- return false;
- }
- void addClient(CRemoteClientHandler *client)
- {
- if (client&&client->socket)
- selecthandler->add(client->socket,SELECTMODE_READ,client);
- }
- void checkTimeout()
- {
- if (msTick()-clientcounttick>1000*60*60)
- {
- CriticalBlock block(ClientCountSect);
- if (TF_TRACE_CLIENT_STATS && (ClientCount || MaxClientCount))
- PROGLOG("Client count = %d, max = %d", ClientCount, MaxClientCount);
- clientcounttick = msTick();
- MaxClientCount = ClientCount;
- if (closedclients)
- {
- if (TF_TRACE_CLIENT_STATS)
- PROGLOG("Closed client count = %d",closedclients);
- closedclients = 0;
- }
- }
- CriticalBlock block(sect);
- ForEachItemInRev(i,clients)
- {
- CRemoteClientHandler &client = clients.item(i);
- if (client.timedOut())
- {
- StringBuffer s;
- bool ok = client.getInfo(s); // will spot duff sockets
- if (ok&&(client.openFiles.ordinality()!=0))
- {
- if (TF_TRACE_CLIENT_CONN && client.inactiveTimedOut())
- WARNLOG("Inactive %s",s.str());
- }
- else
- {
- #ifndef _DEBUG
- if (TF_TRACE_CLIENT_CONN)
- #endif
- PROGLOG("Timing out %s",s.str());
- closedclients++;
- onCloseSocket(&client,4); // removes owned handles
- }
- }
- }
- }
- void getInfo(StringBuffer &info, unsigned level=1)
- {
- {
- CriticalBlock block(ClientCountSect);
- info.append(DAFILESRV_VERSIONSTRING).append('\n');
- info.appendf("Client count = %d\n",ClientCount);
- info.appendf("Max client count = %d",MaxClientCount);
- }
- CriticalBlock block(sect);
- ForEachItemIn(i,clients)
- {
- info.newline().append(i).append(": ");
- clients.item(i).getInfo(info);
- }
- info.newline().appendf("Running threads: %u", threadRunningCount());
- info.newline();
- stdCmdThrottler.getInfo(info);
- info.newline();
- slowCmdThrottler.getInfo(info);
- clientStatsTable.getInfo(info, level);
- }
- unsigned threadRunningCount()
- {
- if (!threads)
- return 0;
- return threads->runningCount();
- }
- unsigned idleTime()
- {
- unsigned t = (unsigned)atomic_read(&globallasttick);
- return msTick()-t;
- }
- void setThrottle(ThrottleClass throttleClass, unsigned limit, unsigned delayMs, unsigned cpuThreshold, unsigned queueLimit)
- {
- switch (throttleClass)
- {
- case ThrottleStd:
- stdCmdThrottler.configure(limit, delayMs, cpuThreshold, queueLimit);
- break;
- case ThrottleSlow:
- slowCmdThrottler.configure(limit, delayMs, cpuThreshold, queueLimit);
- break;
- default:
- {
- StringBuffer availableClasses("{ ");
- for (unsigned c=0; c<ThrottleClassMax; c++)
- {
- availableClasses.append(c).append(" = ").append(getThrottleClassText((ThrottleClass)c));
- if (c+1<ThrottleClassMax)
- availableClasses.append(", ");
- }
- availableClasses.append(" }");
- throw MakeStringException(0, "Unknown throttle class: %u, available classes are: %s", (unsigned)throttleClass, availableClasses.str());
- }
- }
- }
- StringBuffer &getStats(StringBuffer &stats, bool reset)
- {
- CriticalBlock block(sect);
- stdCmdThrottler.getStats(stats, reset).newline();
- slowCmdThrottler.getStats(stats, reset);
- if (reset)
- clientStatsTable.reset();
- return stats;
- }
- };
- IRemoteFileServer * createRemoteFileServer(unsigned maxThreads, unsigned maxThreadsDelayMs, unsigned maxAsyncCopy, IPropertyTree *keyPairInfo)
- {
- return new CRemoteFileServer(maxThreads, maxThreadsDelayMs, maxAsyncCopy, keyPairInfo);
- }
- int setDaliServerTrace(byte flags)
- {
- byte ret = traceFlags;
- traceFlags = flags;
- return ret;
- }
- #ifdef _USE_CPPUNIT
- #include "unittests.hpp"
- #include "rmtfile.hpp"
- /* MP_START_PORT -> MP_END_PORT is the MP reserved dynamic port range, and is used here for convenience.
- * MP_START_PORT is used as starting point to find an available port for the temporary dafilesrv service in these unittests.
- * All (MP) components using this range always check and find an unused port.
- */
- static unsigned serverPort = MP_START_PORT;
- static StringBuffer basePath;
- static Owned<CSimpleInterface> serverThread;
- class RemoteFileSlowTest : public CppUnit::TestFixture
- {
- CPPUNIT_TEST_SUITE(RemoteFileSlowTest);
- CPPUNIT_TEST(testRemoteFilename);
- CPPUNIT_TEST(testStartServer);
- CPPUNIT_TEST(testBasicFunctionality);
- CPPUNIT_TEST(testCopy);
- CPPUNIT_TEST(testOther);
- CPPUNIT_TEST(testConfiguration);
- CPPUNIT_TEST(testDirectoryMonitoring);
- CPPUNIT_TEST(testFinish);
- CPPUNIT_TEST_SUITE_END();
- size32_t testLen = 1024;
- protected:
- void testRemoteFilename()
- {
- const char *rfns = "//1.2.3.4/dir1/file1|//1.2.3.4:7100/dir1/file1,"
- "//1.2.3.4:7100/dir1/file1|//1.2.3.4:7100/dir1/file1,"
- "//1.2.3.4/c$/dir1/file1|//1.2.3.4:7100/c$/dir1/file1,"
- "//1.2.3.4:7100/c$/dir1/file1|//1.2.3.4:7100/c$/dir1/file1,"
- "//1.2.3.4:7100/d$/dir1/file1|//1.2.3.4:7100/d$/dir1/file1";
- StringArray tests;
- tests.appendList(rfns, ",");
- ForEachItemIn(i, tests)
- {
- StringArray inOut;
- const char *pair = tests.item(i);
- inOut.appendList(pair, "|");
- const char *rfn = inOut.item(0);
- const char *expected = inOut.item(1);
- Owned<IFile> iFile = createIFile(rfn);
- const char *res = iFile->queryFilename();
- if (!streq(expected, res))
- {
- StringBuffer errMsg("testRemoteFilename MISMATCH");
- errMsg.newline().append("Expected: ").append(expected);
- errMsg.newline().append("Got: ").append(res);
- PROGLOG("%s", errMsg.str());
- CPPUNIT_ASSERT_MESSAGE(errMsg.str(), 0);
- }
- else
- PROGLOG("MATCH: %s", res);
- }
- }
- void testStartServer()
- {
- Owned<ISocket> socket;
- unsigned endPort = MP_END_PORT;
- while (1)
- {
- try
- {
- socket.setown(ISocket::create(serverPort));
- break;
- }
- catch (IJSOCK_Exception *e)
- {
- if (e->errorCode() != JSOCKERR_port_in_use)
- {
- StringBuffer eStr;
- e->errorMessage(eStr);
- e->Release();
- CPPUNIT_ASSERT_MESSAGE(eStr.str(), 0);
- }
- else if (serverPort == endPort)
- {
- e->Release();
- CPPUNIT_ASSERT_MESSAGE("Could not find a free port to use for remote file server", 0);
- }
- }
- ++serverPort;
- }
- basePath.append("//");
- SocketEndpoint ep(serverPort);
- ep.getUrlStr(basePath);
- char cpath[_MAX_DIR];
- if (!GetCurrentDirectory(_MAX_DIR, cpath))
- CPPUNIT_ASSERT_MESSAGE("Current directory path too big", 0);
- else
- basePath.append(cpath);
- addPathSepChar(basePath);
- PROGLOG("basePath = %s", basePath.str());
- class CServerThread : public CSimpleInterface, implements IThreaded
- {
- CThreaded threaded;
- Owned<CRemoteFileServer> server;
- Linked<ISocket> socket;
- public:
- CServerThread(CRemoteFileServer *_server, ISocket *_socket) : server(_server), socket(_socket), threaded("CServerThread")
- {
- threaded.init(this);
- }
- ~CServerThread()
- {
- threaded.join();
- }
- // IThreaded
- virtual void threadmain() override
- {
- DAFSConnectCfg sslCfg = SSLNone;
- server->run(sslCfg, socket, nullptr, nullptr);
- }
- };
- Owned<IRemoteFileServer> server = createRemoteFileServer();
- serverThread.setown(new CServerThread(QUERYINTERFACE(server.getClear(), CRemoteFileServer), socket.getClear()));
- }
- void testBasicFunctionality()
- {
- VStringBuffer filePath("%s%s", basePath.str(), "file1");
- // create file
- Owned<IFile> iFile = createIFile(filePath);
- CPPUNIT_ASSERT(iFile);
- Owned<IFileIO> iFileIO = iFile->open(IFOcreate);
- CPPUNIT_ASSERT(iFileIO);
- // write out 1k of random data and crc
- MemoryBuffer mb;
- char *buf = (char *)mb.reserveTruncate(testLen);
- for (unsigned b=0; b<1024; b++)
- buf[b] = getRandom()%256;
- CRC32 crc;
- crc.tally(testLen, buf);
- unsigned writeCrc = crc.get();
- size32_t sz = iFileIO->write(0, testLen, buf);
- CPPUNIT_ASSERT(sz == testLen);
- // close file
- iFileIO.clear();
- // validate remote crc
- CPPUNIT_ASSERT(writeCrc == iFile->getCRC());
- // exists
- CPPUNIT_ASSERT(iFile->exists());
- // validate size
- CPPUNIT_ASSERT(iFile->size() == testLen);
- // read back and validate read data's crc against written
- iFileIO.setown(iFile->open(IFOread));
- CPPUNIT_ASSERT(iFileIO);
- sz = iFileIO->read(0, testLen, buf);
- iFileIO.clear();
- CPPUNIT_ASSERT(sz == testLen);
- crc.reset();
- crc.tally(testLen, buf);
- CPPUNIT_ASSERT(writeCrc == crc.get());
- }
- void testCopy()
- {
- VStringBuffer filePath("%s%s", basePath.str(), "file1");
- Owned<IFile> iFile = createIFile(filePath);
- // test file copy
- VStringBuffer filePathCopy("%s%s", basePath.str(), "file1copy");
- Owned<IFile> iFile1Copy = createIFile(filePathCopy);
- iFile->copyTo(iFile1Copy);
- // read back copy and validate read data's crc against written
- Owned<IFileIO> iFileIO = iFile1Copy->open(IFOreadwrite); // open read/write for appendFile in next step.
- CPPUNIT_ASSERT(iFileIO);
- MemoryBuffer mb;
- char *buf = (char *)mb.reserveTruncate(testLen);
- size32_t sz = iFileIO->read(0, testLen, buf);
- CPPUNIT_ASSERT(sz == testLen);
- CRC32 crc;
- crc.tally(testLen, buf);
- CPPUNIT_ASSERT(iFile->getCRC() == crc.get());
- // check appendFile functionality. NB after this "file1copy" should be 2*testLen
- CPPUNIT_ASSERT(testLen == iFileIO->appendFile(iFile));
- iFileIO.clear();
- // validate new size
- CPPUNIT_ASSERT(iFile1Copy->size() == 2 * testLen);
- // setSize test, truncate copy to original size
- iFileIO.setown(iFile1Copy->open(IFOreadwrite));
- iFileIO->setSize(testLen);
- // validate new size
- CPPUNIT_ASSERT(iFile1Copy->size() == testLen);
- }
- void testOther()
- {
- VStringBuffer filePath("%s%s", basePath.str(), "file1");
- Owned<IFile> iFile = createIFile(filePath);
- // rename
- iFile->rename("file2");
- // create a directory
- VStringBuffer subDirPath("%s%s", basePath.str(), "subdir1");
- Owned<IFile> subDirIFile = createIFile(subDirPath);
- subDirIFile->createDirectory();
- // check isDirectory result
- CPPUNIT_ASSERT(subDirIFile->isDirectory());
- // move previous created and renamed file into new sub-directory
- // ensure not present before move
- VStringBuffer subDirFilePath("%s/%s", subDirPath.str(), "file2");
- Owned<IFile> iFile2 = createIFile(subDirFilePath);
- iFile2->remove();
- iFile->move(subDirFilePath);
- // open sub-directory file2 explicitly
- RemoteFilename rfn;
- rfn.setRemotePath(subDirPath.str());
- Owned<IFile> dir = createIFile(rfn);
- Owned<IDirectoryIterator> diriter = dir->directoryFiles("file2");
- if (!diriter->first())
- {
- CPPUNIT_ASSERT_MESSAGE("Error, file2 diriter->first() is null", 0);
- }
- Linked<IFile> iFile3 = &diriter->query();
- diriter.clear();
- dir.clear();
- OwnedIFileIO iFile3IO = iFile3->openShared(IFOread, IFSHfull);
- if (!iFile3IO)
- {
- CPPUNIT_ASSERT_MESSAGE("Error, file2 openShared() failed", 0);
- }
- iFile3IO->close();
- // count sub-directory files with a wildcard
- unsigned count=0;
- Owned<IDirectoryIterator> iter = subDirIFile->directoryFiles("*2");
- ForEach(*iter)
- ++count;
- CPPUNIT_ASSERT(1 == count);
- // check isFile result
- CPPUNIT_ASSERT(iFile2->isFile());
- // validate isReadOnly before after setting
- CPPUNIT_ASSERT(!iFile2->isReadOnly());
- iFile2->setReadOnly(true);
- CPPUNIT_ASSERT(iFile2->isReadOnly());
- // get/set Time and validate result
- CDateTime createTime, modifiedTime, accessedTime;
- CPPUNIT_ASSERT(subDirIFile->getTime(&createTime, &modifiedTime, &accessedTime));
- CDateTime newModifiedTime = modifiedTime;
- newModifiedTime.adjustTime(-86400); // -1 day
- CPPUNIT_ASSERT(subDirIFile->setTime(&createTime, &newModifiedTime, &accessedTime));
- CPPUNIT_ASSERT(subDirIFile->getTime(&createTime, &modifiedTime, &accessedTime));
- CPPUNIT_ASSERT(modifiedTime == newModifiedTime);
- // test set file permissions
- try
- {
- iFile2->setFilePermissions(0777);
- }
- catch (...)
- {
- CPPUNIT_ASSERT_MESSAGE("iFile2->setFilePermissions() exception", 0);
- }
- }
- void testConfiguration()
- {
- SocketEndpoint ep(serverPort); // test trace open connections
- CPPUNIT_ASSERT(setDafileSvrTraceFlags(ep, 0x08));
- StringBuffer infoStr;
- CPPUNIT_ASSERT(RFEnoerror == getDafileSvrInfo(ep, 10, infoStr));
- CPPUNIT_ASSERT(RFEnoerror == setDafileSvrThrottleLimit(ep, ThrottleStd, DEFAULT_STDCMD_PARALLELREQUESTLIMIT+1, DEFAULT_STDCMD_THROTTLEDELAYMS+1, DEFAULT_STDCMD_THROTTLECPULIMIT+1, DEFAULT_STDCMD_THROTTLEQUEUELIMIT+1));
- }
- void testDirectoryMonitoring()
- {
- VStringBuffer subDirPath("%s%s", basePath.str(), "subdir1");
- Owned<IFile> subDirIFile = createIFile(subDirPath);
- subDirIFile->createDirectory();
- VStringBuffer filePath("%s/%s", subDirPath.str(), "file1");
- class CDelayedFileCreate : implements IThreaded
- {
- CThreaded threaded;
- StringAttr filePath;
- Semaphore doneSem;
- public:
- CDelayedFileCreate(const char *_filePath) : filePath(_filePath), threaded("CDelayedFileCreate")
- {
- threaded.init(this);
- }
- ~CDelayedFileCreate()
- {
- stop();
- }
- void stop()
- {
- doneSem.signal();
- threaded.join();
- }
- // IThreaded impl.
- virtual void threadmain() override
- {
- MilliSleep(1000); // give monitorDirectory a chance to be monitoring
- // create file
- Owned<IFile> iFile = createIFile(filePath);
- CPPUNIT_ASSERT(iFile);
- Owned<IFileIO> iFileIO = iFile->open(IFOcreate);
- CPPUNIT_ASSERT(iFileIO);
- iFileIO.clear();
- doneSem.wait(60 * 1000);
- CPPUNIT_ASSERT(iFile->remove());
- }
- } delayedFileCreate(filePath);
- Owned<IDirectoryDifferenceIterator> iter = subDirIFile->monitorDirectory(nullptr, nullptr, false, false, 2000, 60 * 1000);
- ForEach(*iter)
- {
- StringBuffer fname;
- iter->getName(fname);
- PROGLOG("fname = %s", fname.str());
- }
- delayedFileCreate.stop();
- }
- void testFinish()
- {
- // clearup
- VStringBuffer filePathCopy("%s%s", basePath.str(), "file1copy");
- Owned<IFile> iFile1Copy = createIFile(filePathCopy);
- CPPUNIT_ASSERT(iFile1Copy->remove());
- VStringBuffer subDirPath("%s%s", basePath.str(), "subdir1");
- VStringBuffer subDirFilePath("%s/%s", subDirPath.str(), "file2");
- Owned<IFile> iFile2 = createIFile(subDirFilePath);
- CPPUNIT_ASSERT(iFile2->remove());
- Owned<IFile> subDirIFile = createIFile(subDirPath);
- CPPUNIT_ASSERT(subDirIFile->remove());
- SocketEndpoint ep(serverPort);
- Owned<ISocket> sock = ISocket::connect_timeout(ep, 60 * 1000);
- CPPUNIT_ASSERT(RFEnoerror == stopRemoteServer(sock));
- serverThread.clear();
- }
- };
- CPPUNIT_TEST_SUITE_REGISTRATION( RemoteFileSlowTest );
- CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( RemoteFileSlowTest, "RemoteFileSlowTests" );
- #endif // _USE_CPPUNIT
|