filecopy.cpp 98 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
7277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924292529262927292829292930293129322933293429352936293729382939294029412942294329442945294629472948294929502951295229532954295529562957295829592960296129622963296429652966296729682969297029712972297329742975297629772978297929802981298229832984298529862987298829892990299129922993299429952996299729982999300030013002300330043005300630073008300930103011301230133014301530163017301830193020302130223023302430253026302730283029303030313032303330343035303630373038303930403041304230433044304530463047304830493050305130523053305430553056305730583059306030613062306330643065306630673068306930703071307230733074307530763077307830793080308130823083308430853086
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jliball.hpp"
  14. #include "platform.h"
  15. #include <algorithm>
  16. #include "jlib.hpp"
  17. #include "jio.hpp"
  18. #include <math.h>
  19. #include "jmutex.hpp"
  20. #include "jfile.hpp"
  21. #include "jsocket.hpp"
  22. #include "jdebug.hpp"
  23. #include "fterror.hpp"
  24. #include "dadfs.hpp"
  25. #include "rmtspawn.hpp"
  26. #include "filecopy.ipp"
  27. #include "jptree.hpp"
  28. #include "daft.hpp"
  29. #include "daftcfg.hpp"
  30. #include "fterror.hpp"
  31. #include "daftformat.hpp"
  32. #include "daftmc.hpp"
  33. #include "dasds.hpp"
  34. #include "jlog.hpp"
  35. #include "dalienv.hpp"
  // ---------------------------------------------------------------------------
  // Limits and timeouts
  // ---------------------------------------------------------------------------
  #define DEFAULT_MAX_CONNECTIONS     25
  #define PARTITION_RECOVERY_LIMIT    1000
  #define EXPECTED_RESPONSE_TIME      (60 * 1000)        // one minute if in ms
  #define RESPONSE_TIME_TIMEOUT       (60 * 60 * 1000)   // one hour if in ms
  //#define CLEANUP_RECOVERY

  // ---------------------------------------------------------------------------
  // Property names. They are macros so that a misspelt name is a compile error
  // instead of a silently-missing property.
  // ---------------------------------------------------------------------------
  // Option / progress attributes
  #define ANcomplete              "@complete"
  #define ANcompress              "@compress"
  #define ANcrc                   "@crc"
  #define ANcrcCheck              "@crcCheck"
  #define ANcrcDiffers            "@crcDiffers"
  #define ANdone                  "@done"
  #define ANhasPartition          "@hasPartition"
  #define ANhasProgress           "@hasProgress"
  #define ANhasRecovery           "@hasRecovery"
  #define ANmaxConnections        "@maxConnections"
  #define ANnocommon              "@noCommon"
  #define ANnoRecover             "@noRecover"
  #define ANnosplit               "@nosplit"
  #define ANnosplit2              "@noSplit"          // alternate capitalisation
  #define ANprefix                "@prefix"
  #define ANpull                  "@pull"
  #define ANpush                  "@push"
  #define ANsafe                  "@safe"
  #define ANsizedate              "@sizedate"
  #define ANsplit                 "@split"
  #define ANsplitPrefix           "@splitPrefix"
  #define ANthrottle              "@throttle"
  #define ANverify                "@verify"
  #define ANtransferBufferSize    "@transferBufferSize"
  #define ANencryptKey            "@encryptKey"
  #define ANdecryptKey            "@decryptKey"

  // Names without '@' (presumably child element names rather than attributes)
  #define PNpartition             "partition"
  #define PNprogress              "progress"

  // File attributes
  #define FArecordSize            "@recordSize"
  #define FArecordCount           "@recordCount"
  #define FAformat                "@format"
  #define FAcrc                   "@fileCrc"
  #define FAsize                  "@size"
  #define FAcompressedSize        "@compressedSize"

  // ---------------------------------------------------------------------------
  // Intervals between updates, in milliseconds
  // ---------------------------------------------------------------------------
  const unsigned operatorUpdateFrequency = 5000;
  const unsigned abortCheckFrequency     = 20000;
  const unsigned sdsUpdateFrequency      = 20000;
  const unsigned maxSlaveUpdateFrequency = 1000;  // used when there are few slave nodes
  const unsigned minSlaveUpdateFrequency = 5000;  // used when there are many slave nodes
  82. bool TargetLocation::canPull()
  83. {
  84. return queryOS(filename.queryIP()) != MachineOsSolaris;
  85. }
  86. //----------------------------------------------------------------------------
  87. FilePartInfo::FilePartInfo(const RemoteFilename & _filename)
  88. {
  89. filename.set(_filename);
  90. init();
  91. }
  92. FilePartInfo::FilePartInfo()
  93. {
  94. init();
  95. }
  96. bool FilePartInfo::canPush()
  97. {
  98. return queryOS(filename.queryIP()) != MachineOsSolaris;
  99. }
  100. void FilePartInfo::extractExtra(IPartDescriptor &part)
  101. {
  102. unsigned _crc;
  103. hasCRC = part.getCrc(_crc);
  104. if (hasCRC)
  105. crc = _crc;
  106. properties.set(&part.queryProperties());
  107. if (part.queryProperties().hasProp("@modified"))
  108. modifiedTime.setString(part.queryProperties().queryProp("@modified"));
  109. }
  110. void FilePartInfo::extractExtra(IDistributedFilePart &part)
  111. {
  112. unsigned _crc;
  113. hasCRC = part.getCrc(_crc);
  114. if (hasCRC)
  115. crc = _crc;
  116. properties.set(&part.queryAttributes());
  117. if (part.queryAttributes().hasProp("@modified"))
  118. modifiedTime.setString(part.queryAttributes().queryProp("@modified"));
  119. }
  120. void FilePartInfo::init()
  121. {
  122. offset = 0;
  123. size = UNKNOWN_PART_SIZE;
  124. psize = UNKNOWN_PART_SIZE;
  125. headerSize = 0;
  126. hasCRC = false;
  127. xmlHeaderLength = 0;
  128. xmlFooterLength = 0;
  129. }
  130. //----------------------------------------------------------------------------
  131. void shuffle(CIArray & array)
  132. {
  133. //Use our own seeded random number generator, so that multiple dfu at the same time are less likely to clash.
  134. Owned<IRandomNumberGenerator> random = createRandomNumberGenerator();
  135. random->seed(123456789);
  136. unsigned i = array.ordinality();
  137. while (i > 1)
  138. {
  139. unsigned j = random->next() % i;
  140. i--;
  141. array.swap(i, j);
  142. }
  143. }
  144. //----------------------------------------------------------------------------
  145. FileTransferThread::FileTransferThread(FileSprayer & _sprayer, byte _action, const SocketEndpoint & _ep, bool _calcCRC, const char *_wuid)
  146. : Thread("pullThread"), sprayer(_sprayer), wuid(_wuid)
  147. {
  148. calcCRC = _calcCRC;
  149. action = _action;
  150. ep.set(_ep);
  151. // progressInfo = _progressInfo;
  152. sem = NULL;
  153. ok = false;
  154. job = unknownJob;
  155. allDone = false;
  156. started = false;
  157. }
  158. void FileTransferThread::addPartition(PartitionPoint & nextPartition, OutputProgress & nextProgress)
  159. {
  160. partition.append(OLINK(nextPartition));
  161. progress.append(OLINK(nextProgress));
  162. passwordProvider.addPasswordForIp(nextPartition.inputName.queryIP());
  163. passwordProvider.addPasswordForIp(nextPartition.outputName.queryIP());
  164. }
  165. unsigned __int64 FileTransferThread::getInputSize()
  166. {
  167. unsigned __int64 inputSize = 0;
  168. ForEachItemIn(idx, partition)
  169. inputSize += partition.item(idx).inputLength;
  170. return inputSize;
  171. }
  172. void FileTransferThread::go(Semaphore & _sem)
  173. {
  174. sem = &_sem;
  175. if (partition.empty())
  176. transferAndSignal(); // do nothing, but don't start a new thread
  177. else
  178. {
  179. #ifdef RUN_SLAVES_ON_THREADS
  180. start();
  181. #else
  182. transferAndSignal();
  183. #endif
  184. }
  185. }
  186. bool FileTransferThread::isAborting()
  187. {
  188. return sprayer.isAborting() || ::isAborting();
  189. }
  190. void FileTransferThread::logIfRunning(StringBuffer &list)
  191. {
  192. if (started && !allDone && !error)
  193. {
  194. StringBuffer url;
  195. ep.getUrlStr(url);
  196. LOG(MCwarning, unknownJob, "Still waiting for slave %s", url.str());
  197. if (list.length())
  198. list.append(',');
  199. list.append(url);
  200. }
  201. }
  202. bool FileTransferThread::catchReadBuffer(ISocket * socket, MemoryBuffer & msg, unsigned timeout)
  203. {
  204. unsigned nowTime = msTick();
  205. unsigned abortCheckTimeout = 120*1000;
  206. loop
  207. {
  208. try
  209. {
  210. readBuffer(socket, msg, abortCheckTimeout);
  211. return true;
  212. }
  213. catch (IException * e)
  214. {
  215. switch (e->errorCode())
  216. {
  217. case JSOCKERR_graceful_close:
  218. break;
  219. case JSOCKERR_timeout_expired:
  220. if (isAborting())
  221. break;
  222. if (msTick() - nowTime < timeout)
  223. {
  224. e->Release();
  225. continue;
  226. }
  227. break;
  228. default:
  229. EXCLOG(e,"FileTransferThread::catchReadBuffer");
  230. break;
  231. }
  232. e->Release();
  233. return false;
  234. }
  235. }
  236. }
  237. bool FileTransferThread::performTransfer()
  238. {
  239. bool ok = false;
  240. StringBuffer url;
  241. ep.getUrlStr(url);
  242. LOG(MCdebugProgress, job, "Transferring part %s [%p]", url.str(), this);
  243. started = true;
  244. allDone = true;
  245. if (sprayer.isSafeMode || action == FTactionpush)
  246. {
  247. ForEachItemIn(i, progress)
  248. {
  249. if (progress.item(i).status != OutputProgress::StatusCopied)
  250. allDone = false;
  251. }
  252. }
  253. else
  254. {
  255. unsigned whichOutput = (unsigned)-1;
  256. ForEachItemIn(i, progress)
  257. {
  258. PartitionPoint & curPartition = partition.item(i);
  259. OutputProgress & curProgress = progress.item(i);
  260. //pull should rename as well as copy the files.
  261. if (curPartition.whichOutput != whichOutput)
  262. {
  263. if (curProgress.status != OutputProgress::StatusRenamed)
  264. allDone = false;
  265. whichOutput = curPartition.whichOutput;
  266. }
  267. }
  268. }
  269. if (allDone)
  270. {
  271. LOG(MCdebugInfo, job, "Creation of part %s already completed", url.str());
  272. return true;
  273. }
  274. if (partition.empty())
  275. {
  276. LOG(MCdebugInfo, job, "No elements to transfer for this slave");
  277. return true;
  278. }
  279. LOG(MCdebugProgressDetail, job, "Start generate part %s [%p]", url.str(), this);
  280. StringBuffer tmp;
  281. Owned<ISocket> socket = spawnRemoteChild(SPAWNdfu, sprayer.querySlaveExecutable(ep, tmp), ep, DAFT_VERSION, queryFtSlaveLogDir(), this, wuid);
  282. if (socket)
  283. {
  284. MemoryBuffer msg;
  285. msg.setEndian(__BIG_ENDIAN);
  286. //MORE: Allow this to be configured by an option.
  287. unsigned slaveUpdateFrequency = minSlaveUpdateFrequency;
  288. if (sprayer.numParallelSlaves() < 5)
  289. slaveUpdateFrequency = maxSlaveUpdateFrequency;
  290. //Send message and wait for response...
  291. msg.append(action);
  292. passwordProvider.serialize(msg);
  293. ep.serialize(msg);
  294. sprayer.srcFormat.serialize(msg);
  295. sprayer.tgtFormat.serialize(msg);
  296. msg.append(sprayer.calcInputCRC());
  297. msg.append(calcCRC);
  298. serialize(partition, msg);
  299. msg.append(sprayer.numParallelSlaves());
  300. msg.append(slaveUpdateFrequency);
  301. msg.append(sprayer.replicate);
  302. msg.append(sprayer.mirroring);
  303. msg.append(sprayer.isSafeMode);
  304. msg.append(progress.ordinality());
  305. ForEachItemIn(i, progress)
  306. progress.item(i).serialize(msg);
  307. msg.append(sprayer.throttleNicSpeed);
  308. msg.append(sprayer.compressedInput);
  309. msg.append(sprayer.compressOutput);
  310. msg.append(sprayer.copyCompressed);
  311. msg.append(sprayer.transferBufferSize);
  312. msg.append(sprayer.encryptKey);
  313. msg.append(sprayer.decryptKey);
  314. sprayer.srcFormat.serializeExtra(msg, 1);
  315. sprayer.tgtFormat.serializeExtra(msg, 1);
  316. if (!catchWriteBuffer(socket, msg))
  317. throwError1(RFSERR_TimeoutWaitConnect, url.str());
  318. bool done;
  319. loop
  320. {
  321. msg.clear();
  322. if (!catchReadBuffer(socket, msg, FTTIME_PROGRESS))
  323. throwError1(RFSERR_TimeoutWaitSlave, url.str());
  324. msg.setEndian(__BIG_ENDIAN);
  325. msg.read(done);
  326. if (done)
  327. break;
  328. OutputProgress newProgress;
  329. newProgress.deserialize(msg);
  330. sprayer.updateProgress(newProgress);
  331. LOG(MCdebugProgress(10000), job, "Update %s: %d %"I64F"d->%"I64F"d", url.str(), newProgress.whichPartition, newProgress.inputLength, newProgress.outputLength);
  332. if (isAborting())
  333. {
  334. if (!sendRemoteAbort(socket))
  335. throwError1(RFSERR_TimeoutWaitSlave, url.str());
  336. }
  337. }
  338. msg.read(ok);
  339. setErrorOwn(deserializeException(msg));
  340. LOG(MCdebugProgressDetail, job, "Finished generating part %s [%p] ok(%d) error(%d)", url.str(), this, (int)ok, (int)(error!=NULL));
  341. msg.clear().append(true);
  342. catchWriteBuffer(socket, msg);
  343. if (sprayer.options->getPropInt("@fail", 0))
  344. throwError(DFTERR_CopyFailed);
  345. }
  346. else
  347. {
  348. throwError1(DFTERR_FailedStartSlave, url.str());
  349. }
  350. LOG(MCdebugProgressDetail, job, "Stopped generate part %s [%p]", url.str(), this);
  351. allDone = true;
  352. return ok;
  353. }
  354. void FileTransferThread::setErrorOwn(IException * e)
  355. {
  356. error.setown(e);
  357. if (error)
  358. sprayer.setError(ep, error);
  359. }
  360. bool FileTransferThread::transferAndSignal()
  361. {
  362. ok = false;
  363. if (!isAborting())
  364. {
  365. try
  366. {
  367. ok = performTransfer();
  368. }
  369. catch (IException * e)
  370. {
  371. FLLOG(MCexception(e), job, e, "Transferring files");
  372. setErrorOwn(e);
  373. }
  374. }
  375. sem->signal();
  376. return ok;
  377. }
  378. int FileTransferThread::run()
  379. {
  380. transferAndSignal();
  381. return 0;
  382. }
  383. //----------------------------------------------------------------------------
  384. FileSizeThread::FileSizeThread(FilePartInfoArray & _queue, CriticalSection & _cs, bool _isCompressed, bool _errorIfMissing) : Thread("fileSizeThread"), queue(_queue), cs(_cs)
  385. {
  386. isCompressed = _isCompressed;
  387. errorIfMissing = _errorIfMissing;
  388. }
  389. bool FileSizeThread::wait(unsigned timems)
  390. {
  391. while (!sem.wait(timems)) { // report every time
  392. if (!cur.get())
  393. continue; // window here?
  394. cur->Link();
  395. RemoteFilename *rfn=NULL;
  396. if (copy) {
  397. if (!cur->mirrorFilename.isNull())
  398. rfn = &cur->mirrorFilename;
  399. }
  400. else {
  401. rfn = &cur->filename;
  402. }
  403. if (rfn) {
  404. StringBuffer url;
  405. WARNLOG("Waiting for file: %s",rfn->getRemotePath(url).str());
  406. cur->Release();
  407. return false;
  408. }
  409. cur->Release();
  410. }
  411. sem.signal(); // if called again
  412. return true;
  413. }
  414. int FileSizeThread::run()
  415. {
  416. try
  417. {
  418. RemoteFilename remoteFilename;
  419. loop
  420. {
  421. cur.clear();
  422. cs.enter();
  423. if (queue.ordinality())
  424. cur.setown(&queue.popGet());
  425. cs.leave();
  426. if (!cur.get())
  427. break;
  428. copy=0;
  429. for (copy = 0;copy<2;copy++) {
  430. if (copy) {
  431. if (cur->mirrorFilename.isNull())
  432. continue; // not break
  433. remoteFilename.set(cur->mirrorFilename);
  434. }
  435. else
  436. remoteFilename.set(cur->filename);
  437. OwnedIFile thisFile = createIFile(remoteFilename);
  438. offset_t thisSize = thisFile->size();
  439. if (thisSize == -1) {
  440. if (errorIfMissing) {
  441. StringBuffer s;
  442. throwError1(DFTERR_CouldNotOpenFile, remoteFilename.getRemotePath(s).str());
  443. }
  444. continue;
  445. }
  446. cur->psize = thisSize;
  447. if (isCompressed)
  448. {
  449. Owned<IFileIO> io = createCompressedFileReader(thisFile); //check succeeded?
  450. if (!io) {
  451. StringBuffer s;
  452. throwError1(DFTERR_CouldNotOpenCompressedFile, remoteFilename.getRemotePath(s).str());
  453. }
  454. thisSize = io->size();
  455. }
  456. cur->size = thisSize;
  457. break;
  458. }
  459. if (copy==1) { // need to set primary
  460. cur->mirrorFilename.set(cur->filename);
  461. cur->filename.set(remoteFilename);
  462. }
  463. cur.clear();
  464. }
  465. }
  466. catch (IException * e)
  467. {
  468. error.setown(e);
  469. }
  470. sem.signal();
  471. return 0;
  472. }
  473. //----------------------------------------------------------------------------
  474. FileSprayer::FileSprayer(IPropertyTree * _options, IPropertyTree * _progress, IRemoteConnection * _recoveryConnection, const char *_wuid)
  475. : wuid(_wuid)
  476. {
  477. totalSize = 0;
  478. replicate = false;
  479. unknownSourceFormat = true;
  480. unknownTargetFormat = true;
  481. progressTree.set(_progress);
  482. recoveryConnection = _recoveryConnection;
  483. options.set(_options);
  484. if (!options)
  485. options.setown(createPTree());
  486. if (!progressTree)
  487. progressTree.setown(createPTree("progress", ipt_caseInsensitive));
  488. //split prefix messes up recovery because the target filenames aren't saved in the recovery info.
  489. allowRecovery = !options->getPropBool(ANnoRecover) && !querySplitPrefix();
  490. isRecovering = allowRecovery && progressTree->hasProp(ANhasProgress);
  491. isSafeMode = options->getPropBool(ANsafe);
  492. job = unknownJob;
  493. progressReport = NULL;
  494. abortChecker = NULL;
  495. sizeToBeRead = 0;
  496. calcedPullPush = false;
  497. mirroring = false;
  498. lastAbortCheckTick = lastSDSTick = lastOperatorTick = msTick();
  499. calcedInputCRC = false;
  500. aborting = false;
  501. totalLengthRead = 0;
  502. throttleNicSpeed = 0;
  503. compressedInput = false;
  504. compressOutput = options->getPropBool(ANcompress);
  505. copyCompressed = false;
  506. transferBufferSize = options->getPropInt(ANtransferBufferSize);
  507. if (transferBufferSize)
  508. LOG(MCdebugProgressDetail, job, "Using transfer buffer size %d", transferBufferSize);
  509. else // zero is default
  510. transferBufferSize = DEFAULT_STD_BUFFER_SIZE;
  511. progressDone = false;
  512. encryptKey.set(options->queryProp(ANencryptKey));
  513. decryptKey.set(options->queryProp(ANdecryptKey));
  514. }
  515. class AsyncAfterTransfer : public CAsyncFor
  516. {
  517. public:
  518. AsyncAfterTransfer(FileSprayer & _sprayer) : sprayer(_sprayer) {}
  519. virtual void Do(unsigned idxTarget)
  520. {
  521. TargetLocation & cur = sprayer.targets.item(idxTarget);
  522. if (!sprayer.filter || sprayer.filter->includePart(idxTarget))
  523. {
  524. RemoteFilename & targetFilename = cur.filename;
  525. if (sprayer.isSafeMode)
  526. {
  527. OwnedIFile file = createIFile(targetFilename);
  528. file->remove();
  529. }
  530. renameDfuTempToFinal(targetFilename);
  531. if (sprayer.replicate)
  532. {
  533. OwnedIFile file = createIFile(targetFilename);
  534. if (!sprayer.mirroring)
  535. file->setTime(NULL, &cur.modifiedTime, NULL);
  536. }
  537. else if (cur.modifiedTime.isNull())
  538. {
  539. OwnedIFile file = createIFile(targetFilename);
  540. file->getTime(NULL, &cur.modifiedTime, NULL);
  541. }
  542. }
  543. }
  544. protected:
  545. FileSprayer & sprayer;
  546. };
  547. void FileSprayer::addEmptyFilesToPartition(unsigned from, unsigned to)
  548. {
  549. for (unsigned i = from; i < to ; i++)
  550. {
  551. LOG(MCdebugProgressDetail, job, "Insert a dummy entry for target %d", i);
  552. PartitionPoint & next = createLiteral(0, NULL, 0);
  553. next.whichOutput = i;
  554. partition.append(next);
  555. }
  556. }
  557. void FileSprayer::addEmptyFilesToPartition()
  558. {
  559. unsigned lastOutput = (unsigned)-1;;
  560. ForEachItemIn(idx, partition)
  561. {
  562. PartitionPoint & cur = partition.item(idx);
  563. if (cur.whichOutput != lastOutput)
  564. {
  565. if (cur.whichOutput != lastOutput+1)
  566. addEmptyFilesToPartition(lastOutput+1, cur.whichOutput);
  567. lastOutput = cur.whichOutput;
  568. }
  569. }
  570. if (lastOutput != targets.ordinality()-1)
  571. addEmptyFilesToPartition(lastOutput+1, targets.ordinality());
  572. }
  573. void FileSprayer::afterTransfer()
  574. {
  575. if (calcInputCRC())
  576. {
  577. LOG(MCdebugProgressDetail, job, "Checking input CRCs");
  578. CRC32Merger partCRC;
  579. unsigned startCurSource = 0;
  580. ForEachItemIn(idx, partition)
  581. {
  582. PartitionPoint & curPartition = partition.item(idx);
  583. OutputProgress & curProgress = progress.item(idx);
  584. if (!curProgress.hasInputCRC)
  585. {
  586. LOG(MCdebugProgressDetail, job, "Could not calculate input CRCs - cannot check");
  587. break;
  588. }
  589. partCRC.addChildCRC(curProgress.inputLength, curProgress.inputCRC, false);
  590. StringBuffer errorText;
  591. bool failed = false;
  592. UnsignedArray failedOutputs;
  593. if (idx+1 == partition.ordinality() || partition.item(idx+1).whichInput != curPartition.whichInput)
  594. {
  595. FilePartInfo & curSource = sources.item(curPartition.whichInput);
  596. if (curSource.crc != partCRC.get())
  597. {
  598. StringBuffer name;
  599. if (!failed)
  600. errorText.append("Input CRCs do not match for part ");
  601. else
  602. errorText.append(", ");
  603. curSource.filename.getPath(errorText);
  604. failed = true;
  605. //Need to copy anything that involves this part of the file again.
  606. //pulling it will be the whole file, if pushing we can cope with single parts
  607. //in the middle of the partition.
  608. for (unsigned i = startCurSource; i <= idx; i++)
  609. {
  610. OutputProgress & cur = progress.item(i);
  611. cur.reset();
  612. if (cur.tree)
  613. cur.save(cur.tree);
  614. unsigned out = partition.item(i).whichOutput;
  615. if (failedOutputs.find(out) == NotFound)
  616. failedOutputs.append(out);
  617. }
  618. }
  619. partCRC.clear();
  620. startCurSource = idx+1;
  621. //If copying m to n, and not splitting, there may be some dummy text entries (containing nothing) on the end.
  622. //if so skip them, otherwise you'll get crc errors on part 1
  623. if (partition.isItem(startCurSource) && (partition.item(startCurSource).whichInput == 0))
  624. idx = partition.ordinality()-1;
  625. }
  626. if (failed)
  627. {
  628. if (usePullOperation())
  629. {
  630. //Need to clear progress for any partitions that copy to the same target file
  631. //However, need to do it after the crc checking, otherwise it will generate more errors...
  632. ForEachItemIn(idx, partition)
  633. {
  634. if (failedOutputs.find(partition.item(idx).whichOutput) != NotFound)
  635. {
  636. OutputProgress & cur = progress.item(idx);
  637. cur.reset();
  638. if (cur.tree)
  639. cur.save(cur.tree);
  640. }
  641. }
  642. }
  643. if (recoveryConnection)
  644. recoveryConnection->commit();
  645. throw MakeStringException(DFTERR_InputCrcMismatch, "%s", errorText.str());
  646. }
  647. }
  648. }
  649. if (isSafeMode || !usePullOperation())
  650. {
  651. unsigned numTargets = targets.ordinality();
  652. AsyncAfterTransfer async(*this);
  653. async.For(numTargets, (unsigned)sqrt((float)numTargets));
  654. }
  655. else
  656. {
  657. ForEachItemIn(idx, progress)
  658. {
  659. OutputProgress & curProgress = progress.item(idx);
  660. if (!curProgress.resultTime.isNull())
  661. targets.item(partition.item(idx).whichOutput).modifiedTime.set(curProgress.resultTime);
  662. }
  663. }
  664. }
  665. bool FileSprayer::allowSplit()
  666. {
  667. return !(options->getPropBool(ANnosplit) || options->getPropBool(ANnosplit2) || options->queryProp(ANprefix));
  668. }
  669. void FileSprayer::assignPartitionFilenames()
  670. {
  671. ForEachItemIn(idx, partition)
  672. {
  673. PartitionPoint & cur = partition.item(idx);
  674. if (cur.whichInput != (unsigned)-1)
  675. {
  676. cur.inputName.set(sources.item(cur.whichInput).filename);
  677. setCanAccessDirectly(cur.inputName);
  678. }
  679. cur.outputName.set(targets.item(cur.whichOutput).filename);
  680. setCanAccessDirectly(cur.outputName);
  681. if (replicate)
  682. cur.modifiedTime.set(targets.item(cur.whichOutput).modifiedTime);
  683. }
  684. }
  685. class CheckExists : public CAsyncFor
  686. {
  687. public:
  688. CheckExists(TargetLocationArray & _targets, IDFPartFilter * _filter) : targets(_targets) { filter = _filter; }
  689. virtual void Do(unsigned idx)
  690. {
  691. if (!filter || filter->includePart(idx))
  692. {
  693. const RemoteFilename & cur = targets.item(idx).filename;
  694. OwnedIFile file = createIFile(cur);
  695. if (file->exists())
  696. {
  697. StringBuffer s;
  698. throwError1(DFTERR_PhysicalExistsNoOverwrite, cur.getRemotePath(s).str());
  699. }
  700. }
  701. }
  702. public:
  703. TargetLocationArray & targets;
  704. IDFPartFilter * filter;
  705. };
  706. void FileSprayer::beforeTransfer()
  707. {
  708. if (!isRecovering && !options->getPropBool("@overwrite", true))
  709. {
  710. CheckExists checker(targets, filter);
  711. checker.For(targets.ordinality(), 25, true, true);
  712. }
  713. if (!isRecovering && !usePullOperation())
  714. {
  715. try {
  716. //Should this be on an option. Shouldn't be too inefficient since push is seldom used.
  717. ForEachItemIn(idx2, targets)
  718. {
  719. if (!filter || filter->includePart(idx2))
  720. {
  721. //MORE: This does not cope with creating directories on a solaris machine.
  722. StringBuffer remoteFilename, remoteDirectory;
  723. targets.item(idx2).filename.getRemotePath(remoteFilename);
  724. splitUNCFilename(remoteFilename.str(), &remoteDirectory, &remoteDirectory, NULL, NULL);
  725. Owned<IFile> dir = createIFile(remoteDirectory.str());
  726. dir->createDirectory();
  727. }
  728. }
  729. }
  730. catch (IException *e) {
  731. FLLOG(MCexception(e), job, e, "Creating Directory");
  732. e->Release();
  733. LOG(MCdebugInfo, job, "Ignoring create directory error");
  734. }
  735. // If pushing files, and not recovering, then need to delete the target files, because the slaves might be writing in any order
  736. // for pull, the slave deletes it when creating the file.
  737. unsigned curPartition = 0;
  738. ForEachItemIn(idxTarget, targets)
  739. {
  740. if (!filter || filter->includePart(idxTarget))
  741. {
  742. if (!isSafeMode)
  743. {
  744. OwnedIFile file = createIFile(targets.item(idxTarget).filename);
  745. file->remove();
  746. }
  747. while (partition.isItem(curPartition+1) && partition.item(curPartition+1).whichOutput == idxTarget)
  748. curPartition++;
  749. PartitionPoint & lastPartition = partition.item(curPartition);
  750. offset_t lastOutputOffset = lastPartition.outputOffset + lastPartition.outputLength;
  751. RemoteFilename remote;
  752. getDfuTempName(remote, targets.item(idxTarget).filename);
  753. OwnedIFile file = createIFile(remote);
  754. OwnedIFileIO io = file->open(IFOcreate);
  755. if (!io)
  756. {
  757. StringBuffer name;
  758. remote.getPath(name);
  759. throwError1(DFTERR_CouldNotCreateOutput, name.str());
  760. }
  761. //Create the headers on the utf files.
  762. unsigned headerSize = getHeaderSize(tgtFormat.type);
  763. if (headerSize)
  764. io->write(0, headerSize, getHeaderText(tgtFormat.type));
  765. if ((lastOutputOffset != 0)&&!compressOutput)
  766. {
  767. char null = 0;
  768. io->write(lastOutputOffset-sizeof(null), sizeof(null), &null);
  769. }
  770. }
  771. }
  772. }
  773. throttleNicSpeed = options->getPropInt(ANthrottle, 0);
  774. if (throttleNicSpeed == 0 && !usePullOperation() && targets.ordinality() == 1 && sources.ordinality() > 1)
  775. {
  776. Owned<IEnvironmentFactory> factory = getEnvironmentFactory();
  777. if (factory) {
  778. Owned<IConstEnvironment> env = factory->openEnvironment();
  779. if (env) {
  780. StringBuffer ipText;
  781. targets.item(0).filename.queryIP().getIpText(ipText);
  782. Owned<IConstMachineInfo> machine = env->getMachineByAddress(ipText.str());
  783. if (machine)
  784. {
  785. if (machine->getOS() == MachineOsW2K)
  786. {
  787. throttleNicSpeed = machine->getNicSpeedMbitSec();
  788. LOG(MCdebugInfo, job, "Throttle target speed to %dMbit/sec", throttleNicSpeed);
  789. }
  790. }
  791. }
  792. }
  793. }
  794. }
  795. bool FileSprayer::calcCRC()
  796. {
  797. return options->getPropBool(ANcrc, true) && !compressOutput && !copyCompressed;
  798. }
  799. bool FileSprayer::calcInputCRC()
  800. {
  801. if (!calcedInputCRC)
  802. {
  803. calcedInputCRC = true;
  804. cachedInputCRC = false;
  805. if (options->getPropBool(ANcrcCheck, true) && !compressedInput)
  806. {
  807. ForEachItemIn(idx, sources)
  808. {
  809. if (!sources.item(idx).hasCRC)
  810. return cachedInputCRC;
  811. }
  812. cachedInputCRC = true;
  813. //If keeping headers then we lose bits of the input files, so they can't be crc checked.
  814. bool canKeepHeader = srcFormat.equals(tgtFormat) || !needToCalcOutput();
  815. if (options->getPropBool("@keepHeader", canKeepHeader) && srcFormat.rowTag && sources.ordinality() > 1)
  816. cachedInputCRC = false;
  817. if (querySplitPrefix())
  818. cachedInputCRC = false;
  819. }
  820. }
  821. return cachedInputCRC;
  822. }
  823. void FileSprayer::calculateOne2OnePartition()
  824. {
  825. LOG(MCdebugProgressDetail, job, "Setting up one2One partition");
  826. if (sources.ordinality() != targets.ordinality())
  827. throwError(DFTERR_ReplicateNumPartsDiffer);
  828. if (!srcFormat.equals(tgtFormat))
  829. throwError(DFTERR_ReplicateSameFormat);
  830. if (compressedInput && compressOutput && (strcmp(encryptKey.sget(),decryptKey.sget())==0))
  831. setCopyCompressedRaw();
  832. ForEachItemIn(idx, sources)
  833. {
  834. FilePartInfo & cur = sources.item(idx);
  835. RemoteFilename curFilename;
  836. curFilename.set(cur.filename);
  837. setCanAccessDirectly(curFilename);
  838. partition.append(*new PartitionPoint(idx, idx, cur.headerSize, copyCompressed?cur.psize:cur.size, copyCompressed?cur.psize:cur.size)); // outputoffset == 0
  839. targets.item(idx).modifiedTime.set(cur.modifiedTime);
  840. }
  841. }
  842. class AsyncExtractBlobInfo : public CAsyncFor
  843. {
  844. friend class FileSprayer;
  845. public:
  846. AsyncExtractBlobInfo(const char * _splitPrefix, FileSprayer & _sprayer) : sprayer(_sprayer)
  847. {
  848. extracted = new ExtractedBlobArray[sprayer.sources.ordinality()];
  849. splitPrefix = _splitPrefix;
  850. }
  851. ~AsyncExtractBlobInfo()
  852. {
  853. delete [] extracted;
  854. }
  855. virtual void Do(unsigned i)
  856. {
  857. if (!sprayer.sources.item(i).filename.isLocal()) {
  858. try {
  859. remoteExtractBlobElements(splitPrefix, sprayer.sources.item(i).filename, extracted[i]);
  860. return;
  861. }
  862. catch (IException *e) {
  863. StringBuffer path;
  864. StringBuffer err;
  865. WARNLOG("dafilesrv ExtractBlobElements(%s) failed with: %s",
  866. sprayer.sources.item(i).filename.getPath(path).str(),
  867. e->errorMessage(err).str());
  868. PROGLOG("Trying direct access (this may be slow)");
  869. e->Release();
  870. }
  871. }
  872. // try local
  873. extractBlobElements(splitPrefix, sprayer.sources.item(i).filename, extracted[i]);
  874. }
  875. protected:
  876. FileSprayer & sprayer;
  877. const char * splitPrefix;
  878. ExtractedBlobArray * extracted;
  879. };
  880. void FileSprayer::calculateSplitPrefixPartition(const char * splitPrefix)
  881. {
  882. if (targets.ordinality() != 1)
  883. throwError(DFTERR_SplitPrefixSingleTarget);
  884. if (!srcFormat.equals(tgtFormat))
  885. throwError(DFTERR_SplitPrefixSameFormat);
  886. LOG(MCdebugProgressDetail, job, "Setting up split prefix partition");
  887. Owned<TargetLocation> target = &targets.popGet(); // remove target, add lots of new ones
  888. RemoteFilename blobTarget;
  889. StringBuffer remoteTargetPath, remoteFilename;
  890. target->filename.getRemotePath(remoteTargetPath);
  891. char sepChar = target->filename.getPathSeparator();
  892. //Remove the tail name from the filename
  893. const char * temp = remoteTargetPath.str();
  894. remoteTargetPath.setLength(strrchr(temp, sepChar)-temp);
  895. AsyncExtractBlobInfo extractor(splitPrefix, *this);
  896. unsigned numSources = sources.ordinality();
  897. extractor.For(numSources, numParallelConnections(numSources), true, false);
  898. ForEachItemIn(idx, sources)
  899. {
  900. FilePartInfo & cur = sources.item(idx);
  901. ExtractedBlobArray & extracted = extractor.extracted[idx];
  902. ForEachItemIn(i, extracted)
  903. {
  904. ExtractedBlobInfo & curBlob = extracted.item(i);
  905. remoteFilename.clear().append(remoteTargetPath);
  906. addPathSepChar(remoteFilename, sepChar);
  907. remoteFilename.append(curBlob.filename);
  908. blobTarget.clear();
  909. blobTarget.setRemotePath(remoteFilename);
  910. targets.append(* new TargetLocation(blobTarget));
  911. partition.append(*new PartitionPoint(idx, targets.ordinality()-1, curBlob.offset, curBlob.length, curBlob.length));
  912. }
  913. }
  914. }
  915. void FileSprayer::calculateMany2OnePartition()
  916. {
  917. LOG(MCdebugProgressDetail, job, "Setting up many2one partition");
  918. ForEachItemIn(idx, sources)
  919. {
  920. FilePartInfo & cur = sources.item(idx);
  921. RemoteFilename curFilename;
  922. curFilename.set(cur.filename);
  923. setCanAccessDirectly(curFilename);
  924. partition.append(*new PartitionPoint(idx, 0, cur.headerSize, cur.size, cur.size));
  925. }
  926. }
  927. void FileSprayer::calculateNoSplitPartition()
  928. {
  929. LOG(MCdebugProgressDetail, job, "Setting up no split partition");
  930. if (!usePullOperation() && !srcFormat.equals(tgtFormat))
  931. throwError(DFTERR_NoSplitPushChangeFormat);
  932. #if 1
  933. //split by number
  934. unsigned numSources = sources.ordinality();
  935. unsigned numTargets = targets.ordinality();
  936. if (numSources < numTargets)
  937. numTargets = numSources;
  938. unsigned tally = 0;
  939. unsigned curTarget = 0;
  940. ForEachItemIn(idx, sources)
  941. {
  942. FilePartInfo & cur = sources.item(idx);
  943. partition.append(*new PartitionPoint(idx, curTarget, cur.headerSize, copyCompressed?cur.psize:cur.size, copyCompressed?cur.psize:cur.size)); // outputoffset == 0
  944. tally += numTargets;
  945. if (tally >= numSources)
  946. {
  947. tally -= numSources;
  948. curTarget++;
  949. }
  950. }
  951. #else
  952. //split by size
  953. offset_t totalSize = 0;
  954. ForEachItemIn(i, sources)
  955. totalSize += sources.item(i).size;
  956. unsigned numTargets = targets.ordinality();
  957. offset_t chunkSize = (totalSize / numTargets);
  958. offset_t nextBoundary = chunkSize;
  959. offset_t sizeSoFar = 0;
  960. unsigned curTarget = 0;
  961. ForEachItemIn(idx, sources)
  962. {
  963. FilePartInfo & cur = sources.item(idx);
  964. offset_t nextSize = sizeSoFar + cur.size;
  965. if ((sizeSoFar >= nextBoundary) ||
  966. ((nextSize > nextBoundary) &&
  967. (nextBoundary - sizeSoFar < nextSize - nextBoundary)))
  968. {
  969. if (curTarget != numTargets-1)
  970. {
  971. curTarget++;
  972. nextBoundary += chunkSize;
  973. }
  974. }
  975. RemoteFilename curFilename;
  976. curFilename.set(cur.filename);
  977. setCanAccessDirectly(curFilename);
  978. partition.append(*new PartitionPoint(idx, curTarget, cur.headerSize, cur.size, cur.size)); // outputoffset == 0
  979. sizeSoFar = nextSize;
  980. }
  981. #endif
  982. }
  983. void FileSprayer::calculateSprayPartition()
  984. {
  985. LOG(MCdebugProgressDetail, job, "Calculating N:M partition");
  986. bool calcOutput = needToCalcOutput();
  987. FormatPartitionerArray partitioners;
  988. unsigned numParts = targets.ordinality();
  989. StringBuffer remoteFilename;
  990. StringBuffer slaveName;
  991. ForEachItemIn(idx, sources)
  992. {
  993. FilePartInfo & cur = sources.item(idx);
  994. cur.filename.getRemotePath(remoteFilename.clear());
  995. LOG(MCdebugInfoDetail, job, "Partition %d(%s)", idx, remoteFilename.str());
  996. const SocketEndpoint & ep = cur.filename.queryEndpoint();
  997. IFormatPartitioner * partitioner = createFormatPartitioner(ep, srcFormat, tgtFormat, calcOutput, queryFixedSlave(), wuid);
  998. RemoteFilename name;
  999. name.set(cur.filename);
  1000. setCanAccessDirectly(name);
  1001. partitioner->setPartitionRange(totalSize, cur.offset, cur.size, cur.headerSize, numParts);
  1002. partitioner->setSource(idx, name, compressedInput, decryptKey);
  1003. partitioners.append(*partitioner);
  1004. }
  1005. unsigned numProcessors = partitioners.ordinality();
  1006. unsigned maxConnections = numParallelConnections(numProcessors);
  1007. //Throttle maximum number of concurrent transfers by starting n threads, and
  1008. //then waiting for one to complete before going on to the next
  1009. Semaphore sem;
  1010. unsigned goIndex;
  1011. for (goIndex=0; goIndex < maxConnections; goIndex++)
  1012. partitioners.item(goIndex).calcPartitions(&sem);
  1013. for (; goIndex<numProcessors; goIndex++)
  1014. {
  1015. sem.wait();
  1016. partitioners.item(goIndex).calcPartitions(&sem);
  1017. }
  1018. for (unsigned waitCount=0; waitCount < maxConnections;waitCount++)
  1019. sem.wait();
  1020. ForEachItemIn(idx2, partitioners)
  1021. partitioners.item(idx2).getResults(partition);
  1022. }
  1023. void FileSprayer::calculateOutputOffsets()
  1024. {
  1025. unsigned headerSize = getHeaderSize(tgtFormat.type);
  1026. offset_t outputOffset = headerSize;
  1027. unsigned curOutput = 0;
  1028. ForEachItemIn(idx, partition)
  1029. {
  1030. PartitionPoint & cur = partition.item(idx);
  1031. if (curOutput != cur.whichOutput)
  1032. {
  1033. outputOffset = headerSize;
  1034. curOutput = cur.whichOutput;
  1035. }
  1036. cur.outputOffset = outputOffset;
  1037. outputOffset += cur.outputLength;
  1038. }
  1039. }
  1040. void FileSprayer::checkFormats()
  1041. {
  1042. if (unknownSourceFormat)
  1043. {
  1044. //If target format is specified, use that - not really very good, but...
  1045. srcFormat.set(tgtFormat);
  1046. //If format omitted, and number of parts are the same then okay to omit the format
  1047. if (sources.ordinality() == targets.ordinality() && !disallowImplicitReplicate())
  1048. replicate = true;
  1049. bool noSplit = !allowSplit();
  1050. if (!replicate && !noSplit)
  1051. {
  1052. //copy to a single target => assume same format concatenated.
  1053. if (targets.ordinality() != 1)
  1054. {
  1055. if (!unknownTargetFormat)
  1056. throwError(DFTERR_TargetFormatUnknownSource);
  1057. else
  1058. throwError(DFTERR_FormatNotSpecified);
  1059. }
  1060. }
  1061. }
  1062. FileFormatType srcType = srcFormat.type;
  1063. FileFormatType tgtType = tgtFormat.type;
  1064. if (srcType != tgtType)
  1065. {
  1066. switch (srcType)
  1067. {
  1068. case FFTfixed:
  1069. if ((tgtType != FFTvariable)&&(tgtType != FFTvariablebigendian))
  1070. throwError(DFTERR_BadSrcTgtCombination);
  1071. break;
  1072. case FFTvariable:
  1073. if ((tgtType != FFTfixed) && (tgtType != FFTblocked)&& (tgtType != FFTvariablebigendian))
  1074. throwError(DFTERR_BadSrcTgtCombination);
  1075. break;
  1076. case FFTvariablebigendian:
  1077. if ((tgtType != FFTfixed) && (tgtType != FFTblocked) && (tgtType != FFTvariable))
  1078. throwError(DFTERR_BadSrcTgtCombination);
  1079. break;
  1080. case FFTblocked:
  1081. if ((tgtType != FFTvariable)&&(tgtType != FFTvariablebigendian))
  1082. throwError(DFTERR_BadSrcTgtCombination);
  1083. break;
  1084. case FFTcsv:
  1085. throwError(DFTERR_BadSrcTgtCombination);
  1086. case FFTutf: case FFTutf8: case FFTutf8n: case FFTutf16: case FFTutf16be: case FFTutf16le: case FFTutf32: case FFTutf32be: case FFTutf32le:
  1087. switch (tgtFormat.type)
  1088. {
  1089. case FFTutf: case FFTutf8: case FFTutf8n: case FFTutf16: case FFTutf16be: case FFTutf16le: case FFTutf32: case FFTutf32be: case FFTutf32le:
  1090. break;
  1091. default:
  1092. throwError(DFTERR_OnlyConvertUtfUtf);
  1093. break;
  1094. }
  1095. break;
  1096. }
  1097. }
  1098. }
  1099. void FileSprayer::calibrateProgress()
  1100. {
  1101. sizeToBeRead = 0;
  1102. ForEachItemIn(idx, transferSlaves)
  1103. sizeToBeRead += transferSlaves.item(idx).getInputSize();
  1104. totalLengthRead = calcSizeReadAlready();
  1105. }
  1106. void FileSprayer::checkForOverlap()
  1107. {
  1108. unsigned num = std::min(sources.ordinality(), targets.ordinality());
  1109. for (unsigned idx = 0; idx < num; idx++)
  1110. {
  1111. RemoteFilename & srcName = sources.item(idx).filename;
  1112. RemoteFilename & tgtName = targets.item(idx).filename;
  1113. if (srcName.equals(tgtName))
  1114. {
  1115. StringBuffer x;
  1116. srcName.getPath(x);
  1117. throwError1(DFTERR_CopyFileOntoSelf, x.str());
  1118. }
  1119. }
  1120. }
  1121. void FileSprayer::cleanupRecovery()
  1122. {
  1123. progressTree->setPropBool(ANcomplete, true);
  1124. #ifdef CLEANUP_RECOVERY
  1125. progressTree->removeProp(ANhasPartition);
  1126. progressTree->removeProp(ANhasProgress);
  1127. progressTree->removeProp(ANhasRecovery);
  1128. progressTree->removeProp(PNpartition);
  1129. progressTree->removeProp(PNprogress);
  1130. progressTree->removeProp(ANpull);
  1131. #endif
  1132. }
  1133. //Several files being pulled to the same machine - only run ftslave once...
  1134. void FileSprayer::commonUpSlaves()
  1135. {
  1136. unsigned max = partition.ordinality();
  1137. bool pull = usePullOperation();
  1138. for (unsigned idx = 0; idx < max; idx++)
  1139. {
  1140. PartitionPoint & cur = partition.item(idx);
  1141. cur.whichSlave = pull ? cur.whichOutput : cur.whichInput;
  1142. if (cur.whichSlave == -1)
  1143. cur.whichSlave = 0;
  1144. }
  1145. if (options->getPropBool(ANnocommon, false))
  1146. return;
  1147. //First work out which are the same slaves, and then map the partition.
  1148. //Previously it was n^2 in partition, which is fine until you spray 100K files.
  1149. unsigned numSlaves = pull ? targets.ordinality() : sources.ordinality();
  1150. unsigned * slaveMapping = new unsigned [numSlaves];
  1151. for (unsigned i = 0; i < numSlaves; i++)
  1152. slaveMapping[i] = i;
  1153. if (pull)
  1154. {
  1155. for (unsigned i1 = 1; i1 < numSlaves; i1++)
  1156. {
  1157. TargetLocation & cur = targets.item(i1);
  1158. for (unsigned i2 = 0; i2 < i1; i2++)
  1159. {
  1160. if (targets.item(i2).filename.queryIP().ipequals(cur.filename.queryIP()))
  1161. {
  1162. slaveMapping[i1] = i2;
  1163. break;
  1164. }
  1165. }
  1166. }
  1167. }
  1168. else
  1169. {
  1170. for (unsigned i1 = 1; i1 < numSlaves; i1++)
  1171. {
  1172. FilePartInfo & cur = sources.item(i1);
  1173. for (unsigned i2 = 0; i2 < i1; i2++)
  1174. {
  1175. if (sources.item(i2).filename.queryIP().ipequals(cur.filename.queryIP()))
  1176. {
  1177. slaveMapping[i1] = i2;
  1178. break;
  1179. }
  1180. }
  1181. }
  1182. }
  1183. for (unsigned i3 = 0; i3 < max; i3++)
  1184. {
  1185. PartitionPoint & cur = partition.item(i3);
  1186. cur.whichSlave = slaveMapping[cur.whichSlave];
  1187. }
  1188. delete [] slaveMapping;
  1189. }
  1190. void FileSprayer::analyseFileHeaders(bool setcurheadersize)
  1191. {
  1192. FileFormatType defaultFormat = FFTunknown;
  1193. switch (srcFormat.type)
  1194. {
  1195. case FFTutf:
  1196. case FFTutf8:
  1197. defaultFormat = FFTutf8n;
  1198. break;
  1199. case FFTutf16:
  1200. defaultFormat = FFTutf16be;
  1201. break;
  1202. case FFTutf32:
  1203. defaultFormat = FFTutf32be;
  1204. break;
  1205. default:
  1206. if (!srcFormat.rowTag)
  1207. return;
  1208. break;
  1209. }
  1210. FileFormatType actualType = FFTunknown;
  1211. unsigned numEmptyXml = 0;
  1212. ForEachItemIn(idx, sources)
  1213. {
  1214. FilePartInfo & cur = sources.item(idx);
  1215. StringBuffer s;
  1216. cur.filename.getPath(s);
  1217. LOG(MCdebugInfo, job, "Examine header of file %s", s.str());
  1218. Owned<IFile> file = createIFile(cur.filename);
  1219. Owned<IFileIO> io = file->open(IFOread);
  1220. if (!io)
  1221. {
  1222. StringBuffer s;
  1223. cur.filename.getRemotePath(s);
  1224. throwError1(DFTERR_CouldNotOpenFilePart, s.str());
  1225. }
  1226. if (compressedInput) {
  1227. Owned<IExpander> expander;
  1228. if (!decryptKey.isEmpty()) {
  1229. StringBuffer key;
  1230. decrypt(key,decryptKey);
  1231. expander.setown(createAESExpander256(key.length(),key.str()));
  1232. }
  1233. io.setown(createCompressedFileReader(io,expander));
  1234. }
  1235. if (defaultFormat != FFTunknown)
  1236. {
  1237. FileFormatType thisType;
  1238. unsigned char header[4];
  1239. memset(header, 255, sizeof(header)); // fill so don't get clashes if file is very small!
  1240. unsigned numRead = io->read(0, 4, header);
  1241. unsigned headerSize = 0;
  1242. if ((memcmp(header, "\xEF\xBB\xBF", 3) == 0) && (srcFormat.type == FFTutf || srcFormat.type == FFTutf8))
  1243. {
  1244. thisType = FFTutf8n;
  1245. headerSize = 3;
  1246. }
  1247. else if ((memcmp(header, "\xFF\xFE\x00\x00", 4) == 0) && (srcFormat.type == FFTutf || srcFormat.type == FFTutf32))
  1248. {
  1249. thisType = FFTutf32le;
  1250. headerSize = 4;
  1251. }
  1252. else if ((memcmp(header, "\x00\x00\xFE\xFF", 4) == 0) && (srcFormat.type == FFTutf || srcFormat.type == FFTutf32))
  1253. {
  1254. thisType = FFTutf32be;
  1255. headerSize = 4;
  1256. }
  1257. else if ((memcmp(header, "\xFF\xFE", 2) == 0) && (srcFormat.type == FFTutf || srcFormat.type == FFTutf16))
  1258. {
  1259. thisType = FFTutf16le;
  1260. headerSize = 2;
  1261. }
  1262. else if ((memcmp(header, "\xFE\xFF", 2) == 0) && (srcFormat.type == FFTutf || srcFormat.type == FFTutf16))
  1263. {
  1264. thisType = FFTutf16be;
  1265. headerSize = 2;
  1266. }
  1267. else
  1268. {
  1269. thisType = defaultFormat;
  1270. headerSize = 0;
  1271. }
  1272. if (actualType == FFTunknown)
  1273. actualType = thisType;
  1274. else if (actualType != thisType)
  1275. throwError(DFTERR_PartsDoNotHaveSameUtfFormat);
  1276. if (setcurheadersize) {
  1277. cur.headerSize = headerSize;
  1278. cur.size -= headerSize;
  1279. }
  1280. }
  1281. if (srcFormat.rowTag&&setcurheadersize)
  1282. {
  1283. try
  1284. {
  1285. locateXmlHeader(io, cur.headerSize, cur.xmlHeaderLength, cur.xmlFooterLength);
  1286. cur.headerSize += (unsigned)cur.xmlHeaderLength;
  1287. cur.size -= (cur.xmlHeaderLength + cur.xmlFooterLength);
  1288. }
  1289. catch (IException * e)
  1290. {
  1291. if (e->errorCode() != DFTERR_CannotFindFirstXmlRecord)
  1292. throw;
  1293. e->Release();
  1294. if (!replicate)
  1295. {
  1296. cur.headerSize = 0;
  1297. cur.size = 0;
  1298. }
  1299. numEmptyXml++;
  1300. }
  1301. }
  1302. }
  1303. if (numEmptyXml == sources.ordinality())
  1304. {
  1305. if (numEmptyXml == 1)
  1306. throwError(DFTERR_CannotFindFirstXmlRecord);
  1307. // else
  1308. // throwError(DFTERR_CannotFindAnyXmlRecord);
  1309. }
  1310. if (defaultFormat != FFTunknown)
  1311. srcFormat.type = actualType;
  1312. if (unknownTargetFormat)
  1313. {
  1314. tgtFormat.set(srcFormat);
  1315. if (distributedTarget)
  1316. {
  1317. DistributedFilePropertyLock lock(distributedTarget);
  1318. IPropertyTree &curProps = lock.queryAttributes();
  1319. tgtFormat.save(&curProps);
  1320. }
  1321. }
  1322. }
  1323. void FileSprayer::locateXmlHeader(IFileIO * io, unsigned headerSize, offset_t & xmlHeaderLength, offset_t & xmlFooterLength)
  1324. {
  1325. Owned<IFileIOStream> in = createIOStream(io);
  1326. XmlSplitter splitter(srcFormat);
  1327. BufferedDirectReader reader;
  1328. reader.set(in);
  1329. reader.seek(headerSize);
  1330. xmlHeaderLength = splitter.getHeaderLength(reader);
  1331. offset_t size = io->size();
  1332. offset_t endOffset = (size > srcFormat.maxRecordSize*2 + headerSize) ? size - srcFormat.maxRecordSize*2 : headerSize;
  1333. reader.seek(endOffset);
  1334. xmlFooterLength = splitter.getFooterLength(reader, size);
  1335. }
  1336. void FileSprayer::derivePartitionExtra()
  1337. {
  1338. calculateOutputOffsets();
  1339. assignPartitionFilenames();
  1340. commonUpSlaves();
  1341. IPropertyTreeIterator * iter = NULL;
  1342. if (isRecovering)
  1343. {
  1344. Owned<IPropertyTreeIterator> iter = progressTree->getElements(PNprogress);
  1345. ForEach(*iter)
  1346. {
  1347. OutputProgress & next = * new OutputProgress;
  1348. next.restore(&iter->query());
  1349. next.tree.set(&iter->query());
  1350. progress.append(next);
  1351. }
  1352. assertex(progress.ordinality() == partition.ordinality());
  1353. }
  1354. else
  1355. {
  1356. if (allowRecovery)
  1357. progressTree->setPropBool(ANhasProgress, true);
  1358. ForEachItemIn(idx, partition)
  1359. {
  1360. OutputProgress & next = * new OutputProgress;
  1361. next.whichPartition=idx;
  1362. if (allowRecovery)
  1363. {
  1364. IPropertyTree * progressInfo = progressTree->addPropTree(PNprogress, createPTree(PNprogress, ipt_caseInsensitive));
  1365. next.tree.set(progressInfo);
  1366. next.save(progressInfo);
  1367. }
  1368. progress.append(next);
  1369. }
  1370. }
  1371. }
  1372. void FileSprayer::displayPartition()
  1373. {
  1374. ForEachItemIn(idx, partition)
  1375. partition.item(idx).display();
  1376. }
  1377. void FileSprayer::extractSourceFormat(IPropertyTree * props)
  1378. {
  1379. if (srcFormat.restore(props))
  1380. unknownSourceFormat = false;
  1381. else
  1382. srcFormat.set(FFTfixed, 1);
  1383. bool blockcompressed;
  1384. if (isCompressed(*props, &blockcompressed))
  1385. {
  1386. if (!blockcompressed)
  1387. throwError(DFTERR_RowCompressedNotSupported);
  1388. compressedInput = true;
  1389. }
  1390. else if (!decryptKey.isEmpty())
  1391. compressedInput = true;
  1392. }
  1393. void FileSprayer::gatherFileSizes(bool errorIfMissing)
  1394. {
  1395. FilePartInfoArray fileSizeQueue;
  1396. LOG(MCdebugProgress, job, "Start gathering file sizes...");
  1397. ForEachItemIn(idx, sources)
  1398. {
  1399. FilePartInfo & cur = sources.item(idx);
  1400. if (cur.size == UNKNOWN_PART_SIZE)
  1401. fileSizeQueue.append(OLINK(cur));
  1402. }
  1403. gatherFileSizes(fileSizeQueue, errorIfMissing);
  1404. LOG(MCdebugProgress, job, "Finished gathering file sizes...");
  1405. }
  1406. void FileSprayer::afterGatherFileSizes()
  1407. {
  1408. if (!copyCompressed)
  1409. {
  1410. ForEachItemIn(idx2, sources)
  1411. {
  1412. FilePartInfo & cur = sources.item(idx2);
  1413. cur.offset = totalSize;
  1414. totalSize += cur.size;
  1415. if (cur.size % srcFormat.getUnitSize())
  1416. {
  1417. StringBuffer s;
  1418. if (srcFormat.isUtf())
  1419. throwError2(DFTERR_InputIsInvalidMultipleUtf, cur.filename.getRemotePath(s).str(), srcFormat.getUnitSize());
  1420. else
  1421. throwError2(DFTERR_InputIsInvalidMultiple, cur.filename.getRemotePath(s).str(), srcFormat.getUnitSize());
  1422. }
  1423. }
  1424. }
  1425. }
  1426. void FileSprayer::gatherFileSizes(FilePartInfoArray & fileSizeQueue, bool errorIfMissing)
  1427. {
  1428. if (fileSizeQueue.ordinality())
  1429. {
  1430. CIArrayOf<FileSizeThread> threads;
  1431. CriticalSection fileSizeCS;
  1432. //Is this a good guess? start square root of number of files threads??
  1433. unsigned numThreads = (unsigned)sqrt((float)fileSizeQueue.ordinality());
  1434. if (numThreads>20)
  1435. numThreads = 20;
  1436. LOG(MCdebugProgress, job, "Gathering %d file sizes on %d threads", fileSizeQueue.ordinality(), numThreads);
  1437. unsigned idx;
  1438. for (idx = 0; idx < numThreads; idx++)
  1439. threads.append(*new FileSizeThread(fileSizeQueue, fileSizeCS, compressedInput&&!copyCompressed, errorIfMissing));
  1440. for (idx = 0; idx < numThreads; idx++)
  1441. threads.item(idx).start();
  1442. loop {
  1443. bool alldone = true;
  1444. StringBuffer err;
  1445. for (idx = 0; idx < numThreads; idx++) {
  1446. bool ok = threads.item(idx).wait(10*1000);
  1447. if (!ok)
  1448. alldone = false;
  1449. }
  1450. if (alldone)
  1451. break;
  1452. }
  1453. for (idx = 0; idx < numThreads; idx++)
  1454. threads.item(idx).queryThrowError();
  1455. }
  1456. }
  1457. void FileSprayer::gatherMissingSourceTarget(IFileDescriptor * source)
  1458. {
  1459. //First gather all the file sizes...
  1460. RemoteFilename filename;
  1461. FilePartInfoArray primparts;
  1462. FilePartInfoArray secparts;
  1463. UnsignedArray secstart;
  1464. FilePartInfoArray queue;
  1465. unsigned numParts = source->numParts();
  1466. for (unsigned idx1=0; idx1 < numParts; idx1++)
  1467. {
  1468. if (!filter.get() || filter->includePart(idx1))
  1469. {
  1470. unsigned numCopies = source->numCopies(idx1);
  1471. if (numCopies>=1) // only add if there is one or more replicates
  1472. {
  1473. for (unsigned copy=0; copy < numCopies; copy++)
  1474. {
  1475. FilePartInfo & next = * new FilePartInfo;
  1476. source->getFilename(idx1, copy, next.filename);
  1477. if (copy==0)
  1478. primparts.append(next);
  1479. else
  1480. {
  1481. if (copy==1)
  1482. secstart.append(secparts.ordinality());
  1483. secparts.append(next);
  1484. }
  1485. queue.append(OLINK(next));
  1486. }
  1487. }
  1488. }
  1489. }
  1490. secstart.append(secparts.ordinality());
  1491. gatherFileSizes(queue, false);
  1492. //Now process the information...
  1493. StringBuffer primaryPath, secondaryPath;
  1494. for (unsigned idx=0; idx < primparts.ordinality(); idx++)
  1495. {
  1496. FilePartInfo & primary = primparts.item(idx);
  1497. offset_t primarySize = primary.size;
  1498. primary.filename.getRemotePath(primaryPath.clear());
  1499. for (unsigned idx2=secstart.item(idx);idx2<secstart.item(idx+1);idx2++)
  1500. {
  1501. FilePartInfo & secondary = secparts.item(idx2);
  1502. offset_t secondarySize = secondary.size;
  1503. secondary.filename.getRemotePath(secondaryPath.clear());
  1504. unsigned sourceCopy = 0;
  1505. if (primarySize != secondarySize)
  1506. {
  1507. if (primarySize == -1)
  1508. {
  1509. sourceCopy = 1;
  1510. }
  1511. else if (secondarySize != -1)
  1512. {
  1513. LOG(MCwarning, unknownJob, "Replicate - primary and secondary copies have different sizes (%"I64F"d v %"I64F"d) for part %u", primarySize, secondarySize, idx);
  1514. continue; // ignore copy
  1515. }
  1516. }
  1517. else
  1518. {
  1519. if (primarySize == -1)
  1520. {
  1521. LOG(MCwarning, unknownJob, "Replicate - neither primary or secondary copies exist for part %u", idx);
  1522. primarySize = 0; // to stop later failure to gather the file size
  1523. }
  1524. continue; // ignore copy
  1525. }
  1526. RemoteFilename *dst= (sourceCopy == 0) ? &secondary.filename : &primary.filename;
  1527. // check nothing else to same destination
  1528. bool done = false;
  1529. ForEachItemIn(dsti,targets) {
  1530. TargetLocation &tgt = targets.item(dsti);
  1531. if (tgt.filename.equals(*dst)) {
  1532. done = true;
  1533. break;
  1534. }
  1535. }
  1536. if (!done) {
  1537. sources.append(* new FilePartInfo(*((sourceCopy == 0)? &primary.filename : &secondary.filename)));
  1538. targets.append(* new TargetLocation(*dst));
  1539. sources.tos().size = (sourceCopy == 0) ? primarySize : secondarySize;
  1540. }
  1541. }
  1542. }
  1543. filter.clear(); // we have already filtered
  1544. }
  1545. unsigned __int64 FileSprayer::calcSizeReadAlready()
  1546. {
  1547. unsigned __int64 sizeRead = 0;
  1548. ForEachItemIn(idx, progress)
  1549. {
  1550. OutputProgress & cur = progress.item(idx);
  1551. sizeRead += cur.inputLength;
  1552. }
  1553. return sizeRead;
  1554. }
  1555. unsigned __int64 FileSprayer::getSizeReadAlready()
  1556. {
  1557. return totalLengthRead;
  1558. }
  1559. PartitionPoint & FileSprayer::createLiteral(size32_t len, const void * data, unsigned idx)
  1560. {
  1561. PartitionPoint & next = * new PartitionPoint;
  1562. next.inputOffset = 0;
  1563. next.inputLength = len;
  1564. next.outputLength = len;
  1565. next.fixedText.set(len, data);
  1566. if (partition.isItem(idx))
  1567. {
  1568. PartitionPoint & cur = partition.item(idx);
  1569. next.whichInput = cur.whichInput;
  1570. next.whichOutput = cur.whichOutput;
  1571. }
  1572. else
  1573. {
  1574. next.whichInput = (unsigned)-1;
  1575. next.whichOutput = (unsigned)-1;
  1576. }
  1577. return next;
  1578. }
  1579. void FileSprayer::addHeaderFooter(size32_t len, const void * data, unsigned idx, bool before)
  1580. {
  1581. PartitionPoint & next = createLiteral(len, data, idx);
  1582. unsigned insertPos = before ? idx : idx+1;
  1583. partition.add(next, insertPos);
  1584. }
  1585. //MORE: This should be moved to jlib....
  1586. //MORE: I should really be doing this on unicode characters and supporting \u \U
  1587. void replaceEscapeSequence(StringBuffer & out, const char * in, bool errorIfInvalid)
  1588. {
  1589. out.ensureCapacity(strlen(in)+1);
  1590. while (*in)
  1591. {
  1592. char c = *in++;
  1593. if (c == '\\')
  1594. {
  1595. char next = *in;
  1596. if (next)
  1597. {
  1598. in++;
  1599. switch (next)
  1600. {
  1601. case 'a': c = '\a'; break;
  1602. case 'b': c = '\b'; break;
  1603. case 'f': c = '\f'; break;
  1604. case 'n': c = '\n'; break;
  1605. case 'r': c = '\r'; break;
  1606. case 't': c = '\t'; break;
  1607. case 'v': c = '\v'; break;
  1608. case '\\':
  1609. case '\'':
  1610. case '?':
  1611. case '\"': break;
  1612. case '0': case '1': case '2': case '3': case '4': case '5': case '6': case '7':
  1613. {
  1614. c = next - '0';
  1615. if (*in >= '0' && *in <= '7')
  1616. {
  1617. c = c << 3 | (*in++-'0');
  1618. if (*in >= '0' && *in <= '7')
  1619. c = c << 3 | (*in++-'0');
  1620. }
  1621. break;
  1622. }
  1623. case 'x':
  1624. c = 0;
  1625. while (isxdigit(*in))
  1626. {
  1627. next = *in++;
  1628. c = c << 4;
  1629. if (next >= '0' && next <= '9') c |= (next - '0');
  1630. else if (next >= 'A' && next <= 'F') c |= (next - 'A' + 10);
  1631. else if (next >= 'a' && next <= 'f') c |= (next - 'a' + 10);
  1632. }
  1633. break;
  1634. default:
  1635. if (errorIfInvalid)
  1636. throw MakeStringException(1, "unrecognised character escape sequence '\\%c'", next);
  1637. in--; // keep it as is.
  1638. break;
  1639. }
  1640. }
  1641. }
  1642. out.append(c);
  1643. }
  1644. }
  1645. void FileSprayer::addHeaderFooter(const char * data, unsigned idx, bool before)
  1646. {
  1647. StringBuffer expanded;
  1648. //MORE: Should really expand as unicode, so can have unicode control characters.
  1649. decodeCppEscapeSequence(expanded, data, true);
  1650. MemoryBuffer translated;
  1651. convertUtf(translated, getUtfFormatType(tgtFormat.type), expanded.length(), expanded.str(), UtfReader::Utf8);
  1652. //MORE: Convert from utf-8 to target format.
  1653. addHeaderFooter(translated.length(), translated.toByteArray(), idx, before);
  1654. }
  1655. void FileSprayer::cloneHeaderFooter(unsigned idx, bool isHeader)
  1656. {
  1657. PartitionPoint & cur = partition.item(idx);
  1658. FilePartInfo & curSrc = sources.item(cur.whichInput);
  1659. PartitionPoint & next = * new PartitionPoint;
  1660. //NB: headerSize include the size of the xmlHeader; size includes neither header or footers.
  1661. if (isHeader)
  1662. next.inputOffset = curSrc.headerSize - curSrc.xmlHeaderLength;
  1663. else
  1664. next.inputOffset = curSrc.headerSize + curSrc.size;
  1665. next.inputLength = isHeader ? curSrc.xmlHeaderLength : curSrc.xmlFooterLength;
  1666. next.outputLength = needToCalcOutput() ? next.inputLength : 0;
  1667. next.whichInput = cur.whichInput;
  1668. next.whichOutput = cur.whichOutput;
  1669. if (isHeader)
  1670. partition.add(next, idx);
  1671. else
  1672. partition.add(next, idx+1);
  1673. }
  1674. void FileSprayer::insertHeaders()
  1675. {
  1676. const char * header = options->queryProp("@header");
  1677. const char * footer = options->queryProp("@footer");
  1678. const char * glue = options->queryProp("@glue");
  1679. const char * prefix = options->queryProp(ANprefix);
  1680. bool canKeepHeader = srcFormat.equals(tgtFormat) || !needToCalcOutput();
  1681. bool keepHeader = options->getPropBool("@keepHeader", canKeepHeader) && srcFormat.rowTag;
  1682. if (header || footer || prefix || glue)
  1683. keepHeader = false;
  1684. if (keepHeader && !canKeepHeader)
  1685. throwError(DFTERR_CannotKeepHeaderChangeFormat);
  1686. if (header || footer || keepHeader)
  1687. {
  1688. unsigned idx;
  1689. unsigned curOutput = (unsigned)-1;
  1690. bool footerPending = false;
  1691. for (idx = 0; idx < partition.ordinality(); idx++)
  1692. {
  1693. PartitionPoint & cur = partition.item(idx);
  1694. if (curOutput != cur.whichOutput)
  1695. {
  1696. if (keepHeader)
  1697. {
  1698. if (footerPending && (idx != 0))
  1699. {
  1700. footerPending = false;
  1701. cloneHeaderFooter(idx-1, false);
  1702. idx++;
  1703. }
  1704. //Don't add a header if there are no records in this part, and coming from more than one source file
  1705. //If coming from one then we'll be guaranteed to have a correct header in that part.
  1706. //If more than one, (and not replicating) then we will have failed to know where the header/footers are for this part.
  1707. if ((cur.inputLength == 0) && (sources.ordinality() > 1))
  1708. continue;
  1709. cloneHeaderFooter(idx, true);
  1710. footerPending = true;
  1711. idx++;
  1712. }
  1713. if (footer && (idx != 0))
  1714. {
  1715. addHeaderFooter(footer, idx-1, false);
  1716. idx++;
  1717. }
  1718. if (header)
  1719. {
  1720. addHeaderFooter(header, idx, true);
  1721. idx++;
  1722. }
  1723. curOutput = cur.whichOutput;
  1724. }
  1725. }
  1726. if (keepHeader && footerPending)
  1727. {
  1728. while (idx && partition.item(idx-1).inputLength == 0)
  1729. idx--;
  1730. if (idx)
  1731. {
  1732. cloneHeaderFooter(idx-1, false);
  1733. idx++;
  1734. }
  1735. }
  1736. if (footer)
  1737. {
  1738. addHeaderFooter(footer, idx-1, false);
  1739. idx++;
  1740. }
  1741. }
  1742. if (glue)
  1743. {
  1744. unsigned idx;
  1745. unsigned curInput = 0;
  1746. unsigned curOutput = 0;
  1747. for (idx = 0; idx < partition.ordinality(); idx++)
  1748. {
  1749. PartitionPoint & cur = partition.item(idx);
  1750. if ((curInput != cur.whichInput) && (curOutput == cur.whichOutput))
  1751. {
  1752. addHeaderFooter(glue, idx, true);
  1753. idx++;
  1754. }
  1755. curInput = cur.whichInput;
  1756. curOutput = cur.whichOutput;
  1757. }
  1758. }
  1759. if (prefix)
  1760. {
  1761. if (!srcFormat.equals(tgtFormat))
  1762. throwError(DFTERR_PrefixCannotTransform);
  1763. if (glue || header || footer)
  1764. throwError(DFTERR_PrefixCannotAlsoAddHeader);
  1765. MemoryBuffer filePrefix;
  1766. filePrefix.setEndian(__LITTLE_ENDIAN);
  1767. for (unsigned idx = 0; idx < partition.ordinality(); idx++)
  1768. {
  1769. PartitionPoint & cur = partition.item(idx);
  1770. filePrefix.clear();
  1771. const char * finger = prefix;
  1772. while (finger)
  1773. {
  1774. StringAttr command;
  1775. const char * comma = strchr(finger, ',');
  1776. if (comma)
  1777. {
  1778. command.set(finger, comma-finger);
  1779. finger = comma+1;
  1780. }
  1781. else
  1782. {
  1783. command.set(finger);
  1784. finger = NULL;
  1785. }
  1786. command.toUpperCase();
  1787. if (memcmp(command, "FILENAME", 8) == 0)
  1788. {
  1789. StringBuffer filename;
  1790. cur.inputName.split(NULL, NULL, &filename, &filename);
  1791. if (command[8] == ':')
  1792. {
  1793. unsigned maxLen = atoi(command+9);
  1794. filename.padTo(maxLen);
  1795. filePrefix.append(maxLen, filename.str());
  1796. }
  1797. else
  1798. {
  1799. filePrefix.append((unsigned)filename.length());
  1800. filePrefix.append(filename.length(), filename.str());
  1801. }
  1802. }
  1803. else if ((memcmp(command, "FILESIZE", 8) == 0) || (command.length() == 2))
  1804. {
  1805. const char * format = command;
  1806. if (memcmp(format, "FILESIZE", 8) == 0)
  1807. {
  1808. if (format[8] == ':')
  1809. format = format+9;
  1810. else
  1811. format = "L4";
  1812. }
  1813. bool bigEndian;
  1814. char c = format[0];
  1815. if (c == 'B')
  1816. bigEndian = true;
  1817. else if (c == 'L')
  1818. bigEndian = false;
  1819. else
  1820. throwError1(DFTERR_InvalidPrefixFormat, format);
  1821. c = format[1];
  1822. if ((c <= '0') || (c > '8'))
  1823. throwError1(DFTERR_InvalidPrefixFormat, format);
  1824. unsigned length = (c - '0');
  1825. unsigned __int64 value = cur.inputLength;
  1826. byte temp[8];
  1827. for (unsigned i=0; i<length; i++)
  1828. {
  1829. temp[i] = (byte)value;
  1830. value >>= 8;
  1831. }
  1832. if (value)
  1833. throwError(DFTERR_PrefixTooSmall);
  1834. if (bigEndian)
  1835. {
  1836. byte temp2[8];
  1837. _cpyrevn(&temp2, &temp, length);
  1838. filePrefix.append(length, &temp2);
  1839. }
  1840. else
  1841. filePrefix.append(length, &temp);
  1842. }
  1843. else
  1844. throwError1(DFTERR_InvalidPrefixFormat, command.get());
  1845. }
  1846. addHeaderFooter(filePrefix.length(), filePrefix.toByteArray(), idx, true);
  1847. idx++;
  1848. }
  1849. }
  1850. }
  1851. bool FileSprayer::needToCalcOutput()
  1852. {
  1853. return !usePullOperation() || options->getPropBool(ANverify);
  1854. }
  1855. unsigned FileSprayer::numParallelConnections(unsigned limit)
  1856. {
  1857. unsigned maxConnections = options->getPropInt(ANmaxConnections, limit);
  1858. if ((maxConnections == 0) || (maxConnections > limit)) maxConnections = limit;
  1859. return maxConnections;
  1860. }
  1861. unsigned FileSprayer::numParallelSlaves()
  1862. {
  1863. unsigned numPullers = transferSlaves.ordinality();
  1864. unsigned maxConnections = DEFAULT_MAX_CONNECTIONS;
  1865. unsigned connectOption = options->getPropInt(ANmaxConnections, 0);
  1866. if (connectOption)
  1867. maxConnections = connectOption;
  1868. else if (mirroring && (maxConnections * 3 < numPullers))
  1869. maxConnections = numPullers/3;
  1870. if (maxConnections > numPullers) maxConnections = numPullers;
  1871. return maxConnections;
  1872. }
  1873. void FileSprayer::performTransfer()
  1874. {
  1875. unsigned numSlaves = transferSlaves.ordinality();
  1876. unsigned maxConnections = numParallelSlaves();
  1877. unsigned failure = options->getPropInt("@fail", 0);
  1878. if (failure) maxConnections = 1;
  1879. calibrateProgress();
  1880. numSlavesCompleted = 0;
  1881. if (maxConnections > 1)
  1882. shuffle(transferSlaves);
  1883. if (progressReport)
  1884. progressReport->setRange(getSizeReadAlready(), sizeToBeRead, transferSlaves.ordinality());
  1885. LOG(MCdebugInfo, job, "Begin to transfer parts (%d threads)\n", maxConnections);
  1886. //Throttle maximum number of concurrent transfers by starting n threads, and
  1887. //then waiting for one to complete before going on to the next
  1888. lastProgressTick = msTick();
  1889. Semaphore sem;
  1890. unsigned goIndex;
  1891. for (goIndex=0; goIndex<maxConnections; goIndex++)
  1892. transferSlaves.item(goIndex).go(sem);
  1893. //MORE: Should abort early if we get an error on one of the transfers...
  1894. // to do that we will need a queue of completed pullers.
  1895. for (; !error && goIndex<numSlaves;goIndex++)
  1896. {
  1897. waitForTransferSem(sem);
  1898. numSlavesCompleted++;
  1899. transferSlaves.item(goIndex).go(sem);
  1900. }
  1901. for (unsigned waitCount=0; waitCount<maxConnections;waitCount++)
  1902. {
  1903. waitForTransferSem(sem);
  1904. numSlavesCompleted++;
  1905. }
  1906. if (error)
  1907. throw LINK(error);
  1908. bool ok = true;
  1909. ForEachItemIn(idx3, transferSlaves)
  1910. {
  1911. FileTransferThread & cur = transferSlaves.item(idx3);
  1912. if (!cur.ok)
  1913. ok = false;
  1914. }
  1915. if (!ok) {
  1916. if (isAborting())
  1917. throwError(DFTERR_CopyAborted);
  1918. else
  1919. throwError(DFTERR_CopyFailed);
  1920. }
  1921. }
  1922. void FileSprayer::pullParts()
  1923. {
  1924. bool needCalcCRC = calcCRC();
  1925. LOG(MCdebugInfoDetail, job, "Calculate CRC = %d", needCalcCRC);
  1926. ForEachItemIn(idx, targets)
  1927. {
  1928. FileTransferThread & next = * new FileTransferThread(*this, FTactionpull, targets.item(idx).filename.queryEndpoint(), needCalcCRC, wuid);
  1929. transferSlaves.append(next);
  1930. }
  1931. ForEachItemIn(idx3, partition)
  1932. {
  1933. PartitionPoint & cur = partition.item(idx3);
  1934. if (!filter || filter->includePart(cur.whichOutput))
  1935. transferSlaves.item(cur.whichSlave).addPartition(cur, progress.item(idx3));
  1936. }
  1937. performTransfer();
  1938. }
  1939. void FileSprayer::pushParts()
  1940. {
  1941. bool needCalcCRC = calcCRC();
  1942. ForEachItemIn(idx, sources)
  1943. {
  1944. FileTransferThread & next = * new FileTransferThread(*this, FTactionpush, sources.item(idx).filename.queryEndpoint(), needCalcCRC, wuid);
  1945. transferSlaves.append(next);
  1946. }
  1947. ForEachItemIn(idx3, partition)
  1948. {
  1949. PartitionPoint & cur = partition.item(idx3);
  1950. if (!filter || filter->includePart(cur.whichOutput))
  1951. transferSlaves.item(cur.whichSlave).addPartition(cur, progress.item(idx3));
  1952. }
  1953. performTransfer();
  1954. }
  1955. void FileSprayer::removeSource()
  1956. {
  1957. LOG(MCwarning, job, "Source file removal not yet implemented");
  1958. }
  1959. bool FileSprayer::restorePartition()
  1960. {
  1961. if (allowRecovery && progressTree->getPropBool(ANhasPartition))
  1962. {
  1963. IPropertyTreeIterator * iter = progressTree->getElements(PNpartition);
  1964. ForEach(*iter)
  1965. {
  1966. PartitionPoint & next = * new PartitionPoint;
  1967. next.restore(&iter->query());
  1968. partition.append(next);
  1969. }
  1970. iter->Release();
  1971. return (partition.ordinality() != 0);
  1972. }
  1973. return false;
  1974. }
  1975. void FileSprayer::savePartition()
  1976. {
  1977. if (allowRecovery)
  1978. {
  1979. ForEachItemIn(idx, partition)
  1980. {
  1981. IPropertyTree * child = createPTree(PNpartition, ipt_caseInsensitive);
  1982. partition.item(idx).save(child);
  1983. progressTree->addPropTree(PNpartition, child);
  1984. }
  1985. progressTree->setPropBool(ANhasPartition, true);
  1986. }
  1987. }
  1988. void FileSprayer::setCopyCompressedRaw()
  1989. {
  1990. assertex(compressedInput && compressOutput);
  1991. // encrypt/decrypt keys should be same
  1992. compressedInput = false;
  1993. compressOutput = false;
  1994. calcedInputCRC = true;
  1995. cachedInputCRC = false;
  1996. copyCompressed = true;
  1997. }
  1998. void FileSprayer::setError(const SocketEndpoint & ep, IException * e)
  1999. {
  2000. CriticalBlock lock(errorCS);
  2001. if (!error)
  2002. {
  2003. StringBuffer url;
  2004. ep.getUrlStr(url);
  2005. error.setown(MakeStringException(e->errorCode(), "%s", e->errorMessage(url.append(": ")).str()));
  2006. }
  2007. }
  2008. void FileSprayer::setPartFilter(IDFPartFilter * _filter)
  2009. {
  2010. filter.set(_filter);
  2011. }
  2012. void FileSprayer::setProgress(IDaftProgress * _progress)
  2013. {
  2014. progressReport = _progress;
  2015. }
  2016. void FileSprayer::setAbort(IAbortRequestCallback * _abort)
  2017. {
  2018. abortChecker = _abort;
  2019. }
  2020. void FileSprayer::setReplicate(bool _replicate)
  2021. {
  2022. replicate = _replicate;
  2023. }
  2024. void FileSprayer::setSource(IDistributedFile * source)
  2025. {
  2026. distributedSource.set(source);
  2027. srcAttr.setown(createPTreeFromIPT(&source->queryAttributes()));
  2028. extractSourceFormat(srcAttr);
  2029. unsigned numParts = source->numParts();
  2030. for (unsigned idx=0; idx < numParts; idx++)
  2031. {
  2032. Owned<IDistributedFilePart> curPart = source->getPart(idx);
  2033. RemoteFilename rfn;
  2034. FilePartInfo & next = * new FilePartInfo(curPart->getFilename(rfn));
  2035. next.extractExtra(*curPart);
  2036. if (curPart->numCopies()>1)
  2037. next.mirrorFilename.set(curPart->getFilename(rfn,1));
  2038. // don't set the following here - force to check disk
  2039. //next.size = curPart->getFileSize(true,false);
  2040. //next.psize = curPart->getDiskSize();
  2041. sources.append(next);
  2042. }
  2043. gatherFileSizes(false);
  2044. }
  2045. void FileSprayer::setSource(IFileDescriptor * source)
  2046. {
  2047. setSource(source, 0, 1);
  2048. //Now get the size of the files directly (to check they exist). If they don't exist then switch to the backup instead.
  2049. gatherFileSizes(false);
  2050. }
  2051. void FileSprayer::setSource(IFileDescriptor * source, unsigned copy, unsigned mirrorCopy)
  2052. {
  2053. IPropertyTree *attr = &source->queryProperties();
  2054. extractSourceFormat(attr);
  2055. srcAttr.setown(createPTreeFromIPT(&source->queryProperties()));
  2056. extractSourceFormat(srcAttr);
  2057. RemoteFilename filename;
  2058. unsigned numParts = source->numParts();
  2059. for (unsigned idx=0; idx < numParts; idx++)
  2060. {
  2061. if (source->isMulti(idx))
  2062. {
  2063. RemoteMultiFilename multi;
  2064. source->getMultiFilename(idx, copy, multi);
  2065. multi.expandWild();
  2066. ForEachItemIn(i, multi)
  2067. {
  2068. RemoteFilename &rfn = multi.item(i);
  2069. FilePartInfo & next = * new FilePartInfo(rfn);
  2070. Owned<IPartDescriptor> part = source->getPart(idx);
  2071. next.extractExtra(*part);
  2072. next.size = multi.getSize(i);
  2073. sources.append(next);
  2074. }
  2075. //MORE: Need to extract the backup filenames for mirror files.
  2076. }
  2077. else
  2078. {
  2079. source->getFilename(idx, copy, filename);
  2080. FilePartInfo & next = * new FilePartInfo(filename);
  2081. Owned<IPartDescriptor> part = source->getPart(idx);
  2082. next.extractExtra(*part);
  2083. if (mirrorCopy != (unsigned)-1)
  2084. source->getFilename(idx, mirrorCopy, next.mirrorFilename);
  2085. sources.append(next);
  2086. }
  2087. }
  2088. if (sources.ordinality() == 0)
  2089. LOG(MCuserWarning, unknownJob, "The wildcarded source did not match any filenames");
  2090. // throwError(DFTERR_NoFilesMatchWildcard);
  2091. }
  2092. void FileSprayer::setSource(IDistributedFilePart * part)
  2093. {
  2094. tgtFormat.set(FFTfixed, 1);
  2095. unsigned copy = 0;
  2096. RemoteFilename rfn;
  2097. sources.append(* new FilePartInfo(part->getFilename(rfn,copy)));
  2098. if (compressedInput)
  2099. {
  2100. calcedInputCRC = true;
  2101. cachedInputCRC = false;
  2102. }
  2103. }
  2104. void FileSprayer::setSourceTarget(IFileDescriptor * fd, DaftReplicateMode mode)
  2105. {
  2106. extractSourceFormat(&fd->queryProperties());
  2107. tgtFormat.set(srcFormat);
  2108. if (options->getPropBool(ANcrcDiffers, false))
  2109. throwError1(DFTERR_ReplicateOptionNoSupported, "crcDiffers");
  2110. if (options->getPropBool(ANsizedate, false))
  2111. throwError1(DFTERR_ReplicateOptionNoSupported, "sizedate");
  2112. switch (mode)
  2113. {
  2114. case DRMreplicatePrimary: // doesn't work for multi copies
  2115. setSource(fd, 0);
  2116. setTarget(fd, 1);
  2117. break;
  2118. case DRMreplicateSecondary: // doesn't work for multi copies
  2119. setSource(fd, 1);
  2120. setTarget(fd, 0);
  2121. break;
  2122. case DRMcreateMissing: // this does though (but I am not sure works with mult-files)
  2123. gatherMissingSourceTarget(fd);
  2124. break;
  2125. }
  2126. isSafeMode = false;
  2127. mirroring = true;
  2128. replicate = true;
  2129. //Optimize replicating compressed - copy it raw, but it means we can't check the input crc
  2130. assertex(compressOutput == compressedInput);
  2131. if (compressedInput)
  2132. setCopyCompressedRaw();
  2133. }
  2134. void FileSprayer::setTarget(IDistributedFile * target)
  2135. {
  2136. distributedTarget.set(target);
  2137. compressOutput = !encryptKey.isEmpty()||target->isCompressed();
  2138. if (tgtFormat.restore(&target->queryAttributes()))
  2139. unknownTargetFormat = false;
  2140. else
  2141. {
  2142. tgtFormat.set(srcFormat);
  2143. if (!unknownSourceFormat)
  2144. {
  2145. DistributedFilePropertyLock lock(target);
  2146. IPropertyTree &curProps = lock.queryAttributes();
  2147. tgtFormat.save(&curProps);
  2148. }
  2149. }
  2150. unsigned copy = 0;
  2151. unsigned numParts = target->numParts();
  2152. if (numParts == 0)
  2153. throwError(DFTERR_NoPartsInDestination);
  2154. for (unsigned idx=0; idx < numParts; idx++)
  2155. {
  2156. Owned<IDistributedFilePart> curPart(target->getPart(idx));
  2157. RemoteFilename rfn;
  2158. TargetLocation & next = * new TargetLocation(curPart->getFilename(rfn,copy));
  2159. targets.append(next);
  2160. }
  2161. }
  2162. void FileSprayer::setTarget(IFileDescriptor * target, unsigned copy)
  2163. {
  2164. if (tgtFormat.restore(&target->queryProperties()))
  2165. unknownTargetFormat = false;
  2166. else
  2167. tgtFormat.set(srcFormat);
  2168. compressOutput = !encryptKey.isEmpty()||target->isCompressed();
  2169. unsigned numParts = target->numParts();
  2170. if (numParts == 0)
  2171. throwError(DFTERR_NoPartsInDestination);
  2172. RemoteFilename filename;
  2173. for (unsigned idx=0; idx < numParts; idx++)
  2174. {
  2175. target->getFilename(idx, copy, filename);
  2176. targets.append(*new TargetLocation(filename));
  2177. }
  2178. }
  2179. void FileSprayer::updateProgress(const OutputProgress & newProgress)
  2180. {
  2181. CriticalBlock block(soFarCrit);
  2182. lastProgressTick = msTick();
  2183. OutputProgress & curProgress = progress.item(newProgress.whichPartition);
  2184. totalLengthRead += (newProgress.inputLength - curProgress.inputLength);
  2185. curProgress.set(newProgress);
  2186. if (curProgress.tree)
  2187. curProgress.save(curProgress.tree);
  2188. if (newProgress.status != OutputProgress::StatusRenamed)
  2189. updateSizeRead();
  2190. }
  2191. void FileSprayer::updateSizeRead()
  2192. {
  2193. if (progressDone)
  2194. return;
  2195. unsigned nowTick = msTick();
  2196. //MORE: This call shouldn't need to be done so often...
  2197. unsigned __int64 sizeReadSoFar = getSizeReadAlready();
  2198. bool done = sizeReadSoFar == sizeToBeRead;
  2199. if (progressReport)
  2200. {
  2201. // A cheat to get 100% saying all the slaves have completed - should really
  2202. // pass completed information in the progress info, or return the last progress
  2203. // info when a slave is done.
  2204. unsigned numCompleted = (sizeReadSoFar == sizeToBeRead) ? transferSlaves.ordinality() : numSlavesCompleted;
  2205. if (done || (nowTick - lastOperatorTick >= operatorUpdateFrequency))
  2206. {
  2207. progressReport->onProgress(sizeReadSoFar, sizeToBeRead, numCompleted);
  2208. lastOperatorTick = nowTick;
  2209. progressDone = done;
  2210. }
  2211. }
  2212. if (allowRecovery && recoveryConnection)
  2213. {
  2214. if (done || (nowTick - lastSDSTick >= sdsUpdateFrequency))
  2215. {
  2216. recoveryConnection->commit();
  2217. lastSDSTick = nowTick;
  2218. progressDone = done;
  2219. }
  2220. }
  2221. }
  2222. void FileSprayer::waitForTransferSem(Semaphore & sem)
  2223. {
  2224. while (!sem.wait(EXPECTED_RESPONSE_TIME))
  2225. {
  2226. unsigned timeSinceProgress = msTick() - lastProgressTick;
  2227. if (timeSinceProgress > EXPECTED_RESPONSE_TIME)
  2228. {
  2229. LOG(MCwarning, unknownJob, "No response from any slaves in last %d seconds.", timeSinceProgress/1000);
  2230. CriticalBlock block(soFarCrit);
  2231. StringBuffer list;
  2232. ForEachItemIn(i, transferSlaves)
  2233. transferSlaves.item(i).logIfRunning(list);
  2234. if (timeSinceProgress>RESPONSE_TIME_TIMEOUT)
  2235. {
  2236. //Set an error - the transfer threads will check it after a couple of minutes, and then terminate gracefully
  2237. CriticalBlock lock(errorCS);
  2238. if (!error)
  2239. error.setown(MakeStringException(RFSERR_TimeoutWaitSlave, RFSERR_TimeoutWaitSlave_Text, list.str()));
  2240. }
  2241. }
  2242. }
  2243. }
  2244. void FileSprayer::addTarget(unsigned idx, INode * node)
  2245. {
  2246. RemoteFilename filename;
  2247. filename.set(sources.item(idx).filename);
  2248. filename.setEp(node->endpoint());
  2249. targets.append(* new TargetLocation(filename));
  2250. }
  2251. bool FileSprayer::isAborting()
  2252. {
  2253. if (aborting || error)
  2254. return true;
  2255. unsigned nowTick = msTick();
  2256. if (abortChecker && (nowTick - lastAbortCheckTick >= abortCheckFrequency))
  2257. {
  2258. if (abortChecker->abortRequested())
  2259. {
  2260. LOG(MCdebugInfo, unknownJob, "Abort requested via callback");
  2261. aborting = true;
  2262. }
  2263. lastAbortCheckTick = nowTick;
  2264. }
  2265. return aborting || error;
  2266. }
  2267. const char * FileSprayer::querySplitPrefix()
  2268. {
  2269. const char * ret = options->queryProp(ANsplitPrefix);
  2270. if (ret && *ret)
  2271. return ret;
  2272. return NULL;
  2273. }
  2274. const char * FileSprayer::querySlaveExecutable(const IpAddress &ip, StringBuffer &ret)
  2275. {
  2276. const char * slave = queryFixedSlave();
  2277. try {
  2278. queryFtSlaveExecutable(ip, ret);
  2279. if (ret.length())
  2280. return ret.str();
  2281. }
  2282. catch (IException * e) {
  2283. if (!slave||!*slave)
  2284. throw;
  2285. e->Release();
  2286. }
  2287. if (slave)
  2288. ret.append(slave);
  2289. return ret.str();
  2290. }
  2291. const char * FileSprayer::queryFixedSlave()
  2292. {
  2293. return options->queryProp("@slave");
  2294. }
  2295. void FileSprayer::setTarget(IGroup * target)
  2296. {
  2297. tgtFormat.set(srcFormat);
  2298. if (sources.ordinality() != target->ordinality())
  2299. throwError(DFTERR_ReplicateNumPartsDiffer);
  2300. ForEachItemIn(idx, sources)
  2301. addTarget(idx, &target->queryNode(idx));
  2302. }
  2303. void FileSprayer::setTarget(INode * target)
  2304. {
  2305. tgtFormat.set(srcFormat);
  2306. if (sources.ordinality() != 1)
  2307. throwError(DFTERR_ReplicateNumPartsDiffer);
  2308. addTarget(0, target);
  2309. }
  2310. inline bool nonempty(IPropertyTree *pt, const char *p) { const char *s = pt->queryProp(p); return s&&*s; }
  2311. bool FileSprayer::disallowImplicitReplicate()
  2312. {
  2313. return options->getPropBool(ANsplit) ||
  2314. querySplitPrefix() ||
  2315. nonempty(options,"@header") ||
  2316. nonempty(options,"@footer") ||
  2317. nonempty(options,"@glue") ||
  2318. nonempty(options,ANprefix) ||
  2319. nonempty(options,ANencryptKey) ||
  2320. nonempty(options,ANdecryptKey);
  2321. }
  2322. void FileSprayer::spray()
  2323. {
  2324. if (!allowSplit() && querySplitPrefix())
  2325. throwError(DFTERR_SplitNoSplitClash);
  2326. LocalAbortHandler localHandler(daftAbortHandler);
  2327. if (allowRecovery && progressTree->getPropBool(ANcomplete))
  2328. {
  2329. LOG(MCdebugInfo, job, "Command completed successfully in previous invocation");
  2330. return;
  2331. }
  2332. checkFormats();
  2333. checkForOverlap();
  2334. progressTree->setPropBool(ANpull, usePullOperation());
  2335. const char * splitPrefix = querySplitPrefix();
  2336. bool pretendreplicate = false;
  2337. if (!replicate && (sources.ordinality() == targets.ordinality()))
  2338. {
  2339. if (srcFormat.equals(tgtFormat) && !disallowImplicitReplicate()) {
  2340. pretendreplicate = true;
  2341. replicate = true;
  2342. }
  2343. }
  2344. if (compressOutput&&!replicate) {
  2345. PROGLOG("Compress output forcing pull");
  2346. options->setPropBool(ANpull, true);
  2347. allowRecovery = false;
  2348. }
  2349. gatherFileSizes(true);
  2350. if (!replicate||pretendreplicate)
  2351. analyseFileHeaders(!pretendreplicate); // if pretending replicate don't want to remove headers
  2352. afterGatherFileSizes();
  2353. if (compressOutput && !usePullOperation() && !replicate)
  2354. throwError(DFTERR_CannotPushAndCompress);
  2355. if (restorePartition())
  2356. {
  2357. LOG(MCdebugProgress, job, "Partition restored from recovery information");
  2358. }
  2359. else
  2360. {
  2361. LOG(MCdebugProgress, job, "Calculate partition information");
  2362. if (replicate)
  2363. calculateOne2OnePartition();
  2364. else if (!allowSplit())
  2365. calculateNoSplitPartition();
  2366. else if (splitPrefix && *splitPrefix)
  2367. calculateSplitPrefixPartition(splitPrefix);
  2368. else if ((targets.ordinality() == 1) && srcFormat.equals(tgtFormat))
  2369. calculateMany2OnePartition();
  2370. else
  2371. calculateSprayPartition();
  2372. if (partition.ordinality() > PARTITION_RECOVERY_LIMIT)
  2373. allowRecovery = false;
  2374. savePartition();
  2375. }
  2376. assignPartitionFilenames(); // assign source filenames - used in insertHeaders..
  2377. if (!replicate)
  2378. insertHeaders();
  2379. addEmptyFilesToPartition();
  2380. derivePartitionExtra();
  2381. displayPartition();
  2382. if (isRecovering)
  2383. displayProgress(progress);
  2384. throwExceptionIfAborting();
  2385. beforeTransfer();
  2386. if (usePullOperation())
  2387. pullParts();
  2388. else
  2389. pushParts();
  2390. afterTransfer();
  2391. //If got here then we have succeeded
  2392. updateTargetProperties();
  2393. StringBuffer copyEventText; // [logical-source] > [logical-target]
  2394. if (distributedSource)
  2395. copyEventText.append(distributedSource->queryLogicalName());
  2396. copyEventText.append(">");
  2397. if (distributedTarget && distributedTarget->queryLogicalName())
  2398. copyEventText.append(distributedTarget->queryLogicalName());
  2399. //MORE: use new interface to send 'file copied' event
  2400. //LOG(MCevent, unknownJob, EVENT_FILECOPIED, copyEventText.str());
  2401. cleanupRecovery();
  2402. }
  2403. void FileSprayer::updateTargetProperties()
  2404. {
  2405. Owned<IException> error;
  2406. if (distributedTarget)
  2407. {
  2408. StringBuffer failedParts;
  2409. CRC32Merger partCRC;
  2410. offset_t partLength = 0;
  2411. CRC32Merger totalCRC;
  2412. offset_t totalLength = 0;
  2413. ForEachItemIn(idx, partition)
  2414. {
  2415. PartitionPoint & cur = partition.item(idx);
  2416. OutputProgress & curProgress = progress.item(idx);
  2417. partCRC.addChildCRC(curProgress.outputLength, curProgress.outputCRC, false);
  2418. totalCRC.addChildCRC(curProgress.outputLength, curProgress.outputCRC, false);
  2419. offset_t physPartLength = curProgress.outputLength;
  2420. if (copyCompressed) {
  2421. FilePartInfo & curSource = sources.item(cur.whichInput);
  2422. partLength = curSource.size;
  2423. totalLength += partLength;
  2424. }
  2425. else {
  2426. partLength += curProgress.outputLength; // AFAICS this might as well be =
  2427. totalLength += curProgress.outputLength;
  2428. }
  2429. if (idx+1 == partition.ordinality() || partition.item(idx+1).whichOutput != cur.whichOutput)
  2430. {
  2431. Owned<IDistributedFilePart> curPart = distributedTarget->getPart(cur.whichOutput);
  2432. // TODO: Create DistributedFilePropertyLock for parts
  2433. curPart->lockProperties();
  2434. IPropertyTree& curProps = curPart->queryAttributes();
  2435. if (calcCRC())
  2436. {
  2437. curProps.setPropInt(FAcrc, partCRC.get());
  2438. if (cur.whichInput != (unsigned)-1)
  2439. {
  2440. FilePartInfo & curSource = sources.item(cur.whichInput);
  2441. if (replicate && curSource.hasCRC)
  2442. {
  2443. if ((partCRC.get() != curSource.crc)&&(compressedInput==compressOutput)) // if expanding will be different!
  2444. {
  2445. if (failedParts.length())
  2446. failedParts.append(", ");
  2447. else
  2448. failedParts.append("Output CRC failed to match expected: ");
  2449. curSource.filename.getPath(failedParts);
  2450. failedParts.appendf("(%x,%x)",partCRC.get(),curSource.crc);
  2451. }
  2452. }
  2453. }
  2454. }
  2455. else if (compressOutput || copyCompressed)
  2456. curProps.setPropInt(FAcrc, (int)COMPRESSEDFILECRC);
  2457. if (copyCompressed) // don't know if just compress
  2458. curProps.setPropInt64(FAcompressedSize, physPartLength);
  2459. curProps.setPropInt64(FAsize, partLength);
  2460. TargetLocation & curTarget = targets.item(cur.whichOutput);
  2461. if (!curTarget.modifiedTime.isNull())
  2462. {
  2463. CDateTime temp;
  2464. StringBuffer timestr;
  2465. temp.set(curTarget.modifiedTime);
  2466. unsigned hour, min, sec, nanosec;
  2467. temp.getTime(hour, min, sec, nanosec);
  2468. temp.setTime(hour, min, sec, 0);
  2469. curProps.setProp("@modified", temp.getString(timestr).str());
  2470. }
  2471. if (replicate && (distributedSource != distributedTarget) )
  2472. {
  2473. assertex(cur.whichInput != (unsigned)-1);
  2474. FilePartInfo & curSource = sources.item(cur.whichInput);
  2475. if (curSource.properties)
  2476. {
  2477. Owned<IAttributeIterator> aiter = curSource.properties->getAttributes();
  2478. //At the moment only clone the topLevelKey indicator (stored in kind), but make it easy to add others.
  2479. ForEach(*aiter) {
  2480. const char *aname = aiter->queryName();
  2481. if (strieq(aname,"@kind")
  2482. ) {
  2483. if (!curProps.hasProp(aname))
  2484. curProps.setProp(aname,aiter->queryValue());
  2485. }
  2486. }
  2487. }
  2488. }
  2489. curPart->unlockProperties();
  2490. partCRC.clear();
  2491. partLength = 0;
  2492. }
  2493. }
  2494. if (failedParts.length())
  2495. error.setown(MakeStringException(DFTERR_InputOutputCrcMismatch, "%s", failedParts.str()));
  2496. DistributedFilePropertyLock lock(distributedTarget);
  2497. IPropertyTree &curProps = lock.queryAttributes();
  2498. if (calcCRC())
  2499. curProps.setPropInt(FAcrc, totalCRC.get());
  2500. curProps.setPropInt64(FAsize, totalLength);
  2501. unsigned rs = curProps.getPropInt(FArecordSize); // set by user
  2502. bool gotrc = false;
  2503. if (rs && (totalLength%rs == 0)) {
  2504. curProps.setPropInt64(FArecordCount,totalLength/(offset_t)rs);
  2505. gotrc = true;
  2506. }
  2507. if (srcAttr.get() && !mirroring) {
  2508. StringBuffer s;
  2509. // copy some attributes (do as iterator in case we want to change to *exclude* some
  2510. Owned<IAttributeIterator> aiter = srcAttr->getAttributes();
  2511. ForEach(*aiter) {
  2512. const char *aname = aiter->queryName();
  2513. if (!curProps.hasProp(aname)&&
  2514. ((stricmp(aname,"@job")==0)||
  2515. (stricmp(aname,"@workunit")==0)||
  2516. (stricmp(aname,"@description")==0)||
  2517. (stricmp(aname,"@eclCRC")==0)||
  2518. (stricmp(aname,"@formatCrc")==0)||
  2519. (stricmp(aname,"@owner")==0)||
  2520. ((stricmp(aname,FArecordCount)==0)&&!gotrc))
  2521. )
  2522. curProps.setProp(aname,aiter->queryValue());
  2523. }
  2524. // and simple (top level) elements
  2525. Owned<IPropertyTreeIterator> iter = srcAttr->getElements("*");
  2526. ForEach(*iter) {
  2527. curProps.addPropTree(iter->query().queryName(),createPTreeFromIPT(&iter->query()));
  2528. }
  2529. }
  2530. }
  2531. if (error)
  2532. throw error.getClear();
  2533. }
  2534. bool FileSprayer::usePullOperation()
  2535. {
  2536. if (!calcedPullPush)
  2537. {
  2538. calcedPullPush = true;
  2539. cachedUsePull = calcUsePull();
  2540. }
  2541. return cachedUsePull;
  2542. }
  2543. bool FileSprayer::canLocateSlaveForNode(const IpAddress &ip)
  2544. {
  2545. try
  2546. {
  2547. StringBuffer ret;
  2548. querySlaveExecutable(ip, ret);
  2549. return true;
  2550. }
  2551. catch (IException * e)
  2552. {
  2553. e->Release();
  2554. }
  2555. return false;
  2556. }
  2557. bool FileSprayer::calcUsePull()
  2558. {
  2559. if (allowRecovery && progressTree->hasProp(ANpull))
  2560. {
  2561. bool usePull = progressTree->getPropBool(ANpull);
  2562. LOG(MCdebugInfo, job, "Pull = %d from recovery", (int)usePull);
  2563. return usePull;
  2564. }
  2565. if (sources.ordinality() == 0)
  2566. return true;
  2567. if (options->getPropBool(ANpull, false))
  2568. {
  2569. LOG(MCdebugInfo, job, "Use pull since explicitly specified");
  2570. return true;
  2571. }
  2572. if (options->getPropBool(ANpush, false))
  2573. {
  2574. LOG(MCdebugInfo, job, "Use push since explicitly specified");
  2575. return false;
  2576. }
  2577. ForEachItemIn(idx2, sources)
  2578. {
  2579. if (!sources.item(idx2).canPush())
  2580. {
  2581. StringBuffer s;
  2582. sources.item(idx2).filename.queryIP().getIpText(s);
  2583. LOG(MCdebugInfo, job, "Use pull operation because %s cannot push", s.str());
  2584. return true;
  2585. }
  2586. }
  2587. if (!canLocateSlaveForNode(sources.item(0).filename.queryIP()))
  2588. {
  2589. StringBuffer s;
  2590. sources.item(0).filename.queryIP().getIpText(s);
  2591. LOG(MCdebugInfo, job, "Use pull operation because %s doesn't appear to have an ftslave", s.str());
  2592. return true;
  2593. }
  2594. ForEachItemIn(idx, targets)
  2595. {
  2596. if (!targets.item(idx).canPull())
  2597. {
  2598. StringBuffer s;
  2599. targets.item(idx).queryIP().getIpText(s);
  2600. LOG(MCdebugInfo, job, "Use push operation because %s cannot pull", s.str());
  2601. return false;
  2602. }
  2603. }
  2604. if (!canLocateSlaveForNode(targets.item(0).queryIP()))
  2605. {
  2606. StringBuffer s;
  2607. targets.item(0).queryIP().getIpText(s);
  2608. LOG(MCdebugInfo, job, "Use push operation because %s doesn't appear to have an ftslave", s.str());
  2609. return false;
  2610. }
  2611. //Use push if going to a single node.
  2612. if ((targets.ordinality() == 1) && (sources.ordinality() > 1))
  2613. {
  2614. LOG(MCdebugInfo, job, "Use push operation because going to a single node from many");
  2615. return false;
  2616. }
  2617. LOG(MCdebugInfo, job, "Use pull operation as default");
  2618. return true;
  2619. }
  2620. extern DALIFT_API IFileSprayer * createFileSprayer(IPropertyTree * _options, IPropertyTree * _progress, IRemoteConnection * recoveryConnection, const char *wuid)
  2621. {
  2622. return new FileSprayer(_options, _progress, recoveryConnection, wuid);
  2623. }
/*
Parameters:
1. A list of target locations (machine+drive?) (and possibly a number for each)
2. A list of source locations [derived from logical file]
3. Information on the source and target formats
4. A mask for the parts that need to be copied. [recovery is special case of this]
Need to
a) Start servers on machines that cannot be accessed directly [have to be running anyway]
b) Work out how the file is going to be partitioned
    1. Find out the sizes of all the files.
    2. Calculate partition points -
       For each source file, pass [thisoffset, totalsize, thissize, startPoint?]; it returns a list of
       numbered partition points.
       Two calls: calcPartition() and retrievePartition(), to allow for multithreading on variable length.
       A. If variable length
          Start servers on each of the source machines
          Query each server for partition information (which walks the file).
    * If N->N copy, don't need to calculate the partition; can do it on a 1:1 mapping.
      E.g. copy variable to blocked format with one block per variable.
c) Save partition information for quick/consistent recovery
d) Start servers on each of the targets or source for push to non-accessible
e) Start pulling/pushing
   Each saves a flag when complete for recovery
*/
  2648. //----------------------------------------------------------------------------
  2649. void testPartitions()
  2650. {
  2651. unsigned sizes[] = { 100, 100, 100, 100, 100, 100, 100, 100, 100, 100,
  2652. 100, 100, 100, 100, 100, 100, 100, 100, 100, 100,
  2653. 100, 100, 100, 100, 100, 100, 100, 100, 100, 100,
  2654. 100, 100, 100, 100, 100, 100, 100, 100, 100, 100,
  2655. 10,
  2656. };
  2657. unsigned parts = _elements_in(sizes);
  2658. unsigned offsets[_elements_in(sizes)];
  2659. unsigned targetParts = 20;
  2660. unsigned recordSize = 20;
  2661. unsigned totalSize =0;
  2662. unsigned idx;
  2663. for (idx = 0; idx < parts; idx++)
  2664. {
  2665. offsets[idx] = totalSize;
  2666. totalSize += sizes[idx];
  2667. }
  2668. PartitionPointArray results;
  2669. for (idx = 0; idx < parts; idx++)
  2670. {
  2671. CFixedPartitioner partitioner(recordSize);
  2672. partitioner.setPartitionRange(totalSize, offsets[idx], sizes[idx], 0, targetParts);
  2673. partitioner.calcPartitions(NULL);
  2674. partitioner.getResults(results);
  2675. }
  2676. ForEachItemIn(idx2, results)
  2677. results.item(idx2).display();
  2678. }
/*
MORE:
* What about multiple parts for a source file - what should we do with them?
  Ignore? Try if
* Pushing - how do we manage it?
  A. Copy all at once.
     1. For simple non-translating copies it is easy to copy all at once.
     2. For others, could hook up a translator so it only calculates the target size.
        Problem is it is a reasonably complex interaction with the partitioner.
        Easier to implement, but not as efficient, as a separate pass.
     - Optimize for variable to VBX.
  B. Copy a chunk at a time
     1. The first source for each chunk writes in parallel, followed by the next.
        - okay if not all writing to a single large file.
* Unreachable machines
  1. Can I use message passing?
  2. Mock up + test code [ need multi threading access ].
  3. Implement an exists primitive.
  4. How do I distinguish machines?
* Main program needs to survive if slave nodes die.
* Asynchronous calls + avoiding the thread switching for notifications?
* Code for replicating parts
  - set up as a copy from fixed1 to fixed1, with partition matching sources.
*/