filecopy.cpp 121 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384338533863387338833893390339133923393339433953396339733983399340034013402340334043405340634073408340934103411341234133414341534163417341834193420342134223423342434253426342734283429343034313432343334343435343634373438343934403441344234433444344534463447344834493450345134523453345434553456345734583459346034613462346334643465346634673468346934703471347234733474347534763477347834793480348134823483348434853486348734883489349034913492349334943495349634973498349935003501350235033504350535063507350835093510351135123513351435153516351735183519352035213522352335243525352635273528352935303531353235333534353535363537353835393540354135423543354435453546354735483549355035513552355335543555355635573558355935603561356235633564356535663567356835693570357135723573357435753576357735783579358035813582358335843585358635873588358935903591359235933594359535963597359835993600360136023603360436053606360736083609361036113612361336143615361636173618361936203621362236233624362536263627362836293630363136323633363436353636363736383639364036413642364336443645364636473648364936503651365236533654365536563657365836593660366136623663366436653666366736683669367036713672367336743675367636773678367936803681368236833684368536863687368836893690369136923693369436953696369736983699370037013702370337043705370637073708370937103711371237133714371537163717371837193720372137223723372437253726372737283729373037313732373337343735373637373738373937403741374237433744374537463747
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jliball.hpp"
  14. #include "platform.h"
  15. #include <algorithm>
  16. #include "jlib.hpp"
  17. #include "jio.hpp"
  18. #include <math.h>
  19. #include "jmutex.hpp"
  20. #include "jfile.hpp"
  21. #include "jsocket.hpp"
  22. #include "jdebug.hpp"
  23. #include "fterror.hpp"
  24. #include "dadfs.hpp"
  25. #include "rmtspawn.hpp"
  26. #include "filecopy.ipp"
  27. #include "jptree.hpp"
  28. #include "daft.hpp"
  29. #include "daftcfg.hpp"
  30. #include "fterror.hpp"
  31. #include "daftformat.hpp"
  32. #include "daftmc.hpp"
  33. #include "dasds.hpp"
  34. #include "jlog.hpp"
  35. #include "dalienv.hpp"
  36. #include "ftbase.ipp"
  #ifdef _CONTAINERIZED
  //Temporary see HPCC-25822
  // Decide whether a part filename should be treated as directly accessible.
  // A filename with an explicit (non-zero) port never is; otherwise only local or
  // null IP addresses qualify.
  inline bool canAccessFilesDirectly(const RemoteFilename & file)
  {
      const bool hasExplicitPort = (file.queryEndpoint().port != 0);
      if (hasExplicitPort)
          return false;
      const IpAddress & ip = file.queryIP();
      // the isNull check is probably an error but saves time
      return ip.isLocal() || ip.isNull();
  }

  // Convenience overload: flag the filename according to canAccessFilesDirectly().
  inline void setCanAccessDirectly(RemoteFilename & file)
  {
      const bool direct = canAccessFilesDirectly(file);
      setCanAccessDirectly(file, direct);
  }
  #endif
  // ---- Connection / recovery / timing limits ----
  #define DEFAULT_MAX_CONNECTIONS 800
  #define PARTITION_RECOVERY_LIMIT 1000
  #define EXPECTED_RESPONSE_TIME (60 * 1000)
  #define RESPONSE_TIME_TIMEOUT (60 * 60 * 1000)
  #define DEFAULT_MAX_XML_RECORD_SIZE 0x100000
  //#define CLEANUP_RECOVERY

  // ---- Option / progress-tree attribute names ----
  //Use hash defines for properties so I can't mis-spell them....
  // (Kept as macros rather than constants so they can still be combined with other literals.)
  #define ANcomplete "@complete"
  #define ANcompress "@compress"
  #define ANcrc "@crc"
  #define ANcrcCheck "@crcCheck"
  #define ANcrcDiffers "@crcDiffers"
  #define ANdone "@done"
  #define ANhasPartition "@hasPartition"
  #define ANhasProgress "@hasProgress"
  #define ANhasRecovery "@hasRecovery"
  #define ANmaxConnections "@maxConnections"
  #define ANnocommon "@noCommon"
  #define ANnoRecover "@noRecover"
  #define ANnosplit "@nosplit"
  #define ANnosplit2 "@noSplit"
  #define ANprefix "@prefix"
  #define ANpull "@pull"
  #define ANpush "@push"
  #define ANsafe "@safe"
  #define ANsizedate "@sizedate"
  #define ANsplit "@split"
  #define ANsplitPrefix "@splitPrefix"
  #define ANthrottle "@throttle"
  #define ANverify "@verify"
  #define ANtransferBufferSize "@transferBufferSize"
  #define ANencryptKey "@encryptKey"
  #define ANdecryptKey "@decryptKey"
  #define ANumask "@umask"
  #define PNpartition "partition"
  #define PNprogress "progress"

  // ---- File attributes ----
  #define FArecordSize "@recordSize"
  #define FArecordCount "@recordCount"
  #define FAformat "@format"
  #define FAcrc "@fileCrc"
  #define FAsize "@size"
  #define FAcompressedSize "@compressedSize"

  // ---- Polling intervals (all in milliseconds) ----
  constexpr unsigned operatorUpdateFrequency = 5000;   // between operator updates
  constexpr unsigned abortCheckFrequency = 20000;      // between abort checks
  constexpr unsigned sdsUpdateFrequency = 20000;       // between SDS updates
  // Interval requested from each slave between progress reports:
  constexpr unsigned maxSlaveUpdateFrequency = 1000;   // small number of nodes.
  constexpr unsigned minSlaveUpdateFrequency = 5000;   // large number of nodes.
  101. bool TargetLocation::canPull()
  102. {
  103. return queryOS(filename.queryIP()) != MachineOsSolaris;
  104. }
  105. //----------------------------------------------------------------------------
  106. FilePartInfo::FilePartInfo(const RemoteFilename & _filename)
  107. {
  108. filename.set(_filename);
  109. init();
  110. }
  111. FilePartInfo::FilePartInfo()
  112. {
  113. init();
  114. }
  115. bool FilePartInfo::canPush()
  116. {
  117. return queryOS(filename.queryIP()) != MachineOsSolaris;
  118. }
  119. void FilePartInfo::extractExtra(IPartDescriptor &part)
  120. {
  121. unsigned _crc;
  122. hasCRC = part.getCrc(_crc);
  123. if (hasCRC)
  124. crc = _crc;
  125. properties.set(&part.queryProperties());
  126. if (part.queryProperties().hasProp("@modified"))
  127. modifiedTime.setString(part.queryProperties().queryProp("@modified"));
  128. }
  129. void FilePartInfo::extractExtra(IDistributedFilePart &part)
  130. {
  131. unsigned _crc;
  132. hasCRC = part.getCrc(_crc);
  133. if (hasCRC)
  134. crc = _crc;
  135. properties.set(&part.queryAttributes());
  136. if (part.queryAttributes().hasProp("@modified"))
  137. modifiedTime.setString(part.queryAttributes().queryProp("@modified"));
  138. }
  139. void FilePartInfo::init()
  140. {
  141. offset = 0;
  142. size = UNKNOWN_PART_SIZE;
  143. psize = UNKNOWN_PART_SIZE;
  144. headerSize = 0;
  145. hasCRC = false;
  146. xmlHeaderLength = 0;
  147. xmlFooterLength = 0;
  148. }
  149. //----------------------------------------------------------------------------
  150. void shuffle(CIArray & array)
  151. {
  152. //Use our own seeded random number generator, so that multiple dfu at the same time are less likely to clash.
  153. Owned<IRandomNumberGenerator> random = createRandomNumberGenerator();
  154. random->seed(123456789);
  155. unsigned i = array.ordinality();
  156. while (i > 1)
  157. {
  158. unsigned j = random->next() % i;
  159. i--;
  160. array.swap(i, j);
  161. }
  162. }
  163. //----------------------------------------------------------------------------
  164. FileTransferThread::FileTransferThread(FileSprayer & _sprayer, byte _action, const SocketEndpoint & _ep, bool _calcCRC, const char *_wuid)
  165. : Thread("pullThread"), sprayer(_sprayer), wuid(_wuid)
  166. {
  167. calcCRC = _calcCRC;
  168. action = _action;
  169. ep.set(_ep);
  170. // progressInfo = _progressInfo;
  171. sem = NULL;
  172. ok = false;
  173. job = unknownJob;
  174. allDone = false;
  175. started = false;
  176. }
  177. void FileTransferThread::addPartition(PartitionPoint & nextPartition, OutputProgress & nextProgress)
  178. {
  179. partition.append(OLINK(nextPartition));
  180. progress.append(OLINK(nextProgress));
  181. }
  182. unsigned __int64 FileTransferThread::getInputSize()
  183. {
  184. unsigned __int64 inputSize = 0;
  185. ForEachItemIn(idx, partition)
  186. inputSize += partition.item(idx).inputLength;
  187. return inputSize;
  188. }
  189. void FileTransferThread::go(Semaphore & _sem)
  190. {
  191. sem = &_sem;
  192. if (partition.empty())
  193. transferAndSignal(); // do nothing, but don't start a new thread
  194. else
  195. {
  196. #ifdef RUN_SLAVES_ON_THREADS
  197. start();
  198. #else
  199. transferAndSignal();
  200. #endif
  201. }
  202. }
  203. bool FileTransferThread::isAborting()
  204. {
  205. return sprayer.isAborting() || ::isAborting();
  206. }
  207. void FileTransferThread::logIfRunning(StringBuffer &list)
  208. {
  209. if (started && !allDone && !error)
  210. {
  211. StringBuffer url;
  212. ep.getUrlStr(url);
  213. DBGLOG("Still waiting for slave %s", url.str());
  214. if (list.length())
  215. list.append(',');
  216. list.append(url);
  217. }
  218. }
  219. bool FileTransferThread::catchReadBuffer(ISocket * socket, MemoryBuffer & msg, unsigned timeout)
  220. {
  221. unsigned nowTime = msTick();
  222. unsigned abortCheckTimeout = 120*1000;
  223. for (;;)
  224. {
  225. try
  226. {
  227. readBuffer(socket, msg, abortCheckTimeout);
  228. return true;
  229. }
  230. catch (IException * e)
  231. {
  232. switch (e->errorCode())
  233. {
  234. case JSOCKERR_graceful_close:
  235. break;
  236. case JSOCKERR_timeout_expired:
  237. if (isAborting())
  238. break;
  239. if (msTick() - nowTime < timeout)
  240. {
  241. e->Release();
  242. continue;
  243. }
  244. break;
  245. default:
  246. EXCLOG(e,"FileTransferThread::catchReadBuffer");
  247. break;
  248. }
  249. e->Release();
  250. return false;
  251. }
  252. }
  253. }
  254. bool FileTransferThread::performTransfer()
  255. {
  256. bool ok = false;
  257. StringBuffer url;
  258. ep.getUrlStr(url);
  259. LOG(MCdebugProgress, job, "Transferring part %s [%p]", url.str(), this);
  260. started = true;
  261. allDone = true;
  262. if (sprayer.isSafeMode || action == FTactionpush)
  263. {
  264. ForEachItemIn(i, progress)
  265. {
  266. if (progress.item(i).status != OutputProgress::StatusCopied)
  267. allDone = false;
  268. }
  269. }
  270. else
  271. {
  272. unsigned whichOutput = (unsigned)-1;
  273. ForEachItemIn(i, progress)
  274. {
  275. PartitionPoint & curPartition = partition.item(i);
  276. OutputProgress & curProgress = progress.item(i);
  277. //pull should rename as well as copy the files.
  278. if (curPartition.whichOutput != whichOutput)
  279. {
  280. if (curProgress.status != OutputProgress::StatusRenamed)
  281. allDone = false;
  282. whichOutput = curPartition.whichOutput;
  283. }
  284. }
  285. }
  286. if (allDone)
  287. {
  288. LOG(MCdebugInfo, job, "Creation of part %s already completed", url.str());
  289. return true;
  290. }
  291. if (partition.empty())
  292. {
  293. LOG(MCdebugInfo, job, "No elements to transfer for this slave");
  294. return true;
  295. }
  296. LOG(MCdebugProgressDetail, job, "Start generate part %s [%p]", url.str(), this);
  297. StringBuffer tmp;
  298. Owned<ISocket> socket = spawnRemoteChild(SPAWNdfu, sprayer.querySlaveExecutable(ep, tmp), ep, DAFT_VERSION, queryFtSlaveLogDir(), this, wuid);
  299. if (socket)
  300. {
  301. MemoryBuffer msg;
  302. msg.setEndian(__BIG_ENDIAN);
  303. //MORE: Allow this to be configured by an option.
  304. unsigned slaveUpdateFrequency = minSlaveUpdateFrequency;
  305. if (sprayer.numParallelSlaves() < 5)
  306. slaveUpdateFrequency = maxSlaveUpdateFrequency;
  307. //Send message and wait for response...
  308. msg.append(action);
  309. // send 0 for password info that was in <= 7.6 versions
  310. unsigned zero = 0;
  311. msg.append(zero);
  312. ep.serialize(msg);
  313. sprayer.srcFormat.serialize(msg);
  314. sprayer.tgtFormat.serialize(msg);
  315. msg.append(sprayer.calcInputCRC());
  316. msg.append(calcCRC);
  317. serialize(partition, msg);
  318. msg.append(sprayer.numParallelSlaves());
  319. msg.append(slaveUpdateFrequency);
  320. msg.append(sprayer.replicate); // NB: controls whether FtSlave copies source timestamp
  321. msg.append(sprayer.mirroring);
  322. msg.append(sprayer.isSafeMode);
  323. msg.append(progress.ordinality());
  324. ForEachItemIn(i, progress)
  325. progress.item(i).serializeCore(msg);
  326. msg.append(sprayer.throttleNicSpeed);
  327. msg.append(sprayer.compressedInput);
  328. msg.append(sprayer.compressOutput);
  329. msg.append(sprayer.copyCompressed);
  330. msg.append(sprayer.transferBufferSize);
  331. msg.append(sprayer.encryptKey);
  332. msg.append(sprayer.decryptKey);
  333. sprayer.srcFormat.serializeExtra(msg, 1);
  334. sprayer.tgtFormat.serializeExtra(msg, 1);
  335. ForEachItemIn(i2, progress)
  336. progress.item(i2).serializeExtra(msg, 1);
  337. //NB: Any extra data must be appended at the end...
  338. msg.append(sprayer.fileUmask);
  339. if (!catchWriteBuffer(socket, msg))
  340. throwError1(RFSERR_TimeoutWaitConnect, url.str());
  341. bool done;
  342. for (;;)
  343. {
  344. msg.clear();
  345. if (!catchReadBuffer(socket, msg, FTTIME_PROGRESS))
  346. throwError1(RFSERR_TimeoutWaitSlave, url.str());
  347. msg.setEndian(__BIG_ENDIAN);
  348. msg.read(done);
  349. if (done)
  350. break;
  351. OutputProgress newProgress;
  352. newProgress.deserializeCore(msg);
  353. newProgress.deserializeExtra(msg, 1);
  354. sprayer.updateProgress(newProgress);
  355. LOG(MCdebugProgress(10000), job, "Update %s: %d %" I64F "d->%" I64F "d", url.str(), newProgress.whichPartition, newProgress.inputLength, newProgress.outputLength);
  356. if (isAborting())
  357. {
  358. if (!sendRemoteAbort(socket))
  359. throwError1(RFSERR_TimeoutWaitSlave, url.str());
  360. }
  361. }
  362. msg.read(ok);
  363. setErrorOwn(deserializeException(msg));
  364. LOG(MCdebugProgressDetail, job, "Finished generating part %s [%p] ok(%d) error(%d)", url.str(), this, (int)ok, (int)(error!=NULL));
  365. msg.clear().append(true);
  366. catchWriteBuffer(socket, msg);
  367. if (sprayer.options->getPropInt("@fail", 0))
  368. throwError(DFTERR_CopyFailed);
  369. }
  370. else
  371. {
  372. throwError1(DFTERR_FailedStartSlave, url.str());
  373. }
  374. LOG(MCdebugProgressDetail, job, "Stopped generate part %s [%p]", url.str(), this);
  375. allDone = true;
  376. return ok;
  377. }
  378. void FileTransferThread::setErrorOwn(IException * e)
  379. {
  380. error.setown(e);
  381. if (error)
  382. sprayer.setError(ep, error);
  383. }
  384. bool FileTransferThread::transferAndSignal()
  385. {
  386. ok = false;
  387. if (!isAborting())
  388. {
  389. try
  390. {
  391. ok = performTransfer();
  392. }
  393. catch (IException * e)
  394. {
  395. FLLOG(MCexception(e), job, e, "Transferring files");
  396. setErrorOwn(e);
  397. }
  398. }
  399. sem->signal();
  400. return ok;
  401. }
  402. int FileTransferThread::run()
  403. {
  404. transferAndSignal();
  405. return 0;
  406. }
  407. //----------------------------------------------------------------------------
  408. FileSizeThread::FileSizeThread(FilePartInfoArray & _queue, CriticalSection & _cs, bool _isCompressed, bool _errorIfMissing) : Thread("fileSizeThread"), queue(_queue), cs(_cs)
  409. {
  410. isCompressed = _isCompressed;
  411. errorIfMissing = _errorIfMissing;
  412. }
  413. bool FileSizeThread::wait(unsigned timems)
  414. {
  415. while (!sem.wait(timems))
  416. { // report every time
  417. StringBuffer rfn;
  418. {
  419. CriticalBlock lock(cs);
  420. if (cur.get())
  421. {
  422. if (copy)
  423. {
  424. if (!cur->mirrorFilename.isNull())
  425. cur->mirrorFilename.getRemotePath(rfn);
  426. }
  427. else
  428. {
  429. cur->filename.getRemotePath(rfn);
  430. }
  431. }
  432. }
  433. if (!rfn.isEmpty())
  434. {
  435. OWARNLOG("Waiting for file: %s",rfn.str());
  436. return false;
  437. }
  438. }
  439. sem.signal(); // if called again
  440. return true;
  441. }
  442. int FileSizeThread::run()
  443. {
  444. try
  445. {
  446. RemoteFilename remoteFilename;
  447. for (;;)
  448. {
  449. {
  450. CriticalBlock lock(cs);
  451. cur.clear();
  452. if (queue.ordinality())
  453. cur.setown(&queue.popGet());
  454. }
  455. if (!cur.get())
  456. break;
  457. copy=0;
  458. for (copy = 0;copy<2;copy++)
  459. {
  460. if (copy)
  461. {
  462. if (cur->mirrorFilename.isNull())
  463. continue; // not break
  464. remoteFilename.set(cur->mirrorFilename);
  465. }
  466. else
  467. remoteFilename.set(cur->filename);
  468. OwnedIFile thisFile = createIFile(remoteFilename);
  469. offset_t thisSize = thisFile->size();
  470. if (thisSize == -1)
  471. {
  472. if (errorIfMissing)
  473. {
  474. StringBuffer s;
  475. throwError1(DFTERR_CouldNotOpenFile, remoteFilename.getRemotePath(s).str());
  476. }
  477. continue;
  478. }
  479. cur->psize = thisSize;
  480. if (isCompressed)
  481. {
  482. Owned<IFileIO> io = createCompressedFileReader(thisFile); //check succeeded?
  483. if (!io)
  484. {
  485. StringBuffer s;
  486. throwError1(DFTERR_CouldNotOpenCompressedFile, remoteFilename.getRemotePath(s).str());
  487. }
  488. thisSize = io->size();
  489. }
  490. cur->size = thisSize;
  491. break;
  492. }
  493. if (copy==1)
  494. { // need to set primary
  495. CriticalBlock lock(cs);
  496. cur->mirrorFilename.set(cur->filename);
  497. cur->filename.set(remoteFilename);
  498. }
  499. }
  500. }
  501. catch (IException * e)
  502. {
  503. error.setown(e);
  504. }
  505. sem.signal();
  506. return 0;
  507. }
  508. //----------------------------------------------------------------------------
  509. FileSprayer::FileSprayer(IPropertyTree * _options, IPropertyTree * _progress, IRemoteConnection * _recoveryConnection, const char *_wuid)
  510. : wuid(_wuid), fileSprayerAbortChecker(*this)
  511. {
  512. totalSize = 0;
  513. replicate = false;
  514. copySource = false;
  515. unknownSourceFormat = true;
  516. unknownTargetFormat = true;
  517. progressTree.set(_progress);
  518. recoveryConnection = _recoveryConnection;
  519. options.set(_options);
  520. if (!options)
  521. options.setown(createPTree());
  522. if (!progressTree)
  523. progressTree.setown(createPTree("progress", ipt_caseInsensitive));
  524. //split prefix messes up recovery because the target filenames aren't saved in the recovery info.
  525. allowRecovery = !options->getPropBool(ANnoRecover) && !querySplitPrefix();
  526. isRecovering = allowRecovery && progressTree->hasProp(ANhasProgress);
  527. isSafeMode = options->getPropBool(ANsafe);
  528. job = unknownJob;
  529. progressReport = NULL;
  530. abortChecker = NULL;
  531. sizeToBeRead = 0;
  532. calcedPullPush = false;
  533. mirroring = false;
  534. lastAbortCheckTick = lastSDSTick = lastOperatorTick = msTick();
  535. calcedInputCRC = false;
  536. aborting = false;
  537. totalLengthRead = 0;
  538. totalNumReads = 0;
  539. totalNumWrites = 0;
  540. throttleNicSpeed = 0;
  541. compressedInput = false;
  542. compressOutput = options->getPropBool(ANcompress);
  543. copyCompressed = false;
  544. transferBufferSize = options->getPropInt(ANtransferBufferSize);
  545. if (transferBufferSize)
  546. LOG(MCdebugProgressDetail, job, "Using transfer buffer size %d", transferBufferSize);
  547. else // zero is default
  548. transferBufferSize = DEFAULT_STD_BUFFER_SIZE;
  549. progressDone = false;
  550. encryptKey.set(options->queryProp(ANencryptKey));
  551. decryptKey.set(options->queryProp(ANdecryptKey));
  552. fileUmask = -1;
  553. const char *umaskStr = options->queryProp(ANumask);
  554. if (umaskStr)
  555. {
  556. char *eptr = NULL;
  557. errno = 0;
  558. fileUmask = (int)strtol(umaskStr, &eptr, 8);
  559. if (errno || *eptr != '\0')
  560. {
  561. LOG(MCdebugInfo, job, "Invalid umask value <%s> ignored", umaskStr);
  562. fileUmask = -1;
  563. }
  564. else
  565. {
  566. // never strip off owner
  567. fileUmask &= 077;
  568. }
  569. }
  570. }
  571. class AsyncAfterTransfer : public CAsyncFor
  572. {
  573. public:
  574. AsyncAfterTransfer(FileSprayer & _sprayer) : sprayer(_sprayer) {}
  575. virtual void Do(unsigned idxTarget)
  576. {
  577. TargetLocation & cur = sprayer.targets.item(idxTarget);
  578. if (!sprayer.filter || sprayer.filter->includePart(idxTarget))
  579. {
  580. RemoteFilename & targetFilename = cur.filename;
  581. if (sprayer.isSafeMode)
  582. {
  583. OwnedIFile file = createIFile(targetFilename);
  584. file->remove();
  585. }
  586. renameDfuTempToFinal(targetFilename);
  587. if (sprayer.replicate && !sprayer.mirroring)
  588. {
  589. OwnedIFile file = createIFile(targetFilename);
  590. file->setTime(NULL, &cur.modifiedTime, NULL);
  591. }
  592. else if (cur.modifiedTime.isNull())
  593. {
  594. OwnedIFile file = createIFile(targetFilename);
  595. file->getTime(NULL, &cur.modifiedTime, NULL);
  596. }
  597. }
  598. }
  599. protected:
  600. FileSprayer & sprayer;
  601. };
  602. void FileSprayer::addEmptyFilesToPartition(unsigned from, unsigned to)
  603. {
  604. for (unsigned i = from; i < to ; i++)
  605. {
  606. LOG(MCdebugProgressDetail, job, "Insert a dummy entry for target %d", i);
  607. PartitionPoint & next = createLiteral(0, NULL, 0);
  608. next.whichOutput = i;
  609. partition.append(next);
  610. }
  611. }
  612. void FileSprayer::addEmptyFilesToPartition()
  613. {
  614. unsigned lastOutput = (unsigned)-1;;
  615. ForEachItemIn(idx, partition)
  616. {
  617. PartitionPoint & cur = partition.item(idx);
  618. if (cur.whichOutput != lastOutput)
  619. {
  620. if (cur.whichOutput != lastOutput+1)
  621. addEmptyFilesToPartition(lastOutput+1, cur.whichOutput);
  622. lastOutput = cur.whichOutput;
  623. }
  624. }
  625. if (lastOutput != targets.ordinality()-1)
  626. addEmptyFilesToPartition(lastOutput+1, targets.ordinality());
  627. }
  628. void FileSprayer::afterTransfer()
  629. {
  630. if (calcInputCRC())
  631. {
  632. LOG(MCdebugProgressDetail, job, "Checking input CRCs");
  633. CRC32Merger partCRC;
  634. unsigned startCurSource = 0;
  635. ForEachItemIn(idx, partition)
  636. {
  637. PartitionPoint & curPartition = partition.item(idx);
  638. OutputProgress & curProgress = progress.item(idx);
  639. if (!curProgress.hasInputCRC)
  640. {
  641. LOG(MCdebugProgressDetail, job, "Could not calculate input CRCs - cannot check");
  642. break;
  643. }
  644. partCRC.addChildCRC(curProgress.inputLength, curProgress.inputCRC, false);
  645. StringBuffer errorText;
  646. bool failed = false;
  647. UnsignedArray failedOutputs;
  648. if (idx+1 == partition.ordinality() || partition.item(idx+1).whichInput != curPartition.whichInput)
  649. {
  650. FilePartInfo & curSource = sources.item(curPartition.whichInput);
  651. if (curSource.crc != partCRC.get())
  652. {
  653. StringBuffer name;
  654. if (!failed)
  655. errorText.append("Input CRCs do not match for part ");
  656. else
  657. errorText.append(", ");
  658. curSource.filename.getPath(errorText);
  659. failed = true;
  660. //Need to copy anything that involves this part of the file again.
  661. //pulling it will be the whole file, if pushing we can cope with single parts
  662. //in the middle of the partition.
  663. for (unsigned i = startCurSource; i <= idx; i++)
  664. {
  665. OutputProgress & cur = progress.item(i);
  666. cur.reset();
  667. if (cur.tree)
  668. cur.save(cur.tree);
  669. unsigned out = partition.item(i).whichOutput;
  670. if (failedOutputs.find(out) == NotFound)
  671. failedOutputs.append(out);
  672. }
  673. }
  674. partCRC.clear();
  675. startCurSource = idx+1;
  676. //If copying m to n, and not splitting, there may be some dummy text entries (containing nothing) on the end.
  677. //if so skip them, otherwise you'll get crc errors on part 1
  678. if (partition.isItem(startCurSource) && (partition.item(startCurSource).whichInput == 0))
  679. idx = partition.ordinality()-1;
  680. }
  681. if (failed)
  682. {
  683. if (usePullOperation())
  684. {
  685. //Need to clear progress for any partitions that copy to the same target file
  686. //However, need to do it after the crc checking, otherwise it will generate more errors...
  687. ForEachItemIn(idx, partition)
  688. {
  689. if (failedOutputs.find(partition.item(idx).whichOutput) != NotFound)
  690. {
  691. OutputProgress & cur = progress.item(idx);
  692. cur.reset();
  693. if (cur.tree)
  694. cur.save(cur.tree);
  695. }
  696. }
  697. }
  698. if (recoveryConnection)
  699. recoveryConnection->commit();
  700. throw MakeStringException(DFTERR_InputCrcMismatch, "%s", errorText.str());
  701. }
  702. }
  703. }
  704. //For safe mode and push mode the temporary files need to be renamed once everything has completed.
  705. if (isSafeMode || usePushOperation())
  706. {
  707. unsigned numTargets = targets.ordinality();
  708. AsyncAfterTransfer async(*this);
  709. async.For(numTargets, (unsigned)sqrt((float)numTargets));
  710. }
  711. else
  712. {
  713. ForEachItemIn(idx, progress)
  714. {
  715. OutputProgress & curProgress = progress.item(idx);
  716. if (!curProgress.resultTime.isNull())
  717. targets.item(partition.item(idx).whichOutput).modifiedTime.set(curProgress.resultTime);
  718. }
  719. }
  720. }
  721. bool FileSprayer::allowSplit()
  722. {
  723. return !(options->getPropBool(ANnosplit) || options->getPropBool(ANnosplit2) || options->queryProp(ANprefix));
  724. }
  725. void FileSprayer::assignPartitionFilenames()
  726. {
  727. ForEachItemIn(idx, partition)
  728. {
  729. PartitionPoint & cur = partition.item(idx);
  730. if (cur.whichInput != (unsigned)-1)
  731. {
  732. cur.inputName.set(sources.item(cur.whichInput).filename);
  733. setCanAccessDirectly(cur.inputName);
  734. }
  735. cur.outputName.set(targets.item(cur.whichOutput).filename);
  736. setCanAccessDirectly(cur.outputName);
  737. // NB: partition (cur) is serialized to ftslave and it's this modifiedTime is used if present
  738. if (replicate)
  739. cur.modifiedTime.set(targets.item(cur.whichOutput).modifiedTime);
  740. }
  741. }
  742. class CheckExists : public CAsyncFor
  743. {
  744. public:
  745. CheckExists(TargetLocationArray & _targets, IDFPartFilter * _filter) : targets(_targets) { filter = _filter; }
  746. virtual void Do(unsigned idx)
  747. {
  748. if (!filter || filter->includePart(idx))
  749. {
  750. const RemoteFilename & cur = targets.item(idx).filename;
  751. OwnedIFile file = createIFile(cur);
  752. if (file->exists())
  753. {
  754. StringBuffer s;
  755. throwError1(DFTERR_PhysicalExistsNoOverwrite, cur.getRemotePath(s).str());
  756. }
  757. }
  758. }
  759. public:
  760. TargetLocationArray & targets;
  761. IDFPartFilter * filter;
  762. };
  763. void FileSprayer::beforeTransfer()
  764. {
  765. if (!isRecovering && !options->getPropBool("@overwrite", true))
  766. {
  767. CheckExists checker(targets, filter);
  768. checker.For(targets.ordinality(), 25, true, true);
  769. }
  770. if (!isRecovering && writeFromMultipleSlaves())
  771. {
  772. try {
  773. //Should this be on an option. Shouldn't be too inefficient since push is seldom used.
  774. ForEachItemIn(idx2, targets)
  775. {
  776. if (!filter || filter->includePart(idx2))
  777. {
  778. //MORE: This does not cope with creating directories on a solaris machine.
  779. StringBuffer remoteFilename, remoteDirectory;
  780. targets.item(idx2).filename.getRemotePath(remoteFilename);
  781. splitUNCFilename(remoteFilename.str(), &remoteDirectory, &remoteDirectory, NULL, NULL);
  782. Owned<IFile> dir = createIFile(remoteDirectory.str());
  783. if (!dir->exists())
  784. {
  785. dir->createDirectory();
  786. if (fileUmask != -1)
  787. dir->setFilePermissions(~fileUmask&0777);
  788. }
  789. }
  790. }
  791. }
  792. catch (IException *e) {
  793. FLLOG(MCexception(e), job, e, "Creating Directory");
  794. e->Release();
  795. LOG(MCdebugInfo, job, "Ignoring create directory error");
  796. }
  797. // If pushing files, and not recovering, then need to delete the target files, because the slaves might be writing in any order
  798. // for pull, the slave deletes it when creating the file.
  799. unsigned curPartition = 0;
  800. ForEachItemIn(idxTarget, targets)
  801. {
  802. if (!filter || filter->includePart(idxTarget))
  803. {
  804. if (!isSafeMode)
  805. {
  806. OwnedIFile file = createIFile(targets.item(idxTarget).filename);
  807. file->remove();
  808. }
  809. //unsigned firstPartition = curPartition;
  810. while (partition.isItem(curPartition+1) && partition.item(curPartition+1).whichOutput == idxTarget)
  811. curPartition++;
  812. //MORE: If 1:N mapping then don't extend to the maximum length - it is a waste of time, and messes up
  813. //And should generate the file header on the push machine - would always be more efficient.
  814. //Possibly conditional on whether it is worth pre-extending on the target os.
  815. //if (curPartition == firstPartition)
  816. // continue;
  817. PartitionPoint & lastPartition = partition.item(curPartition);
  818. offset_t lastOutputOffset = lastPartition.outputOffset + lastPartition.outputLength;
  819. RemoteFilename remote;
  820. getDfuTempName(remote, targets.item(idxTarget).filename);
  821. OwnedIFile file = createIFile(remote);
  822. OwnedIFileIO io = file->open(IFOcreate);
  823. if (!io)
  824. {
  825. StringBuffer name;
  826. remote.getPath(name);
  827. throwError1(DFTERR_CouldNotCreateOutput, name.str());
  828. }
  829. if (fileUmask != -1)
  830. file->setFilePermissions(~fileUmask&0666);
  831. //Create the headers on the utf files.
  832. unsigned headerSize = getHeaderSize(tgtFormat.type);
  833. if (headerSize)
  834. io->write(0, headerSize, getHeaderText(tgtFormat.type));
  835. if ((lastOutputOffset != 0)&&!compressOutput)
  836. {
  837. char null = 0;
  838. io->write(lastOutputOffset-sizeof(null), sizeof(null), &null);
  839. }
  840. }
  841. }
  842. }
  843. throttleNicSpeed = options->getPropInt(ANthrottle, 0);
  844. #ifndef _CONTAINERIZED
  845. //MORE: This is very old windows support code. We could add support for per-plane throttling if it is required.
  846. if (throttleNicSpeed == 0 && !usePullOperation() && targets.ordinality() == 1 && sources.ordinality() > 1)
  847. {
  848. Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
  849. Owned<IConstEnvironment> env = factory->openEnvironment();
  850. StringBuffer ipText;
  851. targets.item(0).filename.queryIP().getIpText(ipText);
  852. Owned<IConstMachineInfo> machine = env->getMachineByAddress(ipText.str());
  853. if (machine)
  854. {
  855. if (machine->getOS() == MachineOsW2K)
  856. {
  857. throttleNicSpeed = machine->getNicSpeedMbitSec();
  858. LOG(MCdebugInfo, job, "Throttle target speed to %dMbit/sec", throttleNicSpeed);
  859. }
  860. }
  861. }
  862. #endif
  863. }
  864. bool FileSprayer::calcCRC()
  865. {
  866. return options->getPropBool(ANcrc, true) && !compressOutput && !copyCompressed;
  867. }
  868. bool FileSprayer::calcInputCRC()
  869. {
  870. if (!calcedInputCRC)
  871. {
  872. calcedInputCRC = true;
  873. cachedInputCRC = false;
  874. if (options->getPropBool(ANcrcCheck, true) && !compressedInput)
  875. {
  876. ForEachItemIn(idx, sources)
  877. {
  878. if (!sources.item(idx).hasCRC)
  879. return cachedInputCRC;
  880. }
  881. cachedInputCRC = true;
  882. //If keeping headers then we lose bits of the input files, so they can't be crc checked.
  883. bool canKeepHeader = srcFormat.equals(tgtFormat) || !needToCalcOutput();
  884. if (options->getPropBool("@keepHeader", canKeepHeader) && srcFormat.rowTag && sources.ordinality() > 1)
  885. cachedInputCRC = false;
  886. if (querySplitPrefix())
  887. cachedInputCRC = false;
  888. }
  889. }
  890. return cachedInputCRC;
  891. }
  892. void FileSprayer::calculateOne2OnePartition()
  893. {
  894. LOG(MCdebugProgressDetail, job, "Setting up one2One partition");
  895. if (sources.ordinality() != targets.ordinality())
  896. throwError(DFTERR_ReplicateNumPartsDiffer);
  897. if (!srcFormat.equals(tgtFormat))
  898. throwError(DFTERR_ReplicateSameFormat);
  899. if (compressedInput && compressOutput && (strcmp(encryptKey.str(),decryptKey.str())==0))
  900. setCopyCompressedRaw();
  901. ForEachItemIn(idx, sources)
  902. {
  903. FilePartInfo & cur = sources.item(idx);
  904. RemoteFilename curFilename;
  905. curFilename.set(cur.filename);
  906. setCanAccessDirectly(curFilename);
  907. partition.append(*new PartitionPoint(idx, idx, cur.headerSize, copyCompressed?cur.psize:cur.size, copyCompressed?cur.psize:cur.size)); // outputoffset == 0
  908. targets.item(idx).modifiedTime.set(cur.modifiedTime);
  909. }
  910. if (srcFormat.isCsv())
  911. examineCsvStructure();
  912. }
  913. class AsyncExtractBlobInfo : public CAsyncFor
  914. {
  915. friend class FileSprayer;
  916. public:
  917. AsyncExtractBlobInfo(const char * _splitPrefix, FileSprayer & _sprayer) : sprayer(_sprayer)
  918. {
  919. extracted = new ExtractedBlobArray[sprayer.sources.ordinality()];
  920. splitPrefix = _splitPrefix;
  921. }
  922. ~AsyncExtractBlobInfo()
  923. {
  924. delete [] extracted;
  925. }
  926. virtual void Do(unsigned i)
  927. {
  928. if (!sprayer.sources.item(i).filename.isLocal()) {
  929. try {
  930. remoteExtractBlobElements(splitPrefix, sprayer.sources.item(i).filename, extracted[i]);
  931. return;
  932. }
  933. catch (IException *e) {
  934. StringBuffer path;
  935. StringBuffer err;
  936. OWARNLOG("dafilesrv ExtractBlobElements(%s) failed with: %s",
  937. sprayer.sources.item(i).filename.getPath(path).str(),
  938. e->errorMessage(err).str());
  939. PROGLOG("Trying direct access (this may be slow)");
  940. e->Release();
  941. }
  942. }
  943. // try local
  944. extractBlobElements(splitPrefix, sprayer.sources.item(i).filename, extracted[i]);
  945. }
  946. protected:
  947. FileSprayer & sprayer;
  948. const char * splitPrefix;
  949. ExtractedBlobArray * extracted;
  950. };
  951. void FileSprayer::calculateSplitPrefixPartition(const char * splitPrefix)
  952. {
  953. if (targets.ordinality() != 1)
  954. throwError(DFTERR_SplitPrefixSingleTarget);
  955. if (!srcFormat.equals(tgtFormat))
  956. throwError(DFTERR_SplitPrefixSameFormat);
  957. LOG(MCdebugProgressDetail, job, "Setting up split prefix partition");
  958. Owned<TargetLocation> target = &targets.popGet(); // remove target, add lots of new ones
  959. RemoteFilename blobTarget;
  960. StringBuffer remoteTargetPath, remoteFilename;
  961. target->filename.getRemotePath(remoteTargetPath);
  962. char sepChar = target->filename.getPathSeparator();
  963. //Remove the tail name from the filename
  964. const char * temp = remoteTargetPath.str();
  965. remoteTargetPath.setLength(strrchr(temp, sepChar)-temp);
  966. AsyncExtractBlobInfo extractor(splitPrefix, *this);
  967. unsigned numSources = sources.ordinality();
  968. extractor.For(numSources, numParallelConnections(numSources), true, false);
  969. ForEachItemIn(idx, sources)
  970. {
  971. FilePartInfo & cur = sources.item(idx);
  972. ExtractedBlobArray & extracted = extractor.extracted[idx];
  973. ForEachItemIn(i, extracted)
  974. {
  975. ExtractedBlobInfo & curBlob = extracted.item(i);
  976. remoteFilename.clear().append(remoteTargetPath);
  977. addPathSepChar(remoteFilename, sepChar);
  978. remoteFilename.append(curBlob.filename);
  979. blobTarget.clear();
  980. blobTarget.setRemotePath(remoteFilename);
  981. targets.append(* new TargetLocation(blobTarget));
  982. partition.append(*new PartitionPoint(idx, targets.ordinality()-1, curBlob.offset, curBlob.length, curBlob.length));
  983. }
  984. }
  985. }
  986. void FileSprayer::calculateMany2OnePartition()
  987. {
  988. LOG(MCdebugProgressDetail, job, "Setting up many2one partition");
  989. const char *partSeparator = srcFormat.getPartSeparatorString();
  990. offset_t partSeparatorLength = ( partSeparator == nullptr ? 0 : strlen(partSeparator));
  991. offset_t lastContentLength = 0;
  992. ForEachItemIn(idx, sources)
  993. {
  994. FilePartInfo & cur = sources.item(idx);
  995. RemoteFilename curFilename;
  996. curFilename.set(cur.filename);
  997. setCanAccessDirectly(curFilename);
  998. if (cur.size)
  999. {
  1000. if (partSeparator)
  1001. {
  1002. if (lastContentLength)
  1003. {
  1004. PartitionPoint &part = createLiteral(1, partSeparator, (unsigned) -1);
  1005. part.whichOutput = 0;
  1006. partition.append(part);
  1007. }
  1008. lastContentLength = cur.size;
  1009. }
  1010. partition.append(*new PartitionPoint(idx, 0, cur.headerSize, cur.size, cur.size));
  1011. }
  1012. }
  1013. if (srcFormat.isCsv())
  1014. examineCsvStructure();
  1015. }
  1016. void FileSprayer::calculateNoSplitPartition()
  1017. {
  1018. LOG(MCdebugProgressDetail, job, "Setting up no split partition");
  1019. if (!usePullOperation() && !srcFormat.equals(tgtFormat))
  1020. throwError(DFTERR_NoSplitPushChangeFormat);
  1021. #if 1
  1022. //split by number
  1023. unsigned numSources = sources.ordinality();
  1024. unsigned numTargets = targets.ordinality();
  1025. if (numSources < numTargets)
  1026. numTargets = numSources;
  1027. unsigned tally = 0;
  1028. unsigned curTarget = 0;
  1029. ForEachItemIn(idx, sources)
  1030. {
  1031. FilePartInfo & cur = sources.item(idx);
  1032. partition.append(*new PartitionPoint(idx, curTarget, cur.headerSize, copyCompressed?cur.psize:cur.size, copyCompressed?cur.psize:cur.size)); // outputoffset == 0
  1033. tally += numTargets;
  1034. if (tally >= numSources)
  1035. {
  1036. tally -= numSources;
  1037. curTarget++;
  1038. }
  1039. }
  1040. #else
  1041. //split by size
  1042. offset_t totalSize = 0;
  1043. ForEachItemIn(i, sources)
  1044. totalSize += sources.item(i).size;
  1045. unsigned numTargets = targets.ordinality();
  1046. offset_t chunkSize = (totalSize / numTargets);
  1047. offset_t nextBoundary = chunkSize;
  1048. offset_t sizeSoFar = 0;
  1049. unsigned curTarget = 0;
  1050. ForEachItemIn(idx, sources)
  1051. {
  1052. FilePartInfo & cur = sources.item(idx);
  1053. offset_t nextSize = sizeSoFar + cur.size;
  1054. if ((sizeSoFar >= nextBoundary) ||
  1055. ((nextSize > nextBoundary) &&
  1056. (nextBoundary - sizeSoFar < nextSize - nextBoundary)))
  1057. {
  1058. if (curTarget != numTargets-1)
  1059. {
  1060. curTarget++;
  1061. nextBoundary += chunkSize;
  1062. }
  1063. }
  1064. RemoteFilename curFilename;
  1065. curFilename.set(cur.filename);
  1066. setCanAccessDirectly(curFilename);
  1067. partition.append(*new PartitionPoint(idx, curTarget, cur.headerSize, cur.size, cur.size)); // outputoffset == 0
  1068. sizeSoFar = nextSize;
  1069. }
  1070. #endif
  1071. if (srcFormat.isCsv())
  1072. examineCsvStructure();
  1073. }
  1074. void FileSprayer::calculateSprayPartition()
  1075. {
  1076. LOG(MCdebugProgressDetail, job, "Calculating N:M partition");
  1077. bool calcOutput = needToCalcOutput();
  1078. FormatPartitionerArray partitioners;
  1079. unsigned numParts = targets.ordinality();
  1080. StringBuffer remoteFilename;
  1081. ForEachItemIn(idx, sources)
  1082. {
  1083. IFormatPartitioner * partitioner = createPartitioner(idx, calcOutput, numParts);
  1084. partitioner->setAbort(&fileSprayerAbortChecker);
  1085. partitioners.append(*partitioner);
  1086. }
  1087. unsigned numProcessors = partitioners.ordinality();
  1088. unsigned maxConnections = numParallelConnections(numProcessors);
  1089. //Throttle maximum number of concurrent transfers by starting n threads, and
  1090. //then waiting for one to complete before going on to the next
  1091. Semaphore sem;
  1092. unsigned goIndex;
  1093. for (goIndex=0; goIndex < maxConnections; goIndex++)
  1094. partitioners.item(goIndex).calcPartitions(&sem);
  1095. for (; goIndex<numProcessors; goIndex++)
  1096. {
  1097. sem.wait();
  1098. partitioners.item(goIndex).calcPartitions(&sem);
  1099. }
  1100. for (unsigned waitCount=0; waitCount < maxConnections;waitCount++)
  1101. sem.wait();
  1102. ForEachItemIn(idx2, partitioners)
  1103. partitioners.item(idx2).getResults(partition);
  1104. if ((partitioners.ordinality() > 0) && !srcAttr->hasProp("ECL"))
  1105. {
  1106. // Store discovered CSV record structure into target logical file.
  1107. storeCsvRecordStructure(partitioners.item(0));
  1108. }
  1109. if (compressedInput && compressOutput && streq(encryptKey.str(),decryptKey.str()))
  1110. copyCompressed = true;
  1111. }
  1112. void FileSprayer::storeCsvRecordStructure(IFormatPartitioner &partitioner)
  1113. {
  1114. StringBuffer recStru;
  1115. partitioner.getRecordStructure(recStru);
  1116. if ((recStru.length() > 0) && strstr(recStru.str(),"END;"))
  1117. {
  1118. if (distributedTarget)
  1119. distributedTarget->setECL(recStru.str());
  1120. }
  1121. }
  1122. IFormatPartitioner * FileSprayer::createPartitioner(aindex_t index, bool calcOutput, unsigned numParts)
  1123. {
  1124. StringBuffer remoteFilename;
  1125. FilePartInfo & cur = sources.item(index);
  1126. cur.filename.getRemotePath(remoteFilename.clear());
  1127. LOG(MCdebugInfoDetail, job, "Partition %d(%s)", index, remoteFilename.str());
  1128. srcFormat.quotedTerminator = options->getPropBool("@quotedTerminator", true);
  1129. const SocketEndpoint & ep = cur.filename.queryEndpoint();
  1130. IFormatPartitioner * partitioner = createFormatPartitioner(ep, srcFormat, tgtFormat, calcOutput, queryFixedSlave(), wuid);
  1131. // CSV record structure discovery of the first source
  1132. bool isRecordStructurePresent = options->getPropBool("@recordStructurePresent", false);
  1133. partitioner->setRecordStructurePresent(isRecordStructurePresent);
  1134. RemoteFilename name;
  1135. name.set(cur.filename);
  1136. setCanAccessDirectly(name);
  1137. partitioner->setPartitionRange(totalSize, cur.offset, cur.size, cur.headerSize, numParts);
  1138. partitioner->setSource(index, name, compressedInput, decryptKey);
  1139. return partitioner;
  1140. }
  1141. void FileSprayer::examineCsvStructure()
  1142. {
  1143. if (srcAttr && srcAttr->hasProp("ECL"))
  1144. // Already has, keep it.
  1145. return;
  1146. bool calcOutput = needToCalcOutput();
  1147. if (sources.ordinality())
  1148. {
  1149. Owned<IFormatPartitioner> partitioner = createPartitioner(0, calcOutput, targets.ordinality());
  1150. storeCsvRecordStructure(*partitioner);
  1151. }
  1152. else
  1153. LOG(MCdebugInfoDetail, job, "No source CSV file to examine.");
  1154. }
  1155. void FileSprayer::calculateOutputOffsets()
  1156. {
  1157. unsigned headerSize = getHeaderSize(tgtFormat.type);
  1158. offset_t outputOffset = headerSize;
  1159. unsigned curOutput = 0;
  1160. ForEachItemIn(idx, partition)
  1161. {
  1162. PartitionPoint & cur = partition.item(idx);
  1163. if (curOutput != cur.whichOutput)
  1164. {
  1165. outputOffset = headerSize;
  1166. curOutput = cur.whichOutput;
  1167. }
  1168. cur.outputOffset = outputOffset;
  1169. outputOffset += cur.outputLength;
  1170. }
  1171. }
  1172. void FileSprayer::checkFormats()
  1173. {
  1174. if (unknownSourceFormat)
  1175. {
  1176. //If target format is specified, use that - not really very good, but...
  1177. srcFormat.set(tgtFormat);
  1178. //If format omitted, and number of parts are the same then okay to omit the format
  1179. if (sources.ordinality() == targets.ordinality() && !disallowImplicitReplicate())
  1180. copySource = true;
  1181. bool noSplit = !allowSplit();
  1182. if (!replicate && !copySource && !noSplit)
  1183. {
  1184. //copy to a single target => assume same format concatenated.
  1185. if (targets.ordinality() != 1)
  1186. {
  1187. if (!unknownTargetFormat)
  1188. throwError(DFTERR_TargetFormatUnknownSource);
  1189. else
  1190. throwError(DFTERR_FormatNotSpecified);
  1191. }
  1192. }
  1193. }
  1194. FileFormatType srcType = srcFormat.type;
  1195. FileFormatType tgtType = tgtFormat.type;
  1196. if (srcType != tgtType)
  1197. {
  1198. switch (srcType)
  1199. {
  1200. case FFTfixed:
  1201. if ((tgtType != FFTvariable)&&(tgtType != FFTvariablebigendian))
  1202. throwError(DFTERR_BadSrcTgtCombination);
  1203. break;
  1204. case FFTvariable:
  1205. if ((tgtType != FFTfixed) && (tgtType != FFTblocked)&& (tgtType != FFTvariablebigendian))
  1206. throwError(DFTERR_BadSrcTgtCombination);
  1207. break;
  1208. case FFTvariablebigendian:
  1209. if ((tgtType != FFTfixed) && (tgtType != FFTblocked) && (tgtType != FFTvariable))
  1210. throwError(DFTERR_BadSrcTgtCombination);
  1211. break;
  1212. case FFTblocked:
  1213. if ((tgtType != FFTvariable)&&(tgtType != FFTvariablebigendian))
  1214. throwError(DFTERR_BadSrcTgtCombination);
  1215. break;
  1216. case FFTcsv:
  1217. throwError(DFTERR_BadSrcTgtCombination);
  1218. case FFTutf: case FFTutf8: case FFTutf8n: case FFTutf16: case FFTutf16be: case FFTutf16le: case FFTutf32: case FFTutf32be: case FFTutf32le:
  1219. switch (tgtFormat.type)
  1220. {
  1221. case FFTutf: case FFTutf8: case FFTutf8n: case FFTutf16: case FFTutf16be: case FFTutf16le: case FFTutf32: case FFTutf32be: case FFTutf32le:
  1222. break;
  1223. default:
  1224. throwError(DFTERR_OnlyConvertUtfUtf);
  1225. break;
  1226. }
  1227. break;
  1228. }
  1229. }
  1230. switch (srcType)
  1231. {
  1232. case FFTutf: case FFTutf8: case FFTutf8n: case FFTutf16: case FFTutf16be: case FFTutf16le: case FFTutf32: case FFTutf32be: case FFTutf32le:
  1233. if (srcFormat.rowTag)
  1234. {
  1235. srcFormat.maxRecordSize = srcFormat.maxRecordSize > DEFAULT_MAX_XML_RECORD_SIZE ? srcFormat.maxRecordSize : DEFAULT_MAX_XML_RECORD_SIZE;
  1236. }
  1237. break;
  1238. default:
  1239. break;
  1240. }
  1241. }
  1242. void FileSprayer::calibrateProgress()
  1243. {
  1244. sizeToBeRead = 0;
  1245. ForEachItemIn(idx, transferSlaves)
  1246. sizeToBeRead += transferSlaves.item(idx).getInputSize();
  1247. totalLengthRead = calcSizeReadAlready();
  1248. }
  1249. void FileSprayer::checkForOverlap()
  1250. {
  1251. unsigned num = std::min(sources.ordinality(), targets.ordinality());
  1252. for (unsigned idx = 0; idx < num; idx++)
  1253. {
  1254. RemoteFilename & srcName = sources.item(idx).filename;
  1255. RemoteFilename & tgtName = targets.item(idx).filename;
  1256. if (srcName.equals(tgtName))
  1257. {
  1258. StringBuffer x;
  1259. srcName.getPath(x);
  1260. throwError1(DFTERR_CopyFileOntoSelf, x.str());
  1261. }
  1262. }
  1263. }
  1264. void FileSprayer::cleanupRecovery()
  1265. {
  1266. progressTree->setPropBool(ANcomplete, true);
  1267. #ifdef CLEANUP_RECOVERY
  1268. progressTree->removeProp(ANhasPartition);
  1269. progressTree->removeProp(ANhasProgress);
  1270. progressTree->removeProp(ANhasRecovery);
  1271. progressTree->removeProp(PNpartition);
  1272. progressTree->removeProp(PNprogress);
  1273. progressTree->removeProp(ANpull);
  1274. #endif
  1275. }
  1276. bool FileSprayer::usePushWholeOperation() const
  1277. {
  1278. return targets.item(0).filename.isUrl();
  1279. }
  1280. bool FileSprayer::canRenameOutput() const
  1281. {
  1282. return targets.item(0).filename.queryFileSystemProperties().canRename;
  1283. }
  1284. void FileSprayer::checkSprayOptions()
  1285. {
  1286. if (isSafeMode && !canRenameOutput())
  1287. {
  1288. isSafeMode = false;
  1289. UWARNLOG("Safe mode is disable because the target cannot be renamed");
  1290. }
  1291. }
  1292. //Several files being pulled to the same machine - only run ftslave once...
  1293. void FileSprayer::commonUpSlaves()
  1294. {
  1295. unsigned max = partition.ordinality();
  1296. bool pull = usePullOperation();
  1297. bool pushWhole = usePushWholeOperation();
  1298. bool slaveMatchesOutput = pull || pushWhole; // One slave per target if a url
  1299. for (unsigned idx = 0; idx < max; idx++)
  1300. {
  1301. PartitionPoint & cur = partition.item(idx);
  1302. cur.whichSlave = slaveMatchesOutput ? cur.whichOutput : cur.whichInput;
  1303. if (cur.whichSlave == -1)
  1304. cur.whichSlave = 0;
  1305. }
  1306. if (options->getPropBool(ANnocommon, true) || pushWhole)
  1307. return;
  1308. //First work out which are the same slaves, and then map the partition.
  1309. //Previously it was n^2 in partition, which is fine until you spray 100K files.
  1310. unsigned numSlaves = pull ? targets.ordinality() : sources.ordinality();
  1311. unsigned * slaveMapping = new unsigned [numSlaves];
  1312. for (unsigned i = 0; i < numSlaves; i++)
  1313. slaveMapping[i] = i;
  1314. if (pull)
  1315. {
  1316. for (unsigned i1 = 1; i1 < numSlaves; i1++)
  1317. {
  1318. TargetLocation & cur = targets.item(i1);
  1319. for (unsigned i2 = 0; i2 < i1; i2++)
  1320. {
  1321. if (targets.item(i2).filename.queryIP().ipequals(cur.filename.queryIP()))
  1322. {
  1323. slaveMapping[i1] = i2;
  1324. break;
  1325. }
  1326. }
  1327. }
  1328. }
  1329. else
  1330. {
  1331. for (unsigned i1 = 1; i1 < numSlaves; i1++)
  1332. {
  1333. FilePartInfo & cur = sources.item(i1);
  1334. for (unsigned i2 = 0; i2 < i1; i2++)
  1335. {
  1336. if (sources.item(i2).filename.queryIP().ipequals(cur.filename.queryIP()))
  1337. {
  1338. slaveMapping[i1] = i2;
  1339. break;
  1340. }
  1341. }
  1342. }
  1343. }
  1344. for (unsigned i3 = 0; i3 < max; i3++)
  1345. {
  1346. PartitionPoint & cur = partition.item(i3);
  1347. cur.whichSlave = slaveMapping[cur.whichSlave];
  1348. }
  1349. delete [] slaveMapping;
  1350. }
  1351. void FileSprayer::analyseFileHeaders(bool setcurheadersize)
  1352. {
  1353. FileFormatType defaultFormat = FFTunknown;
  1354. switch (srcFormat.type)
  1355. {
  1356. case FFTutf:
  1357. case FFTutf8:
  1358. defaultFormat = FFTutf8n;
  1359. break;
  1360. case FFTutf16:
  1361. defaultFormat = FFTutf16be;
  1362. break;
  1363. case FFTutf32:
  1364. defaultFormat = FFTutf32be;
  1365. break;
  1366. default:
  1367. if (!srcFormat.rowTag)
  1368. return;
  1369. break;
  1370. }
  1371. FileFormatType actualType = FFTunknown;
  1372. unsigned numEmptyXml = 0;
  1373. ForEachItemIn(idx, sources)
  1374. {
  1375. FilePartInfo & cur = sources.item(idx);
  1376. StringBuffer s;
  1377. cur.filename.getPath(s);
  1378. LOG(MCdebugInfo, job, "Examine header of file %s", s.str());
  1379. Owned<IFile> file = createIFile(cur.filename);
  1380. Owned<IFileIO> io = file->open(IFOread);
  1381. if (!io)
  1382. {
  1383. StringBuffer s;
  1384. cur.filename.getRemotePath(s);
  1385. throwError1(DFTERR_CouldNotOpenFilePart, s.str());
  1386. }
  1387. if (compressedInput) {
  1388. Owned<IExpander> expander;
  1389. if (!decryptKey.isEmpty()) {
  1390. StringBuffer key;
  1391. decrypt(key,decryptKey);
  1392. expander.setown(createAESExpander256(key.length(),key.str()));
  1393. }
  1394. io.setown(createCompressedFileReader(io,expander));
  1395. }
  1396. if (defaultFormat != FFTunknown)
  1397. {
  1398. FileFormatType thisType;
  1399. unsigned char header[4];
  1400. memset(header, 255, sizeof(header)); // fill so don't get clashes if file is very small!
  1401. unsigned numRead = io->read(0, 4, header);
  1402. unsigned headerSize = 0;
  1403. if ((memcmp(header, "\xEF\xBB\xBF", 3) == 0) && (srcFormat.type == FFTutf || srcFormat.type == FFTutf8))
  1404. {
  1405. thisType = FFTutf8n;
  1406. headerSize = 3;
  1407. }
  1408. else if ((memcmp(header, "\xFF\xFE\x00\x00", 4) == 0) && (srcFormat.type == FFTutf || srcFormat.type == FFTutf32))
  1409. {
  1410. thisType = FFTutf32le;
  1411. headerSize = 4;
  1412. }
  1413. else if ((memcmp(header, "\x00\x00\xFE\xFF", 4) == 0) && (srcFormat.type == FFTutf || srcFormat.type == FFTutf32))
  1414. {
  1415. thisType = FFTutf32be;
  1416. headerSize = 4;
  1417. }
  1418. else if ((memcmp(header, "\xFF\xFE", 2) == 0) && (srcFormat.type == FFTutf || srcFormat.type == FFTutf16))
  1419. {
  1420. thisType = FFTutf16le;
  1421. headerSize = 2;
  1422. }
  1423. else if ((memcmp(header, "\xFE\xFF", 2) == 0) && (srcFormat.type == FFTutf || srcFormat.type == FFTutf16))
  1424. {
  1425. thisType = FFTutf16be;
  1426. headerSize = 2;
  1427. }
  1428. else
  1429. {
  1430. thisType = defaultFormat;
  1431. headerSize = 0;
  1432. }
  1433. if (actualType == FFTunknown)
  1434. actualType = thisType;
  1435. else if (actualType != thisType)
  1436. throwError(DFTERR_PartsDoNotHaveSameUtfFormat);
  1437. if (setcurheadersize) {
  1438. cur.headerSize = headerSize;
  1439. cur.size -= headerSize;
  1440. }
  1441. }
  1442. if (srcFormat.rowTag&&setcurheadersize)
  1443. {
  1444. try
  1445. {
  1446. if (distributedSource)
  1447. {
  1448. // Despray from distributed file
  1449. // Check XMLheader/footer in file level
  1450. DistributedFilePropertyLock lock(distributedSource);
  1451. IPropertyTree &curProps = lock.queryAttributes();
  1452. if (curProps.hasProp(FPheaderLength) && curProps.hasProp(FPfooterLength))
  1453. {
  1454. cur.xmlHeaderLength = curProps.getPropInt(FPheaderLength, 0);
  1455. cur.xmlFooterLength = curProps.getPropInt(FPfooterLength, 0);
  1456. }
  1457. else
  1458. {
  1459. // Try it in file part level
  1460. Owned<IDistributedFilePart> curPart = distributedSource->getPart(idx);
  1461. IPropertyTree& curPartProps = curPart->queryAttributes();
  1462. cur.xmlHeaderLength = curPartProps.getPropInt(FPheaderLength, 0);
  1463. cur.xmlFooterLength = curPartProps.getPropInt(FPfooterLength, 0);
  1464. }
  1465. }
  1466. else
  1467. {
  1468. // Spray from file
  1469. if (srcFormat.headerLength == (unsigned)-1 || srcFormat.footerLength == (unsigned)-1)
  1470. locateContentHeader(io, cur.headerSize, cur.xmlHeaderLength, cur.xmlFooterLength);
  1471. else
  1472. {
  1473. cur.xmlHeaderLength = srcFormat.headerLength;
  1474. cur.xmlFooterLength = srcFormat.footerLength;
  1475. }
  1476. }
  1477. cur.headerSize += (unsigned)cur.xmlHeaderLength;
  1478. if (cur.size >= cur.xmlHeaderLength + cur.xmlFooterLength)
  1479. {
  1480. cur.size -= (cur.xmlHeaderLength + cur.xmlFooterLength);
  1481. if (cur.size <= srcFormat.rowTag.length()) // implies there's a header and footer but no rows (whitespace only)
  1482. cur.size = 0;
  1483. }
  1484. else
  1485. throwError3(DFTERR_InvalidXmlPartSize, cur.size, cur.xmlHeaderLength, cur.xmlFooterLength);
  1486. }
  1487. catch (IException * e)
  1488. {
  1489. if (e->errorCode() != DFTERR_CannotFindFirstXmlRecord)
  1490. throw;
  1491. e->Release();
  1492. if (!replicate)
  1493. {
  1494. cur.headerSize = 0;
  1495. cur.size = 0;
  1496. }
  1497. numEmptyXml++;
  1498. }
  1499. }
  1500. }
  1501. if (numEmptyXml == sources.ordinality())
  1502. {
  1503. if (numEmptyXml == 1)
  1504. throwError(DFTERR_CannotFindFirstXmlRecord);
  1505. // else
  1506. // throwError(DFTERR_CannotFindAnyXmlRecord);
  1507. }
  1508. if (defaultFormat != FFTunknown)
  1509. srcFormat.type = actualType;
  1510. if (unknownTargetFormat)
  1511. {
  1512. tgtFormat.set(srcFormat);
  1513. if (distributedTarget)
  1514. {
  1515. DistributedFilePropertyLock lock(distributedTarget);
  1516. IPropertyTree &curProps = lock.queryAttributes();
  1517. tgtFormat.save(&curProps);
  1518. }
  1519. }
  1520. }
  1521. void FileSprayer::locateXmlHeader(IFileIO * io, unsigned headerSize, offset_t & xmlHeaderLength, offset_t & xmlFooterLength)
  1522. {
  1523. Owned<IFileIOStream> in = createIOStream(io);
  1524. XmlSplitter splitter(srcFormat);
  1525. BufferedDirectReader reader;
  1526. reader.set(in);
  1527. reader.seek(headerSize);
  1528. if (xmlHeaderLength == 0)
  1529. {
  1530. try
  1531. {
  1532. xmlHeaderLength = splitter.getHeaderLength(reader);
  1533. }
  1534. catch (IException * e)
  1535. {
  1536. if (e->errorCode() != DFTERR_CannotFindFirstXmlRecord)
  1537. throw;
  1538. e->Release();
  1539. xmlHeaderLength = 0;
  1540. }
  1541. }
  1542. offset_t size = io->size();
  1543. offset_t endOffset = (size > srcFormat.maxRecordSize*2 + headerSize) ? size - srcFormat.maxRecordSize*2 : headerSize;
  1544. reader.seek(endOffset);
  1545. if (xmlFooterLength == 0)
  1546. {
  1547. try
  1548. {
  1549. xmlFooterLength = splitter.getFooterLength(reader, size);
  1550. }
  1551. catch (IException * e)
  1552. {
  1553. if (e->errorCode() != DFTERR_CannotFindLastXmlRecord)
  1554. throw;
  1555. e->Release();
  1556. xmlFooterLength= 0;
  1557. }
  1558. }
  1559. }
  1560. void FileSprayer::locateJsonHeader(IFileIO * io, unsigned headerSize, offset_t & headerLength, offset_t & footerLength)
  1561. {
  1562. Owned<IFileIOStream> in = createIOStream(io);
  1563. JsonSplitter jsplitter(srcFormat, *in);
  1564. headerLength = jsplitter.getHeaderLength();
  1565. footerLength = jsplitter.getFooterLength();
  1566. }
  1567. void FileSprayer::locateContentHeader(IFileIO * io, unsigned headerSize, offset_t & headerLength, offset_t & footerLength)
  1568. {
  1569. if (srcFormat.markup == FMTjson)
  1570. locateJsonHeader(io, headerSize, headerLength, footerLength);
  1571. else
  1572. locateXmlHeader(io, headerSize, headerLength, footerLength);
  1573. }
  1574. void FileSprayer::derivePartitionExtra()
  1575. {
  1576. calculateOutputOffsets();
  1577. assignPartitionFilenames();
  1578. commonUpSlaves();
  1579. IPropertyTreeIterator * iter = NULL;
  1580. if (isRecovering)
  1581. {
  1582. Owned<IPropertyTreeIterator> iter = progressTree->getElements(PNprogress);
  1583. ForEach(*iter)
  1584. {
  1585. OutputProgress & next = * new OutputProgress;
  1586. next.restore(&iter->query());
  1587. next.tree.set(&iter->query());
  1588. progress.append(next);
  1589. }
  1590. assertex(progress.ordinality() == partition.ordinality());
  1591. }
  1592. else
  1593. {
  1594. if (allowRecovery)
  1595. progressTree->setPropBool(ANhasProgress, true);
  1596. ForEachItemIn(idx, partition)
  1597. {
  1598. OutputProgress & next = * new OutputProgress;
  1599. next.whichPartition=idx;
  1600. if (allowRecovery)
  1601. {
  1602. IPropertyTree * progressInfo = progressTree->addPropTree(PNprogress, createPTree(PNprogress, ipt_caseInsensitive));
  1603. next.tree.set(progressInfo);
  1604. next.save(progressInfo);
  1605. }
  1606. progress.append(next);
  1607. }
  1608. }
  1609. }
  1610. void FileSprayer::displayPartition()
  1611. {
  1612. ForEachItemIn(idx, partition)
  1613. {
  1614. partition.item(idx).display();
  1615. #ifdef _DEBUG
  1616. if ((partition.item(idx).whichInput >= 0) && (partition.item(idx).whichInput < sources.ordinality()) )
  1617. LOG(MCdebugInfoDetail, unknownJob,
  1618. " Header size: %" I64F "u, XML header size: %" I64F "u, XML footer size: %" I64F "u",
  1619. sources.item(partition.item(idx).whichInput).headerSize,
  1620. sources.item(partition.item(idx).whichInput).xmlHeaderLength,
  1621. sources.item(partition.item(idx).whichInput).xmlFooterLength
  1622. );
  1623. else
  1624. LOG(MCdebugInfoDetail, unknownJob," No source file for this partition");
  1625. #endif
  1626. }
  1627. }
  1628. void FileSprayer::extractSourceFormat(IPropertyTree * props)
  1629. {
  1630. if (srcFormat.restore(props))
  1631. unknownSourceFormat = false;
  1632. else
  1633. srcFormat.set(FFTfixed, 1);
  1634. bool blockcompressed;
  1635. if (isCompressed(*props, &blockcompressed))
  1636. {
  1637. if (!blockcompressed)
  1638. throwError(DFTERR_RowCompressedNotSupported);
  1639. compressedInput = true;
  1640. }
  1641. else if (!decryptKey.isEmpty())
  1642. compressedInput = true;
  1643. }
  1644. void FileSprayer::gatherFileSizes(bool errorIfMissing)
  1645. {
  1646. FilePartInfoArray fileSizeQueue;
  1647. LOG(MCdebugProgress, job, "Start gathering file sizes...");
  1648. ForEachItemIn(idx, sources)
  1649. {
  1650. FilePartInfo & cur = sources.item(idx);
  1651. if (cur.size == UNKNOWN_PART_SIZE)
  1652. fileSizeQueue.append(OLINK(cur));
  1653. }
  1654. gatherFileSizes(fileSizeQueue, errorIfMissing);
  1655. LOG(MCdebugProgress, job, "Finished gathering file sizes...");
  1656. }
  1657. void FileSprayer::afterGatherFileSizes()
  1658. {
  1659. if (!copyCompressed)
  1660. {
  1661. StringBuffer tailStr;
  1662. ForEachItemIn(idx2, sources)
  1663. {
  1664. FilePartInfo & cur = sources.item(idx2);
  1665. LOG(MCdebugProgress, job, "%9u:%s (size: %llu bytes)",
  1666. idx2, cur.filename.getTail(tailStr.clear()).str(), cur.size
  1667. );
  1668. cur.offset = totalSize;
  1669. totalSize += cur.size;
  1670. if (cur.size % srcFormat.getUnitSize())
  1671. {
  1672. StringBuffer s;
  1673. if (srcFormat.isUtf())
  1674. throwError2(DFTERR_InputIsInvalidMultipleUtf, cur.filename.getRemotePath(s).str(), srcFormat.getUnitSize());
  1675. else
  1676. throwError2(DFTERR_InputIsInvalidMultiple, cur.filename.getRemotePath(s).str(), srcFormat.getUnitSize());
  1677. }
  1678. }
  1679. LOG(MCdebugProgress, job, "----------------------------------------------");
  1680. LOG(MCdebugProgress, job, "All together: %llu bytes in %u file(s)", totalSize, sources.ordinality());
  1681. }
  1682. }
  1683. void FileSprayer::gatherFileSizes(FilePartInfoArray & fileSizeQueue, bool errorIfMissing)
  1684. {
  1685. if (fileSizeQueue.ordinality())
  1686. {
  1687. CIArrayOf<FileSizeThread> threads;
  1688. CriticalSection fileSizeCS;
  1689. //Is this a good guess? start square root of number of files threads??
  1690. unsigned numThreads = (unsigned)sqrt((float)fileSizeQueue.ordinality());
  1691. if (numThreads>20)
  1692. numThreads = 20;
  1693. LOG(MCdebugProgress, job, "Gathering %d file sizes on %d threads", fileSizeQueue.ordinality(), numThreads);
  1694. unsigned idx;
  1695. for (idx = 0; idx < numThreads; idx++)
  1696. threads.append(*new FileSizeThread(fileSizeQueue, fileSizeCS, compressedInput&&!copyCompressed, errorIfMissing));
  1697. for (idx = 0; idx < numThreads; idx++)
  1698. threads.item(idx).start();
  1699. for (;;) {
  1700. bool alldone = true;
  1701. StringBuffer err;
  1702. for (idx = 0; idx < numThreads; idx++) {
  1703. bool ok = threads.item(idx).wait(10*1000);
  1704. if (!ok)
  1705. alldone = false;
  1706. }
  1707. if (alldone)
  1708. break;
  1709. }
  1710. for (idx = 0; idx < numThreads; idx++)
  1711. threads.item(idx).queryThrowError();
  1712. }
  1713. }
  1714. void FileSprayer::gatherMissingSourceTarget(IFileDescriptor * source)
  1715. {
  1716. //First gather all the file sizes...
  1717. RemoteFilename filename;
  1718. FilePartInfoArray primparts;
  1719. FilePartInfoArray secparts;
  1720. UnsignedArray secstart;
  1721. FilePartInfoArray queue;
  1722. unsigned numParts = source->numParts();
  1723. for (unsigned idx1=0; idx1 < numParts; idx1++)
  1724. {
  1725. if (!filter.get() || filter->includePart(idx1))
  1726. {
  1727. unsigned numCopies = source->numCopies(idx1);
  1728. if (numCopies>=1) // only add if there is one or more replicates
  1729. {
  1730. for (unsigned copy=0; copy < numCopies; copy++)
  1731. {
  1732. FilePartInfo & next = * new FilePartInfo;
  1733. source->getFilename(idx1, copy, next.filename);
  1734. if (copy==0)
  1735. primparts.append(next);
  1736. else
  1737. {
  1738. if (copy==1)
  1739. secstart.append(secparts.ordinality());
  1740. secparts.append(next);
  1741. }
  1742. queue.append(OLINK(next));
  1743. }
  1744. }
  1745. }
  1746. }
  1747. secstart.append(secparts.ordinality());
  1748. gatherFileSizes(queue, false);
  1749. //Now process the information...
  1750. StringBuffer primaryPath, secondaryPath;
  1751. for (unsigned idx=0; idx < primparts.ordinality(); idx++)
  1752. {
  1753. FilePartInfo & primary = primparts.item(idx);
  1754. offset_t primarySize = primary.size;
  1755. primary.filename.getRemotePath(primaryPath.clear());
  1756. for (unsigned idx2=secstart.item(idx);idx2<secstart.item(idx+1);idx2++)
  1757. {
  1758. FilePartInfo & secondary = secparts.item(idx2);
  1759. offset_t secondarySize = secondary.size;
  1760. secondary.filename.getRemotePath(secondaryPath.clear());
  1761. unsigned sourceCopy = 0;
  1762. if (primarySize != secondarySize)
  1763. {
  1764. if (primarySize == -1)
  1765. {
  1766. sourceCopy = 1;
  1767. }
  1768. else if (secondarySize != -1)
  1769. {
  1770. LOG(MCwarning, unknownJob, "Replicate - primary and secondary copies have different sizes (%" I64F "d v %" I64F "d) for part %u", primarySize, secondarySize, idx);
  1771. continue; // ignore copy
  1772. }
  1773. }
  1774. else
  1775. {
  1776. if (primarySize == -1)
  1777. {
  1778. LOG(MCwarning, unknownJob, "Replicate - neither primary or secondary copies exist for part %u", idx);
  1779. primarySize = 0; // to stop later failure to gather the file size
  1780. }
  1781. continue; // ignore copy
  1782. }
  1783. RemoteFilename *dst= (sourceCopy == 0) ? &secondary.filename : &primary.filename;
  1784. // check nothing else to same destination
  1785. bool done = false;
  1786. ForEachItemIn(dsti,targets) {
  1787. TargetLocation &tgt = targets.item(dsti);
  1788. if (tgt.filename.equals(*dst)) {
  1789. done = true;
  1790. break;
  1791. }
  1792. }
  1793. if (!done) {
  1794. sources.append(* new FilePartInfo(*((sourceCopy == 0)? &primary.filename : &secondary.filename)));
  1795. targets.append(* new TargetLocation(*dst));
  1796. sources.tos().size = (sourceCopy == 0) ? primarySize : secondarySize;
  1797. }
  1798. }
  1799. }
  1800. filter.clear(); // we have already filtered
  1801. }
  1802. unsigned __int64 FileSprayer::calcSizeReadAlready()
  1803. {
  1804. unsigned __int64 sizeRead = 0;
  1805. ForEachItemIn(idx, progress)
  1806. {
  1807. OutputProgress & cur = progress.item(idx);
  1808. sizeRead += cur.inputLength;
  1809. }
  1810. return sizeRead;
  1811. }
  1812. unsigned __int64 FileSprayer::getSizeReadAlready()
  1813. {
  1814. return totalLengthRead;
  1815. }
  1816. PartitionPoint & FileSprayer::createLiteral(size32_t len, const void * data, unsigned idx)
  1817. {
  1818. PartitionPoint & next = * new PartitionPoint;
  1819. next.inputOffset = 0;
  1820. next.inputLength = len;
  1821. next.outputLength = len;
  1822. next.fixedText.set(len, data);
  1823. if (partition.isItem(idx))
  1824. {
  1825. PartitionPoint & cur = partition.item(idx);
  1826. next.whichInput = cur.whichInput;
  1827. next.whichOutput = cur.whichOutput;
  1828. }
  1829. else
  1830. {
  1831. next.whichInput = (unsigned)-1;
  1832. next.whichOutput = (unsigned)-1;
  1833. }
  1834. return next;
  1835. }
  1836. void FileSprayer::addHeaderFooter(size32_t len, const void * data, unsigned idx, bool before)
  1837. {
  1838. PartitionPoint & next = createLiteral(len, data, idx);
  1839. unsigned insertPos = before ? idx : idx+1;
  1840. partition.add(next, insertPos);
  1841. }
  1842. //MORE: This should be moved to jlib....
  1843. //MORE: I should really be doing this on unicode characters and supporting \u \U
  1844. void replaceEscapeSequence(StringBuffer & out, const char * in, bool errorIfInvalid)
  1845. {
  1846. out.ensureCapacity(strlen(in)+1);
  1847. while (*in)
  1848. {
  1849. char c = *in++;
  1850. if (c == '\\')
  1851. {
  1852. char next = *in;
  1853. if (next)
  1854. {
  1855. in++;
  1856. switch (next)
  1857. {
  1858. case 'a': c = '\a'; break;
  1859. case 'b': c = '\b'; break;
  1860. case 'f': c = '\f'; break;
  1861. case 'n': c = '\n'; break;
  1862. case 'r': c = '\r'; break;
  1863. case 't': c = '\t'; break;
  1864. case 'v': c = '\v'; break;
  1865. case '\\':
  1866. case '\'':
  1867. case '?':
  1868. case '\"': break;
  1869. case '0': case '1': case '2': case '3': case '4': case '5': case '6': case '7':
  1870. {
  1871. c = next - '0';
  1872. if (*in >= '0' && *in <= '7')
  1873. {
  1874. c = c << 3 | (*in++-'0');
  1875. if (*in >= '0' && *in <= '7')
  1876. c = c << 3 | (*in++-'0');
  1877. }
  1878. break;
  1879. }
  1880. case 'x':
  1881. c = 0;
  1882. while (isxdigit(*in))
  1883. {
  1884. next = *in++;
  1885. c = c << 4;
  1886. if (next >= '0' && next <= '9') c |= (next - '0');
  1887. else if (next >= 'A' && next <= 'F') c |= (next - 'A' + 10);
  1888. else if (next >= 'a' && next <= 'f') c |= (next - 'a' + 10);
  1889. }
  1890. break;
  1891. default:
  1892. if (errorIfInvalid)
  1893. throw MakeStringException(1, "unrecognised character escape sequence '\\%c'", next);
  1894. in--; // keep it as is.
  1895. break;
  1896. }
  1897. }
  1898. }
  1899. out.append(c);
  1900. }
  1901. }
  1902. void FileSprayer::addHeaderFooter(const char * data, unsigned idx, bool before)
  1903. {
  1904. StringBuffer expanded;
  1905. //MORE: Should really expand as unicode, so can have unicode control characters.
  1906. decodeCppEscapeSequence(expanded, data, true);
  1907. MemoryBuffer translated;
  1908. convertUtf(translated, getUtfFormatType(tgtFormat.type), expanded.length(), expanded.str(), UtfReader::Utf8);
  1909. //MORE: Convert from utf-8 to target format.
  1910. addHeaderFooter(translated.length(), translated.toByteArray(), idx, before);
  1911. }
  1912. void FileSprayer::cloneHeaderFooter(unsigned idx, bool isHeader)
  1913. {
  1914. PartitionPoint & cur = partition.item(idx);
  1915. FilePartInfo & curSrc = sources.item(cur.whichInput);
  1916. PartitionPoint & next = * new PartitionPoint;
  1917. //NB: headerSize include the size of the xmlHeader; size includes neither header or footers.
  1918. if (isHeader)
  1919. // Set offset to the XML header
  1920. next.inputOffset = curSrc.headerSize - curSrc.xmlHeaderLength;
  1921. else
  1922. //Set offset to the XML footer
  1923. next.inputOffset = curSrc.headerSize + curSrc.size;
  1924. next.inputLength = isHeader ? curSrc.xmlHeaderLength : curSrc.xmlFooterLength;
  1925. next.outputLength = needToCalcOutput() ? next.inputLength : 0;
  1926. next.whichInput = cur.whichInput;
  1927. next.whichOutput = cur.whichOutput;
  1928. if (isHeader)
  1929. partition.add(next, idx);
  1930. else
  1931. partition.add(next, idx+1);
  1932. }
  1933. void FileSprayer::addPrefix(size32_t len, const void * data, unsigned idx, PartitionPointArray & partitionWork)
  1934. {
  1935. //Merge header and original partition item into partitionWork array
  1936. PartitionPoint & header = createLiteral(len, data, idx);
  1937. partitionWork.append(header);
  1938. PartitionPoint & partData = partition.item(idx);
  1939. partitionWork.append(OLINK(partData));
  1940. }
  1941. void FileSprayer::insertHeaders()
  1942. {
  1943. const char * header = options->queryProp("@header");
  1944. const char * footer = options->queryProp("@footer");
  1945. const char * glue = options->queryProp("@glue");
  1946. const char * prefix = options->queryProp(ANprefix);
  1947. bool canKeepHeader = srcFormat.equals(tgtFormat) || !needToCalcOutput();
  1948. bool keepHeader = options->getPropBool("@keepHeader", canKeepHeader) && srcFormat.rowTag;
  1949. if (header || footer || prefix || glue)
  1950. keepHeader = false;
  1951. if (keepHeader && !canKeepHeader)
  1952. throwError(DFTERR_CannotKeepHeaderChangeFormat);
  1953. if (header || footer || keepHeader)
  1954. {
  1955. unsigned idx;
  1956. unsigned curOutput = (unsigned)-1;
  1957. bool footerPending = false;
  1958. for (idx = 0; idx < partition.ordinality(); idx++)
  1959. {
  1960. PartitionPoint & cur = partition.item(idx);
  1961. if (curOutput != cur.whichOutput)
  1962. {
  1963. if (keepHeader)
  1964. {
  1965. if (footerPending && (idx != 0))
  1966. {
  1967. footerPending = false;
  1968. cloneHeaderFooter(idx-1, false);
  1969. idx++;
  1970. }
  1971. //Don't add a header if there are no records in this part, and coming from more than one source file
  1972. //If coming from one then we'll be guaranteed to have a correct header in that part.
  1973. //If more than one, (and not replicating) then we will have failed to know where the header/footers are for this part.
  1974. if ((cur.inputLength == 0) && (sources.ordinality() > 1))
  1975. continue;
  1976. cloneHeaderFooter(idx, true);
  1977. footerPending = true;
  1978. idx++;
  1979. }
  1980. if (footer && (idx != 0))
  1981. {
  1982. addHeaderFooter(footer, idx-1, false);
  1983. idx++;
  1984. }
  1985. if (header)
  1986. {
  1987. addHeaderFooter(header, idx, true);
  1988. idx++;
  1989. }
  1990. curOutput = cur.whichOutput;
  1991. }
  1992. }
  1993. if (keepHeader && footerPending)
  1994. {
  1995. while (idx && partition.item(idx-1).inputLength == 0)
  1996. idx--;
  1997. if (idx)
  1998. {
  1999. cloneHeaderFooter(idx-1, false);
  2000. idx++;
  2001. }
  2002. }
  2003. if (footer)
  2004. {
  2005. addHeaderFooter(footer, idx-1, false);
  2006. idx++;
  2007. }
  2008. }
  2009. if (glue)
  2010. {
  2011. unsigned idx;
  2012. unsigned curInput = 0;
  2013. unsigned curOutput = 0;
  2014. for (idx = 0; idx < partition.ordinality(); idx++)
  2015. {
  2016. PartitionPoint & cur = partition.item(idx);
  2017. if ((curInput != cur.whichInput) && (curOutput == cur.whichOutput))
  2018. {
  2019. addHeaderFooter(glue, idx, true);
  2020. idx++;
  2021. }
  2022. curInput = cur.whichInput;
  2023. curOutput = cur.whichOutput;
  2024. }
  2025. }
  2026. if (prefix)
  2027. {
  2028. if (!srcFormat.equals(tgtFormat))
  2029. throwError(DFTERR_PrefixCannotTransform);
  2030. if (glue || header || footer)
  2031. throwError(DFTERR_PrefixCannotAlsoAddHeader);
  2032. PartitionPointArray partitionWork;
  2033. MemoryBuffer filePrefix;
  2034. filePrefix.setEndian(__LITTLE_ENDIAN);
  2035. for (unsigned idx = 0; idx < partition.ordinality(); idx++)
  2036. {
  2037. PartitionPoint & cur = partition.item(idx);
  2038. filePrefix.clear();
  2039. const char * finger = prefix;
  2040. while (finger)
  2041. {
  2042. StringAttr command;
  2043. const char * comma = strchr(finger, ',');
  2044. if (comma)
  2045. {
  2046. command.set(finger, comma-finger);
  2047. finger = comma+1;
  2048. }
  2049. else
  2050. {
  2051. command.set(finger);
  2052. finger = NULL;
  2053. }
  2054. command.toUpperCase();
  2055. if (memcmp(command, "FILENAME", 8) == 0)
  2056. {
  2057. StringBuffer filename;
  2058. cur.inputName.split(NULL, NULL, &filename, &filename);
  2059. if (command[8] == ':')
  2060. {
  2061. unsigned maxLen = atoi(command+9);
  2062. filename.padTo(maxLen);
  2063. filePrefix.append(maxLen, filename.str());
  2064. }
  2065. else
  2066. {
  2067. filePrefix.append((unsigned)filename.length());
  2068. filePrefix.append(filename.length(), filename.str());
  2069. }
  2070. }
  2071. else if ((memcmp(command, "FILESIZE", 8) == 0) || (command.length() == 2))
  2072. {
  2073. const char * format = command;
  2074. if (memcmp(format, "FILESIZE", 8) == 0)
  2075. {
  2076. if (format[8] == ':')
  2077. format = format+9;
  2078. else
  2079. format = "L4";
  2080. }
  2081. bool bigEndian;
  2082. char c = format[0];
  2083. if (c == 'B')
  2084. bigEndian = true;
  2085. else if (c == 'L')
  2086. bigEndian = false;
  2087. else
  2088. throwError1(DFTERR_InvalidPrefixFormat, format);
  2089. c = format[1];
  2090. if ((c <= '0') || (c > '8'))
  2091. throwError1(DFTERR_InvalidPrefixFormat, format);
  2092. unsigned length = (c - '0');
  2093. unsigned __int64 value = cur.inputLength;
  2094. byte temp[8];
  2095. for (unsigned i=0; i<length; i++)
  2096. {
  2097. temp[i] = (byte)value;
  2098. value >>= 8;
  2099. }
  2100. if (value)
  2101. throwError(DFTERR_PrefixTooSmall);
  2102. if (bigEndian)
  2103. {
  2104. byte temp2[8];
  2105. _cpyrevn(&temp2, &temp, length);
  2106. filePrefix.append(length, &temp2);
  2107. }
  2108. else
  2109. filePrefix.append(length, &temp);
  2110. }
  2111. else
  2112. throwError1(DFTERR_InvalidPrefixFormat, command.get());
  2113. }
  2114. addPrefix(filePrefix.length(), filePrefix.toByteArray(), idx, partitionWork);
  2115. }
  2116. LOG(MCdebugProgress, job, "Publish headers");
  2117. partition.swapWith(partitionWork);
  2118. }
  2119. }
  2120. bool FileSprayer::needToCalcOutput()
  2121. {
  2122. return !usePullOperation() || options->getPropBool(ANverify);
  2123. }
  2124. unsigned FileSprayer::numParallelConnections(unsigned limit)
  2125. {
  2126. unsigned maxConnections = options->getPropInt(ANmaxConnections, limit);
  2127. if ((maxConnections == 0) || (maxConnections > limit)) maxConnections = limit;
  2128. return maxConnections;
  2129. }
  2130. unsigned FileSprayer::numParallelSlaves()
  2131. {
  2132. unsigned numPullers = transferSlaves.ordinality();
  2133. unsigned maxConnections = DEFAULT_MAX_CONNECTIONS;
  2134. unsigned connectOption = options->getPropInt(ANmaxConnections, 0);
  2135. if (connectOption)
  2136. maxConnections = connectOption;
  2137. else if (mirroring && (maxConnections * 3 < numPullers))
  2138. maxConnections = numPullers/3;
  2139. if (maxConnections > numPullers) maxConnections = numPullers;
  2140. return maxConnections;
  2141. }
  2142. void FileSprayer::performTransfer()
  2143. {
  2144. unsigned numSlaves = transferSlaves.ordinality();
  2145. unsigned maxConnections = numParallelSlaves();
  2146. unsigned failure = options->getPropInt("@fail", 0);
  2147. if (failure) maxConnections = 1;
  2148. calibrateProgress();
  2149. numSlavesCompleted = 0;
  2150. if (maxConnections > 1)
  2151. shuffle(transferSlaves);
  2152. if (progressReport)
  2153. progressReport->setRange(getSizeReadAlready(), sizeToBeRead, transferSlaves.ordinality());
  2154. LOG(MCdebugInfo, job, "Begin to transfer parts (%d threads)\n", maxConnections);
  2155. //Throttle maximum number of concurrent transfers by starting n threads, and
  2156. //then waiting for one to complete before going on to the next
  2157. lastProgressTick = msTick();
  2158. Semaphore sem;
  2159. unsigned goIndex;
  2160. for (goIndex=0; goIndex<maxConnections; goIndex++)
  2161. transferSlaves.item(goIndex).go(sem);
  2162. //MORE: Should abort early if we get an error on one of the transfers...
  2163. // to do that we will need a queue of completed pullers.
  2164. for (; !error && goIndex<numSlaves;goIndex++)
  2165. {
  2166. waitForTransferSem(sem);
  2167. numSlavesCompleted++;
  2168. transferSlaves.item(goIndex).go(sem);
  2169. }
  2170. for (unsigned waitCount=0; waitCount<maxConnections;waitCount++)
  2171. {
  2172. waitForTransferSem(sem);
  2173. numSlavesCompleted++;
  2174. }
  2175. if (error)
  2176. throw LINK(error);
  2177. bool ok = true;
  2178. ForEachItemIn(idx3, transferSlaves)
  2179. {
  2180. FileTransferThread & cur = transferSlaves.item(idx3);
  2181. if (!cur.ok)
  2182. ok = false;
  2183. }
  2184. if (!ok) {
  2185. if (isAborting())
  2186. throwError(DFTERR_CopyAborted);
  2187. else
  2188. throwError(DFTERR_CopyFailed);
  2189. }
  2190. }
  2191. void FileSprayer::pullParts()
  2192. {
  2193. bool needCalcCRC = calcCRC();
  2194. LOG(MCdebugInfoDetail, job, "Calculate CRC = %d", needCalcCRC);
  2195. ForEachItemIn(idx, targets)
  2196. {
  2197. FileTransferThread & next = * new FileTransferThread(*this, FTactionpull, targets.item(idx).filename.queryEndpoint(), needCalcCRC, wuid);
  2198. transferSlaves.append(next);
  2199. }
  2200. ForEachItemIn(idx3, partition)
  2201. {
  2202. PartitionPoint & cur = partition.item(idx3);
  2203. if (!filter || filter->includePart(cur.whichOutput))
  2204. transferSlaves.item(cur.whichSlave).addPartition(cur, progress.item(idx3));
  2205. }
  2206. performTransfer();
  2207. }
  2208. //Execute a parallel write to a remote part, but each slave writes the entire contents of the file
  2209. void FileSprayer::pushWholeParts()
  2210. {
  2211. bool needCalcCRC = calcCRC();
  2212. LOG(MCdebugInfoDetail, job, "Calculate CRC = %d", needCalcCRC);
  2213. //Create a slave for each of the target files, but execute it on the node corresponding to the first source file
  2214. //For container mode this will need to execute on this node, or on a load balanced service
  2215. ForEachItemIn(idx, targets)
  2216. {
  2217. TargetLocation & cur = targets.item(idx);
  2218. SocketEndpoint ep;
  2219. ForEachItemIn(idx3, partition)
  2220. {
  2221. PartitionPoint & cur = partition.item(idx3);
  2222. if (cur.whichOutput == idx)
  2223. {
  2224. ep = sources.item(cur.whichInput).filename.queryEndpoint();
  2225. break;
  2226. }
  2227. }
  2228. FileTransferThread & next = * new FileTransferThread(*this, FTactionpull, ep, needCalcCRC, wuid);
  2229. transferSlaves.append(next);
  2230. }
  2231. ForEachItemIn(idx3, partition)
  2232. {
  2233. PartitionPoint & cur = partition.item(idx3);
  2234. if (!filter || filter->includePart(cur.whichOutput))
  2235. transferSlaves.item(cur.whichSlave).addPartition(cur, progress.item(idx3));
  2236. }
  2237. performTransfer();
  2238. }
  2239. void FileSprayer::pushParts()
  2240. {
  2241. bool needCalcCRC = calcCRC();
  2242. ForEachItemIn(idx, sources)
  2243. {
  2244. FileTransferThread & next = * new FileTransferThread(*this, FTactionpush, sources.item(idx).filename.queryEndpoint(), needCalcCRC, wuid);
  2245. transferSlaves.append(next);
  2246. }
  2247. ForEachItemIn(idx3, partition)
  2248. {
  2249. PartitionPoint & cur = partition.item(idx3);
  2250. if (!filter || filter->includePart(cur.whichOutput))
  2251. transferSlaves.item(cur.whichSlave).addPartition(cur, progress.item(idx3));
  2252. }
  2253. performTransfer();
  2254. }
  2255. void FileSprayer::removeSource()
  2256. {
  2257. LOG(MCwarning, job, "Source file removal not yet implemented");
  2258. }
  2259. bool FileSprayer::restorePartition()
  2260. {
  2261. if (allowRecovery && progressTree->getPropBool(ANhasPartition))
  2262. {
  2263. IPropertyTreeIterator * iter = progressTree->getElements(PNpartition);
  2264. ForEach(*iter)
  2265. {
  2266. PartitionPoint & next = * new PartitionPoint;
  2267. next.restore(&iter->query());
  2268. partition.append(next);
  2269. }
  2270. iter->Release();
  2271. return (partition.ordinality() != 0);
  2272. }
  2273. return false;
  2274. }
  2275. void FileSprayer::savePartition()
  2276. {
  2277. if (allowRecovery)
  2278. {
  2279. ForEachItemIn(idx, partition)
  2280. {
  2281. IPropertyTree * child = createPTree(PNpartition, ipt_caseInsensitive);
  2282. partition.item(idx).save(child);
  2283. progressTree->addPropTree(PNpartition, child);
  2284. }
  2285. progressTree->setPropBool(ANhasPartition, true);
  2286. }
  2287. }
  2288. void FileSprayer::setCopyCompressedRaw()
  2289. {
  2290. assertex(compressedInput && compressOutput);
  2291. // encrypt/decrypt keys should be same
  2292. compressedInput = false;
  2293. compressOutput = false;
  2294. calcedInputCRC = true;
  2295. cachedInputCRC = false;
  2296. copyCompressed = true;
  2297. }
  2298. void FileSprayer::setError(const SocketEndpoint & ep, IException * e)
  2299. {
  2300. CriticalBlock lock(errorCS);
  2301. if (!error)
  2302. {
  2303. StringBuffer url;
  2304. ep.getUrlStr(url);
  2305. error.setown(MakeStringException(e->errorCode(), "%s", e->errorMessage(url.append(": ")).str()));
  2306. }
  2307. }
  2308. void FileSprayer::setPartFilter(IDFPartFilter * _filter)
  2309. {
  2310. filter.set(_filter);
  2311. }
  2312. void FileSprayer::setProgress(IDaftProgress * _progress)
  2313. {
  2314. progressReport = _progress;
  2315. }
  2316. void FileSprayer::setAbort(IAbortRequestCallback * _abort)
  2317. {
  2318. abortChecker = _abort;
  2319. }
  2320. void FileSprayer::setReplicate(bool _replicate)
  2321. {
  2322. replicate = _replicate;
  2323. }
  2324. void FileSprayer::setSource(IDistributedFile * source)
  2325. {
  2326. distributedSource.set(source);
  2327. srcAttr.setown(createPTreeFromIPT(&source->queryAttributes()));
  2328. IPropertyTree *history = source->queryHistory();
  2329. if (history)
  2330. srcHistory.setown(createPTreeFromIPT(history));
  2331. compressedInput = source->isCompressed();
  2332. extractSourceFormat(srcAttr);
  2333. unsigned numParts = source->numParts();
  2334. for (unsigned idx=0; idx < numParts; idx++)
  2335. {
  2336. Owned<IDistributedFilePart> curPart = source->getPart(idx);
  2337. RemoteFilename rfn;
  2338. FilePartInfo & next = * new FilePartInfo(curPart->getFilename(rfn));
  2339. next.extractExtra(*curPart);
  2340. if (curPart->numCopies()>1)
  2341. next.mirrorFilename.set(curPart->getFilename(rfn,1));
  2342. // don't set the following here - force to check disk
  2343. //next.size = curPart->getFileSize(true,false);
  2344. //next.psize = curPart->getDiskSize(true,false);
  2345. sources.append(next);
  2346. }
  2347. gatherFileSizes(false);
  2348. }
  2349. void FileSprayer::setSource(IFileDescriptor * source)
  2350. {
  2351. setSource(source, 0, 1);
  2352. //Now get the size of the files directly (to check they exist). If they don't exist then switch to the backup instead.
  2353. gatherFileSizes(false);
  2354. }
  2355. void FileSprayer::setSource(IFileDescriptor * source, unsigned copy, unsigned mirrorCopy)
  2356. {
  2357. IPropertyTree *attr = &source->queryProperties();
  2358. compressedInput = source->isCompressed();
  2359. extractSourceFormat(attr);
  2360. srcAttr.setown(createPTreeFromIPT(&source->queryProperties()));
  2361. IPropertyTree *history = source->queryHistory();
  2362. if (history)
  2363. srcHistory.setown(createPTreeFromIPT(history));
  2364. extractSourceFormat(srcAttr);
  2365. RemoteFilename filename;
  2366. unsigned numParts = source->numParts();
  2367. for (unsigned idx=0; idx < numParts; idx++)
  2368. {
  2369. if (source->isMulti(idx))
  2370. {
  2371. RemoteMultiFilename multi;
  2372. source->getMultiFilename(idx, copy, multi);
  2373. multi.expandWild();
  2374. ForEachItemIn(i, multi)
  2375. {
  2376. const RemoteFilename &rfn = multi.item(i);
  2377. FilePartInfo & next = * new FilePartInfo(rfn);
  2378. Owned<IPartDescriptor> part = source->getPart(idx);
  2379. next.extractExtra(*part);
  2380. // If size doesn't set here it will be forced to check the file size on disk (expensive)
  2381. next.size = multi.getSize(i);
  2382. sources.append(next);
  2383. }
  2384. //MORE: Need to extract the backup filenames for mirror files.
  2385. }
  2386. else
  2387. {
  2388. source->getFilename(idx, copy, filename);
  2389. FilePartInfo & next = * new FilePartInfo(filename);
  2390. Owned<IPartDescriptor> part = source->getPart(idx);
  2391. next.extractExtra(*part);
  2392. if (mirrorCopy != (unsigned)-1)
  2393. source->getFilename(idx, mirrorCopy, next.mirrorFilename);
  2394. sources.append(next);
  2395. }
  2396. }
  2397. if (sources.ordinality() == 0)
  2398. LOG(MCuserWarning, unknownJob, "The wildcarded source did not match any filenames");
  2399. // throwError(DFTERR_NoFilesMatchWildcard);
  2400. //Now get the size of the files directly (to check they exist). If they don't exist then switch to the backup instead.
  2401. gatherFileSizes(false);
  2402. }
  2403. void FileSprayer::setSource(IDistributedFilePart * part)
  2404. {
  2405. tgtFormat.set(FFTfixed, 1);
  2406. unsigned copy = 0;
  2407. RemoteFilename rfn;
  2408. sources.append(* new FilePartInfo(part->getFilename(rfn,copy)));
  2409. if (compressedInput)
  2410. {
  2411. calcedInputCRC = true;
  2412. cachedInputCRC = false;
  2413. }
  2414. }
  2415. void FileSprayer::setSourceTarget(IFileDescriptor * fd, DaftReplicateMode mode)
  2416. {
  2417. extractSourceFormat(&fd->queryProperties());
  2418. tgtFormat.set(srcFormat);
  2419. if (options->getPropBool(ANcrcDiffers, false))
  2420. throwError1(DFTERR_ReplicateOptionNoSupported, "crcDiffers");
  2421. if (options->getPropBool(ANsizedate, false))
  2422. throwError1(DFTERR_ReplicateOptionNoSupported, "sizedate");
  2423. switch (mode)
  2424. {
  2425. case DRMreplicatePrimary: // doesn't work for multi copies
  2426. setSource(fd, 0);
  2427. setTarget(fd, 1);
  2428. break;
  2429. case DRMreplicateSecondary: // doesn't work for multi copies
  2430. setSource(fd, 1);
  2431. setTarget(fd, 0);
  2432. break;
  2433. case DRMcreateMissing: // this does though (but I am not sure works with mult-files)
  2434. gatherMissingSourceTarget(fd);
  2435. break;
  2436. }
  2437. isSafeMode = false;
  2438. mirroring = true;
  2439. replicate = true;
  2440. //Optimize replicating compressed - copy it raw, but it means we can't check the input crc
  2441. assertex(compressOutput == compressedInput);
  2442. if (compressedInput)
  2443. setCopyCompressedRaw();
  2444. }
  2445. void FileSprayer::setTarget(IDistributedFile * target)
  2446. {
  2447. distributedTarget.set(target);
  2448. compressOutput = target->isCompressed();
  2449. LOG(MCdebugInfo, unknownJob, "FileSprayer::setTarget: compressedInput:%s, compressOutput:%s",
  2450. boolToStr(compressedInput),
  2451. boolToStr(compressOutput));
  2452. if (tgtFormat.restore(&target->queryAttributes()))
  2453. unknownTargetFormat = false;
  2454. else
  2455. {
  2456. const char* separator = srcFormat.separate.get();
  2457. if (separator && (strcmp(separator, ",") == 0))
  2458. srcFormat.separate.set("\\,");
  2459. tgtFormat.set(srcFormat);
  2460. if (!unknownSourceFormat)
  2461. {
  2462. DistributedFilePropertyLock lock(target);
  2463. IPropertyTree &curProps = lock.queryAttributes();
  2464. tgtFormat.save(&curProps);
  2465. }
  2466. }
  2467. unsigned copy = 0;
  2468. unsigned numParts = target->numParts();
  2469. if (numParts == 0)
  2470. throwError(DFTERR_NoPartsInDestination);
  2471. for (unsigned idx=0; idx < numParts; idx++)
  2472. {
  2473. Owned<IDistributedFilePart> curPart(target->getPart(idx));
  2474. RemoteFilename rfn;
  2475. TargetLocation & next = * new TargetLocation(curPart->getFilename(rfn,copy));
  2476. targets.append(next);
  2477. }
  2478. checkSprayOptions();
  2479. }
  2480. void FileSprayer::setTarget(IFileDescriptor * target, unsigned copy)
  2481. {
  2482. if (tgtFormat.restore(&target->queryProperties()))
  2483. unknownTargetFormat = false;
  2484. else
  2485. tgtFormat.set(srcFormat);
  2486. compressOutput = !encryptKey.isEmpty()||target->isCompressed();
  2487. unsigned numParts = target->numParts();
  2488. if (numParts == 0)
  2489. throwError(DFTERR_NoPartsInDestination);
  2490. RemoteFilename filename;
  2491. for (unsigned idx=0; idx < numParts; idx++)
  2492. {
  2493. target->getFilename(idx, copy, filename);
  2494. targets.append(*new TargetLocation(filename));
  2495. }
  2496. checkSprayOptions();
  2497. }
  2498. void FileSprayer::updateProgress(const OutputProgress & newProgress)
  2499. {
  2500. CriticalBlock block(soFarCrit);
  2501. lastProgressTick = msTick();
  2502. OutputProgress & curProgress = progress.item(newProgress.whichPartition);
  2503. totalLengthRead += (newProgress.inputLength - curProgress.inputLength);
  2504. totalNumReads += (newProgress.numReads - curProgress.numReads);
  2505. totalNumWrites += (newProgress.numWrites - curProgress.numWrites);
  2506. curProgress.set(newProgress);
  2507. if (curProgress.tree)
  2508. curProgress.save(curProgress.tree);
  2509. if (newProgress.status != OutputProgress::StatusRenamed)
  2510. updateSizeRead();
  2511. }
  2512. void FileSprayer::updateSizeRead()
  2513. {
  2514. if (progressDone)
  2515. return;
  2516. unsigned nowTick = msTick();
  2517. //MORE: This call shouldn't need to be done so often...
  2518. unsigned __int64 sizeReadSoFar = getSizeReadAlready();
  2519. bool done = sizeReadSoFar == sizeToBeRead;
  2520. if (progressReport)
  2521. {
  2522. // A cheat to get 100% saying all the slaves have completed - should really
  2523. // pass completed information in the progress info, or return the last progress
  2524. // info when a slave is done.
  2525. unsigned numCompleted = (sizeReadSoFar == sizeToBeRead) ? transferSlaves.ordinality() : numSlavesCompleted;
  2526. if (done || (nowTick - lastOperatorTick >= operatorUpdateFrequency))
  2527. {
  2528. progressReport->onProgress(sizeReadSoFar, sizeToBeRead, numCompleted, totalNumReads, totalNumWrites);
  2529. lastOperatorTick = nowTick;
  2530. progressDone = done;
  2531. }
  2532. }
  2533. if (allowRecovery && recoveryConnection)
  2534. {
  2535. if (done || (nowTick - lastSDSTick >= sdsUpdateFrequency))
  2536. {
  2537. recoveryConnection->commit();
  2538. lastSDSTick = nowTick;
  2539. progressDone = done;
  2540. }
  2541. }
  2542. }
  2543. void FileSprayer::waitForTransferSem(Semaphore & sem)
  2544. {
  2545. while (!sem.wait(EXPECTED_RESPONSE_TIME))
  2546. {
  2547. unsigned timeSinceProgress = msTick() - lastProgressTick;
  2548. if (timeSinceProgress > EXPECTED_RESPONSE_TIME)
  2549. {
  2550. LOG(MCwarning, unknownJob, "No response from any slaves in last %d seconds.", timeSinceProgress/1000);
  2551. CriticalBlock block(soFarCrit);
  2552. StringBuffer list;
  2553. ForEachItemIn(i, transferSlaves)
  2554. transferSlaves.item(i).logIfRunning(list);
  2555. if (timeSinceProgress>RESPONSE_TIME_TIMEOUT)
  2556. {
  2557. //Set an error - the transfer threads will check it after a couple of minutes, and then terminate gracefully
  2558. CriticalBlock lock(errorCS);
  2559. if (!error)
  2560. error.setown(MakeStringException(RFSERR_TimeoutWaitSlave, RFSERR_TimeoutWaitSlave_Text, list.str()));
  2561. }
  2562. }
  2563. }
  2564. }
  2565. void FileSprayer::addTarget(unsigned idx, INode * node)
  2566. {
  2567. RemoteFilename filename;
  2568. filename.set(sources.item(idx).filename);
  2569. filename.setEp(node->endpoint());
  2570. targets.append(* new TargetLocation(filename));
  2571. checkSprayOptions();
  2572. }
  2573. bool FileSprayer::isAborting()
  2574. {
  2575. if (aborting || error)
  2576. return true;
  2577. unsigned nowTick = msTick();
  2578. if (abortChecker && (nowTick - lastAbortCheckTick >= abortCheckFrequency))
  2579. {
  2580. if (abortChecker->abortRequested())
  2581. {
  2582. LOG(MCdebugInfo, unknownJob, "Abort requested via callback");
  2583. aborting = true;
  2584. }
  2585. lastAbortCheckTick = nowTick;
  2586. }
  2587. return aborting || error;
  2588. }
  2589. const char * FileSprayer::querySplitPrefix()
  2590. {
  2591. const char * ret = options->queryProp(ANsplitPrefix);
  2592. if (ret && *ret)
  2593. return ret;
  2594. return NULL;
  2595. }
  2596. const char * FileSprayer::querySlaveExecutable(const IpAddress &ip, StringBuffer &ret) const
  2597. {
  2598. #ifdef _CONTAINERIZED
  2599. return ret.append("ftslave").str();
  2600. #else
  2601. const char * slave = queryFixedSlave();
  2602. try {
  2603. queryFtSlaveExecutable(ip, ret);
  2604. if (ret.length())
  2605. return ret.str();
  2606. }
  2607. catch (IException * e) {
  2608. if (!slave||!*slave)
  2609. throw;
  2610. e->Release();
  2611. }
  2612. if (slave)
  2613. ret.append(slave);
  2614. return ret.str();
  2615. #endif
  2616. }
  2617. const char * FileSprayer::queryFixedSlave() const
  2618. {
  2619. return options->queryProp("@slave");
  2620. }
  2621. void FileSprayer::setTarget(IGroup * target)
  2622. {
  2623. tgtFormat.set(srcFormat);
  2624. if (sources.ordinality() != target->ordinality())
  2625. throwError(DFTERR_ReplicateNumPartsDiffer);
  2626. ForEachItemIn(idx, sources)
  2627. addTarget(idx, &target->queryNode(idx));
  2628. }
  2629. void FileSprayer::setTarget(INode * target)
  2630. {
  2631. tgtFormat.set(srcFormat);
  2632. if (sources.ordinality() != 1)
  2633. throwError(DFTERR_ReplicateNumPartsDiffer);
  2634. addTarget(0, target);
  2635. }
  2636. inline bool nonempty(IPropertyTree *pt, const char *p) { const char *s = pt->queryProp(p); return s&&*s; }
  2637. bool FileSprayer::disallowImplicitReplicate()
  2638. {
  2639. return options->getPropBool(ANsplit) ||
  2640. options->getPropBool(ANnosplit) ||
  2641. querySplitPrefix() ||
  2642. nonempty(options,"@header") ||
  2643. nonempty(options,"@footer") ||
  2644. nonempty(options,"@glue") ||
  2645. nonempty(options,ANprefix) ||
  2646. nonempty(options,ANencryptKey) ||
  2647. nonempty(options,ANdecryptKey);
  2648. }
  2649. void FileSprayer::spray()
  2650. {
  2651. if (!allowSplit() && querySplitPrefix())
  2652. throwError(DFTERR_SplitNoSplitClash);
  2653. aindex_t sourceSize = sources.ordinality();
  2654. bool failIfNoSourceFile = options->getPropBool("@failIfNoSourceFile");
  2655. if (sourceSize == 0)
  2656. {
  2657. if (failIfNoSourceFile)
  2658. throwError(DFTERR_NoFilesMatchWildcard);
  2659. else
  2660. progressTree->setPropBool("@noFileMatch", true);
  2661. }
  2662. LOG(MCdebugInfo, job, "compressedInput:%d, compressOutput:%d", compressedInput, compressOutput);
  2663. LocalAbortHandler localHandler(daftAbortHandler);
  2664. if (allowRecovery && progressTree->getPropBool(ANcomplete))
  2665. {
  2666. LOG(MCdebugInfo, job, "Command completed successfully in previous invocation");
  2667. return;
  2668. }
  2669. checkFormats();
  2670. checkForOverlap();
  2671. progressTree->setPropBool(ANpull, usePullOperation());
  2672. const char * splitPrefix = querySplitPrefix();
  2673. if (!replicate && (sources.ordinality() == targets.ordinality()))
  2674. {
  2675. if (srcFormat.equals(tgtFormat) && !disallowImplicitReplicate())
  2676. copySource = true;
  2677. }
  2678. if (compressOutput&&!replicate&&!copySource)
  2679. {
  2680. PROGLOG("Compress output forcing pull");
  2681. options->setPropBool(ANpull, true);
  2682. allowRecovery = false;
  2683. }
  2684. gatherFileSizes(true);
  2685. if (!replicate||copySource) // NB: When copySource=true, analyseFileHeaders mainly just sets srcFormat.type
  2686. analyseFileHeaders(!copySource); // if pretending replicate don't want to remove headers
  2687. afterGatherFileSizes();
  2688. if (compressOutput && !usePullOperation() && !replicate && !copySource)
  2689. throwError(DFTERR_CannotPushAndCompress);
  2690. if (restorePartition())
  2691. {
  2692. LOG(MCdebugProgress, job, "Partition restored from recovery information");
  2693. }
  2694. else
  2695. {
  2696. LOG(MCdebugProgress, job, "Calculate partition information");
  2697. if (replicate || copySource)
  2698. calculateOne2OnePartition();
  2699. else if (!allowSplit())
  2700. calculateNoSplitPartition();
  2701. else if (splitPrefix && *splitPrefix)
  2702. calculateSplitPrefixPartition(splitPrefix);
  2703. else if ((targets.ordinality() == 1) && srcFormat.equals(tgtFormat))
  2704. calculateMany2OnePartition();
  2705. else
  2706. calculateSprayPartition();
  2707. if (partition.ordinality() > PARTITION_RECOVERY_LIMIT)
  2708. allowRecovery = false;
  2709. savePartition();
  2710. }
  2711. assignPartitionFilenames(); // assign source filenames - used in insertHeaders..
  2712. if (!replicate && !copySource)
  2713. {
  2714. LOG(MCdebugProgress, job, "Insert headers");
  2715. insertHeaders();
  2716. }
  2717. addEmptyFilesToPartition();
  2718. derivePartitionExtra();
  2719. if (partition.ordinality() < 1000)
  2720. displayPartition();
  2721. if (isRecovering)
  2722. displayProgress(progress);
  2723. throwExceptionIfAborting();
  2724. beforeTransfer();
  2725. if (usePushWholeOperation())
  2726. pushWholeParts();
  2727. else if (usePullOperation())
  2728. pullParts();
  2729. else
  2730. pushParts();
  2731. afterTransfer();
  2732. //If got here then we have succeeded
  2733. updateTargetProperties();
  2734. //Calculate and store file access cost
  2735. double fileAccessCost = 0.0;
  2736. if (distributedTarget)
  2737. {
  2738. StringBuffer cluster;
  2739. distributedTarget->getClusterName(0, cluster);
  2740. if (!cluster.isEmpty())
  2741. fileAccessCost += calcFileAccessCost(cluster, totalNumWrites, 0);
  2742. }
  2743. if (distributedSource && distributedSource->querySuperFile()==nullptr)
  2744. {
  2745. StringBuffer cluster;
  2746. distributedSource->getClusterName(0, cluster);
  2747. if (!cluster.isEmpty())
  2748. fileAccessCost += calcFileAccessCost(cluster, 0, totalNumReads);
  2749. }
  2750. progressReport->setFileAccessCost(fileAccessCost);
  2751. StringBuffer copyEventText; // [logical-source] > [logical-target]
  2752. if (distributedSource)
  2753. copyEventText.append(distributedSource->queryLogicalName());
  2754. copyEventText.append(">");
  2755. if (distributedTarget && distributedTarget->queryLogicalName())
  2756. copyEventText.append(distributedTarget->queryLogicalName());
  2757. //MORE: use new interface to send 'file copied' event
  2758. //LOG(MCevent, unknownJob, EVENT_FILECOPIED, copyEventText.str());
  2759. cleanupRecovery();
  2760. }
  2761. bool FileSprayer::isSameSizeHeaderFooter()
  2762. {
  2763. bool retVal = true;
  2764. if (sources.ordinality() == 0)
  2765. return retVal;
  2766. unsigned whichHeaderInput = 0;
  2767. headerSize = sources.item(whichHeaderInput).xmlHeaderLength;
  2768. footerSize = sources.item(whichHeaderInput).xmlFooterLength;
  2769. ForEachItemIn(idx, partition)
  2770. {
  2771. PartitionPoint & cur = partition.item(idx);
  2772. if (cur.inputLength && (idx+1 == partition.ordinality() || partition.item(idx+1).whichOutput != cur.whichOutput))
  2773. {
  2774. if (headerSize != sources.item(whichHeaderInput).xmlHeaderLength)
  2775. {
  2776. retVal = false;
  2777. break;
  2778. }
  2779. if (footerSize != sources.item(cur.whichInput).xmlFooterLength)
  2780. {
  2781. retVal = false;
  2782. break;
  2783. }
  2784. if ( idx+1 != partition.ordinality() )
  2785. whichHeaderInput = partition.item(idx+1).whichInput;
  2786. }
  2787. }
  2788. return retVal;
  2789. }
  2790. void FileSprayer::updateTargetProperties()
  2791. {
  2792. TimeSection timer("FileSprayer::updateTargetProperties() time");
  2793. Owned<IException> error;
  2794. if (distributedTarget)
  2795. {
  2796. StringBuffer failedParts;
  2797. CRC32Merger partCRC;
  2798. offset_t partLength = 0;
  2799. CRC32Merger totalCRC;
  2800. offset_t totalLength = 0;
  2801. offset_t totalCompressedSize = 0;
  2802. unsigned whichHeaderInput = 0;
  2803. bool sameSizeHeaderFooter = isSameSizeHeaderFooter();
  2804. bool sameSizeSourceTarget = (sources.ordinality() == distributedTarget->numParts());
  2805. offset_t partCompressedLength = 0;
  2806. ForEachItemIn(idx, partition)
  2807. {
  2808. PartitionPoint & cur = partition.item(idx);
  2809. OutputProgress & curProgress = progress.item(idx);
  2810. partCRC.addChildCRC(curProgress.outputLength, curProgress.outputCRC, false);
  2811. totalCRC.addChildCRC(curProgress.outputLength, curProgress.outputCRC, false);
  2812. if (copyCompressed && sameSizeSourceTarget) {
  2813. FilePartInfo & curSource = sources.item(cur.whichInput);
  2814. partLength = curSource.size;
  2815. totalLength += partLength;
  2816. }
  2817. else {
  2818. partLength += curProgress.outputLength; // AFAICS this might as well be =
  2819. totalLength += curProgress.outputLength;
  2820. }
  2821. if (compressOutput)
  2822. partCompressedLength += curProgress.compressedPartSize;
  2823. if (idx+1 == partition.ordinality() || partition.item(idx+1).whichOutput != cur.whichOutput)
  2824. {
  2825. Owned<IDistributedFilePart> curPart = distributedTarget->getPart(cur.whichOutput);
  2826. // TODO: Create DistributedFilePropertyLock for parts
  2827. curPart->lockProperties();
  2828. IPropertyTree& curProps = curPart->queryAttributes();
  2829. if (!sameSizeHeaderFooter)
  2830. {
  2831. FilePartInfo & curHeaderSource = sources.item(whichHeaderInput);
  2832. curProps.setPropInt64(FPheaderLength, curHeaderSource.xmlHeaderLength);
  2833. FilePartInfo & curFooterSource = sources.item(cur.whichInput);
  2834. curProps.setPropInt64(FPfooterLength, curFooterSource.xmlFooterLength);
  2835. if ( idx+1 != partition.ordinality() )
  2836. whichHeaderInput = partition.item(idx+1).whichInput;
  2837. }
  2838. if (calcCRC())
  2839. {
  2840. curProps.setPropInt(FAcrc, partCRC.get());
  2841. if (cur.whichInput != (unsigned)-1)
  2842. {
  2843. FilePartInfo & curSource = sources.item(cur.whichInput);
  2844. if (replicate && curSource.hasCRC)
  2845. {
  2846. if ((partCRC.get() != curSource.crc)&&(compressedInput==compressOutput)) // if expanding will be different!
  2847. {
  2848. if (failedParts.length())
  2849. failedParts.append(", ");
  2850. else
  2851. failedParts.append("Output CRC failed to match expected: ");
  2852. curSource.filename.getPath(failedParts);
  2853. failedParts.appendf("(%x,%x)",partCRC.get(),curSource.crc);
  2854. }
  2855. }
  2856. }
  2857. }
  2858. else if (compressOutput || copyCompressed)
  2859. curProps.setPropInt(FAcrc, (int)COMPRESSEDFILECRC);
  2860. curProps.setPropInt64(FAsize, partLength);
  2861. if (compressOutput)
  2862. {
  2863. curProps.setPropInt64(FAcompressedSize, partCompressedLength);
  2864. totalCompressedSize += partCompressedLength;
  2865. } else if (copyCompressed)
  2866. {
  2867. curProps.setPropInt64(FAcompressedSize, curProgress.outputLength);
  2868. totalCompressedSize += curProgress.outputLength;
  2869. }
  2870. TargetLocation & curTarget = targets.item(cur.whichOutput);
  2871. if (!curTarget.modifiedTime.isNull())
  2872. {
  2873. CDateTime temp;
  2874. StringBuffer timestr;
  2875. temp.set(curTarget.modifiedTime);
  2876. unsigned hour, min, sec, nanosec;
  2877. temp.getTime(hour, min, sec, nanosec);
  2878. temp.setTime(hour, min, sec, 0);
  2879. curProps.setProp("@modified", temp.getString(timestr).str());
  2880. }
  2881. if ((distributedSource != distributedTarget) && (cur.whichInput != (unsigned)-1))
  2882. {
  2883. FilePartInfo & curSource = sources.item(cur.whichInput);
  2884. if (curSource.properties)
  2885. {
  2886. Owned<IAttributeIterator> aiter = curSource.properties->getAttributes();
  2887. ForEach(*aiter)
  2888. {
  2889. const char *aname = aiter->queryName();
  2890. if ( !( strieq(aname,"@fileCrc") ||
  2891. strieq(aname,"@modified") ||
  2892. strieq(aname,"@node") ||
  2893. strieq(aname,"@num") ||
  2894. strieq(aname,"@size") ||
  2895. strieq(aname,"@name") ) ||
  2896. ( strieq(aname,"@recordCount") && (sources.ordinality() == targets.ordinality()) )
  2897. )
  2898. curProps.setProp(aname,aiter->queryValue());
  2899. }
  2900. }
  2901. }
  2902. curPart->unlockProperties();
  2903. partCRC.clear();
  2904. partLength = 0;
  2905. partCompressedLength = 0;
  2906. }
  2907. }
  2908. if (failedParts.length())
  2909. error.setown(MakeStringException(DFTERR_InputOutputCrcMismatch, "%s", failedParts.str()));
  2910. DistributedFilePropertyLock lock(distributedTarget);
  2911. IPropertyTree &curProps = lock.queryAttributes();
  2912. curProps.setPropInt64("@numDiskWrites", totalNumWrites);
  2913. if (calcCRC())
  2914. curProps.setPropInt(FAcrc, totalCRC.get());
  2915. curProps.setPropInt64(FAsize, totalLength);
  2916. if (totalCompressedSize != 0)
  2917. curProps.setPropInt64(FAcompressedSize, totalCompressedSize);
  2918. unsigned rs = curProps.getPropInt(FArecordSize); // set by user
  2919. bool gotrc = false;
  2920. if (rs && (totalLength%rs == 0)) {
  2921. curProps.setPropInt64(FArecordCount,totalLength/(offset_t)rs);
  2922. gotrc = true;
  2923. }
  2924. if (sameSizeHeaderFooter && ((srcFormat.markup == FMTjson ) || (srcFormat.markup == FMTxml)))
  2925. {
  2926. curProps.setPropInt64(FPheaderLength, headerSize);
  2927. curProps.setPropInt64(FPfooterLength, footerSize);
  2928. }
  2929. if (srcAttr.get() && !mirroring) {
  2930. StringBuffer s;
  2931. // copy some attributes (do as iterator in case we want to change to *exclude* some
  2932. Owned<IAttributeIterator> aiter = srcAttr->getAttributes();
  2933. ForEach(*aiter) {
  2934. const char *aname = aiter->queryName();
  2935. if (!curProps.hasProp(aname)&&
  2936. ((stricmp(aname,"@job")==0)||
  2937. (stricmp(aname,"@workunit")==0)||
  2938. (stricmp(aname,"@description")==0)||
  2939. (stricmp(aname,"@eclCRC")==0)||
  2940. (stricmp(aname,"@formatCrc")==0)||
  2941. (stricmp(aname,"@owner")==0)||
  2942. ((stricmp(aname,FArecordCount)==0)&&!gotrc) ||
  2943. ((stricmp(aname,"@blockCompressed")==0)&&copyCompressed) ||
  2944. ((stricmp(aname,"@rowCompressed")==0)&&copyCompressed)||
  2945. (stricmp(aname,"@local")==0)||
  2946. (stricmp(aname,"@recordCount")==0)
  2947. )
  2948. )
  2949. curProps.setProp(aname,aiter->queryValue());
  2950. }
  2951. // Keep source kind
  2952. if (srcAttr->hasProp(FPkind))
  2953. {
  2954. curProps.setProp(FPkind, srcAttr->queryProp(FPkind));
  2955. if (srcAttr->hasProp(FPformat))
  2956. curProps.setProp(FPformat, srcAttr->queryProp(FPformat));
  2957. }
  2958. else
  2959. {
  2960. const char * targetKind = nullptr;
  2961. if (tgtFormat.markup == FMTxml)
  2962. targetKind = "xml";
  2963. else if (tgtFormat.markup == FMTjson)
  2964. targetKind = "json";
  2965. const char * targetFormat = nullptr;
  2966. switch (tgtFormat.type)
  2967. {
  2968. case FFTfixed:
  2969. case FFTvariable:
  2970. case FFTblocked:
  2971. targetKind = "flat";
  2972. break;
  2973. case FFTcsv:
  2974. targetKind = "csv";
  2975. break;
  2976. case FFTutf:
  2977. targetFormat = "utf8n";
  2978. break;
  2979. case FFTutf8:
  2980. targetFormat = "utf8";
  2981. break;
  2982. case FFTutf16:
  2983. targetFormat = "utf16";
  2984. break;
  2985. case FFTutf16be:
  2986. targetFormat = "utf16be";
  2987. break;
  2988. case FFTutf16le:
  2989. targetFormat = "utf16le";
  2990. break;
  2991. case FFTutf32:
  2992. targetFormat = "utf32";
  2993. break;
  2994. case FFTutf32be:
  2995. targetFormat = "utf32be";
  2996. break;
  2997. case FFTutf32le:
  2998. targetFormat = "utf32le";
  2999. break;
  3000. case FFTrecfmvb:
  3001. targetFormat = "recfmvb";
  3002. break;
  3003. case FFTrecfmv:
  3004. targetFormat = "recfmv";
  3005. break;
  3006. case FFTvariablebigendian:
  3007. targetFormat = "variablebigendian";
  3008. break;
  3009. }
  3010. if (targetKind)
  3011. curProps.setProp(FPkind, targetKind);
  3012. if (targetFormat)
  3013. curProps.setProp(FPformat, targetFormat);
  3014. }
  3015. // and simple (top level) elements
  3016. // History copied as well
  3017. Owned<IPropertyTreeIterator> iter = srcAttr->getElements("*");
  3018. ForEach(*iter)
  3019. {
  3020. const char *aname = iter->query().queryName();
  3021. if (stricmp(aname, "Protect") != 0)
  3022. curProps.addPropTree(aname, createPTreeFromIPT(&iter->query()));
  3023. }
  3024. //
  3025. // Add new History record
  3026. //
  3027. IPropertyTree * curHistory = curProps.queryPropTree("History");
  3028. // If there wasn't previous History (like Spray/Import)
  3029. if (!curHistory)
  3030. curHistory = curProps.setPropTree("History", createPTree());
  3031. // Add new record about this operation
  3032. Owned<IPropertyTree> newRecord = createPTree();
  3033. CDateTime temp;
  3034. temp.setNow();
  3035. unsigned hour, min, sec, nanosec;
  3036. temp.getTime(hour, min, sec, nanosec);
  3037. temp.setTime(hour, min, sec, 0);
  3038. StringBuffer timestr;
  3039. newRecord->setProp("@timestamp",temp.getString(timestr).str());
  3040. newRecord->setProp("@owner", srcAttr->queryProp("@owner"));
  3041. if (srcAttr->hasProp("@workunit"))
  3042. newRecord->setProp("@workunit", srcAttr->queryProp("@workunit"));
  3043. newRecord->setProp("@operation", getOperationTypeString());
  3044. // In Spray case there is not distributedSource
  3045. if (distributedSource)
  3046. {
  3047. // add original file name from a single distributed source (like Copy)
  3048. if (distributedSource->numParts())
  3049. {
  3050. RemoteFilename remoteFile;
  3051. distributedSource->queryPart(0).getFilename(remoteFile, 0);
  3052. splitAndCollectFileInfo(newRecord, remoteFile);
  3053. }
  3054. }
  3055. else if (sources.ordinality())
  3056. {
  3057. FilePartInfo & firstSource = sources.item((aindex_t)0);
  3058. RemoteFilename &remoteFile = firstSource.filename;
  3059. splitAndCollectFileInfo(newRecord, remoteFile, false);
  3060. }
  3061. curHistory->addPropTree("Origin",newRecord.getClear());
  3062. }
  3063. int expireDays = options->getPropInt("@expireDays", -1);
  3064. if (expireDays != -1)
  3065. curProps.setPropInt("@expireDays", expireDays);
  3066. }
  3067. if (distributedSource)
  3068. {
  3069. if (distributedSource->querySuperFile()==nullptr)
  3070. distributedSource->addAttrValue("@numDiskReads", totalNumReads);
  3071. }
  3072. if (error)
  3073. throw error.getClear();
  3074. }
  3075. void FileSprayer::splitAndCollectFileInfo(IPropertyTree * newRecord, RemoteFilename &remoteFileName,
  3076. bool isDistributedSource)
  3077. {
  3078. StringBuffer drive;
  3079. StringBuffer path;
  3080. StringBuffer tail;
  3081. StringBuffer ext;
  3082. remoteFileName.split(&drive, &path, &tail, &ext);
  3083. if (drive.isEmpty())
  3084. {
  3085. remoteFileName.queryIP().getIpText(drive.clear());
  3086. newRecord->setProp("@ip", drive.str());
  3087. }
  3088. else
  3089. newRecord->setProp("@drive", drive.str());
  3090. newRecord->setProp("@path", path.str());
  3091. // We don't want to store distributed file parts name extension
  3092. if (!isDistributedSource && ext.length())
  3093. tail.append(ext);
  3094. if (sources.ordinality()>1)
  3095. newRecord->setProp("@name", "[MULTI]");
  3096. else
  3097. newRecord->setProp("@name", tail.str());
  3098. }
  3099. void FileSprayer::setOperation(dfu_operation op)
  3100. {
  3101. operation = op;
  3102. }
  3103. dfu_operation FileSprayer::getOperation() const
  3104. {
  3105. return operation;
  3106. }
  3107. const char * FileSprayer::getOperationTypeString() const
  3108. {
  3109. return DfuOperationStr[operation];
  3110. }
  3111. bool FileSprayer::usePullOperation() const
  3112. {
  3113. if (!calcedPullPush)
  3114. {
  3115. calcedPullPush = true;
  3116. cachedUsePull = calcUsePull();
  3117. }
  3118. return cachedUsePull;
  3119. }
  3120. bool FileSprayer::usePushOperation() const
  3121. {
  3122. return !usePullOperation() && !usePushWholeOperation();
  3123. }
  3124. bool FileSprayer::canLocateSlaveForNode(const IpAddress &ip) const
  3125. {
  3126. try
  3127. {
  3128. StringBuffer ret;
  3129. querySlaveExecutable(ip, ret);
  3130. return true;
  3131. }
  3132. catch (IException * e)
  3133. {
  3134. e->Release();
  3135. }
  3136. return false;
  3137. }
  3138. bool FileSprayer::calcUsePull() const
  3139. {
  3140. if (allowRecovery && progressTree->hasProp(ANpull))
  3141. {
  3142. bool usePull = progressTree->getPropBool(ANpull);
  3143. LOG(MCdebugInfo, job, "Pull = %d from recovery", (int)usePull);
  3144. return usePull;
  3145. }
  3146. if (sources.ordinality() == 0)
  3147. return true;
  3148. if (options->getPropBool(ANpull, false))
  3149. {
  3150. LOG(MCdebugInfo, job, "Use pull since explicitly specified");
  3151. return true;
  3152. }
  3153. if (options->getPropBool(ANpush, false))
  3154. {
  3155. LOG(MCdebugInfo, job, "Use push since explicitly specified");
  3156. return false;
  3157. }
  3158. ForEachItemIn(idx2, sources)
  3159. {
  3160. if (!sources.item(idx2).canPush())
  3161. {
  3162. StringBuffer s;
  3163. sources.item(idx2).filename.queryIP().getIpText(s);
  3164. LOG(MCdebugInfo, job, "Use pull operation because %s cannot push", s.str());
  3165. return true;
  3166. }
  3167. }
  3168. if (!canLocateSlaveForNode(sources.item(0).filename.queryIP()))
  3169. {
  3170. StringBuffer s;
  3171. sources.item(0).filename.queryIP().getIpText(s);
  3172. LOG(MCdebugInfo, job, "Use pull operation because %s doesn't appear to have an ftslave", s.str());
  3173. return true;
  3174. }
  3175. ForEachItemIn(idx, targets)
  3176. {
  3177. if (!targets.item(idx).canPull())
  3178. {
  3179. StringBuffer s;
  3180. targets.item(idx).queryIP().getIpText(s);
  3181. LOG(MCdebugInfo, job, "Use push operation because %s cannot pull", s.str());
  3182. return false;
  3183. }
  3184. }
  3185. if (!canLocateSlaveForNode(targets.item(0).queryIP()))
  3186. {
  3187. StringBuffer s;
  3188. targets.item(0).queryIP().getIpText(s);
  3189. LOG(MCdebugInfo, job, "Use push operation because %s doesn't appear to have an ftslave", s.str());
  3190. return false;
  3191. }
  3192. //Use push if going to a single node.
  3193. if ((targets.ordinality() == 1) && (sources.ordinality() > 1))
  3194. {
  3195. LOG(MCdebugInfo, job, "Use push operation because going to a single node from many");
  3196. return false;
  3197. }
  3198. LOG(MCdebugInfo, job, "Use pull operation as default");
  3199. return true;
  3200. }
  3201. extern DALIFT_API IFileSprayer * createFileSprayer(IPropertyTree * _options, IPropertyTree * _progress, IRemoteConnection * recoveryConnection, const char *wuid)
  3202. {
  3203. return new FileSprayer(_options, _progress, recoveryConnection, wuid);
  3204. }
  3205. /*
  3206. Parameters:
  3207. 1. A list of target locations (machine+drive?) (and possibly a number for each)
  3208. 2. A list of source locations [derived from logical file]
  3209. 3. Information on the source and target formats
  4. A mask for the parts that need to be copied. [recovery is a special case of this]
  3211. Need to
  3212. a) Start servers on machines that cannot be accessed directly [have to be running anyway]
  b) Work out how the file is going to be partitioned
     1. Find out the sizes of all the files.
     2. Calculate partition points -
        For each source file, pass [thisoffset, totalsize, thissize, startPoint?]; it returns a list of
        numbered partition points.
        Two calls, calcPartition() and retrievePartition(), allow for multithreading on variable-length data.
        A. If variable length
           Start servers on each of the source machines
           Query each server for partition information (which walks the file).
        * For an N->N copy there is no need to calculate the partition; it can be done on a 1:1 mapping.
          E.g. copy variable to blocked format with one block per variable.
  c) Save partition information for quick/consistent recovery
  3225. d) Start servers on each of the targets or source for push to non-accessible
  3226. e) Start pulling/pushing
  3227. Each saves flag when complete for recovery
  3228. */
  3229. //----------------------------------------------------------------------------
  3230. void testPartitions()
  3231. {
  3232. unsigned sizes[] = { 100, 100, 100, 100, 100, 100, 100, 100, 100, 100,
  3233. 100, 100, 100, 100, 100, 100, 100, 100, 100, 100,
  3234. 100, 100, 100, 100, 100, 100, 100, 100, 100, 100,
  3235. 100, 100, 100, 100, 100, 100, 100, 100, 100, 100,
  3236. 10,
  3237. };
  3238. unsigned parts = _elements_in(sizes);
  3239. unsigned offsets[_elements_in(sizes)];
  3240. unsigned targetParts = 20;
  3241. unsigned recordSize = 20;
  3242. unsigned totalSize =0;
  3243. unsigned idx;
  3244. for (idx = 0; idx < parts; idx++)
  3245. {
  3246. offsets[idx] = totalSize;
  3247. totalSize += sizes[idx];
  3248. }
  3249. PartitionPointArray results;
  3250. for (idx = 0; idx < parts; idx++)
  3251. {
  3252. CFixedPartitioner partitioner(recordSize);
  3253. partitioner.setPartitionRange(totalSize, offsets[idx], sizes[idx], 0, targetParts);
  3254. partitioner.calcPartitions(NULL);
  3255. partitioner.getResults(results);
  3256. }
  3257. ForEachItemIn(idx2, results)
  3258. results.item(idx2).display();
  3259. }
  3260. /*
  3261. MORE:
  3262. * What about multiple parts for a source file - what should we do with them?
  3263. Ignore? Try if
  3264. * Pushing - how do we manage it?
  3265. A. Copy all at once.
  3266. 1. For simple non-translation easy to copy all at once.
  3267. 2. For others, could hook up a translator so it only calculates the target size.
  3268. Problem is it is a reasonably complex interaction with the partitioner.
  3269. Easier to implement, but not as efficient, as a separate pass.
  3270. - Optimize for variable to VBX.
  3271. B. Copy a chunk at a time
  3272. 1. The first source for each chunk write in parallel, followed by the next.
  3273. - okay if not all writing to a single large file.
  3274. * Unreachable machines
  3275. 1. Can I use message passing?
  3276. 2. Mock up + test code [ need multi threading access ].
  3277. 3. Implement an exists primitive.
  3278. 4. How do I distinguish machines?
  3279. * Main program needs to survive if slave nodes die.
  * Asynchronous calls + avoiding the thread switching for notifications?
  3281. * Code for replicating parts
    - set up as a copy from fixed1 to fixed1, with partitions matching the sources.
  3283. */