ccdfile.cpp 142 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
7377837793780378137823783378437853786378737883789379037913792379337943795379637973798379938003801380238033804380538063807380838093810381138123813381438153816381738183819382038213822382338243825382638273828382938303831383238333834383538363837383838393840384138423843384438453846384738483849385038513852385338543855385638573858385938603861386238633864386538663867386838693870387138723873387438753876
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jlib.hpp"
  14. #include "jmisc.hpp"
  15. #include "jmd5.hpp"
  16. #include "jfile.hpp"
  17. #include "jdebug.hpp"
  18. #include "jhtree.hpp"
  19. #include "jisem.hpp"
  20. #include "jqueue.tpp"
  21. #include "dautils.hpp"
  22. #include "keydiff.hpp"
  23. #include "udptopo.hpp"
  24. #include "ccd.hpp"
  25. #include "ccdfile.hpp"
  26. #include "ccdquery.hpp"
  27. #include "ccdstate.hpp"
  28. #include "ccdsnmp.hpp"
  29. #include "rmtfile.hpp"
  30. #include "ccdqueue.ipp"
  31. #include "ccdcache.hpp"
  32. #if defined(__linux__) || defined(__APPLE__)
  33. #include <sys/mman.h>
  34. #endif
  35. #if defined (__linux__)
  36. #include <sys/syscall.h>
  37. #include "ioprio.h"
  38. #endif
  39. #include "thorcommon.hpp"
  40. #include "eclhelper_dyn.hpp"
  41. #include "rtldynfield.hpp"
  42. std::atomic<unsigned> numFilesOpen[2];
  43. #define MAX_READ_RETRIES 2
  44. #ifdef _DEBUG
  45. //#define FAIL_20_READ
  46. //#define FAIL_20_OPEN
  47. #endif
  48. // We point unopened files at a FailingIO object, which avoids having to test for NULL on every access
  49. class DECL_EXCEPTION NotYetOpenException : implements IException, public CInterface
  50. {
  51. public:
  52. IMPLEMENT_IINTERFACE;
  53. virtual int errorCode() const { return 0; }
  54. virtual StringBuffer & errorMessage(StringBuffer &msg) const { return msg.append("not yet open"); }
  55. virtual MessageAudience errorAudience() const { return MSGAUD_programmer; }
  56. };
  57. class CFailingFileIO : implements IFileIO, public CInterface
  58. {
  59. #define THROWNOTOPEN throw new NotYetOpenException()
  60. public:
  61. IMPLEMENT_IINTERFACE;
  62. virtual size32_t read(offset_t pos, size32_t len, void * data) { THROWNOTOPEN; }
  63. virtual offset_t size() { THROWNOTOPEN; }
  64. virtual void flush() { THROWNOTOPEN; }
  65. virtual size32_t write(offset_t pos, size32_t len, const void * data) { THROWNOTOPEN; }
  66. virtual void setSize(offset_t size) { UNIMPLEMENTED; }
  67. virtual offset_t appendFile(IFile *file,offset_t pos,offset_t len) { UNIMPLEMENTED; return 0; }
  68. virtual void close() { }
  69. virtual unsigned __int64 getStatistic(StatisticKind kind) { return 0; }
  70. } failure;
  71. class CRoxieLazyFileIO : implements ILazyFileIO, implements IDelayedFile, public CInterface
  72. {
  73. protected:
  74. IArrayOf<IFile> sources;
  75. Owned<IFile> logical;
  76. Owned<IFileIO> current;
  77. Owned<IMemoryMappedFile> mmapped;
  78. mutable CriticalSection crit;
  79. offset_t fileSize;
  80. unsigned currentIdx;
  81. unsigned lastAccess;
  82. CDateTime fileDate;
  83. bool copying = false;
  84. bool isCompressed = false;
  85. bool remote = false;
  86. IRoxieFileCache *cached = nullptr;
  87. unsigned fileIdx = 0;
  88. unsigned crc = 0;
  89. CRuntimeStatisticCollection fileStats;
  90. #ifdef FAIL_20_READ
  91. unsigned readCount;
  92. #endif
  93. public:
  94. IMPLEMENT_IINTERFACE;
  95. CRoxieLazyFileIO(IFile *_logical, offset_t size, const CDateTime &_date, bool _isCompressed, unsigned _crc)
  96. : logical(_logical), fileSize(size), isCompressed(_isCompressed), crc(_crc), fileStats(diskLocalStatistics)
  97. {
  98. fileDate.set(_date);
  99. currentIdx = 0;
  100. current.set(&failure);
  101. #ifdef FAIL_20_READ
  102. readCount = 0;
  103. #endif
  104. lastAccess = msTick();
  105. }
  106. ~CRoxieLazyFileIO()
  107. {
  108. setFailure(); // ensures the open file count properly maintained
  109. }
  110. virtual void beforeDispose()
  111. {
  112. if (cached)
  113. cached->removeCache(this);
  114. }
  115. virtual unsigned getFileIdx() const override
  116. {
  117. return fileIdx;
  118. }
  119. virtual unsigned getCrc() const override
  120. {
  121. return crc;
  122. }
  123. void setCache(IRoxieFileCache *cache, unsigned _fileIdx)
  124. {
  125. assertex(!cached);
  126. cached = cache;
  127. fileIdx = _fileIdx;
  128. }
  129. void removeCache(const IRoxieFileCache *cache)
  130. {
  131. assertex(cached==cache);
  132. cached = NULL;
  133. }
  134. inline void setRemote(bool _remote) { remote = _remote; }
  135. virtual void setCopying(bool _copying)
  136. {
  137. CriticalBlock b(crit);
  138. if (remote && currentIdx)
  139. {
  140. // The current location is not our preferred location. Recheck whether we can now access our preferred location
  141. setFailure();
  142. currentIdx = 0;
  143. _checkOpen();
  144. }
  145. copying = _copying;
  146. }
  147. virtual void dump() const
  148. {
  149. CriticalBlock b(crit);
  150. DBGLOG("LazyFileIO object %s has %d sources:", queryFilename(), sources.ordinality());
  151. ForEachItemIn(idx, sources)
  152. {
  153. DBGLOG("%c %s", idx==currentIdx ? '*' : ' ', sources.item(idx).queryFilename());
  154. }
  155. }
  156. virtual bool isCopying() const
  157. {
  158. CriticalBlock b(crit);
  159. return copying;
  160. }
  161. virtual bool isOpen() const
  162. {
  163. CriticalBlock b(crit);
  164. return current.get() != &failure;
  165. }
  166. virtual unsigned getLastAccessed() const
  167. {
  168. CriticalBlock b(crit);
  169. return lastAccess;
  170. }
  171. virtual void close()
  172. {
  173. CriticalBlock b(crit);
  174. setFailure();
  175. }
  176. virtual bool isRemote()
  177. {
  178. CriticalBlock b(crit);
  179. return remote;
  180. }
  181. void setFailure()
  182. {
  183. try
  184. {
  185. if (current.get()==&failure)
  186. return;
  187. numFilesOpen[remote]--;
  188. mergeStats(fileStats, current);
  189. current.set(&failure);
  190. }
  191. catch (IException *E)
  192. {
  193. if (traceLevel > 5)
  194. {
  195. StringBuffer s;
  196. DBGLOG("setFailure ignoring exception %s from IFileIO close", E->errorMessage(s).str());
  197. }
  198. E->Release();
  199. }
  200. }
  201. void checkOpen()
  202. {
  203. CriticalBlock b(crit);
  204. _checkOpen();
  205. }
  206. IFileIO *getCheckOpen(unsigned &activeIdx)
  207. {
  208. CriticalBlock b(crit);
  209. _checkOpen();
  210. activeIdx = currentIdx;
  211. return LINK(current);
  212. }
  213. void _checkOpen()
  214. {
  215. if (current.get() == &failure)
  216. {
  217. StringBuffer filesTried;
  218. unsigned tries = 0;
  219. bool firstTime = true;
  220. RoxieFileStatus fileStatus = FileNotFound;
  221. for (;;)
  222. {
  223. if (currentIdx >= sources.length())
  224. currentIdx = 0;
  225. if (tries==sources.length())
  226. {
  227. if (firstTime) // if first time - reset and try again
  228. {
  229. firstTime = false;
  230. tries = 0;
  231. }
  232. else
  233. throw MakeStringException(ROXIE_FILE_OPEN_FAIL, "Failed to open file %s at any of the following remote locations %s", logical->queryFilename(), filesTried.str()); // operations doesn't want a trap
  234. }
  235. const char *sourceName = sources.item(currentIdx).queryFilename();
  236. if (traceLevel > 10)
  237. DBGLOG("Trying to open %s", sourceName);
  238. try
  239. {
  240. #ifdef FAIL_20_OPEN
  241. openCount++;
  242. if ((openCount % 5) == 0)
  243. throw MakeStringException(ROXIE_FILE_OPEN_FAIL, "Pretending to fail on an open");
  244. #endif
  245. IFile *f = &sources.item(currentIdx);
  246. fileStatus = queryFileCache().fileUpToDate(f, fileSize, fileDate, isCompressed, false);
  247. if (fileStatus == FileIsValid)
  248. {
  249. if (isCompressed)
  250. current.setown(createCompressedFileReader(f));
  251. else
  252. current.setown(f->open(IFOread));
  253. if (current)
  254. {
  255. if (traceLevel > 5)
  256. DBGLOG("Opening %s", sourceName);
  257. if (useRemoteResources)
  258. disconnectRemoteIoOnExit(current);
  259. break;
  260. }
  261. // throwUnexpected(); - try another location if this one has the wrong version of the file
  262. }
  263. if (useRemoteResources)
  264. disconnectRemoteFile(f);
  265. }
  266. catch (IException *E)
  267. {
  268. E->Release();
  269. }
  270. currentIdx++;
  271. tries++;
  272. if (!firstTime) // log error on last attempt for each file name - it will have the "best" error condition
  273. {
  274. filesTried.appendf(" %s", sourceName); // only need to build this list once
  275. switch (fileStatus)
  276. {
  277. case FileNotFound:
  278. filesTried.append(": FileNotFound");
  279. break;
  280. case FileSizeMismatch:
  281. filesTried.append(": FileSizeMismatch");
  282. break;
  283. case FileDateMismatch:
  284. filesTried.append(": FileDateMismatch");
  285. break;
  286. }
  287. }
  288. }
  289. lastAccess = msTick();
  290. if (++numFilesOpen[remote] > maxFilesOpen[remote])
  291. queryFileCache().closeExpired(remote); // NOTE - this does not actually do the closing of expired files (which could deadlock, or could close the just opened file if we unlocked crit)
  292. }
  293. }
  294. virtual void addSource(IFile *newSource)
  295. {
  296. if (newSource)
  297. {
  298. if (traceLevel > 10)
  299. DBGLOG("Adding information for location %s for %s", newSource->queryFilename(), logical->queryFilename());
  300. CriticalBlock b(crit);
  301. sources.append(*newSource);
  302. }
  303. }
  304. virtual size32_t read(offset_t pos, size32_t len, void * data)
  305. {
  306. unsigned activeIdx;
  307. Owned<IFileIO> active = getCheckOpen(activeIdx);
  308. unsigned tries = 0;
  309. for (;;)
  310. {
  311. try
  312. {
  313. size32_t ret = active->read(pos, len, data);
  314. lastAccess = msTick();
  315. if (cached && !remote)
  316. cached->noteRead(fileIdx, pos, ret);
  317. return ret;
  318. }
  319. catch (NotYetOpenException *E)
  320. {
  321. E->Release();
  322. }
  323. catch (IException *E)
  324. {
  325. EXCLOG(MCoperatorError, E, "Read error");
  326. E->Release();
  327. OERRLOG("Failed to read length %d offset %" I64F "x file %s", len, pos, sources.item(activeIdx).queryFilename());
  328. {
  329. CriticalBlock b(crit);
  330. if (currentIdx == activeIdx)
  331. {
  332. currentIdx = activeIdx+1;
  333. setFailure();
  334. }
  335. }
  336. }
  337. active.setown(getCheckOpen(activeIdx));
  338. tries++;
  339. if (tries == MAX_READ_RETRIES)
  340. throw MakeStringException(ROXIE_FILE_ERROR, "Failed to read length %d offset %" I64F "x file %s after %d attempts", len, pos, sources.item(activeIdx).queryFilename(), tries);
  341. }
  342. }
  343. virtual void flush()
  344. {
  345. Linked<IFileIO> active;
  346. {
  347. CriticalBlock b(crit);
  348. active.set(current);
  349. }
  350. if (active.get() != &failure)
  351. active->flush();
  352. }
  353. virtual offset_t size()
  354. {
  355. unsigned activeIdx;
  356. Owned<IFileIO> active = getCheckOpen(activeIdx);
  357. lastAccess = msTick();
  358. return active->size();
  359. }
  360. virtual unsigned __int64 getStatistic(StatisticKind kind)
  361. {
  362. unsigned __int64 v = fileStats.getStatisticValue(kind);
  363. CriticalBlock b(crit); // don't bother with linking current and performing getStatistic outside of crit, because getStatistic is very quick
  364. return v + current->getStatistic(kind);
  365. }
  366. virtual size32_t write(offset_t pos, size32_t len, const void * data) { throwUnexpected(); }
  367. virtual void setSize(offset_t size) { throwUnexpected(); }
  368. virtual offset_t appendFile(IFile *file,offset_t pos,offset_t len) { throwUnexpected(); return 0; }
  369. virtual const char *queryFilename() const { return logical->queryFilename(); }
  370. virtual bool isAliveAndLink() const { return CInterface::isAliveAndLink(); }
  371. virtual IMemoryMappedFile *getMappedFile() override
  372. {
  373. CriticalBlock b(crit);
  374. if (mmapped)
  375. return mmapped.getLink();
  376. if (!remote)
  377. {
  378. mmapped.setown(logical->openMemoryMapped());
  379. return mmapped.getLink();
  380. }
  381. return nullptr;
  382. }
  383. virtual IFileIO *getFileIO() override
  384. {
  385. return LINK(this);
  386. }
  387. virtual bool createHardFileLink()
  388. {
  389. unsigned tries = 0;
  390. for (;;)
  391. {
  392. StringBuffer filesTried;
  393. if (currentIdx >= sources.length())
  394. currentIdx = 0;
  395. if (tries==sources.length())
  396. return false;
  397. const char *sourceName = sources.item(currentIdx).queryFilename();
  398. filesTried.appendf(" %s", sourceName);
  399. try
  400. {
  401. if (queryFileCache().fileUpToDate(&sources.item(currentIdx), fileSize, fileDate, isCompressed) == FileIsValid)
  402. {
  403. StringBuffer source_drive;
  404. splitFilename(sourceName, &source_drive, NULL, NULL, NULL);
  405. StringBuffer query_drive;
  406. splitFilename(logical->queryFilename(), &query_drive, NULL, NULL, NULL);
  407. // only try to create link if on the same drive
  408. if ( (stricmp(query_drive.str(), source_drive.str()) == 0))
  409. {
  410. try
  411. {
  412. DBGLOG("Trying to create Hard Link for %s", sourceName);
  413. createHardLink(logical->queryFilename(), sourceName);
  414. current.setown(sources.item(currentIdx).open(IFOread));
  415. return true;
  416. }
  417. catch(IException *E)
  418. {
  419. StringBuffer err;
  420. OERRLOG("HARD LINK ERROR %s", E->errorMessage(err).str());
  421. E->Release();
  422. }
  423. }
  424. }
  425. }
  426. catch (IException *E)
  427. {
  428. E->Release();
  429. }
  430. currentIdx++;
  431. tries++;
  432. }
  433. DBGLOG("Could not create any hard links for %s", logical->queryFilename());
  434. return false; // if we get here - no hardlink
  435. }
  436. void copyComplete()
  437. {
  438. CriticalBlock b(crit);
  439. setFailure(); // lazyOpen will then reopen it...
  440. currentIdx = 0;
  441. remote = false;
  442. copying = false;
  443. sources.kill();
  444. sources.add(*logical.getLink(), 0);
  445. if (!lazyOpen)
  446. _checkOpen();
  447. }
  448. bool checkCopyComplete()
  449. {
  450. CriticalBlock b(crit);
  451. if (logical->exists()) // MORE - do we need to check data/size etc? do we have the info to do so?
  452. {
  453. copyComplete();
  454. return true;
  455. }
  456. return false;
  457. }
  458. virtual IFile *querySource()
  459. {
  460. CriticalBlock b(crit);
  461. _checkOpen();
  462. return &sources.item(currentIdx);
  463. };
  464. virtual IFile *queryTarget() { return logical; }
  465. virtual offset_t getSize() { return fileSize; }
  466. virtual CDateTime *queryDateTime() { return &fileDate; }
  467. static int compareAccess(IInterface * const *L, IInterface * const *R)
  468. {
  469. ILazyFileIO *LL = (ILazyFileIO *) *L;
  470. ILazyFileIO *RR = (ILazyFileIO *) *R;
  471. return LL->getLastAccessed() - RR->getLastAccessed();
  472. }
  473. };
  474. //----------------------------------------------------------------------------------------------
  475. static IPartDescriptor *queryMatchingRemotePart(IPartDescriptor *pdesc, IFileDescriptor *remoteFDesc, unsigned int partNum)
  476. {
  477. if (!remoteFDesc)
  478. return NULL;
  479. IPartDescriptor *remotePDesc = remoteFDesc->queryPart(partNum);
  480. if (!remotePDesc)
  481. return NULL;
  482. unsigned int crc, remoteCrc;
  483. if (!pdesc || !pdesc->getCrc(crc)) //local crc not available, never DFS copied?
  484. return remotePDesc;
  485. if (remotePDesc->getCrc(remoteCrc) && remoteCrc==crc)
  486. return remotePDesc;
  487. return NULL;
  488. }
  489. static int getClusterPriority(const char *clusterName)
  490. {
  491. assertex(preferredClusters);
  492. int *priority = preferredClusters->getValue(clusterName);
  493. return priority ? *priority : 100;
  494. }
  495. static void appendRemoteLocations(IPartDescriptor *pdesc, StringArray &locations, const char *localFileName, const StringArray &fromClusters, bool includeFromCluster)
  496. {
  497. if (traceRemoteFiles)
  498. {
  499. StringBuffer s;
  500. ForEachItemIn(ifc, fromClusters)
  501. {
  502. const char *fromCluster = fromClusters.item(ifc);
  503. if (s.length())
  504. s.append('/');
  505. s.append(fromCluster);
  506. }
  507. DBGLOG("appendRemoteLocations lfn=%s fromCluster=%s, includeFromCluster=%s", nullText(localFileName), s.str(), boolToStr(includeFromCluster));
  508. }
  509. IFileDescriptor &fdesc = pdesc->queryOwner();
  510. unsigned numCopies = pdesc->numCopies();
  511. unsigned lastClusterNo = (unsigned) -1;
  512. unsigned numThisCluster = 0;
  513. unsigned initialSize = locations.length();
  514. int priority = 0;
  515. IntArray priorities;
  516. for (unsigned copy = 0; copy < numCopies; copy++)
  517. {
  518. unsigned clusterNo = pdesc->copyClusterNum(copy);
  519. StringBuffer clusterName;
  520. fdesc.getClusterGroupName(clusterNo, clusterName);
  521. if (traceRemoteFiles)
  522. DBGLOG("appendRemoteLocations found entry in cluster %s", clusterName.str());
  523. if (fromClusters.length())
  524. {
  525. bool matches = fromClusters.contains(clusterName);
  526. if (matches!=includeFromCluster)
  527. continue;
  528. }
  529. RemoteFilename r;
  530. pdesc->getFilename(copy,r);
  531. StringBuffer path;
  532. r.getRemotePath(path);
  533. if (localFileName && r.isLocal())
  534. {
  535. StringBuffer l;
  536. r.getLocalPath(l);
  537. if (streq(l, localFileName))
  538. continue; // don't add ourself
  539. }
  540. if (clusterNo == lastClusterNo)
  541. {
  542. numThisCluster++;
  543. if (numThisCluster > 2) // Don't add more than 2 from one cluster
  544. continue;
  545. }
  546. else
  547. {
  548. numThisCluster = 1;
  549. lastClusterNo = clusterNo;
  550. if (preferredClusters)
  551. {
  552. priority = getClusterPriority(clusterName);
  553. }
  554. else
  555. priority = copy;
  556. }
  557. if (priority >= 0)
  558. {
  559. ForEachItemIn(idx, priorities)
  560. {
  561. if (priorities.item(idx) < priority)
  562. break;
  563. }
  564. priorities.add(priority, idx);
  565. locations.add(path.str(), idx+initialSize);
  566. if (traceRemoteFiles)
  567. DBGLOG("appendRemoteLocations adding location %s at position %u", path.str(), idx+initialSize);
  568. }
  569. }
  570. }
  571. static void appendRemoteLocations(IPartDescriptor *pdesc, StringArray &locations, const char *localFileName, const char *fromCluster, bool includeFromCluster)
  572. {
  573. StringArray fromClusters;
  574. if (!isEmptyString(fromCluster))
  575. fromClusters.append(fromCluster);
  576. appendRemoteLocations(pdesc, locations, localFileName, fromClusters, includeFromCluster);
  577. }
  578. //----------------------------------------------------------------------------------------------
  579. typedef StringArray *StringArrayPtr;
  580. // A circular buffer recording recent disk read operations that can be used to "prewarm" the cache
  581. class CacheReportingBuffer : public CInterfaceOf<ICacheInfoRecorder>
  582. {
  583. // A circular buffer recording recent file activity. Note that noteRead() and clear() may be called from multiple threads
  584. // (other functions are assumed single-threaded) and that locking is kept to a minimum, even if it means information may be slightly inaccurate.
  585. CacheInfoEntry *recentReads = nullptr;
  586. std::atomic<unsigned> recentReadHead = {0};
  587. unsigned recentReadSize;
  588. public:
  589. CacheReportingBuffer(offset_t trackSize)
  590. {
  591. recentReadSize = trackSize >> CacheInfoEntry::pageBits;
  592. if (traceLevel)
  593. DBGLOG("Creating CacheReportingBuffer with %d elements", recentReadSize);
  594. if (!recentReadSize)
  595. throw makeStringExceptionV(ROXIE_FILE_ERROR, "cacheTrackSize(%u) is the size in bytes it cannot be < %u", (unsigned)trackSize, 1U << CacheInfoEntry::pageBits);
  596. recentReads = new CacheInfoEntry[recentReadSize];
  597. recentReadHead = 0;
  598. }
  599. CacheReportingBuffer(const CacheReportingBuffer &from)
  600. {
  601. // NOTE - from may be updated concurrently - we do not want to lock it
  602. // There are therefore races in here, but they do not matter (may result in very recent data being regarded as very old or vice versa).
  603. recentReadSize = from.recentReadSize;
  604. recentReadHead = from.recentReadHead.load(std::memory_order_relaxed);
  605. recentReads = new CacheInfoEntry[recentReadSize];
  606. memcpy(recentReads, from.recentReads, recentReadSize * sizeof(CacheInfoEntry));
  607. }
  608. ~CacheReportingBuffer()
  609. {
  610. delete [] recentReads;
  611. }
  612. void clear()
  613. {
  614. recentReadHead = 0;
  615. }
  616. void noteRead(unsigned fileIdx, offset_t pos, unsigned len, CacheInfoEntry::PageType pageType)
  617. {
  618. if (recentReads && len)
  619. {
  620. CacheInfoEntry start(fileIdx, pos, pageType);
  621. CacheInfoEntry end(fileIdx, pos+len-1, pageType);
  622. for(;start <= end; ++start)
  623. {
  624. recentReads[recentReadHead++ % recentReadSize] = start;
  625. }
  626. }
  627. }
  628. void sortAndDedup()
  629. {
  630. // NOTE: single-threaded
  631. unsigned sortSize;
  632. if (recentReadHead > recentReadSize)
  633. sortSize = recentReadSize;
  634. else
  635. sortSize = recentReadHead;
  636. std::sort(recentReads, recentReads + sortSize);
  637. CacheInfoEntry lastPos(-1,-1,CacheInfoEntry::PageTypeDisk);
  638. unsigned dest = 0;
  639. for (unsigned idx = 0; idx < sortSize; idx++)
  640. {
  641. CacheInfoEntry pos = recentReads[idx];
  642. if (pos.b.file != lastPos.b.file || pos.b.page != lastPos.b.page) // Ignore inNodeCache bit when deduping
  643. {
  644. recentReads[dest++] = pos;
  645. lastPos = pos;
  646. }
  647. }
  648. recentReadHead = dest;
  649. }
  650. void report(StringBuffer &ret, unsigned channel, const StringArray &cacheIndexes, const UnsignedShortArray &cacheIndexChannels)
  651. {
  652. // NOTE: single-threaded
  653. assertex(recentReadHead <= recentReadSize); // Should have sorted and deduped before calling this
  654. unsigned lastFileIdx = (unsigned) -1;
  655. offset_t lastPage = (offset_t) -1;
  656. offset_t startRange = 0;
  657. CacheInfoEntry::PageType lastPageType = CacheInfoEntry::PageTypeDisk;
  658. bool includeFile = false;
  659. for (unsigned idx = 0; idx < recentReadHead; idx++)
  660. {
  661. CacheInfoEntry pos = recentReads[idx];
  662. if (pos.b.file != lastFileIdx)
  663. {
  664. if (includeFile)
  665. appendRange(ret, startRange, lastPage, lastPageType).newline();
  666. lastFileIdx = pos.b.file;
  667. if (channel==(unsigned) -1 || cacheIndexChannels.item(lastFileIdx)==channel)
  668. {
  669. ret.appendf("%u|%s|", cacheIndexChannels.item(lastFileIdx), cacheIndexes.item(lastFileIdx));
  670. includeFile = true;
  671. }
  672. else
  673. includeFile = false;
  674. startRange = pos.b.page;
  675. }
  676. else if ((pos.b.page == lastPage || pos.b.page == lastPage+1) && pos.b.type == lastPageType)
  677. {
  678. // Still in current range
  679. }
  680. else
  681. {
  682. if (includeFile)
  683. appendRange(ret, startRange, lastPage, lastPageType);
  684. startRange = pos.b.page;
  685. }
  686. lastPage = pos.b.page;
  687. lastPageType = (CacheInfoEntry::PageType)pos.b.type;
  688. }
  689. if (includeFile)
  690. appendRange(ret, startRange, lastPage, lastPageType).newline();
  691. }
  692. virtual void noteWarm(unsigned fileIdx, offset_t pos, unsigned len, NodeType type) override
  693. {
  694. //For convenience the values for PageType match the NodeX enumeration.
  695. CacheInfoEntry::PageType pageType = (type <= NodeBlob) ? (CacheInfoEntry::PageType)type : CacheInfoEntry::PageTypeDisk;
  696. noteRead(fileIdx, pos, len, pageType);
  697. }
  698. private:
  699. static StringBuffer &appendRange(StringBuffer &ret, offset_t start, offset_t end, CacheInfoEntry::PageType pageType)
  700. {
  701. ret.append(' ');
  702. if (pageType != CacheInfoEntry::PageTypeDisk)
  703. ret.append('*').append("RLB"[pageType]);
  704. if (start==end)
  705. ret.appendf("%" I64F "x", start);
  706. else
  707. ret.appendf("%" I64F "x-%" I64F "x", start, end);
  708. return ret;
  709. }
  710. };
  711. class IndexCacheWarmer : implements ICacheWarmer
  712. {
  713. IRoxieFileCache *cache = nullptr;
  714. Owned<ILazyFileIO> localFile;
  715. Owned<IKeyIndex> keyIndex;
  716. bool keyFailed = false;
  717. unsigned fileIdx = (unsigned) -1;
  718. unsigned filesProcessed = 0;
  719. unsigned pagesPreloaded = 0;
  720. public:
  721. IndexCacheWarmer(IRoxieFileCache *_cache) : cache(_cache) {}
  722. virtual void startFile(const char *filename) override
  723. {
  724. // "filename" is the filename that roxie would use if it copied the file locally. This may not
  725. // match the name of the actual file - e.g. if the file is local but in a different location.
  726. localFile.setown(cache->lookupLocalFile(filename));
  727. if (localFile)
  728. {
  729. fileIdx = localFile->getFileIdx();
  730. }
  731. keyFailed = false;
  732. filesProcessed++;
  733. }
  734. virtual bool warmBlock(const char *filename, NodeType nodeType, offset_t startOffset, offset_t endOffset) override
  735. {
  736. if (nodeType != NodeNone && !keyFailed && localFile && !keyIndex)
  737. {
  738. //Pass false for isTLK - it will be initialised from the index header
  739. keyIndex.setown(createKeyIndex(filename, localFile->getCrc(), *localFile.get(), fileIdx, false));
  740. if (!keyIndex)
  741. keyFailed = true;
  742. }
  743. if (nodeType != NodeNone && keyIndex)
  744. {
  745. // Round startOffset up to nearest multiple of index node size
  746. unsigned nodeSize = keyIndex->getNodeSize();
  747. startOffset = ((startOffset+nodeSize-1)/nodeSize)*nodeSize;
  748. do
  749. {
  750. if (traceLevel > 8)
  751. DBGLOG("prewarming index page %u %s %" I64F "x-%" I64F "x", (int) nodeType, filename, startOffset, endOffset);
  752. bool loaded = keyIndex->prewarmPage(startOffset, nodeType);
  753. if (!loaded)
  754. break;
  755. pagesPreloaded++;
  756. startOffset += nodeSize;
  757. }
  758. while (startOffset < endOffset);
  759. }
  760. else if (fileIdx != (unsigned) -1)
  761. cache->noteRead(fileIdx, startOffset, (endOffset-1) - startOffset); // Ensure pages we prewarm are recorded in our cache tracker
  762. return true;
  763. }
  764. virtual void endFile() override
  765. {
  766. localFile.clear();
  767. keyIndex.clear();
  768. }
  769. virtual void report() override
  770. {
  771. if (traceLevel)
  772. DBGLOG("Processed %u files and preloaded %u index nodes", filesProcessed, pagesPreloaded);
  773. }
  774. };
  775. class CRoxieFileCache : implements IRoxieFileCache, implements ICopyFileProgress, public CInterface
  776. {
  // Give the CcdFileTest unit test access to private members.
  777. friend class CcdFileTest;
  // Files queued for background copy. runBackgroundCopy pops them (popGet) while holding crit.
  778. mutable ICopyArrayOf<ILazyFileIO> todo; // Might prefer a queue but probably doesn't really matter.
  779. #ifdef _CONTAINERIZED
  // Files that runHandleCloser periodically re-checks, to see whether someone else has copied them for us.
  // That thread swaps the list out under crit and sets buddyChecking while the check is in progress.
  780. mutable ICopyArrayOf<ILazyFileIO> buddyCopying;
  781. mutable bool buddyChecking = false;
  782. #endif
  // Cleared by runBackgroundCopy once it has logged "No more data files to copy".
  783. bool reportedFilesToCopy = false;
  // Waited on by the background copy thread (one wait per queued item, presumably).
  784. InterruptableSemaphore toCopy;
  // Waited on, with a timeout of the check period, by the handle closer thread.
  785. InterruptableSemaphore toClose;
  // Waited on, with a timeout of cacheReportPeriodSeconds, by the cache info dump thread between reports.
  786. InterruptableSemaphore cidtSleep;
  // All files opened by openFile, keyed by local filename.
  787. mutable CopyMapStringToMyClass<ILazyFileIO> files;
  // Protects todo/buddyCopying and is held while openFile/trackCache run.
  788. mutable CriticalSection crit;
  // NOTE(review): not used in this part of the file - check its use elsewhere.
  789. CriticalSection cpcrit;
  790. bool started;
  791. bool aborting;
  // Checked by the worker threads after each wait; when set they exit their loops.
  792. std::atomic<bool> closing;
  // Indexed by bool (false/true) in the constructor.
  793. bool closePending[2];
  794. StringAttrMapping fileErrorList;
  // Set once the cache info dump thread has been started.
  795. bool cidtActive = false;
  // Signalled by each worker thread when its run function starts; waited on by start()/startCacheReporter().
  796. Semaphore cidtStarted;
  797. Semaphore bctStarted;
  798. Semaphore hctStarted;
  799. // Read-tracking code for pre-warming OS caches
  // Name and channel of each tracked file. The position in these arrays is the file index returned by trackCache().
  800. StringArray cacheIndexes;
  801. UnsignedShortArray cacheIndexChannels;
  // Recent-read tracker. Created in the constructor unless tracking is disabled (then null); deleted in the destructor.
  802. CacheReportingBuffer *activeCacheReportingBuffer = nullptr;
  803. RoxieFileStatus fileUpToDate(IFile *f, offset_t size, const CDateTime &modified, bool isCompressed, bool autoDisconnect=true)
  804. {
  805. // Ensure that SockFile does not keep these sockets open (or we will run out, or at least empty our LRU cache)
  806. // If useRemoteResources is not set, all checks for fileUpToDate are likely to be followed quickly by calls to copy,
  807. // so autoclosing is unnecessary and undesireable.
  808. class AutoDisconnector
  809. {
  810. public:
  811. AutoDisconnector(IFile *_f, bool isEnabled) { f = isEnabled ? _f : NULL; };
  812. ~AutoDisconnector() { if (f) disconnectRemoteFile(f); }
  813. private:
  814. IFile *f;
  815. } autoDisconnector(f, autoDisconnect && useRemoteResources);
  816. offset_t fileSize = f->size();
  817. if (fileSize != (offset_t) -1)
  818. {
  819. // only check size if specified
  820. if ( (size != (offset_t) -1) && !isCompressed && fileSize != size) // MORE - should be able to do better on compressed you'da thunk
  821. return FileSizeMismatch;
  822. // A temporary fix - files stored on azure don't have an accurate time stamp, so treat them as up to date.
  823. if (isUrl(f->queryFilename()))
  824. return FileIsValid;
  825. CDateTime mt;
  826. return (modified.isNull() || (f->getTime(NULL, &mt, NULL) && mt.equals(modified, false))) ? FileIsValid : FileDateMismatch;
  827. }
  828. else
  829. return FileNotFound;
  830. }
  831. int runCacheInfoDump()
  832. {
  833. cidtStarted.signal();
  834. if (traceLevel)
  835. DBGLOG("Cache info dump thread %p starting", this);
  836. try
  837. {
  838. for (;;)
  839. {
  840. cidtSleep.wait(cacheReportPeriodSeconds * 1000);
  841. if (closing)
  842. break;
  843. if (traceLevel>8)
  844. DBGLOG("Cache info dump");
  845. // Note - cache info is stored in the DLLSERVER persistent area - which we should perhaps consider renaming
  846. StringBuffer cacheRootDirectory;
  847. if (isContainerized())
  848. {
  849. if (!getConfigurationDirectory(nullptr, "query", nullptr, nullptr, cacheRootDirectory))
  850. throwUnexpected();
  851. }
  852. else
  853. {
  854. const char* dllserver_root = getenv("HPCC_DLLSERVER_PATH");
  855. assertex(dllserver_root != nullptr);
  856. cacheRootDirectory.append(dllserver_root);
  857. }
  858. Owned<const ITopologyServer> topology = getTopology();
  859. Owned<CacheReportingBuffer> tempCacheReportingBuffer = new CacheReportingBuffer(*activeCacheReportingBuffer);
  860. getNodeCacheInfo(*tempCacheReportingBuffer);
  861. tempCacheReportingBuffer->sortAndDedup();
  862. StringBuffer ret;
  863. tempCacheReportingBuffer->report(ret, 0, cacheIndexes, cacheIndexChannels);
  864. if (ret.length())
  865. {
  866. // NOTE - this location is shared with other nodes - who may also be writing
  867. VStringBuffer cacheFileName("%s/%s/cacheInfo.%d", cacheRootDirectory.str(), roxieName.str(), 0);
  868. atomicWriteFile(cacheFileName, ret);
  869. if (traceLevel > 8)
  870. DBGLOG("Channel 0 cache info:\n%s", ret.str());
  871. }
  872. for (unsigned channel : topology->queryChannels())
  873. {
  874. tempCacheReportingBuffer->report(ret.clear(), channel, cacheIndexes, cacheIndexChannels);
  875. if (ret.length())
  876. {
  877. VStringBuffer cacheFileName("%s/%s/cacheInfo.%d", cacheRootDirectory.str(), roxieName.str(), channel);
  878. atomicWriteFile(cacheFileName, ret);
  879. if (traceLevel > 8)
  880. DBGLOG("Channel %u cache info:\n%s", channel, ret.str());
  881. }
  882. }
  883. // We could at this point put deduped back into active
  884. }
  885. }
  886. catch (IException *E)
  887. {
  888. // Any exceptions terminate the thread - probably a better option than flooding the log
  889. if (!aborting)
  890. EXCLOG(MCoperatorError, E, "Cache info dumper: ");
  891. E->Release();
  892. }
  893. catch (...)
  894. {
  895. IERRLOG("Unknown exception in cache info dump thread");
  896. }
  897. if (traceLevel)
  898. DBGLOG("Cache info dump thread %p exiting", this);
  899. return 0;
  900. }
  901. unsigned trackCache(const char *filename, unsigned channel)
  902. {
  903. // NOTE - called from openFile, with crit already held
  904. if (!activeCacheReportingBuffer)
  905. return (unsigned) -1;
  906. cacheIndexes.append(filename);
  907. cacheIndexChannels.append(channel);
  908. return cacheIndexes.length()-1;
  909. }
  910. virtual void noteRead(unsigned fileIdx, offset_t pos, unsigned len) override
  911. {
  912. if (activeCacheReportingBuffer)
  913. activeCacheReportingBuffer->noteRead(fileIdx, pos, len, CacheInfoEntry::PageTypeDisk);
  914. }
  // Create the ILazyFileIO for one file part and register it in 'files' (keyed by the local filename) and
  // in the read-tracking tables (trackCache). Candidate sources are added in this order:
  //   1. localLocation, if fileUpToDate() accepts it (setRemote(false));
  //   2. containerized: every up-to-date localEnoughLocationInfo entry (remote flag not set);
  //      otherwise: peer locations on this roxie's cluster ("test.buddy" in self-test mode) that are up to
  //      date or not present yet, calling setRemote(true) for each peer location examined;
  //   3. if nothing valid was added above and copyResources, useRemoteResources or selfTestMode is set:
  //      every up-to-date remoteLocationInfo entry (setRemote(true), except local paths on bare metal).
  // Throws ROXIE_FILE_ERROR if the local file exists but does not match DFS. This is checked up front unless
  // ignoreOrphans, and again whenever no source was found.
  // Throws ROXIE_FILE_OPEN_FAIL if no source was found.
  // lfn is only used for tracing; partNo is not used in this function.
  // Callers are expected to hold crit (the trackCache() call at the end relies on that).
  915. ILazyFileIO *openFile(const char *lfn, unsigned partNo, unsigned channel, const char *localLocation,
  916. IPartDescriptor *pdesc,
  917. const StringArray &localEnoughLocationInfo,
  918. const StringArray &remoteLocationInfo,
  919. offset_t size, const CDateTime &modified)
  920. {
  921. Owned<IFile> local = createIFile(localLocation);
  922. if (traceRemoteFiles)
  923. DBGLOG("openFile adding file %s (localLocation %s)", lfn, localLocation);
  // In self-test mode there is no real part descriptor data: treat as uncompressed with crc 0.
  924. bool isCompressed = selfTestMode ? false : pdesc->queryOwner().isCompressed();
  925. unsigned crc = 0;
  926. if (!selfTestMode)
  927. pdesc->getCrc(crc);
  928. Owned<CRoxieLazyFileIO> ret = new CRoxieLazyFileIO(local.getLink(), size, modified, isCompressed, crc);
  929. RoxieFileStatus fileStatus = fileUpToDate(local, size, modified, isCompressed);
  930. if (fileStatus == FileIsValid)
  931. {
  932. ret->addSource(local.getLink());
  933. ret->setRemote(false);
  934. }
  935. else if (local->exists() && !ignoreOrphans) // Implies local dali and local file out of sync
  936. throw MakeStringException(ROXIE_FILE_ERROR, "Local file %s does not match DFS information", localLocation);
  937. else
  938. {
  // Set once at least one source that fileUpToDate() accepted has been added.
  939. bool addedOne = false;
  940. #ifdef _CONTAINERIZED
  941. // put the localEnoughLocations next in the list
  942. ForEachItemIn(plane_idx, localEnoughLocationInfo)
  943. {
  944. try
  945. {
  946. const char *localEnoughName = localEnoughLocationInfo.item(plane_idx);
  947. Owned<IFile> localEnoughFile = createIFile(localEnoughName);
  948. RoxieFileStatus status = fileUpToDate(localEnoughFile, size, modified, isCompressed);
  949. if (status==FileIsValid)
  950. {
  951. if (miscDebugTraceLevel > 5)
  952. DBGLOG("adding local enough location %s", localEnoughName);
  953. ret->addSource(localEnoughFile.getClear());
  954. addedOne = true;
  955. //do not set ret->setRemote(true) these locations are treated as if found locally, and not copied to the default plane
  956. }
  // NOTE(review): this throw is inside the try above, so the catch(IException *) below catches it,
  // logs it and releases it. It does not propagate out of openFile - confirm whether that is intended.
  957. else if (localEnoughFile->exists() && !ignoreOrphans) // Implies local dali and local enough file out of sync
  958. throw MakeStringException(ROXIE_FILE_ERROR, "Direct access (local enough) file %s does not match DFS information", localEnoughName);
  959. else if (miscDebugTraceLevel > 10)
  960. DBGLOG("Checked local enough data plane location %s, status=%d", localEnoughName, (int) status);
  961. }
  962. catch (IException *E)
  963. {
  964. EXCLOG(MCoperatorError, E, "While creating local enough file reference");
  965. E->Release();
  966. }
  967. }
  968. #else
  969. // put the peerRoxieLocations next in the list
  970. StringArray localLocations;
  971. if (selfTestMode)
  972. localLocations.append("test.buddy");
  973. else
  974. appendRemoteLocations(pdesc, localLocations, localLocation, roxieName, true); // Adds all locations on the same cluster
  975. ForEachItemIn(roxie_idx, localLocations)
  976. {
  977. try
  978. {
  979. const char *remoteName = localLocations.item(roxie_idx);
  980. Owned<IFile> remote = createIFile(remoteName);
  981. RoxieFileStatus status = fileUpToDate(remote, size, modified, isCompressed);
  982. if (status==FileIsValid)
  983. {
  984. if (miscDebugTraceLevel > 5)
  985. DBGLOG("adding peer location %s", remoteName);
  986. ret->addSource(remote.getClear());
  987. addedOne = true;
  988. }
  989. else if (status==FileNotFound)
  990. {
  991. // Even though it's not on the buddy (yet), add it to the locations since it may well be there
  992. // by the time we come to copy (and if it is, we want to copy from there)
  993. if (miscDebugTraceLevel > 5)
  994. DBGLOG("adding missing peer location %s", remoteName);
  995. ret->addSource(remote.getClear());
  996. // Don't set addedOne - we need to go to remote too
  997. }
  998. else if (miscDebugTraceLevel > 10)
  999. DBGLOG("Checked peer roxie location %s, status=%d", remoteName, (int) status);
  1000. }
  1001. catch (IException *E)
  1002. {
  1003. EXCLOG(MCoperatorError, E, "While creating remote file reference");
  1004. E->Release();
  1005. }
  // Called inside the loop, i.e. once per peer location examined, whatever its status.
  1006. ret->setRemote(true);
  1007. }
  1008. #endif
  1009. if (!addedOne && (copyResources || useRemoteResources || selfTestMode)) // If no peer locations available, go to remote
  1010. {
  1011. ForEachItemIn(idx, remoteLocationInfo)
  1012. {
  1013. try
  1014. {
  1015. const char *remoteName = remoteLocationInfo.item(idx);
  1016. Owned<IFile> remote = createIFile(remoteName);
  1017. if (traceLevel > 5)
  1018. DBGLOG("checking remote location %s", remoteName);
  1019. RoxieFileStatus status = fileUpToDate(remote, size, modified, isCompressed);
  1020. if (status==FileIsValid)
  1021. {
  1022. if (miscDebugTraceLevel > 5)
  1023. DBGLOG("adding remote location %s", remoteName);
  1024. RemoteFilename rfn;
  1025. rfn.setRemotePath(remoteName);
  // On bare metal the setRemote(true) below only runs for non-local paths. When containerized it
  // always runs (the conditional is compiled out).
  1026. #ifndef _CONTAINERIZED
  1027. // MORE - may want to change this to mark some other locations as "local enough"
  1028. if (!rfn.isLocal()) // MORE - may still want to copy files even if they are on a posix-accessible path, for local caching? Probably really want to know if hooked or not...
  1029. #endif
  1030. ret->setRemote(true);
  1031. ret->addSource(remote.getClear());
  1032. addedOne = true;
  1033. }
  1034. else if (miscDebugTraceLevel > 10)
  1035. DBGLOG("Checked remote file location %s, status=%d", remoteName, (int) status);
  1036. }
  1037. catch (IException *E)
  1038. {
  1039. EXCLOG(MCoperatorError, E, "While creating remote file reference");
  1040. E->Release();
  1041. }
  1042. }
  1043. }
  // No source was accepted anywhere. This check does not consult ignoreOrphans.
  1044. if (!addedOne)
  1045. {
  1046. if (local->exists()) // Implies local dali and local file out of sync
  1047. throw MakeStringException(ROXIE_FILE_ERROR, "Local file %s does not match DFS information", localLocation);
  1048. else
  1049. {
  1050. if (traceLevel >= 2)
  1051. {
  1052. #ifndef _CONTAINERIZED
  1053. DBGLOG("Failed to open file at any of the following %d local locations:", localLocations.length());
  1054. ForEachItemIn(local_idx, localLocations)
  1055. {
  1056. DBGLOG("%d: %s", local_idx+1, localLocations.item(local_idx));
  1057. }
  1058. DBGLOG("Or at any of the following %d remote locations:", remoteLocationInfo.length());
  1059. #else
  1060. DBGLOG("Failed to open file at any of the following %d remote locations:", remoteLocationInfo.length());
  1061. #endif
  1062. ForEachItemIn(remote_idx, remoteLocationInfo)
  1063. {
  1064. DBGLOG("%d: %s", remote_idx+1, remoteLocationInfo.item(remote_idx));
  1065. }
  1066. }
  1067. throw MakeStringException(ROXIE_FILE_OPEN_FAIL, "Could not open file %s", localLocation);
  1068. }
  1069. }
  1070. }
  // Register with the read tracker and the files map (keyed by local filename), then hand ownership to the caller.
  1071. ret->setCache(this, trackCache(local->queryFilename(), channel));
  1072. files.setValue(local->queryFilename(), (ILazyFileIO *)ret);
  1073. return ret.getClear();
  1074. }
  1075. static void deleteTempFiles(const char *targetFilename)
  1076. {
  1077. try
  1078. {
  1079. StringBuffer destPath;
  1080. StringBuffer prevTempFile;
  1081. splitFilename(targetFilename, &destPath, &destPath, &prevTempFile, &prevTempFile);
  1082. prevTempFile.append("*.$$$");
  1083. Owned<IDirectoryIterator> iter = createDirectoryIterator(destPath, prevTempFile, false, false);
  1084. ForEach(*iter)
  1085. {
  1086. OwnedIFile thisFile = createIFile(iter->query().queryFilename());
  1087. if (thisFile->isFile() == fileBool::foundYes)
  1088. thisFile->remove();
  1089. }
  1090. }
  1091. catch(IException *E)
  1092. {
  1093. StringBuffer err;
  1094. OERRLOG("Could not remove tmp file %s", E->errorMessage(err).str());
  1095. E->Release();
  1096. }
  1097. catch(...)
  1098. {
  1099. }
  1100. }
  1101. static bool doCopyFile(ILazyFileIO *f, const char *tempFile, const char *targetFilename, const char *destPath, const char *msg, CFflags copyFlags=CFnone)
  1102. {
  1103. bool fileCopied = false;
  1104. IFile *sourceFile;
  1105. try
  1106. {
  1107. f->setCopying(true);
  1108. sourceFile = f->querySource();
  1109. }
  1110. catch (IException *E)
  1111. {
  1112. f->setCopying(false);
  1113. EXCLOG(MCoperatorError, E, "While trying to start copying file");
  1114. throw;
  1115. }
  1116. unsigned __int64 freeDiskSpace = getFreeSpace(destPath);
  1117. deleteTempFiles(targetFilename);
  1118. offset_t fileSize = sourceFile->size();
  1119. if ( (fileSize + minFreeDiskSpace) > freeDiskSpace)
  1120. {
  1121. StringBuffer err;
  1122. err.appendf("Insufficient disk space. File %s needs %" I64F "d bytes, but only %" I64F "d remains, and %" I64F "d is needed as a reserve", targetFilename, sourceFile->size(), freeDiskSpace, minFreeDiskSpace);
  1123. IException *E = MakeStringException(ROXIE_DISKSPACE_ERROR, "%s", err.str());
  1124. EXCLOG(MCoperatorError, E);
  1125. E->Release();
  1126. f->setCopying(false);
  1127. }
  1128. else
  1129. {
  1130. Owned<IFile> destFile = createIFile(tempFile);
  1131. bool hardLinkCreated = false;
  1132. unsigned start = msTick();
  1133. #ifdef _DEBUG
  1134. if (topology && topology->getPropBool("@simulateSlowCopies")) // topology is null when running unit tests
  1135. {
  1136. DBGLOG("Simulating a slow copy");
  1137. Sleep(10*1000);
  1138. }
  1139. #endif
  1140. try
  1141. {
  1142. if (useHardLink)
  1143. hardLinkCreated = f->createHardFileLink();
  1144. if (hardLinkCreated)
  1145. msg = "Hard Link";
  1146. else
  1147. {
  1148. DBGLOG("%sing %s to %s", msg, sourceFile->queryFilename(), targetFilename);
  1149. if (traceLevel > 5)
  1150. {
  1151. StringBuffer str;
  1152. str.appendf("doCopyFile %s", sourceFile->queryFilename());
  1153. TimeSection timing(str.str());
  1154. sourceFile->copyTo(destFile,DEFAULT_COPY_BLKSIZE,NULL,false,copyFlags);
  1155. }
  1156. else
  1157. {
  1158. sourceFile->copyTo(destFile,DEFAULT_COPY_BLKSIZE,NULL,false,copyFlags);
  1159. }
  1160. }
  1161. f->setCopying(false);
  1162. fileCopied = true;
  1163. }
  1164. catch(IException *E)
  1165. {
  1166. f->setCopying(false);
  1167. EXCLOG(E, "Copy exception - remove templocal");
  1168. destFile->remove();
  1169. deleteTempFiles(targetFilename);
  1170. throw;
  1171. }
  1172. catch(...)
  1173. {
  1174. f->setCopying(false);
  1175. IERRLOG("%s exception - remove templocal", msg);
  1176. destFile->remove();
  1177. deleteTempFiles(targetFilename);
  1178. throw;
  1179. }
  1180. if (!hardLinkCreated) // for hardlinks no rename needed
  1181. {
  1182. try
  1183. {
  1184. destFile->rename(targetFilename);
  1185. }
  1186. catch(IException *)
  1187. {
  1188. f->setCopying(false);
  1189. deleteTempFiles(targetFilename);
  1190. throw;
  1191. }
  1192. unsigned elapsed = msTick() - start;
  1193. double sizeMB = ((double) fileSize) / (1024*1024);
  1194. double MBperSec = elapsed ? (sizeMB / elapsed) * 1000 : 0;
  1195. DBGLOG("%s to %s complete in %d ms (%.1f MB/sec)", msg, targetFilename, elapsed, MBperSec);
  1196. }
  1197. f->copyComplete();
  1198. }
  1199. deleteTempFiles(targetFilename);
  1200. return fileCopied;
  1201. }
  1202. static bool doCopy(ILazyFileIO *f, bool background, CFflags copyFlags=CFnone)
  1203. {
  1204. if (!f->isRemote())
  1205. f->copyComplete();
  1206. else
  1207. {
  1208. const char *targetFilename = f->queryTarget()->queryFilename();
  1209. StringBuffer tempFile(targetFilename);
  1210. StringBuffer destPath;
  1211. splitFilename(tempFile.str(), &destPath, &destPath, NULL, NULL);
  1212. if (destPath.length())
  1213. recursiveCreateDirectory(destPath.str());
  1214. else
  1215. destPath.append('.');
  1216. if (!checkDirExists(destPath.str())) {
  1217. OERRLOG("Dest directory %s does not exist", destPath.str());
  1218. return false;
  1219. }
  1220. tempFile.append(".$$$");
  1221. const char *msg = background ? "Background copy" : "Copy";
  1222. return doCopyFile(f, tempFile.str(), targetFilename, destPath.str(), msg, copyFlags);
  1223. }
  1224. return false; // if we get here there was no file copied
  1225. }
  1226. public:
  1227. IMPLEMENT_IINTERFACE;
  1228. CRoxieFileCache() :
  1229. cidt(*this),
  1230. bct(*this), hct(*this)
  1231. {
  1232. aborting = false;
  1233. closing = false;
  1234. closePending[false] = false;
  1235. closePending[true] = false;
  1236. started = false;
  1237. if (!selfTestMode && !allFilesDynamic)
  1238. {
  1239. Owned<IPropertyTree> compConfig = getComponentConfig();
  1240. offset_t cacheTrackSize = compConfig->getPropInt64("@cacheTrackSize", (offset_t) -1);
  1241. if (cacheTrackSize == (offset_t) -1)
  1242. {
  1243. const char *memLimit = compConfig->queryProp("resources/limits/@memory");
  1244. if (!memLimit)
  1245. memLimit = compConfig->queryProp("resources/requests/@memory");
  1246. if (memLimit)
  1247. {
  1248. try
  1249. {
  1250. cacheTrackSize = friendlyStringToSize(memLimit);
  1251. }
  1252. catch (IException *E)
  1253. {
  1254. EXCLOG(E);
  1255. E->Release();
  1256. cacheTrackSize = 0;
  1257. }
  1258. }
  1259. else
  1260. cacheTrackSize = 0x10000 * (1<<CacheInfoEntry::pageBits);
  1261. }
  1262. if (cacheTrackSize)
  1263. activeCacheReportingBuffer = new CacheReportingBuffer(cacheTrackSize);
  1264. }
  1265. }
  1266. ~CRoxieFileCache()
  1267. {
  1268. // NOTE - I assume that by the time I am being destroyed, system is single threaded.
  1269. // Removing any possible race between destroying of the cache and destroying of the files in it would be complex otherwise
  1270. HashIterator h(files);
  1271. ForEach(h)
  1272. {
  1273. ILazyFileIO *f = files.mapToValue(&h.query());
  1274. f->removeCache(this);
  1275. }
  1276. delete activeCacheReportingBuffer;
  1277. }
  1278. virtual void start()
  1279. {
  1280. if (!started)
  1281. {
  1282. bct.start();
  1283. hct.start();
  1284. bctStarted.wait();
  1285. hctStarted.wait();
  1286. }
  1287. started = true;
  1288. }
  1289. virtual void startCacheReporter() override
  1290. {
  1291. #ifndef _CONTAINERIZED
  1292. if (!getenv("HPCC_DLLSERVER_PATH"))
  1293. return;
  1294. #endif
  1295. if (activeCacheReportingBuffer && cacheReportPeriodSeconds)
  1296. {
  1297. cidt.start();
  1298. cidtStarted.wait();
  1299. cidtActive = true;
  1300. }
  1301. }
  1302. class CacheInfoDumpThread : public Thread
  1303. {
  1304. CRoxieFileCache &owner;
  1305. public:
  1306. CacheInfoDumpThread(CRoxieFileCache &_owner) : Thread("CRoxieFileCache-CacheInfoDumpThread"), owner(_owner) {}
  1307. virtual int run()
  1308. {
  1309. return owner.runCacheInfoDump();
  1310. }
  1311. } cidt;
  1312. class BackgroundCopyThread : public Thread
  1313. {
  1314. CRoxieFileCache &owner;
  1315. public:
  1316. BackgroundCopyThread(CRoxieFileCache &_owner) : Thread("CRoxieFileCache-BackgroundCopyThread"), owner(_owner) {}
  1317. virtual int run()
  1318. {
  1319. return owner.runBackgroundCopy();
  1320. }
  1321. } bct;
  1322. class HandleCloserThread : public Thread
  1323. {
  1324. CRoxieFileCache &owner;
  1325. public:
  1326. HandleCloserThread(CRoxieFileCache &_owner) : Thread("CRoxieFileCache-HandleCloserThread"), owner(_owner) {}
  1327. virtual int run()
  1328. {
  1329. return owner.runHandleCloser();
  1330. }
  1331. } hct;
  // Body of the background copy thread (bct).
  // Signals bctStarted and, on Linux when backgroundCopyClass is non-zero, sets this thread's I/O priority.
  // Then it loops: wait on toCopy; under crit, exit if closing, otherwise pop the last entry of 'todo'
  // (kept only if still alive); copy it with doCopy(background=true, CFflush_rdwr).
  // Copy failures are logged and the loop continues. When aborting, an IException from the copy is
  // rethrown, which ends the thread. Returns 0.
  1332. int runBackgroundCopy()
  1333. {
  1334. bctStarted.signal();
  1335. #if defined(__linux__) && defined(SYS_ioprio_set)
  1336. if (backgroundCopyClass)
  1337. syscall(SYS_ioprio_set, IOPRIO_WHO_PROCESS, 0, IOPRIO_PRIO_VALUE(backgroundCopyClass, backgroundCopyPrio));
  1338. #endif
  1339. if (traceLevel)
  1340. {
  1341. #if defined(__linux__) && defined(SYS_ioprio_get)
  1342. int ioprio = syscall(SYS_ioprio_get, IOPRIO_WHO_PROCESS, 0);
  1343. int ioclass = IOPRIO_PRIO_CLASS(ioprio);
  1344. ioprio = IOPRIO_PRIO_DATA(ioprio);
  1345. DBGLOG("Background copy thread %p starting, io priority class %d, priority %d", this, ioclass, ioprio);
  1346. #else
  1347. DBGLOG("Background copy thread %p starting", this);
  1348. #endif
  1349. }
  1350. try
  1351. {
  1352. for (;;)
  1353. {
  1354. Linked<ILazyFileIO> next;
  1355. toCopy.wait();
  // Check for shutdown and dequeue under crit. break leaves the loop, and crit is released as b goes out of scope.
  1356. {
  1357. CriticalBlock b(crit);
  1358. if (closing)
  1359. break;
  1360. if (todo.ordinality())
  1361. {
  1362. ILazyFileIO *popped = &todo.popGet();
  // isAliveAndLink() only succeeds (and links) if the file is still alive; otherwise the entry is dropped.
  1363. if (popped->isAliveAndLink())
  1364. {
  1365. next.setown(popped);
  1366. }
  1367. numFilesToProcess--; // must decrement counter for SNMP accuracy
  1368. }
  1369. }
  // The copy itself runs without holding crit.
  1370. if (next)
  1371. {
  1372. try
  1373. {
  1374. doCopy(next, true, CFflush_rdwr);
  1375. }
  1376. catch (IException *E)
  1377. {
  1378. if (aborting)
  1379. throw;
  1380. EXCLOG(MCoperatorError, E, "Roxie background copy: ");
  1381. E->Release();
  1382. }
  1383. catch (...)
  1384. {
  1385. EXCLOG(MCoperatorError, "Unknown exception in Roxie background copy");
  1386. }
  1387. }
  // Once the queue is empty, log completion a single time (reportedFilesToCopy is then cleared).
  // When containerized, completion also requires that no buddy copies are pending or being checked.
  1388. CriticalBlock b(crit);
  1389. if (todo.ordinality()==0 && reportedFilesToCopy)
  1390. {
  1391. #ifdef _CONTAINERIZED
  1392. DBGLOG("No more data files for this node to copy");
  1393. if (!buddyCopying.length() && !buddyChecking)
  1394. #endif
  1395. {
  1396. DBGLOG("No more data files to copy");
  1397. reportedFilesToCopy = false;
  1398. }
  1399. }
  1400. }
  1401. }
  1402. catch (IException *E)
  1403. {
  1404. if (!aborting)
  1405. EXCLOG(MCoperatorError, E, "Roxie background copy: ");
  1406. E->Release();
  1407. }
  1408. catch (...)
  1409. {
  1410. IERRLOG("Unknown exception in background copy thread");
  1411. }
  1412. if (traceLevel)
  1413. DBGLOG("Background copy thread %p exiting", this);
  1414. return 0;
  1415. }
  1416. int runHandleCloser()
  1417. {
  1418. hctStarted.signal();
  1419. if (traceLevel)
  1420. DBGLOG("HandleCloser thread %p starting", this);
  1421. try
  1422. {
  1423. unsigned lastCloseCheck = msTick();
  1424. for (;;)
  1425. {
  1426. #ifdef _CONTAINERIZED
  1427. unsigned checkPeriod = topology->getPropInt("@copyCheckPeriod", 60);
  1428. #else
  1429. unsigned checkPeriod = 10*60; // check expired file handles every 10 minutes, buddyCopying a little more often
  1430. #endif
  1431. toClose.wait(checkPeriod * 1000);
  1432. if (closing)
  1433. break;
  1434. #ifdef _CONTAINERIZED
  1435. // Periodically recheck the list to see what is now local, and remove them from the buddyCopying list
  1436. ICopyArrayOf<ILazyFileIO> checkBuddies;
  1437. {
  1438. CriticalBlock b(crit);
  1439. if (buddyCopying.length())
  1440. {
  1441. buddyCopying.swapWith(checkBuddies);
  1442. buddyChecking = true;
  1443. }
  1444. }
  1445. if (checkBuddies.length())
  1446. {
  1447. ForEachItemIn(idx, checkBuddies)
  1448. {
  1449. ILazyFileIO &check = checkBuddies.item(idx);
  1450. if (traceRemoteFiles)
  1451. DBGLOG("Checking whether someone has copied file %s for me", check.queryFilename());
  1452. if (check.isRemote())
  1453. {
  1454. if (traceRemoteFiles)
  1455. check.dump();
  1456. if (!check.checkCopyComplete()) // Recheck whether there is a local file we can open
  1457. {
  1458. CriticalBlock b1(crit);
  1459. buddyCopying.append(check);
  1460. }
  1461. }
  1462. }
  1463. CriticalBlock b2(crit);
  1464. buddyChecking = false;
  1465. if (buddyCopying.length()==0)
  1466. {
  1467. DBGLOG("No more data files being copied by other nodes");
  1468. if (todo.ordinality()==0 && reportedFilesToCopy)
  1469. {
  1470. DBGLOG("No more data files to copy");
  1471. reportedFilesToCopy = false;
  1472. }
  1473. }
  1474. }
  1475. #endif
  1476. unsigned elapsed = msTick()-lastCloseCheck;
  1477. if (elapsed >= 10*60*1000)
  1478. {
  1479. doCloseExpired(true);
  1480. doCloseExpired(false);
  1481. lastCloseCheck = msTick();
  1482. }
  1483. }
  1484. }
  1485. catch (IException *E)
  1486. {
  1487. if (!aborting)
  1488. EXCLOG(MCoperatorError, E, "Roxie handle closer: ");
  1489. E->Release();
  1490. }
  1491. catch (...)
  1492. {
  1493. IERRLOG("Unknown exception in handle closer thread");
  1494. }
  1495. if (traceLevel)
  1496. DBGLOG("Handle closer thread %p exiting", this);
  1497. return 0;
  1498. }
  1499. virtual void join(unsigned timeout=INFINITE)
  1500. {
  1501. aborting = true;
  1502. if (started)
  1503. {
  1504. toCopy.interrupt();
  1505. toClose.interrupt();
  1506. bct.join(timeout);
  1507. hct.join(timeout);
  1508. }
  1509. #ifdef _CONTAINERIZED
  1510. if (cidtActive && activeCacheReportingBuffer && cacheReportPeriodSeconds)
  1511. {
  1512. cidtSleep.interrupt();
  1513. cidt.join(timeout);
  1514. }
  1515. #endif
  1516. }
  1517. virtual void wait()
  1518. {
  1519. closing = true;
  1520. if (started)
  1521. {
  1522. toCopy.signal();
  1523. toClose.signal();
  1524. bct.join();
  1525. hct.join();
  1526. }
  1527. #ifdef _CONTAINERIZED
  1528. if (cidtActive && activeCacheReportingBuffer && cacheReportPeriodSeconds)
  1529. {
  1530. cidtSleep.signal();
  1531. cidt.join();
  1532. }
  1533. #endif
  1534. }
  1535. virtual CFPmode onProgress(unsigned __int64 sizeDone, unsigned __int64 totalSize)
  1536. {
  1537. return aborting ? CFPcancel : CFPcontinue;
  1538. }
  1539. virtual void removeCache(ILazyFileIO *file) const
  1540. {
  1541. CriticalBlock b(crit);
  1542. // NOTE: it's theoretically possible for the final release to happen after a replacement has been inserted into hash table.
  1543. // So only remove from hash table if what we find there matches the item that is being deleted.
  1544. const char *filename = file->queryFilename();
  1545. ILazyFileIO *goer = files.getValue(filename);
  1546. if (goer == file)
  1547. files.remove(filename);
  1548. ForEachItemInRev(idx, todo)
  1549. {
  1550. if (file == &todo.item(idx))
  1551. {
  1552. todo.remove(idx);
  1553. numFilesToProcess--; // must decrement counter for SNMP accuracy
  1554. }
  1555. }
  1556. }
  1557. virtual ILazyFileIO *lookupFile(const char *lfn, RoxieFileType fileType,
  1558. IPartDescriptor *pdesc, unsigned numParts, unsigned channel,
  1559. const StringArray &localEnoughLocationInfo,
  1560. const StringArray &deployedLocationInfo, bool startFileCopy)
  1561. {
  1562. unsigned replicationLevel = getReplicationLevel(channel);
  1563. IPropertyTree &partProps = pdesc->queryProperties();
  1564. offset_t dfsSize = partProps.getPropInt64("@size", -1);
  1565. bool local = partProps.getPropBool("@local");
  1566. CDateTime dfsDate;
  1567. if (checkFileDate)
  1568. {
  1569. const char *dateStr = partProps.queryProp("@modified");
  1570. dfsDate.setString(dateStr);
  1571. }
  1572. unsigned partNo = pdesc->queryPartIndex() + 1;
  1573. StringBuffer localLocation;
  1574. if (local)
  1575. {
  1576. assertex(partNo==1 && numParts==1);
  1577. localLocation.append(lfn); // any resolution done earlier
  1578. }
  1579. else
  1580. {
  1581. // MORE - not at all sure about this. Foreign files should stay foreign ?
  1582. CDfsLogicalFileName dlfn;
  1583. dlfn.set(lfn);
  1584. if (dlfn.isForeign())
  1585. dlfn.clearForeign();
  1586. bool defaultDirPerPart = false;
  1587. StringBuffer defaultDir;
  1588. #ifdef _CONTAINERIZED
  1589. if (!dlfn.isExternal())
  1590. {
  1591. IFileDescriptor &fileDesc = pdesc->queryOwner();
  1592. StringBuffer planeName;
  1593. fileDesc.getClusterGroupName(0, planeName);
  1594. Owned<IStoragePlane> plane = getDataStoragePlane(planeName, true);
  1595. defaultDir.append(plane->queryPrefix());
  1596. FileDescriptorFlags fileFlags = static_cast<FileDescriptorFlags>(fileDesc.queryProperties().getPropInt("@flags"));
  1597. if (FileDescriptorFlags::none != (fileFlags & FileDescriptorFlags::dirperpart))
  1598. defaultDirPerPart = true;
  1599. }
  1600. #endif
  1601. makePhysicalPartName(dlfn.get(), partNo, numParts, localLocation, replicationLevel, DFD_OSdefault, defaultDir.str(), defaultDirPerPart);
  1602. }
  1603. Owned<ILazyFileIO> ret;
  1604. try
  1605. {
  1606. CLeavableCriticalBlock b(crit);
  1607. ILazyFileIO * match = files.getValue(localLocation);
  1608. if (match && match->isAliveAndLink())
  1609. {
  1610. Owned<ILazyFileIO> f = match;
  1611. if ((dfsSize != (offset_t) -1 && dfsSize != f->getSize()) ||
  1612. (!dfsDate.isNull() && !dfsDate.equals(*f->queryDateTime(), false)))
  1613. {
  1614. releaseAgentDynamicFileCache(); // Agent dynamic file cache or...
  1615. if (fileType == ROXIE_KEY) // ...jhtree cache can keep files active and thus prevent us from loading a new version
  1616. clearKeyStoreCacheEntry(f); // Will release iff that is the only link
  1617. f.clear(); // Note - needs to be done before calling getValue() again, hence the need to make it separate from the f.set below
  1618. f.set(files.getValue(localLocation));
  1619. if (f) // May have been cleared above...
  1620. {
  1621. StringBuffer modifiedDt;
  1622. if (!dfsDate.isNull())
  1623. dfsDate.getString(modifiedDt);
  1624. StringBuffer fileDt;
  1625. f->queryDateTime()->getString(fileDt);
  1626. if (fileErrorList.find(lfn) == 0)
  1627. {
  1628. switch (fileType)
  1629. {
  1630. case ROXIE_KEY:
  1631. fileErrorList.setValue(lfn, "Key");
  1632. break;
  1633. case ROXIE_FILE:
  1634. fileErrorList.setValue(lfn, "File");
  1635. break;
  1636. }
  1637. }
  1638. throw MakeStringException(ROXIE_MISMATCH, "Different version of %s already loaded: sizes = %" I64F "d %" I64F "d Date = %s %s", lfn, dfsSize, f->getSize(), modifiedDt.str(), fileDt.str());
  1639. }
  1640. }
  1641. else
  1642. return f.getClear();
  1643. }
  1644. ret.setown(openFile(lfn, partNo, channel, localLocation, pdesc, localEnoughLocationInfo, deployedLocationInfo, dfsSize, dfsDate));
  1645. if (startFileCopy)
  1646. {
  1647. if (ret->isRemote())
  1648. {
  1649. if (copyResources) // MORE - should always copy peer files
  1650. {
  1651. #ifdef _CONTAINERIZED
  1652. // In containerized mode, Roxie file copies are restricted to have only one node do the copying (first node on a channel,
  1653. // random node for single-part). But any node that has
  1654. // (a) files being read remotely and
  1655. // (b) no files to copy and
  1656. // (c) a small delay will go through all remote files and check if they are now available locally
  1657. // There is an assumption that a "pull" roxie does not have replicas that we don't know about
  1658. // - more than one "pull" roxie copying to the same plane at the same time
  1659. // - replicas=1 should be set on the "pull" roxie (we may be able to relax that using info from toposerver)
  1660. // - can't use localAgent mode on a "pull" roxie
  1661. bool iShouldCopy = (replicationLevel==0);
  1662. if (numParts==1 || (partNo==numParts && fileType==ROXIE_KEY))
  1663. {
  1664. // We distribute the responsibility for copying the TLK/single-part files
  1665. unsigned whoShouldCopy = (rtlHash32VStr(lfn, HASH32_INIT) % numChannels) + 1;
  1666. if (whoShouldCopy != myChannel)
  1667. iShouldCopy = false;
  1668. }
  1669. if (!reportedFilesToCopy)
  1670. DBGLOG("Received files to copy");
  1671. reportedFilesToCopy = true;
  1672. if (iShouldCopy)
  1673. {
  1674. if (!useRemoteResources)
  1675. {
  1676. b.leave();
  1677. ret->checkOpen();
  1678. doCopy(ret, false, CFflush_rdwr);
  1679. return ret.getLink();
  1680. }
  1681. todo.append(*ret);
  1682. numFilesToProcess++; // must increment counter for SNMP accuracy
  1683. toCopy.signal();
  1684. }
  1685. else
  1686. {
  1687. if (traceRemoteFiles)
  1688. DBGLOG("Add file %s to buddyCopying list", ret->queryFilename());
  1689. buddyCopying.append(*ret); // We expect someone else to copy it for us
  1690. }
  1691. #else
  1692. // Single-part files and top-level keys are copied immediately rather than being read remotely while background copying
  1693. // This is to avoid huge contention on the source dafilesrv if the Roxie is live.
  1694. if (numParts==1 || (partNo==numParts && fileType==ROXIE_KEY) || !useRemoteResources)
  1695. {
  1696. b.leave();
  1697. ret->checkOpen();
  1698. doCopy(ret, false, CFflush_rdwr);
  1699. return ret.getLink();
  1700. }
  1701. // Copies are popped from end of the todo list
  1702. // By putting the replicates on the front we ensure they are done after the primaries
  1703. // and are therefore likely to result in local rather than remote copies.
  1704. if (!reportedFilesToCopy)
  1705. DBGLOG("Received files to copy");
  1706. reportedFilesToCopy = true;
  1707. if (replicationLevel)
  1708. todo.add(*ret, 0);
  1709. else
  1710. todo.append(*ret);
  1711. numFilesToProcess++; // must increment counter for SNMP accuracy
  1712. toCopy.signal();
  1713. #endif
  1714. }
  1715. }
  1716. }
  1717. if (!lazyOpen)
  1718. ret->checkOpen();
  1719. }
  1720. catch(IException *e)
  1721. {
  1722. if (e->errorCode() == ROXIE_FILE_OPEN_FAIL)
  1723. {
  1724. if (fileErrorList.find(lfn) == 0)
  1725. {
  1726. switch (fileType)
  1727. {
  1728. case ROXIE_KEY:
  1729. fileErrorList.setValue(lfn, "Key");
  1730. break;
  1731. case ROXIE_FILE:
  1732. fileErrorList.setValue(lfn, "File");
  1733. break;
  1734. }
  1735. }
  1736. }
  1737. throw;
  1738. }
  1739. return ret.getLink();
  1740. }
  1741. virtual ILazyFileIO *lookupLocalFile(const char *filename)
  1742. {
  1743. try
  1744. {
  1745. CriticalBlock b(crit);
  1746. ILazyFileIO * match = files.getValue(filename);
  1747. if (match && match->isAliveAndLink())
  1748. return match;
  1749. }
  1750. catch(IException *e)
  1751. {
  1752. e->Release();
  1753. }
  1754. return nullptr;
  1755. }
  1756. virtual void closeExpired(bool remote)
  1757. {
  1758. // This schedules a close at the next available opportunity
  1759. CriticalBlock b(cpcrit); // paranoid...
  1760. if (!closePending[remote])
  1761. {
  1762. closePending[remote] = true;
  1763. DBGLOG("closeExpired %s scheduled - %d files open", remote ? "remote" : "local", (int) numFilesOpen[remote]);
  1764. toClose.signal();
  1765. }
  1766. }
  1767. static unsigned __int64 readPage(const char * &_t)
  1768. {
  1769. const char *t = _t;
  1770. unsigned __int64 v = 0;
  1771. for (;;)
  1772. {
  1773. char c = *t;
  1774. if ((c >= '0') && (c <= '9'))
  1775. v = v * 16 + (c-'0');
  1776. else if ((c >= 'a') && (c <= 'f'))
  1777. v = v * 16 + (c-'a'+10);
  1778. else if ((c >= 'A') && (c <= 'F'))
  1779. v = v * 16 + (c-'A'+10);
  1780. else
  1781. break;
  1782. t++;
  1783. }
  1784. _t = t;
  1785. return v;
  1786. }
  1787. virtual void loadSavedOsCacheInfo() override
  1788. {
  1789. if (!topology->getPropBool("@warmOsCache", true))
  1790. return;
  1791. Owned<const ITopologyServer> topology = getTopology();
  1792. for (unsigned channel : topology->queryChannels())
  1793. doLoadSavedOsCacheInfo(channel);
  1794. doLoadSavedOsCacheInfo(0); // MORE - maybe only if I am also a server?
  1795. }
  1796. void doLoadSavedOsCacheInfo(unsigned channel)
  1797. {
  1798. StringBuffer cacheRootDirectory;
  1799. if (isContainerized())
  1800. {
  1801. if (!getConfigurationDirectory(nullptr, "query", nullptr, nullptr, cacheRootDirectory))
  1802. throwUnexpected();
  1803. }
  1804. else
  1805. {
  1806. //Default behaviour is to not load or saving anything on bare metal
  1807. const char* dllserver_root = getenv("HPCC_DLLSERVER_PATH");
  1808. if (!dllserver_root)
  1809. return;
  1810. cacheRootDirectory.append(dllserver_root);
  1811. }
  1812. unsigned cacheWarmTraceLevel = topology->getPropInt("@cacheWarmTraceLevel", traceLevel);
  1813. VStringBuffer cacheFileName("%s/%s/cacheInfo.%d", cacheRootDirectory.str(), roxieName.str(), channel);
  1814. StringBuffer cacheInfo;
  1815. try
  1816. {
  1817. if (checkFileExists(cacheFileName))
  1818. {
  1819. #ifndef _WIN32
  1820. StringBuffer output;
  1821. VStringBuffer command("ccdcache %s -t %u", cacheFileName.str(), cacheWarmTraceLevel);
  1822. unsigned retcode = runExternalCommand(nullptr, output, output, command, nullptr, ".", nullptr);
  1823. if (output.length())
  1824. {
  1825. StringArray outputLines;
  1826. outputLines.appendList(output, "\n");
  1827. ForEachItemIn(idx, outputLines)
  1828. {
  1829. const char *line = outputLines.item(idx);
  1830. if (line && *line)
  1831. DBGLOG("ccdcache: %s", line);
  1832. }
  1833. }
  1834. if (retcode)
  1835. DBGLOG("ccdcache failed with exit code %u", retcode);
  1836. #endif
  1837. cacheInfo.loadFile(cacheFileName, false);
  1838. if (traceLevel)
  1839. DBGLOG("Loading cache information from %s for channel %d", cacheFileName.str(), channel);
  1840. warmOsCache(cacheInfo);
  1841. }
  1842. }
  1843. catch(IException *E)
  1844. {
  1845. EXCLOG(E);
  1846. E->Release();
  1847. }
  1848. }
  1849. virtual void warmOsCache(const char *cacheInfo) override
  1850. {
  1851. if (!cacheInfo)
  1852. return;
  1853. IndexCacheWarmer warmer(this);
  1854. if (!::warmOsCache(cacheInfo, &warmer))
  1855. DBGLOG("WARNING: Unrecognized cacheInfo format");
  1856. warmer.report();
  1857. }
  1858. virtual void clearOsCache() override
  1859. {
  1860. if (activeCacheReportingBuffer)
  1861. activeCacheReportingBuffer->clear();
  1862. }
  1863. virtual void reportOsCache(StringBuffer &ret, unsigned channel) const override
  1864. {
  1865. if (activeCacheReportingBuffer)
  1866. {
  1867. Owned<CacheReportingBuffer> temp = new CacheReportingBuffer(*activeCacheReportingBuffer);
  1868. getNodeCacheInfo(*temp);
  1869. temp->sortAndDedup();
  1870. temp->report(ret, channel, cacheIndexes, cacheIndexChannels);
  1871. // We could at this point put deduped back into active
  1872. }
  1873. }
// Closes idle file handles of one kind (remote==true: files read remotely; false: local files).
// Clears closePending[remote] first, so a later closeExpired() can schedule another pass.
// Pass 1 (under crit): each live, open, non-copying file of that kind that has been idle for longer than
// maxFileAge[remote] ms is closed. The other open ones are kept, linked, in `goers`.
// Pass 2 (crit released): if more than maxFilesOpen[remote] remain, they are sorted with
// CRoxieLazyFileIO::compareAccess and every one from index minFilesOpen[remote] onwards (unless copying) is closed.
// NOTE(review): presumably compareAccess orders most-recently-accessed first, so the LRU files are closed - confirm.
void doCloseExpired(bool remote)
{
    {
        CriticalBlock b(cpcrit); // paranoid...
        closePending[remote] = false;
    }
    IArrayOf<ILazyFileIO> goers; // despite the name: open files NOT closed in pass 1 (candidates for LRU closing)
    {
        CriticalBlock b(crit);
        HashIterator h(files);
        ForEach(h)
        {
            ILazyFileIO * match = files.mapToValue(&h.query());
            if (match->isAliveAndLink())
            {
                Owned<ILazyFileIO> f = match;
                // NOTE(review): if `f` ends up holding the last link, releasing it here (while h is iterating `files`)
                // presumably triggers removeCache() and a removal from `files` mid-iteration - confirm that is safe.
                if (f->isOpen() && f->isRemote()==remote && !f->isCopying())
                {
                    unsigned age = msTick() - f->getLastAccessed();
                    if (age > maxFileAge[remote])
                    {
                        if (traceLevel > 5)
                        {
                            // NOTE - querySource will cause the file to be opened if not already open
                            // That's OK here, since we know the file is open and remote.
                            // But don't be tempted to move this line outside these if's (eg. to trace the idle case)
                            const char *fname = remote ? f->querySource()->queryFilename() : f->queryFilename();
                            DBGLOG("Closing inactive %s file %s (last accessed %u ms ago)", remote ? "remote" : "local", fname, age);
                        }
                        f->close();
                    }
                    else
                        goers.append(*f.getClear());
                }
            }
        }
    }
    unsigned numFilesLeft = goers.ordinality();
    if (numFilesLeft > maxFilesOpen[remote])
    {
        goers.sort(CRoxieLazyFileIO::compareAccess);
        DBGLOG("Closing LRU %s files, %d files are open", remote ? "remote" : "local", numFilesLeft);
        // The first minFilesOpen[remote] entries (after sorting) are left open
        unsigned idx = minFilesOpen[remote];
        while (idx < numFilesLeft)
        {
            ILazyFileIO &f = goers.item(idx++);
            if (!f.isCopying())
            {
                if (traceLevel > 5)
                {
                    unsigned age = msTick() - f.getLastAccessed();
                    DBGLOG("Closing %s (last accessed %u ms ago)", f.queryFilename(), age);
                }
                f.close();
            }
        }
    }
}
  1932. virtual void flushUnusedDirectories(const char *origBaseDir, const char *directory, StringBuffer &xml)
  1933. {
  1934. Owned<IFile> dirf = createIFile(directory);
  1935. if (dirf->exists() && dirf->isDirectory()==fileBool::foundYes)
  1936. {
  1937. try
  1938. {
  1939. Owned<IDirectoryIterator> iter = dirf->directoryFiles(NULL,false,true);
  1940. ForEach(*iter)
  1941. {
  1942. const char *thisName = iter->query().queryFilename();
  1943. flushUnusedDirectories(origBaseDir, thisName, xml);
  1944. }
  1945. if (stricmp(origBaseDir, directory) != 0)
  1946. {
  1947. try
  1948. {
  1949. dirf->remove();
  1950. xml.appendf("<Directory>%s</Directory>\n", directory);
  1951. DBGLOG("Deleted directory %s", directory);
  1952. }
  1953. catch (IException *e)
  1954. {
  1955. // don't care if we can't delete the directory
  1956. e->Release();
  1957. }
  1958. catch(...)
  1959. {
  1960. // don't care if we can't delete the directory
  1961. }
  1962. }
  1963. }
  1964. catch (IException *e)
  1965. {
  1966. // don't care if we can't delete the directory
  1967. e->Release();
  1968. }
  1969. catch(...)
  1970. {
  1971. // don't care if we can't delete the directory
  1972. }
  1973. }
  1974. }
  1975. int numFilesToCopy()
  1976. {
  1977. CriticalBlock b(crit);
  1978. return todo.ordinality();
  1979. }
  1980. virtual StringAttrMapping *queryFileErrorList() { return &fileErrorList; } // returns list of files that could not be open
  1981. static inline bool validFNameChar(char c)
  1982. {
  1983. static const char *invalids = "*\"/:<>?\\|";
  1984. return (c>=32 && c<127 && !strchr(invalids, c));
  1985. }
  1986. };
  1987. #ifdef _CONTAINERIZED
  1988. static bool getDirectAccessStoragePlanes(StringArray &planes)
  1989. {
  1990. Owned<IPropertyTreeIterator> iter = getComponentConfigSP()->getElements("directAccessPlanes");
  1991. ForEach(*iter)
  1992. {
  1993. const char *plane = iter->query().queryProp("");
  1994. if (!isEmptyString(plane))
  1995. planes.appendUniq(plane);
  1996. }
  1997. return !planes.empty();
  1998. }
  1999. #endif
  2000. ILazyFileIO *createPhysicalFile(const char *id, IPartDescriptor *pdesc, IPartDescriptor *remotePDesc, RoxieFileType fileType, int numParts, bool startCopy, unsigned channel)
  2001. {
  2002. #ifdef _CONTAINERIZED
  2003. const char *myCluster = defaultPlane.str();
  2004. #else
  2005. const char *myCluster = roxieName.str();
  2006. #endif
  2007. StringArray localEnoughLocations; //files from these locations won't be copied to the default plane
  2008. StringArray remoteLocations;
  2009. const char *peerCluster = pdesc->queryOwner().queryProperties().queryProp("@cloneFromPeerCluster");
  2010. if (peerCluster)
  2011. {
  2012. if (*peerCluster!='-') // a remote cluster was specified explicitly
  2013. appendRemoteLocations(pdesc, remoteLocations, NULL, peerCluster, true); // Add only from specified cluster
  2014. }
  2015. else
  2016. {
  2017. #ifdef _CONTAINERIZED
  2018. StringArray localEnoughPlanes;
  2019. if (getDirectAccessStoragePlanes(localEnoughPlanes))
  2020. appendRemoteLocations(pdesc, localEnoughLocations, NULL, localEnoughPlanes, true);
  2021. localEnoughPlanes.append(myCluster);
  2022. appendRemoteLocations(pdesc, remoteLocations, NULL, localEnoughPlanes, false); // Add from any plane on same dali, other than default or loacal enough
  2023. #else
  2024. appendRemoteLocations(pdesc, remoteLocations, NULL, myCluster, false); // Add from any cluster on same dali, other than mine
  2025. #endif
  2026. }
  2027. if (remotePDesc)
  2028. appendRemoteLocations(remotePDesc, remoteLocations, NULL, NULL, false); // Then any remote on remote dali
  2029. return queryFileCache().lookupFile(id, fileType, pdesc, numParts, channel, localEnoughLocations, remoteLocations, startCopy);
  2030. }
  2031. //====================================================================================================
  2032. class CFilePartMap : implements IFilePartMap, public CInterface
  2033. {
  2034. class FilePartMapElement
  2035. {
  2036. public:
  2037. offset_t base;
  2038. offset_t top;
  2039. inline int compare(offset_t offset)
  2040. {
  2041. if (offset < base)
  2042. return -1;
  2043. else if (offset >= top)
  2044. return 1;
  2045. else
  2046. return 0;
  2047. }
  2048. } *map;
  2049. static int compareParts(const void *l, const void *r)
  2050. {
  2051. offset_t lp = * (offset_t *) l;
  2052. FilePartMapElement *thisPart = (FilePartMapElement *) r;
  2053. return thisPart->compare(lp);
  2054. }
  2055. unsigned numParts;
  2056. offset_t recordCount;
  2057. offset_t totalSize;
  2058. StringAttr fileName;
  2059. public:
  2060. IMPLEMENT_IINTERFACE;
  2061. CFilePartMap(IPropertyTree &resource)
  2062. {
  2063. fileName.set(resource.queryProp("@id"));
  2064. numParts = resource.getPropInt("@numparts");
  2065. recordCount = resource.getPropInt64("@recordCount");
  2066. totalSize = resource.getPropInt64("@size");
  2067. assertex(numParts);
  2068. map = new FilePartMapElement[numParts];
  2069. for (unsigned i = 0; i < numParts; i++)
  2070. {
  2071. StringBuffer partPath;
  2072. partPath.appendf("Part[@num='%d']", i+1);
  2073. IPropertyTree *part = resource.queryPropTree(partPath.str());
  2074. if (!part)
  2075. {
  2076. partPath.clear().appendf("Part_%d", i+1); // legacy format support
  2077. part = resource.queryPropTree(partPath.str());
  2078. }
  2079. assertex(part);
  2080. offset_t size = part->getPropInt64("@size", (unsigned __int64) -1);
  2081. assertex(size != (unsigned __int64) -1);
  2082. map[i].base = i ? map[i-1].top : 0;
  2083. map[i].top = map[i].base + size;
  2084. }
  2085. if (totalSize == (offset_t)-1)
  2086. totalSize = map[numParts-1].top;
  2087. else if (totalSize != map[numParts-1].top)
  2088. throw MakeStringException(ROXIE_DATA_ERROR, "CFilePartMap: file part sizes do not add up to expected total size (%" I64F "d vs %" I64F "d", map[numParts-1].top, totalSize);
  2089. }
  2090. CFilePartMap(const char *_fileName, IFileDescriptor &fdesc)
  2091. : fileName(_fileName)
  2092. {
  2093. numParts = fdesc.numParts();
  2094. IPropertyTree &props = fdesc.queryProperties();
  2095. recordCount = props.getPropInt64("@recordCount", -1);
  2096. totalSize = props.getPropInt64("@size", -1);
  2097. assertex(numParts);
  2098. map = new FilePartMapElement[numParts];
  2099. for (unsigned i = 0; i < numParts; i++)
  2100. {
  2101. IPartDescriptor &part = *fdesc.queryPart(i);
  2102. IPropertyTree &partProps = part.queryProperties();
  2103. offset_t size = partProps.getPropInt64("@size", (unsigned __int64) -1);
  2104. map[i].base = i ? map[i-1].top : 0;
  2105. if (size==(unsigned __int64) -1)
  2106. {
  2107. if (i==numParts-1)
  2108. map[i].top = (unsigned __int64) -1;
  2109. else
  2110. throw MakeStringException(ROXIE_DATA_ERROR, "CFilePartMap: file sizes not known for file %s", fileName.get());
  2111. }
  2112. else
  2113. map[i].top = map[i].base + size;
  2114. }
  2115. if (totalSize == (offset_t)-1)
  2116. totalSize = map[numParts-1].top;
  2117. else if (totalSize != map[numParts-1].top)
  2118. throw MakeStringException(ROXIE_DATA_ERROR, "CFilePartMap: file part sizes do not add up to expected total size (%" I64F "d vs %" I64F "d", map[numParts-1].top, totalSize);
  2119. }
  2120. ~CFilePartMap()
  2121. {
  2122. delete [] map;
  2123. }
  2124. virtual bool IsShared() const { return CInterface::IsShared(); };
  2125. virtual unsigned mapOffset(offset_t pos) const
  2126. {
  2127. FilePartMapElement *part = (FilePartMapElement *) bsearch(&pos, map, numParts, sizeof(map[0]), compareParts);
  2128. if (!part)
  2129. throw MakeStringException(ROXIE_DATA_ERROR, "CFilePartMap: file position %" I64F "d in file %s out of range (max offset permitted is %" I64F "d)", pos, fileName.str(), totalSize);
  2130. return (part-map)+1;
  2131. }
  2132. virtual unsigned getNumParts() const
  2133. {
  2134. return numParts;
  2135. }
  2136. virtual offset_t getTotalSize() const
  2137. {
  2138. return totalSize;
  2139. }
  2140. virtual offset_t getRecordCount() const
  2141. {
  2142. return recordCount;
  2143. }
  2144. virtual offset_t getBase(unsigned part) const
  2145. {
  2146. if (part > numParts || part == 0)
  2147. {
  2148. throw MakeStringException(ROXIE_FILE_ERROR, "Internal error - requesting base for non-existent file part %d (valid are 1-%d)", part, numParts);
  2149. }
  2150. return map[part-1].base;
  2151. }
  2152. virtual offset_t getFileSize() const
  2153. {
  2154. return map[numParts-1].top;
  2155. }
  2156. };
  2157. extern IFilePartMap *createFilePartMap(const char *fileName, IFileDescriptor &fdesc)
  2158. {
  2159. return new CFilePartMap(fileName, fdesc);
  2160. }
  2161. //====================================================================================================
  2162. class CFileIOArray : implements IFileIOArray, public CInterface
  2163. {
  2164. mutable CriticalSection crit;
  2165. mutable unsigned __int64 totalSize = (unsigned __int64) -1; // Calculated on demand, and cached
  2166. mutable StringAttr id; // Calculated on demand, and cached
  2167. IPointerArrayOf<IFileIO> files;
  2168. UnsignedArray subfiles;
  2169. StringArray filenames;
  2170. Int64Array bases;
  2171. int actualCrc = 0;
  2172. unsigned valid = 0;
  2173. bool multipleFormatsSeen = false;
  2174. void _getId() const
  2175. {
  2176. md5_state_t md5;
  2177. md5_byte_t digest[16];
  2178. md5_init(&md5);
  2179. ForEachItemIn(idx, files)
  2180. {
  2181. IFileIO *file = files.item(idx);
  2182. if (file)
  2183. {
  2184. md5_append(&md5, (const md5_byte_t *) &file, sizeof(file));
  2185. }
  2186. }
  2187. md5_finish(&md5, digest);
  2188. char digestStr[33];
  2189. for (int i = 0; i < 16; i++)
  2190. {
  2191. sprintf(&digestStr[i*2],"%02x", digest[i]);
  2192. }
  2193. id.set(digestStr, 32);
  2194. }
  2195. public:
  2196. IMPLEMENT_IINTERFACE;
  2197. virtual bool IsShared() const { return CInterface::IsShared(); };
  2198. virtual IFileIO *getFilePart(unsigned partNo, offset_t &base) const override
  2199. {
  2200. if (!files.isItem(partNo))
  2201. {
  2202. DBGLOG("getFilePart requested invalid part %d", partNo);
  2203. throw MakeStringException(ROXIE_FILE_ERROR, "getFilePart requested invalid part %d", partNo);
  2204. }
  2205. IFileIO *file = files.item(partNo);
  2206. if (!file)
  2207. {
  2208. base = 0;
  2209. return NULL;
  2210. }
  2211. base = bases.item(partNo);
  2212. return LINK(file);
  2213. }
  2214. virtual const char *queryLogicalFilename(unsigned partNo) const override
  2215. {
  2216. if (!filenames.isItem(partNo))
  2217. {
  2218. DBGLOG("queryLogicalFilename requested invalid part %d", partNo);
  2219. throw MakeStringException(ROXIE_FILE_ERROR, "queryLogicalFilename requested invalid part %d", partNo);
  2220. }
  2221. return filenames.item(partNo);
  2222. }
  2223. void addFile(IFileIO *f, offset_t base, unsigned subfile, const char *filename, int _actualCrc)
  2224. {
  2225. if (f)
  2226. valid++;
  2227. files.append(f);
  2228. bases.append(base);
  2229. if (_actualCrc)
  2230. {
  2231. if (actualCrc && actualCrc != _actualCrc)
  2232. multipleFormatsSeen = true;
  2233. else
  2234. actualCrc = _actualCrc;
  2235. }
  2236. // MORE - lots of duplication in subfiles and filenames arrays
  2237. subfiles.append(subfile);
  2238. filenames.append(filename ? filename : "");
  2239. }
  2240. virtual unsigned length() const override
  2241. {
  2242. return files.length();
  2243. }
  2244. virtual unsigned numValid() const override
  2245. {
  2246. return valid;
  2247. }
  2248. virtual int queryActualFormatCrc() const override
  2249. {
  2250. return actualCrc;
  2251. }
  2252. virtual bool allFormatsMatch() const override
  2253. {
  2254. return !multipleFormatsSeen;
  2255. }
  2256. virtual bool isValid(unsigned partNo) const override
  2257. {
  2258. if (!files.isItem(partNo))
  2259. return false;
  2260. IFileIO *file = files.item(partNo);
  2261. if (!file)
  2262. return false;
  2263. return true;
  2264. }
  2265. virtual unsigned __int64 size() const override
  2266. {
  2267. CriticalBlock b(crit);
  2268. if (totalSize == (unsigned __int64) -1)
  2269. {
  2270. totalSize = 0;
  2271. ForEachItemIn(idx, files)
  2272. {
  2273. IFileIO *file = files.item(idx);
  2274. if (file)
  2275. totalSize += file->size();
  2276. }
  2277. }
  2278. return totalSize;
  2279. }
  2280. virtual StringBuffer &getId(StringBuffer &ret) const override
  2281. {
  2282. CriticalBlock b(crit);
  2283. if (!id)
  2284. _getId();
  2285. return ret.append(id);
  2286. }
  2287. virtual unsigned getSubFile(unsigned partNo) const override
  2288. {
  2289. return subfiles.item(partNo);
  2290. }
  2291. };
  2292. class CTranslatorSet : implements CInterfaceOf<ITranslatorSet>
  2293. {
  2294. IConstPointerArrayOf<IDynamicTransform> transformers;
  2295. IConstPointerArrayOf<IKeyTranslator> keyTranslators;
  2296. IPointerArrayOf<IOutputMetaData> actualLayouts;
  2297. const RtlRecord &targetLayout;
  2298. int targetFormatCrc = 0;
  2299. bool anyTranslators = false;
  2300. bool anyKeyedTranslators = false;
  2301. bool translatorsMatch = true;
  2302. public:
  2303. CTranslatorSet(const RtlRecord &_targetLayout, int _targetFormatCrc)
  2304. : targetLayout(_targetLayout), targetFormatCrc(_targetFormatCrc)
  2305. {}
  2306. void addTranslator(const IDynamicTransform *translator, const IKeyTranslator *keyTranslator, IOutputMetaData *actualLayout)
  2307. {
  2308. assertex(actualLayout);
  2309. if (translator || keyTranslator)
  2310. anyTranslators = true;
  2311. if (translator && translator->keyedTranslated())
  2312. anyKeyedTranslators = true;
  2313. if (transformers.ordinality() && (translator != transformers.item(0)))
  2314. translatorsMatch = false;
  2315. transformers.append(translator);
  2316. keyTranslators.append(keyTranslator);
  2317. actualLayouts.append(actualLayout);
  2318. }
  2319. virtual const RtlRecord &queryTargetFormat() const override
  2320. {
  2321. return targetLayout;
  2322. }
  2323. virtual int queryTargetFormatCrc() const override
  2324. {
  2325. return targetFormatCrc;
  2326. }
  2327. virtual const IDynamicTransform *queryTranslator(unsigned subFile) const override
  2328. {
  2329. // We need to have translated partnos to subfiles before calling this!
  2330. // Note: while the required projected format will be the same for all parts, the
  2331. // actual layout - and thus the required translation - may not be, for example if
  2332. // we have a superfile with mismatching formats.
  2333. if (anyTranslators && transformers.isItem(subFile))
  2334. return transformers.item(subFile);
  2335. return nullptr;
  2336. }
  2337. virtual const IKeyTranslator *queryKeyTranslator(unsigned subFile) const override
  2338. {
  2339. if (anyTranslators && keyTranslators.isItem(subFile))
  2340. return keyTranslators.item(subFile);
  2341. return nullptr;
  2342. }
  2343. virtual ISourceRowPrefetcher *getPrefetcher(unsigned subFile) const override
  2344. {
  2345. IOutputMetaData *actualLayout = actualLayouts.item(subFile);
  2346. assertex(actualLayout);
  2347. return actualLayout->createDiskPrefetcher();
  2348. }
  2349. virtual IOutputMetaData *queryActualLayout(unsigned subFile) const override
  2350. {
  2351. IOutputMetaData *actualLayout = actualLayouts.item(subFile);
  2352. assertex(actualLayout);
  2353. return actualLayout;
  2354. }
  2355. virtual bool isTranslating() const override
  2356. {
  2357. return anyTranslators;
  2358. }
  2359. virtual bool isTranslatingKeyed() const override
  2360. {
  2361. return anyKeyedTranslators;
  2362. }
  2363. virtual bool hasConsistentTranslation() const override
  2364. {
  2365. return translatorsMatch;
  2366. }
  2367. };
  2368. template <class X> class PerChannelCacheOf
  2369. {
  2370. IPointerArrayOf<X> cache;
  2371. UnsignedArray channels;
  2372. public:
  2373. // NOTE - typically only a couple of entries (but see PerFormatCacheOf below
  2374. void set(X *value, unsigned channel)
  2375. {
  2376. cache.append(value);
  2377. channels.append(channel);
  2378. }
  2379. X *get(unsigned channel) const
  2380. {
  2381. ForEachItemIn(idx, channels)
  2382. {
  2383. if (channels.item(idx)==channel)
  2384. return cache.item(idx);
  2385. }
  2386. return NULL;
  2387. }
  2388. };
  2389. template <class X> class PerFormatCacheOf : public PerChannelCacheOf<X>
  2390. {
  2391. // Identical for now, but characteristics are different so implementations may diverge.
  2392. // For example, this one may want to be a hash table, and there may be many more entries
  2393. };
  // Resolved information about a logical file or superfile: per-subfile names, descriptors, recorded
  // format CRCs and disk layouts, plus lazily-built per-channel structures (file arrays, key arrays,
  // in-memory index managers) derived from them.
  2394. class CResolvedFile : implements IResolvedFileCreator, implements ISafeSDSSubscription, public CInterface
  2395. {
  2396. protected:
  // Cache this object is registered with (see setCache()); NULL when not cached.
  2397. IResolvedFileCache *cached;
  2398. StringAttr lfn;
  2399. StringAttr physicalName;
  2400. Owned<IDistributedFile> dFile; // NULL on copies serialized to agents. Note that this implies we keep a lock on dali file for the lifetime of this object.
  2401. CDateTime fileTimeStamp;
  // Sum of the part @size values of every subfile added via addFile().
  2402. offset_t fileSize;
  // 0 when no checksum is available; only a non-zero value contributes to addHash64().
  2403. unsigned fileCheckSum;
  2404. RoxieFileType fileType;
  2405. bool isSuper;
  // The next five arrays are parallel: addFile() appends exactly one entry to each per subfile.
  2406. StringArray subNames;
  2407. IPointerArrayOf<IFileDescriptor> subFiles; // note - on agents, the file descriptors may have incomplete info. On originating server is always complete
  2408. IPointerArrayOf<IFileDescriptor> remoteSubFiles; // note - on agents, the file descriptors may have incomplete info. On originating server is always complete
  2409. IntArray formatCrcs;
  2410. IPointerArrayOf<IOutputMetaData> diskTypeInfo; // New info using RtlTypeInfo structures
  2411. IArrayOf<IDistributedFile> subDFiles; // To make sure subfiles get locked too
  2412. IArrayOf<IResolvedFile> subRFiles; // To make sure subfiles get locked too
  2413. Owned <IPropertyTree> properties;
  2414. Linked<IRoxieDaliHelper> daliHelper;
  // Superfile change subscription; only set when the superfile is not kept locked (see constructor).
  2415. Owned<IDaliPackageWatcher> notifier;
  // Returns this object linked, or nullptr if isAliveAndLink() fails.
  2416. virtual ISafeSDSSubscription *linkIfAlive() override { return isAliveAndLink() ? this : nullptr; }
  2417. void addFile(const char *subName, IFileDescriptor *fdesc, IFileDescriptor *remoteFDesc)
  2418. {
  2419. subNames.append(subName);
  2420. subFiles.append(fdesc);
  2421. remoteSubFiles.append(remoteFDesc);
  2422. IPropertyTree const & props = fdesc->queryProperties();
  2423. // NOTE - grouping is not included in the formatCRC, nor is the trailing byte that indicates grouping
  2424. // included in the rtlTypeInfo.
  2425. const char *kind = props.queryProp("@kind");
  2426. if (kind)
  2427. {
  2428. RoxieFileType thisFileType = streq(kind, "key") ? ROXIE_KEY : ROXIE_FILE;
  2429. if (subFiles.length()==1)
  2430. fileType = thisFileType;
  2431. else
  2432. assertex(thisFileType==fileType);
  2433. }
  2434. bool isGrouped = props.getPropBool("@grouped", false);
  2435. int formatCrc = props.getPropInt("@formatCrc", 0);
  2436. // If formatCrc and grouping are same as previous, reuse previous typeInfo
  2437. Owned<IOutputMetaData> actualFormat;
  2438. unsigned prevIdx = formatCrcs.length()-1;
  2439. if (formatCrcs.length() && formatCrc == formatCrcs.item(prevIdx) &&
  2440. diskTypeInfo.item(prevIdx) && isGrouped==diskTypeInfo.item(prevIdx)->isGrouped())
  2441. actualFormat.set(diskTypeInfo.item(prevIdx));
  2442. else
  2443. actualFormat.setown(getDaliLayoutInfo(props));
  2444. diskTypeInfo.append(actualFormat.getClear());
  2445. formatCrcs.append(formatCrc);
  2446. unsigned numParts = fdesc->numParts();
  2447. offset_t base = 0;
  2448. for (unsigned i = 0; i < numParts; i++)
  2449. {
  2450. IPartDescriptor *pdesc = fdesc->queryPart(i);
  2451. IPropertyTree &partProps = pdesc->queryProperties();
  2452. offset_t dfsSize = partProps.getPropInt64("@size");
  2453. partProps.setPropInt64("@offset", base);
  2454. base += dfsSize;
  2455. }
  2456. fileSize += base;
  2457. }
  2458. virtual void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
  2459. {
  2460. if (traceLevel > 2)
  2461. DBGLOG("Superfile %s change detected", lfn.get());
  2462. {
  2463. CriticalBlock b(lock);
  2464. if (cached)
  2465. {
  2466. cached->removeCache(this);
  2467. cached = NULL;
  2468. }
  2469. }
  2470. globalPackageSetManager->requestReload(false, false);
  2471. }
  2472. // We cache all the file maps/arrays etc here.
  // These are mutable so that the const accessors (getFileMap, getIFileIOArray, getKeyArray,
  // getIndexManager) can build them on first use. `lock` guards that creation, and is also
  // taken by notify() when it clears `cached`.
  2473. mutable CriticalSection lock;
  2474. mutable Owned<IFilePartMap> fileMap;
  // Per-channel caches; each entry keeps the reference it was given and is returned LINKed.
  2475. mutable PerChannelCacheOf<IInMemoryIndexManager> indexMap;
  2476. mutable PerChannelCacheOf<IFileIOArray> ioArrayMap;
  2477. mutable PerChannelCacheOf<IKeyArray> keyArrayMap;
  2478. public:
  2479. IMPLEMENT_IINTERFACE;
  // Builds the resolved information for a logical file.
  // When _dFile is supplied, its timestamp and checksum are read, and each subfile's descriptor (or the
  // single descriptor of a plain file) is added via addFile(). A remote descriptor is added alongside
  // whenever daliHelper->checkClonedFromRemote() returns one.
  // For a superfile the attributes are cloned. Unless isDynamic or lockSuperFiles is set, a change
  // subscription is registered and dFile is released, so the superfile itself is not kept locked
  // (its subfiles stay referenced through subDFiles).
  // _isSuperFile only takes effect when no _dFile is supplied; writeAccess is not used here.
  2480. CResolvedFile(const char *_lfn, const char *_physicalName, IDistributedFile *_dFile, RoxieFileType _fileType, IRoxieDaliHelper* _daliHelper, bool isDynamic, bool cacheIt, bool writeAccess, bool _isSuperFile)
  2481. : lfn(_lfn), physicalName(_physicalName), dFile(_dFile), fileType(_fileType), isSuper(_isSuperFile), daliHelper(_daliHelper)
  2482. {
  2483. cached = NULL;
  2484. fileSize = 0;
  2485. fileCheckSum = 0;
  2486. if (dFile)
  2487. {
  2488. if (traceLevel > 5)
  2489. DBGLOG("Roxie server adding information for file %s", lfn.get());
  2490. bool tsSet = dFile->getModificationTime(fileTimeStamp);
  2491. dFile->getFileCheckSum(fileCheckSum);
  2492. assertex(tsSet); // per Nigel, is always set
  2493. IDistributedSuperFile *superFile = dFile->querySuperFile();
  2494. if (superFile)
  2495. {
  2496. isSuper = true;
  // NOTE(review): the meaning of the `true` argument is not visible here - presumably it expands nested superfiles; confirm.
  2497. Owned<IDistributedFileIterator> subs = superFile->getSubFileIterator(true);
  2498. ForEach(*subs)
  2499. {
  2500. IDistributedFile &sub = subs->query();
  2501. Owned<IFileDescriptor> fDesc = sub.getFileDescriptor();
  2502. Owned<IFileDescriptor> remoteFDesc;
  2503. if (daliHelper)
  2504. remoteFDesc.setown(daliHelper->checkClonedFromRemote(sub.queryLogicalName(), fDesc, cacheIt, defaultPrivilegedUser));
  2505. subDFiles.append(OLINK(sub));
  2506. addFile(sub.queryLogicalName(), fDesc.getClear(), remoteFDesc.getClear());
  2507. }
  2508. // We have to clone the properties since we don't want to keep the superfile locked
  2509. properties.setown(createPTreeFromIPT(&dFile->queryAttributes(), ipt_lowmem));
  2510. if (!isDynamic && !lockSuperFiles)
  2511. {
  // Instead of holding the lock, watch for changes (see notify()).
  // NOTE(review): unlike the checkClonedFromRemote calls above, daliHelper is not null-checked here -
  // confirm it is always non-null when a superfile dFile is supplied.
  2512. notifier.setown(daliHelper->getSuperFileSubscription(lfn, this));
  2513. dFile.clear(); // We don't lock superfiles, except dynamic ones
  2514. }
  2515. }
  2516. else // normal file, not superkey
  2517. {
  2518. isSuper = false;
  // Plain files keep dFile (and hence the Dali lock), so the attributes are referenced rather than cloned.
  2519. properties.set(&dFile->queryAttributes());
  2520. Owned<IFileDescriptor> fDesc = dFile->getFileDescriptor();
  2521. Owned<IFileDescriptor> remoteFDesc;
  2522. if (daliHelper)
  2523. remoteFDesc.setown(daliHelper->checkClonedFromRemote(_lfn, fDesc, cacheIt, defaultPrivilegedUser));
  2524. addFile(dFile->queryLogicalName(), fDesc.getClear(), remoteFDesc.getClear());
  2525. }
  2526. }
  2527. }
  2528. virtual void beforeDispose()
  2529. {
  2530. if (notifier)
  2531. daliHelper->releaseSubscription(notifier);
  2532. notifier.clear();
  2533. if (cached)
  2534. {
  2535. cached->removeCache(this);
  2536. }
  2537. }
  2538. virtual unsigned numSubFiles() const
  2539. {
  2540. return subNames.length();
  2541. }
  2542. virtual bool getSubFileName(unsigned num, StringBuffer &name) const
  2543. {
  2544. if (subNames.isItem(num))
  2545. {
  2546. name.append(subNames.item(num));
  2547. return true;
  2548. }
  2549. else
  2550. {
  2551. return false;
  2552. }
  2553. }
  2554. virtual unsigned findSubName(const char *subname) const
  2555. {
  2556. ForEachItemIn(idx, subNames)
  2557. {
  2558. if (strieq(subNames.item(idx), subname))
  2559. return idx;
  2560. }
  2561. return NotFound;
  2562. }
  2563. virtual unsigned getContents(StringArray &contents) const
  2564. {
  2565. ForEachItemIn(idx, subNames)
  2566. {
  2567. contents.append(subNames.item(idx));
  2568. }
  2569. return subNames.length();
  2570. }
  2571. virtual bool isSuperFile() const
  2572. {
  2573. return isSuper;
  2574. }
  2575. virtual bool isKey() const
  2576. {
  2577. return fileType==ROXIE_KEY;
  2578. }
  2579. virtual IFilePartMap *getFileMap() const
  2580. {
  2581. CriticalBlock b(lock);
  2582. if (!fileMap)
  2583. {
  2584. if (subFiles.length())
  2585. {
  2586. if (subFiles.length()!=1)
  2587. throw MakeStringException(0, "Roxie does not support FETCH or KEYED JOIN to superkey with multiple parts");
  2588. fileMap.setown(createFilePartMap(lfn, *subFiles.item(0)));
  2589. }
  2590. }
  2591. return fileMap.getLink();
  2592. }
  2593. virtual unsigned getNumParts() const
  2594. {
  2595. CriticalBlock b(lock);
  2596. unsigned numParts = 0;
  2597. ForEachItemIn(idx, subFiles)
  2598. {
  2599. unsigned thisNumParts = subFiles.item(idx)->numParts();
  2600. if (thisNumParts > numParts)
  2601. numParts = thisNumParts;
  2602. }
  2603. return numParts;
  2604. }
  2605. bool serializeFDesc(MemoryBuffer &mb, IFileDescriptor *fdesc, unsigned channel, bool isLocal) const
  2606. {
  2607. // Find all the partno's that go to this channel
  2608. unsigned numParts = fdesc->numParts();
  2609. if (numParts > 1 && fileType==ROXIE_KEY && isLocal)
  2610. numParts--; // don't want to send TLK
  2611. UnsignedArray partNos;
  2612. for (unsigned i = 1; i <= numParts; i++)
  2613. {
  2614. if (getBondedChannel(i)==channel || !isLocal)
  2615. {
  2616. partNos.append(i-1);
  2617. }
  2618. }
  2619. fdesc->serializeParts(mb, partNos);
  2620. return partNos.length();
  2621. }
  2622. virtual void serializePartial(MemoryBuffer &mb, unsigned channel, bool isLocal) const override
  2623. {
  2624. if (traceLevel > 6)
  2625. DBGLOG("Serializing file information for dynamic file %s, channel %d, local %d", lfn.get(), channel, isLocal);
  2626. byte type = (byte) fileType;
  2627. mb.append(type);
  2628. fileTimeStamp.serialize(mb);
  2629. mb.append(fileCheckSum);
  2630. mb.append(fileSize);
  2631. mb.append(isSuper);
  2632. unsigned numSubFiles = subFiles.length();
  2633. mb.append(numSubFiles);
  2634. ForEachItemIn(idx, subFiles)
  2635. {
  2636. mb.append(subNames.item(idx));
  2637. IFileDescriptor *fdesc = subFiles.item(idx);
  2638. bool anyparts = serializeFDesc(mb, fdesc, channel, isLocal);
  2639. IFileDescriptor *remoteFDesc = remoteSubFiles.item(idx);
  2640. if (remoteFDesc)
  2641. {
  2642. mb.append(true);
  2643. anyparts |= serializeFDesc(mb, remoteFDesc, channel, isLocal);
  2644. }
  2645. else
  2646. mb.append(false);
  2647. mb.append(formatCrcs.item(idx));
  2648. IOutputMetaData *diskType = diskTypeInfo.item(idx);
  2649. if (anyparts && diskType)
  2650. {
  2651. if (idx && formatCrcs.item(idx)==formatCrcs.item(idx-1))
  2652. mb.append((byte) 3); // indicating same format as previous
  2653. else
  2654. {
  2655. mb.append((byte) (diskType->isGrouped() ? 2 : 1));
  2656. verifyex(dumpTypeInfo(mb, diskType->queryTypeInfo())); // Must be serializable, as we deserialized it...
  2657. }
  2658. }
  2659. else
  2660. mb.append((byte) 0);
  2661. }
  2662. if (properties)
  2663. {
  2664. mb.append(true);
  2665. properties->serialize(mb);
  2666. }
  2667. else
  2668. mb.append(false);
  2669. }
  2670. static FileFormatMode getMode(IFileDescriptor *fileDesc)
  2671. {
  2672. if (isFileKey(fileDesc))
  2673. return FileFormatMode::index;
  2674. else
  2675. {
  2676. const char *kind = fileDesc->queryKind();
  2677. if (kind)
  2678. {
  2679. if (streq("csv", kind))
  2680. return FileFormatMode::csv;
  2681. else if (streq("xml", kind))
  2682. return FileFormatMode::xml;
  2683. else if (streq("json", kind))
  2684. return FileFormatMode::xml; // MORE - is that right?
  2685. }
  2686. return FileFormatMode::flat;
  2687. }
  2688. }
  2689. virtual ITranslatorSet *getTranslators(int projectedFormatCrc, IOutputMetaData *projected, int expectedFormatCrc, IOutputMetaData *expected, RecordTranslationMode mode, FileFormatMode fileMode, const char *queryName) const override
  2690. {
  2691. // NOTE - projected and expected and anything fetched from them such as type info may reside in dynamically loaded (and unloaded)
  2692. // query DLLs - this means it is not safe to include them in any sort of cache that might outlive the current query.
  2693. Owned<CTranslatorSet> result = new CTranslatorSet(expected->queryRecordAccessor(true), projectedFormatCrc);
  2694. Owned<const IDynamicTransform> translator; // Translates rows from actual to projected
  2695. Owned<const IKeyTranslator> keyedTranslator; // translate filter conditions from expected to actual
  2696. int prevFormatCrc = 0;
  2697. assertex(projected != nullptr);
  2698. ForEachItemIn(idx, subFiles)
  2699. {
  2700. IFileDescriptor *subFile = subFiles.item(idx);
  2701. IOutputMetaData *actual = expected;
  2702. if (subFile)
  2703. {
  2704. FileFormatMode actualMode = getMode(subFile);
  2705. const char *subname = subNames.item(idx);
  2706. if (fileMode!=actualMode)
  2707. {
  2708. if (traceLevel>0)
  2709. DBGLOG("In query %s: Not translating %s as file type does not match", queryName, subname);
  2710. }
  2711. else if (projectedFormatCrc != 0) // projectedFormatCrc is currently 0 for csv/xml which should not create translators.
  2712. {
  2713. int thisFormatCrc = 0;
  2714. bool actualUnknown = true;
  2715. if (mode == RecordTranslationMode::AlwaysECL)
  2716. {
  2717. if (formatCrcs.item(idx) && expectedFormatCrc && (formatCrcs.item(idx) != expectedFormatCrc))
  2718. DBGLOG("Overriding stored record layout reading file %s", subname);
  2719. thisFormatCrc = expectedFormatCrc;
  2720. }
  2721. else
  2722. {
  2723. thisFormatCrc = formatCrcs.item(idx);
  2724. if (diskTypeInfo.item(idx))
  2725. {
  2726. actual = diskTypeInfo.item(idx);
  2727. actualUnknown = false;
  2728. }
  2729. else if (thisFormatCrc == expectedFormatCrc) // Type descriptors that cannot be serialized can still be read from code
  2730. {
  2731. actual = expected;
  2732. actualUnknown = false;
  2733. }
  2734. }
  2735. assertex(actual);
  2736. if ((thisFormatCrc != prevFormatCrc) || (idx == 0)) // Check if same translation as last subfile
  2737. {
  2738. translator.clear();
  2739. keyedTranslator.clear();
  2740. //Check if the file requires translation, but translation is disabled
  2741. if (thisFormatCrc && expectedFormatCrc && (thisFormatCrc != expectedFormatCrc) && (mode == RecordTranslationMode::None))
  2742. throwTranslationError(actual->queryRecordAccessor(true), expected->queryRecordAccessor(true), subname);
  2743. if (thisFormatCrc == expectedFormatCrc && projectedFormatCrc == expectedFormatCrc && (actualUnknown || alwaysTrustFormatCrcs))
  2744. {
  2745. if (traceLevel > 5)
  2746. DBGLOG("In query %s: Assume no translation required for file %s, crc's match", queryName, subname);
  2747. }
  2748. else if (actualUnknown && mode != RecordTranslationMode::AlwaysECL)
  2749. {
  2750. if (thisFormatCrc)
  2751. throw MakeStringException(ROXIE_MISMATCH, "Untranslatable record layout mismatch detected for file %s (disk format not serialized)", subname);
  2752. else if (traceLevel > 5)
  2753. DBGLOG("In query %s: Assume no translation required for %s, disk format unknown", queryName, subname);
  2754. }
  2755. else
  2756. {
  2757. translator.setown(createRecordTranslator(projected->queryRecordAccessor(true), actual->queryRecordAccessor(true)));
  2758. if (traceLevel>0 && traceTranslations)
  2759. {
  2760. DBGLOG("In query %s: Record layout translator created for %s", queryName, subname);
  2761. translator->describe();
  2762. }
  2763. if (!translator || !translator->canTranslate())
  2764. throw MakeStringException(ROXIE_MISMATCH, "Untranslatable record layout mismatch detected for file %s", subname);
  2765. else if (translator->needsTranslate())
  2766. {
  2767. if (fileMode==FileFormatMode::index && translator->keyedTranslated())
  2768. throw MakeStringException(ROXIE_MISMATCH, "Record layout mismatch detected in keyed fields for file %s", subname);
  2769. keyedTranslator.setown(createKeyTranslator(actual->queryRecordAccessor(true), expected->queryRecordAccessor(true)));
  2770. }
  2771. else
  2772. translator.clear();
  2773. }
  2774. }
  2775. prevFormatCrc = thisFormatCrc;
  2776. }
  2777. }
  2778. else if (traceLevel > 5)
  2779. DBGLOG("In query %s: Assume no translation required, subfile is null", queryName);
  2780. result->addTranslator(LINK(translator), LINK(keyedTranslator), LINK(actual));
  2781. }
  2782. return result.getClear();
  2783. }
  2784. virtual IFileIOArray *getIFileIOArray(bool isOpt, unsigned channel) const
  2785. {
  2786. CriticalBlock b(lock);
  2787. IFileIOArray *ret = ioArrayMap.get(channel);
  2788. if (!ret)
  2789. {
  2790. ret = createIFileIOArray(isOpt, channel);
  2791. ioArrayMap.set(ret, channel);
  2792. }
  2793. return LINK(ret);
  2794. }
  // Builds a new file array for `channel` (0 = include every part).
  // The array starts with one null entry. Then, for each non-null subfile descriptor in order, it gets one
  // entry per part: an ILazyFileIO from createPhysicalFile() if the part is bonded to this channel,
  // otherwise a null entry.
  // If creating a part's file throws, the error is logged; with isOpt the entry is left null,
  // otherwise the exception is rethrown.
  2795. IFileIOArray *createIFileIOArray(bool isOpt, unsigned channel) const
  2796. {
  2797. Owned<CFileIOArray> f = new CFileIOArray;
  // NOTE(review): presumably this leading null entry lets 1-based part numbers be used as indexes - confirm.
  2798. f->addFile(nullptr, 0, 0, nullptr, 0);
  2799. ForEachItemIn(idx, subFiles)
  2800. {
  2801. IFileDescriptor *fdesc = subFiles.item(idx);
  2802. IFileDescriptor *remoteFDesc = remoteSubFiles.item(idx);
  2803. const char *subname = subNames.item(idx);
  2804. int thisFormatCrc = formatCrcs.item(idx);
  2805. if (fdesc)
  2806. {
  2807. unsigned numParts = fdesc->numParts();
  // Part numbers are 1-based for getBondedChannel() and 0-based for queryPart().
  2808. for (unsigned i = 1; i <= numParts; i++)
  2809. {
  2810. if (!channel || getBondedChannel(i)==channel)
  2811. {
  2812. try
  2813. {
  2814. IPartDescriptor *pdesc = fdesc->queryPart(i-1);
  2815. assertex(pdesc);
  2816. IPartDescriptor *remotePDesc = queryMatchingRemotePart(pdesc, remoteFDesc, i-1);
  2817. Owned<ILazyFileIO> file = createPhysicalFile(subNames.item(idx), pdesc, remotePDesc, ROXIE_FILE, numParts, cached != NULL, channel);
  // @offset is the part's starting position within its subfile (addFile() sets it on the server side).
  2818. IPropertyTree &partProps = pdesc->queryProperties();
  2819. f->addFile(file.getClear(), partProps.getPropInt64("@offset"), idx, subname, thisFormatCrc);
  2820. }
  2821. catch (IException *E)
  2822. {
  2823. StringBuffer err;
  2824. err.append("Could not load file ");
  2825. fdesc->getTraceName(err);
  2826. DBGLOG(E, err.str());
  2827. if (!isOpt)
  2828. throw;
  // Optional file: swallow the error and keep a null placeholder for this part.
  2829. E->Release();
  2830. f->addFile(nullptr, 0, idx, nullptr, 0);
  2831. }
  2832. }
  2833. else
  2834. f->addFile(nullptr, 0, idx, nullptr, 0);
  2835. }
  2836. }
  2837. }
  2838. return f.getClear();
  2839. }
  2840. virtual IKeyArray *getKeyArray(bool isOpt, unsigned channel) const override
  2841. {
  2842. unsigned maxParts = 0;
  2843. ForEachItemIn(subFile, subFiles)
  2844. {
  2845. IFileDescriptor *fdesc = subFiles.item(subFile);
  2846. if (fdesc)
  2847. {
  2848. unsigned numParts = fdesc->numParts();
  2849. if (numParts > 1)
  2850. numParts--; // Don't include TLK
  2851. if (numParts > maxParts)
  2852. maxParts = numParts;
  2853. }
  2854. }
  2855. CriticalBlock b(lock);
  2856. IKeyArray *ret = keyArrayMap.get(channel);
  2857. if (!ret)
  2858. {
  2859. ret = createKeyArray(isOpt, channel, maxParts);
  2860. keyArrayMap.set(ret, channel);
  2861. }
  2862. return LINK(ret);
  2863. }
  // Builds a new key array for `channel`.
  // For channel != 0: entry 0 is null. Then, for each part number 1..maxParts, the entry is either a key
  // index set with one entry per subfile (null where a subfile has no such part), if the part is bonded to
  // this channel, or null otherwise.
  // For channel == 0: a single key index set holding the last part of each subfile (the TLK). If that set
  // reports no parts, ROXIE_FILE_ERROR is thrown unless isOpt.
  2864. IKeyArray *createKeyArray(bool isOpt, unsigned channel, unsigned maxParts) const
  2865. {
  2866. Owned<IKeyArray> ret = ::createKeyArray();
  2867. if (channel)
  2868. {
  2869. ret->addKey(NULL);
  2870. for (unsigned partNo = 1; partNo <= maxParts; partNo++)
  2871. {
  2872. if (channel == getBondedChannel(partNo))
  2873. {
  2874. Owned<IKeyIndexSet> keyset = createKeyIndexSet();
  2875. ForEachItemIn(idx, subFiles)
  2876. {
  2877. IFileDescriptor *fdesc = subFiles.item(idx);
  2878. IFileDescriptor *remoteFDesc = remoteSubFiles.item(idx);
  2879. Owned <ILazyFileIO> part;
  2880. unsigned crc = 0;
  2881. if (fdesc) // NB there may be no parts for this channel
  2882. {
  2883. IPartDescriptor *pdesc = fdesc->queryPart(partNo-1);
  2884. if (pdesc)
  2885. {
  2886. IPartDescriptor *remotePDesc = queryMatchingRemotePart(pdesc, remoteFDesc, partNo-1);
  2887. part.setown(createPhysicalFile(subNames.item(idx), pdesc, remotePDesc, ROXIE_KEY, fdesc->numParts(), cached != NULL, channel));
  2888. pdesc->getCrc(crc);
  2889. }
  2890. }
  2891. if (part)
  2892. {
  2893. if (lazyOpen)
  2894. {
  2895. // We pass the IDelayedFile interface to createKeyIndex, so that it does not open the file immediately
  2896. keyset->addIndex(createKeyIndex(part->queryFilename(), crc, *QUERYINTERFACE(part.get(), IDelayedFile), part->getFileIdx(), false));
  2897. }
  2898. else
  2899. keyset->addIndex(createKeyIndex(part->queryFilename(), crc, *part.get(), part->getFileIdx(), false));
  2900. }
  2901. else
  2902. keyset->addIndex(NULL);
  2903. }
  2904. ret->addKey(keyset.getClear());
  2905. }
  2906. else
  2907. ret->addKey(NULL);
  2908. }
  2909. }
  2910. else
  2911. {
  2912. // Channel 0 means return the TLK
  2913. Owned<IKeyIndexSet> keyset = createKeyIndexSet();
  2914. ForEachItemIn(idx, subFiles)
  2915. {
  2916. IFileDescriptor *fdesc = subFiles.item(idx);
  2917. IFileDescriptor *remoteFDesc = remoteSubFiles.item(idx);
  2918. Owned<IKeyIndexBase> key;
  2919. if (fdesc)
  2920. {
  2921. unsigned numParts = fdesc->numParts();
  2922. assertex(numParts > 0);
  // The last part of each subfile is used (for a single-part key this is the only part).
  2923. IPartDescriptor *pdesc = fdesc->queryPart(numParts - 1);
  2924. IPartDescriptor *remotePDesc = queryMatchingRemotePart(pdesc, remoteFDesc, numParts - 1);
  2925. Owned<ILazyFileIO> keyFile = createPhysicalFile(subNames.item(idx), pdesc, remotePDesc, ROXIE_KEY, numParts, cached != NULL, channel);
  2926. unsigned crc = 0;
  2927. pdesc->getCrc(crc);
  2928. StringBuffer pname;
  2929. pdesc->getPath(pname);
  // NOTE(review): the final createKeyIndex argument (numParts>1) presumably flags this part as a TLK - confirm.
  2930. if (lazyOpen)
  2931. {
  2932. // We pass the IDelayedFile interface to createKeyIndex, so that it does not open the file immediately
  2933. key.setown(createKeyIndex(pname.str(), crc, *QUERYINTERFACE(keyFile.get(), IDelayedFile), keyFile->getFileIdx(), numParts>1));
  2934. }
  2935. else
  2936. key.setown(createKeyIndex(pname.str(), crc, *keyFile.get(), keyFile->getFileIdx(), numParts>1));
  2937. keyset->addIndex(LINK(key->queryPart(0)));
  2938. }
  2939. else
  2940. keyset->addIndex(NULL);
  2941. }
  2942. if (keyset->numParts())
  2943. ret->addKey(keyset.getClear());
  2944. else if (!isOpt)
  2945. throw MakeStringException(ROXIE_FILE_ERROR, "Key %s has no key parts", lfn.get());
  2946. else if (traceLevel > 4)
  2947. DBGLOG(ROXIE_OPT_REPORTING, "Key %s has no key parts", lfn.get());
  2948. }
  2949. return ret.getClear();
  2950. }
  2951. virtual IInMemoryIndexManager *getIndexManager(bool isOpt, unsigned channel, IOutputMetaData *preloadLayout, bool preload) const
  2952. {
  2953. // MORE - I don't know that it makes sense to pass isOpt in to these calls
  2954. // Failures to resolve will not be cached, only successes.
  2955. // MORE - preload and numkeys are all messed up - can't be specified per query have to be per file
  2956. CriticalBlock b(lock);
  2957. IInMemoryIndexManager *ret = indexMap.get(channel);
  2958. if (!ret)
  2959. {
  2960. ret = createInMemoryIndexManager(preloadLayout->queryRecordAccessor(true), isOpt, lfn);
  2961. Owned<IFileIOArray> files = getIFileIOArray(isOpt, channel);
  2962. ret->load(files, preloadLayout, preload); // note - files (passed in) are also channel specific
  2963. indexMap.set(ret, channel);
  2964. }
  2965. return LINK(ret);
  2966. }
  2967. virtual const CDateTime &queryTimeStamp() const
  2968. {
  2969. return fileTimeStamp;
  2970. }
  2971. virtual unsigned queryCheckSum() const
  2972. {
  2973. return fileCheckSum;
  2974. }
  2975. virtual offset_t getFileSize() const
  2976. {
  2977. return fileSize;
  2978. }
  2979. virtual hash64_t addHash64(hash64_t hashValue) const
  2980. {
  2981. hashValue = fileTimeStamp.getHash(hashValue);
  2982. if (fileCheckSum)
  2983. hashValue = rtlHash64Data(sizeof(fileCheckSum), &fileCheckSum, hashValue);
  2984. return hashValue;
  2985. }
  2986. virtual void addSubFile(const IResolvedFile *_sub)
  2987. {
  2988. const CResolvedFile *sub = static_cast<const CResolvedFile *>(_sub);
  2989. if (subFiles.length())
  2990. assertex(sub->fileType==fileType);
  2991. else
  2992. fileType = sub->fileType;
  2993. subRFiles.append((IResolvedFile &) *LINK(_sub));
  2994. ForEachItemIn(idx, sub->subFiles)
  2995. {
  2996. addFile(sub->subNames.item(idx), LINK(sub->subFiles.item(idx)), LINK(sub->remoteSubFiles.item(idx)));
  2997. }
  2998. }
  2999. virtual void addSubFile(IFileDescriptor *_sub, IFileDescriptor *_remoteSub)
  3000. {
  3001. addFile(lfn, _sub, _remoteSub);
  3002. }
  3003. virtual void addSubFile(const char *localFileName)
  3004. {
  3005. Owned<IFile> file = createIFile(localFileName);
  3006. assertex(file->exists());
  3007. offset_t size = file->size();
  3008. Owned<IFileDescriptor> fdesc = createFileDescriptor();
  3009. if (isIndexFile(file))
  3010. fdesc->queryProperties().setProp("@kind", "key");
  3011. Owned<IPropertyTree> pp = createPTree("Part", ipt_lowmem);
  3012. pp->setPropInt64("@size",size);
  3013. pp->setPropBool("@local", true);
  3014. fdesc->setPart(0, queryMyNode(), localFileName, pp);
  3015. addSubFile(fdesc.getClear(), NULL);
  3016. }
  3017. virtual void setCache(IResolvedFileCache *cache)
  3018. {
  3019. if (cached)
  3020. {
  3021. if (traceLevel > 9)
  3022. DBGLOG("setCache removing from prior cache %s", queryFileName());
  3023. if (cache==NULL)
  3024. cached->removeCache(this);
  3025. else
  3026. throwUnexpected();
  3027. }
  3028. cached = cache;
  3029. }
  3030. virtual bool isAliveAndLink() const
  3031. {
  3032. return CInterface::isAliveAndLink();
  3033. }
  3034. virtual const char *queryFileName() const
  3035. {
  3036. return lfn.get();
  3037. }
  3038. virtual const char *queryPhysicalName() const
  3039. {
  3040. return physicalName.get();
  3041. }
  3042. virtual const IPropertyTree *queryProperties() const
  3043. {
  3044. return properties;
  3045. }
  3046. virtual void remove()
  3047. {
  3048. subFiles.kill();
  3049. subDFiles.kill();
  3050. subRFiles.kill();
  3051. subNames.kill();
  3052. remoteSubFiles.kill();
  3053. properties.clear();
  3054. notifier.clear();
  3055. if (isSuper)
  3056. {
  3057. // Because we don't lock superfiles, we need to behave differently
  3058. UNIMPLEMENTED;
  3059. }
  3060. else if (dFile)
  3061. {
  3062. dFile->detach();
  3063. }
  3064. else if (!physicalName.isEmpty())
  3065. {
  3066. try
  3067. {
  3068. Owned<IFile> file = createIFile(physicalName.get());
  3069. file->remove();
  3070. }
  3071. catch (IException *e)
  3072. {
  3073. OERRLOG(-1, "Error removing file %s (%s)", lfn.get(), physicalName.get());
  3074. e->Release();
  3075. }
  3076. }
  3077. }
  3078. virtual bool exists() const
  3079. {
  3080. // MORE - this is a little bizarre. We sometimes create a resolvedFile for a file that we are intending to create.
  3081. // This will make more sense if/when we start to lock earlier.
  3082. if (dFile || isSuper)
  3083. return true; // MORE - may need some thought - especially the isSuper case
  3084. else if (!physicalName.isEmpty())
  3085. return checkFileExists(physicalName.get());
  3086. else
  3087. return false;
  3088. }
  3089. virtual bool isRestrictedAccess() const override
  3090. {
  3091. return (dFile && dFile->isRestrictedAccess());
  3092. }
  3093. };
  3094. /*----------------------------------------------------------------------------------------------------------
  3095. MORE
  3096. - remote() calls cannot pass the expected file date to the server, but the server sends the date back along with the file info.
  3097. ------------------------------------------------------------------------------------------------------------*/
  3098. class CAgentDynamicFile : public CResolvedFile
  3099. {
  3100. public:
  3101. bool isOpt; // MORE - this is not very good. Needs some thought unless you cache opt / nonOpt separately which seems wasteful
  3102. bool isLocal;
  3103. unsigned channel;
  3104. ServerIdentifier serverId;
  3105. public:
  3106. CAgentDynamicFile(const IRoxieContextLogger &logctx, const char *_lfn, RoxiePacketHeader *header, bool _isOpt, bool _isLocal)
  3107. : CResolvedFile(_lfn, NULL, NULL, ROXIE_FILE, NULL, true, false, false, false), isOpt(_isOpt), isLocal(_isLocal), channel(header->channel), serverId(header->serverId)
  3108. {
  3109. // call back to the server to get the info
  3110. IPendingCallback *callback = ROQ->notePendingCallback(*header, lfn); // note that we register before the send to avoid a race.
  3111. try
  3112. {
  3113. RoxiePacketHeader newHeader(*header, ROXIE_FILECALLBACK, 0); // subchannel not relevant
  3114. bool ok = false;
  3115. for (unsigned i = 0; i < callbackRetries; i++)
  3116. {
  3117. Owned<IMessagePacker> output = ROQ->createOutputStream(newHeader, true, logctx);
  3118. unsigned len = strlen(lfn)+3; // 1 for isOpt, 1 for isLocal, 1 for null terminator
  3119. char *buf = (char *) output->getBuffer(len, true);
  3120. buf[0] = isOpt;
  3121. buf[1] = isLocal;
  3122. strcpy(buf+2, lfn.get());
  3123. output->putBuffer(buf, len, true);
  3124. output->flush();
  3125. output.clear();
  3126. if (callback->wait(callbackTimeout))
  3127. {
  3128. ok = true;
  3129. break;
  3130. }
  3131. else
  3132. {
  3133. DBGLOG("timed out waiting for server callback - retrying");
  3134. }
  3135. }
  3136. if (ok)
  3137. {
  3138. if (traceLevel > 6)
  3139. { StringBuffer s; DBGLOG("Processing information from server in response to %s", newHeader.toString(s).str()); }
  3140. MemoryBuffer &serverData = callback->queryData();
  3141. byte type;
  3142. serverData.read(type);
  3143. fileType = (RoxieFileType) type;
  3144. fileTimeStamp.deserialize(serverData);
  3145. serverData.read(fileCheckSum);
  3146. serverData.read(fileSize);
  3147. serverData.read(isSuper);
  3148. unsigned numSubFiles;
  3149. serverData.read(numSubFiles);
  3150. for (unsigned fileNo = 0; fileNo < numSubFiles; fileNo++)
  3151. {
  3152. StringBuffer subName;
  3153. serverData.read(subName);
  3154. subNames.append(subName.str());
  3155. deserializeFilePart(serverData, subFiles, fileNo, false);
  3156. bool remotePresent;
  3157. serverData.read(remotePresent);
  3158. if (remotePresent)
  3159. deserializeFilePart(serverData, remoteSubFiles, fileNo, true);
  3160. else
  3161. remoteSubFiles.append(NULL);
  3162. unsigned formatCrc;
  3163. serverData.read(formatCrc);
  3164. formatCrcs.append(formatCrc);
  3165. byte diskTypeInfoPresent;
  3166. serverData.read(diskTypeInfoPresent);
  3167. switch (diskTypeInfoPresent)
  3168. {
  3169. case 0:
  3170. diskTypeInfo.append(NULL);
  3171. break;
  3172. case 1:
  3173. diskTypeInfo.append(createTypeInfoOutputMetaData(serverData, false));
  3174. break;
  3175. case 2:
  3176. diskTypeInfo.append(createTypeInfoOutputMetaData(serverData, true));
  3177. break;
  3178. case 3:
  3179. assertex(fileNo > 0);
  3180. diskTypeInfo.append(LINK(diskTypeInfo.item(fileNo-1)));
  3181. break;
  3182. default:
  3183. throwUnexpected();
  3184. }
  3185. }
  3186. bool propertiesPresent;
  3187. serverData.read(propertiesPresent);
  3188. if (propertiesPresent)
  3189. properties.setown(createPTree(serverData, ipt_lowmem));
  3190. }
  3191. else
  3192. throw MakeStringException(ROXIE_CALLBACK_ERROR, "Failed to get response from server for dynamic file callback");
  3193. }
  3194. catch (...)
  3195. {
  3196. ROQ->removePendingCallback(callback);
  3197. throw;
  3198. }
  3199. ROQ->removePendingCallback(callback);
  3200. }
  3201. private:
  3202. void deserializeFilePart(MemoryBuffer &serverData, IPointerArrayOf<IFileDescriptor> &files, unsigned fileNo, bool remote)
  3203. {
  3204. IArrayOf<IPartDescriptor> parts;
  3205. deserializePartFileDescriptors(serverData, parts);
  3206. if (parts.length())
  3207. {
  3208. files.append(LINK(&parts.item(0).queryOwner()));
  3209. }
  3210. else
  3211. {
  3212. if (traceLevel > 6)
  3213. DBGLOG("No information for %s subFile %d of file %s", remote ? "remote" : "", fileNo, lfn.get());
  3214. files.append(NULL);
  3215. }
  3216. }
  3217. };
  3218. extern IResolvedFileCreator *createResolvedFile(const char *lfn, const char *physical, bool isSuperFile)
  3219. {
  3220. return new CResolvedFile(lfn, physical, NULL, ROXIE_FILE, NULL, true, false, false, isSuperFile);
  3221. }
  3222. extern IResolvedFile *createResolvedFile(const char *lfn, const char *physical, IDistributedFile *dFile, IRoxieDaliHelper *daliHelper, bool isDynamic, bool cacheIt, bool writeAccess)
  3223. {
  3224. const char *kind = dFile ? dFile->queryAttributes().queryProp("@kind") : NULL;
  3225. return new CResolvedFile(lfn, physical, dFile, kind && stricmp(kind, "key")==0 ? ROXIE_KEY : ROXIE_FILE, daliHelper, isDynamic, cacheIt, writeAccess, false);
  3226. }
  3227. class CAgentDynamicFileCache : implements IAgentDynamicFileCache, public CInterface
  3228. {
  3229. unsigned tableSize;
  3230. mutable CriticalSection crit;
  3231. CIArrayOf<CAgentDynamicFile> files; // expect numbers to be small - probably not worth hashing
  3232. public:
  3233. IMPLEMENT_IINTERFACE;
  3234. CAgentDynamicFileCache(unsigned _limit) : tableSize(_limit) {}
  3235. virtual IResolvedFile *lookupDynamicFile(const IRoxieContextLogger &logctx, const char *lfn, CDateTime &cacheDate, unsigned checksum, RoxiePacketHeader *header, bool isOpt, bool isLocal) override
  3236. {
  3237. if (logctx.queryTraceLevel() > 5)
  3238. {
  3239. StringBuffer s;
  3240. logctx.CTXLOG("lookupDynamicFile %s for packet %s", lfn, header->toString(s).str());
  3241. }
  3242. // we use a fixed-size array with linear lookup for ease of initial coding - but unless we start making heavy use of the feature this may be adequate.
  3243. CriticalBlock b(crit);
  3244. if (!cacheDate.isNull())
  3245. {
  3246. unsigned idx = 0;
  3247. while (files.isItem(idx))
  3248. {
  3249. CAgentDynamicFile &f = files.item(idx);
  3250. if (f.channel==header->channel && f.serverId==header->serverId && stricmp(f.queryFileName(), lfn)==0)
  3251. {
  3252. if (!cacheDate.equals(f.queryTimeStamp()) || checksum != f.queryCheckSum())
  3253. {
  3254. if (f.isKey())
  3255. clearKeyStoreCacheEntry(f.queryFileName());
  3256. files.remove(idx);
  3257. idx--;
  3258. }
  3259. else if ((!f.isLocal || isLocal) && f.isOpt==isOpt)
  3260. {
  3261. files.swap(idx, 0);
  3262. return LINK(&f);
  3263. }
  3264. }
  3265. idx++;
  3266. }
  3267. }
  3268. Owned<CAgentDynamicFile> ret;
  3269. {
  3270. // Don't prevent access to the cache while waiting for server to reply. Can deadlock if you do, apart from being inefficient
  3271. CriticalUnblock b1(crit);
  3272. ret.setown(new CAgentDynamicFile(logctx, lfn, header, isOpt, isLocal));
  3273. }
  3274. if (!ret->isSuperFile())
  3275. {
  3276. // Cache results for improved performance - we DON'T cache superfiles as they are liable to change during the course of a query.
  3277. // Note that even caching non-superfiles is also potentially going to give stale results, if the cache persists beyond the current
  3278. // query.
  3279. while (files.length() > tableSize)
  3280. files.remove(files.length()-1);
  3281. files.add(*ret.getLink(), 0);
  3282. }
  3283. return ret.getClear();
  3284. }
  3285. virtual void releaseAll() override
  3286. {
  3287. CriticalBlock b(crit);
  3288. files.kill();
  3289. }
  3290. };
  3291. static CriticalSection agentDynamicFileCacheCrit;
  3292. static Owned<IAgentDynamicFileCache> agentDynamicFileCache;
  3293. extern IAgentDynamicFileCache *queryAgentDynamicFileCache()
  3294. {
  3295. if (!agentDynamicFileCache)
  3296. {
  3297. CriticalBlock b(agentDynamicFileCacheCrit);
  3298. if (!agentDynamicFileCache)
  3299. agentDynamicFileCache.setown(new CAgentDynamicFileCache(20));
  3300. }
  3301. return agentDynamicFileCache;
  3302. }
  3303. extern void releaseAgentDynamicFileCache()
  3304. {
  3305. CriticalBlock b(agentDynamicFileCacheCrit);
  3306. if (agentDynamicFileCache)
  3307. agentDynamicFileCache->releaseAll();
  3308. }
  3309. static Singleton<CRoxieFileCache> fileCache;
  3310. // Initialization/termination
  3311. MODULE_INIT(INIT_PRIORITY_STANDARD)
  3312. {
  3313. return true;
  3314. }
  3315. MODULE_EXIT()
  3316. {
  3317. auto cache = fileCache.queryExisting();
  3318. if (cache)
  3319. {
  3320. cache->join();
  3321. cache->Release();
  3322. }
  3323. }
  3324. extern IRoxieFileCache &queryFileCache()
  3325. {
  3326. return *fileCache.query([] { return new CRoxieFileCache; });
  3327. }
  3328. class CRoxieWriteHandler : implements IRoxieWriteHandler, public CInterface
  3329. {
  3330. public:
  3331. IMPLEMENT_IINTERFACE;
  3332. CRoxieWriteHandler(IRoxieDaliHelper *_daliHelper, ILocalOrDistributedFile *_dFile, const StringArray &_clusters)
  3333. : daliHelper(_daliHelper), dFile(_dFile)
  3334. {
  3335. ForEachItemIn(idx, _clusters)
  3336. {
  3337. addCluster(_clusters.item(idx));
  3338. }
  3339. if (dFile->queryDistributedFile())
  3340. {
  3341. isTemporary = (localCluster.get() == NULL); // if only writing to remote clusters, write to a temporary first, then copy
  3342. if (isTemporary)
  3343. {
  3344. UNIMPLEMENTED;
  3345. }
  3346. else
  3347. localFile.setown(dFile->getPartFile(0, 0));
  3348. }
  3349. else
  3350. {
  3351. isTemporary = false;
  3352. localFile.setown(dFile->getPartFile(0, 0));
  3353. }
  3354. if (!recursiveCreateDirectoryForFile(localFile->queryFilename()))
  3355. throw MakeStringException(ROXIE_FILE_ERROR, "Cannot create directory for file %s", localFile->queryFilename());
  3356. }
  3357. virtual IFile *queryFile() const
  3358. {
  3359. return localFile;
  3360. }
  3361. void getClusters(StringArray &clusters) const
  3362. {
  3363. ForEachItemIn(idx, allClusters)
  3364. {
  3365. clusters.append(allClusters.item(idx));
  3366. }
  3367. }
  3368. virtual void finish(bool success, const IRoxiePublishCallback *activity)
  3369. {
  3370. if (success)
  3371. {
  3372. copyPhysical();
  3373. if (daliHelper && daliHelper->connected())
  3374. publish(activity);
  3375. }
  3376. if (isTemporary || !success)
  3377. {
  3378. localFile->remove();
  3379. }
  3380. }
  3381. private:
  3382. bool isTemporary;
  3383. Linked<IRoxieDaliHelper> daliHelper;
  3384. Owned<ILocalOrDistributedFile> dFile;
  3385. Owned<IFile> localFile;
  3386. Owned<IGroup> localCluster;
  3387. StringAttr localClusterName;
  3388. IArrayOf<IGroup> remoteNodes;
  3389. StringArray allClusters;
  3390. void copyPhysical() const
  3391. {
  3392. RemoteFilename rfn, rdn;
  3393. dFile->getPartFilename(rfn, 0, 0);
  3394. StringBuffer physicalName, physicalDir, physicalBase;
  3395. rfn.getLocalPath(physicalName);
  3396. splitFilename(physicalName, &physicalDir, &physicalDir, &physicalBase, &physicalBase);
  3397. rdn.setLocalPath(physicalDir.str());
  3398. if (remoteNodes.length())
  3399. {
  3400. ForEachItemIn(idx, remoteNodes)
  3401. {
  3402. rdn.setEp(remoteNodes.item(idx).queryNode(0).endpoint());
  3403. rfn.setEp(remoteNodes.item(idx).queryNode(0).endpoint());
  3404. Owned<IFile> targetdir = createIFile(rdn);
  3405. Owned<IFile> target = createIFile(rfn);
  3406. targetdir->createDirectory();
  3407. copyFile(target, localFile);
  3408. }
  3409. }
  3410. }
  3411. void publish(const IRoxiePublishCallback *activity)
  3412. {
  3413. if (!dFile->isExternal())
  3414. {
  3415. Owned<IFileDescriptor> desc = createFileDescriptor();
  3416. desc->setNumParts(1);
  3417. RemoteFilename rfn;
  3418. dFile->getPartFilename(rfn, 0, 0);
  3419. StringBuffer physicalName, physicalDir, physicalBase;
  3420. rfn.getLocalPath(physicalName);
  3421. splitFilename(physicalName, &physicalDir, &physicalDir, &physicalBase, &physicalBase);
  3422. desc->setDefaultDir(physicalDir.str());
  3423. desc->setPartMask(physicalBase.str());
  3424. IPropertyTree &partProps = desc->queryPart(0)->queryProperties(); //properties of the first file part.
  3425. IPropertyTree &fileProps = desc->queryProperties(); // properties of the logical file
  3426. offset_t fileSize = localFile->size();
  3427. fileProps.setPropInt64("@size", fileSize);
  3428. partProps.setPropInt64("@size", fileSize);
  3429. CDateTime createTime, modifiedTime, accessedTime;
  3430. localFile->getTime(&createTime, &modifiedTime, &accessedTime);
  3431. // round file time down to nearest sec. Nanosec accurancy is not preserved elsewhere and can lead to mismatch later.
  3432. unsigned hour, min, sec, nanosec;
  3433. modifiedTime.getTime(hour, min, sec, nanosec);
  3434. modifiedTime.setTime(hour, min, sec, 0);
  3435. StringBuffer timestr;
  3436. modifiedTime.getString(timestr);
  3437. if(timestr.length())
  3438. partProps.setProp("@modified", timestr.str());
  3439. ClusterPartDiskMapSpec partmap;
  3440. if (localCluster)
  3441. {
  3442. desc->addCluster(localCluster, partmap);
  3443. desc->setClusterGroupName(0, localClusterName.get());
  3444. }
  3445. ForEachItemIn(idx, remoteNodes)
  3446. desc->addCluster(&remoteNodes.item(idx), partmap);
  3447. if (activity)
  3448. activity->setFileProperties(desc);
  3449. Owned<IDistributedFile> publishFile = queryDistributedFileDirectory().createNew(desc); // MORE - we'll create this earlier if we change the locking paradigm
  3450. publishFile->setAccessedTime(modifiedTime);
  3451. IUserDescriptor * userdesc = NULL;
  3452. if (activity)
  3453. userdesc = activity->queryUserDescriptor();
  3454. else
  3455. {
  3456. Owned<IRoxieDaliHelper> daliHelper = connectToDali(false);
  3457. if (daliHelper)
  3458. userdesc = daliHelper->queryUserDescriptor();//predeployed query mode
  3459. }
  3460. publishFile->attach(dFile->queryLogicalName(), userdesc);
  3461. // MORE should probably write to the roxielocalstate too in case Dali is down next time I look...
  3462. }
  3463. }
  3464. void addCluster(char const * cluster)
  3465. {
  3466. Owned<IGroup> group = queryNamedGroupStore().lookup(cluster);
  3467. if (!group)
  3468. throw MakeStringException(0, "Unknown cluster %s while writing file %s",
  3469. cluster, dFile->queryLogicalName());
  3470. #ifdef _CONTAINERIZED // NB: really is-off-nodestorage
  3471. localCluster.setown(group.getClear());
  3472. localClusterName.set(cluster);
  3473. #else
  3474. rank_t r = group->rank();
  3475. if (RANK_NULL != r)
  3476. {
  3477. if (localCluster)
  3478. throw MakeStringException(0, "Cluster %s occupies node already specified while writing file %s",
  3479. cluster, dFile->queryLogicalName());
  3480. SocketEndpointArray eps;
  3481. SocketEndpoint me(0, myNode.getIpAddress());
  3482. eps.append(me);
  3483. localCluster.setown(createIGroup(eps));
  3484. StringBuffer clusterName(cluster);
  3485. if (group->ordinality()>1)
  3486. clusterName.appendf("[%u]", r+1);
  3487. localClusterName.set(clusterName);
  3488. }
  3489. else
  3490. {
  3491. ForEachItemIn(idx, remoteNodes)
  3492. {
  3493. Owned<INode> other = remoteNodes.item(idx).getNode(0);
  3494. if (group->isMember(other))
  3495. throw MakeStringException(0, "Cluster %s occupies node already specified while writing file %s",
  3496. cluster, dFile->queryLogicalName());
  3497. }
  3498. remoteNodes.append(*group.getClear());
  3499. }
  3500. #endif
  3501. allClusters.append(cluster);
  3502. }
  3503. };
  3504. extern IRoxieWriteHandler *createRoxieWriteHandler(IRoxieDaliHelper *_daliHelper, ILocalOrDistributedFile *_dFile, const StringArray &_clusters)
  3505. {
  3506. return new CRoxieWriteHandler(_daliHelper, _dFile, _clusters);
  3507. }
  3508. //================================================================================================================
  #ifdef _USE_CPPUNIT
  #include "unittests.hpp"
  // Self-test for CRoxieFileCache copying. It opens a file available only as a "remote" copy, then creates a newer
  // "buddy" copy, and checks that doCopy() replaces the local data with the buddy's contents.
  class CcdFileTest : public CppUnit::TestFixture
  {
      CPPUNIT_TEST_SUITE(CcdFileTest);
          CPPUNIT_TEST(testCopy);
      CPPUNIT_TEST_SUITE_END();

  protected:
      class DummyPartDescriptor : public CInterfaceOf<IPartDescriptor>
      {
          virtual unsigned queryPartIndex() { UNIMPLEMENTED; }
          virtual unsigned numCopies() { UNIMPLEMENTED; }
          virtual INode *getNode(unsigned copy=0) { UNIMPLEMENTED; }
          virtual INode *queryNode(unsigned copy=0) { UNIMPLEMENTED; }
          virtual IPropertyTree &queryProperties() { UNIMPLEMENTED; }
          virtual IPropertyTree *getProperties() { UNIMPLEMENTED; }
          virtual RemoteFilename &getFilename(unsigned copy, RemoteFilename &rfn) { UNIMPLEMENTED; }
          virtual StringBuffer &getTail(StringBuffer &name) { UNIMPLEMENTED; }
          virtual StringBuffer &getDirectory(StringBuffer &name,unsigned copy = 0) { UNIMPLEMENTED; }
          virtual StringBuffer &getPath(StringBuffer &name,unsigned copy = 0) { UNIMPLEMENTED; }
          virtual void serialize(MemoryBuffer &tgt) { UNIMPLEMENTED; }
          virtual bool isMulti() { UNIMPLEMENTED; }
          virtual RemoteMultiFilename &getMultiFilename(unsigned copy, RemoteMultiFilename &rfn) { UNIMPLEMENTED; }
          virtual bool getCrc(unsigned &crc) { UNIMPLEMENTED; }
          virtual IFileDescriptor &queryOwner() { UNIMPLEMENTED; }
          virtual const char *queryOverrideName() { UNIMPLEMENTED; }
          virtual unsigned copyClusterNum(unsigned copy,unsigned *replicate=NULL) { UNIMPLEMENTED; }
          virtual IReplicatedFile *getReplicatedFile() { UNIMPLEMENTED; }
      };

      // Creates (or truncates) `name` and writes `value` to it as a single native int.
      // The descriptor is closed before asserting, so a failed assert does not leak it.
      static void writeIntFile(const char *name, int value)
      {
          int f = open(name, _O_WRONLY | _O_CREAT | _O_TRUNC, _S_IREAD | _S_IWRITE);
          CPPUNIT_ASSERT(f >= 0);
          ssize_t numwritten = write(f, &value, sizeof(int));
          close(f);
          CPPUNIT_ASSERT(numwritten == (ssize_t) sizeof(int));
      }

      // Reads the native int stored at the start of file `name`, asserting that the open and the read both succeed.
      static int readIntFile(const char *name)
      {
          int f = open(name, _O_RDONLY);
          CPPUNIT_ASSERT(f >= 0);
          int val = 0;
          ssize_t numread = read(f, &val, sizeof(int));
          close(f);
          CPPUNIT_ASSERT(numread == (ssize_t) sizeof(int));
          return val;
      }

      // Reads the int at offset 0 through `io`, asserting that a full int was returned.
      static int readIntFromIO(ILazyFileIO *io)
      {
          int val = 0;
          ssize_t bytesRead = io->read(0, sizeof(int), &val);
          CPPUNIT_ASSERT(bytesRead == (ssize_t) sizeof(int));
          return val;
      }

      void testCopy()
      {
          selfTestMode = true;
          remove("test.local");
          remove("test.remote");
          remove("test.buddy");
          StringArray localEnough;
          StringArray remotes;
          DummyPartDescriptor pdesc; // NOTE(review): not otherwise used by this test
          CDateTime dummy;
          remotes.append("test.remote");
          writeIntFile("test.remote", 1);
          CRoxieFileCache &cache = static_cast<CRoxieFileCache &>(queryFileCache());
          Owned<ILazyFileIO> io = cache.openFile("test.local", 0, 0, "test.local", NULL, localEnough, remotes, sizeof(int), dummy);
          CPPUNIT_ASSERT(io != NULL);
          // Reading it should read 1
          CPPUNIT_ASSERT(readIntFromIO(io) == 1);
          // Now create the buddy
          writeIntFile("test.buddy", 2);
          // Reading it should still read 1...
          CPPUNIT_ASSERT(readIntFromIO(io) == 1);
          // Now copy it - should copy the buddy
          cache.doCopy(io, false);
          // Reading it should read 2...
          CPPUNIT_ASSERT(readIntFromIO(io) == 2);
          // And the data in the file should be 2
          CPPUNIT_ASSERT(readIntFile("test.local") == 2);
          io.clear();
          remove("test.local");
          remove("test.remote");
          remove("test.buddy");
      }
  };
  CPPUNIT_TEST_SUITE_REGISTRATION( CcdFileTest );
  CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( CcdFileTest, "CcdFileTest" );
  #endif