dafsserver.cpp 199 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130313131323133313431353136313731383139314031413142314331443145314631473148314931503151315231533154315531563157315831593160316131623163316431653166316731683169317031713172317331743175317631773178317931803181318231833184318531863187318831893190319131923193319431953196319731983199320032013202320332043205320632073208320932103211321232133214321532163217321832193220322132223223322432253226322732283229323032313232323332343235323632373238323932403241324232433244324532463247324832493250325132523253325432553256325732583259326032613262326332643265326632673268326932703271327232733274327532763277327832793280328132823283328432853286328732883289329032913292329332943295329632973298329933003301330233033304330533063307330833093310331133123313331433153316331733183319332033213322332333243325332633273328332933303331333233333334333533363337333833393340334133423343334433453346334733483349335033513352335333543355335633573358335933603361336233633364336533663367336833693370337133723373337433753376337733783379338033813382338333843385338633873388338933903391339233933394339533963397339833993400340134023403340434053406340734083409341034113412341334143415341634173418341934203421342234233424342534263427342834293430343134323433343434353436343734383439344034413442344334443445344634473448344934503451345234533454345534563457345834593460346134623463346434653466346734683469347034713472347334743475347634773478347934803481348234833484348534863487348834893490349134923493349434953496349734983499350035013502350335043505350635073508350935103511351235133514351535163517351835193520352135223523352435253526352735283529353035313532353335343535353635373538353935403541354235433544354535463547354835493550355135523553355435553556355735583559356035613562356335643565356635673568356935703571357235733574357535763577357835793580358135823583358435853586358735883589359035913592359335943595359635973598359936003601360236033604360536063607360836093610361136123613361436153616361736183619362036213622362336243625362636273628362936303631363236333634363536363637363836393640364136423643364436453646364736483649365036513652365336543655365636573658365936603661366236633664366536663667366836693670367136723673367436753676367736783679368036813682368336843685368636873688368936903691369236933694369536963697369836993700370137023703370437053706370737083709371037113712371337143715371637173718371937203721372237233724372537263727372837293730373137323733373437353736373737383739374037413742374337443745374637473748374937503751375237533754375537563757375837593760376137623763376437653766376737683769377037713772377337743775377637773778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252425342544255425642574258425942604261426242634264426542664267426842694270427142724273427442754276427742784279428042814282428342844285428642874288428942904291429242934294429542964297429842994300430143024303430443054306430743084309431043114312431343144315431643174318431943204321432243234324432543264327432843294330433143324333433443354336433743384339434043414342434343444345434643474348434943504351435243534354435543564357435843594360436143624363436443654366436743684369437043714372437343744375437643774378437943804381438243834384438543864387438843894390439143924393439443954396439743984399440044014402440344044405440644074408440944104411441244134414441544164417441844194420442144224423442444254426442744284429443044314432443344344435443644374438443944404441444244434444444544464447444844494450445144524453445444554456445744584459446044614462446344644465446644674468446944704471447244734474447544764477447844794480448144824483448444854486448744884489449044914492449344944495449644974498449945004501450245034504450545064507450845094510451145124513451445154516451745184519452045214522452345244525452645274528452945304531453245334534453545364537453845394540454145424543454445454546454745484549455045514552455345544555455645574558455945604561456245634564456545664567456845694570457145724573457445754576457745784579458045814582458345844585458645874588458945904591459245934594459545964597459845994600460146024603460446054606460746084609461046114612461346144615461646174618461946204621462246234624462546264627462846294630463146324633463446354636463746384639464046414642464346444645464646474648464946504651465246534654465546564657465846594660466146624663466446654666466746684669467046714672467346744675467646774678467946804681468246834684468546864687468846894690469146924693469446954696469746984699470047014702470347044705470647074708470947104711471247134714471547164717471847194720472147224723472447254726472747284729473047314732473347344735473647374738473947404741474247434744474547464747474847494750475147524753475447554756475747584759476047614762476347644765476647674768476947704771477247734774477547764777477847794780478147824783478447854786478747884789479047914792479347944795479647974798479948004801480248034804480548064807480848094810481148124813481448154816481748184819482048214822482348244825482648274828482948304831483248334834483548364837483848394840484148424843484448454846484748484849485048514852485348544855485648574858485948604861486248634864486548664867486848694870487148724873487448754876487748784879488048814882488348844885488648874888488948904891489248934894489548964897489848994900490149024903490449054906490749084909491049114912491349144915491649174918491949204921492249234924492549264927492849294930493149324933493449354936493749384939494049414942494349444945494649474948494949504951495249534954495549564957495849594960496149624963496449654966496749684969497049714972497349744975497649774978497949804981498249834984498549864987498849894990499149924993499449954996499749984999500050015002500350045005500650075008500950105011501250135014501550165017501850195020502150225023502450255026502750285029503050315032503350345035503650375038503950405041504250435044504550465047504850495050505150525053505450555056505750585059506050615062506350645065506650675068506950705071507250735074507550765077507850795080508150825083508450855086508750885089509050915092509350945095509650975098509951005101510251035104510551065107510851095110511151125113511451155116511751185119512051215122512351245125512651275128512951305131513251335134513551365137513851395140514151425143514451455146514751485149515051515152515351545155515651575158515951605161516251635164516551665167516851695170517151725173517451755176517751785179518051815182518351845185518651875188518951905191519251935194519551965197519851995200520152025203520452055206520752085209521052115212521352145215521652175218521952205221522252235224522552265227522852295230523152325233523452355236523752385239524052415242524352445245524652475248524952505251525252535254525552565257525852595260526152625263526452655266526752685269527052715272527352745275527652775278527952805281528252835284528552865287528852895290529152925293529452955296529752985299530053015302530353045305530653075308530953105311531253135314531553165317531853195320532153225323532453255326532753285329533053315332533353345335533653375338533953405341534253435344534553465347534853495350535153525353535453555356535753585359536053615362536353645365536653675368536953705371537253735374537553765377537853795380538153825383538453855386538753885389539053915392539353945395539653975398539954005401540254035404540554065407540854095410541154125413541454155416541754185419542054215422542354245425542654275428542954305431543254335434543554365437543854395440544154425443544454455446544754485449545054515452545354545455545654575458545954605461546254635464546554665467546854695470547154725473547454755476547754785479548054815482548354845485548654875488548954905491549254935494549554965497549854995500550155025503550455055506550755085509551055115512551355145515551655175518551955205521552255235524552555265527552855295530553155325533553455355536553755385539554055415542554355445545554655475548554955505551555255535554555555565557555855595560556155625563556455655566556755685569557055715572557355745575557655775578557955805581558255835584558555865587558855895590559155925593559455955596559755985599560056015602560356045605560656075608560956105611561256135614561556165617561856195620562156225623562456255626562756285629563056315632563356345635563656375638563956405641
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. // todo look at IRemoteFileServer stop
  14. #include "platform.h"
  15. #include "limits.h"
  16. #include "jlib.hpp"
  17. #include "jio.hpp"
  18. #include "jmutex.hpp"
  19. #include "jfile.hpp"
  20. #include "jmisc.hpp"
  21. #include "jthread.hpp"
  22. #include "jqueue.tpp"
  23. #include "securesocket.hpp"
  24. #include "portlist.h"
  25. #include "jsocket.hpp"
  26. #include "jencrypt.hpp"
  27. #include "jlzw.hpp"
  28. #include "jset.hpp"
  29. #include "jhtree.hpp"
  30. #include "dadfs.hpp"
  31. #include "remoteerr.hpp"
  32. #include <atomic>
  33. #include <string>
  34. #include <unordered_map>
  35. #include "rtldynfield.hpp"
  36. #include "rtlds_imp.hpp"
  37. #include "rtlread_imp.hpp"
  38. #include "rtlrecord.hpp"
  39. #include "eclhelper_dyn.hpp"
  40. #include "rtlcommon.hpp"
  41. #include "rtlformat.hpp"
  42. #include "jflz.hpp"
  43. #include "digisign.hpp"
  44. #include "dafdesc.hpp"
  45. #include "thorcommon.hpp"
  46. #include "csvsplitter.hpp"
  47. #include "thorxmlread.hpp"
  48. #include "dafscommon.hpp"
  49. #include "rmtfile.hpp"
  50. #include "rmtclient_impl.hpp"
  51. #include "dafsserver.hpp"
  52. using namespace cryptohelper;
  53. #define SOCKET_CACHE_MAX 500
  54. #define TREECOPYTIMEOUT (60*60*1000) // 1Hr (I guess could take longer for big file but at least will stagger)
  55. #define TREECOPYPOLLTIME (60*1000*5) // for tracing that delayed
  56. #define TREECOPYPRUNETIME (24*60*60*1000) // 1 day
  57. static const unsigned __int64 defaultFileStreamChooseNLimit = I64C(0x7fffffffffffffff); // constant should be move to common place (see eclhelper.hpp)
  58. static const unsigned __int64 defaultFileStreamSkipN = 0;
  59. static const unsigned defaultDaFSReplyLimitKB = 1024; // 1MB
  60. enum OutputFormat:byte { outFmt_Binary, outFmt_Xml, outFmt_Json };
  61. ///////////////////////////
  62. static unsigned maxConnectTime = 0;
  63. static unsigned maxReceiveTime = 0;
  64. //Security and default port attributes
  65. static class _securitySettings
  66. {
  67. public:
  68. DAFSConnectCfg connectMethod;
  69. unsigned short daFileSrvPort;
  70. unsigned short daFileSrvSSLPort;
  71. const char * certificate;
  72. const char * privateKey;
  73. const char * passPhrase;
  74. _securitySettings()
  75. {
  76. queryDafsSecSettings(&connectMethod, &daFileSrvPort, &daFileSrvSSLPort, &certificate, &privateKey, &passPhrase);
  77. }
  78. } securitySettings;
  79. static CriticalSection secureContextCrit;
  80. static Owned<ISecureSocketContext> secureContextServer;
  81. static Owned<ISecureSocketContext> secureContextClient;
  82. #ifdef _USE_OPENSSL
  83. static ISecureSocket *createSecureSocket(ISocket *sock, SecureSocketType type)
  84. {
  85. {
  86. CriticalBlock b(secureContextCrit);
  87. if (type == ServerSocket)
  88. {
  89. if (!secureContextServer)
  90. secureContextServer.setown(createSecureSocketContextEx(securitySettings.certificate, securitySettings.privateKey, securitySettings.passPhrase, type));
  91. }
  92. else if (!secureContextClient)
  93. secureContextClient.setown(createSecureSocketContext(type));
  94. }
  95. int loglevel = SSLogNormal;
  96. #ifdef _DEBUG
  97. loglevel = SSLogMax;
  98. #endif
  99. if (type == ServerSocket)
  100. return secureContextServer->createSecureSocket(sock, loglevel);
  101. else
  102. return secureContextClient->createSecureSocket(sock, loglevel);
  103. }
  104. #else
  105. static ISecureSocket *createSecureSocket(ISocket *sock, SecureSocketType type)
  106. {
  107. throwUnexpected();
  108. }
  109. #endif
  110. struct sRFTM
  111. {
  112. CTimeMon *timemon;
  113. sRFTM(unsigned limit) { timemon = limit ? new CTimeMon(limit) : NULL; }
  114. ~sRFTM() { delete timemon; }
  115. };
  116. const char *remoteServerVersionString() { return DAFILESRV_VERSIONSTRING; }
  117. #define CLIENT_TIMEOUT (1000*60*60*12) // long timeout in case zombies
  118. #define CLIENT_INACTIVEWARNING_TIMEOUT (1000*60*60*12) // time between logging inactive clients
  119. #define SERVER_TIMEOUT (1000*60*5) // timeout when waiting for dafilesrv to reply after command
  120. // (increased when waiting for large block)
  121. #define RFCText(cmd) #cmd
  122. const char *RFCStrings[] =
  123. {
  124. RFCText(RFCopenIO),
  125. RFCText(RFCcloseIO),
  126. RFCText(RFCread),
  127. RFCText(RFCwrite),
  128. RFCText(RFCsize),
  129. RFCText(RFCexists),
  130. RFCText(RFCremove),
  131. RFCText(RFCrename),
  132. RFCText(RFCgetver),
  133. RFCText(RFCisfile),
  134. RFCText(RFCisdirectory),
  135. RFCText(RFCisreadonly),
  136. RFCText(RFCsetreadonly),
  137. RFCText(RFCgettime),
  138. RFCText(RFCsettime),
  139. RFCText(RFCcreatedir),
  140. RFCText(RFCgetdir),
  141. RFCText(RFCstop),
  142. RFCText(RFCexec),
  143. RFCText(RFCdummy1),
  144. RFCText(RFCredeploy),
  145. RFCText(RFCgetcrc),
  146. RFCText(RFCmove),
  147. RFCText(RFCsetsize),
  148. RFCText(RFCextractblobelements),
  149. RFCText(RFCcopy),
  150. RFCText(RFCappend),
  151. RFCText(RFCmonitordir),
  152. RFCText(RFCsettrace),
  153. RFCText(RFCgetinfo),
  154. RFCText(RFCfirewall),
  155. RFCText(RFCunlock),
  156. RFCText(RFCunlockreply),
  157. RFCText(RFCinvalid),
  158. RFCText(RFCcopysection),
  159. RFCText(RFCtreecopy),
  160. RFCText(RFCtreecopytmp),
  161. RFCText(RFCsetthrottle), // legacy version
  162. RFCText(RFCsetthrottle2),
  163. RFCText(RFCsetfileperms),
  164. RFCText(RFCreadfilteredindex),
  165. RFCText(RFCreadfilteredcount),
  166. RFCText(RFCreadfilteredblob),
  167. RFCText(RFCStreamRead),
  168. RFCText(RFCStreamReadTestSocket),
  169. };
  170. static const char *getRFCText(RemoteFileCommandType cmd)
  171. {
  172. if (cmd==RFCStreamReadJSON)
  173. return "RFCStreamReadJSON";
  174. else
  175. {
  176. unsigned elems = sizeof(RFCStrings) / sizeof(RFCStrings[0]);
  177. if (cmd >= elems)
  178. return "RFCunknown";
  179. return RFCStrings[cmd];
  180. }
  181. }
  182. static const char *getRFSERRText(unsigned err)
  183. {
  184. switch (err)
  185. {
  186. case RFSERR_InvalidCommand:
  187. return "RFSERR_InvalidCommand";
  188. case RFSERR_NullFileIOHandle:
  189. return "RFSERR_NullFileIOHandle";
  190. case RFSERR_InvalidFileIOHandle:
  191. return "RFSERR_InvalidFileIOHandle";
  192. case RFSERR_TimeoutFileIOHandle:
  193. return "RFSERR_TimeoutFileIOHandle";
  194. case RFSERR_OpenFailed:
  195. return "RFSERR_OpenFailed";
  196. case RFSERR_ReadFailed:
  197. return "RFSERR_ReadFailed";
  198. case RFSERR_WriteFailed:
  199. return "RFSERR_WriteFailed";
  200. case RFSERR_RenameFailed:
  201. return "RFSERR_RenameFailed";
  202. case RFSERR_ExistsFailed:
  203. return "RFSERR_ExistsFailed";
  204. case RFSERR_RemoveFailed:
  205. return "RFSERR_RemoveFailed";
  206. case RFSERR_CloseFailed:
  207. return "RFSERR_CloseFailed";
  208. case RFSERR_IsFileFailed:
  209. return "RFSERR_IsFileFailed";
  210. case RFSERR_IsDirectoryFailed:
  211. return "RFSERR_IsDirectoryFailed";
  212. case RFSERR_IsReadOnlyFailed:
  213. return "RFSERR_IsReadOnlyFailed";
  214. case RFSERR_SetReadOnlyFailed:
  215. return "RFSERR_SetReadOnlyFailed";
  216. case RFSERR_GetTimeFailed:
  217. return "RFSERR_GetTimeFailed";
  218. case RFSERR_SetTimeFailed:
  219. return "RFSERR_SetTimeFailed";
  220. case RFSERR_CreateDirFailed:
  221. return "RFSERR_CreateDirFailed";
  222. case RFSERR_GetDirFailed:
  223. return "RFSERR_GetDirFailed";
  224. case RFSERR_GetCrcFailed:
  225. return "RFSERR_GetCrcFailed";
  226. case RFSERR_MoveFailed:
  227. return "RFSERR_MoveFailed";
  228. case RFSERR_ExtractBlobElementsFailed:
  229. return "RFSERR_ExtractBlobElementsFailed";
  230. case RFSERR_CopyFailed:
  231. return "RFSERR_CopyFailed";
  232. case RFSERR_AppendFailed:
  233. return "RFSERR_AppendFailed";
  234. case RFSERR_AuthenticateFailed:
  235. return "RFSERR_AuthenticateFailed";
  236. case RFSERR_CopySectionFailed:
  237. return "RFSERR_CopySectionFailed";
  238. case RFSERR_TreeCopyFailed:
  239. return "RFSERR_TreeCopyFailed";
  240. case RAERR_InvalidUsernamePassword:
  241. return "RAERR_InvalidUsernamePassword";
  242. case RFSERR_MasterSeemsToHaveDied:
  243. return "RFSERR_MasterSeemsToHaveDied";
  244. case RFSERR_TimeoutWaitSlave:
  245. return "RFSERR_TimeoutWaitSlave";
  246. case RFSERR_TimeoutWaitConnect:
  247. return "RFSERR_TimeoutWaitConnect";
  248. case RFSERR_TimeoutWaitMaster:
  249. return "RFSERR_TimeoutWaitMaster";
  250. case RFSERR_NoConnectSlave:
  251. return "RFSERR_NoConnectSlave";
  252. case RFSERR_NoConnectSlaveXY:
  253. return "RFSERR_NoConnectSlaveXY";
  254. case RFSERR_VersionMismatch:
  255. return "RFSERR_VersionMismatch";
  256. case RFSERR_SetThrottleFailed:
  257. return "RFSERR_SetThrottleFailed";
  258. case RFSERR_MaxQueueRequests:
  259. return "RFSERR_MaxQueueRequests";
  260. case RFSERR_KeyIndexFailed:
  261. return "RFSERR_MaxQueueRequests";
  262. case RFSERR_StreamReadFailed:
  263. return "RFSERR_StreamReadFailed";
  264. case RFSERR_InternalError:
  265. return "Internal Error";
  266. }
  267. return "RFSERR_Unknown";
  268. }
  269. #define ThrottleText(throttleClass) #throttleClass
  270. const char *ThrottleStrings[] =
  271. {
  272. ThrottleText(ThrottleStd),
  273. ThrottleText(ThrottleSlow),
  274. };
  275. // very high upper limits that configure can't exceed
  276. #define THROTTLE_MAX_LIMIT 1000000
  277. #define THROTTLE_MAX_DELAYMS 3600000
  278. #define THROTTLE_MAX_CPUTHRESHOLD 100
  279. #define THROTTLE_MAX_QUEUELIMIT 10000000
  280. static const char *getThrottleClassText(ThrottleClass throttleClass) { return ThrottleStrings[throttleClass]; }
  281. //---------------------------------------------------------------------------
  282. // TreeCopy
  283. #define TREECOPY_CACHE_SIZE 50
  284. struct CTreeCopyItem: public CInterface
  285. {
  286. StringAttr net;
  287. StringAttr mask;
  288. offset_t sz; // original size
  289. CDateTime dt; // original date
  290. RemoteFilenameArray loc; // locations for file - 0 is original
  291. Owned<IBitSet> busy;
  292. unsigned lastused;
  293. CTreeCopyItem(RemoteFilename &orig, const char *_net, const char *_mask, offset_t _sz, CDateTime &_dt)
  294. : net(_net), mask(_mask)
  295. {
  296. loc.append(orig);
  297. dt.set(_dt);
  298. sz = _sz;
  299. busy.setown(createThreadSafeBitSet());
  300. lastused = msTick();
  301. }
  302. bool equals(const RemoteFilename &orig, const char *_net, const char *_mask, offset_t _sz, CDateTime &_dt)
  303. {
  304. if (!orig.equals(loc.item(0)))
  305. return false;
  306. if (strcmp(_net,net)!=0)
  307. return false;
  308. if (strcmp(_mask,mask)!=0)
  309. return false;
  310. if (sz!=_sz)
  311. return false;
  312. return (dt.equals(_dt,false));
  313. }
  314. };
  315. static CIArrayOf<CTreeCopyItem> treeCopyArray;
  316. static CriticalSection treeCopyCrit;
  317. static unsigned treeCopyWaiting=0;
  318. static Semaphore treeCopySem;
  319. /////////////////////////
  320. //====================================================================================================
  321. class CAsyncCommandManager
  322. {
  323. class CAsyncJob: public CInterface
  324. {
  325. class cThread: public Thread
  326. {
  327. CAsyncJob *parent;
  328. public:
  329. cThread(CAsyncJob *_parent)
  330. : Thread("CAsyncJob")
  331. {
  332. parent = _parent;
  333. }
  334. int run()
  335. {
  336. int ret = -1;
  337. try {
  338. ret = parent->run();
  339. parent->setDone();
  340. }
  341. catch (IException *e)
  342. {
  343. parent->setException(e);
  344. }
  345. parent->signal();
  346. return ret;
  347. }
  348. } *thread;
  349. StringAttr uuid;
  350. CAsyncCommandManager &parent;
  351. public:
  352. CAsyncJob(CAsyncCommandManager &_parent, const char *_uuid)
  353. : uuid(_uuid), parent(_parent)
  354. {
  355. thread = new cThread(this);
  356. hash = hashc((const byte *)uuid.get(),uuid.length(),~0U);
  357. }
  358. ~CAsyncJob()
  359. {
  360. thread->join();
  361. thread->Release();
  362. }
  363. static void destroy(CAsyncJob *j)
  364. {
  365. j->Release();
  366. }
  367. void signal()
  368. {
  369. parent.signal();
  370. }
  371. void start()
  372. {
  373. parent.wait();
  374. thread->start();
  375. }
  376. void join()
  377. {
  378. thread->join();
  379. }
  380. static unsigned getHash(const char *key)
  381. {
  382. return hashc((const byte *)key,strlen(key),~0U);
  383. }
  384. static CAsyncJob* create(const char *key) { assertex(!"CAsyncJob::create not implemented"); return NULL; }
  385. unsigned hash;
  386. bool eq(const char *key)
  387. {
  388. return stricmp(key,uuid.get())==0;
  389. }
  390. virtual int run()=0;
  391. virtual void setException(IException *e)=0;
  392. virtual void setDone()=0;
  393. };
  394. class CAsyncCopySection: public CAsyncJob
  395. {
  396. Owned<IFile> src;
  397. RemoteFilename dst;
  398. offset_t toOfs;
  399. offset_t fromOfs;
  400. offset_t size;
  401. CFPmode mode; // not yet supported
  402. CriticalSection sect;
  403. offset_t done;
  404. offset_t total;
  405. Semaphore finished;
  406. AsyncCommandStatus status;
  407. Owned<IException> exc;
  408. public:
  409. CAsyncCopySection(CAsyncCommandManager &parent, const char *_uuid, const char *fromFile, const char *toFile, offset_t _toOfs, offset_t _fromOfs, offset_t _size)
  410. : CAsyncJob(parent, _uuid)
  411. {
  412. status = ACScontinue;
  413. src.setown(createIFile(fromFile));
  414. dst.setRemotePath(toFile);
  415. toOfs = _toOfs;
  416. fromOfs = _fromOfs;
  417. size = _size;
  418. mode = CFPcontinue;
  419. done = 0;
  420. total = (offset_t)-1;
  421. }
  422. AsyncCommandStatus poll(offset_t &_done, offset_t &_total,unsigned timeout)
  423. {
  424. if (timeout&&finished.wait(timeout))
  425. finished.signal(); // may need to call again
  426. CriticalBlock block(sect);
  427. if (exc)
  428. throw exc.getClear();
  429. _done = done;
  430. _total = total;
  431. return status;
  432. }
  433. int run()
  434. {
  435. class cProgress: implements ICopyFileProgress
  436. {
  437. CriticalSection &sect;
  438. CFPmode &mode;
  439. offset_t &done;
  440. offset_t &total;
  441. public:
  442. cProgress(CriticalSection &_sect,offset_t &_done,offset_t &_total,CFPmode &_mode)
  443. : sect(_sect), mode(_mode), done(_done), total(_total)
  444. {
  445. }
  446. CFPmode onProgress(offset_t sizeDone, offset_t totalSize)
  447. {
  448. CriticalBlock block(sect);
  449. done = sizeDone;
  450. total = totalSize;
  451. return mode;
  452. }
  453. } progress(sect,total,done,mode);
  454. src->copySection(dst,toOfs, fromOfs, size, &progress); // exceptions will be handled by base class
  455. return 0;
  456. }
  457. void setException(IException *e)
  458. {
  459. EXCLOG(e,"CAsyncCommandManager::CAsyncJob");
  460. CriticalBlock block(sect);
  461. if (exc.get())
  462. e->Release();
  463. else
  464. exc.setown(e);
  465. status = ACSerror;
  466. }
  467. void setDone()
  468. {
  469. CriticalBlock block(sect);
  470. finished.signal();
  471. status = ACSdone;
  472. }
  473. };
  474. CMinHashTable<CAsyncJob> jobtable;
  475. CriticalSection sect;
  476. Semaphore threadsem;
  477. unsigned limit;
  478. public:
  479. CAsyncCommandManager(unsigned _limit) : limit(_limit)
  480. {
  481. if (limit) // 0 == unbound
  482. threadsem.signal(limit); // max number of async jobs
  483. }
  484. void join()
  485. {
  486. CriticalBlock block(sect);
  487. unsigned i;
  488. CAsyncJob *j=jobtable.first(i);
  489. while (j) {
  490. j->join();
  491. j=jobtable.next(i);
  492. }
  493. }
  494. void signal()
  495. {
  496. if (limit)
  497. threadsem.signal();
  498. }
  499. void wait()
  500. {
  501. if (limit)
  502. threadsem.wait();
  503. }
  504. AsyncCommandStatus copySection(const char *uuid, const char *fromFile, const char *toFile, offset_t toOfs, offset_t fromOfs, offset_t size, offset_t &done, offset_t &total, unsigned timeout)
  505. {
  506. // return 0 if continuing, 1 if done
  507. CAsyncCopySection * job;
  508. Linked<CAsyncJob> cjob;
  509. {
  510. CriticalBlock block(sect);
  511. cjob.set(jobtable.find(uuid,false));
  512. if (cjob) {
  513. job = QUERYINTERFACE(cjob.get(),CAsyncCopySection);
  514. if (!job) {
  515. throw MakeStringException(-1,"Async job ID mismatch");
  516. }
  517. }
  518. else {
  519. job = new CAsyncCopySection(*this, uuid, fromFile, toFile, toOfs, fromOfs, size);
  520. cjob.setown(job);
  521. jobtable.add(cjob.getLink());
  522. cjob->start();
  523. }
  524. }
  525. AsyncCommandStatus ret = ACSerror;
  526. Owned<IException> rete;
  527. try {
  528. ret = job->poll(done,total,timeout);
  529. }
  530. catch (IException * e) {
  531. rete.setown(e);
  532. }
  533. if ((ret!=ACScontinue)||rete.get()) {
  534. job->join();
  535. CriticalBlock block(sect);
  536. jobtable.remove(job);
  537. if (rete.get())
  538. throw rete.getClear();
  539. }
  540. return ret;
  541. }
  542. };
  543. //====================================================================================================
  544. inline void appendErr(MemoryBuffer &reply, unsigned e)
  545. {
  546. reply.append(e).append(getRFSERRText(e));
  547. }
  548. #define MAPCOMMAND(c,p) case c: { this->p(msg, reply) ; break; }
  549. #define MAPCOMMANDCLIENT(c,p,client) case c: { this->p(msg, reply, client); break; }
  550. #define MAPCOMMANDCLIENTTESTSOCKET(c,p,client) case c: { testSocketFlag = true; this->p(msg, reply, client); break; }
  551. #define MAPCOMMANDCLIENTTHROTTLE(c,p,client,throttler) case c: { this->p(msg, reply, client, throttler); break; }
  552. #define MAPCOMMANDSTATS(c,p,stats) case c: { this->p(msg, reply, stats); break; }
  553. #define MAPCOMMANDCLIENTSTATS(c,p,client,stats) case c: { this->p(msg, reply, client, stats); break; }
  554. static unsigned ClientCount = 0;
  555. static unsigned MaxClientCount = 0;
  556. static CriticalSection ClientCountSect;
  557. #define DEFAULT_THROTTLOG_LOG_INTERVAL_SECS 60 // log total throttled delay period
  558. class CClientStats : public CInterface
  559. {
  560. public:
  561. CClientStats(const char *_client) : client(_client), count(0), bRead(0), bWritten(0) { }
  562. const char *queryFindString() const { return client; }
  563. inline void addRead(unsigned __int64 len)
  564. {
  565. bRead += len;
  566. }
  567. inline void addWrite(unsigned __int64 len)
  568. {
  569. bWritten += len;
  570. }
  571. void getStatus(StringBuffer & info) const
  572. {
  573. info.appendf("Client %s - %" I64F "d requests handled, bytes read = %" I64F "d, bytes written = % " I64F "d",
  574. client.get(), count, bRead.load(), bWritten.load()).newline();
  575. }
  576. StringAttr client;
  577. unsigned __int64 count;
  578. std::atomic<unsigned __int64> bRead;
  579. std::atomic<unsigned __int64> bWritten;
  580. };
  581. class CClientStatsTable : public OwningStringSuperHashTableOf<CClientStats>
  582. {
  583. typedef OwningStringSuperHashTableOf<CClientStats> PARENT;
  584. CriticalSection crit;
  585. unsigned cmdStats[RFCmax];
  586. static int compareElement(void* const *ll, void* const *rr)
  587. {
  588. const CClientStats *l = (const CClientStats *) *ll;
  589. const CClientStats *r = (const CClientStats *) *rr;
  590. if (l->count == r->count)
  591. return 0;
  592. else if (l->count<r->count)
  593. return 1;
  594. else
  595. return -1;
  596. }
  597. public:
  598. CClientStatsTable()
  599. {
  600. memset(&cmdStats[0], 0, sizeof(cmdStats));
  601. }
  602. ~CClientStatsTable()
  603. {
  604. _releaseAll();
  605. }
  606. CClientStats *getClientReference(RemoteFileCommandType cmd, const char *client)
  607. {
  608. CriticalBlock b(crit);
  609. CClientStats *stats = PARENT::find(client);
  610. if (!stats)
  611. {
  612. stats = new CClientStats(client);
  613. PARENT::replace(*stats);
  614. }
  615. if (cmd<RFCmax) // i.e. ignore duff command (which will be traced), but still record client connected
  616. cmdStats[cmd]++;
  617. ++stats->count;
  618. return LINK(stats);
  619. }
  620. StringBuffer &getInfo(StringBuffer &info, unsigned level=1)
  621. {
  622. CriticalBlock b(crit);
  623. unsigned __int64 totalCmds = 0;
  624. for (unsigned c=0; c<RFCmax; c++)
  625. totalCmds += cmdStats[c];
  626. unsigned totalClients = PARENT::ordinality();
  627. info.appendf("Commands processed = %" I64F "u, unique clients = %u", totalCmds, totalClients);
  628. if (totalCmds)
  629. {
  630. info.append("Command stats:").newline();
  631. for (unsigned c=0; c<RFCmax; c++)
  632. {
  633. unsigned __int64 count = cmdStats[c];
  634. if (count)
  635. info.append(getRFCText(c)).append(": ").append(count).newline();
  636. }
  637. }
  638. if (totalClients)
  639. {
  640. SuperHashIteratorOf<CClientStats> iter(*this);
  641. PointerArrayOf<CClientStats> elements;
  642. ForEach(iter)
  643. {
  644. CClientStats &elem = iter.query();
  645. elements.append(&elem);
  646. }
  647. elements.sort(&compareElement);
  648. if (level < 10)
  649. {
  650. // list up to 10 clients ordered by # of commands processed
  651. unsigned max=elements.ordinality();
  652. if (max>10)
  653. max = 10; // cap
  654. info.append("Top 10 clients:").newline();
  655. for (unsigned e=0; e<max; e++)
  656. {
  657. const CClientStats &element = *elements.item(e);
  658. element.getStatus(info);
  659. }
  660. }
  661. else // list all
  662. {
  663. info.append("All clients:").newline();
  664. ForEachItemIn(e, elements)
  665. {
  666. const CClientStats &element = *elements.item(e);
  667. element.getStatus(info);
  668. }
  669. }
  670. }
  671. return info;
  672. }
  673. void reset()
  674. {
  675. CriticalBlock b(crit);
  676. memset(&cmdStats[0], 0, sizeof(cmdStats));
  677. kill();
  678. }
  679. };
  680. interface IRemoteReadActivity;
  681. interface IRemoteWriteActivity;
  682. interface IRemoteActivity : extends IInterface
  683. {
  684. virtual unsigned __int64 queryProcessed() const = 0;
  685. virtual IOutputMetaData *queryOutputMeta() const = 0;
  686. virtual StringBuffer &getInfoStr(StringBuffer &out) const = 0;
  687. virtual void serializeCursor(MemoryBuffer &tgt) const = 0;
  688. virtual void restoreCursor(MemoryBuffer &src) = 0;
  689. virtual bool isGrouped() const = 0;
  690. virtual IRemoteReadActivity *queryIsReadActivity() { return nullptr; }
  691. virtual IRemoteWriteActivity *queryIsWriteActivity() { return nullptr; }
  692. };
  693. interface IRemoteReadActivity : extends IRemoteActivity
  694. {
  695. virtual const void *nextRow(MemoryBufferBuilder &outBuilder, size32_t &sz) = 0;
  696. virtual bool requiresPostProject() const = 0;
  697. };
  698. interface IRemoteWriteActivity : extends IRemoteActivity
  699. {
  700. virtual void write(size32_t sz, const void *row) = 0;
  701. };
  702. class CRemoteRequest : public CSimpleInterfaceOf<IInterface>
  703. {
  704. int cursorHandle;
  705. OutputFormat format;
  706. unsigned __int64 replyLimit = defaultDaFSReplyLimitKB * 1024;
  707. Linked<IRemoteActivity> activity;
  708. Linked<ICompressor> compressor;
  709. Linked<IExpander> expander;
  710. MemoryBuffer expandMb;
  711. Owned<IXmlWriterExt> responseWriter; // for xml or json response
  712. bool handleFull(MemoryBuffer &inMb, size32_t inPos, MemoryBuffer &compressMb, ICompressor *compressor, size32_t replyLimit, size32_t &totalSz)
  713. {
  714. size32_t sz = inMb.length()-inPos;
  715. if (sz < replyLimit)
  716. return false;
  717. if (!compressor)
  718. return true;
  719. // consumes data from inMb into compressor
  720. totalSz += sz;
  721. const void *data = inMb.bytes()+inPos;
  722. assertex(compressor->write(data, sz) == sz);
  723. inMb.setLength(inPos);
  724. return compressMb.capacity() > replyLimit;
  725. }
  726. void processRead(IPropertyTree *requestTree, MemoryBuffer &responseMb)
  727. {
  728. IRemoteReadActivity *readActivity = activity->queryIsReadActivity();
  729. assertex(readActivity);
  730. MemoryBuffer compressMb;
  731. IOutputMetaData *outMeta = readActivity->queryOutputMeta();
  732. bool eoi=false;
  733. bool grouped = readActivity->isGrouped();
  734. MemoryBuffer resultBuffer;
  735. MemoryBufferBuilder outBuilder(resultBuffer, outMeta->getMinRecordSize());
  736. if (outFmt_Binary == format)
  737. {
  738. if (compressor)
  739. {
  740. compressMb.setEndian(__BIG_ENDIAN);
  741. compressMb.append(responseMb);
  742. }
  743. DelayedMarker<size32_t> dataLenMarker(compressor ? compressMb : responseMb); // uncompressed data size
  744. if (compressor)
  745. {
  746. size32_t initialSz = replyLimit >= 0x10000 ? 0x10000 : replyLimit;
  747. compressor->open(compressMb, initialSz);
  748. }
  749. outBuilder.setBuffer(responseMb); // write direct to responseMb buffer for efficiency
  750. unsigned __int64 numProcessedStart = readActivity->queryProcessed();
  751. size32_t totalDataSz = 0;
  752. size32_t dataStartPos = responseMb.length();
  753. if (grouped)
  754. {
  755. bool pastFirstRow = numProcessedStart>0;
  756. do
  757. {
  758. size32_t eogPos = 0;
  759. if (pastFirstRow)
  760. {
  761. /* this is for last row output, which might have been returned in the previous request
  762. * The eog marker may change as a result of the next row (see writeDirect() call below);
  763. */
  764. eogPos = responseMb.length();
  765. responseMb.append(false);
  766. }
  767. size32_t rowSz;
  768. const void *row = readActivity->nextRow(outBuilder, rowSz);
  769. if (!row)
  770. {
  771. if (!pastFirstRow)
  772. {
  773. eoi = true;
  774. break;
  775. }
  776. else
  777. {
  778. bool eog = true;
  779. responseMb.writeDirect(eogPos, sizeof(eog), &eog);
  780. row = readActivity->nextRow(outBuilder, rowSz);
  781. if (!row)
  782. {
  783. eoi = true;
  784. break;
  785. }
  786. }
  787. }
  788. pastFirstRow = true;
  789. }
  790. while (!handleFull(responseMb, dataStartPos, compressMb, compressor, replyLimit, totalDataSz));
  791. }
  792. else
  793. {
  794. do
  795. {
  796. size32_t rowSz;
  797. const void *row = readActivity->nextRow(outBuilder, rowSz);
  798. if (!row)
  799. {
  800. eoi = true;
  801. break;
  802. }
  803. }
  804. while (!handleFull(responseMb, dataStartPos, compressMb, compressor, replyLimit, totalDataSz));
  805. }
  806. // Consume any trailing data remaining
  807. if (compressor)
  808. {
  809. size32_t sz = responseMb.length()-dataStartPos;
  810. if (sz)
  811. {
  812. // consumes data built up in responseMb buffer into compressor
  813. totalDataSz += sz;
  814. const void *data = responseMb.bytes()+dataStartPos;
  815. assertex(compressor->write(data, sz) == sz);
  816. responseMb.setLength(dataStartPos);
  817. }
  818. }
  819. // finalize responseMb
  820. dataLenMarker.write(compressor ? totalDataSz : responseMb.length()-dataStartPos);
  821. DelayedSizeMarker cursorLenMarker(responseMb); // cursor length
  822. if (!eoi)
  823. readActivity->serializeCursor(responseMb);
  824. cursorLenMarker.write();
  825. if (compressor)
  826. {
  827. // consume cursor into compressor
  828. size32_t sz = responseMb.length()-dataStartPos;
  829. const void *data = responseMb.bytes()+dataStartPos;
  830. assertex(compressor->write(data, sz) == sz);
  831. compressor->close();
  832. // now ready to swap compressed output into responseMb
  833. responseMb.swapWith(compressMb);
  834. }
  835. }
  836. else
  837. {
  838. responseWriter->outputBeginArray("Row");
  839. if (grouped)
  840. {
  841. bool pastFirstRow = readActivity->queryProcessed()>0;
  842. bool first = true;
  843. do
  844. {
  845. size32_t rowSz;
  846. const void *row = readActivity->nextRow(outBuilder, rowSz);
  847. if (!row)
  848. {
  849. if (!pastFirstRow)
  850. {
  851. eoi = true;
  852. break;
  853. }
  854. else
  855. {
  856. row = readActivity->nextRow(outBuilder, rowSz);
  857. if (!row)
  858. {
  859. eoi = true;
  860. break;
  861. }
  862. if (first) // possible if eog was 1st row on next packet
  863. responseWriter->outputBeginNested("Row", false);
  864. responseWriter->outputBool(true, "dfs:Eog"); // field name cannot clash with an ecl field name
  865. }
  866. }
  867. if (pastFirstRow)
  868. responseWriter->outputEndNested("Row"); // close last row
  869. responseWriter->outputBeginNested("Row", false);
  870. outMeta->toXML((const byte *)row, *responseWriter);
  871. resultBuffer.clear();
  872. pastFirstRow = true;
  873. first = false;
  874. }
  875. while (responseWriter->length() < replyLimit);
  876. if (pastFirstRow)
  877. responseWriter->outputEndNested("Row"); // close last row
  878. }
  879. else
  880. {
  881. do
  882. {
  883. size32_t rowSz;
  884. const void *row = readActivity->nextRow(outBuilder, rowSz);
  885. if (!row)
  886. {
  887. eoi = true;
  888. break;
  889. }
  890. responseWriter->outputBeginNested("Row", false);
  891. outMeta->toXML((const byte *)row, *responseWriter);
  892. responseWriter->outputEndNested("Row");
  893. resultBuffer.clear();
  894. }
  895. while (responseWriter->length() < replyLimit);
  896. }
  897. responseWriter->outputEndArray("Row");
  898. if (!eoi)
  899. {
  900. MemoryBuffer cursorMb;
  901. cursorMb.setEndian(__BIG_ENDIAN);
  902. readActivity->serializeCursor(cursorMb);
  903. StringBuffer cursorBinStr;
  904. JBASE64_Encode(cursorMb.toByteArray(), cursorMb.length(), cursorBinStr);
  905. responseWriter->outputString(cursorBinStr.length(), cursorBinStr.str(), "cursorBin");
  906. }
  907. }
  908. }
  909. void processWrite(IPropertyTree *requestTree, MemoryBuffer &rowDataMb, MemoryBuffer &responseMb)
  910. {
  911. IRemoteWriteActivity *writeActivity = activity->queryIsWriteActivity();
  912. assertex(writeActivity);
  913. /* row data is in serialized disk format already, and do not need to look at individual rows
  914. * so simply dump to disk
  915. */
  916. size32_t rowDataSz;
  917. rowDataMb.read(rowDataSz);
  918. const void *rowData;
  919. if (expander)
  920. {
  921. rowDataSz = expander->init(rowDataMb.readDirect(rowDataSz));
  922. expandMb.clear().reserve(rowDataSz);
  923. expander->expand(expandMb.bufferBase());
  924. rowData = expandMb.bufferBase();
  925. }
  926. else
  927. rowData = rowDataMb.readDirect(rowDataSz);
  928. writeActivity->write(rowDataSz, rowData);
  929. }
  930. public:
  931. CRemoteRequest(int _cursorHandle, OutputFormat _format, ICompressor *_compressor, IExpander *_expander, IRemoteActivity *_activity)
  932. : cursorHandle(_cursorHandle), format(_format), compressor(_compressor), expander(_expander), activity(_activity)
  933. {
  934. if (outFmt_Binary != format)
  935. {
  936. responseWriter.setown(createIXmlWriterExt(0, 0, nullptr, outFmt_Xml == format ? WTStandard : WTJSONObject));
  937. responseWriter->outputBeginNested("Response", true);
  938. if (outFmt_Xml == format)
  939. responseWriter->outputCString("urn:hpcc:dfs", "@xmlns:dfs");
  940. responseWriter->outputUInt(cursorHandle, sizeof(cursorHandle), "handle");
  941. }
  942. }
  943. OutputFormat queryFormat() const { return format; }
  944. unsigned __int64 queryReplyLimit() const { return replyLimit; }
  945. IRemoteActivity *queryActivity() const { return activity; }
  946. ICompressor *queryCompressor() const { return compressor; }
  947. void process(IPropertyTree *requestTree, MemoryBuffer &restMb, MemoryBuffer &responseMb)
  948. {
  949. if (requestTree->hasProp("replyLimit"))
  950. replyLimit = requestTree->getPropInt64("replyLimit", defaultDaFSReplyLimitKB) * 1024;
  951. if (outFmt_Binary == format)
  952. responseMb.append(cursorHandle);
  953. else // outFmt_Xml || outFmt_Json
  954. responseWriter->outputUInt(cursorHandle, sizeof(cursorHandle), "handle");
  955. if (requestTree->hasProp("cursorBin")) // use handle if one provided
  956. {
  957. MemoryBuffer cursorMb;
  958. cursorMb.setEndian(__BIG_ENDIAN);
  959. JBASE64_Decode(requestTree->queryProp("cursorBin"), cursorMb);
  960. activity->restoreCursor(cursorMb);
  961. }
  962. if (activity->queryIsReadActivity())
  963. processRead(requestTree, responseMb);
  964. else if (activity->queryIsWriteActivity())
  965. processWrite(requestTree, restMb, responseMb);
  966. if (outFmt_Binary != format)
  967. {
  968. responseWriter->outputEndNested("Response");
  969. responseWriter->finalize();
  970. PROGLOG("Response: %s", responseWriter->str());
  971. responseMb.append(responseWriter->length(), responseWriter->str());
  972. }
  973. }
  974. };
  975. enum OpenFileFlag { of_null=0x0, of_key=0x01 };
  976. struct OpenFileInfo
  977. {
  978. OpenFileInfo() { }
  979. OpenFileInfo(int _handle, IFileIO *_fileIO, StringAttrItem *_filename) : handle(_handle), fileIO(_fileIO), filename(_filename) { }
  980. OpenFileInfo(int _handle, CRemoteRequest *_remoteRequest, StringAttrItem *_filename)
  981. : handle(_handle), remoteRequest(_remoteRequest), filename(_filename) { }
  982. Linked<IFileIO> fileIO;
  983. Linked<CRemoteRequest> remoteRequest;
  984. Linked<StringAttrItem> filename; // for debug
  985. int handle = 0;
  986. unsigned flags = 0;
  987. };
  988. static IOutputMetaData *getTypeInfoOutputMetaData(IPropertyTree &actNode, const char *typePropName, bool grouped)
  989. {
  990. IPropertyTree *json = actNode.queryPropTree(typePropName);
  991. if (json)
  992. return createTypeInfoOutputMetaData(*json, grouped);
  993. else
  994. {
  995. StringBuffer binTypePropName(typePropName);
  996. const char *jsonBin = actNode.queryProp(binTypePropName.append("Bin"));
  997. if (!jsonBin)
  998. return nullptr;
  999. MemoryBuffer mb;
  1000. JBASE64_Decode(jsonBin, mb);
  1001. return createTypeInfoOutputMetaData(mb, grouped);
  1002. }
  1003. }
  1004. class CRemoteDiskBaseActivity : public CSimpleInterfaceOf<IRemoteReadActivity>, implements IVirtualFieldCallback
  1005. {
  1006. typedef CSimpleInterfaceOf<IRemoteReadActivity> PARENT;
  1007. protected:
  1008. StringAttr fileName; // physical filename
  1009. Linked<IOutputMetaData> inMeta, outMeta;
  1010. unsigned __int64 processed = 0;
  1011. bool outputGrouped = false;
  1012. bool opened = false;
  1013. bool eofSeen = false;
  1014. const RtlRecord *record = nullptr;
  1015. RowFilter filters;
  1016. RtlDynRow *filterRow = nullptr;
  1017. // virtual field values
  1018. StringAttr logicalFilename;
  1019. unsigned numInputFields = 0;
  1020. inline bool fieldFilterMatch(const void * buffer)
  1021. {
  1022. if (filterRow)
  1023. {
  1024. filterRow->setRow(buffer, filters.getNumFieldsRequired());
  1025. return filters.matches(*filterRow);
  1026. }
  1027. else
  1028. return true;
  1029. }
  1030. public:
  1031. IMPLEMENT_IINTERFACE_USING(PARENT);
  1032. CRemoteDiskBaseActivity(IPropertyTree &config, IFileDescriptor *fileDesc)
  1033. {
  1034. fileName.set(config.queryProp("fileName"));
  1035. if (isEmptyString(fileName))
  1036. throw createDafsException(DAFSERR_cmdstream_protocol_failure, "CRemoteDiskBaseActivity: fileName missing");
  1037. logicalFilename.set(config.queryProp("virtualFields/logicalFilename"));
  1038. }
  1039. ~CRemoteDiskBaseActivity()
  1040. {
  1041. delete filterRow;
  1042. }
  1043. void setupInputMeta(const IPropertyTree &config, IOutputMetaData *_inMeta)
  1044. {
  1045. inMeta.setown(_inMeta);
  1046. record = &inMeta->queryRecordAccessor(true);
  1047. numInputFields = record->getNumFields();
  1048. if (config.hasProp("keyFilter"))
  1049. {
  1050. filterRow = new RtlDynRow(*record);
  1051. Owned<IPropertyTreeIterator> filterIter = config.getElements("keyFilter");
  1052. ForEach(*filterIter)
  1053. filters.addFilter(*record, filterIter->query().queryProp(nullptr));
  1054. }
  1055. }
  1056. // IRemoteReadActivity impl.
  1057. virtual unsigned __int64 queryProcessed() const override
  1058. {
  1059. return processed;
  1060. }
  1061. virtual IOutputMetaData *queryOutputMeta() const override
  1062. {
  1063. return outMeta;
  1064. }
  1065. virtual bool isGrouped() const override
  1066. {
  1067. return outputGrouped;
  1068. }
  1069. virtual void serializeCursor(MemoryBuffer &tgt) const override
  1070. {
  1071. throwUnexpected();
  1072. }
  1073. virtual void restoreCursor(MemoryBuffer &src) override
  1074. {
  1075. throwUnexpected();
  1076. }
  1077. virtual IRemoteReadActivity *queryIsReadActivity()
  1078. {
  1079. return this;
  1080. }
  1081. virtual bool requiresPostProject() const override
  1082. {
  1083. return false;
  1084. }
  1085. //interface IVirtualFieldCallback
  1086. virtual const char * queryLogicalFilename(const void * row) override
  1087. {
  1088. return logicalFilename.str();
  1089. }
  1090. virtual unsigned __int64 getFilePosition(const void * row) override
  1091. {
  1092. throwUnexpected();
  1093. }
  1094. virtual unsigned __int64 getLocalFilePosition(const void * row) override
  1095. {
  1096. throwUnexpected();
  1097. }
  1098. virtual const byte * lookupBlob(unsigned __int64 id) override
  1099. {
  1100. throwUnexpected();
  1101. }
  1102. };
  1103. class CRemoteStreamReadBaseActivity : public CRemoteDiskBaseActivity
  1104. {
  1105. typedef CRemoteDiskBaseActivity PARENT;
  1106. protected:
  1107. Owned<ISerialStream> inputStream;
  1108. Owned<IFileIO> iFileIO;
  1109. unsigned __int64 chooseN = 0;
  1110. unsigned __int64 startPos = 0;
  1111. bool compressed = false;
  1112. bool cursorDirty = false;
  1113. // virtual field values
  1114. unsigned partNum = 0;
  1115. offset_t baseFpos = 0;
  1116. virtual bool refreshCursor()
  1117. {
  1118. if (inputStream->tell() != startPos)
  1119. {
  1120. inputStream->reset(startPos);
  1121. return true;
  1122. }
  1123. return false;
  1124. }
  1125. bool checkOpen() // NB: returns true if this call opened file
  1126. {
  1127. if (opened)
  1128. {
  1129. if (!cursorDirty)
  1130. return false;
  1131. refreshCursor();
  1132. eofSeen = false;
  1133. cursorDirty = false;
  1134. return false;
  1135. }
  1136. cursorDirty = false;
  1137. OwnedIFile iFile = createIFile(fileName);
  1138. assertex(iFile);
  1139. iFileIO.setown(createCompressedFileReader(iFile));
  1140. if (iFileIO)
  1141. {
  1142. if (!compressed)
  1143. {
  1144. WARNLOG("meta info did not mark file '%s' as compressed, but detected file as compressed", fileName.get());
  1145. compressed = true;
  1146. }
  1147. }
  1148. else
  1149. {
  1150. iFileIO.setown(iFile->open(IFOread));
  1151. if (!iFileIO)
  1152. throw createDafsExceptionV(DAFSERR_cmdstream_protocol_failure, "Failed to open: '%s'", fileName.get());
  1153. if (compressed)
  1154. {
  1155. WARNLOG("meta info marked file '%s' as compressed, but detected file as uncompressed", fileName.get());
  1156. compressed = false;
  1157. }
  1158. }
  1159. inputStream.setown(createFileSerialStream(iFileIO, startPos));
  1160. opened = true;
  1161. eofSeen = false;
  1162. return true;
  1163. }
  1164. void close()
  1165. {
  1166. iFileIO.clear();
  1167. opened = false;
  1168. eofSeen = true;
  1169. }
  1170. public:
  1171. CRemoteStreamReadBaseActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc)
  1172. {
  1173. compressed = config.getPropBool("compressed");
  1174. chooseN = config.getPropInt64("chooseN", defaultFileStreamChooseNLimit);
  1175. partNum = config.getPropInt("virtualFields/partNum");
  1176. baseFpos = (offset_t)config.getPropInt64("virtualFields/baseFpos");
  1177. }
  1178. // IVirtualFieldCallback impl.
  1179. virtual unsigned __int64 getFilePosition(const void * row) override
  1180. {
  1181. return inputStream->tell() + baseFpos;
  1182. }
  1183. virtual unsigned __int64 getLocalFilePosition(const void * row) override
  1184. {
  1185. return makeLocalFposOffset(partNum, inputStream->tell());
  1186. }
  1187. };
  1188. class CRemoteDiskReadActivity : public CRemoteStreamReadBaseActivity
  1189. {
  1190. typedef CRemoteStreamReadBaseActivity PARENT;
  1191. CThorContiguousRowBuffer prefetchBuffer;
  1192. Owned<ISourceRowPrefetcher> prefetcher;
  1193. bool inputGrouped = false;
  1194. bool eogPending = false;
  1195. bool someInGroup = false;
  1196. Owned<const IDynamicTransform> translator;
  1197. virtual bool refreshCursor() override
  1198. {
  1199. if (prefetchBuffer.tell() != startPos)
  1200. {
  1201. inputStream->reset(startPos);
  1202. prefetchBuffer.clearStream();
  1203. prefetchBuffer.setStream(inputStream);
  1204. return true;
  1205. }
  1206. return false;
  1207. }
  1208. bool checkOpen()
  1209. {
  1210. if (!PARENT::checkOpen()) // returns true if it opened file
  1211. return false;
  1212. prefetchBuffer.setStream(inputStream);
  1213. prefetcher.setown(inMeta->createDiskPrefetcher());
  1214. return true;
  1215. }
  1216. public:
  1217. CRemoteDiskReadActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc), prefetchBuffer(nullptr)
  1218. {
  1219. inputGrouped = config.getPropBool("inputGrouped", false);
  1220. setupInputMeta(config, getTypeInfoOutputMetaData(config, "input", inputGrouped));
  1221. outputGrouped = config.getPropBool("outputGrouped", false);
  1222. if (!inputGrouped && outputGrouped)
  1223. outputGrouped = false; // perhaps should fire error
  1224. outMeta.setown(getTypeInfoOutputMetaData(config, "output", outputGrouped));
  1225. if (!outMeta)
  1226. outMeta.set(inMeta);
  1227. translator.setown(createRecordTranslator(outMeta->queryRecordAccessor(true), *record));
  1228. }
  1229. // IRemoteReadActivity impl.
  1230. virtual const void *nextRow(MemoryBufferBuilder &outBuilder, size32_t &retSz) override
  1231. {
  1232. if (eogPending || eofSeen)
  1233. {
  1234. eogPending = false;
  1235. someInGroup = false;
  1236. retSz = 0;
  1237. return nullptr;
  1238. }
  1239. checkOpen();
  1240. while (!eofSeen && (processed < chooseN))
  1241. {
  1242. while (!prefetchBuffer.eos())
  1243. {
  1244. prefetcher->readAhead(prefetchBuffer);
  1245. size32_t inputRowSz = prefetchBuffer.queryRowSize();
  1246. bool eog = false;
  1247. if (inputGrouped)
  1248. {
  1249. prefetchBuffer.skip(sizeof(eog));
  1250. if (outputGrouped)
  1251. {
  1252. byte b = *(prefetchBuffer.queryRow()+inputRowSz);
  1253. memcpy(&eog, prefetchBuffer.queryRow()+inputRowSz, sizeof(eog));
  1254. }
  1255. }
  1256. const byte *next = prefetchBuffer.queryRow();
  1257. size32_t rowSz; // use local var instead of reference param for efficiency
  1258. if (fieldFilterMatch(next))
  1259. rowSz = translator->translate(outBuilder, *this, next);
  1260. else
  1261. rowSz = 0;
  1262. prefetchBuffer.finishedRow();
  1263. const void *ret = outBuilder.getSelf();
  1264. outBuilder.finishRow(rowSz);
  1265. if (rowSz)
  1266. {
  1267. processed++;
  1268. eogPending = eog;
  1269. someInGroup = true;
  1270. retSz = rowSz;
  1271. return ret;
  1272. }
  1273. else if (eog)
  1274. {
  1275. eogPending = false;
  1276. if (someInGroup)
  1277. {
  1278. someInGroup = false;
  1279. return nullptr;
  1280. }
  1281. }
  1282. }
  1283. eofSeen = true;
  1284. }
  1285. close();
  1286. retSz = 0;
  1287. return nullptr;
  1288. }
  1289. virtual void serializeCursor(MemoryBuffer &tgt) const override
  1290. {
  1291. tgt.append(prefetchBuffer.tell());
  1292. tgt.append(processed);
  1293. tgt.append(someInGroup);
  1294. tgt.append(eogPending);
  1295. }
  1296. virtual void restoreCursor(MemoryBuffer &src) override
  1297. {
  1298. cursorDirty = true;
  1299. src.read(startPos);
  1300. src.read(processed);
  1301. src.read(someInGroup);
  1302. src.read(eogPending);
  1303. }
  1304. virtual StringBuffer &getInfoStr(StringBuffer &out) const override
  1305. {
  1306. return out.appendf("diskread[%s]", fileName.get());
  1307. }
  1308. //interface IVirtualFieldCallback
  1309. virtual unsigned __int64 getFilePosition(const void * row) override
  1310. {
  1311. return prefetchBuffer.tell() + baseFpos;
  1312. }
  1313. };
  1314. class CRemoteExternalFormatReadActivity : public CRemoteStreamReadBaseActivity
  1315. {
  1316. typedef CRemoteStreamReadBaseActivity PARENT;
  1317. protected:
  1318. Owned<const IDynamicFieldValueFetcher> fieldFetcher;
  1319. Owned<const IDynamicTransform> translator;
  1320. bool postProject = false;
  1321. public:
  1322. CRemoteExternalFormatReadActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc)
  1323. {
  1324. setupInputMeta(config, getTypeInfoOutputMetaData(config, "input", false));
  1325. outMeta.setown(getTypeInfoOutputMetaData(config, "output", false));
  1326. const RtlRecord *outRecord = record;
  1327. if (filterRow)
  1328. {
  1329. if (outMeta)
  1330. postProject = true;
  1331. outMeta.set(inMeta);
  1332. }
  1333. else
  1334. {
  1335. if (outMeta)
  1336. outRecord = &outMeta->queryRecordAccessor(true);
  1337. else
  1338. outMeta.set(inMeta);
  1339. }
  1340. translator.setown(createRecordTranslatorViaCallback(*outRecord, *record));
  1341. }
  1342. virtual bool requiresPostProject() const override
  1343. {
  1344. return postProject;
  1345. }
  1346. };
  1347. class CNullNestedRowIterator : public CSimpleInterfaceOf<IDynamicRowIterator>
  1348. {
  1349. public:
  1350. virtual bool first() override { return false; }
  1351. virtual bool next() override { return false; }
  1352. virtual bool isValid() override { return false; }
  1353. virtual IDynamicFieldValueFetcher &query() override
  1354. {
  1355. throwUnexpected();
  1356. }
  1357. } nullNestedRowIterator;
  1358. class CRemoteCsvReadActivity : public CRemoteExternalFormatReadActivity
  1359. {
  1360. typedef CRemoteExternalFormatReadActivity PARENT;
  1361. StringBuffer csvQuote, csvSeparate, csvTerminate, csvEscape;
  1362. unsigned __int64 headerLines = 0;
  1363. unsigned __int64 maxRowSize = 0;
  1364. bool preserveWhitespace = false;
  1365. CSVSplitter csvSplitter;
  1366. class CFieldFetcher : public CSimpleInterfaceOf<IDynamicFieldValueFetcher>
  1367. {
  1368. CSVSplitter &csvSplitter;
  1369. unsigned numInputFields;
  1370. public:
  1371. CFieldFetcher(CSVSplitter &_csvSplitter, unsigned _numInputFields) : csvSplitter(_csvSplitter), numInputFields(_numInputFields)
  1372. {
  1373. }
  1374. virtual const byte *queryValue(unsigned fieldNum, size_t &sz) const override
  1375. {
  1376. dbgassertex(fieldNum < numInputFields);
  1377. sz = csvSplitter.queryLengths()[fieldNum];
  1378. return csvSplitter.queryData()[fieldNum];
  1379. }
  1380. virtual IDynamicRowIterator *getNestedIterator(unsigned fieldNum) const override
  1381. {
  1382. return LINK(&nullNestedRowIterator);
  1383. }
  1384. virtual size_t getSize(unsigned fieldNum) const override { throwUnexpected(); }
  1385. virtual size32_t getRecordSize() const override { throwUnexpected(); }
  1386. };
  1387. bool checkOpen()
  1388. {
  1389. if (!PARENT::checkOpen())
  1390. return false;
  1391. csvSplitter.init(numInputFields, maxRowSize, csvQuote, csvSeparate, csvTerminate, csvEscape, preserveWhitespace);
  1392. if (headerLines)
  1393. {
  1394. do
  1395. {
  1396. size32_t lineLength = csvSplitter.splitLine(inputStream, maxRowSize);
  1397. if (0 == lineLength)
  1398. break;
  1399. inputStream->skip(lineLength);
  1400. }
  1401. while (--headerLines);
  1402. }
  1403. if (!fieldFetcher)
  1404. fieldFetcher.setown(new CFieldFetcher(csvSplitter, numInputFields));
  1405. return true;
  1406. }
  1407. const unsigned defaultMaxCsvRowSize = 10; // MB
  1408. public:
  1409. CRemoteCsvReadActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc)
  1410. {
  1411. maxRowSize = config.getPropInt64("ActivityOptions/maxRowSize", defaultMaxCsvRowSize) * 1024 * 1024;
  1412. preserveWhitespace = config.getPropBool("ActivityOptions/preserveWhitespace");
  1413. if (!config.getProp("ActivityOptions/csvQuote", csvQuote))
  1414. {
  1415. if (!fileDesc->queryProperties().getProp("@csvQuote", csvQuote))
  1416. csvQuote.append("\"");
  1417. }
  1418. if (!config.getProp("ActivityOptions/csvSeparate", csvSeparate))
  1419. {
  1420. if (!fileDesc->queryProperties().getProp("@csvSeparate", csvSeparate))
  1421. csvSeparate.append("\\,");
  1422. }
  1423. if (!config.getProp("ActivityOptions/csvTerminate", csvTerminate))
  1424. {
  1425. if (!fileDesc->queryProperties().getProp("@csvTerminate", csvTerminate))
  1426. csvTerminate.append("\\n,\\r\\n");
  1427. }
  1428. if (!config.getProp("ActivityOptions/csvEscape", csvEscape))
  1429. fileDesc->queryProperties().getProp("@csvEscape", csvEscape);
  1430. headerLines = config.getPropInt64("ActivityOptions/headerLines"); // really this should be a published attribute too
  1431. }
  1432. virtual StringBuffer &getInfoStr(StringBuffer &out) const override
  1433. {
  1434. return out.appendf("csvread[%s]", fileName.get());
  1435. }
  1436. // IRemoteReadActivity impl.
  1437. virtual const void *nextRow(MemoryBufferBuilder &outBuilder, size32_t &retSz) override
  1438. {
  1439. if (eofSeen)
  1440. {
  1441. retSz = 0;
  1442. return nullptr;
  1443. }
  1444. checkOpen();
  1445. while (!eofSeen && (processed < chooseN))
  1446. {
  1447. size32_t lineLength = csvSplitter.splitLine(inputStream, maxRowSize);
  1448. if (!lineLength)
  1449. break;
  1450. retSz = translator->translate(outBuilder, *this, *fieldFetcher);
  1451. dbgassertex(retSz);
  1452. const void *ret = outBuilder.getSelf();
  1453. if (fieldFilterMatch(ret))
  1454. {
  1455. outBuilder.finishRow(retSz);
  1456. ++processed;
  1457. inputStream->skip(lineLength);
  1458. return ret;
  1459. }
  1460. else
  1461. outBuilder.removeBytes(retSz);
  1462. inputStream->skip(lineLength);
  1463. }
  1464. eofSeen = true;
  1465. close();
  1466. retSz = 0;
  1467. return nullptr;
  1468. }
  1469. };
  1470. class CRemoteMarkupReadActivity : public CRemoteExternalFormatReadActivity, implements IXMLSelect
  1471. {
  1472. typedef CRemoteExternalFormatReadActivity PARENT;
  1473. ThorActivityKind kind;
  1474. IXmlToRowTransformer *xmlTransformer;
  1475. Linked<IColumnProvider> lastMatch;
  1476. Owned<IXMLParse> xmlParser;
  1477. bool noRoot = false;
  1478. bool useXmlContents = false;
  1479. // JCSMORE - it would be good if these were cached/reused (can I assume anything using fetcher is single threaded?)
  1480. class CFieldFetcher : public CSimpleInterfaceOf<IDynamicFieldValueFetcher>
  1481. {
  1482. unsigned numInputFields;
  1483. const RtlRecord &recInfo;
  1484. Linked<IColumnProvider> currentMatch;
  1485. const char **compoundXPaths = nullptr;
  1486. const char *queryCompoundXPath(unsigned fieldNum) const
  1487. {
  1488. if (compoundXPaths && compoundXPaths[fieldNum])
  1489. return compoundXPaths[fieldNum];
  1490. else
  1491. return recInfo.queryXPath(fieldNum);
  1492. }
  1493. public:
  1494. CFieldFetcher(const RtlRecord &_recInfo, IColumnProvider *_currentMatch) : recInfo(_recInfo), currentMatch(_currentMatch)
  1495. {
  1496. numInputFields = recInfo.getNumFields();
  1497. // JCSMORE - should this be done (optionally) when RtlRecord is created?
  1498. for (unsigned fieldNum=0; fieldNum<numInputFields; fieldNum++)
  1499. {
  1500. if (recInfo.queryType(fieldNum)->queryChildType())
  1501. {
  1502. const char *xpath = recInfo.queryXPath(fieldNum);
  1503. dbgassertex(xpath);
  1504. const char *ptr = xpath;
  1505. char *expandedXPath = nullptr;
  1506. char *expandedXPathPtr = nullptr;
  1507. while (true)
  1508. {
  1509. if (*ptr == xpathCompoundSeparatorChar)
  1510. {
  1511. if (!compoundXPaths)
  1512. {
  1513. compoundXPaths = new const char *[numInputFields];
  1514. memset(compoundXPaths, 0, sizeof(const char *)*numInputFields);
  1515. }
  1516. size_t sz = strlen(xpath)+1;
  1517. expandedXPath = new char[sz];
  1518. expandedXPathPtr = expandedXPath;
  1519. if (ptr == xpath) // if leading char, just skip
  1520. ++ptr;
  1521. else
  1522. {
  1523. size32_t len = ptr-xpath;
  1524. memcpy(expandedXPath, xpath, len);
  1525. expandedXPathPtr = expandedXPath + len;
  1526. *expandedXPathPtr++ = '/';
  1527. ++ptr;
  1528. }
  1529. while (*ptr)
  1530. {
  1531. if (*ptr == xpathCompoundSeparatorChar)
  1532. {
  1533. *expandedXPathPtr++ = '/';
  1534. ++ptr;
  1535. }
  1536. else
  1537. *expandedXPathPtr++ = *ptr++;
  1538. }
  1539. }
  1540. else
  1541. ptr++;
  1542. if ('\0' == *ptr)
  1543. {
  1544. if (expandedXPath)
  1545. {
  1546. *expandedXPathPtr = '\0';
  1547. compoundXPaths[fieldNum] = expandedXPath;
  1548. }
  1549. break;
  1550. }
  1551. }
  1552. }
  1553. }
  1554. }
  1555. ~CFieldFetcher()
  1556. {
  1557. if (compoundXPaths)
  1558. {
  1559. for (unsigned fieldNum=0; fieldNum<numInputFields; fieldNum++)
  1560. delete [] compoundXPaths[fieldNum];
  1561. delete [] compoundXPaths;
  1562. }
  1563. }
  1564. void setCurrentMatch(IColumnProvider *_currentMatch)
  1565. {
  1566. currentMatch.set(_currentMatch);
  1567. }
  1568. // IDynamicFieldValueFetcher impl.
  1569. virtual const byte *queryValue(unsigned fieldNum, size_t &sz) const override
  1570. {
  1571. dbgassertex(fieldNum < numInputFields);
  1572. dbgassertex(currentMatch);
  1573. size32_t rawSz;
  1574. const char *ret = currentMatch->readRaw(recInfo.queryXPath(fieldNum), rawSz);
  1575. sz = rawSz;
  1576. return (const byte *)ret;
  1577. }
  1578. virtual IDynamicRowIterator *getNestedIterator(unsigned fieldNum) const override
  1579. {
  1580. dbgassertex(fieldNum < numInputFields);
  1581. dbgassertex(currentMatch);
  1582. const RtlRecord *nested = recInfo.queryNested(fieldNum);
  1583. if (!nested)
  1584. return nullptr;
  1585. class CIterator : public CSimpleInterfaceOf<IDynamicRowIterator>
  1586. {
  1587. XmlChildIterator xmlIter;
  1588. Linked<IDynamicFieldValueFetcher> curFieldValueFetcher;
  1589. Linked<IColumnProvider> parentMatch;
  1590. const RtlRecord &nestedRecInfo;
  1591. public:
  1592. CIterator(const RtlRecord &_nestedRecInfo, IColumnProvider *_parentMatch, const char *xpath) : nestedRecInfo(_nestedRecInfo), parentMatch(_parentMatch)
  1593. {
  1594. xmlIter.initOwn(parentMatch->getChildIterator(xpath));
  1595. }
  1596. virtual bool first() override
  1597. {
  1598. IColumnProvider *child = xmlIter.first();
  1599. if (!child)
  1600. {
  1601. curFieldValueFetcher.clear();
  1602. return false;
  1603. }
  1604. curFieldValueFetcher.setown(new CFieldFetcher(nestedRecInfo, child));
  1605. return true;
  1606. }
  1607. virtual bool next() override
  1608. {
  1609. IColumnProvider *child = xmlIter.next();
  1610. if (!child)
  1611. {
  1612. curFieldValueFetcher.clear();
  1613. return false;
  1614. }
  1615. curFieldValueFetcher.setown(new CFieldFetcher(nestedRecInfo, child));
  1616. return true;
  1617. }
  1618. virtual bool isValid() override
  1619. {
  1620. return nullptr != curFieldValueFetcher.get();
  1621. }
  1622. virtual IDynamicFieldValueFetcher &query() override
  1623. {
  1624. assertex(curFieldValueFetcher);
  1625. return *curFieldValueFetcher;
  1626. }
  1627. };
  1628. // JCSMORE - it would be good if these were cached/reused (can I assume anything using parent fetcher is single threaded?)
  1629. return new CIterator(*nested, currentMatch, queryCompoundXPath(fieldNum));
  1630. }
  1631. virtual size_t getSize(unsigned fieldNum) const override { throwUnexpected(); }
  1632. virtual size32_t getRecordSize() const override { throwUnexpected(); }
  1633. };
  1634. bool checkOpen()
  1635. {
  1636. if (!PARENT::checkOpen())
  1637. return false;
  1638. class CSimpleStream : public CSimpleInterfaceOf<ISimpleReadStream>
  1639. {
  1640. Linked<ISerialStream> stream;
  1641. public:
  1642. CSimpleStream(ISerialStream *_stream) : stream(_stream)
  1643. {
  1644. }
  1645. // ISimpleReadStream impl.
  1646. virtual size32_t read(size32_t max_len, void * data) override
  1647. {
  1648. size32_t got;
  1649. const void *res = stream->peek(max_len, got);
  1650. if (got)
  1651. {
  1652. if (got>max_len)
  1653. got = max_len;
  1654. memcpy(data, res, got);
  1655. stream->skip(got);
  1656. }
  1657. return got;
  1658. }
  1659. };
  1660. Owned<ISimpleReadStream> simpleStream = new CSimpleStream(inputStream);
  1661. if (kind==TAKjsonread)
  1662. xmlParser.setown(createJSONParse(*simpleStream, xpath, *this, noRoot?ptr_noRoot:ptr_none, useXmlContents));
  1663. else
  1664. xmlParser.setown(createXMLParse(*simpleStream, xpath, *this, noRoot?ptr_noRoot:ptr_none, useXmlContents));
  1665. if (!fieldFetcher)
  1666. fieldFetcher.setown(new CFieldFetcher(*record, nullptr));
  1667. return true;
  1668. }
  1669. protected:
  1670. StringBuffer xpath;
  1671. StringBuffer customRowTag;
  1672. public:
  1673. IMPLEMENT_IINTERFACE_USING(PARENT);
  1674. CRemoteMarkupReadActivity(IPropertyTree &config, IFileDescriptor *fileDesc, ThorActivityKind _kind) : PARENT(config, fileDesc), kind(_kind)
  1675. {
  1676. config.getProp("ActivityOptions/rowTag", customRowTag);
  1677. noRoot = config.getPropBool("noRoot");
  1678. }
  1679. IColumnProvider *queryMatch() const { return lastMatch; }
  1680. virtual StringBuffer &getInfoStr(StringBuffer &out) const override
  1681. {
  1682. return out.appendf("%s[%s]", getActivityText(kind), fileName.get());
  1683. }
  1684. // IRemoteReadActivity impl.
  1685. virtual const void *nextRow(MemoryBufferBuilder &outBuilder, size32_t &retSz) override
  1686. {
  1687. if (eofSeen)
  1688. {
  1689. retSz = 0;
  1690. return nullptr;
  1691. }
  1692. checkOpen();
  1693. while (xmlParser->next())
  1694. {
  1695. if (lastMatch)
  1696. {
  1697. ((CFieldFetcher *)fieldFetcher.get())->setCurrentMatch(lastMatch);
  1698. retSz = translator->translate(outBuilder, *this, *fieldFetcher);
  1699. dbgassertex(retSz);
  1700. lastMatch.clear();
  1701. const void *ret = outBuilder.getSelf();
  1702. if (fieldFilterMatch(ret))
  1703. {
  1704. outBuilder.finishRow(retSz);
  1705. ++processed;
  1706. return ret;
  1707. }
  1708. else
  1709. outBuilder.removeBytes(retSz);
  1710. }
  1711. }
  1712. eofSeen = true;
  1713. close();
  1714. retSz = 0;
  1715. return nullptr;
  1716. }
  1717. // IXMLSelect impl.
  1718. virtual void match(IColumnProvider &entry, offset_t startOffset, offset_t endOffset)
  1719. {
  1720. lastMatch.set(&entry);
  1721. }
  1722. };
  1723. class CRemoteXmlReadActivity : public CRemoteMarkupReadActivity
  1724. {
  1725. typedef CRemoteMarkupReadActivity PARENT;
  1726. public:
  1727. CRemoteXmlReadActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc, TAKxmlread)
  1728. {
  1729. xpath.set("/Dataset/");
  1730. if (customRowTag.isEmpty()) // no override
  1731. fileDesc->queryProperties().getProp("@rowTag", xpath);
  1732. else
  1733. xpath.append(customRowTag);
  1734. }
  1735. };
  1736. class CRemoteJsonReadActivity : public CRemoteMarkupReadActivity
  1737. {
  1738. typedef CRemoteMarkupReadActivity PARENT;
  1739. public:
  1740. CRemoteJsonReadActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc, TAKjsonread)
  1741. {
  1742. xpath.set("/");
  1743. if (customRowTag.isEmpty()) // no override
  1744. fileDesc->queryProperties().getProp("@rowTag", xpath);
  1745. else
  1746. xpath.append(customRowTag);
  1747. }
  1748. };
  1749. /* A IRemoteReadActivity that projects to output format
  1750. * Created if input activity requires filtering, but it must 1st translate from external format to the actual format
  1751. * NB: processor, grouped and cursor are same as input.
  1752. */
  1753. class CRemoteCompoundReadProjectActivity : public CSimpleInterfaceOf<IRemoteReadActivity>
  1754. {
  1755. Linked<IRemoteReadActivity> input;
  1756. Owned<IOutputMetaData> outMeta;
  1757. Owned<const IDynamicTransform> translator;
  1758. UnexpectedVirtualFieldCallback fieldCallback;
  1759. MemoryBuffer inputRowMb;
  1760. MemoryBufferBuilder *inputRowBuilder;
  1761. public:
  1762. CRemoteCompoundReadProjectActivity(IPropertyTree &config, IRemoteReadActivity *_input) : input(_input)
  1763. {
  1764. IOutputMetaData *inMeta = input->queryOutputMeta();
  1765. outMeta.setown(getTypeInfoOutputMetaData(config, "output", false));
  1766. dbgassertex(outMeta);
  1767. const RtlRecord &inRecord = inMeta->queryRecordAccessor(true);
  1768. const RtlRecord &outRecord = outMeta->queryRecordAccessor(true);
  1769. translator.setown(createRecordTranslator(outRecord, inRecord));
  1770. inputRowBuilder = new MemoryBufferBuilder(inputRowMb, inMeta->getMinRecordSize());
  1771. }
  1772. ~CRemoteCompoundReadProjectActivity()
  1773. {
  1774. delete inputRowBuilder;
  1775. }
  1776. virtual StringBuffer &getInfoStr(StringBuffer &out) const override
  1777. {
  1778. return input->getInfoStr(out).append(" - CompoundProject");
  1779. }
  1780. // IRemoteReadActivity impl.
  1781. virtual unsigned __int64 queryProcessed() const override
  1782. {
  1783. return input->queryProcessed();
  1784. }
  1785. virtual IOutputMetaData *queryOutputMeta() const override
  1786. {
  1787. return outMeta;
  1788. }
  1789. virtual bool isGrouped() const override
  1790. {
  1791. return input->isGrouped();
  1792. }
  1793. virtual void serializeCursor(MemoryBuffer &tgt) const override
  1794. {
  1795. input->serializeCursor(tgt);
  1796. }
  1797. virtual void restoreCursor(MemoryBuffer &src) override
  1798. {
  1799. input->restoreCursor(src);
  1800. }
  1801. virtual IRemoteReadActivity *queryIsReadActivity()
  1802. {
  1803. return this;
  1804. }
  1805. virtual const void *nextRow(MemoryBufferBuilder &outBuilder, size32_t &retSz) override
  1806. {
  1807. size32_t rowSz;
  1808. const void *row = input->nextRow(*inputRowBuilder, rowSz);
  1809. if (!row)
  1810. {
  1811. retSz = 0;
  1812. return nullptr;
  1813. }
  1814. retSz = translator->translate(outBuilder, fieldCallback, (const byte *)row);
  1815. const void *ret = outBuilder.getSelf();
  1816. outBuilder.finishRow(retSz);
  1817. return ret;
  1818. }
  1819. virtual bool requiresPostProject() const override
  1820. {
  1821. return false;
  1822. }
  1823. };
  1824. class CRemoteIndexBaseActivity : public CRemoteDiskBaseActivity
  1825. {
  1826. typedef CRemoteDiskBaseActivity PARENT;
  1827. protected:
  1828. bool isTlk = false;
  1829. bool allowPreload = false;
  1830. unsigned fileCrc = 0;
  1831. Owned<IKeyIndex> keyIndex;
  1832. Owned<IKeyManager> keyManager;
  1833. void checkOpen()
  1834. {
  1835. if (opened)
  1836. return;
  1837. Owned<IFile> indexFile = createIFile(fileName);
  1838. CDateTime modTime;
  1839. indexFile->getTime(nullptr, &modTime, nullptr);
  1840. time_t modTimeTT = modTime.getSimple();
  1841. CRC32 crc32(fileCrc);
  1842. crc32.tally(sizeof(time_t), &modTimeTT);
  1843. unsigned crc = crc32.get();
  1844. keyIndex.setown(createKeyIndex(fileName, crc, isTlk, allowPreload));
  1845. keyManager.setown(createLocalKeyManager(*record, keyIndex, nullptr, true, false));
  1846. filters.createSegmentMonitors(keyManager);
  1847. keyManager->finishSegmentMonitors();
  1848. keyManager->reset();
  1849. opened = true;
  1850. }
  1851. void close()
  1852. {
  1853. keyManager.clear();
  1854. keyIndex.clear();
  1855. opened = false;
  1856. eofSeen = true;
  1857. }
  1858. public:
  1859. CRemoteIndexBaseActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc)
  1860. {
  1861. setupInputMeta(config, getTypeInfoOutputMetaData(config, "input", false));
  1862. isTlk = config.getPropBool("isTlk");
  1863. allowPreload = config.getPropBool("allowPreload");
  1864. fileCrc = config.getPropInt("crc");
  1865. }
  1866. };
  1867. class CRemoteIndexReadActivity : public CRemoteIndexBaseActivity
  1868. {
  1869. typedef CRemoteIndexBaseActivity PARENT;
  1870. Owned<const IDynamicTransform> translator;
  1871. unsigned __int64 chooseN = 0;
  1872. public:
  1873. CRemoteIndexReadActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc)
  1874. {
  1875. chooseN = config.getPropInt64("chooseN", defaultFileStreamChooseNLimit);
  1876. outMeta.setown(getTypeInfoOutputMetaData(config, "output", false));
  1877. if (outMeta)
  1878. translator.setown(createRecordTranslator(outMeta->queryRecordAccessor(true), *record));
  1879. else
  1880. outMeta.set(inMeta);
  1881. }
  1882. // IRemoteReadActivity impl.
  1883. virtual const void *nextRow(MemoryBufferBuilder &outBuilder, size32_t &retSz) override
  1884. {
  1885. if (eofSeen)
  1886. {
  1887. retSz = 0;
  1888. return nullptr;
  1889. }
  1890. checkOpen();
  1891. if (!eofSeen)
  1892. {
  1893. if (processed < chooseN)
  1894. {
  1895. while (keyManager->lookup(true))
  1896. {
  1897. const byte *keyRow = keyManager->queryKeyBuffer();
  1898. if (fieldFilterMatch(keyRow))
  1899. {
  1900. if (translator)
  1901. retSz = translator->translate(outBuilder, *this, keyRow);
  1902. else
  1903. {
  1904. retSz = keyManager->queryRowSize();
  1905. outBuilder.ensureCapacity(retSz, nullptr);
  1906. memcpy(outBuilder.getSelf(), keyRow, retSz);
  1907. }
  1908. dbgassertex(retSz);
  1909. const void *ret = outBuilder.getSelf();
  1910. outBuilder.finishRow(retSz);
  1911. ++processed;
  1912. return ret;
  1913. }
  1914. }
  1915. retSz = 0;
  1916. }
  1917. eofSeen = true;
  1918. }
  1919. close();
  1920. return nullptr;
  1921. }
  1922. virtual void serializeCursor(MemoryBuffer &tgt) const override
  1923. {
  1924. keyManager->serializeCursorPos(tgt);
  1925. tgt.append(processed);
  1926. /* JCSMORE (see HPCC-19640), serialize seek/scan data to client
  1927. tgt.append(keyManager->querySeeks());
  1928. tgt.append(keyManager->queryScans());
  1929. */
  1930. }
  1931. virtual void restoreCursor(MemoryBuffer &src) override
  1932. {
  1933. checkOpen();
  1934. eofSeen = false;
  1935. keyManager->deserializeCursorPos(src);
  1936. src.read(processed);
  1937. }
  1938. virtual StringBuffer &getInfoStr(StringBuffer &out) const override
  1939. {
  1940. return out.appendf("indexread[%s]", fileName.get());
  1941. }
  1942. };
  1943. class CRemoteWriteBaseActivity : public CSimpleInterfaceOf<IRemoteWriteActivity>
  1944. {
  1945. protected:
  1946. StringAttr fileName; // physical filename
  1947. Linked<IOutputMetaData> meta;
  1948. unsigned __int64 processed = 0;
  1949. bool opened = false;
  1950. bool eofSeen = false;
  1951. Owned<IFileIO> iFileIO;
  1952. bool grouped = false;
  1953. void close()
  1954. {
  1955. iFileIO.clear();
  1956. opened = false;
  1957. eofSeen = true;
  1958. }
  1959. public:
  1960. CRemoteWriteBaseActivity(IPropertyTree &config, IFileDescriptor *fileDesc)
  1961. {
  1962. fileName.set(config.queryProp("fileName"));
  1963. if (isEmptyString(fileName))
  1964. throw createDafsException(DAFSERR_cmdstream_protocol_failure, "CRemoteWriteBaseActivity: fileName missing");
  1965. grouped = config.getPropBool("inputGrouped");
  1966. meta.setown(getTypeInfoOutputMetaData(config, "input", grouped));
  1967. }
  1968. ~CRemoteWriteBaseActivity()
  1969. {
  1970. }
  1971. // IRemoteWriteActivity impl.
  1972. virtual unsigned __int64 queryProcessed() const override
  1973. {
  1974. return processed;
  1975. }
  1976. virtual IOutputMetaData *queryOutputMeta() const override
  1977. {
  1978. return meta;
  1979. }
  1980. virtual bool isGrouped() const override
  1981. {
  1982. return grouped;
  1983. }
  1984. virtual void serializeCursor(MemoryBuffer &tgt) const override
  1985. {
  1986. throwUnexpected();
  1987. }
  1988. virtual void restoreCursor(MemoryBuffer &src) override
  1989. {
  1990. throwUnexpected();
  1991. }
  1992. virtual StringBuffer &getInfoStr(StringBuffer &out) const override
  1993. {
  1994. return out.appendf("diskwrite[%s]", fileName.get());
  1995. }
  1996. virtual void write(size32_t sz, const void *rowData) override
  1997. {
  1998. throwUnexpected(); // method should be implemented in derived classes.
  1999. }
  2000. virtual IRemoteWriteActivity *queryIsWriteActivity()
  2001. {
  2002. return this;
  2003. }
  2004. };
  2005. class CRemoteDiskWriteActivity : public CRemoteWriteBaseActivity
  2006. {
  2007. typedef CRemoteWriteBaseActivity PARENT;
  2008. unsigned compressionFormat = 0;
  2009. bool eogPending = false;
  2010. bool someInGroup = false;
  2011. size32_t recordSize = 0;
  2012. Owned<IFileIOStream> iFileIOStream;
  2013. bool append = false;
  2014. void checkOpen()
  2015. {
  2016. if (opened)
  2017. return;
  2018. if (!recursiveCreateDirectoryForFile(fileName))
  2019. throw createDafsExceptionV(DAFSERR_cmdstream_openfailure, "Failed to create dirtory for file: '%s'", fileName.get());
  2020. OwnedIFile iFile = createIFile(fileName);
  2021. assertex(iFile);
  2022. /* NB: if concurrent writers were supported, then would need mutex here, during open/create
  2023. * multiple activities, each with there own handle would be possible, with mutex during write.
  2024. * Would need mutex per physical filename active.
  2025. */
  2026. if (compressionFormat)
  2027. iFileIO.setown(createCompressedFileWriter(iFile, recordSize, append, true, nullptr, compressionFormat));
  2028. else
  2029. {
  2030. iFileIO.setown(iFile->open(append ? IFOwrite : IFOcreate));
  2031. if (!iFileIO)
  2032. throw createDafsExceptionV(DAFSERR_cmdstream_openfailure, "Failed to open: '%s' for write", fileName.get());
  2033. }
  2034. iFileIOStream.setown(createIOStream(iFileIO));
  2035. if (append)
  2036. iFileIOStream->seek(0, IFSend);
  2037. opened = true;
  2038. eofSeen = false;
  2039. }
  2040. public:
  2041. CRemoteDiskWriteActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc)
  2042. {
  2043. const char *compressed = config.queryProp("compressed"); // the compression format for the serialized rows in the transport
  2044. if (!isEmptyString(compressed))
  2045. {
  2046. // boolean or format allowed
  2047. if (strieq("true", compressed))
  2048. compressionFormat = translateToCompMethod(nullptr); // gets default
  2049. else if (strieq("false", compressed))
  2050. compressionFormat = 0;
  2051. else
  2052. compressionFormat = translateToCompMethod(compressed);
  2053. }
  2054. append = config.getPropBool("append");
  2055. }
  2056. virtual void write(size32_t sz, const void *rowData) override
  2057. {
  2058. checkOpen();
  2059. iFileIOStream->write(sz, rowData);
  2060. }
  2061. virtual void serializeCursor(MemoryBuffer &tgt) const override
  2062. {
  2063. tgt.append(iFileIOStream->tell());
  2064. }
  2065. virtual void restoreCursor(MemoryBuffer &src) override
  2066. {
  2067. offset_t pos;
  2068. src.read(pos);
  2069. checkOpen();
  2070. iFileIOStream->seek(pos, IFSbegin);
  2071. }
  2072. };
  2073. // create a { unsigned8 } output meta for the count
  2074. static const RtlIntTypeInfo indexCountFieldType(type_unsigned|type_int, 8);
  2075. static const RtlFieldStrInfo indexCountField("count", nullptr, &indexCountFieldType);
  2076. static const RtlFieldInfo * const indexCountFields[2] = { &indexCountField, nullptr };
  2077. static const RtlRecordTypeInfo indexCountRecord(type_record, 2, indexCountFields);
  2078. class CRemoteIndexCountActivity : public CRemoteIndexBaseActivity
  2079. {
  2080. typedef CRemoteIndexBaseActivity PARENT;
  2081. unsigned __int64 rowLimit = 0;
  2082. public:
  2083. CRemoteIndexCountActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc)
  2084. {
  2085. rowLimit = config.getPropInt64("chooseN");
  2086. outMeta.setown(new CDynamicOutputMetaData(indexCountRecord));
  2087. }
  2088. // IRemoteReadActivity impl.
  2089. virtual const void *nextRow(MemoryBufferBuilder &outBuilder, size32_t &retSz) override
  2090. {
  2091. if (eofSeen)
  2092. {
  2093. retSz = 0;
  2094. return nullptr;
  2095. }
  2096. checkOpen();
  2097. unsigned __int64 count = 0;
  2098. if (!eofSeen)
  2099. {
  2100. if (rowLimit)
  2101. count = keyManager->checkCount(rowLimit);
  2102. else
  2103. count = keyManager->getCount();
  2104. }
  2105. void *tgt = outBuilder.ensureCapacity(sizeof(count), "count");
  2106. const void *ret = outBuilder.getSelf();
  2107. memcpy(tgt, &count, sizeof(count));
  2108. outBuilder.finishRow(sizeof(count));
  2109. close();
  2110. return ret;
  2111. }
  2112. virtual StringBuffer &getInfoStr(StringBuffer &out) const override
  2113. {
  2114. return out.appendf("indexcount[%s]", fileName.get());
  2115. }
  2116. };
  2117. void checkExpiryTime(IPropertyTree &metaInfo)
  2118. {
  2119. const char *expiryTime = metaInfo.queryProp("expiryTime");
  2120. if (isEmptyString(expiryTime))
  2121. throw createDafsException(DAFSERR_cmdstream_invalidexpiry, "createRemoteActivity: invalid expiry specification");
  2122. CDateTime expiryTimeDt;
  2123. try
  2124. {
  2125. expiryTimeDt.setString(expiryTime);
  2126. }
  2127. catch (IException *e)
  2128. {
  2129. e->Release();
  2130. throw createDafsException(DAFSERR_cmdstream_invalidexpiry, "createRemoteActivity: invalid expiry specification");
  2131. }
  2132. CDateTime nowDt;
  2133. nowDt.setNow();
  2134. if (nowDt >= expiryTimeDt)
  2135. throw createDafsException(DAFSERR_cmdstream_authexpired, "createRemoteActivity: authorization expired");
  2136. }
  2137. IFileDescriptor *verifyMetaInfo(IPropertyTree &actNode, bool authorizedOnly, const IPropertyTree *keyPairInfo)
  2138. {
  2139. if (!authorizedOnly) // if configured false, allows unencrypted meta info
  2140. {
  2141. if (actNode.hasProp("fileName"))
  2142. return nullptr;
  2143. }
  2144. StringBuffer metaInfoB64;
  2145. actNode.getProp("metaInfo", metaInfoB64);
  2146. if (0 == metaInfoB64.length())
  2147. throw createDafsException(DAFSERR_cmdstream_protocol_failure, "createRemoteActivity: missing metaInfo");
  2148. MemoryBuffer compressedMetaInfoMb;
  2149. JBASE64_Decode(metaInfoB64.str(), compressedMetaInfoMb);
  2150. MemoryBuffer decompressedMetaInfoMb;
  2151. fastLZDecompressToBuffer(decompressedMetaInfoMb, compressedMetaInfoMb);
  2152. Owned<IPropertyTree> metaInfoEnvelope = createPTree(decompressedMetaInfoMb);
  2153. Owned<IPropertyTree> metaInfo;
  2154. #if defined(_USE_OPENSSL)
  2155. MemoryBuffer metaInfoBlob;
  2156. metaInfoEnvelope->getPropBin("metaInfoBlob", metaInfoBlob);
  2157. bool isSigned = metaInfoBlob.length() != 0;
  2158. if (authorizedOnly && !isSigned)
  2159. throw createDafsException(DAFSERR_cmdstream_unauthorized, "createRemoteActivity: unathorized");
  2160. if (isSigned)
  2161. {
  2162. metaInfo.setown(createPTree(metaInfoBlob));
  2163. const char *keyPairName = metaInfo->queryProp("keyPairName");
  2164. StringBuffer metaInfoSignature;
  2165. if (!metaInfoEnvelope->getProp("signature", metaInfoSignature))
  2166. throw createDafsException(DAFSERR_cmdstream_unauthorized, "createRemoteActivity: missing signature");
  2167. VStringBuffer keyPairPath("KeyPair[@name=\"%s\"]", keyPairName);
  2168. IPropertyTree *keyPair = keyPairInfo->queryPropTree(keyPairPath);
  2169. if (!keyPair)
  2170. throw createDafsException(DAFSERR_cmdstream_unauthorized, "createRemoteActivity: missing key pair definition");
  2171. const char *publicKeyFName = keyPair->queryProp("@publicKey");
  2172. if (isEmptyString(publicKeyFName))
  2173. throw createDafsException(DAFSERR_cmdstream_unauthorized, "createRemoteActivity: missing public key definition");
  2174. Owned<CLoadedKey> publicKey = loadPublicKeyFromFile(publicKeyFName, nullptr); // NB: if cared could cache loaded keys
  2175. if (!digiVerify(metaInfoSignature, metaInfoBlob.length(), metaInfoBlob.bytes(), *publicKey))
  2176. throw createDafsException(DAFSERR_cmdstream_unauthorized, "createRemoteActivity: signature verification failed");
  2177. checkExpiryTime(*metaInfo);
  2178. }
  2179. else
  2180. #endif
  2181. metaInfo.set(metaInfoEnvelope);
  2182. assertex(actNode.hasProp("filePart"));
  2183. unsigned partNum = actNode.getPropInt("filePart");
  2184. assertex(partNum);
  2185. unsigned partCopy = actNode.getPropInt("filePartCopy", 1);
  2186. Owned<IFileDescriptor> fileDesc;
  2187. unsigned metaInfoVersion = metaInfo->getPropInt("version");
  2188. switch (metaInfoVersion)
  2189. {
  2190. case 0:
  2191. // implies unsigned direct request from engines (on unsecure port)
  2192. // fall through
  2193. case 1: // legacy
  2194. {
  2195. IPropertyTree *fileInfo = metaInfo->queryPropTree("FileInfo");
  2196. assertex(fileInfo);
  2197. VStringBuffer xpath("Part[%u]/Copy[%u]/@filePath", partNum, partCopy);
  2198. StringBuffer partFileName;
  2199. fileInfo->getProp(xpath, partFileName);
  2200. if (!partFileName.length())
  2201. throw createDafsException(DAFSERR_cmdstream_protocol_failure, "createRemoteActivity: invalid file info");
  2202. actNode.setProp("fileName", partFileName.str());
  2203. break;
  2204. }
  2205. case 2: // serialized compact IFileDescriptor
  2206. {
  2207. IPropertyTree *fileInfo = metaInfo->queryPropTree("FileInfo");
  2208. fileDesc.setown(deserializeFileDescriptorTree(fileInfo));
  2209. RemoteFilename rfn;
  2210. fileDesc->getFilename(partNum-1, partCopy-1, rfn);
  2211. StringBuffer path;
  2212. rfn.getLocalPath(path);
  2213. actNode.setProp("fileName", path.str());
  2214. break;
  2215. }
  2216. default:
  2217. throwUnexpected();
  2218. }
  2219. verifyex(actNode.removeProp("metaInfo")); // no longer needed
  2220. return fileDesc.getClear();
  2221. }
  2222. template<class ActivityClass> IRemoteReadActivity *createConditionalProjectingActivity(IPropertyTree &actNode, IFileDescriptor *fileDesc)
  2223. {
  2224. Owned<IRemoteReadActivity> activity = new ActivityClass(actNode, fileDesc);
  2225. if (activity->requiresPostProject())
  2226. return new CRemoteCompoundReadProjectActivity(actNode, activity);
  2227. else
  2228. return activity.getClear();
  2229. }
  2230. IRemoteActivity *createRemoteActivity(IPropertyTree &actNode, bool authorizedOnly, const IPropertyTree *keyPairInfo)
  2231. {
  2232. Owned<IFileDescriptor> fileDesc = verifyMetaInfo(actNode, authorizedOnly, keyPairInfo);
  2233. const char *partFileName = actNode.queryProp("fileName");
  2234. const char *kindStr = actNode.queryProp("kind");
  2235. ThorActivityKind kind = TAKnone;
  2236. if (kindStr)
  2237. {
  2238. if (strieq("diskread", kindStr))
  2239. kind = TAKdiskread;
  2240. if (strieq("csvread", kindStr))
  2241. kind = TAKcsvread;
  2242. else if (strieq("xmlread", kindStr))
  2243. kind = TAKxmlread;
  2244. else if (strieq("jsonread", kindStr))
  2245. kind = TAKjsonread;
  2246. else if (strieq("indexread", kindStr))
  2247. kind = TAKindexread;
  2248. else if (strieq("indexcount", kindStr))
  2249. kind = TAKindexcount;
  2250. else if (strieq("diskwrite", kindStr))
  2251. kind = TAKdiskwrite;
  2252. else if (strieq("indexwrite", kindStr))
  2253. kind = TAKindexwrite;
  2254. // else - auto-detect
  2255. }
  2256. Owned<IRemoteActivity> activity;
  2257. switch (kind)
  2258. {
  2259. case TAKdiskread:
  2260. {
  2261. activity.setown(new CRemoteDiskReadActivity(actNode, fileDesc));
  2262. break;
  2263. }
  2264. case TAKcsvread:
  2265. {
  2266. activity.setown(createConditionalProjectingActivity<CRemoteCsvReadActivity>(actNode, fileDesc));
  2267. break;
  2268. }
  2269. case TAKxmlread:
  2270. {
  2271. activity.setown(createConditionalProjectingActivity<CRemoteXmlReadActivity>(actNode, fileDesc));
  2272. break;
  2273. }
  2274. case TAKjsonread:
  2275. {
  2276. activity.setown(createConditionalProjectingActivity<CRemoteJsonReadActivity>(actNode, fileDesc));
  2277. break;
  2278. }
  2279. case TAKindexread:
  2280. {
  2281. activity.setown(new CRemoteIndexReadActivity(actNode, fileDesc));
  2282. break;
  2283. }
  2284. case TAKindexcount:
  2285. {
  2286. activity.setown(new CRemoteIndexCountActivity(actNode, fileDesc));
  2287. break;
  2288. }
  2289. case TAKdiskwrite:
  2290. {
  2291. activity.setown(new CRemoteDiskWriteActivity(actNode, fileDesc));
  2292. break;
  2293. }
  2294. default: // in absense of type, read is assumed and file format is auto-detected.
  2295. {
  2296. const char *action = actNode.queryProp("action");
  2297. if (isIndexFile(partFileName))
  2298. {
  2299. if (!isEmptyString(action))
  2300. {
  2301. if (streq("count", action))
  2302. activity.setown(new CRemoteIndexCountActivity(actNode, fileDesc));
  2303. else
  2304. throw createDafsExceptionV(DAFSERR_cmdstream_protocol_failure, "Unknown action '%s' on index '%s'", action, partFileName);
  2305. }
  2306. else
  2307. activity.setown(new CRemoteIndexReadActivity(actNode, fileDesc));
  2308. }
  2309. else
  2310. {
  2311. if (!isEmptyString(action))
  2312. {
  2313. if (streq("count", action))
  2314. throw createDafsException(DAFSERR_cmdstream_protocol_failure, "Remote Disk Counts currently unsupported");
  2315. else
  2316. throw createDafsExceptionV(DAFSERR_cmdstream_protocol_failure, "Unknown action '%s' on flat file '%s'", action, partFileName);
  2317. }
  2318. else
  2319. {
  2320. const char *kind = queryFileKind(fileDesc);
  2321. if (isEmptyString(kind) || (streq("flat", kind)))
  2322. activity.setown(new CRemoteDiskReadActivity(actNode, fileDesc));
  2323. else if (streq("csv", kind))
  2324. activity.setown(createConditionalProjectingActivity<CRemoteCsvReadActivity>(actNode, fileDesc));
  2325. else if (streq("xml", kind))
  2326. activity.setown(createConditionalProjectingActivity<CRemoteXmlReadActivity>(actNode, fileDesc));
  2327. else if (streq("json", kind))
  2328. activity.setown(createConditionalProjectingActivity<CRemoteJsonReadActivity>(actNode, fileDesc));
  2329. else
  2330. throw createDafsExceptionV(DAFSERR_cmdstream_protocol_failure, "Unknown file kind '%s'", kind);
  2331. }
  2332. }
  2333. break;
  2334. }
  2335. }
  2336. return activity.getClear();
  2337. }
  2338. IRemoteActivity *createOutputActivity(IPropertyTree &requestTree, bool authorizedOnly, const IPropertyTree *keyPairInfo)
  2339. {
  2340. IPropertyTree *actNode = requestTree.queryPropTree("node");
  2341. assertex(actNode);
  2342. return createRemoteActivity(*actNode, authorizedOnly, keyPairInfo);
  2343. }
  2344. #define MAX_KEYDATA_SZ 0x10000
  2345. class CRemoteFileServer : implements IRemoteFileServer, public CInterface
  2346. {
  2347. class CThrottler;
  2348. class CRemoteClientHandler : implements ISocketSelectNotify, public CInterface
  2349. {
  2350. bool calledByRowService;
  2351. public:
  2352. CRemoteFileServer *parent;
  2353. Owned<ISocket> socket;
  2354. StringAttr peerName;
  2355. MemoryBuffer msg;
  2356. bool selecthandled;
  2357. size32_t left;
  2358. StructArrayOf<OpenFileInfo> openFiles;
  2359. Owned<IDirectoryIterator> opendir;
  2360. unsigned lasttick, lastInactiveTick;
  2361. atomic_t &globallasttick;
  2362. unsigned previdx; // for debug
  2363. IMPLEMENT_IINTERFACE;
  2364. CRemoteClientHandler(CRemoteFileServer *_parent,ISocket *_socket,atomic_t &_globallasttick, bool _calledByRowService)
  2365. : socket(_socket), globallasttick(_globallasttick), calledByRowService(_calledByRowService)
  2366. {
  2367. previdx = (unsigned)-1;
  2368. StringBuffer peerBuf;
  2369. char name[256];
  2370. name[0] = 0;
  2371. int port = socket->peer_name(name,sizeof(name)-1);
  2372. if (port>=0)
  2373. {
  2374. peerBuf.append(name);
  2375. if (port)
  2376. peerBuf.append(':').append(port);
  2377. peerName.set(peerBuf);
  2378. }
  2379. else
  2380. {
  2381. /* There's a possibility the socket closed before got here, in which case, peer name is unavailable
  2382. * May potentially be unavailable for other reasons also.
  2383. * Must be set, as used in client stats HT.
  2384. * If socket closed, the handler will start up but notice closed and quit
  2385. */
  2386. peerName.set("UNKNOWN PEER NAME");
  2387. }
  2388. {
  2389. CriticalBlock block(ClientCountSect);
  2390. if (++ClientCount>MaxClientCount)
  2391. MaxClientCount = ClientCount;
  2392. if (TF_TRACE_CLIENT_CONN)
  2393. {
  2394. StringBuffer s;
  2395. s.appendf("Connecting(%p) [%d,%d] to ",this,ClientCount,MaxClientCount);
  2396. s.append(peerName);
  2397. PROGLOG("%s", s.str());
  2398. }
  2399. }
  2400. parent = _parent;
  2401. left = 0;
  2402. msg.setEndian(__BIG_ENDIAN);
  2403. selecthandled = false;
  2404. touch();
  2405. }
  2406. ~CRemoteClientHandler()
  2407. {
  2408. {
  2409. CriticalBlock block(ClientCountSect);
  2410. ClientCount--;
  2411. if (TF_TRACE_CLIENT_CONN) {
  2412. PROGLOG("Disconnecting(%p) [%d,%d] ",this,ClientCount,MaxClientCount);
  2413. }
  2414. }
  2415. ISocket *sock = socket.getClear();
  2416. try {
  2417. sock->Release();
  2418. }
  2419. catch (IException *e) {
  2420. EXCLOG(e,"~CRemoteClientHandler");
  2421. e->Release();
  2422. }
  2423. }
  2424. bool isRowServiceClient() const { return calledByRowService; }
  2425. bool notifySelected(ISocket *sock,unsigned selected)
  2426. {
  2427. if (TF_TRACE_FULL)
  2428. PROGLOG("notifySelected(%p)",this);
  2429. if (sock!=socket)
  2430. WARNLOG("notifySelected - invalid socket passed");
  2431. size32_t avail = (size32_t)socket->avail_read();
  2432. if (avail)
  2433. touch();
  2434. else if (left)
  2435. {
  2436. WARNLOG("notifySelected: Closing mid packet, %d remaining", left);
  2437. msg.clear();
  2438. parent->notify(this, msg); // notifying of graceful close
  2439. return false;
  2440. }
  2441. if (left==0)
  2442. {
  2443. try
  2444. {
  2445. left = avail?receiveDaFsBufferSize(socket):0;
  2446. }
  2447. catch (IException *e)
  2448. {
  2449. EXCLOG(e,"notifySelected(1)");
  2450. e->Release();
  2451. left = 0;
  2452. }
  2453. if (left)
  2454. {
  2455. avail = (size32_t)socket->avail_read();
  2456. try
  2457. {
  2458. msg.ensureCapacity(left);
  2459. }
  2460. catch (IException *e)
  2461. {
  2462. EXCLOG(e,"notifySelected(2)");
  2463. e->Release();
  2464. left = 0;
  2465. // if too big then corrupted packet so read avail to try and consume
  2466. char fbuf[1024];
  2467. while (avail)
  2468. {
  2469. size32_t rd = avail>sizeof(fbuf)?sizeof(fbuf):avail;
  2470. try
  2471. {
  2472. socket->read(fbuf, rd); // don't need timeout here
  2473. avail -= rd;
  2474. }
  2475. catch (IException *e)
  2476. {
  2477. EXCLOG(e,"notifySelected(2) flush");
  2478. e->Release();
  2479. break;
  2480. }
  2481. }
  2482. avail = 0;
  2483. left = 0;
  2484. }
  2485. }
  2486. }
  2487. size32_t toread = left>avail?avail:left;
  2488. if (toread)
  2489. {
  2490. try
  2491. {
  2492. socket->read(msg.reserve(toread), toread); // don't need timeout here
  2493. }
  2494. catch (IException *e)
  2495. {
  2496. EXCLOG(e,"notifySelected(3)");
  2497. e->Release();
  2498. toread = left;
  2499. msg.clear();
  2500. }
  2501. }
  2502. if (TF_TRACE_FULL)
  2503. PROGLOG("notifySelected %d,%d",toread,left);
  2504. left -= toread;
  2505. if (left==0)
  2506. {
  2507. // DEBUG
  2508. parent->notify(this, msg); // consumes msg
  2509. }
  2510. return false;
  2511. }
  2512. void logPrevHandle()
  2513. {
  2514. if (previdx<openFiles.ordinality())
  2515. {
  2516. const OpenFileInfo &fileInfo = openFiles.item(previdx);
  2517. PROGLOG("Previous handle(%d): %s", fileInfo.handle, fileInfo.filename->text.get());
  2518. }
  2519. }
  2520. bool throttleCommand(MemoryBuffer &msg)
  2521. {
  2522. RemoteFileCommandType cmd = RFCunknown;
  2523. Owned<IException> e;
  2524. try
  2525. {
  2526. msg.read(cmd);
  2527. parent->throttleCommand(cmd, msg, this);
  2528. return true;
  2529. }
  2530. catch (IException *_e)
  2531. {
  2532. e.setown(_e);
  2533. }
  2534. /* processCommand() will handle most exception and replies,
  2535. * but if throttleCommand fails before it gets that far, this will handle
  2536. */
  2537. MemoryBuffer reply;
  2538. initSendBuffer(reply);
  2539. unsigned err = (cmd == RFCopenIO) ? RFSERR_OpenFailed : 0;
  2540. parent->formatException(reply, e, cmd, false, err, this);
  2541. sendDaFsBuffer(socket, reply);
  2542. return false;
  2543. }
  2544. void processCommand(RemoteFileCommandType cmd, MemoryBuffer &msg, CThrottler *throttler)
  2545. {
  2546. MemoryBuffer reply;
  2547. bool testSocketFlag = parent->processCommand(cmd, msg, initSendBuffer(reply), this, throttler);
  2548. sendDaFsBuffer(socket, reply, testSocketFlag);
  2549. }
  2550. bool immediateCommand() // returns false if socket closed or failure
  2551. {
  2552. MemoryBuffer msg;
  2553. msg.setEndian(__BIG_ENDIAN);
  2554. touch();
  2555. size32_t avail = (size32_t)socket->avail_read();
  2556. if (avail==0)
  2557. return false;
  2558. receiveDaFsBuffer(socket, msg, 5); // shouldn't timeout as data is available
  2559. touch();
  2560. if (msg.length()==0)
  2561. return false;
  2562. return throttleCommand(msg);
  2563. }
  2564. void process(MemoryBuffer &msg)
  2565. {
  2566. if (selecthandled)
  2567. throttleCommand(msg);
  2568. else
  2569. {
  2570. // msg only used/filled if process() has been triggered by notify()
  2571. while (parent->threadRunningCount()<=parent->targetActiveThreads) // if too many threads add to select handler
  2572. {
  2573. int w;
  2574. try
  2575. {
  2576. w = socket->wait_read(1000);
  2577. }
  2578. catch (IException *e)
  2579. {
  2580. EXCLOG(e, "CRemoteClientHandler::main wait_read error");
  2581. e->Release();
  2582. parent->onCloseSocket(this,1);
  2583. return;
  2584. }
  2585. if (w==0)
  2586. break;
  2587. if ((w<0)||!immediateCommand())
  2588. {
  2589. if (w<0)
  2590. WARNLOG("CRemoteClientHandler::main wait_read error");
  2591. parent->onCloseSocket(this,1);
  2592. return;
  2593. }
  2594. }
  2595. /* This is a bit confusing..
  2596. * The addClient below, adds this request to a selecthandler handled by another thread
  2597. * and passes ownership of 'this' (CRemoteClientHandler)
  2598. *
  2599. * When notified, the selecthandler will launch a new pool thread to handle the request
  2600. * If the pool thread limit is hit, the selecthandler will be blocked [ see comment in CRemoteFileServer::notify() ]
  2601. *
  2602. * Either way, a thread pool slot is occupied when processing a request.
  2603. * Blocked threads, will be blocked for up to 1 minute (as defined by createThreadPool call)
  2604. * IOW, if there are lots of incoming clients that can't be serviced by the CThrottler limit,
  2605. * a large number of pool threads will build up after a while.
  2606. *
  2607. * The CThrottler mechanism, imposes a further hard limit on how many concurrent request threads can be active.
  2608. * If the thread pool had an absolute limit (instead of just introducing a delay), then I don't see the point
  2609. * in this additional layer of throttling..
  2610. */
  2611. selecthandled = true;
  2612. parent->addClient(this); // add to select handler
  2613. // NB: this (CRemoteClientHandler) is now linked by the selecthandler and owned by the 'clients' list
  2614. }
  2615. }
  2616. bool timedOut()
  2617. {
  2618. return (msTick()-lasttick)>CLIENT_TIMEOUT;
  2619. }
  2620. bool inactiveTimedOut()
  2621. {
  2622. unsigned ms = msTick();
  2623. if ((ms-lastInactiveTick)>CLIENT_INACTIVEWARNING_TIMEOUT)
  2624. {
  2625. lastInactiveTick = ms;
  2626. return true;
  2627. }
  2628. return false;
  2629. }
  2630. void touch()
  2631. {
  2632. lastInactiveTick = lasttick = msTick();
  2633. atomic_set(&globallasttick,lasttick);
  2634. }
  2635. const char *queryPeerName()
  2636. {
  2637. return peerName;
  2638. }
  2639. bool getInfo(StringBuffer &str)
  2640. {
  2641. str.append("client(");
  2642. const char *name = queryPeerName();
  2643. bool ok;
  2644. if (name)
  2645. {
  2646. ok = true;
  2647. str.append(name);
  2648. }
  2649. else
  2650. ok = false;
  2651. unsigned ms = msTick();
  2652. str.appendf("): last touch %d ms ago (%d, %d)",ms-lasttick,lasttick,ms);
  2653. ForEachItemIn(i, openFiles)
  2654. {
  2655. const OpenFileInfo &fileInfo = openFiles.item(i);
  2656. str.appendf("\n %d: ", fileInfo.handle);
  2657. str.append(fileInfo.filename->text.get());
  2658. }
  2659. return ok;
  2660. }
  2661. };
  2662. class CThrottleQueueItem : public CSimpleInterface
  2663. {
  2664. public:
  2665. RemoteFileCommandType cmd;
  2666. Linked<CRemoteClientHandler> client;
  2667. MemoryBuffer msg;
  2668. CCycleTimer timer;
  2669. CThrottleQueueItem(RemoteFileCommandType _cmd, MemoryBuffer &_msg, CRemoteClientHandler *_client) : cmd(_cmd), client(_client)
  2670. {
  2671. msg.swapWith(_msg);
  2672. }
  2673. };
  2674. class CThrottler
  2675. {
  2676. Semaphore sem;
  2677. CriticalSection crit, configureCrit;
  2678. StringAttr title;
  2679. unsigned limit, delayMs, cpuThreshold, queueLimit;
  2680. unsigned disabledLimit;
  2681. unsigned __int64 totalThrottleDelay;
  2682. CCycleTimer totalThrottleDelayTimer;
  2683. QueueOf<CThrottleQueueItem, false> queue;
  2684. unsigned statsIntervalSecs;
  2685. public:
  2686. CThrottler(const char *_title) : title(_title)
  2687. {
  2688. totalThrottleDelay = 0;
  2689. limit = 0;
  2690. delayMs = DEFAULT_STDCMD_THROTTLEDELAYMS;
  2691. cpuThreshold = DEFAULT_STDCMD_THROTTLECPULIMIT;
  2692. disabledLimit = 0;
  2693. queueLimit = DEFAULT_STDCMD_THROTTLEQUEUELIMIT;
  2694. statsIntervalSecs = DEFAULT_STDCMD_THROTTLECPULIMIT;
  2695. }
  2696. ~CThrottler()
  2697. {
  2698. for (;;)
  2699. {
  2700. Owned<CThrottleQueueItem> item = queue.dequeue();
  2701. if (!item)
  2702. break;
  2703. }
  2704. }
  2705. unsigned queryLimit() const { return limit; }
  2706. unsigned queryDelayMs() const { return delayMs; };;
  2707. unsigned queryCpuThreshold() const { return cpuThreshold; }
  2708. unsigned queryQueueLimit() const { return queueLimit; }
  2709. StringBuffer &getInfoSummary(StringBuffer &info)
  2710. {
  2711. info.appendf("Throttler(%s) - limit=%u, delayMs=%u, cpuThreshold=%u, queueLimit=%u", title.get(), limit, delayMs, cpuThreshold, queueLimit).newline();
  2712. unsigned elapsedSecs = totalThrottleDelayTimer.elapsedMs()/1000;
  2713. time_t simple;
  2714. time(&simple);
  2715. simple -= elapsedSecs;
  2716. CDateTime dt;
  2717. dt.set(simple);
  2718. StringBuffer dateStr;
  2719. dt.getTimeString(dateStr, true);
  2720. info.appendf("Throttler(%s): statistics since %s", title.get(), dateStr.str()).newline();
  2721. info.appendf("Total delay of %0.2f seconds", ((double)totalThrottleDelay)/1000).newline();
  2722. info.appendf("Requests currently queued: %u", queue.ordinality());
  2723. return info;
  2724. }
  2725. void getInfo(StringBuffer &info)
  2726. {
  2727. CriticalBlock b(crit);
  2728. getInfoSummary(info).newline();
  2729. }
  2730. void configure(unsigned _limit, unsigned _delayMs, unsigned _cpuThreshold, unsigned _queueLimit)
  2731. {
  2732. if (_limit > THROTTLE_MAX_LIMIT || _delayMs > THROTTLE_MAX_DELAYMS || _cpuThreshold > THROTTLE_MAX_CPUTHRESHOLD || _queueLimit > THROTTLE_MAX_QUEUELIMIT)
  2733. throw MakeStringException(0, "Throttler(%s), rejecting configure command: limit=%u (max permitted=%u), delayMs=%u (max permitted=%u), cpuThreshold=%u (max permitted=%u), queueLimit=%u (max permitted=%u)",
  2734. title.str(), _limit, THROTTLE_MAX_LIMIT, _delayMs, THROTTLE_MAX_DELAYMS, _cpuThreshold,
  2735. THROTTLE_MAX_CPUTHRESHOLD, _queueLimit, THROTTLE_MAX_QUEUELIMIT);
  2736. CriticalBlock b(configureCrit);
  2737. int delta = 0;
  2738. if (_limit)
  2739. {
  2740. if (disabledLimit) // if transitioning from disabled to some throttling
  2741. {
  2742. assertex(0 == limit);
  2743. delta = _limit - disabledLimit; // + or -
  2744. disabledLimit = 0;
  2745. }
  2746. else
  2747. delta = _limit - limit; // + or -
  2748. }
  2749. else if (0 == disabledLimit)
  2750. {
  2751. PROGLOG("Throttler(%s): disabled, previous limit: %u", title.get(), limit);
  2752. /* disabling - set limit immediately to let all new transaction through.
  2753. * NB: the semaphore signals are not consumed in this case, because transactions could be waiting on it.
  2754. * Instead the existing 'limit' is kept in 'disabledLimit', so that if/when throttling is
  2755. * re-enabled, it is used as a basis for increasing or consuming the semaphore signal count.
  2756. */
  2757. disabledLimit = limit;
  2758. limit = 0;
  2759. }
  2760. if (delta > 0)
  2761. {
  2762. PROGLOG("Throttler(%s): Increasing limit from %u to %u", title.get(), limit, _limit);
  2763. sem.signal(delta);
  2764. limit = _limit;
  2765. // NB: If throttling was off, this doesn't effect transactions in progress, i.e. will only throttle new transactions coming in.
  2766. }
  2767. else if (delta < 0)
  2768. {
  2769. PROGLOG("Throttler(%s): Reducing limit from %u to %u", title.get(), limit, _limit);
  2770. // NB: This is not expected to take long
  2771. CCycleTimer timer;
  2772. while (delta < 0)
  2773. {
  2774. if (sem.wait(1000))
  2775. ++delta;
  2776. else
  2777. PROGLOG("Throttler(%s): Waited %0.2f seconds so far for up to a maximum of %u (previous limit) transactions to complete, %u completed", title.get(), ((double)timer.elapsedMs())/1000, limit, -delta);
  2778. }
  2779. limit = _limit;
  2780. // NB: doesn't include transactions in progress, i.e. will only throttle new transactions coming in.
  2781. }
  2782. if (_delayMs != delayMs)
  2783. {
  2784. PROGLOG("Throttler(%s): New delayMs=%u, previous: %u", title.get(), _delayMs, delayMs);
  2785. delayMs = _delayMs;
  2786. }
  2787. if (_cpuThreshold != cpuThreshold)
  2788. {
  2789. PROGLOG("Throttler(%s): New cpuThreshold=%u, previous: %u", title.get(), _cpuThreshold, cpuThreshold);
  2790. cpuThreshold = _cpuThreshold;
  2791. }
  2792. if (((unsigned)-1) != _queueLimit && _queueLimit != queueLimit)
  2793. {
  2794. PROGLOG("Throttler(%s): New queueLimit=%u%s, previous: %u", title.get(), _queueLimit, 0==_queueLimit?"(disabled)":"", queueLimit);
  2795. queueLimit = _queueLimit;
  2796. }
  2797. }
  2798. void setStatsInterval(unsigned _statsIntervalSecs)
  2799. {
  2800. if (_statsIntervalSecs != statsIntervalSecs)
  2801. {
  2802. PROGLOG("Throttler(%s): New statsIntervalSecs=%u, previous: %u", title.get(), _statsIntervalSecs, statsIntervalSecs);
  2803. statsIntervalSecs = _statsIntervalSecs;
  2804. }
  2805. }
  2806. void take(RemoteFileCommandType cmd) // cmd for info. only
  2807. {
  2808. for (;;)
  2809. {
  2810. if (sem.wait(delayMs))
  2811. return;
  2812. PROGLOG("Throttler(%s): transaction delayed [cmd=%s]", title.get(), getRFCText(cmd));
  2813. }
  2814. }
  2815. void release()
  2816. {
  2817. sem.signal();
  2818. }
  2819. StringBuffer &getStats(StringBuffer &stats, bool reset)
  2820. {
  2821. CriticalBlock b(crit);
  2822. getInfoSummary(stats);
  2823. if (reset)
  2824. {
  2825. totalThrottleDelayTimer.reset();
  2826. totalThrottleDelay = 0;
  2827. }
  2828. return stats;
  2829. }
  2830. void addCommand(RemoteFileCommandType cmd, MemoryBuffer &msg, CRemoteClientHandler *client)
  2831. {
  2832. CCycleTimer timer;
  2833. Owned<IException> exception;
  2834. bool hadSem = true;
  2835. if (!sem.wait(delayMs))
  2836. {
  2837. CriticalBlock b(crit);
  2838. if (!sem.wait(0)) // check hasn't become available
  2839. {
  2840. unsigned cpu = getLatestCPUUsage();
  2841. if (getLatestCPUUsage()<cpuThreshold)
  2842. {
  2843. /* Allow to proceed, despite hitting throttle limit because CPU < threshold
  2844. * NB: The overall number of threads is still capped by the thread pool.
  2845. */
  2846. unsigned ms = timer.elapsedMs();
  2847. totalThrottleDelay += ms;
  2848. PROGLOG("Throttler(%s): transaction delayed [cmd=%s] for : %u milliseconds, proceeding as cpu(%u)<throttleCPULimit(%u)", title.get(), getRFCText(cmd), cpu, ms, cpuThreshold);
  2849. hadSem = false;
  2850. }
  2851. else
  2852. {
  2853. if (queueLimit && queue.ordinality()>=queueLimit)
  2854. throw MakeStringException(0, "Throttler(%s), the maxiumum number of items are queued (%u), rejecting new command[%s]", title.str(), queue.ordinality(), getRFCText(cmd));
  2855. queue.enqueue(new CThrottleQueueItem(cmd, msg, client)); // NB: takes over ownership of 'client' from running thread
  2856. PROGLOG("Throttler(%s): transaction delayed [cmd=%s], queuing (%u queueud), [client=%p, sock=%u]", title.get(), getRFCText(cmd), queue.ordinality(), client, client->socket->OShandle());
  2857. return;
  2858. }
  2859. }
  2860. }
  2861. /* Guarantee that sem is released.
  2862. * Should normally release on clean exit when queue is empty.
  2863. */
  2864. struct ReleaseSem
  2865. {
  2866. Semaphore *sem;
  2867. ReleaseSem(Semaphore *_sem) { sem = _sem; }
  2868. ~ReleaseSem() { if (sem) sem->signal(); }
  2869. } releaseSem(hadSem?&sem:NULL);
  2870. /* Whilst holding on this throttle slot (i.e. before signalling semaphore back), process
  2871. * queued items. NB: other threads that are finishing will do also.
  2872. * Queued items are processed 1st, then the current request, then anything that was queued when handling current request
  2873. * Throttle slot (semaphore) is only given back when no more to do.
  2874. */
  2875. Linked<CRemoteClientHandler> currentClient;
  2876. MemoryBuffer currentMsg;
  2877. unsigned ms;
  2878. for (;;)
  2879. {
  2880. RemoteFileCommandType currentCmd;
  2881. {
  2882. CriticalBlock b(crit);
  2883. Owned<CThrottleQueueItem> item = queue.dequeue();
  2884. if (item)
  2885. {
  2886. currentCmd = item->cmd;
  2887. currentClient.setown(item->client.getClear());
  2888. currentMsg.swapWith(item->msg);
  2889. ms = item->timer.elapsedMs();
  2890. }
  2891. else
  2892. {
  2893. if (NULL == client) // previously handled and queue empty
  2894. {
  2895. /* Commands are only queued if semaphore is exhaused (checked inside crit)
  2896. * so only signal the semaphore inside the crit, after checking if there are no queued items
  2897. */
  2898. if (hadSem)
  2899. {
  2900. releaseSem.sem = NULL;
  2901. sem.signal();
  2902. }
  2903. break;
  2904. }
  2905. currentCmd = cmd;
  2906. currentClient.set(client); // process current request after dealing with queue
  2907. currentMsg.swapWith(msg);
  2908. ms = timer.elapsedMs();
  2909. client = NULL;
  2910. }
  2911. }
  2912. if (ms >= 1000)
  2913. {
  2914. if (ms>delayMs)
  2915. PROGLOG("Throttler(%s): transaction delayed [cmd=%s] for : %u seconds", title.get(), getRFCText(currentCmd), ms/1000);
  2916. }
  2917. {
  2918. CriticalBlock b(crit);
  2919. totalThrottleDelay += ms;
  2920. }
  2921. try
  2922. {
  2923. currentClient->processCommand(currentCmd, currentMsg, this);
  2924. }
  2925. catch (IException *e)
  2926. {
  2927. EXCLOG(e, "addCommand: processCommand failed");
  2928. e->Release();
  2929. }
  2930. }
  2931. }
  2932. };
  2933. // temporarily release a throttler slot
  2934. class CThrottleReleaseBlock
  2935. {
  2936. CThrottler &throttler;
  2937. RemoteFileCommandType cmd;
  2938. public:
  2939. CThrottleReleaseBlock(CThrottler &_throttler, RemoteFileCommandType _cmd) : throttler(_throttler), cmd(_cmd)
  2940. {
  2941. throttler.release();
  2942. }
  2943. ~CThrottleReleaseBlock()
  2944. {
  2945. throttler.take(cmd);
  2946. }
  2947. };
  2948. int lasthandle;
  2949. CriticalSection sect;
  2950. Owned<ISocket> acceptsock;
  2951. Owned<ISocket> securesock;
  2952. Owned<ISocket> rowServiceSock;
  2953. bool rowServiceOnStdPort = true; // should row service commands be processed on std. service port
  2954. bool rowServiceSSL = false;
  2955. Owned<ISocketSelectHandler> selecthandler;
  2956. Owned<IThreadPool> threads; // for commands
  2957. bool stopping;
  2958. unsigned clientcounttick;
  2959. unsigned closedclients;
  2960. CAsyncCommandManager asyncCommandManager;
  2961. CThrottler stdCmdThrottler, slowCmdThrottler;
  2962. CClientStatsTable clientStatsTable;
  2963. atomic_t globallasttick;
  2964. unsigned targetActiveThreads;
  2965. Linked<IPropertyTree> keyPairInfo;
  2966. int getNextHandle()
  2967. {
  2968. // called in sect critical block
  2969. for (;;) {
  2970. if (lasthandle==INT_MAX)
  2971. lasthandle = 1;
  2972. else
  2973. lasthandle++;
  2974. unsigned idx1;
  2975. unsigned idx2;
  2976. if (!findHandle(lasthandle,idx1,idx2))
  2977. return lasthandle;
  2978. }
  2979. }
  2980. bool findHandle(int handle,unsigned &clientidx,unsigned &handleidx)
  2981. {
  2982. // called in sect critical block
  2983. clientidx = (unsigned)-1;
  2984. handleidx = (unsigned)-1;
  2985. ForEachItemIn(i,clients) {
  2986. CRemoteClientHandler &client = clients.item(i);
  2987. ForEachItemIn(j, client.openFiles)
  2988. {
  2989. if (client.openFiles.item(j).handle==handle)
  2990. {
  2991. handleidx = j;
  2992. clientidx = i;
  2993. return true;
  2994. }
  2995. }
  2996. }
  2997. return false;
  2998. }
  2999. unsigned readKeyData(IKeyManager *keyManager, unsigned maxRecs, MemoryBuffer &reply, bool &maxHit)
  3000. {
  3001. DelayedSizeMarker keyDataSzReturned(reply);
  3002. unsigned numRecs = 0;
  3003. maxHit = false;
  3004. unsigned pos = reply.length();
  3005. while (keyManager->lookup(true))
  3006. {
  3007. unsigned size = keyManager->queryRowSize();
  3008. const byte *result = keyManager->queryKeyBuffer();
  3009. reply.append(size);
  3010. reply.append(size, result);
  3011. ++numRecs;
  3012. if (maxRecs && (0 == --maxRecs))
  3013. {
  3014. maxHit = true;
  3015. break;
  3016. }
  3017. if (reply.length()-pos >= MAX_KEYDATA_SZ)
  3018. {
  3019. maxHit = true;
  3020. break;
  3021. }
  3022. }
  3023. keyDataSzReturned.write();
  3024. return numRecs;
  3025. }
  3026. class cCommandProcessor: public CInterface, implements IPooledThread
  3027. {
  3028. Owned<CRemoteClientHandler> client;
  3029. MemoryBuffer msg;
  3030. public:
  3031. IMPLEMENT_IINTERFACE;
  3032. struct cCommandProcessorParams
  3033. {
  3034. cCommandProcessorParams() { msg.setEndian(__BIG_ENDIAN); }
  3035. CRemoteClientHandler *client;
  3036. MemoryBuffer msg;
  3037. };
  3038. virtual void init(void *_params) override
  3039. {
  3040. cCommandProcessorParams &params = *(cCommandProcessorParams *)_params;
  3041. client.setown(params.client);
  3042. msg.swapWith(params.msg);
  3043. }
  3044. virtual void threadmain() override
  3045. {
  3046. // idea is that initially we process commands inline then pass over to select handler
  3047. try
  3048. {
  3049. client->process(msg);
  3050. }
  3051. catch (IException *e)
  3052. {
  3053. // suppress some errors
  3054. EXCLOG(e,"cCommandProcessor::threadmain");
  3055. e->Release();
  3056. }
  3057. try
  3058. {
  3059. client.clear();
  3060. }
  3061. catch (IException *e)
  3062. {
  3063. // suppress some more errors clearing client
  3064. EXCLOG(e,"cCommandProcessor::threadmain(2)");
  3065. e->Release();
  3066. }
  3067. }
  3068. virtual bool stop() override
  3069. {
  3070. return true;
  3071. }
  3072. virtual bool canReuse() const override
  3073. {
  3074. return false; // want to free owned socket
  3075. }
  3076. };
  3077. IArrayOf<CRemoteClientHandler> clients;
  3078. void validateSSLSetup()
  3079. {
  3080. if (!securitySettings.certificate)
  3081. throw createDafsException(DAFSERR_serverinit_failed, "SSL Certificate information not found in environment.conf");
  3082. if (!checkFileExists(securitySettings.certificate))
  3083. throw createDafsException(DAFSERR_serverinit_failed, "SSL Certificate File not found in environment.conf");
  3084. if (!securitySettings.privateKey)
  3085. throw createDafsException(DAFSERR_serverinit_failed, "SSL Key information not found in environment.conf");
  3086. if (!checkFileExists(securitySettings.privateKey))
  3087. throw createDafsException(DAFSERR_serverinit_failed, "SSL Key File not found in environment.conf");
  3088. }
  3089. public:
  3090. IMPLEMENT_IINTERFACE
  3091. CRemoteFileServer(unsigned maxThreads, unsigned maxThreadsDelayMs, unsigned maxAsyncCopy, IPropertyTree *_keyPairInfo)
  3092. : asyncCommandManager(maxAsyncCopy), stdCmdThrottler("stdCmdThrotlter"), slowCmdThrottler("slowCmdThrotlter"), keyPairInfo(_keyPairInfo)
  3093. {
  3094. lasthandle = 0;
  3095. selecthandler.setown(createSocketSelectHandler(NULL));
  3096. stdCmdThrottler.configure(DEFAULT_STDCMD_PARALLELREQUESTLIMIT, DEFAULT_STDCMD_THROTTLEDELAYMS, DEFAULT_STDCMD_THROTTLECPULIMIT, DEFAULT_STDCMD_THROTTLEQUEUELIMIT);
  3097. slowCmdThrottler.configure(DEFAULT_SLOWCMD_PARALLELREQUESTLIMIT, DEFAULT_SLOWCMD_THROTTLEDELAYMS, DEFAULT_SLOWCMD_THROTTLECPULIMIT, DEFAULT_SLOWCMD_THROTTLEQUEUELIMIT);
  3098. unsigned targetMinThreads=maxThreads*20/100; // 20%
  3099. if (0 == targetMinThreads) targetMinThreads = 1;
  3100. targetActiveThreads=maxThreads*80/100; // 80%
  3101. if (0 == targetActiveThreads) targetActiveThreads = 1;
  3102. class CCommandFactory : public CSimpleInterfaceOf<IThreadFactory>
  3103. {
  3104. CRemoteFileServer &parent;
  3105. public:
  3106. CCommandFactory(CRemoteFileServer &_parent) : parent(_parent) { }
  3107. virtual IPooledThread *createNew()
  3108. {
  3109. return parent.createCommandProcessor();
  3110. }
  3111. };
  3112. Owned<IThreadFactory> factory = new CCommandFactory(*this); // NB: pool links factory, so takes ownership
  3113. threads.setown(createThreadPool("CRemoteFileServerPool", factory, NULL, maxThreads, maxThreadsDelayMs,
  3114. #ifdef __64BIT__
  3115. 0, // Unlimited stack size
  3116. #else
  3117. 0x10000,
  3118. #endif
  3119. INFINITE,targetMinThreads));
  3120. threads->setStartDelayTracing(60); // trace amount delayed every minute.
  3121. PROGLOG("CRemoteFileServer: maxThreads = %u, maxThreadsDelayMs = %u, maxAsyncCopy = %u", maxThreads, maxThreadsDelayMs, maxAsyncCopy);
  3122. stopping = false;
  3123. clientcounttick = msTick();
  3124. closedclients = 0;
  3125. atomic_set(&globallasttick,msTick());
  3126. }
  3127. ~CRemoteFileServer()
  3128. {
  3129. #ifdef _DEBUG
  3130. PROGLOG("Exiting CRemoteFileServer");
  3131. #endif
  3132. asyncCommandManager.join();
  3133. clients.kill();
  3134. #ifdef _DEBUG
  3135. PROGLOG("Exited CRemoteFileServer");
  3136. #endif
  3137. }
  3138. bool lookupFileIOHandle(int handle, OpenFileInfo &fileInfo, unsigned newFlags=0)
  3139. {
  3140. if (handle<=0)
  3141. return false;
  3142. CriticalBlock block(sect);
  3143. unsigned clientidx;
  3144. unsigned handleidx;
  3145. if (!findHandle(handle,clientidx,handleidx))
  3146. return false;
  3147. CRemoteClientHandler &client = clients.item(clientidx);
  3148. OpenFileInfo &openFileInfo = client.openFiles.element(handleidx); // NB: links members
  3149. openFileInfo.flags |= newFlags;
  3150. fileInfo = openFileInfo;
  3151. client.previdx = handleidx;
  3152. return true;
  3153. }
  3154. //MORE: The file handles should timeout after a while, and accessing an old (invalid handle)
  3155. // should throw a different exception
  3156. bool checkFileIOHandle(int handle, IFileIO *&fileio, bool del=false)
  3157. {
  3158. fileio = NULL;
  3159. if (handle<=0)
  3160. return false;
  3161. CriticalBlock block(sect);
  3162. unsigned clientidx;
  3163. unsigned handleidx;
  3164. if (findHandle(handle,clientidx,handleidx))
  3165. {
  3166. CRemoteClientHandler &client = clients.item(clientidx);
  3167. const OpenFileInfo &fileInfo = client.openFiles.item(handleidx);
  3168. if (del)
  3169. {
  3170. if (fileInfo.flags & of_key)
  3171. clearKeyStoreCacheEntry(fileInfo.fileIO);
  3172. client.openFiles.remove(handleidx);
  3173. client.previdx = (unsigned)-1;
  3174. }
  3175. else
  3176. {
  3177. fileio = client.openFiles.item(handleidx).fileIO;
  3178. client.previdx = handleidx;
  3179. }
  3180. return true;
  3181. }
  3182. return false;
  3183. }
  3184. void checkFileIOHandle(MemoryBuffer &reply, int handle, IFileIO *&fileio, bool del=false)
  3185. {
  3186. if (!checkFileIOHandle(handle, fileio, del))
  3187. throw createDafsException(RFSERR_InvalidFileIOHandle, nullptr);
  3188. }
  3189. void onCloseSocket(CRemoteClientHandler *client, int which)
  3190. {
  3191. if (!client)
  3192. return;
  3193. CriticalBlock block(sect);
  3194. #ifdef _DEBUG
  3195. StringBuffer s(client->queryPeerName());
  3196. PROGLOG("onCloseSocket(%d) %s",which,s.str());
  3197. #endif
  3198. if (client->socket)
  3199. {
  3200. try
  3201. {
  3202. /* JCSMORE - shouldn't this really be dependent on whether selecthandled=true
  3203. * It has not been added to the selecthandler
  3204. * Harmless, but wasteful if so.
  3205. */
  3206. selecthandler->remove(client->socket);
  3207. }
  3208. catch (IException *e) {
  3209. EXCLOG(e,"CRemoteFileServer::onCloseSocket.1");
  3210. e->Release();
  3211. }
  3212. }
  3213. try {
  3214. clients.zap(*client);
  3215. }
  3216. catch (IException *e) {
  3217. EXCLOG(e,"CRemoteFileServer::onCloseSocket.2");
  3218. e->Release();
  3219. }
  3220. }
  3221. bool cmdOpenFileIO(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
  3222. {
  3223. Owned<StringAttrItem> name = new StringAttrItem;
  3224. byte mode;
  3225. byte share;
  3226. msg.read(name->text).read(mode).read(share);
  3227. // also try to recv extra byte
  3228. byte extra = 0;
  3229. unsigned short sMode = IFUnone;
  3230. unsigned short cFlags = IFUnone;
  3231. if (msg.remaining() >= sizeof(byte))
  3232. {
  3233. msg.read(extra);
  3234. // and then try to recv extra sMode, cFlags (always sent together)
  3235. if (msg.remaining() >= (sizeof(sMode) + sizeof(cFlags)))
  3236. msg.read(sMode).read(cFlags);
  3237. }
  3238. IFEflags extraFlags = (IFEflags)extra;
  3239. // none => nocache for remote (hint)
  3240. // can revert to previous behavior with conf file setting "allow_pgcache_flush=false"
  3241. if (extraFlags == IFEnone)
  3242. extraFlags = IFEnocache;
  3243. Owned<IFile> file = createIFile(name->text);
  3244. switch ((compatIFSHmode)share) {
  3245. case compatIFSHnone:
  3246. file->setCreateFlags(S_IRUSR|S_IWUSR);
  3247. file->setShareMode(IFSHnone);
  3248. break;
  3249. case compatIFSHread:
  3250. file->setShareMode(IFSHread);
  3251. break;
  3252. case compatIFSHwrite:
  3253. file->setShareMode(IFSHfull);
  3254. break;
  3255. case compatIFSHexec:
  3256. file->setCreateFlags(S_IRUSR|S_IWUSR|S_IXUSR|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH);
  3257. break;
  3258. case compatIFSHall:
  3259. file->setCreateFlags(S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH); // bit excessive
  3260. file->setShareMode(IFSHfull);
  3261. break;
  3262. }
  3263. // use sMode, cFlags if sent
  3264. if (sMode != IFUnone && cFlags != IFUnone)
  3265. {
  3266. file->setCreateFlags(cFlags);
  3267. file->setShareMode((IFSHmode)sMode);
  3268. }
  3269. if (TF_TRACE_PRE_IO)
  3270. PROGLOG("before open file '%s', (%d,%d,%d,%d,0%o)",name->text.get(),(int)mode,(int)share,extraFlags,sMode,cFlags);
  3271. Owned<IFileIO> fileio = file->open((IFOmode)mode,extraFlags);
  3272. int handle;
  3273. if (fileio)
  3274. {
  3275. CriticalBlock block(sect);
  3276. handle = getNextHandle();
  3277. client.previdx = client.openFiles.ordinality();
  3278. client.openFiles.append(OpenFileInfo(handle, fileio, name));
  3279. }
  3280. else
  3281. handle = 0;
  3282. reply.append(RFEnoerror);
  3283. reply.append(handle);
  3284. if (TF_TRACE)
  3285. PROGLOG("open file '%s', (%d,%d) handle = %d",name->text.get(),(int)mode,(int)share,handle);
  3286. return true;
  3287. }
  3288. bool cmdCloseFileIO(MemoryBuffer & msg, MemoryBuffer & reply)
  3289. {
  3290. int handle;
  3291. msg.read(handle);
  3292. IFileIO *fileio;
  3293. checkFileIOHandle(reply, handle, fileio, true);
  3294. if (TF_TRACE)
  3295. PROGLOG("close file, handle = %d",handle);
  3296. reply.append(RFEnoerror);
  3297. return true;
  3298. }
  3299. void cmdRead(MemoryBuffer & msg, MemoryBuffer & reply, CClientStats &stats)
  3300. {
  3301. int handle;
  3302. __int64 pos;
  3303. size32_t len;
  3304. msg.read(handle).read(pos).read(len);
  3305. IFileIO *fileio;
  3306. checkFileIOHandle(reply, handle, fileio);
  3307. //arrange it so we read directly into the reply buffer...
  3308. unsigned posOfErr = reply.length();
  3309. reply.append((unsigned)RFEnoerror);
  3310. size32_t numRead;
  3311. unsigned posOfLength = reply.length();
  3312. if (TF_TRACE_PRE_IO)
  3313. PROGLOG("before read file, handle = %d, toread = %d",handle,len);
  3314. reply.reserve(sizeof(numRead));
  3315. void *data = reply.reserve(len);
  3316. numRead = fileio->read(pos,len,data);
  3317. stats.addRead(len);
  3318. if (TF_TRACE)
  3319. PROGLOG("read file, handle = %d, pos = %" I64F "d, toread = %d, read = %d",handle,pos,len,numRead);
  3320. reply.setLength(posOfLength + sizeof(numRead) + numRead);
  3321. reply.writeEndianDirect(posOfLength,sizeof(numRead),&numRead);
  3322. }
  3323. void cmdSize(MemoryBuffer & msg, MemoryBuffer & reply)
  3324. {
  3325. int handle;
  3326. msg.read(handle);
  3327. IFileIO *fileio;
  3328. checkFileIOHandle(reply, handle, fileio);
  3329. __int64 size = fileio->size();
  3330. reply.append((unsigned)RFEnoerror).append(size);
  3331. if (TF_TRACE)
  3332. PROGLOG("size file, handle = %d, size = %" I64F "d",handle,size);
  3333. }
  3334. void cmdSetSize(MemoryBuffer & msg, MemoryBuffer & reply)
  3335. {
  3336. int handle;
  3337. offset_t size;
  3338. msg.read(handle).read(size);
  3339. IFileIO *fileio;
  3340. if (TF_TRACE)
  3341. PROGLOG("set size file, handle = %d, size = %" I64F "d",handle,size);
  3342. checkFileIOHandle(reply, handle, fileio);
  3343. fileio->setSize(size);
  3344. reply.append((unsigned)RFEnoerror);
  3345. }
  3346. void cmdWrite(MemoryBuffer & msg, MemoryBuffer & reply, CClientStats &stats)
  3347. {
  3348. int handle;
  3349. __int64 pos;
  3350. size32_t len;
  3351. msg.read(handle).read(pos).read(len);
  3352. IFileIO *fileio;
  3353. checkFileIOHandle(reply, handle, fileio);
  3354. const byte *data = (const byte *)msg.readDirect(len);
  3355. if (TF_TRACE_PRE_IO)
  3356. PROGLOG("before write file, handle = %d, towrite = %d",handle,len);
  3357. size32_t numWritten = fileio->write(pos,len,data);
  3358. stats.addWrite(numWritten);
  3359. if (TF_TRACE)
  3360. PROGLOG("write file, handle = %d, towrite = %d, written = %d",handle,len,numWritten);
  3361. reply.append((unsigned)RFEnoerror).append(numWritten);
  3362. }
  3363. void cmdExists(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
  3364. {
  3365. StringAttr name;
  3366. msg.read(name);
  3367. if (TF_TRACE)
  3368. PROGLOG("exists, '%s'",name.get());
  3369. Owned<IFile> file=createIFile(name);
  3370. bool e = file->exists();
  3371. reply.append((unsigned)RFEnoerror).append(e);
  3372. }
  3373. void cmdRemove(MemoryBuffer & msg, MemoryBuffer & reply,CRemoteClientHandler &client)
  3374. {
  3375. StringAttr name;
  3376. msg.read(name);
  3377. if (TF_TRACE)
  3378. PROGLOG("remove, '%s'",name.get());
  3379. Owned<IFile> file=createIFile(name);
  3380. bool e = file->remove();
  3381. reply.append((unsigned)RFEnoerror).append(e);
  3382. }
  3383. void cmdGetVer(MemoryBuffer & msg, MemoryBuffer & reply)
  3384. {
  3385. if (TF_TRACE)
  3386. PROGLOG("getVer");
  3387. /* weird backward compatibility convention,
  3388. * newer clients will send another unsigned to denote
  3389. * and result in the numeric DAFILESRV_VERSION being returned
  3390. * Ancient clients will get back the string form only (SERVER_VERSTRING)
  3391. */
  3392. if (msg.getPos()+sizeof(unsigned)>msg.length())
  3393. reply.append((unsigned)RFEnoerror);
  3394. else
  3395. reply.append((unsigned)DAFILESRV_VERSION+0x10000);
  3396. reply.append(DAFILESRV_VERSIONSTRING);
  3397. }
  3398. void cmdRename(MemoryBuffer & msg, MemoryBuffer & reply,CRemoteClientHandler &client)
  3399. {
  3400. StringAttr fromname;
  3401. msg.read(fromname);
  3402. StringAttr toname;
  3403. msg.read(toname);
  3404. if (TF_TRACE)
  3405. PROGLOG("rename, '%s' to '%s'",fromname.get(),toname.get());
  3406. Owned<IFile> file=createIFile(fromname);
  3407. file->rename(toname);
  3408. reply.append((unsigned)RFEnoerror);
  3409. }
  3410. void cmdMove(MemoryBuffer & msg, MemoryBuffer & reply,CRemoteClientHandler &client)
  3411. {
  3412. StringAttr fromname;
  3413. msg.read(fromname);
  3414. StringAttr toname;
  3415. msg.read(toname);
  3416. if (TF_TRACE)
  3417. PROGLOG("move, '%s' to '%s'",fromname.get(),toname.get());
  3418. Owned<IFile> file=createIFile(fromname);
  3419. file->move(toname);
  3420. reply.append((unsigned)RFEnoerror);
  3421. }
  3422. void cmdCopy(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
  3423. {
  3424. StringAttr fromname;
  3425. msg.read(fromname);
  3426. StringAttr toname;
  3427. msg.read(toname);
  3428. if (TF_TRACE)
  3429. PROGLOG("copy, '%s' to '%s'",fromname.get(),toname.get());
  3430. copyFile(toname, fromname);
  3431. reply.append((unsigned)RFEnoerror);
  3432. }
  3433. void cmdAppend(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client, CClientStats &stats)
  3434. {
  3435. int handle;
  3436. __int64 pos;
  3437. __int64 len;
  3438. StringAttr srcname;
  3439. msg.read(handle).read(srcname).read(pos).read(len);
  3440. IFileIO *fileio;
  3441. checkFileIOHandle(reply, handle, fileio);
  3442. Owned<IFile> file = createIFile(srcname.get());
  3443. __int64 written = fileio->appendFile(file,pos,len);
  3444. stats.addWrite(written);
  3445. if (TF_TRACE)
  3446. PROGLOG("append file, handle = %d, file=%s, pos = %" I64F "d len = %" I64F "d written = %" I64F "d",handle,srcname.get(),pos,len,written);
  3447. reply.append((unsigned)RFEnoerror).append(written);
  3448. }
  3449. void cmdIsFile(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3450. {
  3451. StringAttr name;
  3452. msg.read(name);
  3453. if (TF_TRACE)
  3454. PROGLOG("isFile, '%s'",name.get());
  3455. Owned<IFile> file=createIFile(name);
  3456. unsigned ret = (unsigned)file->isFile();
  3457. reply.append((unsigned)RFEnoerror).append(ret);
  3458. }
  3459. void cmdIsDir(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3460. {
  3461. StringAttr name;
  3462. msg.read(name);
  3463. if (TF_TRACE)
  3464. PROGLOG("isDir, '%s'",name.get());
  3465. Owned<IFile> file=createIFile(name);
  3466. unsigned ret = (unsigned)file->isDirectory();
  3467. reply.append((unsigned)RFEnoerror).append(ret);
  3468. }
  3469. void cmdIsReadOnly(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3470. {
  3471. StringAttr name;
  3472. msg.read(name);
  3473. if (TF_TRACE)
  3474. PROGLOG("isReadOnly, '%s'",name.get());
  3475. Owned<IFile> file=createIFile(name);
  3476. unsigned ret = (unsigned)file->isReadOnly();
  3477. reply.append((unsigned)RFEnoerror).append(ret);
  3478. }
  3479. void cmdSetReadOnly(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3480. {
  3481. StringAttr name;
  3482. bool set;
  3483. msg.read(name).read(set);
  3484. if (TF_TRACE)
  3485. PROGLOG("setReadOnly, '%s' %d",name.get(),(int)set);
  3486. Owned<IFile> file=createIFile(name);
  3487. file->setReadOnly(set);
  3488. reply.append((unsigned)RFEnoerror);
  3489. }
  3490. void cmdSetFilePerms(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3491. {
  3492. StringAttr name;
  3493. unsigned fPerms;
  3494. msg.read(name).read(fPerms);
  3495. if (TF_TRACE)
  3496. PROGLOG("setFilePerms, '%s' 0%o",name.get(),fPerms);
  3497. Owned<IFile> file=createIFile(name);
  3498. file->setFilePermissions(fPerms);
  3499. reply.append((unsigned)RFEnoerror);
  3500. }
  3501. void cmdGetTime(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3502. {
  3503. StringAttr name;
  3504. msg.read(name);
  3505. if (TF_TRACE)
  3506. PROGLOG("getTime, '%s'",name.get());
  3507. Owned<IFile> file=createIFile(name);
  3508. CDateTime createTime;
  3509. CDateTime modifiedTime;
  3510. CDateTime accessedTime;
  3511. bool ret = file->getTime(&createTime,&modifiedTime,&accessedTime);
  3512. reply.append((unsigned)RFEnoerror).append(ret);
  3513. if (ret)
  3514. {
  3515. createTime.serialize(reply);
  3516. modifiedTime.serialize(reply);
  3517. accessedTime.serialize(reply);
  3518. }
  3519. }
  3520. void cmdSetTime(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3521. {
  3522. StringAttr name;
  3523. bool creategot;
  3524. CDateTime createTime;
  3525. bool modifiedgot;
  3526. CDateTime modifiedTime;
  3527. bool accessedgot;
  3528. CDateTime accessedTime;
  3529. msg.read(name);
  3530. msg.read(creategot);
  3531. if (creategot)
  3532. createTime.deserialize(msg);
  3533. msg.read(modifiedgot);
  3534. if (modifiedgot)
  3535. modifiedTime.deserialize(msg);
  3536. msg.read(accessedgot);
  3537. if (accessedgot)
  3538. accessedTime.deserialize(msg);
  3539. if (TF_TRACE)
  3540. PROGLOG("setTime, '%s'",name.get());
  3541. Owned<IFile> file=createIFile(name);
  3542. bool ret = file->setTime(creategot?&createTime:NULL,modifiedgot?&modifiedTime:NULL,accessedgot?&accessedTime:NULL);
  3543. reply.append((unsigned)RFEnoerror).append(ret);
  3544. }
  3545. void cmdCreateDir(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3546. {
  3547. StringAttr name;
  3548. msg.read(name);
  3549. if (TF_TRACE)
  3550. PROGLOG("CreateDir, '%s'",name.get());
  3551. Owned<IFile> dir=createIFile(name);
  3552. bool ret = dir->createDirectory();
  3553. reply.append((unsigned)RFEnoerror).append(ret);
  3554. }
  3555. void cmdGetDir(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3556. {
  3557. StringAttr name;
  3558. StringAttr mask;
  3559. bool includedir;
  3560. bool sub;
  3561. byte stream = 0;
  3562. msg.read(name).read(mask).read(includedir).read(sub);
  3563. if (msg.remaining()>=sizeof(byte))
  3564. {
  3565. msg.read(stream);
  3566. if (stream==1)
  3567. client.opendir.clear();
  3568. }
  3569. if (TF_TRACE)
  3570. PROGLOG("GetDir, '%s', '%s', stream='%u'",name.get(),mask.get(),stream);
  3571. if (!stream && !containsFileWildcard(mask))
  3572. {
  3573. // if no streaming, and mask contains no wildcard, it is much more efficient to get the info without a directory iterator!
  3574. StringBuffer fullFilename(name);
  3575. addPathSepChar(fullFilename).append(mask);
  3576. Owned<IFile> iFile = createIFile(fullFilename);
  3577. // NB: This must preserve same serialization format as CRemoteDirectoryIterator::serialize produces for <=1 file.
  3578. reply.append((unsigned)RFEnoerror);
  3579. if (!iFile->exists())
  3580. reply.append((byte)0);
  3581. else
  3582. {
  3583. byte b=1;
  3584. reply.append(b);
  3585. bool isDir = foundYes == iFile->isDirectory();
  3586. reply.append(isDir);
  3587. reply.append(isDir ? 0 : iFile->size());
  3588. CDateTime dt;
  3589. iFile->getTime(nullptr, &dt, nullptr);
  3590. dt.serialize(reply);
  3591. reply.append(mask);
  3592. b = 0;
  3593. reply.append(b);
  3594. }
  3595. }
  3596. else
  3597. {
  3598. Owned<IFile> dir=createIFile(name);
  3599. Owned<IDirectoryIterator> iter;
  3600. if (stream>1)
  3601. iter.set(client.opendir);
  3602. else
  3603. {
  3604. iter.setown(dir->directoryFiles(mask.length()?mask.get():NULL,sub,includedir));
  3605. if (stream != 0)
  3606. client.opendir.set(iter);
  3607. }
  3608. if (!iter)
  3609. throw createDafsException(RFSERR_GetDirFailed, nullptr);
  3610. reply.append((unsigned)RFEnoerror);
  3611. if (serializeRemoteDirectoryIterator(reply,iter,stream?0x100000:0,stream<2))
  3612. {
  3613. if (stream != 0)
  3614. client.opendir.clear();
  3615. }
  3616. else
  3617. {
  3618. bool cont=true;
  3619. reply.append(cont);
  3620. }
  3621. }
  3622. }
  3623. void cmdMonitorDir(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3624. {
  3625. StringAttr name;
  3626. StringAttr mask;
  3627. bool includedir;
  3628. bool sub;
  3629. unsigned checkinterval;
  3630. unsigned timeout;
  3631. __int64 cancelid; // not yet used
  3632. msg.read(name).read(mask).read(includedir).read(sub).read(checkinterval).read(timeout).read(cancelid);
  3633. byte isprev;
  3634. msg.read(isprev);
  3635. Owned<IDirectoryIterator> prev;
  3636. if (isprev==1)
  3637. {
  3638. SocketEndpoint ep;
  3639. prev.setown(createRemoteDirectorIterator(ep, name, msg));
  3640. }
  3641. if (TF_TRACE)
  3642. PROGLOG("MonitorDir, '%s' '%s'",name.get(),mask.get());
  3643. Owned<IFile> dir=createIFile(name);
  3644. Owned<IDirectoryDifferenceIterator> iter=dir->monitorDirectory(prev,mask.length()?mask.get():NULL,sub,includedir,checkinterval,timeout);
  3645. reply.append((unsigned)RFEnoerror);
  3646. byte state = (iter.get()==NULL)?0:1;
  3647. reply.append(state);
  3648. if (state==1)
  3649. serializeRemoteDirectoryDiff(reply, iter);
  3650. }
  3651. void cmdCopySection(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3652. {
  3653. StringAttr uuid;
  3654. StringAttr fromFile;
  3655. StringAttr toFile;
  3656. offset_t toOfs;
  3657. offset_t fromOfs;
  3658. offset_t size;
  3659. offset_t sizeDone=0;
  3660. offset_t totalSize=(offset_t)-1;
  3661. unsigned timeout;
  3662. msg.read(uuid).read(fromFile).read(toFile).read(toOfs).read(fromOfs).read(size).read(timeout);
  3663. AsyncCommandStatus status = asyncCommandManager.copySection(uuid,fromFile,toFile,toOfs,fromOfs,size,sizeDone,totalSize,timeout);
  3664. reply.append((unsigned)RFEnoerror).append((unsigned)status).append(sizeDone).append(totalSize);
  3665. }
  3666. static void treeCopyFile(RemoteFilename &srcfn, RemoteFilename &dstfn, const char *net, const char *mask, IpAddress &ip, bool usetmp, CThrottler *throttler, CFflags copyFlags=CFnone)
  3667. {
  3668. unsigned start = msTick();
  3669. Owned<IFile> dstfile = createIFile(dstfn);
  3670. // the following is really to check the dest node is up and working (otherwise not much point in continuing!)
  3671. if (dstfile->exists())
  3672. PROGLOG("TREECOPY overwriting '%s'",dstfile->queryFilename());
  3673. Owned<IFile> srcfile = createIFile(srcfn);
  3674. unsigned lastmin = 0;
  3675. if (!srcfn.queryIP().ipequals(dstfn.queryIP())) {
  3676. CriticalBlock block(treeCopyCrit);
  3677. for (;;) {
  3678. CDateTime dt;
  3679. offset_t sz;
  3680. try {
  3681. sz = srcfile->size();
  3682. if (sz==(offset_t)-1) {
  3683. if (TF_TRACE_TREE_COPY)
  3684. PROGLOG("TREECOPY source not found '%s'",srcfile->queryFilename());
  3685. break;
  3686. }
  3687. srcfile->getTime(NULL,&dt,NULL);
  3688. }
  3689. catch (IException *e) {
  3690. EXCLOG(e,"treeCopyFile(1)");
  3691. e->Release();
  3692. break;
  3693. }
  3694. Linked<CTreeCopyItem> tc;
  3695. unsigned now = msTick();
  3696. ForEachItemInRev(i1,treeCopyArray) {
  3697. CTreeCopyItem &item = treeCopyArray.item(i1);
  3698. // prune old entries (not strictly needed buf I think better)
  3699. if (now-item.lastused>TREECOPYPRUNETIME)
  3700. treeCopyArray.remove(i1);
  3701. else if (!tc.get()&&item.equals(srcfn,net,mask,sz,dt)) {
  3702. tc.set(&item);
  3703. item.lastused = now;
  3704. }
  3705. }
  3706. if (!tc.get()) {
  3707. if (treeCopyArray.ordinality()>=TREECOPY_CACHE_SIZE)
  3708. treeCopyArray.remove(0);
  3709. tc.setown(new CTreeCopyItem(srcfn,net,mask,sz,dt));
  3710. treeCopyArray.append(*tc.getLink());
  3711. }
  3712. ForEachItemInRev(cand,tc->loc) { // rev to choose copied locations first (maybe optional?)
  3713. if (!tc->busy->testSet(cand)) {
  3714. // check file accessible and matches
  3715. if (!cand&&dstfn.equals(tc->loc.item(cand))) // hmm trying to overwrite existing, better humor
  3716. continue;
  3717. bool ok = true;
  3718. Owned<IFile> rmtfile = createIFile(tc->loc.item(cand));
  3719. if (cand) { // only need to check if remote
  3720. try {
  3721. if (rmtfile->size()!=sz)
  3722. ok = false;
  3723. else {
  3724. CDateTime fdt;
  3725. rmtfile->getTime(NULL,&fdt,NULL);
  3726. ok = fdt.equals(dt);
  3727. }
  3728. }
  3729. catch (IException *e) {
  3730. EXCLOG(e,"treeCopyFile(2)");
  3731. e->Release();
  3732. ok = false;
  3733. }
  3734. }
  3735. if (ok) { // if not ok leave 'busy'
  3736. // finally lets try and copy!
  3737. try {
  3738. if (TF_TRACE_TREE_COPY)
  3739. PROGLOG("TREECOPY(started) %s to %s",rmtfile->queryFilename(),dstfile->queryFilename());
  3740. {
  3741. CriticalUnblock unblock(treeCopyCrit); // note we have tc linked
  3742. rmtfile->copyTo(dstfile,DEFAULT_COPY_BLKSIZE,NULL,usetmp,copyFlags);
  3743. }
  3744. if (TF_TRACE_TREE_COPY)
  3745. PROGLOG("TREECOPY(done) %s to %s",rmtfile->queryFilename(),dstfile->queryFilename());
  3746. tc->busy->set(cand,false);
  3747. if (treeCopyWaiting)
  3748. treeCopySem.signal((treeCopyWaiting>1)?2:1);
  3749. // add to known locations
  3750. tc->busy->set(tc->loc.ordinality(),false); // prob already is clear
  3751. tc->loc.append(dstfn);
  3752. ip.ipset(tc->loc.item(cand).queryIP());
  3753. return;
  3754. }
  3755. catch (IException *e) {
  3756. if (cand==0) {
  3757. tc->busy->set(0,false); // don't leave busy
  3758. if (treeCopyWaiting)
  3759. treeCopySem.signal();
  3760. throw; // what more can we do!
  3761. }
  3762. EXCLOG(e,"treeCopyFile(3)");
  3763. e->Release();
  3764. }
  3765. }
  3766. }
  3767. }
  3768. // all locations busy
  3769. if (msTick()-start>TREECOPYTIMEOUT) {
  3770. WARNLOG("Treecopy %s wait timed out", srcfile->queryFilename());
  3771. break;
  3772. }
  3773. treeCopyWaiting++; // note this isn't precise - just indication
  3774. {
  3775. CriticalUnblock unblock(treeCopyCrit);
  3776. if (throttler)
  3777. {
  3778. CThrottleReleaseBlock block(*throttler, RFCtreecopy);
  3779. treeCopySem.wait(TREECOPYPOLLTIME);
  3780. }
  3781. else
  3782. treeCopySem.wait(TREECOPYPOLLTIME);
  3783. }
  3784. treeCopyWaiting--;
  3785. if ((msTick()-start)/10*1000!=lastmin) {
  3786. lastmin = (msTick()-start)/10*1000;
  3787. PROGLOG("treeCopyFile delayed: %s to %s",srcfile->queryFilename(),dstfile->queryFilename());
  3788. }
  3789. }
  3790. }
  3791. else if (TF_TRACE_TREE_COPY)
  3792. PROGLOG("TREECOPY source on same node as destination");
  3793. if (TF_TRACE_TREE_COPY)
  3794. PROGLOG("TREECOPY(started,fallback) %s to %s",srcfile->queryFilename(),dstfile->queryFilename());
  3795. try {
  3796. GetHostIp(ip);
  3797. srcfile->copyTo(dstfile,DEFAULT_COPY_BLKSIZE,NULL,usetmp,copyFlags);
  3798. }
  3799. catch (IException *e) {
  3800. EXCLOG(e,"TREECOPY(done,fallback)");
  3801. throw;
  3802. }
  3803. if (TF_TRACE_TREE_COPY)
  3804. PROGLOG("TREECOPY(done,fallback) %s to %s",srcfile->queryFilename(),dstfile->queryFilename());
  3805. }
  3806. void cmdTreeCopy(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client, CThrottler *throttler, bool usetmp=false)
  3807. {
  3808. RemoteFilename src;
  3809. src.deserialize(msg);
  3810. RemoteFilename dst;
  3811. dst.deserialize(msg);
  3812. StringAttr net;
  3813. StringAttr mask;
  3814. msg.read(net).read(mask);
  3815. IpAddress ip;
  3816. treeCopyFile(src,dst,net,mask,ip,usetmp,throttler);
  3817. unsigned status = 0;
  3818. reply.append((unsigned)RFEnoerror).append((unsigned)status);
  3819. ip.ipserialize(reply);
  3820. }
  3821. void cmdTreeCopyTmp(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client, CThrottler *throttler)
  3822. {
  3823. cmdTreeCopy(msg, reply, client, throttler, true);
  3824. }
  3825. void cmdGetCRC(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3826. {
  3827. StringAttr name;
  3828. msg.read(name);
  3829. if (TF_TRACE)
  3830. PROGLOG("getCRC, '%s'",name.get());
  3831. Owned<IFile> file=createIFile(name);
  3832. unsigned ret = file->getCRC();
  3833. reply.append((unsigned)RFEnoerror).append(ret);
  3834. }
  3835. void cmdStop(MemoryBuffer &msg, MemoryBuffer &reply)
  3836. {
  3837. PROGLOG("Abort request received");
  3838. stopping = true;
  3839. if (acceptsock)
  3840. acceptsock->cancel_accept();
  3841. if (securesock)
  3842. securesock->cancel_accept();
  3843. if (rowServiceSock)
  3844. rowServiceSock->cancel_accept();
  3845. reply.append((unsigned)RFEnoerror);
  3846. }
  3847. void cmdExec(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3848. {
  3849. StringAttr cmdLine;
  3850. msg.read(cmdLine);
  3851. // NB: legacy remoteExec used to simply pass error code and buffer back to caller.
  3852. VStringBuffer errMsg("Remote command execution no longer supported. Trying to execute cmdline=%s", cmdLine.get());
  3853. WARNLOG("%s", errMsg.str());
  3854. size32_t outSz = errMsg.length()+1; // reply with null terminated string
  3855. // reply with error code -1
  3856. reply.append((unsigned)-1).append((unsigned)0).append(outSz).append(outSz, errMsg.str());
  3857. }
  3858. void cmdSetTrace(MemoryBuffer &msg, MemoryBuffer &reply)
  3859. {
  3860. byte flags;
  3861. msg.read(flags);
  3862. int retcode=-1;
  3863. if (flags!=255) // escape
  3864. {
  3865. retcode = traceFlags;
  3866. traceFlags = flags;
  3867. }
  3868. reply.append(retcode);
  3869. }
  3870. void cmdGetInfo(MemoryBuffer &msg, MemoryBuffer &reply)
  3871. {
  3872. unsigned level=1;
  3873. if (msg.remaining() >= sizeof(unsigned))
  3874. msg.read(level);
  3875. StringBuffer retstr;
  3876. getInfo(retstr, level);
  3877. reply.append(RFEnoerror).append(retstr.str());
  3878. }
  3879. void cmdFirewall(MemoryBuffer &msg, MemoryBuffer &reply)
  3880. {
  3881. // TBD
  3882. StringBuffer retstr;
  3883. getInfo(retstr);
  3884. reply.append(RFEnoerror).append(retstr.str());
  3885. }
  3886. void cmdExtractBlobElements(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3887. {
  3888. StringAttr prefix;
  3889. StringAttr filename;
  3890. msg.read(prefix).read(filename);
  3891. RemoteFilename rfn;
  3892. rfn.setLocalPath(filename);
  3893. ExtractedBlobArray extracted;
  3894. extractBlobElements(prefix, rfn, extracted);
  3895. unsigned n = extracted.ordinality();
  3896. reply.append((unsigned)RFEnoerror).append(n);
  3897. for (unsigned i=0;i<n;i++)
  3898. extracted.item(i).serialize(reply);
  3899. }
  3900. void cmdStreamGeneral(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3901. {
  3902. size32_t jsonSz;
  3903. msg.read(jsonSz);
  3904. Owned<IPropertyTree> requestTree = createPTreeFromJSONString(jsonSz, (const char *)msg.readDirect(jsonSz));
  3905. cmdStreamCommon(requestTree, msg, reply, client);
  3906. }
  3907. /* Notes on protocol:
  3908. *
  3909. * A JSON request with these top-level fields:
  3910. * "format" - the format of the reply. Supported formats = "binary", "xml", "json"
  3911. * "handle" - the handle of for a file session that was previously open (for continuation)
  3912. * "commCompression" - compression format of the communication protocol. Supports "LZ4", "LZW", "FLZ" (TBD: "ZLIB")
  3913. * "replyLimit" - Number of K to limit each reply size to. (default 1024)
  3914. * "node" - contains all 'activity' properties below:
  3915. *
  3916. * For a secured dafilesrv (streaming protocol), requests will only be accepted if the meta blob ("metaInfo") has a matching signature.
  3917. * The request must specify "filePart" (1 based) to denote the partition # being read from or written to.
  3918. *
  3919. * "filePartCopy" (1 based) defaults to 1
  3920. *
  3921. * "kind" - supported kinds = "diskread", "diskwrite", "indexread", "indexcount" (TBD: "diskcount", "indexwrite", "disklookup")
  3922. * NB: disk vs index will be auto detected if "kind" is absent.
  3923. *
  3924. * "action" - supported actions = "count" (used if "kind" is auto-detected to specify count should be performed instead of read)
  3925. *
  3926. * "keyFilter" - filter the results by this expression (See: HPCC-18474 for more details).
  3927. *
  3928. * "chooseN" - maximum # of results to return
  3929. *
  3930. * "compressed" - specifies whether input file is compressed. NB: not relevant to "index" types. Default = false. Auto-detected.
  3931. *
  3932. * "input" - specifies layout on disk of the file being read.
  3933. *
  3934. * "output" - where relavant, specifies the output format to be returned
  3935. *
  3936. * "fileName" is only used for unsecured non signed connections (normally forbidden), and specifies the fully qualified path to a physical file.
  3937. *
  3938. */
  3939. void cmdStreamCommon(IPropertyTree *requestTree, MemoryBuffer &rest, MemoryBuffer &reply, CRemoteClientHandler &client)
  3940. {
  3941. /* Example JSON request:
  3942. *
  3943. * {
  3944. * "format" : "binary",
  3945. * "handle" : "1234",
  3946. * "replyLimit" : "64",
  3947. * "commCompression" : "LZ4",
  3948. * "node" : {
  3949. * "metaInfo" : "",
  3950. * "filePart" : 2,
  3951. * "filePartCopy" : 1,
  3952. * "kind" : "diskread",
  3953. * "fileName": "examplefilename",
  3954. * "keyFilter" : "f1='1 '",
  3955. * "chooseN" : 5,
  3956. * "compressed" : "false"
  3957. * "input" : {
  3958. * "f1" : "string5",
  3959. * "f2" : "string5"
  3960. * },
  3961. * "output" : {
  3962. * "f2" : "string",
  3963. * "f1" : "real"
  3964. * }
  3965. * }
  3966. * }
  3967. * OR
  3968. * {
  3969. * "format" : "binary",
  3970. * "handle" : "1234",
  3971. * "replyLimit" : "64",
  3972. * "commCompression" : "LZ4",
  3973. * "node" : {
  3974. * "kind" : "diskread",
  3975. * "fileName": "examplefilename",
  3976. * "keyFilter" : "f1='1 '",
  3977. * "chooseN" : 5,
  3978. * "compressed" : "false"
  3979. * "input" : {
  3980. * "f1" : "string5",
  3981. * "f2" : "string5"
  3982. * },
  3983. * "output" : {
  3984. * "f2" : "string",
  3985. * "f1" : "real"
  3986. * }
  3987. * }
  3988. * }
  3989. * OR
  3990. * {
  3991. * "format" : "xml",
  3992. * "handle" : "1234",
  3993. * "replyLimit" : "64",
  3994. * "node" : {
  3995. * "kind" : "diskread",
  3996. * "fileName": "examplefilename",
  3997. * "keyFilter" : "f1='1 '",
  3998. * "chooseN" : 5,
  3999. * "compressed" : "false"
  4000. * "input" : {
  4001. * "f1" : "string5",
  4002. * "f2" : "string5"
  4003. * },
  4004. * "output" : {
  4005. * "f2" : "string",
  4006. * "f1" : "real"
  4007. * }
  4008. * }
  4009. * }
  4010. * OR
  4011. * {
  4012. * "format" : "xml",
  4013. * "handle" : "1234",
  4014. * "node" : {
  4015. * "kind" : "indexread",
  4016. * "fileName": "examplefilename",
  4017. * "keyFilter" : "f1='1 '",
  4018. * "input" : {
  4019. * "f1" : "string5",
  4020. * "f2" : "string5"
  4021. * },
  4022. * "output" : {
  4023. * "f2" : "string",
  4024. * "f1" : "real"
  4025. * }
  4026. * }
  4027. * OR
  4028. * {
  4029. * "format" : "xml",
  4030. * "node" : {
  4031. * "kind" : "xmlread",
  4032. * "fileName": "examplefilename",
  4033. * "keyFilter" : "f1='1 '",
  4034. * "input" : {
  4035. * "f1" : "string5",
  4036. * "f2" : "string5"
  4037. * },
  4038. * "output" : {
  4039. * "f2" : "string",
  4040. * "f1" : "real"
  4041. * }
  4042. * "ActivityOptions" : { // usually not required, options here may override file meta info.
  4043. * "rowTag" : "/Dataset/OtherRow"
  4044. * }
  4045. * }
  4046. * OR
  4047. * {
  4048. * "format" : "xml",
  4049. * "node" : {
  4050. * "kind" : "csvread",
  4051. * "fileName": "examplefilename",
  4052. * "keyFilter" : "f1='1 '",
  4053. * "input" : {
  4054. * "f1" : "string5",
  4055. * "f2" : "string5"
  4056. * },
  4057. * "output" : {
  4058. * "f2" : "string",
  4059. * "f1" : "real"
  4060. * }
  4061. * "ActivityOptions" : { // usually not required, options here may override file meta info.
  4062. * "csvQuote" : "\"",
  4063. * "csvSeparate" : ","
  4064. * "csvTerminate" : "\\n,\\r\\n",
  4065. * }
  4066. * }
  4067. * OR
  4068. * {
  4069. * "format" : "xml",
  4070. * "node" : {
  4071. * "action" : "count", // if present performs count with/without filter and returns count
  4072. * "fileName": "examplefilename", // can be either index or flat file
  4073. * "keyFilter" : "f1='1 '",
  4074. * "input" : {
  4075. * "f1" : "string5",
  4076. * "f2" : "string5"
  4077. * },
  4078. * }
  4079. * }
  4080. * OR
  4081. * {
  4082. * "format" : "binary",
  4083. * "handle" : "1234",
  4084. * "replyLimit" : "64",
  4085. * "commCompression" : "LZ4",
  4086. * "node" : {
  4087. * "kind" : "diskwrite",
  4088. * "fileName": "examplefilename",
  4089. * "compressed" : "false" (or "LZ4", "FLZ", "LZW")
  4090. * "input" : {
  4091. * "f1" : "string5",
  4092. * "f2" : "string5"
  4093. * }
  4094. * }
  4095. * }
  4096. * OR
  4097. * {
  4098. * "format" : "binary",
  4099. * "handle" : "1234",
  4100. * "replyLimit" : "64",
  4101. * "node" : {
  4102. * "kind" : "indexwrite",
  4103. * "fileName": "examplefilename",
  4104. * "input" : {
  4105. * "f1" : "string5",
  4106. * "f2" : "string5"
  4107. * }
  4108. * }
  4109. * }
  4110. *
  4111. */
  4112. int cursorHandle = requestTree->getPropInt("handle");
  4113. OutputFormat outputFormat = outFmt_Xml;
  4114. Owned<ICompressor> compressor;
  4115. Owned<IExpander> expander;
  4116. Owned<CRemoteRequest> remoteRequest;
  4117. Owned<IRemoteActivity> outputActivity;
  4118. OpenFileInfo fileInfo;
  4119. if (!cursorHandle)
  4120. {
  4121. const char *outputFmtStr = requestTree->queryProp("format");
  4122. if (nullptr == outputFmtStr)
  4123. outputFormat = outFmt_Xml; // default
  4124. else if (strieq("xml", outputFmtStr))
  4125. outputFormat = outFmt_Xml;
  4126. else if (strieq("json", outputFmtStr))
  4127. outputFormat = outFmt_Json;
  4128. else if (strieq("binary", outputFmtStr))
  4129. outputFormat = outFmt_Binary;
  4130. else
  4131. throw MakeStringException(0, "Unrecognised output format: %s", outputFmtStr);
  4132. /* pre-version 2.4, "outputCompression" denoted data was compressed in communication protocol and only applied to reply row data
  4133. * Since 2.5 "commCompression" replaces "outputCompression", and applies to both incoming row data (write) and outgoing row data (read).
  4134. * But "outputCompression" is checked for backward compatibility.
  4135. */
  4136. if (requestTree->hasProp("outputCompression") || requestTree->hasProp("commCompression"))
  4137. {
  4138. const char *commCompressionType = requestTree->queryProp("commCompression");
  4139. if (isEmptyString(commCompressionType))
  4140. commCompressionType = requestTree->queryProp("outputCompression");
  4141. if (isEmptyString(commCompressionType))
  4142. {
  4143. compressor.setown(queryDefaultCompressHandler()->getCompressor());
  4144. expander.setown(queryDefaultCompressHandler()->getExpander());
  4145. }
  4146. else if (outFmt_Binary == outputFormat)
  4147. {
  4148. compressor.setown(getCompressor(commCompressionType));
  4149. expander.setown(getExpander(commCompressionType));
  4150. if (!compressor)
  4151. WARNLOG("Unknown compressor type specified: %s", commCompressionType);
  4152. }
  4153. else
  4154. WARNLOG("Communication protocol compression not supported for format: %s", outputFmtStr);
  4155. }
  4156. /* NB: unless client call is on dedicated service, allow non-authorized requests through, e.g. from engines talking to unsecured port
  4157. * In a secure setup, this service will be configured on a dedicated port, and the std. insecure dafilesrv will be unreachable.
  4158. */
  4159. bool authorizedOnly = rowServiceSock && client.isRowServiceClient();
  4160. // In future this may be passed the request and build a chain of activities and return sink.
  4161. outputActivity.setown(createOutputActivity(*requestTree, authorizedOnly, keyPairInfo));
  4162. {
  4163. CriticalBlock block(sect);
  4164. cursorHandle = getNextHandle();
  4165. }
  4166. remoteRequest.setown(new CRemoteRequest(cursorHandle, outputFormat, compressor, expander, outputActivity));
  4167. StringBuffer requestStr("jsonrequest:");
  4168. outputActivity->getInfoStr(requestStr);
  4169. Owned<StringAttrItem> name = new StringAttrItem(requestStr);
  4170. CriticalBlock block(sect);
  4171. client.previdx = client.openFiles.ordinality();
  4172. client.openFiles.append(OpenFileInfo(cursorHandle, remoteRequest, name));
  4173. }
  4174. else if (!lookupFileIOHandle(cursorHandle, fileInfo))
  4175. cursorHandle = 0; // challenge response ..
  4176. else // known handle, continuation
  4177. {
  4178. remoteRequest.set(fileInfo.remoteRequest);
  4179. outputFormat = fileInfo.remoteRequest->queryFormat();
  4180. }
  4181. if (cursorHandle)
  4182. remoteRequest->process(requestTree, rest, reply);
  4183. else
  4184. {
  4185. const char *outputFmtStr = requestTree->queryProp("format");
  4186. if (nullptr == outputFmtStr)
  4187. outputFormat = outFmt_Xml; // default
  4188. else if (strieq("xml", outputFmtStr))
  4189. outputFormat = outFmt_Xml;
  4190. else if (strieq("json", outputFmtStr))
  4191. outputFormat = outFmt_Json;
  4192. else if (strieq("binary", outputFmtStr))
  4193. outputFormat = outFmt_Binary;
  4194. else
  4195. throw MakeStringException(0, "Unrecognised output format: %s", outputFmtStr);
  4196. if (outFmt_Binary == outputFormat)
  4197. reply.append(cursorHandle);
  4198. else // outFmt_Xml || outFmt_Json
  4199. {
  4200. Owned<IXmlWriterExt> responseWriter = createIXmlWriterExt(0, 0, nullptr, outFmt_Xml == outputFormat ? WTStandard : WTJSONObject);
  4201. responseWriter->outputBeginNested("Response", true);
  4202. if (outFmt_Xml == outputFormat)
  4203. responseWriter->outputCString("urn:hpcc:dfs", "@xmlns:dfs");
  4204. responseWriter->outputUInt(cursorHandle, sizeof(cursorHandle), "handle");
  4205. responseWriter->outputEndNested("Response");
  4206. responseWriter->finalize();
  4207. reply.append(responseWriter->length(), responseWriter->str());
  4208. }
  4209. }
  4210. }
  4211. void cmdStreamReadCommon(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
  4212. {
  4213. size32_t jsonSz = msg.remaining();
  4214. Owned<IPropertyTree> requestTree = createPTreeFromJSONString(jsonSz, (const char *)msg.readDirect(jsonSz));
  4215. cmdStreamCommon(requestTree, msg, reply, client);
  4216. }
  4217. // NB: JSON header to message, for some requests (e.g. write), there will be trailing raw data (e.g. row data)
  4218. void cmdStreamReadStd(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
  4219. {
  4220. reply.append(RFEnoerror); // gets patched if there is a follow on error
  4221. cmdStreamReadCommon(msg, reply, client);
  4222. }
  4223. void cmdStreamReadJSON(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
  4224. {
  4225. /* NB: exactly the same handling as cmdStreamReadStd(RFCStreamRead) for now,
  4226. * may want to differentiate later
  4227. * i.e. return format is { len[unsigned4-bigendian], errorcode[unsigned4-bigendian], result } - where result format depends on request output type.
  4228. * errorcode = 0 means no error
  4229. */
  4230. reply.append(RFEnoerror); // gets patched if there is a follow on error
  4231. cmdStreamReadCommon(msg, reply, client);
  4232. }
  4233. void cmdStreamReadTestSocket(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
  4234. {
  4235. reply.append('J');
  4236. cmdStreamReadCommon(msg, reply, client);
  4237. }
  4238. // legacy version
  4239. void cmdSetThrottle(MemoryBuffer & msg, MemoryBuffer & reply)
  4240. {
  4241. unsigned limit, delayMs, cpuThreshold;
  4242. msg.read(limit);
  4243. msg.read(delayMs);
  4244. msg.read(cpuThreshold);
  4245. stdCmdThrottler.configure(limit, delayMs, cpuThreshold, (unsigned)-1);
  4246. reply.append((unsigned)RFEnoerror);
  4247. }
  4248. void cmdSetThrottle2(MemoryBuffer & msg, MemoryBuffer & reply)
  4249. {
  4250. unsigned throttleClass, limit, delayMs, cpuThreshold, queueLimit;
  4251. msg.read(throttleClass);
  4252. msg.read(limit);
  4253. msg.read(delayMs);
  4254. msg.read(cpuThreshold);
  4255. msg.read(queueLimit);
  4256. setThrottle((ThrottleClass)throttleClass, limit, delayMs, cpuThreshold, queueLimit);
  4257. reply.append((unsigned)RFEnoerror);
  4258. }
  4259. void formatException(MemoryBuffer &reply, IException *e, RemoteFileCommandType cmd, bool testSocketFlag, unsigned _dfsErrorCode, CRemoteClientHandler *client)
  4260. {
  4261. unsigned dfsErrorCode = _dfsErrorCode;
  4262. if (!dfsErrorCode)
  4263. {
  4264. if (e)
  4265. dfsErrorCode = (QUERYINTERFACE(e, IDAFS_Exception)) ? e->errorCode() : RFSERR_InternalError;
  4266. else
  4267. dfsErrorCode = RFSERR_InternalError;
  4268. }
  4269. VStringBuffer errMsg("ERROR: cmd=%s, error=%s", getRFCText(cmd), getRFSERRText(dfsErrorCode));
  4270. if (e)
  4271. {
  4272. errMsg.appendf(" (%u, ", e->errorCode());
  4273. unsigned len = errMsg.length();
  4274. e->errorMessage(errMsg);
  4275. if (len == errMsg.length())
  4276. errMsg.setLength(len-2); // strip off ", " if no message in exception
  4277. errMsg.append(")");
  4278. }
  4279. if (testSocketFlag)
  4280. reply.append('-');
  4281. else
  4282. reply.append(dfsErrorCode);
  4283. reply.append(errMsg.str());
  4284. if (client && cmd!=RFCunlock)
  4285. {
  4286. const char *peer = client->queryPeerName();
  4287. if (peer)
  4288. {
  4289. VStringBuffer err("%s. Client: %s", errMsg.str(), peer);
  4290. PROGLOG("%s", err.str());
  4291. }
  4292. client->logPrevHandle();
  4293. }
  4294. }
  4295. void throttleCommand(RemoteFileCommandType cmd, MemoryBuffer &msg, CRemoteClientHandler *client)
  4296. {
  4297. switch (cmd)
  4298. {
  4299. case RFCexec:
  4300. case RFCgetcrc:
  4301. case RFCcopy:
  4302. case RFCappend:
  4303. case RFCtreecopy:
  4304. case RFCtreecopytmp:
  4305. slowCmdThrottler.addCommand(cmd, msg, client);
  4306. return;
  4307. case RFCcloseIO:
  4308. case RFCopenIO:
  4309. case RFCread:
  4310. case RFCsize:
  4311. case RFCwrite:
  4312. case RFCexists:
  4313. case RFCremove:
  4314. case RFCrename:
  4315. case RFCgetver:
  4316. case RFCisfile:
  4317. case RFCisdirectory:
  4318. case RFCisreadonly:
  4319. case RFCsetreadonly:
  4320. case RFCsetfileperms:
  4321. case RFCreadfilteredindex:
  4322. case RFCreadfilteredindexcount:
  4323. case RFCreadfilteredindexblob:
  4324. case RFCgettime:
  4325. case RFCsettime:
  4326. case RFCcreatedir:
  4327. case RFCgetdir:
  4328. case RFCmonitordir:
  4329. case RFCstop:
  4330. case RFCextractblobelements:
  4331. case RFCredeploy:
  4332. case RFCmove:
  4333. case RFCsetsize:
  4334. case RFCsettrace:
  4335. case RFCgetinfo:
  4336. case RFCfirewall:
  4337. case RFCStreamRead:
  4338. case RFCStreamReadTestSocket:
  4339. case RFCStreamReadJSON:
  4340. stdCmdThrottler.addCommand(cmd, msg, client);
  4341. return;
  4342. // NB: The following commands are still bound by the the thread pool
  4343. case RFCsetthrottle: // legacy version
  4344. case RFCsetthrottle2:
  4345. case RFCcopysection: // slightly odd, but has it's own limit
  4346. default:
  4347. {
  4348. client->processCommand(cmd, msg, NULL);
  4349. break;
  4350. }
  4351. }
  4352. }
  4353. void checkAuthorizedStreamCommand(CRemoteClientHandler &client)
  4354. {
  4355. if (!rowServiceOnStdPort && !client.isRowServiceClient())
  4356. throw createDafsException(DAFSERR_cmdstream_unauthorized, "Unauthorized command");
  4357. }
  4358. bool processCommand(RemoteFileCommandType cmd, MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler *client, CThrottler *throttler)
  4359. {
  4360. Owned<CClientStats> stats = clientStatsTable.getClientReference(cmd, client->queryPeerName());
  4361. bool testSocketFlag = false;
  4362. unsigned posOfErr = reply.length();
  4363. try
  4364. {
  4365. switch(cmd)
  4366. {
  4367. MAPCOMMANDSTATS(RFCread, cmdRead, *stats);
  4368. MAPCOMMANDSTATS(RFCwrite, cmdWrite, *stats);
  4369. MAPCOMMANDCLIENTSTATS(RFCappend, cmdAppend, *client, *stats);
  4370. MAPCOMMAND(RFCcloseIO, cmdCloseFileIO);
  4371. MAPCOMMANDCLIENT(RFCopenIO, cmdOpenFileIO, *client);
  4372. MAPCOMMAND(RFCsize, cmdSize);
  4373. MAPCOMMANDCLIENT(RFCexists, cmdExists, *client);
  4374. MAPCOMMANDCLIENT(RFCremove, cmdRemove, *client);
  4375. MAPCOMMANDCLIENT(RFCrename, cmdRename, *client);
  4376. MAPCOMMAND(RFCgetver, cmdGetVer);
  4377. MAPCOMMANDCLIENT(RFCisfile, cmdIsFile, *client);
  4378. MAPCOMMANDCLIENT(RFCisdirectory, cmdIsDir, *client);
  4379. MAPCOMMANDCLIENT(RFCisreadonly, cmdIsReadOnly, *client);
  4380. MAPCOMMANDCLIENT(RFCsetreadonly, cmdSetReadOnly, *client);
  4381. MAPCOMMANDCLIENT(RFCsetfileperms, cmdSetFilePerms, *client);
  4382. MAPCOMMANDCLIENT(RFCgettime, cmdGetTime, *client);
  4383. MAPCOMMANDCLIENT(RFCsettime, cmdSetTime, *client);
  4384. MAPCOMMANDCLIENT(RFCcreatedir, cmdCreateDir, *client);
  4385. MAPCOMMANDCLIENT(RFCgetdir, cmdGetDir, *client);
  4386. MAPCOMMANDCLIENT(RFCmonitordir, cmdMonitorDir, *client);
  4387. MAPCOMMAND(RFCstop, cmdStop);
  4388. MAPCOMMANDCLIENT(RFCexec, cmdExec, *client);
  4389. MAPCOMMANDCLIENT(RFCextractblobelements, cmdExtractBlobElements, *client);
  4390. MAPCOMMANDCLIENT(RFCgetcrc, cmdGetCRC, *client);
  4391. MAPCOMMANDCLIENT(RFCmove, cmdMove, *client);
  4392. MAPCOMMANDCLIENT(RFCcopy, cmdCopy, *client);
  4393. MAPCOMMAND(RFCsetsize, cmdSetSize);
  4394. MAPCOMMAND(RFCsettrace, cmdSetTrace);
  4395. MAPCOMMAND(RFCgetinfo, cmdGetInfo);
  4396. MAPCOMMAND(RFCfirewall, cmdFirewall);
  4397. MAPCOMMANDCLIENT(RFCcopysection, cmdCopySection, *client);
  4398. MAPCOMMANDCLIENTTHROTTLE(RFCtreecopy, cmdTreeCopy, *client, &slowCmdThrottler);
  4399. MAPCOMMANDCLIENTTHROTTLE(RFCtreecopytmp, cmdTreeCopyTmp, *client, &slowCmdThrottler);
  4400. MAPCOMMAND(RFCsetthrottle, cmdSetThrottle); // legacy version
  4401. MAPCOMMAND(RFCsetthrottle2, cmdSetThrottle2);
  4402. // row service commands
  4403. case RFCStreamGeneral:
  4404. {
  4405. checkAuthorizedStreamCommand(*client);
  4406. reply.append(RFEnoerror); // gets patched if there is a follow on error
  4407. cmdStreamGeneral(msg, reply, *client);
  4408. break;
  4409. }
  4410. case RFCStreamRead:
  4411. {
  4412. checkAuthorizedStreamCommand(*client);
  4413. cmdStreamReadStd(msg, reply, *client);
  4414. break;
  4415. }
  4416. case RFCStreamReadJSON:
  4417. {
  4418. checkAuthorizedStreamCommand(*client);
  4419. cmdStreamReadJSON(msg, reply, *client);
  4420. break;
  4421. }
  4422. case RFCStreamReadTestSocket:
  4423. {
  4424. testSocketFlag = true;
  4425. checkAuthorizedStreamCommand(*client);
  4426. cmdStreamReadTestSocket(msg, reply, *client);
  4427. break;
  4428. }
  4429. default:
  4430. formatException(reply, nullptr, cmd, false, RFSERR_InvalidCommand, client);
  4431. break;
  4432. }
  4433. }
  4434. catch (IException *e)
  4435. {
  4436. reply.setWritePos(posOfErr);
  4437. formatException(reply, e, cmd, testSocketFlag, 0, client);
  4438. }
  4439. return testSocketFlag;
  4440. }
  4441. IPooledThread *createCommandProcessor()
  4442. {
  4443. return new cCommandProcessor();
  4444. }
  4445. virtual void run(DAFSConnectCfg _connectMethod, const SocketEndpoint &listenep, unsigned sslPort, const SocketEndpoint *rowServiceEp, bool _rowServiceSSL, bool _rowServiceOnStdPort) override
  4446. {
  4447. SocketEndpoint sslep(listenep);
  4448. if (sslPort)
  4449. sslep.port = sslPort;
  4450. else
  4451. sslep.port = securitySettings.daFileSrvSSLPort;
  4452. Owned<ISocket> acceptSock, secureSock, rowServiceSock;
  4453. if (_connectMethod != SSLOnly)
  4454. {
  4455. if (listenep.port == 0)
  4456. throw createDafsException(DAFSERR_serverinit_failed, "dafilesrv port not specified");
  4457. if (listenep.isNull())
  4458. acceptSock.setown(ISocket::create(listenep.port));
  4459. else
  4460. {
  4461. StringBuffer ips;
  4462. listenep.getIpText(ips);
  4463. acceptSock.setown(ISocket::create_ip(listenep.port,ips.str()));
  4464. }
  4465. }
  4466. if (_connectMethod == SSLOnly || _connectMethod == SSLFirst || _connectMethod == UnsecureFirst)
  4467. {
  4468. if (sslep.port == 0)
  4469. throw createDafsException(DAFSERR_serverinit_failed, "Secure dafilesrv port not specified");
  4470. if (_connectMethod == UnsecureFirst)
  4471. {
  4472. // don't fail, but warn - this allows for fast SSL client rejections
  4473. if (!securitySettings.certificate)
  4474. WARNLOG("SSL Certificate information not found in environment.conf, cannot accept SSL connections");
  4475. else if ( !checkFileExists(securitySettings.certificate) )
  4476. {
  4477. WARNLOG("SSL Certificate File not found in environment.conf, cannot accept SSL connections");
  4478. securitySettings.certificate = nullptr;
  4479. }
  4480. if (!securitySettings.privateKey)
  4481. WARNLOG("SSL Key information not found in environment.conf, cannot accept SSL connections");
  4482. else if ( !checkFileExists(securitySettings.privateKey) )
  4483. {
  4484. WARNLOG("SSL Key File not found in environment.conf, cannot accept SSL connections");
  4485. securitySettings.privateKey = nullptr;
  4486. }
  4487. }
  4488. else
  4489. validateSSLSetup();
  4490. if (sslep.isNull())
  4491. secureSock.setown(ISocket::create(sslep.port));
  4492. else
  4493. {
  4494. StringBuffer ips;
  4495. sslep.getIpText(ips);
  4496. secureSock.setown(ISocket::create_ip(sslep.port,ips.str()));
  4497. }
  4498. }
  4499. if (rowServiceEp)
  4500. {
  4501. rowServiceSSL = _rowServiceSSL;
  4502. rowServiceOnStdPort = _rowServiceOnStdPort;
  4503. if (rowServiceEp->isNull())
  4504. rowServiceSock.setown(ISocket::create(rowServiceEp->port));
  4505. else
  4506. {
  4507. StringBuffer ips;
  4508. rowServiceEp->getIpText(ips);
  4509. rowServiceSock.setown(ISocket::create_ip(rowServiceEp->port, ips.str()));
  4510. }
  4511. #ifdef _USE_OPENSSL
  4512. if (rowServiceSSL)
  4513. validateSSLSetup();
  4514. #else
  4515. rowServiceSSL = false;
  4516. #endif
  4517. }
  4518. run(_connectMethod, acceptSock.getClear(), secureSock.getClear(), rowServiceSock.getClear());
  4519. }
  4520. virtual void run(DAFSConnectCfg _connectMethod, ISocket *_acceptSock, ISocket *_secureSock, ISocket *_rowServiceSock) override
  4521. {
  4522. acceptsock.setown(_acceptSock);
  4523. securesock.setown(_secureSock);
  4524. rowServiceSock.setown(_rowServiceSock);
  4525. if (_connectMethod != SSLOnly)
  4526. {
  4527. if (!acceptsock)
  4528. throw createDafsException(DAFSERR_serverinit_failed, "Invalid non-secure socket");
  4529. }
  4530. if (_connectMethod == SSLOnly || _connectMethod == SSLFirst || _connectMethod == UnsecureFirst)
  4531. {
  4532. if (!securesock)
  4533. throw createDafsException(DAFSERR_serverinit_failed, "Invalid secure socket");
  4534. }
  4535. selecthandler->start();
  4536. for (;;)
  4537. {
  4538. Owned<ISocket> sock;
  4539. Owned<ISocket> sockSSL;
  4540. Owned<ISocket> acceptedRSSock;
  4541. bool sockavail = false;
  4542. bool securesockavail = false;
  4543. bool rowServiceSockAvail = false;
  4544. if (_connectMethod == SSLNone && (nullptr == rowServiceSock.get()))
  4545. sockavail = acceptsock->wait_read(1000*60*1)!=0;
  4546. else if (_connectMethod == SSLOnly && (nullptr == rowServiceSock.get()))
  4547. securesockavail = securesock->wait_read(1000*60*1)!=0;
  4548. else
  4549. {
  4550. UnsignedArray readSocks;
  4551. UnsignedArray waitingSocks;
  4552. if (acceptsock)
  4553. readSocks.append(acceptsock->OShandle());
  4554. if (securesock)
  4555. readSocks.append(securesock->OShandle());
  4556. if (rowServiceSock)
  4557. readSocks.append(rowServiceSock->OShandle());
  4558. int numReady = wait_read_multiple(readSocks, 1000*60*1, waitingSocks);
  4559. if (numReady > 0)
  4560. {
  4561. for (int idx = 0; idx < numReady; idx++)
  4562. {
  4563. unsigned waitingSock = waitingSocks.item(idx);
  4564. if (acceptsock && (waitingSock == acceptsock->OShandle()))
  4565. sockavail = true;
  4566. else if (securesock && (waitingSock == securesock->OShandle()))
  4567. securesockavail = true;
  4568. else if (rowServiceSock && (waitingSock == rowServiceSock->OShandle()))
  4569. rowServiceSockAvail = true;
  4570. }
  4571. }
  4572. }
  4573. #if 0
  4574. if (!sockavail && !securesockavail && !rowServiceSockAvail)
  4575. {
  4576. JSocketStatistics stats;
  4577. getSocketStatistics(stats);
  4578. StringBuffer s;
  4579. getSocketStatisticsString(stats,s);
  4580. PROGLOG( "Socket statistics : \n%s\n",s.str());
  4581. }
  4582. #endif
  4583. if (stopping)
  4584. break;
  4585. if (sockavail || securesockavail || rowServiceSockAvail)
  4586. {
  4587. if (sockavail)
  4588. {
  4589. try
  4590. {
  4591. sock.setown(acceptsock->accept(true));
  4592. if (!sock||stopping)
  4593. break;
  4594. }
  4595. catch (IException *e)
  4596. {
  4597. EXCLOG(e,"CRemoteFileServer");
  4598. e->Release();
  4599. continue;
  4600. }
  4601. }
  4602. if (securesockavail)
  4603. {
  4604. Owned<ISecureSocket> ssock;
  4605. try
  4606. {
  4607. sockSSL.setown(securesock->accept(true));
  4608. if (!sockSSL||stopping)
  4609. break;
  4610. if ( (_connectMethod == UnsecureFirst) && (!securitySettings.certificate || !securitySettings.privateKey) )
  4611. {
  4612. // for client secure_connect() to fail quickly ...
  4613. cleanupDaFsSocket(sockSSL);
  4614. sockSSL.clear();
  4615. securesockavail = false;
  4616. }
  4617. else
  4618. {
  4619. ssock.setown(createSecureSocket(sockSSL.getClear(), ServerSocket));
  4620. int status = ssock->secure_accept();
  4621. if (status < 0)
  4622. throw createDafsException(DAFSERR_serveraccept_failed,"Failure to establish secure connection");
  4623. sockSSL.setown(ssock.getLink());
  4624. }
  4625. }
  4626. catch (IJSOCK_Exception *e)
  4627. {
  4628. // accept failed ...
  4629. EXCLOG(e,"CRemoteFileServer (secure)");
  4630. e->Release();
  4631. break;
  4632. }
  4633. catch (IException *e) // IDAFS_Exception also ...
  4634. {
  4635. EXCLOG(e,"CRemoteFileServer1 (secure)");
  4636. e->Release();
  4637. cleanupDaFsSocket(sockSSL);
  4638. sockSSL.clear();
  4639. cleanupDaFsSocket(ssock);
  4640. ssock.clear();
  4641. securesockavail = false;
  4642. }
  4643. }
  4644. if (rowServiceSockAvail)
  4645. {
  4646. Owned<ISecureSocket> ssock;
  4647. try
  4648. {
  4649. acceptedRSSock.setown(rowServiceSock->accept(true));
  4650. if (!acceptedRSSock||stopping)
  4651. break;
  4652. if (rowServiceSSL) // NB: will be disabled if !_USE_OPENSLL
  4653. {
  4654. ssock.setown(createSecureSocket(acceptedRSSock.getClear(), ServerSocket));
  4655. int status = ssock->secure_accept();
  4656. if (status < 0)
  4657. throw createDafsException(DAFSERR_serveraccept_failed,"Failure to establish SSL row service connection");
  4658. acceptedRSSock.setown(ssock.getLink());
  4659. }
  4660. }
  4661. catch (IJSOCK_Exception *e)
  4662. {
  4663. // accept failed ...
  4664. EXCLOG(e,"CRemoteFileServer (row service)");
  4665. e->Release();
  4666. break;
  4667. }
  4668. catch (IException *e) // IDAFS_Exception also ...
  4669. {
  4670. EXCLOG(e,"CRemoteFileServer1 (row service)");
  4671. e->Release();
  4672. cleanupDaFsSocket(acceptedRSSock);
  4673. sockSSL.clear();
  4674. cleanupDaFsSocket(ssock);
  4675. ssock.clear();
  4676. rowServiceSockAvail = false;
  4677. }
  4678. }
  4679. #ifdef _DEBUG
  4680. SocketEndpoint eps;
  4681. StringBuffer peerURL;
  4682. #endif
  4683. if (sockavail)
  4684. {
  4685. #ifdef _DEBUG
  4686. sock->getPeerEndpoint(eps);
  4687. eps.getUrlStr(peerURL);
  4688. PROGLOG("Server accepting from %s", peerURL.str());
  4689. #endif
  4690. runClient(sock.getClear(), false);
  4691. }
  4692. if (securesockavail)
  4693. {
  4694. #ifdef _DEBUG
  4695. sockSSL->getPeerEndpoint(eps);
  4696. eps.getUrlStr(peerURL.clear());
  4697. PROGLOG("Server accepting SECURE from %s", peerURL.str());
  4698. #endif
  4699. runClient(sockSSL.getClear(), false);
  4700. }
  4701. if (rowServiceSockAvail)
  4702. {
  4703. #ifdef _DEBUG
  4704. acceptedRSSock->getPeerEndpoint(eps);
  4705. eps.getUrlStr(peerURL.clear());
  4706. PROGLOG("Server accepting row service socket from %s", peerURL.str());
  4707. #endif
  4708. runClient(acceptedRSSock.getClear(), true);
  4709. }
  4710. }
  4711. else
  4712. checkTimeout();
  4713. }
  4714. if (TF_TRACE_CLIENT_STATS)
  4715. PROGLOG("CRemoteFileServer:run exiting");
  4716. selecthandler->stop(true);
  4717. }
  4718. void processUnauthenticatedCommand(RemoteFileCommandType cmd, ISocket *socket, MemoryBuffer &msg)
  4719. {
  4720. // these are unauthenticated commands
  4721. if (cmd != RFCgetver)
  4722. cmd = RFCinvalid;
  4723. MemoryBuffer reply;
  4724. bool testSocketFlag = processCommand(cmd, msg, initSendBuffer(reply), NULL, NULL);
  4725. sendDaFsBuffer(socket, reply, testSocketFlag);
  4726. }
  4727. void runClient(ISocket *sock, bool rowService) // rowService used to distinguish client calls
  4728. {
  4729. cCommandProcessor::cCommandProcessorParams params;
  4730. params.client = new CRemoteClientHandler(this, sock, globallasttick, rowService);
  4731. {
  4732. CriticalBlock block(sect);
  4733. clients.append(*LINK(params.client));
  4734. }
  4735. // NB: This could be blocked, by thread pool limit
  4736. threads->start(&params);
  4737. }
  4738. void stop()
  4739. {
  4740. // stop accept loop
  4741. if (TF_TRACE_CLIENT_STATS)
  4742. PROGLOG("CRemoteFileServer::stop");
  4743. if (acceptsock)
  4744. acceptsock->cancel_accept();
  4745. if (securesock)
  4746. securesock->cancel_accept();
  4747. threads->stopAll();
  4748. threads->joinAll(true,60*1000);
  4749. }
  4750. bool notify(CRemoteClientHandler *_client, MemoryBuffer &msg)
  4751. {
  4752. Linked<CRemoteClientHandler> client;
  4753. client.set(_client);
  4754. if (TF_TRACE_FULL)
  4755. PROGLOG("notify %d", msg.length());
  4756. if (msg.length())
  4757. {
  4758. if (TF_TRACE_FULL)
  4759. PROGLOG("notify CRemoteClientHandler(%p), msg length=%u", _client, msg.length());
  4760. cCommandProcessor::cCommandProcessorParams params;
  4761. params.client = client.getClear();
  4762. params.msg.swapWith(msg);
  4763. /* This can block because the thread pool is full and therefore block the selecthandler
  4764. * This is akin to the main server blocking post accept() for the same reason.
  4765. */
  4766. threads->start(&params);
  4767. }
  4768. else
  4769. onCloseSocket(client,3); // removes owned handles
  4770. return false;
  4771. }
  4772. void addClient(CRemoteClientHandler *client)
  4773. {
  4774. if (client&&client->socket)
  4775. selecthandler->add(client->socket,SELECTMODE_READ,client);
  4776. }
  4777. void checkTimeout()
  4778. {
  4779. if (msTick()-clientcounttick>1000*60*60)
  4780. {
  4781. CriticalBlock block(ClientCountSect);
  4782. if (TF_TRACE_CLIENT_STATS && (ClientCount || MaxClientCount))
  4783. PROGLOG("Client count = %d, max = %d", ClientCount, MaxClientCount);
  4784. clientcounttick = msTick();
  4785. MaxClientCount = ClientCount;
  4786. if (closedclients)
  4787. {
  4788. if (TF_TRACE_CLIENT_STATS)
  4789. PROGLOG("Closed client count = %d",closedclients);
  4790. closedclients = 0;
  4791. }
  4792. }
  4793. CriticalBlock block(sect);
  4794. ForEachItemInRev(i,clients)
  4795. {
  4796. CRemoteClientHandler &client = clients.item(i);
  4797. if (client.timedOut())
  4798. {
  4799. StringBuffer s;
  4800. bool ok = client.getInfo(s); // will spot duff sockets
  4801. if (ok&&(client.openFiles.ordinality()!=0))
  4802. {
  4803. if (TF_TRACE_CLIENT_CONN && client.inactiveTimedOut())
  4804. WARNLOG("Inactive %s",s.str());
  4805. }
  4806. else
  4807. {
  4808. #ifndef _DEBUG
  4809. if (TF_TRACE_CLIENT_CONN)
  4810. #endif
  4811. PROGLOG("Timing out %s",s.str());
  4812. closedclients++;
  4813. onCloseSocket(&client,4); // removes owned handles
  4814. }
  4815. }
  4816. }
  4817. }
  4818. void getInfo(StringBuffer &info, unsigned level=1)
  4819. {
  4820. {
  4821. CriticalBlock block(ClientCountSect);
  4822. info.append(DAFILESRV_VERSIONSTRING).append('\n');
  4823. info.appendf("Client count = %d\n",ClientCount);
  4824. info.appendf("Max client count = %d",MaxClientCount);
  4825. }
  4826. CriticalBlock block(sect);
  4827. ForEachItemIn(i,clients)
  4828. {
  4829. info.newline().append(i).append(": ");
  4830. clients.item(i).getInfo(info);
  4831. }
  4832. info.newline().appendf("Running threads: %u", threadRunningCount());
  4833. info.newline();
  4834. stdCmdThrottler.getInfo(info);
  4835. info.newline();
  4836. slowCmdThrottler.getInfo(info);
  4837. clientStatsTable.getInfo(info, level);
  4838. }
  4839. unsigned threadRunningCount()
  4840. {
  4841. if (!threads)
  4842. return 0;
  4843. return threads->runningCount();
  4844. }
  4845. unsigned idleTime()
  4846. {
  4847. unsigned t = (unsigned)atomic_read(&globallasttick);
  4848. return msTick()-t;
  4849. }
  4850. void setThrottle(ThrottleClass throttleClass, unsigned limit, unsigned delayMs, unsigned cpuThreshold, unsigned queueLimit)
  4851. {
  4852. switch (throttleClass)
  4853. {
  4854. case ThrottleStd:
  4855. stdCmdThrottler.configure(limit, delayMs, cpuThreshold, queueLimit);
  4856. break;
  4857. case ThrottleSlow:
  4858. slowCmdThrottler.configure(limit, delayMs, cpuThreshold, queueLimit);
  4859. break;
  4860. default:
  4861. {
  4862. StringBuffer availableClasses("{ ");
  4863. for (unsigned c=0; c<ThrottleClassMax; c++)
  4864. {
  4865. availableClasses.append(c).append(" = ").append(getThrottleClassText((ThrottleClass)c));
  4866. if (c+1<ThrottleClassMax)
  4867. availableClasses.append(", ");
  4868. }
  4869. availableClasses.append(" }");
  4870. throw MakeStringException(0, "Unknown throttle class: %u, available classes are: %s", (unsigned)throttleClass, availableClasses.str());
  4871. }
  4872. }
  4873. }
  4874. StringBuffer &getStats(StringBuffer &stats, bool reset)
  4875. {
  4876. CriticalBlock block(sect);
  4877. stdCmdThrottler.getStats(stats, reset).newline();
  4878. slowCmdThrottler.getStats(stats, reset);
  4879. if (reset)
  4880. clientStatsTable.reset();
  4881. return stats;
  4882. }
  4883. };
  4884. IRemoteFileServer * createRemoteFileServer(unsigned maxThreads, unsigned maxThreadsDelayMs, unsigned maxAsyncCopy, IPropertyTree *keyPairInfo)
  4885. {
  4886. return new CRemoteFileServer(maxThreads, maxThreadsDelayMs, maxAsyncCopy, keyPairInfo);
  4887. }
  4888. int setDaliServerTrace(byte flags)
  4889. {
  4890. byte ret = traceFlags;
  4891. traceFlags = flags;
  4892. return ret;
  4893. }
  4894. #ifdef _USE_CPPUNIT
  4895. #include "unittests.hpp"
  4896. #include "rmtfile.hpp"
  4897. /* MP_START_PORT -> MP_END_PORT is the MP reserved dynamic port range, and is used here for convenience.
  4898. * MP_START_PORT is used as starting point to find an available port for the temporary dafilesrv service in these unittests.
  4899. * All (MP) components using this range always check and find an unused port.
  4900. */
  4901. static unsigned serverPort = MP_START_PORT;
  4902. static StringBuffer basePath;
  4903. static Owned<CSimpleInterface> serverThread;
  4904. class RemoteFileSlowTest : public CppUnit::TestFixture
  4905. {
  4906. CPPUNIT_TEST_SUITE(RemoteFileSlowTest);
  4907. CPPUNIT_TEST(testRemoteFilename);
  4908. CPPUNIT_TEST(testStartServer);
  4909. CPPUNIT_TEST(testBasicFunctionality);
  4910. CPPUNIT_TEST(testCopy);
  4911. CPPUNIT_TEST(testOther);
  4912. CPPUNIT_TEST(testConfiguration);
  4913. CPPUNIT_TEST(testDirectoryMonitoring);
  4914. CPPUNIT_TEST(testFinish);
  4915. CPPUNIT_TEST_SUITE_END();
  4916. size32_t testLen = 1024;
  4917. protected:
  4918. void testRemoteFilename()
  4919. {
  4920. const char *rfns = "//1.2.3.4/dir1/file1|//1.2.3.4:7100/dir1/file1,"
  4921. "//1.2.3.4:7100/dir1/file1|//1.2.3.4:7100/dir1/file1,"
  4922. "//1.2.3.4/c$/dir1/file1|//1.2.3.4:7100/c$/dir1/file1,"
  4923. "//1.2.3.4:7100/c$/dir1/file1|//1.2.3.4:7100/c$/dir1/file1,"
  4924. "//1.2.3.4:7100/d$/dir1/file1|//1.2.3.4:7100/d$/dir1/file1";
  4925. StringArray tests;
  4926. tests.appendList(rfns, ",");
  4927. ForEachItemIn(i, tests)
  4928. {
  4929. StringArray inOut;
  4930. const char *pair = tests.item(i);
  4931. inOut.appendList(pair, "|");
  4932. const char *rfn = inOut.item(0);
  4933. const char *expected = inOut.item(1);
  4934. Owned<IFile> iFile = createIFile(rfn);
  4935. const char *res = iFile->queryFilename();
  4936. if (!streq(expected, res))
  4937. {
  4938. StringBuffer errMsg("testRemoteFilename MISMATCH");
  4939. errMsg.newline().append("Expected: ").append(expected);
  4940. errMsg.newline().append("Got: ").append(res);
  4941. PROGLOG("%s", errMsg.str());
  4942. CPPUNIT_ASSERT_MESSAGE(errMsg.str(), 0);
  4943. }
  4944. else
  4945. PROGLOG("MATCH: %s", res);
  4946. }
  4947. }
  4948. void testStartServer()
  4949. {
  4950. Owned<ISocket> socket;
  4951. unsigned endPort = MP_END_PORT;
  4952. while (1)
  4953. {
  4954. try
  4955. {
  4956. socket.setown(ISocket::create(serverPort));
  4957. break;
  4958. }
  4959. catch (IJSOCK_Exception *e)
  4960. {
  4961. if (e->errorCode() != JSOCKERR_port_in_use)
  4962. {
  4963. StringBuffer eStr;
  4964. e->errorMessage(eStr);
  4965. e->Release();
  4966. CPPUNIT_ASSERT_MESSAGE(eStr.str(), 0);
  4967. }
  4968. else if (serverPort == endPort)
  4969. {
  4970. e->Release();
  4971. CPPUNIT_ASSERT_MESSAGE("Could not find a free port to use for remote file server", 0);
  4972. }
  4973. }
  4974. ++serverPort;
  4975. }
  4976. basePath.append("//");
  4977. SocketEndpoint ep(serverPort);
  4978. ep.getUrlStr(basePath);
  4979. char cpath[_MAX_DIR];
  4980. if (!GetCurrentDirectory(_MAX_DIR, cpath))
  4981. CPPUNIT_ASSERT_MESSAGE("Current directory path too big", 0);
  4982. else
  4983. basePath.append(cpath);
  4984. addPathSepChar(basePath);
  4985. PROGLOG("basePath = %s", basePath.str());
  4986. class CServerThread : public CSimpleInterface, implements IThreaded
  4987. {
  4988. CThreaded threaded;
  4989. Owned<CRemoteFileServer> server;
  4990. Linked<ISocket> socket;
  4991. public:
  4992. CServerThread(CRemoteFileServer *_server, ISocket *_socket) : server(_server), socket(_socket), threaded("CServerThread")
  4993. {
  4994. threaded.init(this);
  4995. }
  4996. ~CServerThread()
  4997. {
  4998. threaded.join();
  4999. }
  5000. // IThreaded
  5001. virtual void threadmain() override
  5002. {
  5003. DAFSConnectCfg sslCfg = SSLNone;
  5004. server->run(sslCfg, socket, nullptr, nullptr);
  5005. }
  5006. };
  5007. Owned<IRemoteFileServer> server = createRemoteFileServer();
  5008. serverThread.setown(new CServerThread(QUERYINTERFACE(server.getClear(), CRemoteFileServer), socket.getClear()));
  5009. }
  5010. void testBasicFunctionality()
  5011. {
  5012. VStringBuffer filePath("%s%s", basePath.str(), "file1");
  5013. // create file
  5014. Owned<IFile> iFile = createIFile(filePath);
  5015. CPPUNIT_ASSERT(iFile);
  5016. Owned<IFileIO> iFileIO = iFile->open(IFOcreate);
  5017. CPPUNIT_ASSERT(iFileIO);
  5018. // write out 1k of random data and crc
  5019. MemoryBuffer mb;
  5020. char *buf = (char *)mb.reserveTruncate(testLen);
  5021. for (unsigned b=0; b<1024; b++)
  5022. buf[b] = getRandom()%256;
  5023. CRC32 crc;
  5024. crc.tally(testLen, buf);
  5025. unsigned writeCrc = crc.get();
  5026. size32_t sz = iFileIO->write(0, testLen, buf);
  5027. CPPUNIT_ASSERT(sz == testLen);
  5028. // close file
  5029. iFileIO.clear();
  5030. // validate remote crc
  5031. CPPUNIT_ASSERT(writeCrc == iFile->getCRC());
  5032. // exists
  5033. CPPUNIT_ASSERT(iFile->exists());
  5034. // validate size
  5035. CPPUNIT_ASSERT(iFile->size() == testLen);
  5036. // read back and validate read data's crc against written
  5037. iFileIO.setown(iFile->open(IFOread));
  5038. CPPUNIT_ASSERT(iFileIO);
  5039. sz = iFileIO->read(0, testLen, buf);
  5040. iFileIO.clear();
  5041. CPPUNIT_ASSERT(sz == testLen);
  5042. crc.reset();
  5043. crc.tally(testLen, buf);
  5044. CPPUNIT_ASSERT(writeCrc == crc.get());
  5045. }
  5046. void testCopy()
  5047. {
  5048. VStringBuffer filePath("%s%s", basePath.str(), "file1");
  5049. Owned<IFile> iFile = createIFile(filePath);
  5050. // test file copy
  5051. VStringBuffer filePathCopy("%s%s", basePath.str(), "file1copy");
  5052. Owned<IFile> iFile1Copy = createIFile(filePathCopy);
  5053. iFile->copyTo(iFile1Copy);
  5054. // read back copy and validate read data's crc against written
  5055. Owned<IFileIO> iFileIO = iFile1Copy->open(IFOreadwrite); // open read/write for appendFile in next step.
  5056. CPPUNIT_ASSERT(iFileIO);
  5057. MemoryBuffer mb;
  5058. char *buf = (char *)mb.reserveTruncate(testLen);
  5059. size32_t sz = iFileIO->read(0, testLen, buf);
  5060. CPPUNIT_ASSERT(sz == testLen);
  5061. CRC32 crc;
  5062. crc.tally(testLen, buf);
  5063. CPPUNIT_ASSERT(iFile->getCRC() == crc.get());
  5064. // check appendFile functionality. NB after this "file1copy" should be 2*testLen
  5065. CPPUNIT_ASSERT(testLen == iFileIO->appendFile(iFile));
  5066. iFileIO.clear();
  5067. // validate new size
  5068. CPPUNIT_ASSERT(iFile1Copy->size() == 2 * testLen);
  5069. // setSize test, truncate copy to original size
  5070. iFileIO.setown(iFile1Copy->open(IFOreadwrite));
  5071. iFileIO->setSize(testLen);
  5072. // validate new size
  5073. CPPUNIT_ASSERT(iFile1Copy->size() == testLen);
  5074. }
  5075. void testOther()
  5076. {
  5077. VStringBuffer filePath("%s%s", basePath.str(), "file1");
  5078. Owned<IFile> iFile = createIFile(filePath);
  5079. // rename
  5080. iFile->rename("file2");
  5081. // create a directory
  5082. VStringBuffer subDirPath("%s%s", basePath.str(), "subdir1");
  5083. Owned<IFile> subDirIFile = createIFile(subDirPath);
  5084. subDirIFile->createDirectory();
  5085. // check isDirectory result
  5086. CPPUNIT_ASSERT(subDirIFile->isDirectory());
  5087. // move previous created and renamed file into new sub-directory
  5088. // ensure not present before move
  5089. VStringBuffer subDirFilePath("%s/%s", subDirPath.str(), "file2");
  5090. Owned<IFile> iFile2 = createIFile(subDirFilePath);
  5091. iFile2->remove();
  5092. iFile->move(subDirFilePath);
  5093. // open sub-directory file2 explicitly
  5094. RemoteFilename rfn;
  5095. rfn.setRemotePath(subDirPath.str());
  5096. Owned<IFile> dir = createIFile(rfn);
  5097. Owned<IDirectoryIterator> diriter = dir->directoryFiles("file2");
  5098. if (!diriter->first())
  5099. {
  5100. CPPUNIT_ASSERT_MESSAGE("Error, file2 diriter->first() is null", 0);
  5101. }
  5102. Linked<IFile> iFile3 = &diriter->query();
  5103. diriter.clear();
  5104. dir.clear();
  5105. OwnedIFileIO iFile3IO = iFile3->openShared(IFOread, IFSHfull);
  5106. if (!iFile3IO)
  5107. {
  5108. CPPUNIT_ASSERT_MESSAGE("Error, file2 openShared() failed", 0);
  5109. }
  5110. iFile3IO->close();
  5111. // count sub-directory files with a wildcard
  5112. unsigned count=0;
  5113. Owned<IDirectoryIterator> iter = subDirIFile->directoryFiles("*2");
  5114. ForEach(*iter)
  5115. ++count;
  5116. CPPUNIT_ASSERT(1 == count);
  5117. // check isFile result
  5118. CPPUNIT_ASSERT(iFile2->isFile());
  5119. // validate isReadOnly before after setting
  5120. CPPUNIT_ASSERT(!iFile2->isReadOnly());
  5121. iFile2->setReadOnly(true);
  5122. CPPUNIT_ASSERT(iFile2->isReadOnly());
  5123. // get/set Time and validate result
  5124. CDateTime createTime, modifiedTime, accessedTime;
  5125. CPPUNIT_ASSERT(subDirIFile->getTime(&createTime, &modifiedTime, &accessedTime));
  5126. CDateTime newModifiedTime = modifiedTime;
  5127. newModifiedTime.adjustTime(-86400); // -1 day
  5128. CPPUNIT_ASSERT(subDirIFile->setTime(&createTime, &newModifiedTime, &accessedTime));
  5129. CPPUNIT_ASSERT(subDirIFile->getTime(&createTime, &modifiedTime, &accessedTime));
  5130. CPPUNIT_ASSERT(modifiedTime == newModifiedTime);
  5131. // test set file permissions
  5132. try
  5133. {
  5134. iFile2->setFilePermissions(0777);
  5135. }
  5136. catch (...)
  5137. {
  5138. CPPUNIT_ASSERT_MESSAGE("iFile2->setFilePermissions() exception", 0);
  5139. }
  5140. }
  5141. void testConfiguration()
  5142. {
  5143. SocketEndpoint ep(serverPort); // test trace open connections
  5144. CPPUNIT_ASSERT(setDafileSvrTraceFlags(ep, 0x08));
  5145. StringBuffer infoStr;
  5146. CPPUNIT_ASSERT(RFEnoerror == getDafileSvrInfo(ep, 10, infoStr));
  5147. CPPUNIT_ASSERT(RFEnoerror == setDafileSvrThrottleLimit(ep, ThrottleStd, DEFAULT_STDCMD_PARALLELREQUESTLIMIT+1, DEFAULT_STDCMD_THROTTLEDELAYMS+1, DEFAULT_STDCMD_THROTTLECPULIMIT+1, DEFAULT_STDCMD_THROTTLEQUEUELIMIT+1));
  5148. }
  5149. void testDirectoryMonitoring()
  5150. {
  5151. VStringBuffer subDirPath("%s%s", basePath.str(), "subdir1");
  5152. Owned<IFile> subDirIFile = createIFile(subDirPath);
  5153. subDirIFile->createDirectory();
  5154. VStringBuffer filePath("%s/%s", subDirPath.str(), "file1");
  5155. class CDelayedFileCreate : implements IThreaded
  5156. {
  5157. CThreaded threaded;
  5158. StringAttr filePath;
  5159. Semaphore doneSem;
  5160. public:
  5161. CDelayedFileCreate(const char *_filePath) : filePath(_filePath), threaded("CDelayedFileCreate")
  5162. {
  5163. threaded.init(this);
  5164. }
  5165. ~CDelayedFileCreate()
  5166. {
  5167. stop();
  5168. }
  5169. void stop()
  5170. {
  5171. doneSem.signal();
  5172. threaded.join();
  5173. }
  5174. // IThreaded impl.
  5175. virtual void threadmain() override
  5176. {
  5177. MilliSleep(1000); // give monitorDirectory a chance to be monitoring
  5178. // create file
  5179. Owned<IFile> iFile = createIFile(filePath);
  5180. CPPUNIT_ASSERT(iFile);
  5181. Owned<IFileIO> iFileIO = iFile->open(IFOcreate);
  5182. CPPUNIT_ASSERT(iFileIO);
  5183. iFileIO.clear();
  5184. doneSem.wait(60 * 1000);
  5185. CPPUNIT_ASSERT(iFile->remove());
  5186. }
  5187. } delayedFileCreate(filePath);
  5188. Owned<IDirectoryDifferenceIterator> iter = subDirIFile->monitorDirectory(nullptr, nullptr, false, false, 2000, 60 * 1000);
  5189. ForEach(*iter)
  5190. {
  5191. StringBuffer fname;
  5192. iter->getName(fname);
  5193. PROGLOG("fname = %s", fname.str());
  5194. }
  5195. delayedFileCreate.stop();
  5196. }
  5197. void testFinish()
  5198. {
  5199. // clearup
  5200. VStringBuffer filePathCopy("%s%s", basePath.str(), "file1copy");
  5201. Owned<IFile> iFile1Copy = createIFile(filePathCopy);
  5202. CPPUNIT_ASSERT(iFile1Copy->remove());
  5203. VStringBuffer subDirPath("%s%s", basePath.str(), "subdir1");
  5204. VStringBuffer subDirFilePath("%s/%s", subDirPath.str(), "file2");
  5205. Owned<IFile> iFile2 = createIFile(subDirFilePath);
  5206. CPPUNIT_ASSERT(iFile2->remove());
  5207. Owned<IFile> subDirIFile = createIFile(subDirPath);
  5208. CPPUNIT_ASSERT(subDirIFile->remove());
  5209. SocketEndpoint ep(serverPort);
  5210. Owned<ISocket> sock = ISocket::connect_timeout(ep, 60 * 1000);
  5211. CPPUNIT_ASSERT(RFEnoerror == stopRemoteServer(sock));
  5212. serverThread.clear();
  5213. }
  5214. };
  5215. CPPUNIT_TEST_SUITE_REGISTRATION( RemoteFileSlowTest );
  5216. CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( RemoteFileSlowTest, "RemoteFileSlowTests" );
  5217. #endif // _USE_CPPUNIT