sockfile.cpp 148 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130313131323133313431353136313731383139314031413142314331443145314631473148314931503151315231533154315531563157315831593160316131623163316431653166316731683169317031713172317331743175317631773178317931803181318231833184318531863187318831893190319131923193319431953196319731983199320032013202320332043205320632073208320932103211321232133214321532163217321832193220322132223223322432253226322732283229323032313232323332343235323632373238323932403241324232433244324532463247324832493250325132523253325432553256325732583259326032613262326332643265326632673268326932703271327232733274327532763277327832793280328132823283328432853286328732883289329032913292329332943295329632973298329933003301330233033304330533063307330833093310331133123313331433153316331733183319332033213322332333243325332633273328332933303331333233333334333533363337333833393340334133423343334433453346334733483349335033513352335333543355335633573358335933603361336233633364336533663367336833693370337133723373337433753376337733783379338033813382338333843385338633873388338933903391339233933394339533963397339833993400340134023403340434053406340734083409341034113412341334143415341634173418341934203421342234233424342534263427342834293430343134323433343434353436343734383439344034413442344334443445344634473448344934503451345234533454345534563457345834593460346134623463346434653466346734683469347034713472347334743475347634773478347934803481348234833484348534863487348834893490349134923493349434953496349734983499350035013502350335043505350635073508350935103511351235133514351535163517351835193520352135223523352435253526352735283529353035313532353335343535353635373538353935403541354235433544354535463547354835493550355135523553355435553556355735583559356035613562356335643565356635673568356935703571357235733574357535763577357835793580358135823583358435853586358735883589359035913592359335943595359635973598359936003601360236033604360536063607360836093610361136123613361436153616361736183619362036213622362336243625362636273628362936303631363236333634363536363637363836393640364136423643364436453646364736483649365036513652365336543655365636573658365936603661366236633664366536663667366836693670367136723673367436753676367736783679368036813682368336843685368636873688368936903691369236933694369536963697369836993700370137023703370437053706370737083709371037113712371337143715371637173718371937203721372237233724372537263727372837293730373137323733373437353736373737383739374037413742374337443745374637473748374937503751375237533754375537563757375837593760376137623763376437653766376737683769377037713772377337743775377637773778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252425342544255425642574258425942604261426242634264426542664267426842694270427142724273427442754276427742784279428042814282428342844285428642874288428942904291429242934294429542964297429842994300430143024303430443054306430743084309431043114312431343144315431643174318431943204321432243234324432543264327432843294330433143324333433443354336433743384339434043414342434343444345434643474348434943504351435243534354435543564357435843594360436143624363436443654366436743684369437043714372437343744375437643774378437943804381438243834384438543864387438843894390439143924393439443954396439743984399440044014402440344044405440644074408440944104411441244134414441544164417441844194420442144224423442444254426442744284429443044314432443344344435443644374438443944404441444244434444444544464447444844494450445144524453445444554456445744584459446044614462446344644465446644674468446944704471447244734474447544764477447844794480448144824483448444854486448744884489449044914492449344944495449644974498449945004501450245034504450545064507450845094510451145124513451445154516451745184519452045214522452345244525452645274528452945304531453245334534453545364537453845394540454145424543454445454546454745484549455045514552455345544555455645574558455945604561456245634564456545664567456845694570457145724573457445754576457745784579458045814582458345844585458645874588458945904591459245934594459545964597459845994600460146024603460446054606460746084609461046114612461346144615461646174618461946204621462246234624462546264627462846294630463146324633463446354636463746384639464046414642464346444645464646474648464946504651465246534654465546564657465846594660466146624663466446654666466746684669467046714672467346744675467646774678467946804681468246834684468546864687468846894690469146924693469446954696469746984699470047014702470347044705470647074708470947104711471247134714471547164717471847194720472147224723472447254726472747284729473047314732473347344735473647374738473947404741474247434744474547464747474847494750475147524753475447554756475747584759476047614762476347644765476647674768476947704771477247734774477547764777477847794780478147824783
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. // todo look at IRemoteFileServer stop
  14. #include "platform.h"
  15. #include "limits.h"
  16. #include "jlib.hpp"
  17. #include "jio.hpp"
  18. #include "jmutex.hpp"
  19. #include "jfile.hpp"
  20. #include "jmisc.hpp"
  21. #include "jthread.hpp"
  22. #include "securesocket.hpp"
  23. #include "sockfile.hpp"
  24. #include "portlist.h"
  25. #include "jsocket.hpp"
  26. #include "jencrypt.hpp"
  27. #include "jset.hpp"
  28. #include "remoteerr.hpp"
  29. #define SOCKET_CACHE_MAX 500
  30. #define MAX_THREADS 100
  31. #define TARGET_MIN_THREADS 20
  32. #define TARGET_ACTIVE_THREADS 80
  33. #ifdef _DEBUG
  34. //#define SIMULATE_PACKETLOSS 1
  35. #endif
  36. #define TREECOPYTIMEOUT (60*60*1000) // 1Hr (I guess could take longer for big file but at least will stagger)
  37. #define TREECOPYPOLLTIME (60*1000*5) // for tracing that delayed
  38. #define TREECOPYPRUNETIME (24*60*60*1000) // 1 day
  39. #if SIMULATE_PACKETLOSS
  40. #define TESTING_FAILURE_RATE_LOST_SEND 10 // per 1000
  41. #define TESTING_FAILURE_RATE_LOST_RECV 10 // per 1000
  42. #define DUMMY_TIMEOUT_MAX (1000*10)
  43. static bool errorSimulationOn = true;
  44. static ISocket *timeoutreadsock = NULL; // used to trigger
  45. struct dummyReadWrite
  46. {
  47. class X
  48. {
  49. dummyReadWrite *parent;
  50. public:
  51. X(dummyReadWrite *_parent)
  52. {
  53. parent = _parent;
  54. }
  55. ~X()
  56. {
  57. delete parent;
  58. }
  59. };
  60. class TimeoutSocketException: public CInterface, public IJSOCK_Exception
  61. {
  62. public:
  63. IMPLEMENT_IINTERFACE;
  64. TimeoutSocketException()
  65. {
  66. }
  67. virtual ~TimeoutSocketException()
  68. {
  69. }
  70. int errorCode() const { return JSOCKERR_timeout_expired; }
  71. StringBuffer & errorMessage(StringBuffer &str) const
  72. {
  73. return str.append("timeout expired");
  74. }
  75. MessageAudience errorAudience() const
  76. {
  77. return MSGAUD_user;
  78. }
  79. };
  80. ISocket *sock;
  81. dummyReadWrite(ISocket *_sock)
  82. {
  83. sock = _sock;
  84. }
  85. void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, time_t timeout)
  86. {
  87. X x(this);
  88. unsigned t = msTick();
  89. unsigned r = getRandom();
  90. bool timeoutread = (timeoutreadsock==sock);
  91. timeoutreadsock=NULL;
  92. if (!timeoutread)
  93. sock->readtms(buf, min_size, max_size, size_read, timeout);
  94. if (timeoutread||(errorSimulationOn&&(TESTING_FAILURE_RATE_LOST_RECV>0)&&(r%1000<TESTING_FAILURE_RATE_LOST_RECV))) {
  95. PrintStackReport();
  96. if (timeoutread)
  97. PROGLOG("** Simulate timeout");
  98. else
  99. PROGLOG("** Simulate Packet loss (size %d,%d)",min_size,max_size);
  100. if (timeout>DUMMY_TIMEOUT_MAX)
  101. timeout = DUMMY_TIMEOUT_MAX;
  102. t = msTick()-t;
  103. if (t<timeout)
  104. Sleep(timeout-t);
  105. IJSOCK_Exception *e = new TimeoutSocketException;
  106. throw e;
  107. }
  108. }
  109. size32_t write(void const* buf, size32_t size)
  110. {
  111. X x(this);
  112. timeoutreadsock=NULL;
  113. unsigned r = getRandom();
  114. if (errorSimulationOn&&(TESTING_FAILURE_RATE_LOST_SEND>0)&&(r%1000<TESTING_FAILURE_RATE_LOST_SEND)) {
  115. PrintStackReport();
  116. PROGLOG("** Simulate Packet loss (size %d)",size);
  117. timeoutreadsock=sock;
  118. return size;
  119. }
  120. return sock->write(buf,size);
  121. }
  122. };
  123. #define SOCKWRITE(sock) (new dummyReadWrite(sock))->write
  124. #define SOCKREADTMS(sock) (new dummyReadWrite(sock))->readtms
  125. #else
  126. #define SOCKWRITE(sock) sock->write
  127. #define SOCKREADTMS(sock) sock->readtms
  128. #endif
  129. // backward compatible modes
  130. typedef enum { compatIFSHnone, compatIFSHread, compatIFSHwrite, compatIFSHexec, compatIFSHall} compatIFSHmode;
  131. static const char *VERSTRING= "DS V1.7e - 7 " // dont forget FILESRV_VERSION in header
  132. #ifdef _WIN32
  133. "Windows ";
  134. #else
  135. "Linux ";
  136. #endif
  137. typedef unsigned char RemoteFileCommandType;
  138. typedef int RemoteFileIOHandle;
  139. static unsigned maxConnectTime = 0;
  140. static unsigned maxReceiveTime = 0;
  141. //Security and default port attributes
  142. static class _securitySettings
  143. {
  144. public:
  145. bool useSSL;
  146. unsigned short daFileSrvPort;
  147. const char * certificate;
  148. const char * privateKey;
  149. _securitySettings()
  150. {
  151. querySecuritySettings(&useSSL, &daFileSrvPort, &certificate, &privateKey);
  152. }
  153. } securitySettings;
  154. static CriticalSection secureContextCrit;
  155. static Owned<ISecureSocketContext> secureContext;
  156. static ISecureSocket *createSecureSocket(ISocket *sock,SecureSocketType type)
  157. {
  158. {
  159. CriticalBlock b(secureContextCrit);
  160. if (!secureContext)
  161. {
  162. if (securitySettings.certificate)
  163. secureContext.setown(createSecureSocketContextEx(securitySettings.certificate,securitySettings.privateKey, NULL, type));
  164. else
  165. secureContext.setown(createSecureSocketContext(type));
  166. }
  167. }
  168. #ifdef _DEBUG
  169. return secureContext->createSecureSocket(sock, SSLogMax);
  170. #else
  171. return secureContext->createSecureSocket(sock);
  172. #endif
  173. }
  174. void clientSetRemoteFileTimeouts(unsigned maxconnecttime,unsigned maxreadtime)
  175. {
  176. maxConnectTime = maxconnecttime;
  177. maxReceiveTime = maxreadtime;
  178. }
  179. struct sRFTM
  180. {
  181. CTimeMon *timemon;
  182. sRFTM() { timemon = maxReceiveTime?new CTimeMon(maxReceiveTime):NULL; }
  183. ~sRFTM() { delete timemon; }
  184. };
  185. const char *remoteServerVersionString() { return VERSTRING; }
  186. static bool AuthenticationEnabled = true;
  187. bool enableDafsAuthentication(bool on)
  188. {
  189. bool ret = AuthenticationEnabled;
  190. AuthenticationEnabled = on;
  191. return ret;
  192. }
  193. #define CLIENT_TIMEOUT (1000*60*60*12) // long timeout in case zombies
  194. #define CLIENT_INACTIVEWARNING_TIMEOUT (1000*60*60*12) // time between logging inactive clients
  195. #define SERVER_TIMEOUT (1000*60*5) // timeout when waiting for dafilesrv to reply after command
  196. // (increased when waiting for large block)
  197. #define DAFS_CONNECT_FAIL_RETRY_TIME (1000*60*15)
  198. #ifdef SIMULATE_PACKETLOSS
  199. #define NORMAL_RETRIES (1)
  200. #define LENGTHY_RETRIES (1)
  201. #else
  202. #define NORMAL_RETRIES (3)
  203. #define LENGTHY_RETRIES (12)
  204. #endif
  205. #ifdef _DEBUG
  206. static byte traceFlags=0x30;
  207. #else
  208. static byte traceFlags=0x20;
  209. #endif
  210. #define TF_TRACE (traceFlags&1)
  211. #define TF_TRACE_PRE_IO (traceFlags&2)
  212. #define TF_TRACE_FULL (traceFlags&4)
  213. #define TF_TRACE_CLIENT_CONN (traceFlags&8)
  214. #define TF_TRACE_TREE_COPY (traceFlags&0x10)
  215. #define TF_TRACE_CLIENT_STATS (traceFlags&0x20)
  216. enum {
  217. RFCopenIO, // 0
  218. RFCcloseIO,
  219. RFCread,
  220. RFCwrite,
  221. RFCsize,
  222. RFCexists,
  223. RFCremove,
  224. RFCrename,
  225. RFCgetver,
  226. RFCisfile,
  227. RFCisdirectory, // 10
  228. RFCisreadonly,
  229. RFCsetreadonly,
  230. RFCgettime,
  231. RFCsettime,
  232. RFCcreatedir,
  233. RFCgetdir,
  234. RFCstop,
  235. RFCexec,
  236. RFCkill,
  237. RFCredeploy, // 20
  238. RFCgetcrc,
  239. RFCmove,
  240. // 1.5 features below
  241. RFCsetsize,
  242. RFCextractblobelements,
  243. RFCcopy,
  244. RFCappend,
  245. RFCmonitordir,
  246. RFCsettrace,
  247. RFCgetinfo,
  248. RFCfirewall, // not used currently // 30
  249. RFCunlock,
  250. RFCunlockreply,
  251. RFCinvalid,
  252. RFCcopysection,
  253. // 1.7e
  254. RFCtreecopy,
  255. // 1.7e - 1
  256. RFCtreecopytmp,
  257. RFCmax,
  258. };
  259. typedef enum { ACScontinue, ACSdone, ACSerror} AsyncCommandStatus;
  260. typedef byte OnceKey[16];
  261. static void genOnce(OnceKey &key)
  262. {
  263. static __int64 inc=0;
  264. *(unsigned *)&key[0] = getRandom();
  265. *(__int64 *)&key[4] = ++inc;
  266. *(unsigned *)&key[12] = getRandom();
  267. }
  268. static void mergeOnce(OnceKey &key,size32_t sz,const void *data)
  269. {
  270. assertex(sz<=sizeof(OnceKey));
  271. const byte *p = (const byte *)data;
  272. while (sz)
  273. key[--sz] ^= *(p++);
  274. }
  275. //---------------------------------------------------------------------------
  276. class CThrottler
  277. {
  278. Semaphore &sem;
  279. bool got;
  280. public:
  281. CThrottler(Semaphore &_sem) : sem(_sem), got(false)
  282. {
  283. take();
  284. }
  285. ~CThrottler()
  286. {
  287. release();
  288. }
  289. bool take()
  290. {
  291. assertex(!got);
  292. got = false;
  293. loop {
  294. if (sem.wait(5000)) {
  295. got = true;
  296. break;
  297. }
  298. unsigned cpu = getLatestCPUUsage();
  299. PROGLOG("Throttler stalled (%d%% cpu)",cpu);
  300. if (getLatestCPUUsage()<75)
  301. break;
  302. }
  303. return got;
  304. }
  305. bool release()
  306. {
  307. if (got)
  308. {
  309. got = false;
  310. sem.signal();
  311. return true;
  312. }
  313. return false;
  314. }
  315. };
  316. // temporarily release a throttler slot
  317. class CThrottleReleaseBlock
  318. {
  319. CThrottler &throttler;
  320. bool had;
  321. public:
  322. CThrottleReleaseBlock(CThrottler &_throttler) : throttler(_throttler)
  323. {
  324. had = throttler.release();
  325. }
  326. ~CThrottleReleaseBlock()
  327. {
  328. if (had)
  329. throttler.take();
  330. }
  331. };
  332. //---------------------------------------------------------------------------
  333. class CDafsException: public CInterface, public IDAFS_Exception
  334. {
  335. StringAttr msg;
  336. int errcode;
  337. public:
  338. IMPLEMENT_IINTERFACE;
  339. CDafsException(int code,const char *_msg)
  340. : errcode(code), msg(_msg)
  341. {
  342. };
  343. int errorCode() const
  344. {
  345. return errcode;
  346. }
  347. StringBuffer & errorMessage(StringBuffer &str) const
  348. {
  349. return str.append(msg);
  350. }
  351. MessageAudience errorAudience() const
  352. {
  353. return MSGAUD_user;
  354. }
  355. };
  356. static IDAFS_Exception *createDafsException(int code,const char *msg)
  357. {
  358. return new CDafsException(code,msg);
  359. }
  360. void setDafsEndpointPort(SocketEndpoint &ep)
  361. {
  362. // odd kludge (don't do this at home)
  363. byte ipb[4];
  364. if (ep.getNetAddress(sizeof(ipb),&ipb)==sizeof(ipb)) {
  365. if ((ipb[0]==255)&&(ipb[1]==255)) {
  366. ep.port = (((unsigned)ipb[2])<<8)+ipb[3];
  367. ep.ipset(queryLocalIP());
  368. }
  369. }
  370. if (ep.port==0)
  371. {
  372. ep.port = securitySettings.daFileSrvPort;
  373. }
  374. }
  375. inline MemoryBuffer & initSendBuffer(MemoryBuffer & buff)
  376. {
  377. buff.setEndian(__BIG_ENDIAN); // transfer as big endian...
  378. buff.append((unsigned)0); // reserve space for length prefix
  379. return buff;
  380. }
  381. inline void sendBuffer(ISocket * socket, MemoryBuffer & src)
  382. {
  383. unsigned length = src.length() - sizeof(unsigned);
  384. byte * buffer = (byte *)src.toByteArray();
  385. if (TF_TRACE_FULL)
  386. PROGLOG("sendBuffer size %d, data = %d %d %d %d",length, (int)buffer[4],(int)buffer[5],(int)buffer[6],(int)buffer[7]);
  387. _WINCPYREV(buffer, &length, sizeof(unsigned));
  388. SOCKWRITE(socket)(buffer, src.length());
  389. }
  390. inline size32_t receiveBufferSize(ISocket * socket, unsigned numtries=NORMAL_RETRIES,CTimeMon *timemon=NULL)
  391. {
  392. unsigned timeout = SERVER_TIMEOUT;
  393. if (numtries==0) {
  394. numtries = 1;
  395. timeout = 10*1000; // 10s
  396. }
  397. while (numtries--) {
  398. try {
  399. if (timemon) {
  400. unsigned remaining;
  401. if (timemon->timedout(&remaining)||(remaining<10))
  402. remaining = 10;
  403. if (remaining<timeout)
  404. timeout = remaining;
  405. }
  406. size32_t szread;
  407. size32_t gotLength;
  408. SOCKREADTMS(socket)(&gotLength, sizeof(gotLength), sizeof(gotLength), szread, timeout);
  409. _WINREV(gotLength);
  410. if (TF_TRACE_FULL)
  411. PROGLOG("receiveBufferSized %d",gotLength);
  412. return gotLength;
  413. }
  414. catch (IJSOCK_Exception *e) {
  415. if ((numtries==0)||(e->errorCode()!=JSOCKERR_timeout_expired)||(timemon&&timemon->timedout())) {
  416. throw;
  417. }
  418. StringBuffer err;
  419. char peername[256];
  420. socket->peer_name(peername,sizeof(peername)-1);
  421. WARNLOG("Remote connection %s: %s",peername,e->errorMessage(err).str()); // why no peername
  422. e->Release();
  423. Sleep(500+getRandom()%1000); // ~1s
  424. }
  425. }
  426. return 0;
  427. }
  428. static void flush(ISocket *socket)
  429. {
  430. MemoryBuffer sendbuf;
  431. initSendBuffer(sendbuf);
  432. sendbuf.append((RemoteFileCommandType)RFCgetver);
  433. sendbuf.append((unsigned)RFCgetver);
  434. MemoryBuffer reply;
  435. size32_t totread=0;
  436. try {
  437. sendBuffer(socket, sendbuf);
  438. char buf[1024];
  439. loop {
  440. Sleep(1000); // breathe
  441. size32_t szread;
  442. SOCKREADTMS(socket)(buf, 1, sizeof(buf), szread, 1000*60);
  443. totread += szread;
  444. }
  445. }
  446. catch (IJSOCK_Exception *e) {
  447. if (totread)
  448. PROGLOG("%d bytes discarded",totread);
  449. if (e->errorCode()!=JSOCKERR_timeout_expired)
  450. EXCLOG(e,"flush");
  451. e->Release();
  452. }
  453. }
  454. inline void receiveBuffer(ISocket * socket, MemoryBuffer & tgt, unsigned numtries=1, size32_t maxsz=0x7fffffff)
  455. // maxsz is a guess at a resonable upper max to catch where protocol error
  456. {
  457. sRFTM tm;
  458. size32_t gotLength = receiveBufferSize(socket, numtries,tm.timemon);
  459. if (gotLength) {
  460. size32_t origlen = tgt.length();
  461. try {
  462. if (gotLength>maxsz) {
  463. StringBuffer msg;
  464. msg.appendf("receiveBuffer maximum block size exceeded %d/%d",gotLength,maxsz);
  465. PrintStackReport();
  466. throw createDafsException(DAFSERR_protocol_failure,msg.str());
  467. }
  468. unsigned timeout = SERVER_TIMEOUT*(numtries?numtries:1);
  469. if (tm.timemon) {
  470. unsigned remaining;
  471. if (tm.timemon->timedout(&remaining)||(remaining<10))
  472. remaining = 10;
  473. if (remaining<timeout)
  474. timeout = remaining;
  475. }
  476. size32_t szread;
  477. SOCKREADTMS(socket)((gotLength<4000)?tgt.reserve(gotLength):tgt.reserveTruncate(gotLength), gotLength, gotLength, szread, timeout);
  478. }
  479. catch (IJSOCK_Exception *e) {
  480. if (e->errorCode()!=JSOCKERR_timeout_expired) {
  481. EXCLOG(e,"receiveBuffer(1)");
  482. PrintStackReport();
  483. if (!tm.timemon||!tm.timemon->timedout())
  484. flush(socket);
  485. }
  486. else {
  487. EXCLOG(e,"receiveBuffer");
  488. PrintStackReport();
  489. }
  490. tgt.setLength(origlen);
  491. throw;
  492. }
  493. catch (IException *e) {
  494. EXCLOG(e,"receiveBuffer(2)");
  495. PrintStackReport();
  496. if (!tm.timemon||!tm.timemon->timedout())
  497. flush(socket);
  498. tgt.setLength(origlen);
  499. throw;
  500. }
  501. }
  502. tgt.setEndian(__BIG_ENDIAN);
  503. }
  504. struct CConnectionRec
  505. {
  506. SocketEndpoint ep;
  507. unsigned tick;
  508. IArrayOf<ISocket> socks; // relies on isShared
  509. };
  510. //---------------------------------------------------------------------------
  511. // Local mount redirect
  512. struct CLocalMountRec: public CInterface
  513. {
  514. IpAddress ip;
  515. StringAttr dir; // dir path on remote ip
  516. StringAttr local; // local dir path
  517. };
  518. static CIArrayOf<CLocalMountRec> localMounts;
  519. static CriticalSection localMountCrit;
  520. void setDafsLocalMountRedirect(const IpAddress &ip,const char *dir,const char *mountdir)
  521. {
  522. CriticalBlock block(localMountCrit);
  523. ForEachItemInRev(i,localMounts) {
  524. CLocalMountRec &mount = localMounts.item(i);
  525. if (dir==NULL) { // remove all matching mount
  526. if (!mountdir)
  527. return;
  528. if (strcmp(mount.local,mountdir)==0)
  529. localMounts.remove(i);
  530. }
  531. else if (mount.ip.ipequals(ip)&&(strcmp(mount.dir,dir)==0)) {
  532. if (mountdir) {
  533. mount.local.set(mountdir);
  534. return;
  535. }
  536. else
  537. localMounts.remove(i);
  538. }
  539. }
  540. if (dir&&mountdir) {
  541. CLocalMountRec &mount = *new CLocalMountRec;
  542. mount.ip.ipset(ip);
  543. mount.dir.set(dir);
  544. mount.local.set(mountdir);
  545. localMounts.append(mount);
  546. }
  547. }
  548. IFile *createFileLocalMount(const IpAddress &ip, const char * filename)
  549. {
  550. CriticalBlock block(localMountCrit);
  551. ForEachItemInRev(i,localMounts) {
  552. CLocalMountRec &mount = localMounts.item(i);
  553. if (mount.ip.ipequals(ip)) {
  554. size32_t bl = mount.dir.length();
  555. if (isPathSepChar(mount.dir[bl-1]))
  556. bl--;
  557. if ((memcmp((void *)filename,(void *)mount.dir.get(),bl)==0)&&(isPathSepChar(filename[bl])||!filename[bl])) { // match
  558. StringBuffer locpath(mount.local);
  559. if (filename[bl])
  560. addPathSepChar(locpath).append(filename+bl+1);
  561. locpath.replace((PATHSEPCHAR=='\\')?'/':'\\',PATHSEPCHAR);
  562. return createIFile(locpath.str());
  563. }
  564. }
  565. }
  566. return NULL;
  567. }
  568. //---------------------------------------------------------------------------
  569. static class CConnectionTable: public SuperHashTableOf<CConnectionRec,SocketEndpoint>
  570. {
  571. void onAdd(void *) {}
  572. void onRemove(void *e)
  573. {
  574. CConnectionRec *r=(CConnectionRec *)e;
  575. delete r;
  576. }
  577. unsigned getHashFromElement(const void *e) const
  578. {
  579. const CConnectionRec &elem=*(const CConnectionRec *)e;
  580. return elem.ep.hash(0);
  581. }
  582. unsigned getHashFromFindParam(const void *fp) const
  583. {
  584. return ((const SocketEndpoint *)fp)->hash(0);
  585. }
  586. const void * getFindParam(const void *p) const
  587. {
  588. const CConnectionRec &elem=*(const CConnectionRec *)p;
  589. return (void *)&elem.ep;
  590. }
  591. bool matchesFindParam(const void * et, const void *fp, unsigned) const
  592. {
  593. return ((CConnectionRec *)et)->ep.equals(*(SocketEndpoint *)fp);
  594. }
  595. IMPLEMENT_SUPERHASHTABLEOF_REF_FIND(CConnectionRec,SocketEndpoint);
  596. unsigned numsockets;
  597. public:
  598. static CriticalSection crit;
  599. CConnectionTable()
  600. {
  601. numsockets = 0;
  602. }
  603. ~CConnectionTable() {
  604. releaseAll();
  605. }
  606. ISocket *lookup(const SocketEndpoint &ep)
  607. {
  608. // always called from crit block
  609. CConnectionRec *r = SuperHashTableOf<CConnectionRec,SocketEndpoint>::find(&ep);
  610. if (r) {
  611. ForEachItemIn(i,r->socks) {
  612. ISocket *s = &r->socks.item(i);
  613. if (!QUERYINTERFACE(s, CInterface)->IsShared()) {
  614. r->tick = msTick();
  615. s->Link();
  616. return s;
  617. }
  618. }
  619. }
  620. return NULL;
  621. }
  622. void addLink(SocketEndpoint &ep,ISocket *sock)
  623. {
  624. // always called from crit block
  625. while (numsockets>=SOCKET_CACHE_MAX) {
  626. // find oldest
  627. CConnectionRec *c = NULL;
  628. unsigned oldest = 0;
  629. CConnectionRec *old = NULL;
  630. unsigned oldi;
  631. unsigned now = msTick();
  632. loop {
  633. c = (CConnectionRec *)SuperHashTableOf<CConnectionRec,SocketEndpoint>::next(c);
  634. if (!c)
  635. break;
  636. ForEachItemIn(i,c->socks) {
  637. ISocket *s = &c->socks.item(i);
  638. if (!QUERYINTERFACE(s, CInterface)->IsShared()) { // candidate to remove
  639. unsigned t = now-c->tick;
  640. if (t>oldest) {
  641. oldest = t;
  642. old = c;
  643. oldi = i;
  644. }
  645. }
  646. }
  647. }
  648. if (!old)
  649. return;
  650. old->socks.remove(oldi);
  651. numsockets--;
  652. }
  653. CConnectionRec *r = SuperHashTableOf<CConnectionRec,SocketEndpoint>::find(&ep);
  654. if (!r) {
  655. r = new CConnectionRec;
  656. r->ep = ep;
  657. SuperHashTableOf<CConnectionRec,SocketEndpoint>::add(*r);
  658. }
  659. sock->Link();
  660. r->socks.append(*sock);
  661. numsockets++;
  662. r->tick = msTick();
  663. }
  664. void remove(SocketEndpoint &ep,ISocket *sock)
  665. {
  666. // always called from crit block
  667. CConnectionRec *r = SuperHashTableOf<CConnectionRec,SocketEndpoint>::find(&ep);
  668. if (r)
  669. if (r->socks.zap(*sock)&&numsockets)
  670. numsockets--;
  671. }
  672. } *ConnectionTable = NULL;
  673. CriticalSection CConnectionTable::crit;
  674. void clientSetDaliServixSocketCaching(bool on)
  675. {
  676. CriticalBlock block(CConnectionTable::crit);
  677. if (on) {
  678. if (!ConnectionTable)
  679. ConnectionTable = new CConnectionTable;
  680. }
  681. else {
  682. delete ConnectionTable;
  683. ConnectionTable = NULL;
  684. }
  685. }
  686. //---------------------------------------------------------------------------
  687. // TreeCopy
  688. #define TREECOPY_CACHE_SIZE 50
  689. struct CTreeCopyItem: public CInterface
  690. {
  691. StringAttr net;
  692. StringAttr mask;
  693. offset_t sz; // original size
  694. CDateTime dt; // original date
  695. RemoteFilenameArray loc; // locations for file - 0 is original
  696. Owned<IBitSet> busy;
  697. unsigned lastused;
  698. CTreeCopyItem(RemoteFilename &orig, const char *_net, const char *_mask, offset_t _sz, CDateTime &_dt)
  699. : net(_net), mask(_mask)
  700. {
  701. loc.append(orig);
  702. dt.set(_dt);
  703. sz = _sz;
  704. busy.setown(createThreadSafeBitSet());
  705. lastused = msTick();
  706. }
  707. bool equals(const RemoteFilename &orig, const char *_net, const char *_mask, offset_t _sz, CDateTime &_dt)
  708. {
  709. if (!orig.equals(loc.item(0)))
  710. return false;
  711. if (strcmp(_net,net)!=0)
  712. return false;
  713. if (strcmp(_mask,mask)!=0)
  714. return false;
  715. if (sz!=_sz)
  716. return false;
  717. return (dt.equals(_dt,false));
  718. }
  719. };
  720. static CIArrayOf<CTreeCopyItem> treeCopyArray;
  721. static CriticalSection treeCopyCrit;
  722. static unsigned treeCopyWaiting=0;
  723. static Semaphore treeCopySem;
  724. #define DEBUGSAMEIP false
  725. static void treeCopyFile(RemoteFilename &srcfn, RemoteFilename &dstfn, const char *net, const char *mask, IpAddress &ip, bool usetmp, CThrottler *throttler, CFflags copyFlags=CFnone)
  726. {
  727. unsigned start = msTick();
  728. Owned<IFile> dstfile = createIFile(dstfn);
  729. // the following is really to check the dest node is up and working (otherwise not much point in continuing!)
  730. if (dstfile->exists())
  731. PROGLOG("TREECOPY overwriting '%s'",dstfile->queryFilename());
  732. Owned<IFile> srcfile = createIFile(srcfn);
  733. unsigned lastmin = 0;
  734. if (!srcfn.queryIP().ipequals(dstfn.queryIP())) {
  735. CriticalBlock block(treeCopyCrit);
  736. loop {
  737. CDateTime dt;
  738. offset_t sz;
  739. try {
  740. sz = srcfile->size();
  741. if (sz==(offset_t)-1) {
  742. if (TF_TRACE_TREE_COPY)
  743. PROGLOG("TREECOPY source not found '%s'",srcfile->queryFilename());
  744. break;
  745. }
  746. srcfile->getTime(NULL,&dt,NULL);
  747. }
  748. catch (IException *e) {
  749. EXCLOG(e,"treeCopyFile(1)");
  750. e->Release();
  751. break;
  752. }
  753. Linked<CTreeCopyItem> tc;
  754. unsigned now = msTick();
  755. ForEachItemInRev(i1,treeCopyArray) {
  756. CTreeCopyItem &item = treeCopyArray.item(i1);
  757. // prune old entries (not strictly needed buf I think better)
  758. if (now-item.lastused>TREECOPYPRUNETIME)
  759. treeCopyArray.remove(i1);
  760. else if (!tc.get()&&item.equals(srcfn,net,mask,sz,dt)) {
  761. tc.set(&item);
  762. item.lastused = now;
  763. }
  764. }
  765. if (!tc.get()) {
  766. if (treeCopyArray.ordinality()>=TREECOPY_CACHE_SIZE)
  767. treeCopyArray.remove(0);
  768. tc.setown(new CTreeCopyItem(srcfn,net,mask,sz,dt));
  769. treeCopyArray.append(*tc.getLink());
  770. }
  771. ForEachItemInRev(cand,tc->loc) { // rev to choose copied locations first (maybe optional?)
  772. if (!tc->busy->testSet(cand)) {
  773. // check file accessible and matches
  774. if (!cand&&dstfn.equals(tc->loc.item(cand))) // hmm trying to overwrite existing, better humor
  775. continue;
  776. bool ok = true;
  777. Owned<IFile> rmtfile = createIFile(tc->loc.item(cand));
  778. if (cand) { // only need to check if remote
  779. try {
  780. if (rmtfile->size()!=sz)
  781. ok = false;
  782. else {
  783. CDateTime fdt;
  784. rmtfile->getTime(NULL,&fdt,NULL);
  785. ok = fdt.equals(dt);
  786. }
  787. }
  788. catch (IException *e) {
  789. EXCLOG(e,"treeCopyFile(2)");
  790. e->Release();
  791. ok = false;
  792. }
  793. }
  794. if (ok) { // if not ok leave 'busy'
  795. // finally lets try and copy!
  796. try {
  797. if (TF_TRACE_TREE_COPY)
  798. PROGLOG("TREECOPY(started) %s to %s",rmtfile->queryFilename(),dstfile->queryFilename());
  799. {
  800. CriticalUnblock unblock(treeCopyCrit); // note we have tc linked
  801. rmtfile->copyTo(dstfile,DEFAULT_COPY_BLKSIZE,NULL,usetmp,copyFlags);
  802. }
  803. if (TF_TRACE_TREE_COPY)
  804. PROGLOG("TREECOPY(done) %s to %s",rmtfile->queryFilename(),dstfile->queryFilename());
  805. tc->busy->set(cand,false);
  806. if (treeCopyWaiting)
  807. treeCopySem.signal((treeCopyWaiting>1)?2:1);
  808. // add to known locations
  809. tc->busy->set(tc->loc.ordinality(),false); // prob already is clear
  810. tc->loc.append(dstfn);
  811. ip.ipset(tc->loc.item(cand).queryIP());
  812. return;
  813. }
  814. catch (IException *e) {
  815. if (cand==0) {
  816. tc->busy->set(0,false); // don't leave busy
  817. if (treeCopyWaiting)
  818. treeCopySem.signal();
  819. throw; // what more can we do!
  820. }
  821. EXCLOG(e,"treeCopyFile(3)");
  822. e->Release();
  823. }
  824. }
  825. }
  826. }
  827. // all locations busy
  828. if (msTick()-start>TREECOPYTIMEOUT) {
  829. WARNLOG("Treecopy %s wait timed out", srcfile->queryFilename());
  830. break;
  831. }
  832. treeCopyWaiting++; // note this isn't precise - just indication
  833. {
  834. CriticalUnblock unblock(treeCopyCrit);
  835. if (throttler)
  836. {
  837. CThrottleReleaseBlock block(*throttler);
  838. treeCopySem.wait(TREECOPYPOLLTIME);
  839. }
  840. else
  841. treeCopySem.wait(TREECOPYPOLLTIME);
  842. }
  843. treeCopyWaiting--;
  844. if ((msTick()-start)/10*1000!=lastmin) {
  845. lastmin = (msTick()-start)/10*1000;
  846. PROGLOG("treeCopyFile delayed: %s to %s",srcfile->queryFilename(),dstfile->queryFilename());
  847. }
  848. }
  849. }
  850. else if (TF_TRACE_TREE_COPY)
  851. PROGLOG("TREECOPY source on same node as destination");
  852. if (TF_TRACE_TREE_COPY)
  853. PROGLOG("TREECOPY(started,fallback) %s to %s",srcfile->queryFilename(),dstfile->queryFilename());
  854. try {
  855. GetHostIp(ip);
  856. srcfile->copyTo(dstfile,DEFAULT_COPY_BLKSIZE,NULL,usetmp,copyFlags);
  857. }
  858. catch (IException *e) {
  859. EXCLOG(e,"TREECOPY(done,fallback)");
  860. throw;
  861. }
  862. if (TF_TRACE_TREE_COPY)
  863. PROGLOG("TREECOPY(done,fallback) %s to %s",srcfile->queryFilename(),dstfile->queryFilename());
  864. }
  865. //---------------------------------------------------------------------------
  866. class CRemoteBase: public CInterface
  867. {
  868. Owned<ISocket> socket;
  869. static SocketEndpoint lastfailep;
  870. static unsigned lastfailtime;
  871. bool useSSL;
  872. void connectSocket(SocketEndpoint &ep)
  873. {
  874. #ifdef _DEBUG
  875. StringBuffer sb;
  876. ep.getUrlStr(sb);
  877. DBGLOG("Client connecting %sto dafilesrv %s", useSSL?"SECURE ":"", sb.str());
  878. #endif
  879. sRFTM tm;
  880. // called in CConnectionTable::crit
  881. unsigned retries = 3;
  882. if (ep.equals(lastfailep)) {
  883. if (msTick()-lastfailtime<DAFS_CONNECT_FAIL_RETRY_TIME) {
  884. StringBuffer msg("Failed to connect to dafilesrv/daliservix on ");
  885. ep.getUrlStr(msg);
  886. throw createDafsException(DAFSERR_connection_failed,msg.str());
  887. }
  888. lastfailep.set(NULL);
  889. retries = 1; // on probation
  890. }
  891. while(retries--) {
  892. CriticalUnblock unblock(CConnectionTable::crit); // allow others to connect
  893. StringBuffer eps;
  894. if (TF_TRACE_CLIENT_CONN) {
  895. ep.getUrlStr(eps);
  896. PROGLOG("Connecting %sto %s",useSSL?"SECURE ":"",eps.str());
  897. //PrintStackReport();
  898. }
  899. bool ok = true;
  900. try {
  901. if (tm.timemon) {
  902. unsigned remaining;
  903. tm.timemon->timedout(&remaining);
  904. socket.setown(ISocket::connect_timeout(ep,remaining));
  905. }
  906. else
  907. socket.setown(ISocket::connect(ep));
  908. if (useSSL)
  909. {
  910. Owned<ISecureSocket> ssock = createSecureSocket(socket.getClear(), ClientSocket);
  911. int status = ssock->secure_connect();
  912. if (status < 0)
  913. throw createDafsException(DAFSERR_connection_failed,"Failure to establish secure connection");
  914. socket.setown(ssock.getLink());
  915. }
  916. }
  917. catch (IJSOCK_Exception *e) {
  918. ok = false;
  919. if (!retries||(tm.timemon&&tm.timemon->timedout())) {
  920. if (e->errorCode()==JSOCKERR_connection_failed) {
  921. lastfailep.set(ep);
  922. lastfailtime = msTick();
  923. e->Release();
  924. StringBuffer msg("Failed to connect to dafilesrv/daliservix on ");
  925. ep.getUrlStr(msg);
  926. throw createDafsException(DAFSERR_connection_failed,msg.str());
  927. }
  928. throw;
  929. }
  930. StringBuffer err;
  931. WARNLOG("Remote file connect %s",e->errorMessage(err).str());
  932. e->Release();
  933. }
  934. if (ok) {
  935. if (TF_TRACE_CLIENT_CONN) {
  936. PROGLOG("Connected to %s",eps.str());
  937. }
  938. if (AuthenticationEnabled) {
  939. try {
  940. sendAuthentication(ep); // this will log error
  941. break;
  942. }
  943. catch (IJSOCK_Exception *e) {
  944. StringBuffer err;
  945. WARNLOG("Remote file authenticate %s for %s ",e->errorMessage(err).str(),ep.getUrlStr(eps.clear()).str());
  946. e->Release();
  947. if (!retries)
  948. break;
  949. }
  950. }
  951. else
  952. break;
  953. }
  954. unsigned sleeptime = getRandom()%3000+1000;
  955. if (tm.timemon) {
  956. unsigned remaining;
  957. tm.timemon->timedout(&remaining);
  958. if (remaining/2<sleeptime)
  959. sleeptime = remaining/2;
  960. }
  961. Sleep(sleeptime); // prevent multiple retries beating
  962. PROGLOG("Retrying %sconnect",useSSL?"SECURE ":"");
  963. }
  964. if (ConnectionTable)
  965. ConnectionTable->addLink(ep,socket);
  966. }
  967. void killSocket(SocketEndpoint &tep)
  968. {
  969. CriticalBlock block2(CConnectionTable::crit); // this is nested with crit
  970. if (socket) {
  971. try {
  972. Owned<ISocket> s = socket.getClear();
  973. if (ConnectionTable)
  974. ConnectionTable->remove(tep,s);
  975. }
  976. catch (IJSOCK_Exception *e) {
  977. e->Release(); // ignore errors closing
  978. }
  979. Sleep(getRandom()%1000*5+500); // prevent multiple beating
  980. }
  981. }
  982. protected: friend class CRemoteFileIO;
  983. StringAttr filename;
  984. CriticalSection crit;
  985. SocketEndpoint ep;
  986. void sendRemoteCommand(MemoryBuffer & src, MemoryBuffer & reply, bool retry=true, bool lengthy=false)
  987. {
  988. CriticalBlock block(crit); // serialize commands on same file
  989. SocketEndpoint tep(ep);
  990. setDafsEndpointPort(tep);
  991. unsigned nretries = retry?3:0;
  992. Owned<IJSOCK_Exception> firstexc; // when retrying return first error if fails
  993. loop {
  994. try {
  995. if (socket) {
  996. sendBuffer(socket, src);
  997. receiveBuffer(socket, reply, lengthy?LENGTHY_RETRIES:NORMAL_RETRIES);
  998. break;
  999. }
  1000. }
  1001. catch (IJSOCK_Exception *e) {
  1002. if (!nretries--) {
  1003. if (firstexc) {
  1004. e->Release();
  1005. e = firstexc.getClear();
  1006. }
  1007. killSocket(tep);
  1008. throw e;
  1009. }
  1010. StringBuffer str;
  1011. e->errorMessage(str);
  1012. WARNLOG("Remote File: %s, retrying (%d)",str.str(),nretries);
  1013. if (firstexc)
  1014. e->Release();
  1015. else
  1016. firstexc.setown(e);
  1017. killSocket(tep);
  1018. }
  1019. CriticalBlock block2(CConnectionTable::crit); // this is nested with crit
  1020. if (ConnectionTable) {
  1021. socket.setown(ConnectionTable->lookup(tep));
  1022. if (socket) {
  1023. // validate existing socket by sending an 'exists' command with short time out
  1024. // (use exists for backward compatibility)
  1025. bool ok = false;
  1026. try {
  1027. MemoryBuffer sendbuf;
  1028. initSendBuffer(sendbuf);
  1029. MemoryBuffer replybuf;
  1030. sendbuf.append((RemoteFileCommandType)RFCexists).append(filename);
  1031. sendBuffer(socket, sendbuf);
  1032. receiveBuffer(socket, replybuf, 0, 1024);
  1033. ok = true;
  1034. }
  1035. catch (IException *e) {
  1036. e->Release();
  1037. }
  1038. if (!ok)
  1039. killSocket(tep);
  1040. }
  1041. }
  1042. if (!socket) {
  1043. bool tryNonSecure = true;
  1044. if (useSSL) {
  1045. try {
  1046. connectSocket(tep);//first try secure connect
  1047. tryNonSecure = false;
  1048. }
  1049. catch (IException *e) {
  1050. StringBuffer s;
  1051. e->errorMessage(s);
  1052. e->Release();
  1053. WARNLOG("Secure connect failed, retrying on legacy port (%s)",s.str());
  1054. useSSL = false;
  1055. tep.port = DAFILESRV_PORT;//retry on nonsecure port
  1056. tryNonSecure = true;
  1057. }
  1058. }
  1059. if (tryNonSecure)
  1060. connectSocket(tep);
  1061. }
  1062. }
  1063. unsigned errCode;
  1064. reply.read(errCode);
  1065. if (errCode) {
  1066. StringBuffer msg;
  1067. if (filename.get())
  1068. msg.append(filename);
  1069. ep.getUrlStr(msg.append('[')).append("] ");
  1070. size32_t pos = reply.getPos();
  1071. if (pos<reply.length()) {
  1072. size32_t len = reply.length()-pos;
  1073. const byte *rest = reply.readDirect(len);
  1074. if (errCode==RFSERR_InvalidCommand) {
  1075. const char *s = (const char *)rest;
  1076. const char *e = (const char *)rest+len;
  1077. while (*s&&(s!=e))
  1078. s++;
  1079. msg.append(s-(const char *)rest,(const char *)rest);
  1080. }
  1081. else if (len&&(rest[len-1]==0))
  1082. msg.append((const char *)rest);
  1083. else {
  1084. msg.appendf("extra data[%d]",len);
  1085. for (unsigned i=0;(i<16)&&(i<len);i++)
  1086. msg.appendf(" %2x",(int)rest[i]);
  1087. }
  1088. }
  1089. else if(errCode == 8209)
  1090. msg.append("Failed to open directory.");
  1091. else
  1092. msg.append("ERROR #").append(errCode);
  1093. #ifdef _DEBUG
  1094. ERRLOG("%s",msg.str());
  1095. PrintStackReport();
  1096. #endif
  1097. throw createDafsException(errCode,msg.str());
  1098. }
  1099. }
  1100. void sendRemoteCommand(MemoryBuffer & src, bool retry)
  1101. {
  1102. MemoryBuffer reply;
  1103. sendRemoteCommand(src, reply, retry);
  1104. }
  1105. void throwUnauthenticated(const IpAddress &ip,const char *user,unsigned err=0)
  1106. {
  1107. if (err==0)
  1108. err = RFSERR_AuthenticateFailed;
  1109. StringBuffer msg;
  1110. msg.appendf("Authentication for %s on ",user);
  1111. ip.getIpText(msg);
  1112. msg.append(" failed");
  1113. throw createDafsException(err, msg.str());
  1114. }
  1115. void sendAuthentication(const IpAddress &serverip)
  1116. {
  1117. // send my sig
  1118. // first send my sig which if stream unencrypted will get returned as a bad command
  1119. OnceKey oncekey;
  1120. genOnce(oncekey);
  1121. MemoryBuffer sendbuf;
  1122. initSendBuffer(sendbuf);
  1123. MemoryBuffer replybuf;
  1124. MemoryBuffer encbuf; // because aesEncrypt clears input
  1125. sendbuf.append((RemoteFileCommandType)RFCunlock).append(sizeof(oncekey),&oncekey);
  1126. try {
  1127. sendBuffer(socket, sendbuf);
  1128. receiveBuffer(socket, replybuf, NORMAL_RETRIES, 1024);
  1129. }
  1130. catch (IException *e)
  1131. {
  1132. EXCLOG(e,"Remote file - sendAuthentication(1)");
  1133. throw;
  1134. }
  1135. unsigned errCode;
  1136. replybuf.read(errCode);
  1137. if (errCode!=0) // no authentication required
  1138. return;
  1139. SocketEndpoint ep;
  1140. ep.setLocalHost(0);
  1141. byte ipdata[16];
  1142. size32_t ipds = ep.getNetAddress(sizeof(ipdata),&ipdata);
  1143. mergeOnce(oncekey,ipds,&ipdata);
  1144. StringBuffer username;
  1145. StringBuffer password;
  1146. IPasswordProvider * pp = queryPasswordProvider();
  1147. if (pp)
  1148. pp->getPassword(serverip, username, password);
  1149. if (!username.length())
  1150. username.append("sds_system"); // default account (note if exists should have restricted access!)
  1151. if (!password.length())
  1152. password.append("sds_man");
  1153. if (replybuf.remaining()<=sizeof(size32_t))
  1154. throwUnauthenticated(serverip,username.str());
  1155. size32_t bs;
  1156. replybuf.read(bs);
  1157. if (replybuf.remaining()<bs)
  1158. throwUnauthenticated(serverip,username.str());
  1159. MemoryBuffer skeybuf;
  1160. aesDecrypt(&oncekey,sizeof(oncekey),replybuf.readDirect(bs),bs,skeybuf);
  1161. if (skeybuf.remaining()<sizeof(OnceKey))
  1162. throwUnauthenticated(serverip,username.str());
  1163. OnceKey sokey;
  1164. skeybuf.read(sizeof(OnceKey),&sokey);
  1165. // now we have the key to use to send user/password
  1166. MemoryBuffer tosend;
  1167. tosend.append((byte)2).append(username).append(password);
  1168. initSendBuffer(sendbuf.clear());
  1169. sendbuf.append((RemoteFileCommandType)RFCunlockreply);
  1170. aesEncrypt(&sokey, sizeof(oncekey), tosend.toByteArray(), tosend.length(), encbuf);
  1171. sendbuf.append(encbuf.length());
  1172. sendbuf.append(encbuf);
  1173. try {
  1174. sendBuffer(socket, sendbuf);
  1175. receiveBuffer(socket, replybuf.clear(), NORMAL_RETRIES, 1024);
  1176. }
  1177. catch (IException *e)
  1178. {
  1179. EXCLOG(e,"Remote file - sendAuthentication(2)");
  1180. throw;
  1181. }
  1182. replybuf.read(errCode);
  1183. if (errCode==0) // suceeded!
  1184. return;
  1185. throwUnauthenticated(serverip,username.str(),errCode);
  1186. }
  1187. public:
  1188. SocketEndpoint &queryEp() { return ep; };
  1189. CRemoteBase(const SocketEndpoint &_ep, const char * _filename)
  1190. : filename(_filename)
  1191. {
  1192. ep = _ep;
  1193. useSSL = securitySettings.useSSL;
  1194. }
  1195. void connect()
  1196. {
  1197. CriticalBlock block(crit);
  1198. CriticalBlock block2(CConnectionTable::crit); // this shouldn't ever block
  1199. if (AuthenticationEnabled) {
  1200. SocketEndpoint tep(ep);
  1201. setDafsEndpointPort(tep);
  1202. connectSocket(tep);
  1203. }
  1204. }
  1205. void disconnect()
  1206. {
  1207. CriticalBlock block(crit);
  1208. CriticalBlock block2(CConnectionTable::crit); // this shouldn't ever block
  1209. ISocket *s = socket.getClear();
  1210. if (ConnectionTable) {
  1211. SocketEndpoint tep(ep);
  1212. setDafsEndpointPort(tep);
  1213. ConnectionTable->remove(tep,s);
  1214. }
  1215. ::Release(s);
  1216. }
  1217. const char *queryLocalName()
  1218. {
  1219. return filename;
  1220. }
  1221. };
  1222. SocketEndpoint CRemoteBase::lastfailep;
  1223. unsigned CRemoteBase::lastfailtime;
  1224. //---------------------------------------------------------------------------
  1225. class CRemoteDirectoryIterator : public CInterface, implements IDirectoryDifferenceIterator
  1226. {
  1227. Owned<IFile> cur;
  1228. bool curvalid;
  1229. bool curisdir;
  1230. StringAttr curname;
  1231. CDateTime curdt;
  1232. __int64 cursize;
  1233. StringAttr dir;
  1234. SocketEndpoint ep;
  1235. byte *flags;
  1236. unsigned numflags;
  1237. unsigned curidx;
  1238. unsigned mask;
  1239. MemoryBuffer buf;
  1240. public:
  1241. static CriticalSection crit;
  1242. CRemoteDirectoryIterator(SocketEndpoint &_ep,const char *_dir)
  1243. : dir(_dir)
  1244. {
  1245. // an extended difference iterator starts with 2 (for bwd compatibility)
  1246. ep = _ep;
  1247. curisdir = false;
  1248. curvalid = false;
  1249. cursize = 0;
  1250. curidx = (unsigned)-1;
  1251. mask = 0;
  1252. numflags = 0;
  1253. flags = NULL;
  1254. }
  1255. bool appendBuf(MemoryBuffer &_buf)
  1256. {
  1257. buf.setSwapEndian(_buf.needSwapEndian());
  1258. byte hdr;
  1259. _buf.read(hdr);
  1260. if (hdr==2) {
  1261. _buf.read(numflags);
  1262. flags = (byte *)malloc(numflags);
  1263. _buf.read(numflags,flags);
  1264. }
  1265. else {
  1266. buf.append(hdr);
  1267. flags = NULL;
  1268. numflags = 0;
  1269. }
  1270. size32_t rest = _buf.length()-_buf.getPos();
  1271. const byte *rb = (const byte *)_buf.readDirect(rest);
  1272. bool ret = true;
  1273. // At the last byte of the rb (rb[rest-1]) is the stream live flag
  1274. // True if the stream has more data
  1275. // False at the end of stream
  1276. // The previous byte (rb[rest-2]) is the flag to signal there are more
  1277. // valid entries in this block
  1278. // True if there are valid directory entry follows this flag
  1279. // False if there are no more valid entry in this block aka end of block
  1280. // If there is more data in the stream, the end of block flag should be removed
  1281. if (rest&&(rb[rest-1]!=0))
  1282. {
  1283. rest--; // remove stream live flag
  1284. if(rest && (0 == rb[rest-1]))
  1285. rest--; //Remove end of block flag
  1286. ret = false; // continuation
  1287. }
  1288. buf.append(rest,rb);
  1289. return ret;
  1290. }
  1291. ~CRemoteDirectoryIterator()
  1292. {
  1293. free(flags);
  1294. }
  1295. IMPLEMENT_IINTERFACE
  1296. bool first()
  1297. {
  1298. curidx = (unsigned)-1;
  1299. buf.reset();
  1300. return next();
  1301. }
  1302. bool next()
  1303. {
  1304. loop {
  1305. curidx++;
  1306. cur.clear();
  1307. curdt.clear();
  1308. curname.clear();
  1309. cursize = 0;
  1310. curisdir = false;
  1311. if (buf.getPos()>=buf.length())
  1312. return false;
  1313. byte isValidEntry;
  1314. buf.read(isValidEntry);
  1315. curvalid = isValidEntry!=0;
  1316. if (!curvalid)
  1317. return false;
  1318. buf.read(curisdir);
  1319. buf.read(cursize);
  1320. curdt.deserialize(buf);
  1321. buf.read(curname);
  1322. // kludge for bug in old linux jlibs
  1323. if (strchr(curname,'\\')&&(getPathSepChar(dir)=='/')) {
  1324. StringBuffer temp(curname);
  1325. temp.replace('\\','/');
  1326. curname.set(temp.str());
  1327. }
  1328. if ((mask==0)||(getFlags()&mask))
  1329. break;
  1330. }
  1331. return true;
  1332. }
  1333. bool isValid()
  1334. {
  1335. return curvalid;
  1336. }
  1337. IFile & query()
  1338. {
  1339. if (!cur) {
  1340. StringBuffer full(dir);
  1341. addPathSepChar(full).append(curname);
  1342. if (ep.isNull())
  1343. cur.setown(createIFile(full.str()));
  1344. else {
  1345. RemoteFilename rfn;
  1346. rfn.setPath(ep,full.str());
  1347. cur.setown(createIFile(rfn));
  1348. }
  1349. }
  1350. return *cur;
  1351. }
  1352. StringBuffer &getName(StringBuffer &buf)
  1353. {
  1354. return buf.append(curname);
  1355. }
  1356. bool isDir()
  1357. {
  1358. return curisdir;
  1359. }
  1360. __int64 getFileSize()
  1361. {
  1362. if (curisdir)
  1363. return -1;
  1364. return cursize;
  1365. }
  1366. bool getModifiedTime(CDateTime &ret)
  1367. {
  1368. ret = curdt;
  1369. return true;
  1370. }
  1371. void setMask(unsigned _mask)
  1372. {
  1373. mask = _mask;
  1374. }
  1375. virtual unsigned getFlags()
  1376. {
  1377. if (flags&&(curidx<numflags))
  1378. return flags[curidx];
  1379. return 0;
  1380. }
  1381. static bool serialize(MemoryBuffer &mb,IDirectoryIterator *iter, size32_t bufsize, bool first)
  1382. {
  1383. bool ret = true;
  1384. byte b=1;
  1385. StringBuffer tmp;
  1386. if (first ? iter->first() : iter->next()) {
  1387. loop {
  1388. mb.append(b);
  1389. bool isdir = iter->isDir();
  1390. __int64 sz = isdir?0:iter->getFileSize();
  1391. CDateTime dt;
  1392. iter->getModifiedTime(dt);
  1393. iter->getName(tmp.clear());
  1394. mb.append(isdir).append(sz);
  1395. dt.serialize(mb);
  1396. mb.append(tmp.str());
  1397. if (bufsize&&(mb.length()>=bufsize-1)) {
  1398. ret = false;
  1399. break;
  1400. }
  1401. if (!iter->next())
  1402. break;
  1403. }
  1404. }
  1405. b = 0;
  1406. mb.append(b);
  1407. return ret;
  1408. }
  1409. static void serializeDiff(MemoryBuffer &mb,IDirectoryDifferenceIterator *iter)
  1410. {
  1411. // bit slow
  1412. MemoryBuffer flags;
  1413. ForEach(*iter)
  1414. flags.append((byte)iter->getFlags());
  1415. if (flags.length()) {
  1416. byte b = 2;
  1417. mb.append(b).append((unsigned)flags.length()).append(flags);
  1418. }
  1419. serialize(mb,iter,0,true);
  1420. }
  1421. void serialize(MemoryBuffer &mb,bool isdiff)
  1422. {
  1423. byte b;
  1424. if (isdiff&&numflags&&flags) {
  1425. b = 2;
  1426. mb.append(b).append(numflags).append(numflags,flags);
  1427. }
  1428. serialize(mb,this,0,true);
  1429. }
  1430. };
  1431. class CCritTable;
  1432. class CEndpointCS : public CriticalSection, public CInterface
  1433. {
  1434. CCritTable &table;
  1435. const SocketEndpoint ep;
  1436. public:
  1437. CEndpointCS(CCritTable &_table, const SocketEndpoint &_ep) : table(_table), ep(_ep) { }
  1438. const void *queryFindParam() const { return &ep; }
  1439. virtual void beforeDispose();
  1440. };
  1441. class CCritTable : private SimpleHashTableOf<CEndpointCS, const SocketEndpoint>
  1442. {
  1443. typedef SimpleHashTableOf<CEndpointCS, const SocketEndpoint> PARENT;
  1444. CriticalSection crit;
  1445. public:
  1446. CEndpointCS *getCrit(const SocketEndpoint &ep)
  1447. {
  1448. CriticalBlock b(crit);
  1449. Linked<CEndpointCS> clientCrit = find(ep);
  1450. if (!clientCrit || !clientCrit->isAlive()) // if !isAlive(), then it is in the process of being destroyed/removed.
  1451. {
  1452. clientCrit.setown(new CEndpointCS(*this, ep));
  1453. replace(*clientCrit); // NB table doesn't own
  1454. }
  1455. return clientCrit.getClear();
  1456. }
  1457. unsigned getHashFromElement(const void *e) const
  1458. {
  1459. const CEndpointCS &elem=*(const CEndpointCS *)e;
  1460. return getHashFromFindParam(elem.queryFindParam());
  1461. }
  1462. unsigned getHashFromFindParam(const void *fp) const
  1463. {
  1464. return ((const SocketEndpoint *)fp)->hash(0);
  1465. }
  1466. void removeExact(CEndpointCS *clientCrit)
  1467. {
  1468. CriticalBlock b(crit);
  1469. PARENT::removeExact(clientCrit); // NB may not exist, could have been replaced if detected !isAlive() in getCrit()
  1470. }
  1471. } *dirCSTable;
  1472. MODULE_INIT(INIT_PRIORITY_STANDARD)
  1473. {
  1474. dirCSTable = new CCritTable;
  1475. return true;
  1476. }
  1477. MODULE_EXIT()
  1478. {
  1479. delete dirCSTable;
  1480. }
  1481. void CEndpointCS::beforeDispose()
  1482. {
  1483. table.removeExact(this);
  1484. }
  1485. class CRemoteFile : public CRemoteBase, implements IFile
  1486. {
  1487. StringAttr remotefilename;
  1488. unsigned flags;
  1489. public:
  1490. IMPLEMENT_IINTERFACE
  1491. CRemoteFile(const SocketEndpoint &_ep, const char * _filename)
  1492. : CRemoteBase(_ep, _filename)
  1493. {
  1494. flags = ((unsigned)IFSHread)|((S_IRUSR|S_IWUSR)<<16);
  1495. }
  1496. bool exists()
  1497. {
  1498. MemoryBuffer sendBuffer;
  1499. initSendBuffer(sendBuffer);
  1500. MemoryBuffer replyBuffer;
  1501. sendBuffer.append((RemoteFileCommandType)RFCexists).append(filename);
  1502. sendRemoteCommand(sendBuffer, replyBuffer);
  1503. bool ok;
  1504. replyBuffer.read(ok);
  1505. return ok;
  1506. }
  1507. bool getTime(CDateTime * createTime, CDateTime * modifiedTime, CDateTime * accessedTime)
  1508. {
  1509. CDateTime dummyTime;
  1510. if (!createTime)
  1511. createTime = &dummyTime;
  1512. if (!modifiedTime)
  1513. modifiedTime = &dummyTime;
  1514. if (!accessedTime)
  1515. accessedTime = &dummyTime;
  1516. MemoryBuffer sendBuffer;
  1517. initSendBuffer(sendBuffer);
  1518. MemoryBuffer replyBuffer;
  1519. sendBuffer.append((RemoteFileCommandType)RFCgettime).append(filename);
  1520. sendRemoteCommand(sendBuffer, replyBuffer);
  1521. bool ok;
  1522. replyBuffer.read(ok);
  1523. if (ok) {
  1524. createTime->deserialize(replyBuffer);
  1525. modifiedTime->deserialize(replyBuffer);
  1526. accessedTime->deserialize(replyBuffer);
  1527. }
  1528. return ok;
  1529. }
  1530. bool setTime(const CDateTime * createTime, const CDateTime * modifiedTime, const CDateTime * accessedTime)
  1531. {
  1532. MemoryBuffer sendBuffer;
  1533. initSendBuffer(sendBuffer);
  1534. MemoryBuffer replyBuffer;
  1535. sendBuffer.append((RemoteFileCommandType)RFCsettime).append(filename);
  1536. if (createTime) {
  1537. sendBuffer.append((bool)true);
  1538. createTime->serialize(sendBuffer);
  1539. }
  1540. else
  1541. sendBuffer.append((bool)false);
  1542. if (modifiedTime) {
  1543. sendBuffer.append((bool)true);
  1544. modifiedTime->serialize(sendBuffer);
  1545. }
  1546. else
  1547. sendBuffer.append((bool)false);
  1548. if (accessedTime) {
  1549. sendBuffer.append((bool)true);
  1550. accessedTime->serialize(sendBuffer);
  1551. }
  1552. else
  1553. sendBuffer.append((bool)false);
  1554. sendRemoteCommand(sendBuffer, replyBuffer);
  1555. bool ok;
  1556. replyBuffer.read(ok);
  1557. return ok;
  1558. }
  1559. fileBool isDirectory()
  1560. {
  1561. MemoryBuffer sendBuffer;
  1562. initSendBuffer(sendBuffer);
  1563. MemoryBuffer replyBuffer;
  1564. sendBuffer.append((RemoteFileCommandType)RFCisdirectory).append(filename);
  1565. sendRemoteCommand(sendBuffer, replyBuffer);
  1566. unsigned ret;
  1567. replyBuffer.read(ret);
  1568. return (fileBool)ret;
  1569. }
  1570. fileBool isFile()
  1571. {
  1572. MemoryBuffer sendBuffer;
  1573. initSendBuffer(sendBuffer);
  1574. MemoryBuffer replyBuffer;
  1575. sendBuffer.append((RemoteFileCommandType)RFCisfile).append(filename);
  1576. sendRemoteCommand(sendBuffer, replyBuffer);
  1577. unsigned ret;
  1578. replyBuffer.read(ret);
  1579. return (fileBool)ret;
  1580. }
  1581. fileBool isReadOnly()
  1582. {
  1583. MemoryBuffer sendBuffer;
  1584. initSendBuffer(sendBuffer);
  1585. MemoryBuffer replyBuffer;
  1586. sendBuffer.append((RemoteFileCommandType)RFCisreadonly).append(filename);
  1587. sendRemoteCommand(sendBuffer, replyBuffer);
  1588. unsigned ret;
  1589. replyBuffer.read(ret);
  1590. return (fileBool)ret;
  1591. }
  1592. IFileIO * open(IFOmode mode,IFEflags extraFlags=IFEnone);
  1593. IFileIO * openShared(IFOmode mode,IFSHmode shmode,IFEflags extraFlags=IFEnone);
  1594. IFileAsyncIO * openAsync(IFOmode mode) { return NULL; } // not supported
  1595. const char * queryFilename()
  1596. {
  1597. if (remotefilename.isEmpty()) {
  1598. RemoteFilename rfn;
  1599. rfn.setPath(ep,filename);
  1600. StringBuffer path;
  1601. rfn.getRemotePath(path);
  1602. remotefilename.set(path);
  1603. }
  1604. return remotefilename.get();
  1605. }
  1606. void resetLocalFilename(const char *name)
  1607. {
  1608. remotefilename.clear();
  1609. filename.set(name);
  1610. }
  1611. bool remove()
  1612. {
  1613. MemoryBuffer sendBuffer;
  1614. initSendBuffer(sendBuffer);
  1615. MemoryBuffer replyBuffer;
  1616. sendBuffer.append((RemoteFileCommandType)RFCremove).append(filename);
  1617. sendRemoteCommand(sendBuffer, replyBuffer);
  1618. bool ok;
  1619. replyBuffer.read(ok);
  1620. return ok;
  1621. }
  1622. void rename(const char *newname)
  1623. {
  1624. // currently ignores directory on newname (in future versions newname will be required to be tail only and not full path)
  1625. StringBuffer path;
  1626. splitDirTail(filename,path);
  1627. StringBuffer newdir;
  1628. path.append(splitDirTail(newname,newdir));
  1629. if (newdir.length()&&(strcmp(newdir.str(),path.str())!=0))
  1630. WARNLOG("CRemoteFile::rename passed full path '%s' that may not to match original directory '%s'",newname,path.str());
  1631. MemoryBuffer sendBuffer;
  1632. initSendBuffer(sendBuffer);
  1633. MemoryBuffer replyBuffer;
  1634. sendBuffer.append((RemoteFileCommandType)RFCrename).append(filename).append(path);
  1635. sendRemoteCommand(sendBuffer, replyBuffer);
  1636. filename.set(path);
  1637. remotefilename.clear();
  1638. }
  1639. void move(const char *newname)
  1640. {
  1641. // like rename except between directories
  1642. // first create replote path
  1643. if (!newname||!*newname)
  1644. return;
  1645. RemoteFilename destrfn;
  1646. if (isPathSepChar(newname[0])&&isPathSepChar(newname[1])) {
  1647. destrfn.setRemotePath(newname);
  1648. if (!destrfn.queryEndpoint().ipequals(ep)) {
  1649. StringBuffer msg;
  1650. msg.appendf("IFile::move %s to %s, destination node must match source node", queryFilename(), newname);
  1651. throw createDafsException(RFSERR_MoveFailed,msg.str());
  1652. }
  1653. }
  1654. else
  1655. destrfn.setPath(ep,newname);
  1656. StringBuffer dest;
  1657. newname = destrfn.getLocalPath(dest).str();
  1658. MemoryBuffer sendBuffer;
  1659. initSendBuffer(sendBuffer);
  1660. MemoryBuffer replyBuffer;
  1661. StringBuffer path;
  1662. splitDirTail(filename,path);
  1663. StringBuffer newdir;
  1664. const char *newtail = splitDirTail(newname,newdir);
  1665. if (strcmp(newdir.str(),path.str())==0) {
  1666. path.append(newtail);
  1667. newname = path;
  1668. sendBuffer.append((RemoteFileCommandType)RFCrename); // use rename if we can (supported on older dafilesrv)
  1669. }
  1670. else
  1671. sendBuffer.append((RemoteFileCommandType)RFCmove);
  1672. sendBuffer.append(filename).append(newname);
  1673. sendRemoteCommand(sendBuffer, replyBuffer);
  1674. filename.set(newname);
  1675. remotefilename.clear();
  1676. }
  1677. void setReadOnly(bool set)
  1678. {
  1679. MemoryBuffer sendBuffer;
  1680. initSendBuffer(sendBuffer);
  1681. MemoryBuffer replyBuffer;
  1682. sendBuffer.append((RemoteFileCommandType)RFCsetreadonly).append(filename).append(set);
  1683. sendRemoteCommand(sendBuffer, replyBuffer);
  1684. }
  1685. offset_t size()
  1686. {
  1687. #if 1 // faster method (consistant with IFile)
  1688. // do this by using dir call (could be improved with new function but this not *too* bad)
  1689. if (isSpecialPath(filename))
  1690. return 0; // queries deemed to always exist (though don't know size).
  1691. // if needed to get size I guess could use IFileIO method and cache (bit of pain though)
  1692. StringBuffer dir;
  1693. const char *tail = splitDirTail(filename,dir);
  1694. if (!dir.length())
  1695. return false;
  1696. MemoryBuffer sendBuffer;
  1697. initSendBuffer(sendBuffer);
  1698. MemoryBuffer replyBuffer;
  1699. bool includedirs = true;
  1700. bool sub=false;
  1701. {
  1702. //Could be removed with new dafilesrv change [ (stream != 0) ], since this is not streaming.
  1703. Owned<CEndpointCS> crit = dirCSTable->getCrit(ep); // NB dirCSTable doesn't own, last reference will remove from table
  1704. CriticalBlock block(*crit);
  1705. sendBuffer.append((RemoteFileCommandType)RFCgetdir).append(dir).append(tail).append(includedirs).append(sub);
  1706. sendRemoteCommand(sendBuffer, replyBuffer);
  1707. }
  1708. // now should be 0 or 1 files returned
  1709. Owned<CRemoteDirectoryIterator> iter = new CRemoteDirectoryIterator(ep, dir.str());
  1710. iter->appendBuf(replyBuffer);
  1711. if (!iter->first())
  1712. return (offset_t)-1;
  1713. return (offset_t) iter->getFileSize();
  1714. #else
  1715. IFileIO * io = open(IFOread);
  1716. offset_t length = (offset_t)-1;
  1717. if (io)
  1718. {
  1719. length = io->size();
  1720. io->Release();
  1721. }
  1722. return length;
  1723. #endif
  1724. }
  1725. bool createDirectory()
  1726. {
  1727. MemoryBuffer sendBuffer;
  1728. initSendBuffer(sendBuffer);
  1729. MemoryBuffer replyBuffer;
  1730. sendBuffer.append((RemoteFileCommandType)RFCcreatedir).append(filename);
  1731. sendRemoteCommand(sendBuffer, replyBuffer);
  1732. bool ok;
  1733. replyBuffer.read(ok);
  1734. return ok;
  1735. }
  1736. virtual IDirectoryIterator *directoryFiles(const char *mask,bool sub,bool includedirs)
  1737. {
  1738. if (mask&&!*mask)
  1739. return createDirectoryIterator("",""); // NULL iterator
  1740. CRemoteDirectoryIterator *ret = new CRemoteDirectoryIterator(ep, filename);
  1741. byte stream=1;
  1742. Owned<CEndpointCS> crit = dirCSTable->getCrit(ep); // NB dirCSTable doesn't own, last reference will remove from table
  1743. CriticalBlock block(*crit);
  1744. loop {
  1745. MemoryBuffer sendBuffer;
  1746. initSendBuffer(sendBuffer);
  1747. MemoryBuffer replyBuffer;
  1748. sendBuffer.append((RemoteFileCommandType)RFCgetdir).append(filename).append(mask?mask:"").append(includedirs).append(sub).append(stream);
  1749. sendRemoteCommand(sendBuffer, replyBuffer);
  1750. if (ret->appendBuf(replyBuffer))
  1751. break;
  1752. stream = 2;
  1753. }
  1754. return ret;
  1755. }
  1756. IDirectoryDifferenceIterator *monitorDirectory(
  1757. IDirectoryIterator *prev=NULL, // in (NULL means use current as baseline)
  1758. const char *mask=NULL,
  1759. bool sub=false,
  1760. bool includedirs=false,
  1761. unsigned checkinterval=60*1000,
  1762. unsigned timeout=(unsigned)-1,
  1763. Semaphore *abortsem=NULL) // returns NULL if timed out
  1764. {
  1765. // abortsem not yet supported
  1766. MemoryBuffer sendBuffer;
  1767. initSendBuffer(sendBuffer);
  1768. MemoryBuffer replyBuffer;
  1769. sendBuffer.append((RemoteFileCommandType)RFCmonitordir).append(filename).append(mask?mask:"").append(includedirs).append(sub);
  1770. sendBuffer.append(checkinterval).append(timeout);
  1771. __int64 cancelid=0; // not yet used
  1772. sendBuffer.append(cancelid);
  1773. byte isprev=(prev!=NULL)?1:0;
  1774. sendBuffer.append(isprev);
  1775. if (prev)
  1776. CRemoteDirectoryIterator::serialize(sendBuffer,prev,0,true);
  1777. sendRemoteCommand(sendBuffer, replyBuffer);
  1778. byte status;
  1779. replyBuffer.read(status);
  1780. if (status==1) {
  1781. CRemoteDirectoryIterator *iter = new CRemoteDirectoryIterator(ep, filename);
  1782. iter->appendBuf(replyBuffer);
  1783. return iter;
  1784. }
  1785. return NULL;
  1786. }
  1787. bool getInfo(bool &isdir,offset_t &size,CDateTime &modtime)
  1788. {
  1789. // do this by using dir call (could be improved with new function but this not *too* bad)
  1790. StringBuffer dir;
  1791. const char *tail = splitDirTail(filename,dir);
  1792. if (!dir.length())
  1793. return false;
  1794. MemoryBuffer sendBuffer;
  1795. initSendBuffer(sendBuffer);
  1796. MemoryBuffer replyBuffer;
  1797. bool includedirs = true;
  1798. bool sub=false;
  1799. {
  1800. //Could be removed with new dafilesrv change [ (stream != 0) ], since this is not streaming.
  1801. Owned<CEndpointCS> crit = dirCSTable->getCrit(ep); // NB dirCSTable doesn't own, last reference will remove from table
  1802. CriticalBlock block(*crit);
  1803. sendBuffer.append((RemoteFileCommandType)RFCgetdir).append(dir).append(tail).append(includedirs).append(sub);
  1804. sendRemoteCommand(sendBuffer, replyBuffer);
  1805. }
  1806. // now should be 0 or 1 files returned
  1807. Owned<CRemoteDirectoryIterator> iter = new CRemoteDirectoryIterator(ep, dir.str());
  1808. iter->appendBuf(replyBuffer);
  1809. if (!iter->first())
  1810. return false;
  1811. isdir = iter->isDir();
  1812. size = (offset_t) iter->getFileSize();
  1813. iter->getModifiedTime(modtime);
  1814. return true;
  1815. }
  1816. bool setCompression(bool set)
  1817. {
  1818. assertex(!"Need to implement compress()");
  1819. return false;
  1820. }
  1821. offset_t compressedSize()
  1822. {
  1823. assertex(!"Need to implement actualSize()");
  1824. return (offset_t)-1;
  1825. }
  1826. void serialize(MemoryBuffer &tgt)
  1827. {
  1828. throwUnexpected();
  1829. }
  1830. void deserialize(MemoryBuffer &src)
  1831. {
  1832. throwUnexpected();
  1833. }
  1834. unsigned getCRC()
  1835. {
  1836. MemoryBuffer sendBuffer;
  1837. initSendBuffer(sendBuffer);
  1838. MemoryBuffer replyBuffer;
  1839. sendBuffer.append((RemoteFileCommandType)RFCgetcrc).append(filename);
  1840. sendRemoteCommand(sendBuffer, replyBuffer, true, true);
  1841. unsigned crc;
  1842. replyBuffer.read(crc);
  1843. return crc;
  1844. }
  1845. void setCreateFlags(unsigned cflags)
  1846. {
  1847. flags |= (cflags<<16);
  1848. }
  1849. void setShareMode(IFSHmode shmode)
  1850. {
  1851. flags &= ~(IFSHfull|IFSHread);
  1852. flags |= (unsigned)(shmode&(IFSHfull|IFSHread));
  1853. }
  1854. void remoteExtractBlobElements(const char * prefix, ExtractedBlobArray & extracted)
  1855. {
  1856. MemoryBuffer sendBuffer;
  1857. initSendBuffer(sendBuffer);
  1858. sendBuffer.append((RemoteFileCommandType)RFCextractblobelements).append(prefix).append(filename);
  1859. MemoryBuffer replyBuffer;
  1860. sendRemoteCommand(sendBuffer, replyBuffer, true, true); // handles error code
  1861. unsigned n;
  1862. replyBuffer.read(n);
  1863. for (unsigned i=0;i<n;i++) {
  1864. ExtractedBlobInfo *item = new ExtractedBlobInfo;
  1865. item->deserialize(replyBuffer);
  1866. extracted.append(*item);
  1867. }
  1868. }
  1869. bool copySectionAsync(const char *uuid,const RemoteFilename &dest, offset_t toOfs, offset_t fromOfs, offset_t size, ICopyFileProgress *progress, unsigned timeout)
  1870. {
  1871. // now if we get here is it can be assumed the source file is local to where we send the command
  1872. StringBuffer tos;
  1873. dest.getRemotePath(tos);
  1874. MemoryBuffer sendBuffer;
  1875. initSendBuffer(sendBuffer);
  1876. MemoryBuffer replyBuffer;
  1877. sendBuffer.append((RemoteFileCommandType)RFCcopysection).append(uuid).append(queryLocalName()).append(tos).append(toOfs).append(fromOfs).append(size).append(timeout);
  1878. sendRemoteCommand(sendBuffer, replyBuffer);
  1879. unsigned status;
  1880. replyBuffer.read(status);
  1881. if (progress) {
  1882. offset_t sizeDone;
  1883. offset_t totalSize;
  1884. replyBuffer.read(sizeDone).read(totalSize);
  1885. progress->onProgress(sizeDone,totalSize);
  1886. }
  1887. return (AsyncCommandStatus)status!=ACScontinue; // should only otherwise be done as errors raised by exception
  1888. }
  1889. void copySection(const RemoteFilename &dest, offset_t toOfs, offset_t fromOfs, offset_t size, ICopyFileProgress *progress, CFflags copyFlags=CFnone)
  1890. {
  1891. StringBuffer uuid;
  1892. genUUID(uuid,true);
  1893. unsigned timeout = 60*1000; // check every minute
  1894. while(!copySectionAsync(uuid.str(),dest,toOfs,fromOfs,size,progress,timeout));
  1895. }
  1896. void copyTo(IFile *dest, size32_t buffersize, ICopyFileProgress *progress, bool usetmp, CFflags copyFlags=CFnone);
  1897. virtual IMemoryMappedFile *openMemoryMapped(offset_t ofs, memsize_t len, bool write)
  1898. {
  1899. return NULL;
  1900. }
  1901. void treeCopyTo(IFile *dest,IpSubNet &subnet,IpAddress &resfrom,bool usetmp,CFflags copyFlags=CFnone)
  1902. {
  1903. resfrom.ipset(NULL);
  1904. MemoryBuffer sendBuffer;
  1905. initSendBuffer(sendBuffer);
  1906. MemoryBuffer replyBuffer;
  1907. sendBuffer.append((RemoteFileCommandType)(usetmp?RFCtreecopytmp:RFCtreecopy));
  1908. RemoteFilename rfn;
  1909. rfn.setPath(ep,filename);
  1910. rfn.serialize(sendBuffer);
  1911. const char *d = dest->queryFilename();
  1912. if (!isAbsolutePath(d))
  1913. throw MakeStringException(-1,"treeCopyFile destination '%s' is not an absolute path", d);
  1914. rfn.setRemotePath(d);
  1915. rfn.serialize(sendBuffer);
  1916. StringBuffer tmp;
  1917. subnet.getNetText(tmp);
  1918. sendBuffer.append(tmp);
  1919. subnet.getMaskText(tmp.clear());
  1920. sendBuffer.append(tmp);
  1921. unsigned status=1;
  1922. try {
  1923. sendRemoteCommand(sendBuffer, replyBuffer);
  1924. replyBuffer.read(status);
  1925. }
  1926. catch (IDAFS_Exception *e) {
  1927. if (e->errorCode()!=RFSERR_InvalidCommand)
  1928. throw;
  1929. e->Release();
  1930. status = (unsigned)-1;
  1931. }
  1932. if (status==-1) {
  1933. resfrom.ipset(ep);
  1934. StringBuffer tmp;
  1935. WARNLOG("dafilesrv on %s does not support treeCopyTo - falling back to copyTo",resfrom.getIpText(tmp).str());
  1936. copyTo(dest,DEFAULT_COPY_BLKSIZE,NULL,usetmp,copyFlags);
  1937. status = 0;
  1938. }
  1939. else if (status==0)
  1940. resfrom.ipdeserialize(replyBuffer);
  1941. }
  1942. };
  1943. void clientCacheFileConnect(SocketEndpoint &_ep,unsigned timeout)
  1944. {
  1945. if (!timeout) {
  1946. SocketEndpoint ep(_ep);
  1947. setDafsEndpointPort(ep);
  1948. Owned<CRemoteFile> cfile = new CRemoteFile(ep, "null");
  1949. cfile->connect();
  1950. return; // frees file and adds its socket to cache
  1951. }
  1952. // timeout needed so start a thread (that may become orphaned)
  1953. class cThread: public Thread
  1954. {
  1955. SocketEndpoint ep;
  1956. public:
  1957. cThread(SocketEndpoint &_ep)
  1958. : Thread("cacheFileConnect")
  1959. {
  1960. ep = _ep;
  1961. }
  1962. int run()
  1963. {
  1964. try {
  1965. clientCacheFileConnect(ep,0);
  1966. }
  1967. catch (IException *e) {
  1968. CriticalBlock block(sect);
  1969. except.setown(e);
  1970. }
  1971. return 0;
  1972. }
  1973. Owned<IException> except;
  1974. CriticalSection sect;
  1975. } *thread;
  1976. thread = new cThread(_ep);
  1977. thread->start();
  1978. IException *e =NULL;
  1979. if (!thread->join(timeout)) {
  1980. StringBuffer msg("Timed out connecting to ");
  1981. _ep.getUrlStr(msg);
  1982. e = createDafsException(RFSERR_AuthenticateFailed,msg.str());
  1983. }
  1984. {
  1985. CriticalBlock block(thread->sect);
  1986. if (!e&&thread->except)
  1987. e = thread->except.getClear();
  1988. }
  1989. thread->Release();
  1990. if (e)
  1991. throw e;
  1992. }
  1993. void clientAddSocketToCache(SocketEndpoint &ep,ISocket *socket)
  1994. {
  1995. CriticalBlock block(CConnectionTable::crit);
  1996. if (ConnectionTable)
  1997. ConnectionTable->addLink(ep,socket);
  1998. }
  1999. IFile * createRemoteFile(SocketEndpoint &ep, const char * filename)
  2000. {
  2001. IFile *ret = createFileLocalMount(ep,filename);
  2002. if (ret)
  2003. return ret;
  2004. return new CRemoteFile(ep, filename);
  2005. }
  2006. void clientDisconnectRemoteFile(IFile *file)
  2007. {
  2008. CRemoteFile *cfile = QUERYINTERFACE(file,CRemoteFile);
  2009. if (cfile)
  2010. cfile->disconnect();
  2011. }
  2012. bool clientResetFilename(IFile *file, const char *newname) // returns false if not remote
  2013. {
  2014. CRemoteFile *cfile = QUERYINTERFACE(file,CRemoteFile);
  2015. if (!cfile)
  2016. return false;
  2017. cfile->resetLocalFilename(newname);
  2018. return true;
  2019. }
  2020. extern bool clientAsyncCopyFileSection(const char *uuid,
  2021. IFile *from, // expected to be remote
  2022. RemoteFilename &to,
  2023. offset_t toOfs, // -1 created file and copies to start
  2024. offset_t fromOfs,
  2025. offset_t size,
  2026. ICopyFileProgress *progress,
  2027. unsigned timeout) // returns true when done
  2028. {
  2029. CRemoteFile *cfile = QUERYINTERFACE(from,CRemoteFile);
  2030. if (!cfile) {
  2031. // local - do sync
  2032. from->copySection(to,toOfs,fromOfs,size,progress);
  2033. return true;
  2034. }
  2035. return cfile->copySectionAsync(uuid,to,toOfs,fromOfs, size, progress, timeout);
  2036. }
  2037. //---------------------------------------------------------------------------
  2038. class CRemoteFileIO : public CInterface, implements IFileIO
  2039. {
  2040. protected:
  2041. Linked<CRemoteFile> parent;
  2042. RemoteFileIOHandle handle;
  2043. IFOmode mode;
  2044. compatIFSHmode compatmode;
  2045. IFEflags extraFlags;
  2046. bool disconnectonexit;
  2047. public:
  2048. IMPLEMENT_IINTERFACE
  2049. CRemoteFileIO(CRemoteFile *_parent)
  2050. : parent(_parent)
  2051. {
  2052. handle = 0;
  2053. disconnectonexit = false;
  2054. }
  2055. ~CRemoteFileIO()
  2056. {
  2057. if (handle) {
  2058. try {
  2059. close();
  2060. }
  2061. catch (IException *e) {
  2062. StringBuffer s;
  2063. e->errorMessage(s);
  2064. WARNLOG("CRemoteFileIO close file: %s",s.str());
  2065. e->Release();
  2066. }
  2067. }
  2068. if (disconnectonexit)
  2069. parent->disconnect();
  2070. }
  2071. void close()
  2072. {
  2073. if (handle) {
  2074. try {
  2075. MemoryBuffer sendBuffer;
  2076. initSendBuffer(sendBuffer);
  2077. sendBuffer.append((RemoteFileCommandType)RFCcloseIO).append(handle);
  2078. parent->sendRemoteCommand(sendBuffer,false);
  2079. }
  2080. catch (IDAFS_Exception *e) {
  2081. if ((e->errorCode()!=RFSERR_InvalidFileIOHandle)&&(e->errorCode()!=RFSERR_NullFileIOHandle))
  2082. throw;
  2083. e->Release();
  2084. }
  2085. handle = 0;
  2086. }
  2087. }
  2088. bool open(IFOmode _mode,compatIFSHmode _compatmode,IFEflags _extraFlags=IFEnone)
  2089. {
  2090. MemoryBuffer sendBuffer;
  2091. initSendBuffer(sendBuffer);
  2092. MemoryBuffer replyBuffer;
  2093. const char *localname = parent->queryLocalName();
  2094. localname = skipSpecialPath(localname);
  2095. // also send _extraFlags
  2096. sendBuffer.append((RemoteFileCommandType)RFCopenIO).append(localname).append((byte)_mode).append((byte)_compatmode).append((byte)_extraFlags);
  2097. parent->sendRemoteCommand(sendBuffer, replyBuffer);
  2098. replyBuffer.read(handle);
  2099. if (!handle)
  2100. return false;
  2101. switch (_mode) {
  2102. case IFOcreate:
  2103. mode = IFOwrite;
  2104. break;
  2105. case IFOcreaterw:
  2106. mode = IFOreadwrite;
  2107. break;
  2108. default:
  2109. mode = _mode;
  2110. }
  2111. compatmode = _compatmode;
  2112. extraFlags = _extraFlags;
  2113. return true;
  2114. }
  2115. bool reopen()
  2116. {
  2117. StringBuffer s;
  2118. PROGLOG("Attempting reopen of %s on %s",parent->queryLocalName(),parent->queryEp().getUrlStr(s).str());
  2119. if (open(mode,compatmode,extraFlags)) {
  2120. return true;
  2121. }
  2122. return false;
  2123. }
  2124. offset_t size()
  2125. {
  2126. MemoryBuffer sendBuffer;
  2127. initSendBuffer(sendBuffer);
  2128. MemoryBuffer replyBuffer;
  2129. sendBuffer.append((RemoteFileCommandType)RFCsize).append(handle);
  2130. parent->sendRemoteCommand(sendBuffer, replyBuffer, false);
  2131. // Retry using reopen TBD
  2132. offset_t ret;
  2133. replyBuffer.read(ret);
  2134. return ret;
  2135. }
  2136. size32_t read(offset_t pos, size32_t len, void * data)
  2137. {
  2138. size32_t got;
  2139. MemoryBuffer replyBuffer;
  2140. const void *b = doRead(pos,len,replyBuffer,got,data);
  2141. if (b!=data)
  2142. memcpy(data,b,got);
  2143. return got;
  2144. }
  2145. virtual void flush()
  2146. {
  2147. }
  2148. const void *doRead(offset_t pos, size32_t len, MemoryBuffer &replyBuffer, size32_t &got, void *dstbuf)
  2149. {
  2150. unsigned tries=0;
  2151. loop {
  2152. try {
  2153. MemoryBuffer sendBuffer;
  2154. initSendBuffer(sendBuffer);
  2155. replyBuffer.clear();
  2156. sendBuffer.append((RemoteFileCommandType)RFCread).append(handle).append(pos).append(len);
  2157. parent->sendRemoteCommand(sendBuffer, replyBuffer,false);
  2158. // kludge dafilesrv versions <= 1.5e don't return error correctly
  2159. if (replyBuffer.length()>len+sizeof(size32_t)+sizeof(unsigned)) {
  2160. size32_t save = replyBuffer.getPos();
  2161. replyBuffer.reset(len+sizeof(size32_t)+sizeof(unsigned));
  2162. unsigned errCode;
  2163. replyBuffer.read(errCode);
  2164. if (errCode) {
  2165. StringBuffer msg;
  2166. parent->ep.getUrlStr(msg.append('[')).append("] ");
  2167. if (replyBuffer.getPos()<replyBuffer.length()) {
  2168. StringAttr s;
  2169. replyBuffer.read(s);
  2170. msg.append(s);
  2171. }
  2172. else
  2173. msg.append("ERROR #").append(errCode);
  2174. throw createDafsException(errCode, msg.str());
  2175. }
  2176. else
  2177. replyBuffer.reset(save);
  2178. }
  2179. replyBuffer.read(got);
  2180. if ((got>replyBuffer.remaining())||(got>len)) {
  2181. PROGLOG("Read beyond buffer %d,%d,%d",got,replyBuffer.remaining(),len);
  2182. throw createDafsException(RFSERR_ReadFailed, "Read beyond buffer");
  2183. }
  2184. return replyBuffer.readDirect(got);
  2185. }
  2186. catch (IJSOCK_Exception *e) {
  2187. EXCLOG(e,"CRemoteFileIO::read");
  2188. if (++tries>3)
  2189. throw;
  2190. WARNLOG("Retrying read of %s (%d)",parent->queryLocalName(),tries);
  2191. Owned<IException> exc = e;
  2192. if (!reopen())
  2193. throw exc.getClear();
  2194. }
  2195. }
  2196. got = 0;
  2197. return NULL;
  2198. }
  2199. size32_t write(offset_t pos, size32_t len, const void * data)
  2200. {
  2201. unsigned tries=0;
  2202. size32_t ret = 0;
  2203. loop {
  2204. try {
  2205. MemoryBuffer replyBuffer;
  2206. MemoryBuffer sendBuffer;
  2207. initSendBuffer(sendBuffer);
  2208. sendBuffer.append((RemoteFileCommandType)RFCwrite).append(handle).append(pos).append(len).append(len, data);
  2209. parent->sendRemoteCommand(sendBuffer, replyBuffer, false, true);
  2210. replyBuffer.read(ret);
  2211. break;
  2212. }
  2213. catch (IJSOCK_Exception *e) {
  2214. EXCLOG(e,"CRemoteFileIO::write");
  2215. if (++tries>3)
  2216. throw;
  2217. WARNLOG("Retrying write(%" I64F "d,%d) of %s (%d)",pos,len,parent->queryLocalName(),tries);
  2218. Owned<IException> exc = e;
  2219. if (!reopen())
  2220. throw exc.getClear();
  2221. }
  2222. }
  2223. if ((ret==(size32_t)-1) || (ret < len))
  2224. throw createDafsException(DISK_FULL_EXCEPTION_CODE,"write failed, disk full?");
  2225. return ret;
  2226. }
  2227. offset_t appendFile(IFile *file,offset_t pos,offset_t len)
  2228. {
  2229. MemoryBuffer sendBuffer;
  2230. initSendBuffer(sendBuffer);
  2231. MemoryBuffer replyBuffer;
  2232. const char * fname = file->queryFilename();
  2233. sendBuffer.append((RemoteFileCommandType)RFCappend).append(handle).append(fname).append(pos).append(len);
  2234. parent->sendRemoteCommand(sendBuffer, replyBuffer, false, true); // retry not safe
  2235. offset_t ret;
  2236. replyBuffer.read(ret);
  2237. if ((ret==(offset_t)-1) || (ret < len))
  2238. throw createDafsException(DISK_FULL_EXCEPTION_CODE,"append failed, disk full?"); // though could be file missing TBD
  2239. return ret;
  2240. }
  2241. void setSize(offset_t size)
  2242. {
  2243. MemoryBuffer sendBuffer;
  2244. initSendBuffer(sendBuffer);
  2245. MemoryBuffer replyBuffer;
  2246. sendBuffer.append((RemoteFileCommandType)RFCsetsize).append(handle).append(size);
  2247. parent->sendRemoteCommand(sendBuffer, replyBuffer, false, true);
  2248. // retry using reopen TBD
  2249. }
  2250. void setDisconnectOnExit(bool set) { disconnectonexit = set; }
  2251. };
  2252. void clientDisconnectRemoteIoOnExit(IFileIO *fileio,bool set)
  2253. {
  2254. CRemoteFileIO *cfileio = QUERYINTERFACE(fileio,CRemoteFileIO);
  2255. if (cfileio)
  2256. cfileio->setDisconnectOnExit(set);
  2257. }
  2258. IFileIO * CRemoteFile::openShared(IFOmode mode,IFSHmode shmode,IFEflags extraFlags)
  2259. {
  2260. assertex(((unsigned)shmode&0xffffffc7)==0);
  2261. compatIFSHmode compatmode;
  2262. unsigned fileflags = (flags>>16) & (S_IRUSR|S_IWUSR|S_IXUSR|S_IRGRP|S_IWGRP|S_IXGRP|S_IROTH|S_IWOTH|S_IXOTH);
  2263. if (fileflags&S_IXUSR) // this is bit hit and miss but backward compatible
  2264. compatmode = compatIFSHexec;
  2265. else if (fileflags&(S_IWGRP|S_IWOTH))
  2266. compatmode = compatIFSHall;
  2267. else if (shmode&IFSHfull)
  2268. compatmode = compatIFSHwrite;
  2269. else if (((shmode&(IFSHread|IFSHfull))==0) && ((fileflags&(S_IRGRP|S_IROTH))==0))
  2270. compatmode = compatIFSHnone;
  2271. else
  2272. compatmode = compatIFSHread;
  2273. Owned<CRemoteFileIO> res = new CRemoteFileIO(this);
  2274. if (res->open(mode,compatmode,extraFlags))
  2275. return res.getClear();
  2276. return NULL;
  2277. }
  2278. IFileIO * CRemoteFile::open(IFOmode mode,IFEflags extraFlags)
  2279. {
  2280. return openShared(mode,(IFSHmode)(flags&(IFSHread|IFSHfull)),extraFlags);
  2281. }
  2282. //---------------------------------------------------------------------------
  2283. void CRemoteFile::copyTo(IFile *dest, size32_t buffersize, ICopyFileProgress *progress, bool usetmp, CFflags copyFlags)
  2284. {
  2285. CRemoteFile *dstfile = QUERYINTERFACE(dest,CRemoteFile);
  2286. if (dstfile&&!dstfile->queryEp().isLocal()) {
  2287. StringBuffer tmpname;
  2288. Owned<IFile> destf;
  2289. RemoteFilename dest;
  2290. if (usetmp) {
  2291. makeTempCopyName(tmpname,dstfile->queryLocalName());
  2292. dest.setPath(dstfile->queryEp(),tmpname.str());
  2293. }
  2294. else
  2295. dest.setPath(dstfile->queryEp(),dstfile->queryLocalName());
  2296. destf.setown(createIFile(dest));
  2297. try {
  2298. // following may fail if new dafilesrv not deployed on src
  2299. copySection(dest,(offset_t)-1,0,(offset_t)-1,progress,copyFlags);
  2300. if (usetmp) {
  2301. StringAttr tail(pathTail(dstfile->queryLocalName()));
  2302. dstfile->remove();
  2303. destf->rename(tail);
  2304. }
  2305. return;
  2306. }
  2307. catch (IException *e)
  2308. {
  2309. StringBuffer s;
  2310. s.appendf("Remote File Copy (%d): ",e->errorCode());
  2311. e->errorMessage(s);
  2312. s.append(", retrying local");
  2313. WARNLOG("%s",s.str());
  2314. e->Release();
  2315. }
  2316. // delete dest
  2317. try {
  2318. destf->remove();
  2319. }
  2320. catch (IException *e)
  2321. {
  2322. EXCLOG(e,"Remote File Copy, Deleting temporary file");
  2323. e->Release();
  2324. }
  2325. }
  2326. // assumption if we get here that source remote, dest local (or equiv)
  2327. class cIntercept: implements ICopyFileIntercept
  2328. {
  2329. MemoryAttr ma;
  2330. MemoryBuffer mb;
  2331. virtual offset_t copy(IFileIO *from, IFileIO *to, offset_t ofs, size32_t sz)
  2332. {
  2333. if (ma.length()<sz)
  2334. ma.allocate(sz); // may be not used
  2335. void *buf = ma.bufferBase();
  2336. size32_t got;
  2337. CRemoteFileIO *srcio = QUERYINTERFACE(from,CRemoteFileIO);
  2338. const void *dst;
  2339. if (srcio)
  2340. dst = srcio->doRead(ofs,sz,mb.clear(),got,buf);
  2341. else {
  2342. // shouldn't ever get here if source remote
  2343. got = from->read(ofs, sz, buf);
  2344. dst = buf;
  2345. }
  2346. if (got != 0)
  2347. to->write(ofs, got, dst);
  2348. return got;
  2349. }
  2350. } intercept;
  2351. doCopyFile(dest,this,buffersize,progress,&intercept,usetmp,copyFlags);
  2352. }
  2353. unsigned getRemoteVersion(ISocket * socket, StringBuffer &ver)
  2354. {
  2355. static CriticalSection sect;
  2356. CriticalBlock block(sect);
  2357. if (!socket)
  2358. return 0;
  2359. unsigned ret;
  2360. MemoryBuffer sendbuf;
  2361. initSendBuffer(sendbuf);
  2362. sendbuf.append((RemoteFileCommandType)RFCgetver);
  2363. sendbuf.append((unsigned)RFCgetver);
  2364. MemoryBuffer reply;
  2365. try {
  2366. sendBuffer(socket, sendbuf);
  2367. receiveBuffer(socket, reply, 1 ,4096);
  2368. unsigned errCode;
  2369. reply.read(errCode);
  2370. if (errCode==RFSERR_InvalidCommand) {
  2371. ver.append("DS V1.0");
  2372. return 10;
  2373. }
  2374. else if (errCode==0)
  2375. ret = 11;
  2376. else if (errCode<0x10000)
  2377. return 0;
  2378. else
  2379. ret = errCode-0x10000;
  2380. }
  2381. catch (IException *e) {
  2382. EXCLOG(e);
  2383. ::Release(e);
  2384. return 0;
  2385. }
  2386. StringAttr vers;
  2387. reply.read(vers);
  2388. ver.append(vers);
  2389. return ret;
  2390. }
  2391. extern unsigned stopRemoteServer(ISocket * socket)
  2392. {
  2393. static CriticalSection sect;
  2394. CriticalBlock block(sect);
  2395. if (!socket)
  2396. return 0;
  2397. MemoryBuffer sendbuf;
  2398. initSendBuffer(sendbuf);
  2399. sendbuf.append((RemoteFileCommandType)RFCstop);
  2400. sendbuf.append((unsigned)RFCstop);
  2401. MemoryBuffer replybuf;
  2402. unsigned errCode = RFSERR_InvalidCommand;
  2403. try {
  2404. sendBuffer(socket, sendbuf);
  2405. receiveBuffer(socket, replybuf, NORMAL_RETRIES, 1024);
  2406. replybuf.read(errCode);
  2407. }
  2408. catch (IJSOCK_Exception *e) {
  2409. if ((e->errorCode()!=JSOCKERR_broken_pipe)&&(e->errorCode()!=JSOCKERR_graceful_close))
  2410. EXCLOG(e);
  2411. else
  2412. errCode = 0;
  2413. }
  2414. catch (IException *e) {
  2415. EXCLOG(e);
  2416. ::Release(e);
  2417. }
  2418. return errCode;
  2419. }
  2420. int remoteExec(ISocket * socket, const char *cmdline, const char *workdir,bool sync,
  2421. size32_t insize, void *inbuf, MemoryBuffer *outbuf)
  2422. {
  2423. if (!socket)
  2424. return -1;
  2425. bool hasoutput = (outbuf!=NULL);
  2426. if (!inbuf)
  2427. insize = 0;
  2428. MemoryBuffer sendbuf;
  2429. initSendBuffer(sendbuf);
  2430. sendbuf.append((RemoteFileCommandType)RFCexec).append(cmdline).append(workdir).append(sync).
  2431. append(hasoutput).append(insize);
  2432. if (insize)
  2433. sendbuf.append(insize, inbuf);
  2434. MemoryBuffer replybuf;
  2435. try {
  2436. sendBuffer(socket, sendbuf);
  2437. receiveBuffer(socket, replybuf, LENGTHY_RETRIES); // we don't know how long program will take really - assume <1hr
  2438. int retcode;
  2439. unsigned phandle;
  2440. size32_t outsz;
  2441. replybuf.read(retcode).read(phandle).read(outsz);
  2442. if (outsz&&outbuf)
  2443. replybuf.read(outsz,outbuf->reserve(outsz));
  2444. return retcode;
  2445. }
  2446. catch (IException *e) {
  2447. EXCLOG(e);
  2448. ::Release(e);
  2449. }
  2450. return -1;
  2451. }
  2452. int setDafsTrace(ISocket * socket,byte flags)
  2453. {
  2454. if (!socket) {
  2455. byte ret = traceFlags;
  2456. traceFlags = flags;
  2457. return ret;
  2458. }
  2459. MemoryBuffer sendbuf;
  2460. initSendBuffer(sendbuf);
  2461. sendbuf.append((RemoteFileCommandType)RFCsettrace).append(flags);
  2462. MemoryBuffer replybuf;
  2463. try {
  2464. sendBuffer(socket, sendbuf);
  2465. receiveBuffer(socket, replybuf, NORMAL_RETRIES, 1024);
  2466. int retcode;
  2467. replybuf.read(retcode);
  2468. return retcode;
  2469. }
  2470. catch (IException *e) {
  2471. EXCLOG(e);
  2472. ::Release(e);
  2473. }
  2474. return -1;
  2475. }
  2476. int getDafsInfo(ISocket * socket,StringBuffer &retstr)
  2477. {
  2478. if (!socket) {
  2479. retstr.append(VERSTRING);
  2480. return 0;
  2481. }
  2482. MemoryBuffer sendbuf;
  2483. initSendBuffer(sendbuf);
  2484. sendbuf.append((RemoteFileCommandType)RFCgetinfo);
  2485. MemoryBuffer replybuf;
  2486. try {
  2487. sendBuffer(socket, sendbuf);
  2488. receiveBuffer(socket, replybuf, 1);
  2489. int retcode;
  2490. replybuf.read(retcode);
  2491. if (retcode==0) {
  2492. StringAttr s;
  2493. replybuf.read(s);
  2494. retstr.append(s);
  2495. }
  2496. return retcode;
  2497. }
  2498. catch (IException *e) {
  2499. EXCLOG(e);
  2500. ::Release(e);
  2501. }
  2502. return -1;
  2503. }
  2504. void remoteExtractBlobElements(const SocketEndpoint &ep,const char * prefix, const char * filename, ExtractedBlobArray & extracted)
  2505. {
  2506. Owned<CRemoteFile> file = new CRemoteFile (ep,filename);
  2507. file->remoteExtractBlobElements(prefix, extracted);
  2508. }
  2509. //====================================================================================================
  2510. class CAsyncCommandManager
  2511. {
  2512. class CAsyncJob: public CInterface
  2513. {
  2514. class cThread: public Thread
  2515. {
  2516. CAsyncJob *parent;
  2517. public:
  2518. cThread(CAsyncJob *_parent)
  2519. : Thread("CAsyncJob")
  2520. {
  2521. parent = _parent;
  2522. }
  2523. int run()
  2524. {
  2525. int ret = -1;
  2526. try {
  2527. ret = parent->run();
  2528. parent->setDone();
  2529. }
  2530. catch (IException *e)
  2531. {
  2532. parent->setException(e);
  2533. }
  2534. parent->threadsem.signal();
  2535. return ret;
  2536. }
  2537. } *thread;
  2538. StringAttr uuid;
  2539. public:
  2540. Semaphore &threadsem;
  2541. CAsyncJob(const char *_uuid,Semaphore &_threadsem)
  2542. : uuid(_uuid),threadsem(_threadsem)
  2543. {
  2544. thread = new cThread(this);
  2545. hash = hashc((const byte *)uuid.get(),uuid.length(),~0U);
  2546. }
  2547. ~CAsyncJob()
  2548. {
  2549. thread->join();
  2550. thread->Release();
  2551. }
  2552. static void destroy(CAsyncJob *j)
  2553. {
  2554. j->Release();
  2555. }
  2556. void start()
  2557. {
  2558. threadsem.wait();
  2559. thread->start();
  2560. }
  2561. void join()
  2562. {
  2563. thread->join();
  2564. }
  2565. static unsigned getHash(const char *key)
  2566. {
  2567. return hashc((const byte *)key,strlen(key),~0U);
  2568. }
  2569. static CAsyncJob* create(const char *key) { assertex(!"CAsyncJob::create not implemented"); return NULL; }
  2570. unsigned hash;
  2571. bool eq(const char *key)
  2572. {
  2573. return stricmp(key,uuid.get())==0;
  2574. }
  2575. virtual int run()=0;
  2576. virtual void setException(IException *e)=0;
  2577. virtual void setDone()=0;
  2578. };
  2579. class CAsyncCopySection: public CAsyncJob
  2580. {
  2581. Owned<IFile> src;
  2582. RemoteFilename dst;
  2583. offset_t toOfs;
  2584. offset_t fromOfs;
  2585. offset_t size;
  2586. CFPmode mode; // not yet supported
  2587. CriticalSection sect;
  2588. offset_t done;
  2589. offset_t total;
  2590. Semaphore finished;
  2591. AsyncCommandStatus status;
  2592. Owned<IException> exc;
  2593. public:
  2594. CAsyncCopySection(const char *_uuid, const char *fromFile, const char *toFile, offset_t _toOfs, offset_t _fromOfs, offset_t _size, Semaphore &threadsem)
  2595. : CAsyncJob(_uuid,threadsem)
  2596. {
  2597. status = ACScontinue;
  2598. src.setown(createIFile(fromFile));
  2599. dst.setRemotePath(toFile);
  2600. toOfs = _toOfs;
  2601. fromOfs = _fromOfs;
  2602. size = _size;
  2603. mode = CFPcontinue;
  2604. done = 0;
  2605. total = (offset_t)-1;
  2606. }
  2607. AsyncCommandStatus poll(offset_t &_done, offset_t &_total,unsigned timeout)
  2608. {
  2609. if (timeout&&finished.wait(timeout))
  2610. finished.signal(); // may need to call again
  2611. CriticalBlock block(sect);
  2612. if (exc)
  2613. throw exc.getClear();
  2614. _done = done;
  2615. _total = total;
  2616. return status;
  2617. }
  2618. int run()
  2619. {
  2620. class cProgress: implements ICopyFileProgress
  2621. {
  2622. CriticalSection &sect;
  2623. CFPmode &mode;
  2624. offset_t &done;
  2625. offset_t &total;
  2626. public:
  2627. cProgress(CriticalSection &_sect,offset_t &_done,offset_t &_total,CFPmode &_mode)
  2628. : sect(_sect), done(_done), total(_total), mode(_mode)
  2629. {
  2630. }
  2631. CFPmode onProgress(offset_t sizeDone, offset_t totalSize)
  2632. {
  2633. CriticalBlock block(sect);
  2634. done = sizeDone;
  2635. total = totalSize;
  2636. return mode;
  2637. }
  2638. } progress(sect,total,done,mode);
  2639. src->copySection(dst,toOfs, fromOfs, size, &progress); // exceptions will be handled by base class
  2640. return 0;
  2641. }
  2642. void setException(IException *e)
  2643. {
  2644. EXCLOG(e,"CAsyncCommandManager::CAsyncJob");
  2645. CriticalBlock block(sect);
  2646. if (exc.get())
  2647. e->Release();
  2648. else
  2649. exc.setown(e);
  2650. status = ACSerror;
  2651. }
  2652. void setDone()
  2653. {
  2654. CriticalBlock block(sect);
  2655. finished.signal();
  2656. status = ACSdone;
  2657. }
  2658. };
  2659. CMinHashTable<CAsyncJob> jobtable;
  2660. CriticalSection sect;
  2661. Semaphore threadsem;
  2662. public:
  2663. CAsyncCommandManager()
  2664. {
  2665. threadsem.signal(10); // max number of async jobs
  2666. }
  2667. void join()
  2668. {
  2669. CriticalBlock block(sect);
  2670. unsigned i;
  2671. CAsyncJob *j=jobtable.first(i);
  2672. while (j) {
  2673. j->join();
  2674. j=jobtable.next(i);
  2675. }
  2676. }
  2677. AsyncCommandStatus copySection(const char *uuid, const char *fromFile, const char *toFile, offset_t toOfs, offset_t fromOfs, offset_t size, offset_t &done, offset_t &total, unsigned timeout)
  2678. {
  2679. // return 0 if continuing, 1 if done
  2680. CAsyncCopySection * job;
  2681. Linked<CAsyncJob> cjob;
  2682. {
  2683. CriticalBlock block(sect);
  2684. cjob.set(jobtable.find(uuid,false));
  2685. if (cjob) {
  2686. job = QUERYINTERFACE(cjob.get(),CAsyncCopySection);
  2687. if (!job) {
  2688. throw MakeStringException(-1,"Async job ID mismatch");
  2689. }
  2690. }
  2691. else {
  2692. job = new CAsyncCopySection(uuid, fromFile, toFile, toOfs, fromOfs, size, threadsem);
  2693. cjob.setown(job);
  2694. jobtable.add(cjob.getLink());
  2695. cjob->start();
  2696. }
  2697. }
  2698. AsyncCommandStatus ret = ACSerror;
  2699. Owned<IException> rete;
  2700. try {
  2701. ret = job->poll(done,total,timeout);
  2702. }
  2703. catch (IException * e) {
  2704. rete.setown(e);
  2705. }
  2706. if ((ret!=ACScontinue)||rete.get()) {
  2707. job->join();
  2708. CriticalBlock block(sect);
  2709. jobtable.remove(job);
  2710. if (rete.get())
  2711. throw rete.getClear();
  2712. }
  2713. return ret;
  2714. }
  2715. };
  2716. //====================================================================================================
  2717. #define throwErr3(e,v,s) { StringBuffer msg; \
  2718. msg.appendf("ERROR: %s(%d) '%s'",#e,v,s?s:""); \
  2719. reply.append(e); reply.append(msg.str()); }
  2720. #define throwErr(e) { reply.append(e).append(#e); }
  2721. #define throwErr2(e,v) { StringBuffer tmp; tmp.append(#e).append(':').append(v); reply.append(e).append(tmp.str()); }
  2722. #define MAPCOMMAND(c,p) case c: { ret = this->p(msg, reply) ; break; }
  2723. #define MAPCOMMANDCLIENT(c,p,client) case c: { ret = this->p(msg, reply, client); break; }
  2724. #define MAPCOMMANDCLIENTTHROTTLER(c,p,client,throttler) case c: { ret = this->p(msg, reply, client, throttler); break; }
  2725. static unsigned ClientCount = 0;
  2726. static unsigned MaxClientCount = 0;
  2727. static CriticalSection ClientCountSect;
  2728. class CRemoteFileServer : public CInterface, implements IRemoteFileServer, implements IThreadFactory
  2729. {
  2730. int lasthandle;
  2731. CriticalSection sect;
  2732. Owned<ISocket> acceptsock;
  2733. Owned<ISocket> rejectsock;//used to immediately reject nonsecure connection requests when in secure mode
  2734. Owned<ISocketSelectHandler> selecthandler;
  2735. Owned<IThreadPool> threads; // for commands
  2736. bool stopping;
  2737. unsigned clientcounttick;
  2738. unsigned closedclients;
  2739. CAsyncCommandManager asyncCommandManager;
  2740. Semaphore throttlesem;
  2741. atomic_t globallasttick;
  2742. int getNextHandle()
  2743. {
  2744. // called in sect critical block
  2745. loop {
  2746. if (lasthandle==INT_MAX)
  2747. lasthandle = 1;
  2748. else
  2749. lasthandle++;
  2750. unsigned idx1;
  2751. unsigned idx2;
  2752. if (!findHandle(lasthandle,idx1,idx2))
  2753. return lasthandle;
  2754. }
  2755. }
  2756. bool findHandle(int handle,unsigned &clientidx,unsigned &handleidx)
  2757. {
  2758. // called in sect critical block
  2759. clientidx = (unsigned)-1;
  2760. handleidx = (unsigned)-1;
  2761. ForEachItemIn(i,clients) {
  2762. CRemoteClientHandler &client = clients.item(i);
  2763. ForEachItemIn(j,client.handles) {
  2764. if (client.handles.item(j)==handle) {
  2765. handleidx = j;
  2766. clientidx = i;
  2767. return true;
  2768. }
  2769. }
  2770. }
  2771. return false;
  2772. }
  2773. struct CRemoteClientHandler: public CInterface, implements ISocketSelectNotify
  2774. {
  2775. CRemoteFileServer *parent;
  2776. Owned<ISocket> socket;
  2777. Owned<IAuthenticatedUser> user;
  2778. MemoryBuffer buf;
  2779. bool selecthandled;
  2780. size32_t left;
  2781. IArrayOf<IFileIO> openfiles; // kept in sync with handles
  2782. Owned<IDirectoryIterator> opendir;
  2783. StringAttrArray opennames; // for debug
  2784. IntArray handles;
  2785. unsigned lasttick, lastInactiveTick;
  2786. atomic_t &globallasttick;
  2787. unsigned previdx; // for debug
  2788. IMPLEMENT_IINTERFACE;
  2789. CRemoteClientHandler(CRemoteFileServer *_parent,ISocket *_socket,IAuthenticatedUser *_user,atomic_t &_globallasttick)
  2790. : socket(_socket), user(_user), globallasttick(_globallasttick)
  2791. {
  2792. previdx = (unsigned)-1;
  2793. CriticalBlock block(ClientCountSect);
  2794. if (++ClientCount>MaxClientCount)
  2795. MaxClientCount = ClientCount;
  2796. if (TF_TRACE_CLIENT_CONN) {
  2797. StringBuffer s;
  2798. s.appendf("Connecting(%x) [%d,%d] to ",(unsigned)(long)this,ClientCount,MaxClientCount);
  2799. peerName(s);
  2800. PROGLOG("%s",s.str());
  2801. }
  2802. parent = _parent;
  2803. left = 0;
  2804. buf.setEndian(__BIG_ENDIAN);
  2805. selecthandled = false;
  2806. touch();
  2807. }
  2808. ~CRemoteClientHandler()
  2809. {
  2810. {
  2811. CriticalBlock block(ClientCountSect);
  2812. ClientCount--;
  2813. if (TF_TRACE_CLIENT_CONN) {
  2814. PROGLOG("Disconnecting(%x) [%d,%d] ",(unsigned)(long)this,ClientCount,MaxClientCount);
  2815. }
  2816. }
  2817. ISocket *sock = socket.getClear();
  2818. try {
  2819. sock->Release();
  2820. }
  2821. catch (IException *e) {
  2822. EXCLOG(e,"~CRemoteClientHandler");
  2823. e->Release();
  2824. }
  2825. }
  2826. bool notifySelected(ISocket *sock,unsigned selected)
  2827. {
  2828. if (TF_TRACE_FULL)
  2829. PROGLOG("notifySelected(%x)",(unsigned)(long)this);
  2830. if (sock!=socket)
  2831. WARNLOG("notifySelected - invalid socket passed");
  2832. size32_t avail = (size32_t)socket->avail_read();
  2833. if (avail)
  2834. touch();
  2835. if (left==0) {
  2836. try {
  2837. left = avail?receiveBufferSize(socket):0;
  2838. }
  2839. catch (IException *e) {
  2840. EXCLOG(e,"notifySelected(1)");
  2841. e->Release();
  2842. left = 0;
  2843. }
  2844. if (left) {
  2845. avail = (size32_t)socket->avail_read();
  2846. try {
  2847. buf.ensureCapacity(left);
  2848. }
  2849. catch (IException *e) {
  2850. EXCLOG(e,"notifySelected(2)");
  2851. e->Release();
  2852. left = 0;
  2853. // if too big then corrupted packet so read avail to try and consume
  2854. char fbuf[1024];
  2855. while (avail) {
  2856. size32_t rd = avail>sizeof(fbuf)?sizeof(fbuf):avail;
  2857. try {
  2858. socket->read(fbuf, rd); // don't need timeout here
  2859. avail -= rd;
  2860. }
  2861. catch (IException *e) {
  2862. EXCLOG(e,"notifySelected(2) flush");
  2863. e->Release();
  2864. break;
  2865. }
  2866. }
  2867. avail = 0;
  2868. left = 0;
  2869. }
  2870. }
  2871. }
  2872. size32_t toread = left>avail?avail:left;
  2873. if (toread) {
  2874. try {
  2875. socket->read(buf.reserve(toread), toread); // don't need timeout here
  2876. }
  2877. catch (IException *e) {
  2878. EXCLOG(e,"notifySelected(3)");
  2879. e->Release();
  2880. toread = left;
  2881. buf.clear();
  2882. }
  2883. }
  2884. if (TF_TRACE_FULL)
  2885. PROGLOG("notifySelected %d,%d",toread,left);
  2886. if ((left!=0)&&(avail==0)) {
  2887. WARNLOG("notifySelected: Closing mid packet, %d remaining", left);
  2888. toread = left;
  2889. buf.clear();
  2890. }
  2891. left -= toread;
  2892. if (left==0) {
  2893. // DEBUG
  2894. parent->notify(this);
  2895. }
  2896. return false;
  2897. }
  2898. void logPrevHandle()
  2899. {
  2900. if (previdx<opennames.ordinality())
  2901. PROGLOG("Previous handle(%d): %s",handles.item(previdx),opennames.item(previdx).text.get());
  2902. }
  2903. void processCommand()
  2904. {
  2905. CThrottler throttler(parent->throttleSem());
  2906. MemoryBuffer reply;
  2907. RemoteFileCommandType cmd;
  2908. buf.read(cmd);
  2909. parent->dispatchCommand(cmd, buf, initSendBuffer(reply), this, &throttler);
  2910. buf.clear();
  2911. sendBuffer(socket, reply);
  2912. }
  2913. bool immediateCommand() // returns false if socket closed or failure
  2914. {
  2915. try {
  2916. buf.clear();
  2917. touch();
  2918. size32_t avail = (size32_t)socket->avail_read();
  2919. if (avail==0)
  2920. return false;
  2921. receiveBuffer(socket,buf, 5); // shouldn't timeout as data is available
  2922. touch();
  2923. if (buf.length()==0)
  2924. return false;
  2925. processCommand();
  2926. }
  2927. catch (IException *e) {
  2928. EXCLOG(e,"CRemoteClientHandler::immediateCommand");
  2929. e->Release();
  2930. buf.clear();
  2931. return false;
  2932. }
  2933. return true;
  2934. }
  2935. void process()
  2936. {
  2937. if (selecthandled)
  2938. processCommand(); // buffer already filled
  2939. else {
  2940. while (parent->threadRunningCount()<=TARGET_ACTIVE_THREADS) { // if too many threads add to select handler
  2941. int w = socket->wait_read(1000);
  2942. if (w==0)
  2943. break;
  2944. if ((w<0)||!immediateCommand()) {
  2945. if (w<0)
  2946. WARNLOG("CRemoteClientHandler::main wait_read error");
  2947. parent->onCloseSocket(this,1);
  2948. return;
  2949. }
  2950. }
  2951. selecthandled = true;
  2952. parent->addClient(this); // add to select handler
  2953. }
  2954. }
  2955. bool timedOut()
  2956. {
  2957. return (msTick()-lasttick)>CLIENT_TIMEOUT;
  2958. }
  2959. bool inactiveTimedOut()
  2960. {
  2961. unsigned ms = msTick();
  2962. if ((ms-lastInactiveTick)>CLIENT_INACTIVEWARNING_TIMEOUT)
  2963. {
  2964. lastInactiveTick = ms;
  2965. return true;
  2966. }
  2967. return false;
  2968. }
  2969. void touch()
  2970. {
  2971. lastInactiveTick = lasttick = msTick();
  2972. atomic_set(&globallasttick,lasttick);
  2973. }
  2974. bool peerName(StringBuffer &buf)
  2975. {
  2976. if (socket) {
  2977. char name[256];
  2978. name[0] = 0;
  2979. int port = socket->peer_name(name,sizeof(name)-1);
  2980. if (port>=0) {
  2981. buf.append(name);
  2982. if (port)
  2983. buf.append(':').append(port);
  2984. return true;
  2985. }
  2986. }
  2987. return false;
  2988. }
  2989. bool getInfo(StringBuffer &str)
  2990. {
  2991. str.append("client(");
  2992. bool ok = peerName(str);
  2993. unsigned ms = msTick();
  2994. str.appendf("): last touch %d ms ago (%d, %d)",ms-lasttick,lasttick,ms);
  2995. ForEachItemIn(i,handles) {
  2996. str.appendf("\n %d: ",handles.item(i));
  2997. str.append(opennames.item(i).text);
  2998. }
  2999. return ok;
  3000. }
  3001. };
  3002. class cCommandProcessor: public CInterface, implements IPooledThread
  3003. {
  3004. Owned<CRemoteClientHandler> client;
  3005. public:
  3006. IMPLEMENT_IINTERFACE;
  3007. struct cCommandProcessorParams
  3008. {
  3009. CRemoteClientHandler *client;
  3010. };
  3011. void init(void *_params)
  3012. {
  3013. cCommandProcessorParams &params = *(cCommandProcessorParams *)_params;
  3014. client.setown(params.client);
  3015. }
  3016. void main()
  3017. {
  3018. // idea is that initially we process commands inline then pass over to select handler
  3019. try {
  3020. client->process();
  3021. }
  3022. catch (IException *e) {
  3023. // suppress some errors
  3024. EXCLOG(e,"cCommandProcessor::main");
  3025. e->Release();
  3026. }
  3027. try {
  3028. client.clear();
  3029. }
  3030. catch (IException *e) {
  3031. // suppress some more errors clearing client
  3032. EXCLOG(e,"cCommandProcessor::main(2)");
  3033. }
  3034. }
  3035. bool stop()
  3036. {
  3037. return true;
  3038. }
  3039. bool canReuse()
  3040. {
  3041. return false; // want to free owned osocke
  3042. }
  3043. };
  3044. IArrayOf<CRemoteClientHandler> clients;
  3045. class cImpersonateBlock
  3046. {
  3047. CRemoteClientHandler &client;
  3048. public:
  3049. cImpersonateBlock(CRemoteClientHandler &_client)
  3050. : client(_client)
  3051. {
  3052. if (client.user.get()) {
  3053. if (TF_TRACE)
  3054. PROGLOG("Impersonate user: %s",client.user->username());
  3055. client.user->impersonate();
  3056. }
  3057. }
  3058. ~cImpersonateBlock()
  3059. {
  3060. if (client.user.get()) {
  3061. if (TF_TRACE)
  3062. PROGLOG("Stop impersonating user: %s",client.user->username());
  3063. client.user->revert();
  3064. }
  3065. }
  3066. };
  3067. #define IMPERSONATE_USER(client) cImpersonateBlock ublock(client)
  3068. public:
  3069. IMPLEMENT_IINTERFACE
  3070. CRemoteFileServer()
  3071. {
  3072. throttlesem.signal(10);
  3073. lasthandle = 0;
  3074. selecthandler.setown(createSocketSelectHandler(NULL));
  3075. threads.setown(createThreadPool("CRemoteFileServerPool",this,NULL,MAX_THREADS,60*1000,
  3076. #ifdef __64BIT__
  3077. 0, // Unlimited stack size
  3078. #else
  3079. 0x10000,
  3080. #endif
  3081. INFINITE,TARGET_MIN_THREADS));
  3082. stopping = false;
  3083. clientcounttick = msTick();
  3084. closedclients = 0;
  3085. atomic_set(&globallasttick,msTick());
  3086. }
  3087. ~CRemoteFileServer()
  3088. {
  3089. #ifdef _DEBUG
  3090. PROGLOG("Exiting CRemoteFileServer");
  3091. #endif
  3092. asyncCommandManager.join();
  3093. clients.kill();
  3094. #ifdef _DEBUG
  3095. PROGLOG("Exited CRemoteFileServer");
  3096. #endif
  3097. }
  3098. //MORE: The file handles should timeout after a while, and accessing an old (invalid handle)
  3099. // should throw a different exception
  3100. bool checkFileIOHandle(MemoryBuffer &reply, int handle, IFileIO *&fileio, bool del=false)
  3101. {
  3102. CriticalBlock block(sect);
  3103. fileio = NULL;
  3104. if (handle<=0) {
  3105. throwErr(RFSERR_NullFileIOHandle);
  3106. return false;
  3107. }
  3108. unsigned clientidx;
  3109. unsigned handleidx;
  3110. if (findHandle(handle,clientidx,handleidx)) {
  3111. CRemoteClientHandler &client = clients.item(clientidx);
  3112. if (del) {
  3113. client.handles.remove(handleidx);
  3114. client.openfiles.remove(handleidx);
  3115. client.opennames.remove(handleidx);
  3116. client.previdx = (unsigned)-1;
  3117. }
  3118. else {
  3119. fileio = &client.openfiles.item(handleidx);
  3120. client.previdx = handleidx;
  3121. }
  3122. return true;
  3123. }
  3124. throwErr(RFSERR_InvalidFileIOHandle);
  3125. return false;
  3126. }
  3127. void onCloseSocket(CRemoteClientHandler *client, int which)
  3128. {
  3129. if (!client)
  3130. return;
  3131. CriticalBlock block(sect);
  3132. #ifdef _DEBUG
  3133. StringBuffer s;
  3134. client->peerName(s);
  3135. PROGLOG("onCloseSocket(%d) %s",which,s.str());
  3136. #endif
  3137. if (client->socket) {
  3138. try {
  3139. selecthandler->remove(client->socket);
  3140. }
  3141. catch (IException *e) {
  3142. EXCLOG(e,"CRemoteFileServer::onCloseSocket.1");
  3143. e->Release();
  3144. }
  3145. }
  3146. try {
  3147. clients.zap(*client);
  3148. }
  3149. catch (IException *e) {
  3150. EXCLOG(e,"CRemoteFileServer::onCloseSocket.2");
  3151. e->Release();
  3152. }
  3153. }
  3154. bool cmdOpenFileIO(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
  3155. {
  3156. IMPERSONATE_USER(client);
  3157. Owned<StringAttrItem> name = new StringAttrItem;
  3158. byte mode;
  3159. byte share;
  3160. msg.read(name->text).read(mode).read(share);
  3161. // also try to recv extra byte
  3162. byte extra = 0;
  3163. if (msg.remaining() >= sizeof(byte))
  3164. msg.read(extra);
  3165. IFEflags extraFlags = (IFEflags)extra;
  3166. // none => nocache for remote (hint)
  3167. // can revert to previous behavior with conf file setting "allow_pgcache_flush=false"
  3168. if (extraFlags == IFEnone)
  3169. extraFlags = IFEnocache;
  3170. try {
  3171. Owned<IFile> file = createIFile(name->text);
  3172. switch ((compatIFSHmode)share) {
  3173. case compatIFSHnone:
  3174. file->setCreateFlags(S_IRUSR|S_IWUSR);
  3175. file->setShareMode(IFSHnone);
  3176. break;
  3177. case compatIFSHread:
  3178. file->setShareMode(IFSHread);
  3179. break;
  3180. case compatIFSHwrite:
  3181. file->setShareMode(IFSHfull);
  3182. break;
  3183. case compatIFSHexec:
  3184. file->setCreateFlags(S_IRUSR|S_IWUSR|S_IXUSR|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH);
  3185. break;
  3186. case compatIFSHall:
  3187. file->setCreateFlags(S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH); // bit excessive
  3188. file->setShareMode(IFSHfull);
  3189. break;
  3190. }
  3191. if (TF_TRACE_PRE_IO)
  3192. PROGLOG("before open file '%s', (%d,%d,%d)",name->text.get(),(int)mode,(int)share,extraFlags);
  3193. IFileIO *fileio = file->open((IFOmode)mode,extraFlags);
  3194. int handle;
  3195. if (fileio) {
  3196. CriticalBlock block(sect);
  3197. handle = getNextHandle();
  3198. client.previdx = client.opennames.ordinality();
  3199. client.handles.append(handle);
  3200. client.openfiles.append(*fileio);
  3201. client.opennames.append(*name.getLink());
  3202. }
  3203. else
  3204. handle = 0;
  3205. reply.append(RFEnoerror);
  3206. reply.append(handle);
  3207. if (TF_TRACE)
  3208. PROGLOG("open file '%s', (%d,%d) handle = %d",name->text.get(),(int)mode,(int)share,handle);
  3209. return true;
  3210. }
  3211. catch (IException *e)
  3212. {
  3213. StringBuffer s;
  3214. e->errorMessage(s);
  3215. throwErr3(RFSERR_OpenFailed,e->errorCode(),s.str());
  3216. e->Release();
  3217. }
  3218. return false;
  3219. }
  3220. bool cmdCloseFileIO(MemoryBuffer & msg, MemoryBuffer & reply)
  3221. {
  3222. int handle;
  3223. msg.read(handle);
  3224. IFileIO *fileio;
  3225. if (!checkFileIOHandle(reply, handle, fileio, true))
  3226. return false;
  3227. if (TF_TRACE)
  3228. PROGLOG("close file, handle = %d",handle);
  3229. reply.append(RFEnoerror);
  3230. return true;
  3231. }
  3232. bool cmdRead(MemoryBuffer & msg, MemoryBuffer & reply)
  3233. {
  3234. int handle;
  3235. __int64 pos;
  3236. size32_t len;
  3237. msg.read(handle).read(pos).read(len);
  3238. IFileIO *fileio;
  3239. if (!checkFileIOHandle(reply, handle, fileio))
  3240. return false;
  3241. //arrange it so we read directly into the reply buffer...
  3242. unsigned posOfErr = reply.length();
  3243. reply.append((unsigned)RFEnoerror);
  3244. size32_t numRead;
  3245. unsigned posOfLength = reply.length();
  3246. if (TF_TRACE_PRE_IO)
  3247. PROGLOG("before read file, handle = %d, toread = %d",handle,len);
  3248. void * data;
  3249. {
  3250. reply.reserve(sizeof(numRead));
  3251. data = reply.reserve(len);
  3252. }
  3253. try {
  3254. numRead = fileio->read(pos,len,data);
  3255. }
  3256. catch (IException *e)
  3257. {
  3258. reply.setWritePos(posOfErr);
  3259. StringBuffer s;
  3260. e->errorMessage(s);
  3261. throwErr3(RFSERR_ReadFailed,e->errorCode(),s.str());
  3262. e->Release();
  3263. return false;
  3264. }
  3265. if (TF_TRACE)
  3266. PROGLOG("read file, handle = %d, pos = %" I64F "d, toread = %d, read = %d",handle,pos,len,numRead);
  3267. {
  3268. reply.setLength(posOfLength + sizeof(numRead) + numRead);
  3269. reply.writeEndianDirect(posOfLength,sizeof(numRead),&numRead);
  3270. }
  3271. return true;
  3272. }
  3273. bool cmdSize(MemoryBuffer & msg, MemoryBuffer & reply)
  3274. {
  3275. int handle;
  3276. msg.read(handle);
  3277. IFileIO *fileio;
  3278. if (!checkFileIOHandle(reply, handle, fileio))
  3279. return false;
  3280. __int64 size = fileio->size();
  3281. reply.append((unsigned)RFEnoerror).append(size);
  3282. if (TF_TRACE)
  3283. PROGLOG("size file, handle = %d, size = %" I64F "d",handle,size);
  3284. return true;
  3285. }
  3286. bool cmdSetSize(MemoryBuffer & msg, MemoryBuffer & reply)
  3287. {
  3288. int handle;
  3289. offset_t size;
  3290. msg.read(handle).read(size);
  3291. IFileIO *fileio;
  3292. if (TF_TRACE)
  3293. PROGLOG("set size file, handle = %d, size = %" I64F "d",handle,size);
  3294. if (!checkFileIOHandle(reply, handle, fileio))
  3295. return false;
  3296. fileio->setSize(size);
  3297. reply.append((unsigned)RFEnoerror);
  3298. return true;
  3299. }
  3300. bool cmdWrite(MemoryBuffer & msg, MemoryBuffer & reply)
  3301. {
  3302. int handle;
  3303. __int64 pos;
  3304. size32_t len;
  3305. msg.read(handle).read(pos).read(len);
  3306. IFileIO *fileio;
  3307. if (!checkFileIOHandle(reply, handle, fileio))
  3308. return false;
  3309. const byte *data = (const byte *)msg.readDirect(len);
  3310. try {
  3311. if (TF_TRACE_PRE_IO)
  3312. PROGLOG("before write file, handle = %d, towrite = %d",handle,len);
  3313. size32_t numWritten = fileio->write(pos,len,data);
  3314. if (TF_TRACE)
  3315. PROGLOG("write file, handle = %d, towrite = %d, written = %d",handle,len,numWritten);
  3316. reply.append((unsigned)RFEnoerror).append(numWritten);
  3317. return true;
  3318. }
  3319. catch (IException *e)
  3320. {
  3321. StringBuffer s;
  3322. e->errorMessage(s);
  3323. throwErr3(RFSERR_WriteFailed,e->errorCode(),s.str());
  3324. e->Release();
  3325. }
  3326. return false;
  3327. }
  3328. bool cmdExists(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
  3329. {
  3330. IMPERSONATE_USER(client);
  3331. StringAttr name;
  3332. msg.read(name);
  3333. if (TF_TRACE)
  3334. PROGLOG("exists, '%s'",name.get());
  3335. Owned<IFile> file=createIFile(name);
  3336. try {
  3337. bool e = file->exists();
  3338. reply.append((unsigned)RFEnoerror).append(e);
  3339. return true;
  3340. }
  3341. catch (IException *e)
  3342. {
  3343. StringBuffer s;
  3344. e->errorMessage(s);
  3345. throwErr3(RFSERR_ExistsFailed,e->errorCode(),s.str());
  3346. e->Release();
  3347. }
  3348. return false;
  3349. }
  3350. bool cmdRemove(MemoryBuffer & msg, MemoryBuffer & reply,CRemoteClientHandler &client)
  3351. {
  3352. IMPERSONATE_USER(client);
  3353. StringAttr name;
  3354. msg.read(name);
  3355. if (TF_TRACE)
  3356. PROGLOG("remove, '%s'",name.get());
  3357. Owned<IFile> file=createIFile(name);
  3358. try {
  3359. bool e = file->remove();
  3360. reply.append((unsigned)RFEnoerror).append(e);
  3361. return true;
  3362. }
  3363. catch (IException *e)
  3364. {
  3365. StringBuffer s;
  3366. e->errorMessage(s);
  3367. throwErr3(RFSERR_RemoveFailed,e->errorCode(),s.str());
  3368. e->Release();
  3369. }
  3370. return false;
  3371. }
  3372. bool cmdGetVer(MemoryBuffer & msg, MemoryBuffer & reply)
  3373. {
  3374. if (TF_TRACE)
  3375. PROGLOG("getVer");
  3376. if (msg.getPos()+sizeof(unsigned)>msg.length())
  3377. reply.append((unsigned)RFEnoerror);
  3378. else
  3379. reply.append((unsigned)FILESRV_VERSION+0x10000);
  3380. reply.append(VERSTRING);
  3381. return true;
  3382. }
  3383. bool cmdRename(MemoryBuffer & msg, MemoryBuffer & reply,CRemoteClientHandler &client)
  3384. {
  3385. IMPERSONATE_USER(client);
  3386. StringAttr fromname;
  3387. msg.read(fromname);
  3388. StringAttr toname;
  3389. msg.read(toname);
  3390. if (TF_TRACE)
  3391. PROGLOG("rename, '%s' to '%s'",fromname.get(),toname.get());
  3392. Owned<IFile> file=createIFile(fromname);
  3393. try {
  3394. file->rename(toname);
  3395. reply.append((unsigned)RFEnoerror);
  3396. return true;
  3397. }
  3398. catch (IException *e)
  3399. {
  3400. StringBuffer s;
  3401. e->errorMessage(s);
  3402. throwErr3(RFSERR_RenameFailed,e->errorCode(),s.str());
  3403. e->Release();
  3404. }
  3405. return false;
  3406. }
  3407. bool cmdMove(MemoryBuffer & msg, MemoryBuffer & reply,CRemoteClientHandler &client)
  3408. {
  3409. IMPERSONATE_USER(client);
  3410. StringAttr fromname;
  3411. msg.read(fromname);
  3412. StringAttr toname;
  3413. msg.read(toname);
  3414. if (TF_TRACE)
  3415. PROGLOG("move, '%s' to '%s'",fromname.get(),toname.get());
  3416. Owned<IFile> file=createIFile(fromname);
  3417. try {
  3418. file->move(toname);
  3419. reply.append((unsigned)RFEnoerror);
  3420. return true;
  3421. }
  3422. catch (IException *e)
  3423. {
  3424. StringBuffer s;
  3425. e->errorMessage(s);
  3426. throwErr3(RFSERR_MoveFailed,e->errorCode(),s.str());
  3427. e->Release();
  3428. }
  3429. return false;
  3430. }
  3431. bool cmdCopy(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
  3432. {
  3433. IMPERSONATE_USER(client);
  3434. StringAttr fromname;
  3435. msg.read(fromname);
  3436. StringAttr toname;
  3437. msg.read(toname);
  3438. if (TF_TRACE)
  3439. PROGLOG("copy, '%s' to '%s'",fromname.get(),toname.get());
  3440. try {
  3441. copyFile(toname, fromname);
  3442. reply.append((unsigned)RFEnoerror);
  3443. return true;
  3444. }
  3445. catch (IException *e)
  3446. {
  3447. StringBuffer s;
  3448. e->errorMessage(s);
  3449. throwErr3(RFSERR_CopyFailed,e->errorCode(),s.str());
  3450. e->Release();
  3451. }
  3452. return false;
  3453. }
  3454. bool cmdAppend(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
  3455. {
  3456. IMPERSONATE_USER(client);
  3457. int handle;
  3458. __int64 pos;
  3459. __int64 len;
  3460. StringAttr srcname;
  3461. msg.read(handle).read(srcname).read(pos).read(len);
  3462. IFileIO *fileio;
  3463. if (!checkFileIOHandle(reply, handle, fileio))
  3464. return false;
  3465. try {
  3466. Owned<IFile> file = createIFile(srcname.get());
  3467. __int64 written = fileio->appendFile(file,pos,len);
  3468. if (TF_TRACE)
  3469. PROGLOG("append file, handle = %d, file=%s, pos = %" I64F "d len = %" I64F "d written = %" I64F "d",handle,srcname.get(),pos,len,written);
  3470. reply.append((unsigned)RFEnoerror).append(written);
  3471. return true;
  3472. }
  3473. catch (IException *e)
  3474. {
  3475. StringBuffer s;
  3476. e->errorMessage(s);
  3477. throwErr3(RFSERR_AppendFailed,e->errorCode(),s.str());
  3478. e->Release();
  3479. }
  3480. return false;
  3481. }
  3482. bool cmdIsFile(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3483. {
  3484. IMPERSONATE_USER(client);
  3485. StringAttr name;
  3486. msg.read(name);
  3487. if (TF_TRACE)
  3488. PROGLOG("isFile, '%s'",name.get());
  3489. Owned<IFile> file=createIFile(name);
  3490. try {
  3491. unsigned ret = (unsigned)file->isFile();
  3492. reply.append((unsigned)RFEnoerror).append(ret);
  3493. return true;
  3494. }
  3495. catch (IException *e)
  3496. {
  3497. StringBuffer s;
  3498. e->errorMessage(s);
  3499. throwErr3(RFSERR_IsFileFailed,e->errorCode(),s.str());
  3500. e->Release();
  3501. }
  3502. return false;
  3503. }
  3504. bool cmdIsDir(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3505. {
  3506. IMPERSONATE_USER(client);
  3507. StringAttr name;
  3508. msg.read(name);
  3509. if (TF_TRACE)
  3510. PROGLOG("isDir, '%s'",name.get());
  3511. Owned<IFile> file=createIFile(name);
  3512. try {
  3513. unsigned ret = (unsigned)file->isDirectory();
  3514. reply.append((unsigned)RFEnoerror).append(ret);
  3515. return true;
  3516. }
  3517. catch (IException *e)
  3518. {
  3519. StringBuffer s;
  3520. e->errorMessage(s);
  3521. throwErr3(RFSERR_IsDirectoryFailed,e->errorCode(),s.str());
  3522. e->Release();
  3523. }
  3524. return false;
  3525. }
  3526. bool cmdIsReadOnly(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3527. {
  3528. IMPERSONATE_USER(client);
  3529. StringAttr name;
  3530. msg.read(name);
  3531. if (TF_TRACE)
  3532. PROGLOG("isReadOnly, '%s'",name.get());
  3533. Owned<IFile> file=createIFile(name);
  3534. try {
  3535. unsigned ret = (unsigned)file->isReadOnly();
  3536. reply.append((unsigned)RFEnoerror).append(ret);
  3537. return true;
  3538. }
  3539. catch (IException *e)
  3540. {
  3541. StringBuffer s;
  3542. e->errorMessage(s);
  3543. throwErr3(RFSERR_IsReadOnlyFailed,e->errorCode(),s.str());
  3544. e->Release();
  3545. }
  3546. return false;
  3547. }
  3548. bool cmdSetReadOnly(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3549. {
  3550. IMPERSONATE_USER(client);
  3551. StringAttr name;
  3552. bool set;
  3553. msg.read(name).read(set);
  3554. if (TF_TRACE)
  3555. PROGLOG("setReadOnly, '%s' %d",name.get(),(int)set);
  3556. Owned<IFile> file=createIFile(name);
  3557. try {
  3558. file->setReadOnly(set);
  3559. reply.append((unsigned)RFEnoerror);
  3560. return true;
  3561. }
  3562. catch (IException *e)
  3563. {
  3564. StringBuffer s;
  3565. e->errorMessage(s);
  3566. throwErr3(RFSERR_SetReadOnlyFailed,e->errorCode(),s.str());
  3567. e->Release();
  3568. }
  3569. return false;
  3570. }
  3571. bool cmdGetTime(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3572. {
  3573. IMPERSONATE_USER(client);
  3574. StringAttr name;
  3575. msg.read(name);
  3576. if (TF_TRACE)
  3577. PROGLOG("getTime, '%s'",name.get());
  3578. Owned<IFile> file=createIFile(name);
  3579. CDateTime createTime;
  3580. CDateTime modifiedTime;
  3581. CDateTime accessedTime;
  3582. try {
  3583. bool ret = file->getTime(&createTime,&modifiedTime,&accessedTime);
  3584. reply.append((unsigned)RFEnoerror).append(ret);
  3585. if (ret) {
  3586. createTime.serialize(reply);
  3587. modifiedTime.serialize(reply);
  3588. accessedTime.serialize(reply);
  3589. }
  3590. return true;
  3591. }
  3592. catch (IException *e)
  3593. {
  3594. StringBuffer s;
  3595. e->errorMessage(s);
  3596. throwErr3(RFSERR_GetTimeFailed,e->errorCode(),s.str());
  3597. e->Release();
  3598. }
  3599. return false;
  3600. }
  3601. bool cmdSetTime(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3602. {
  3603. IMPERSONATE_USER(client);
  3604. StringAttr name;
  3605. bool creategot;
  3606. CDateTime createTime;
  3607. bool modifiedgot;
  3608. CDateTime modifiedTime;
  3609. bool accessedgot;
  3610. CDateTime accessedTime;
  3611. msg.read(name);
  3612. msg.read(creategot);
  3613. if (creategot)
  3614. createTime.deserialize(msg);
  3615. msg.read(modifiedgot);
  3616. if (modifiedgot)
  3617. modifiedTime.deserialize(msg);
  3618. msg.read(accessedgot);
  3619. if (accessedgot)
  3620. accessedTime.deserialize(msg);
  3621. if (TF_TRACE)
  3622. PROGLOG("setTime, '%s'",name.get());
  3623. Owned<IFile> file=createIFile(name);
  3624. try {
  3625. bool ret = file->setTime(creategot?&createTime:NULL,modifiedgot?&modifiedTime:NULL,accessedgot?&accessedTime:NULL);
  3626. reply.append((unsigned)RFEnoerror).append(ret);
  3627. return true;
  3628. }
  3629. catch (IException *e)
  3630. {
  3631. StringBuffer s;
  3632. e->errorMessage(s);
  3633. throwErr3(RFSERR_SetTimeFailed,e->errorCode(),s.str());
  3634. e->Release();
  3635. }
  3636. return false;
  3637. }
  3638. bool cmdCreateDir(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3639. {
  3640. IMPERSONATE_USER(client);
  3641. StringAttr name;
  3642. msg.read(name);
  3643. if (TF_TRACE)
  3644. PROGLOG("CreateDir, '%s'",name.get());
  3645. Owned<IFile> dir=createIFile(name);
  3646. try {
  3647. bool ret = dir->createDirectory();
  3648. reply.append((unsigned)RFEnoerror).append(ret);
  3649. return true;
  3650. }
  3651. catch (IException *e)
  3652. {
  3653. StringBuffer s;
  3654. e->errorMessage(s);
  3655. throwErr3(RFSERR_CreateDirFailed,e->errorCode(),s.str());
  3656. e->Release();
  3657. }
  3658. return false;
  3659. }
  3660. bool cmdGetDir(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3661. {
  3662. IMPERSONATE_USER(client);
  3663. StringAttr name;
  3664. StringAttr mask;
  3665. bool includedir;
  3666. bool sub;
  3667. byte stream = 0;
  3668. msg.read(name).read(mask).read(includedir).read(sub);
  3669. if (msg.remaining()>=sizeof(byte)) {
  3670. msg.read(stream);
  3671. if (stream==1)
  3672. client.opendir.clear();
  3673. }
  3674. if (TF_TRACE)
  3675. PROGLOG("GetDir, '%s', '%s'",name.get(),mask.get());
  3676. try {
  3677. Owned<IFile> dir=createIFile(name);
  3678. Owned<IDirectoryIterator> iter;
  3679. if (stream>1)
  3680. iter.set(client.opendir);
  3681. else {
  3682. iter.setown(dir->directoryFiles(mask.length()?mask.get():NULL,sub,includedir));
  3683. if (stream != 0)
  3684. client.opendir.set(iter);
  3685. }
  3686. if (!iter) {
  3687. reply.append((unsigned)RFSERR_GetDirFailed);
  3688. return false;
  3689. }
  3690. reply.append((unsigned)RFEnoerror);
  3691. if (CRemoteDirectoryIterator::serialize(reply,iter,stream?0x100000:0,stream<2)) {
  3692. if (stream != 0)
  3693. client.opendir.clear();
  3694. }
  3695. else {
  3696. bool cont=true;
  3697. reply.append(cont);
  3698. }
  3699. return true;
  3700. }
  3701. catch (IException *e)
  3702. {
  3703. StringBuffer s;
  3704. e->errorMessage(s);
  3705. throwErr3(RFSERR_GetDirFailed,e->errorCode(),s.str());
  3706. e->Release();
  3707. }
  3708. return false;
  3709. }
  3710. bool cmdMonitorDir(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3711. {
  3712. IMPERSONATE_USER(client);
  3713. StringAttr name;
  3714. StringAttr mask;
  3715. bool includedir;
  3716. bool sub;
  3717. unsigned checkinterval;
  3718. unsigned timeout;
  3719. __int64 cancelid; // not yet used
  3720. msg.read(name).read(mask).read(includedir).read(sub).read(checkinterval).read(timeout).read(cancelid);
  3721. byte isprev;
  3722. msg.read(isprev);
  3723. Owned<IDirectoryIterator> prev;
  3724. if (isprev==1) {
  3725. SocketEndpoint ep;
  3726. CRemoteDirectoryIterator *di = new CRemoteDirectoryIterator(ep,name);
  3727. di->appendBuf(msg);
  3728. prev.setown(di);
  3729. }
  3730. if (TF_TRACE)
  3731. PROGLOG("MonitorDir, '%s' '%s'",name.get(),mask.get());
  3732. try {
  3733. Owned<IFile> dir=createIFile(name);
  3734. Owned<IDirectoryDifferenceIterator> iter=dir->monitorDirectory(prev,mask.length()?mask.get():NULL,sub,includedir,checkinterval,timeout);
  3735. reply.append((unsigned)RFEnoerror);
  3736. byte state = (iter.get()==NULL)?0:1;
  3737. reply.append(state);
  3738. if (state==1)
  3739. CRemoteDirectoryIterator::serializeDiff(reply,iter);
  3740. return true;
  3741. }
  3742. catch (IException *e)
  3743. {
  3744. StringBuffer s;
  3745. e->errorMessage(s);
  3746. throwErr3(RFSERR_GetDirFailed,e->errorCode(),s.str());
  3747. e->Release();
  3748. }
  3749. return false;
  3750. }
  3751. bool cmdCopySection(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3752. {
  3753. IMPERSONATE_USER(client);
  3754. StringAttr uuid;
  3755. StringAttr fromFile;
  3756. StringAttr toFile;
  3757. offset_t toOfs;
  3758. offset_t fromOfs;
  3759. offset_t size;
  3760. offset_t sizeDone=0;
  3761. offset_t totalSize=(offset_t)-1;
  3762. unsigned timeout;
  3763. msg.read(uuid).read(fromFile).read(toFile).read(toOfs).read(fromOfs).read(size).read(timeout);
  3764. try {
  3765. AsyncCommandStatus status = asyncCommandManager.copySection(uuid,fromFile,toFile,toOfs,fromOfs,size,sizeDone,totalSize,timeout);
  3766. reply.append((unsigned)RFEnoerror).append((unsigned)status).append(sizeDone).append(totalSize);
  3767. }
  3768. catch (IException *e)
  3769. {
  3770. StringBuffer s;
  3771. e->errorMessage(s);
  3772. throwErr3(RFSERR_CopySectionFailed,e->errorCode(),s.str());
  3773. e->Release();
  3774. }
  3775. return true;
  3776. }
  3777. bool cmdTreeCopy(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client, CThrottler *throttler, bool usetmp=false)
  3778. {
  3779. IMPERSONATE_USER(client);
  3780. RemoteFilename src;
  3781. src.deserialize(msg);
  3782. RemoteFilename dst;
  3783. dst.deserialize(msg);
  3784. StringAttr net;
  3785. StringAttr mask;
  3786. msg.read(net).read(mask);
  3787. try {
  3788. IpAddress ip;
  3789. treeCopyFile(src,dst,net,mask,ip,usetmp,throttler);
  3790. unsigned status = 0;
  3791. reply.append((unsigned)RFEnoerror).append((unsigned)status);
  3792. ip.ipserialize(reply);
  3793. }
  3794. catch (IException *e)
  3795. {
  3796. StringBuffer s;
  3797. e->errorMessage(s);
  3798. throwErr3(RFSERR_TreeCopyFailed,e->errorCode(),s.str());
  3799. e->Release();
  3800. }
  3801. return true;
  3802. }
  3803. bool cmdTreeCopyTmp(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client, CThrottler *throttler)
  3804. {
  3805. return cmdTreeCopy(msg, reply, client, throttler, true);
  3806. }
  3807. bool cmdGetCRC(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3808. {
  3809. IMPERSONATE_USER(client);
  3810. StringAttr name;
  3811. msg.read(name);
  3812. if (TF_TRACE)
  3813. PROGLOG("getCRC, '%s'",name.get());
  3814. Owned<IFile> file=createIFile(name);
  3815. try {
  3816. unsigned ret = file->getCRC();
  3817. reply.append((unsigned)RFEnoerror).append(ret);
  3818. return true;
  3819. }
  3820. catch (IException *e)
  3821. {
  3822. StringBuffer s;
  3823. e->errorMessage(s);
  3824. throwErr3(RFSERR_GetCrcFailed,e->errorCode(),s.str());
  3825. e->Release();
  3826. }
  3827. return false;
  3828. }
  3829. bool cmdStop(MemoryBuffer &msg, MemoryBuffer &reply)
  3830. {
  3831. PROGLOG("Abort request received");
  3832. stopping = true;
  3833. if (acceptsock)
  3834. acceptsock->cancel_accept();
  3835. if (rejectsock)
  3836. rejectsock->cancel_accept();
  3837. reply.append((unsigned)RFEnoerror);
  3838. return false;
  3839. }
  3840. bool cmdExec(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3841. {
  3842. StringAttr cmdline;
  3843. StringAttr workdir;
  3844. bool sync;
  3845. bool hasoutput;
  3846. size32_t insize;
  3847. MemoryAttr inbuf;
  3848. msg.read(cmdline).read(workdir).read(sync).read(hasoutput).read(insize);
  3849. if (insize)
  3850. msg.read(insize, inbuf.allocate(insize));
  3851. Owned<IPipeProcess> pipe = createPipeProcess();
  3852. int retcode=-1;
  3853. HANDLE phandle=(HANDLE)0;
  3854. MemoryBuffer outbuf;
  3855. if (pipe->run("EXEC",cmdline,workdir,insize!=0,hasoutput)) {
  3856. if (insize) {
  3857. pipe->write(insize, inbuf.get());
  3858. pipe->closeInput();
  3859. }
  3860. if (hasoutput) {
  3861. byte buf[4096];
  3862. loop {
  3863. size32_t read = pipe->read(sizeof(buf),buf);
  3864. if (!read)
  3865. break;
  3866. outbuf.append(read,buf);
  3867. }
  3868. }
  3869. if (sync)
  3870. retcode = pipe->wait();
  3871. else {
  3872. phandle = pipe->getProcessHandle();
  3873. retcode = 0;
  3874. }
  3875. }
  3876. size32_t outsz = outbuf.length();
  3877. reply.append(retcode).append((unsigned)phandle).append(outsz);
  3878. if (outsz)
  3879. reply.append(outbuf);
  3880. return true;
  3881. }
  3882. bool cmdSetTrace(MemoryBuffer &msg, MemoryBuffer &reply)
  3883. {
  3884. byte flags;
  3885. msg.read(flags);
  3886. int retcode=-1;
  3887. if (flags!=255) { // escape
  3888. retcode = traceFlags;
  3889. traceFlags = flags;
  3890. }
  3891. reply.append(retcode);
  3892. return true;
  3893. }
  3894. bool cmdGetInfo(MemoryBuffer &msg, MemoryBuffer &reply)
  3895. {
  3896. StringBuffer retstr;
  3897. int retcode = getInfo(retstr);
  3898. reply.append(retcode).append(retstr.str());
  3899. return true;
  3900. }
  3901. bool cmdFirewall(MemoryBuffer &msg, MemoryBuffer &reply)
  3902. {
  3903. // TBD
  3904. StringBuffer retstr;
  3905. int retcode = getInfo(retstr);
  3906. reply.append(retcode).append(retstr.str());
  3907. return true;
  3908. }
  3909. bool cmdExtractBlobElements(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
  3910. {
  3911. IMPERSONATE_USER(client);
  3912. try {
  3913. StringAttr prefix;
  3914. StringAttr filename;
  3915. msg.read(prefix).read(filename);
  3916. RemoteFilename rfn;
  3917. rfn.setLocalPath(filename);
  3918. ExtractedBlobArray extracted;
  3919. extractBlobElements(prefix, rfn, extracted);
  3920. unsigned n = extracted.ordinality();
  3921. reply.append((unsigned)RFEnoerror).append(n);
  3922. for (unsigned i=0;i<n;i++)
  3923. extracted.item(i).serialize(reply);
  3924. }
  3925. catch (IException *e) {
  3926. StringBuffer s;
  3927. e->errorMessage(s);
  3928. throwErr3(RFSERR_ExtractBlobElementsFailed,e->errorCode(),s.str());
  3929. e->Release();
  3930. }
  3931. return true;
  3932. }
  3933. bool cmdRedeploy(MemoryBuffer &msg, MemoryBuffer &reply)
  3934. {
  3935. return false; // TBD
  3936. }
  3937. bool cmdKill(MemoryBuffer & msg, MemoryBuffer & reply)
  3938. {
  3939. // TBD
  3940. throwErr2(RFSERR_InvalidCommand,(unsigned)RFCkill);
  3941. return false;
  3942. }
  3943. bool cmdUnknown(MemoryBuffer & msg, MemoryBuffer & reply,RemoteFileCommandType cmd)
  3944. {
  3945. throwErr2(RFSERR_InvalidCommand,(unsigned)cmd);
  3946. return false;
  3947. }
  3948. bool cmdUnlock(MemoryBuffer & msg, MemoryBuffer & reply,CRemoteClientHandler &client)
  3949. {
  3950. // this is an attempt to authenticate when we haven't got authentication turned on
  3951. StringBuffer s;
  3952. client.peerName(s);
  3953. if (TF_TRACE_CLIENT_STATS)
  3954. PROGLOG("Connect from %s",s.str());
  3955. throwErr2(RFSERR_InvalidCommand,(unsigned)RFCunlock);
  3956. return false;
  3957. }
  3958. bool dispatchCommand(RemoteFileCommandType cmd, MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler *client, CThrottler *throttler)
  3959. {
  3960. bool ret = true;
  3961. switch(cmd) {
  3962. MAPCOMMAND(RFCcloseIO, cmdCloseFileIO);
  3963. MAPCOMMANDCLIENT(RFCopenIO, cmdOpenFileIO, *client);
  3964. MAPCOMMAND(RFCread, cmdRead);
  3965. MAPCOMMAND(RFCsize, cmdSize);
  3966. MAPCOMMAND(RFCwrite, cmdWrite);
  3967. MAPCOMMANDCLIENT(RFCexists, cmdExists, *client);
  3968. MAPCOMMANDCLIENT(RFCremove, cmdRemove, *client);
  3969. MAPCOMMANDCLIENT(RFCrename, cmdRename, *client);
  3970. MAPCOMMAND(RFCgetver, cmdGetVer);
  3971. MAPCOMMANDCLIENT(RFCisfile, cmdIsFile, *client);
  3972. MAPCOMMANDCLIENT(RFCisdirectory, cmdIsDir, *client);
  3973. MAPCOMMANDCLIENT(RFCisreadonly, cmdIsReadOnly, *client);
  3974. MAPCOMMANDCLIENT(RFCsetreadonly, cmdSetReadOnly, *client);
  3975. MAPCOMMANDCLIENT(RFCgettime, cmdGetTime, *client);
  3976. MAPCOMMANDCLIENT(RFCsettime, cmdSetTime, *client);
  3977. MAPCOMMANDCLIENT(RFCcreatedir, cmdCreateDir, *client);
  3978. MAPCOMMANDCLIENT(RFCgetdir, cmdGetDir, *client);
  3979. MAPCOMMANDCLIENT(RFCmonitordir, cmdMonitorDir, *client);
  3980. MAPCOMMAND(RFCstop, cmdStop);
  3981. MAPCOMMANDCLIENT(RFCexec, cmdExec, *client);
  3982. MAPCOMMANDCLIENT(RFCextractblobelements, cmdExtractBlobElements, *client);
  3983. MAPCOMMAND(RFCkill, cmdKill);
  3984. MAPCOMMAND(RFCredeploy, cmdRedeploy); // only Windows
  3985. MAPCOMMANDCLIENT(RFCgetcrc, cmdGetCRC, *client);
  3986. MAPCOMMANDCLIENT(RFCmove, cmdMove, *client);
  3987. MAPCOMMANDCLIENT(RFCcopy, cmdCopy, *client);
  3988. MAPCOMMANDCLIENT(RFCappend, cmdAppend, *client);
  3989. MAPCOMMAND(RFCsetsize, cmdSetSize);
  3990. MAPCOMMAND(RFCsettrace, cmdSetTrace);
  3991. MAPCOMMAND(RFCgetinfo, cmdGetInfo);
  3992. MAPCOMMAND(RFCfirewall, cmdFirewall);
  3993. MAPCOMMANDCLIENT(RFCunlock, cmdUnlock, *client);
  3994. MAPCOMMANDCLIENT(RFCcopysection, cmdCopySection, *client);
  3995. MAPCOMMANDCLIENTTHROTTLER(RFCtreecopy, cmdTreeCopy, *client, throttler);
  3996. MAPCOMMANDCLIENTTHROTTLER(RFCtreecopytmp, cmdTreeCopyTmp, *client, throttler);
  3997. default:
  3998. ret = cmdUnknown(msg,reply,cmd);
  3999. }
  4000. if (!ret) { // append error string
  4001. if (reply.length()>=sizeof(unsigned)*2) {
  4002. reply.reset();
  4003. unsigned z;
  4004. unsigned e;
  4005. reply.read(z).read(e);
  4006. StringBuffer err("ERR(");
  4007. err.append(e).append(") ");
  4008. if (client&&(client->peerName(err)))
  4009. err.append(" : ");
  4010. if (e&&(reply.getPos()<reply.length())) {
  4011. StringAttr es;
  4012. reply.read(es);
  4013. err.append(" : ").append(es);
  4014. }
  4015. reply.reset();
  4016. if (cmd!=RFCunlock)
  4017. PROGLOG("%s",err.str()); // supress authentication logging
  4018. if (client)
  4019. client->logPrevHandle();
  4020. }
  4021. }
  4022. return ret;
  4023. }
  4024. virtual IPooledThread *createNew()
  4025. {
  4026. return new cCommandProcessor();
  4027. }
  4028. void run(SocketEndpoint &listenep, bool useSSL)
  4029. {
  4030. if (listenep.isNull())
  4031. acceptsock.setown(ISocket::create(listenep.port));
  4032. else {
  4033. StringBuffer ips;
  4034. listenep.getIpText(ips);
  4035. acceptsock.setown(ISocket::create_ip(listenep.port,ips.str()));
  4036. }
  4037. if (useSSL) {
  4038. if (!securitySettings.certificate)
  4039. throw createDafsException(DAFSERR_connection_failed,"SSL Certificate information not found in environment.conf");
  4040. if (listenep.port <= 0)
  4041. {
  4042. assertex(FALSE);//should never get here
  4043. listenep.port = securitySettings.daFileSrvPort;
  4044. }
  4045. //Create unsecure socket to reject non-ssl client requests
  4046. if (listenep.isNull())
  4047. rejectsock.setown(ISocket::create(DAFILESRV_PORT));
  4048. else
  4049. {
  4050. StringBuffer ips;
  4051. listenep.getIpText(ips);
  4052. rejectsock.setown(ISocket::create_ip(DAFILESRV_PORT,ips.str()));
  4053. }
  4054. }
  4055. #ifdef _DEBUG
  4056. StringBuffer sb;
  4057. listenep.getUrlStr(sb);
  4058. DBGLOG("Server accepting %sfrom %s", useSSL?"SECURE ":"", sb.str());
  4059. #endif
  4060. selecthandler->start();
  4061. UnsignedArray readSocks;
  4062. if (useSSL)
  4063. {
  4064. readSocks.append(acceptsock->OShandle());
  4065. readSocks.append(rejectsock->OShandle());
  4066. }
  4067. loop {
  4068. Owned<ISocket> sock;
  4069. bool sockavail = false;
  4070. try {
  4071. if (!useSSL)
  4072. sockavail = acceptsock->wait_read(1000*60*1)!=0;
  4073. else
  4074. {
  4075. UnsignedArray waitingSocks;
  4076. //SSL Enabled. Listen for non SSL connection on DAFILESRV_PORT and reject them
  4077. int numReady = wait_read_multiple(readSocks, 1000*60*1, waitingSocks);
  4078. if (numReady)
  4079. {
  4080. for (int idx = 0; idx < numReady; idx++)
  4081. {
  4082. if (waitingSocks.item(idx) == rejectsock->OShandle())
  4083. {
  4084. //Unsecure connection attemped, reject !
  4085. Owned<ISocket> s;
  4086. s.setown(rejectsock->accept(true));
  4087. IpAddress ip;
  4088. StringBuffer sb;
  4089. s->getPeerAddress(ip);
  4090. ip.getIpText(sb);
  4091. DBGLOG("Rejecting nonsecure connect from %s",sb.str());
  4092. s->close();
  4093. }
  4094. else
  4095. {
  4096. sockavail = true;
  4097. }
  4098. }
  4099. }
  4100. }
  4101. #if 0
  4102. if (!sockavail) {
  4103. JSocketStatistics stats;
  4104. getSocketStatistics(stats);
  4105. StringBuffer s;
  4106. getSocketStatisticsString(stats,s);
  4107. PROGLOG( "Socket statistics : \n%s\n",s.str());
  4108. }
  4109. #endif
  4110. }
  4111. catch (IException *e) {
  4112. EXCLOG(e,"CRemoteFileServer(1)");
  4113. e->Release();
  4114. // not sure what to do so just accept
  4115. sockavail = true;
  4116. }
  4117. if (stopping)
  4118. break;
  4119. if (sockavail) {
  4120. try {
  4121. sock.setown(acceptsock->accept(true));
  4122. if (useSSL)
  4123. {
  4124. Owned<ISecureSocket> ssock = createSecureSocket(sock.getClear(), ServerSocket);
  4125. int status = ssock->secure_accept();
  4126. if (status < 0)
  4127. throw createDafsException(DAFSERR_connection_failed,"Failure to establish secure connection");
  4128. sock.setown(ssock.getLink());
  4129. }
  4130. if (!sock||stopping)
  4131. break;
  4132. }
  4133. catch (IException *e) {
  4134. EXCLOG(e,"CRemoteFileServer");
  4135. e->Release();
  4136. break;
  4137. }
  4138. runClient(sock.getClear());
  4139. }
  4140. else
  4141. checkTimeout();
  4142. }
  4143. if (TF_TRACE_CLIENT_STATS)
  4144. PROGLOG("CRemoteFileServer:run exiting");
  4145. selecthandler->stop(true);
  4146. }
  4147. void processUnauthenticatedCommand(RemoteFileCommandType cmd, ISocket *socket, MemoryBuffer &msg)
  4148. {
  4149. // these are unauthenticated commands
  4150. if (cmd != RFCgetver)
  4151. cmd = RFCinvalid;
  4152. MemoryBuffer reply;
  4153. dispatchCommand(cmd, msg, initSendBuffer(reply), NULL, NULL);
  4154. sendBuffer(socket, reply);
  4155. }
  4156. bool checkAuthentication(ISocket *socket, IAuthenticatedUser *&ret)
  4157. {
  4158. ret = NULL;
  4159. if (!AuthenticationEnabled)
  4160. return true;
  4161. MemoryBuffer reqbuf;
  4162. MemoryBuffer reply;
  4163. MemoryBuffer encbuf; // because aesEncrypt clears input
  4164. initSendBuffer(reply);
  4165. receiveBuffer(socket, reqbuf, 1);
  4166. RemoteFileCommandType typ=0;
  4167. if (reqbuf.remaining()<sizeof(RemoteFileCommandType))
  4168. return false;
  4169. reqbuf.read(typ);
  4170. if (typ!=RFCunlock) {
  4171. processUnauthenticatedCommand(typ,socket,reqbuf);
  4172. return false;
  4173. }
  4174. if (reqbuf.remaining()<sizeof(OnceKey))
  4175. return false;
  4176. OnceKey oncekey;
  4177. reqbuf.read(sizeof(oncekey),&oncekey);
  4178. IpAddress ip;
  4179. socket->getPeerAddress(ip);
  4180. byte ipdata[16];
  4181. size32_t ipds = ip.getNetAddress(sizeof(ipdata),&ipdata);
  4182. mergeOnce(oncekey,sizeof(ipdata),&ipdata); // this is clients key
  4183. OnceKey mykey;
  4184. genOnce(mykey);
  4185. reply.append((unsigned)0); // errcode
  4186. aesEncrypt(&oncekey,sizeof(oncekey),&mykey,sizeof(oncekey),encbuf);
  4187. reply.append(encbuf.length()).append(encbuf);
  4188. sendBuffer(socket, reply); // send my oncekey
  4189. reqbuf.clear();
  4190. receiveBuffer(socket, reqbuf, 1);
  4191. if (reqbuf.remaining()>sizeof(RemoteFileCommandType)+sizeof(size32_t)) {
  4192. reqbuf.read(typ);
  4193. if (typ==RFCunlockreply) {
  4194. size32_t bs;
  4195. reqbuf.read(bs);
  4196. if (bs<=reqbuf.remaining()) {
  4197. MemoryBuffer userbuf;
  4198. aesDecrypt(&mykey,sizeof(mykey),reqbuf.readDirect(bs),bs,userbuf);
  4199. byte n;
  4200. userbuf.read(n);
  4201. if (n>=2) {
  4202. StringAttr user;
  4203. StringAttr password;
  4204. userbuf.read(user).read(password);
  4205. Owned<IAuthenticatedUser> iau = createAuthenticatedUser();
  4206. if (iau->login(user,password)) {
  4207. initSendBuffer(reply.clear());
  4208. reply.append((unsigned)0);
  4209. sendBuffer(socket, reply); // send OK
  4210. ret = iau;
  4211. return true;
  4212. }
  4213. }
  4214. }
  4215. }
  4216. }
  4217. reply.clear();
  4218. throwErr(RFSERR_AuthenticateFailed);
  4219. sendBuffer(socket, reply); // send OK
  4220. return false;
  4221. }
  4222. void runClient(ISocket *sock)
  4223. {
  4224. cCommandProcessor::cCommandProcessorParams params;
  4225. IAuthenticatedUser *user=NULL;
  4226. bool authenticated = false;
  4227. try {
  4228. if (checkAuthentication(sock,user))
  4229. authenticated = true;
  4230. }
  4231. catch (IException *e) {
  4232. e->Release();
  4233. }
  4234. if (!authenticated) {
  4235. try {
  4236. sock->Release();
  4237. }
  4238. catch (IException *e) {
  4239. e->Release();
  4240. }
  4241. return;
  4242. }
  4243. params.client = new CRemoteClientHandler(this,sock,user,globallasttick);
  4244. {
  4245. CriticalBlock block(sect);
  4246. params.client->Link();
  4247. clients.append(*params.client);
  4248. }
  4249. threads->start(&params);
  4250. }
  4251. void stop()
  4252. {
  4253. // stop accept loop
  4254. if (TF_TRACE_CLIENT_STATS)
  4255. PROGLOG("CRemoteFileServer::stop");
  4256. if (acceptsock)
  4257. acceptsock->cancel_accept();
  4258. if (rejectsock)
  4259. rejectsock->cancel_accept();
  4260. threads->stopAll();
  4261. threads->joinAll(true,60*1000);
  4262. }
  4263. bool notify(CRemoteClientHandler *_client)
  4264. {
  4265. Linked<CRemoteClientHandler> client;
  4266. client.set(_client);
  4267. if (TF_TRACE_FULL)
  4268. PROGLOG("notify %d",client->buf.length());
  4269. if (client->buf.length()) {
  4270. cCommandProcessor::cCommandProcessorParams params;
  4271. params.client = client.getClear();
  4272. threads->start(&params);
  4273. }
  4274. else
  4275. onCloseSocket(client,3); // removes owned handles
  4276. return false;
  4277. }
  4278. void addClient(CRemoteClientHandler *client)
  4279. {
  4280. if (client&&client->socket)
  4281. selecthandler->add(client->socket,SELECTMODE_READ,client);
  4282. }
  4283. void checkTimeout()
  4284. {
  4285. if (msTick()-clientcounttick>1000*60*60) {
  4286. CriticalBlock block(ClientCountSect);
  4287. if (TF_TRACE_CLIENT_STATS && (ClientCount || MaxClientCount))
  4288. PROGLOG("Client count = %d, max = %d", ClientCount, MaxClientCount);
  4289. clientcounttick = msTick();
  4290. MaxClientCount = ClientCount;
  4291. if (closedclients) {
  4292. if (TF_TRACE_CLIENT_STATS)
  4293. PROGLOG("Closed client count = %d",closedclients);
  4294. closedclients = 0;
  4295. }
  4296. }
  4297. CriticalBlock block(sect);
  4298. ForEachItemInRev(i,clients) {
  4299. CRemoteClientHandler &client = clients.item(i);
  4300. if (client.timedOut()) {
  4301. StringBuffer s;
  4302. bool ok = client.getInfo(s); // will spot duff sockets
  4303. if (ok&&(client.handles.ordinality()!=0)) {
  4304. if (TF_TRACE_CLIENT_CONN && client.inactiveTimedOut())
  4305. WARNLOG("Inactive %s",s.str());
  4306. }
  4307. else {
  4308. #ifndef _DEBUG
  4309. if (TF_TRACE_CLIENT_CONN)
  4310. #endif
  4311. PROGLOG("Timing out %s",s.str());
  4312. closedclients++;
  4313. onCloseSocket(&client,4); // removes owned handles
  4314. }
  4315. }
  4316. }
  4317. }
  4318. int getInfo(StringBuffer &str)
  4319. {
  4320. {
  4321. CriticalBlock block(ClientCountSect);
  4322. str.append(VERSTRING).append('\n');
  4323. str.appendf("Client count = %d\n",ClientCount);
  4324. str.appendf("Max client count = %d",MaxClientCount);
  4325. }
  4326. CriticalBlock block(sect);
  4327. ForEachItemIn(i,clients) {
  4328. str.append('\n').append(i).append(": ");
  4329. clients.item(i).getInfo(str);
  4330. }
  4331. return 0;
  4332. }
  4333. unsigned threadRunningCount()
  4334. {
  4335. if (!threads)
  4336. return 0;
  4337. return threads->runningCount();
  4338. }
  4339. Semaphore &throttleSem()
  4340. {
  4341. return throttlesem;
  4342. }
  4343. unsigned idleTime()
  4344. {
  4345. unsigned t = (unsigned)atomic_read(&globallasttick);
  4346. return msTick()-t;
  4347. }
  4348. };
  4349. IRemoteFileServer * createRemoteFileServer()
  4350. {
  4351. #if SIMULATE_PACKETLOSS
  4352. errorSimulationOn = false;
  4353. #endif
  4354. return new CRemoteFileServer();
  4355. }