- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #include "jliball.hpp"
- #include "platform.h"
- #include <algorithm>
- #include "jlib.hpp"
- #include "jio.hpp"
- #include <math.h>
- #include "jmutex.hpp"
- #include "jfile.hpp"
- #include "jsocket.hpp"
- #include "jdebug.hpp"
- #include "fterror.hpp"
- #include "dadfs.hpp"
- #include "rmtspawn.hpp"
- #include "filecopy.ipp"
- #include "jptree.hpp"
- #include "daft.hpp"
- #include "daftcfg.hpp"
- #include "fterror.hpp"
- #include "daftformat.hpp"
- #include "daftmc.hpp"
- #include "dasds.hpp"
- #include "jlog.hpp"
- #include "dalienv.hpp"
- #include "ftbase.ipp"
- #ifdef _CONTAINERIZED
- //Temporary - see HPCC-25822
- inline bool canAccessFilesDirectly(const RemoteFilename & file)
- {
- if (file.queryEndpoint().port!=0)
- return false;
- const IpAddress & ip = file.queryIP();
- if (ip.isLocal()||ip.isNull()) // the isNull check is probably an error but saves time
- return true; // I think usually already checked, but another check can't hurt
- return false;
- }
- inline void setCanAccessDirectly(RemoteFilename & file)
- {
- setCanAccessDirectly(file,canAccessFilesDirectly(file));
- }
- #endif
- #define DEFAULT_MAX_CONNECTIONS 800
- #define PARTITION_RECOVERY_LIMIT 1000
- #define EXPECTED_RESPONSE_TIME (60 * 1000)
- #define RESPONSE_TIME_TIMEOUT (60 * 60 * 1000)
- #define DEFAULT_MAX_XML_RECORD_SIZE 0x100000
- //#define CLEANUP_RECOVERY
- //Use hash defines for properties so I can't mis-spell them....
- #define ANcomplete "@complete"
- #define ANcompress "@compress"
- #define ANcrc "@crc"
- #define ANcrcCheck "@crcCheck"
- #define ANcrcDiffers "@crcDiffers"
- #define ANdone "@done"
- #define ANhasPartition "@hasPartition"
- #define ANhasProgress "@hasProgress"
- #define ANhasRecovery "@hasRecovery"
- #define ANmaxConnections "@maxConnections"
- #define ANnocommon "@noCommon"
- #define ANnoRecover "@noRecover"
- #define ANnosplit "@nosplit"
- #define ANnosplit2 "@noSplit"
- #define ANprefix "@prefix"
- #define ANpull "@pull"
- #define ANpush "@push"
- #define ANsafe "@safe"
- #define ANsizedate "@sizedate"
- #define ANsplit "@split"
- #define ANsplitPrefix "@splitPrefix"
- #define ANthrottle "@throttle"
- #define ANverify "@verify"
- #define ANtransferBufferSize "@transferBufferSize"
- #define ANencryptKey "@encryptKey"
- #define ANdecryptKey "@decryptKey"
- #define ANumask "@umask"
- #define PNpartition "partition"
- #define PNprogress "progress"
- //File attributes
- #define FArecordSize "@recordSize"
- #define FArecordCount "@recordCount"
- #define FAformat "@format"
- #define FAcrc "@fileCrc"
- #define FAsize "@size"
- #define FAcompressedSize "@compressedSize"
- const unsigned operatorUpdateFrequency = 5000; // time between updates in ms
- const unsigned abortCheckFrequency = 20000; // time between abort checks in ms
- const unsigned sdsUpdateFrequency = 20000; // time between updates in ms
- const unsigned maxSlaveUpdateFrequency = 1000; // time between updates in ms - small number of nodes.
- const unsigned minSlaveUpdateFrequency = 5000; // time between updates in ms - large number of nodes.
- bool TargetLocation::canPull()
- {
- return queryOS(filename.queryIP()) != MachineOsSolaris;
- }
- //----------------------------------------------------------------------------
- FilePartInfo::FilePartInfo(const RemoteFilename & _filename)
- {
- filename.set(_filename);
- init();
- }
- FilePartInfo::FilePartInfo()
- {
- init();
- }
- bool FilePartInfo::canPush()
- {
- return queryOS(filename.queryIP()) != MachineOsSolaris;
- }
- void FilePartInfo::extractExtra(IPartDescriptor &part)
- {
- unsigned _crc;
- hasCRC = part.getCrc(_crc);
- if (hasCRC)
- crc = _crc;
- properties.set(&part.queryProperties());
- if (part.queryProperties().hasProp("@modified"))
- modifiedTime.setString(part.queryProperties().queryProp("@modified"));
- }
- void FilePartInfo::extractExtra(IDistributedFilePart &part)
- {
- unsigned _crc;
- hasCRC = part.getCrc(_crc);
- if (hasCRC)
- crc = _crc;
- properties.set(&part.queryAttributes());
- if (part.queryAttributes().hasProp("@modified"))
- modifiedTime.setString(part.queryAttributes().queryProp("@modified"));
- }
- void FilePartInfo::init()
- {
- offset = 0;
- size = UNKNOWN_PART_SIZE;
- psize = UNKNOWN_PART_SIZE;
- headerSize = 0;
- hasCRC = false;
- xmlHeaderLength = 0;
- xmlFooterLength = 0;
- }
- //----------------------------------------------------------------------------
- void shuffle(CIArray & array)
- {
- //Use our own seeded random number generator, so that multiple DFU jobs running at the same time are less likely to clash.
- Owned<IRandomNumberGenerator> random = createRandomNumberGenerator();
- random->seed(123456789);
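- //Fisher-Yates shuffle: repeatedly pick a random element from the unshuffled prefix and swap it into the shrinking tail.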
- unsigned i = array.ordinality();
- while (i > 1)
- {
- unsigned j = random->next() % i;
- i--;
- array.swap(i, j);
- }
- }
- //----------------------------------------------------------------------------
- FileTransferThread::FileTransferThread(FileSprayer & _sprayer, byte _action, const SocketEndpoint & _ep, bool _calcCRC, const char *_wuid)
- : Thread("pullThread"), sprayer(_sprayer), wuid(_wuid)
- {
- calcCRC = _calcCRC;
- action = _action;
- ep.set(_ep);
- // progressInfo = _progressInfo;
- sem = NULL;
- ok = false;
- job = unknownJob;
- allDone = false;
- started = false;
- }
- void FileTransferThread::addPartition(PartitionPoint & nextPartition, OutputProgress & nextProgress)
- {
- partition.append(OLINK(nextPartition));
- progress.append(OLINK(nextProgress));
- }
- unsigned __int64 FileTransferThread::getInputSize()
- {
- unsigned __int64 inputSize = 0;
- ForEachItemIn(idx, partition)
- inputSize += partition.item(idx).inputLength;
- return inputSize;
- }
- void FileTransferThread::go(Semaphore & _sem)
- {
- sem = &_sem;
- if (partition.empty())
- transferAndSignal(); // do nothing, but don't start a new thread
- else
- {
- #ifdef RUN_SLAVES_ON_THREADS
- start();
- #else
- transferAndSignal();
- #endif
- }
- }
- bool FileTransferThread::isAborting()
- {
- return sprayer.isAborting() || ::isAborting();
- }
- void FileTransferThread::logIfRunning(StringBuffer &list)
- {
- if (started && !allDone && !error)
- {
- StringBuffer url;
- ep.getUrlStr(url);
- DBGLOG("Still waiting for slave %s", url.str());
- if (list.length())
- list.append(',');
- list.append(url);
- }
- }
- bool FileTransferThread::catchReadBuffer(ISocket * socket, MemoryBuffer & msg, unsigned timeout)
- {
- unsigned nowTime = msTick();
- unsigned abortCheckTimeout = 120*1000;
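- //Read using a short timeout so abort requests are noticed promptly; keep retrying until the caller's overall timeout has expired.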
- for (;;)
- {
- try
- {
- readBuffer(socket, msg, abortCheckTimeout);
- return true;
- }
- catch (IException * e)
- {
- switch (e->errorCode())
- {
- case JSOCKERR_graceful_close:
- break;
- case JSOCKERR_timeout_expired:
- if (isAborting())
- break;
- if (msTick() - nowTime < timeout)
- {
- e->Release();
- continue;
- }
- break;
- default:
- EXCLOG(e,"FileTransferThread::catchReadBuffer");
- break;
- }
- e->Release();
- return false;
- }
- }
- }
- bool FileTransferThread::performTransfer()
- {
- bool ok = false;
- StringBuffer url;
- ep.getUrlStr(url);
- LOG(MCdebugProgress, job, "Transferring part %s [%p]", url.str(), this);
- started = true;
- allDone = true;
- if (sprayer.isSafeMode || action == FTactionpush)
- {
- ForEachItemIn(i, progress)
- {
- if (progress.item(i).status != OutputProgress::StatusCopied)
- allDone = false;
- }
- }
- else
- {
- unsigned whichOutput = (unsigned)-1;
- ForEachItemIn(i, progress)
- {
- PartitionPoint & curPartition = partition.item(i);
- OutputProgress & curProgress = progress.item(i);
- //pull should rename as well as copy the files.
- if (curPartition.whichOutput != whichOutput)
- {
- if (curProgress.status != OutputProgress::StatusRenamed)
- allDone = false;
- whichOutput = curPartition.whichOutput;
- }
- }
- }
- if (allDone)
- {
- LOG(MCdebugInfo, job, "Creation of part %s already completed", url.str());
- return true;
- }
- if (partition.empty())
- {
- LOG(MCdebugInfo, job, "No elements to transfer for this slave");
- return true;
- }
- LOG(MCdebugProgressDetail, job, "Start generate part %s [%p]", url.str(), this);
- StringBuffer tmp;
- Owned<ISocket> socket = spawnRemoteChild(SPAWNdfu, sprayer.querySlaveExecutable(ep, tmp), ep, DAFT_VERSION, queryFtSlaveLogDir(), this, wuid);
- if (socket)
- {
- MemoryBuffer msg;
- msg.setEndian(__BIG_ENDIAN);
- //MORE: Allow this to be configured by an option.
- unsigned slaveUpdateFrequency = minSlaveUpdateFrequency;
- if (sprayer.numParallelSlaves() < 5)
- slaveUpdateFrequency = maxSlaveUpdateFrequency;
- //Send message and wait for response...
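- //NB: the order of the fields written below must stay in step with the deserialization in ftslave; newer fields are appended at the end (see note below) so the format stays compatible.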
- msg.append(action);
- // send 0 for password info that was in <= 7.6 versions
- unsigned zero = 0;
- msg.append(zero);
- ep.serialize(msg);
- sprayer.srcFormat.serialize(msg);
- sprayer.tgtFormat.serialize(msg);
- msg.append(sprayer.calcInputCRC());
- msg.append(calcCRC);
- serialize(partition, msg);
- msg.append(sprayer.numParallelSlaves());
- msg.append(slaveUpdateFrequency);
- msg.append(sprayer.replicate); // NB: controls whether FtSlave copies source timestamp
- msg.append(sprayer.mirroring);
- msg.append(sprayer.isSafeMode);
- msg.append(progress.ordinality());
- ForEachItemIn(i, progress)
- progress.item(i).serializeCore(msg);
- msg.append(sprayer.throttleNicSpeed);
- msg.append(sprayer.compressedInput);
- msg.append(sprayer.compressOutput);
- msg.append(sprayer.copyCompressed);
- msg.append(sprayer.transferBufferSize);
- msg.append(sprayer.encryptKey);
- msg.append(sprayer.decryptKey);
- sprayer.srcFormat.serializeExtra(msg, 1);
- sprayer.tgtFormat.serializeExtra(msg, 1);
- ForEachItemIn(i2, progress)
- progress.item(i2).serializeExtra(msg, 1);
- //NB: Any extra data must be appended at the end...
- msg.append(sprayer.fileUmask);
- if (!catchWriteBuffer(socket, msg))
- throwError1(RFSERR_TimeoutWaitConnect, url.str());
- bool done;
- for (;;)
- {
- msg.clear();
- if (!catchReadBuffer(socket, msg, FTTIME_PROGRESS))
- throwError1(RFSERR_TimeoutWaitSlave, url.str());
- msg.setEndian(__BIG_ENDIAN);
- msg.read(done);
- if (done)
- break;
- OutputProgress newProgress;
- newProgress.deserializeCore(msg);
- newProgress.deserializeExtra(msg, 1);
- sprayer.updateProgress(newProgress);
- LOG(MCdebugProgress(10000), job, "Update %s: %d %" I64F "d->%" I64F "d", url.str(), newProgress.whichPartition, newProgress.inputLength, newProgress.outputLength);
- if (isAborting())
- {
- if (!sendRemoteAbort(socket))
- throwError1(RFSERR_TimeoutWaitSlave, url.str());
- }
- }
- msg.read(ok);
- setErrorOwn(deserializeException(msg));
- LOG(MCdebugProgressDetail, job, "Finished generating part %s [%p] ok(%d) error(%d)", url.str(), this, (int)ok, (int)(error!=NULL));
- msg.clear().append(true);
- catchWriteBuffer(socket, msg);
- if (sprayer.options->getPropInt("@fail", 0))
- throwError(DFTERR_CopyFailed);
- }
- else
- {
- throwError1(DFTERR_FailedStartSlave, url.str());
- }
- LOG(MCdebugProgressDetail, job, "Stopped generate part %s [%p]", url.str(), this);
- allDone = true;
- return ok;
- }
- void FileTransferThread::setErrorOwn(IException * e)
- {
- error.setown(e);
- if (error)
- sprayer.setError(ep, error);
- }
- bool FileTransferThread::transferAndSignal()
- {
- ok = false;
- if (!isAborting())
- {
- try
- {
- ok = performTransfer();
- }
- catch (IException * e)
- {
- FLLOG(MCexception(e), job, e, "Transferring files");
- setErrorOwn(e);
- }
- }
- sem->signal();
- return ok;
- }
- int FileTransferThread::run()
- {
- transferAndSignal();
- return 0;
- }
- //----------------------------------------------------------------------------
- FileSizeThread::FileSizeThread(FilePartInfoArray & _queue, CriticalSection & _cs, bool _isCompressed, bool _errorIfMissing) : Thread("fileSizeThread"), queue(_queue), cs(_cs)
- {
- isCompressed = _isCompressed;
- errorIfMissing = _errorIfMissing;
- }
- bool FileSizeThread::wait(unsigned timems)
- {
- while (!sem.wait(timems))
- { // report every time
- StringBuffer rfn;
- {
- CriticalBlock lock(cs);
- if (cur.get())
- {
- if (copy)
- {
- if (!cur->mirrorFilename.isNull())
- cur->mirrorFilename.getRemotePath(rfn);
- }
- else
- {
- cur->filename.getRemotePath(rfn);
- }
- }
- }
- if (!rfn.isEmpty())
- {
- OWARNLOG("Waiting for file: %s",rfn.str());
- return false;
- }
- }
- sem.signal(); // if called again
- return true;
- }
- int FileSizeThread::run()
- {
- try
- {
- RemoteFilename remoteFilename;
- for (;;)
- {
- {
- CriticalBlock lock(cs);
- cur.clear();
- if (queue.ordinality())
- cur.setown(&queue.popGet());
- }
- if (!cur.get())
- break;
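- //Try the primary filename first, then the mirror copy; if only the mirror could be read, the two filenames are swapped below so the readable copy becomes the primary.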
- copy=0;
- for (copy = 0;copy<2;copy++)
- {
- if (copy)
- {
- if (cur->mirrorFilename.isNull())
- continue; // not break
- remoteFilename.set(cur->mirrorFilename);
- }
- else
- remoteFilename.set(cur->filename);
- OwnedIFile thisFile = createIFile(remoteFilename);
- offset_t thisSize = thisFile->size();
- if (thisSize == -1)
- {
- if (errorIfMissing)
- {
- StringBuffer s;
- throwError1(DFTERR_CouldNotOpenFile, remoteFilename.getRemotePath(s).str());
- }
- continue;
- }
- cur->psize = thisSize;
- if (isCompressed)
- {
- Owned<IFileIO> io = createCompressedFileReader(thisFile); //check succeeded?
- if (!io)
- {
- StringBuffer s;
- throwError1(DFTERR_CouldNotOpenCompressedFile, remoteFilename.getRemotePath(s).str());
- }
- thisSize = io->size();
- }
- cur->size = thisSize;
- break;
- }
- if (copy==1)
- { // need to set primary
- CriticalBlock lock(cs);
- cur->mirrorFilename.set(cur->filename);
- cur->filename.set(remoteFilename);
- }
- }
- }
- catch (IException * e)
- {
- error.setown(e);
- }
- sem.signal();
- return 0;
- }
- //----------------------------------------------------------------------------
- FileSprayer::FileSprayer(IPropertyTree * _options, IPropertyTree * _progress, IRemoteConnection * _recoveryConnection, const char *_wuid)
- : wuid(_wuid), fileSprayerAbortChecker(*this)
- {
- totalSize = 0;
- replicate = false;
- copySource = false;
- unknownSourceFormat = true;
- unknownTargetFormat = true;
- progressTree.set(_progress);
- recoveryConnection = _recoveryConnection;
- options.set(_options);
- if (!options)
- options.setown(createPTree());
- if (!progressTree)
- progressTree.setown(createPTree("progress", ipt_caseInsensitive));
- //split prefix messes up recovery because the target filenames aren't saved in the recovery info.
- allowRecovery = !options->getPropBool(ANnoRecover) && !querySplitPrefix();
- isRecovering = allowRecovery && progressTree->hasProp(ANhasProgress);
- isSafeMode = options->getPropBool(ANsafe);
- job = unknownJob;
- progressReport = NULL;
- abortChecker = NULL;
- sizeToBeRead = 0;
- calcedPullPush = false;
- mirroring = false;
- lastAbortCheckTick = lastSDSTick = lastOperatorTick = msTick();
- calcedInputCRC = false;
- aborting = false;
- totalLengthRead = 0;
- totalNumReads = 0;
- totalNumWrites = 0;
- throttleNicSpeed = 0;
- compressedInput = false;
- compressOutput = options->getPropBool(ANcompress);
- copyCompressed = false;
- transferBufferSize = options->getPropInt(ANtransferBufferSize);
- if (transferBufferSize)
- LOG(MCdebugProgressDetail, job, "Using transfer buffer size %d", transferBufferSize);
- else // zero is default
- transferBufferSize = DEFAULT_STD_BUFFER_SIZE;
- progressDone = false;
- encryptKey.set(options->queryProp(ANencryptKey));
- decryptKey.set(options->queryProp(ANdecryptKey));
- fileUmask = -1;
- const char *umaskStr = options->queryProp(ANumask);
- if (umaskStr)
- {
- char *eptr = NULL;
- errno = 0;
- fileUmask = (int)strtol(umaskStr, &eptr, 8);
- if (errno || *eptr != '\0')
- {
- LOG(MCdebugInfo, job, "Invalid umask value <%s> ignored", umaskStr);
- fileUmask = -1;
- }
- else
- {
- // never strip off owner
- fileUmask &= 077;
- }
- }
- }
- class AsyncAfterTransfer : public CAsyncFor
- {
- public:
- AsyncAfterTransfer(FileSprayer & _sprayer) : sprayer(_sprayer) {}
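- //Post-transfer tidy-up for one target: in safe mode remove the existing file, rename the DFU temporary file to its final name, and propagate or record the modification time.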
- virtual void Do(unsigned idxTarget)
- {
- TargetLocation & cur = sprayer.targets.item(idxTarget);
- if (!sprayer.filter || sprayer.filter->includePart(idxTarget))
- {
- RemoteFilename & targetFilename = cur.filename;
- if (sprayer.isSafeMode)
- {
- OwnedIFile file = createIFile(targetFilename);
- file->remove();
- }
- renameDfuTempToFinal(targetFilename);
- if (sprayer.replicate && !sprayer.mirroring)
- {
- OwnedIFile file = createIFile(targetFilename);
- file->setTime(NULL, &cur.modifiedTime, NULL);
- }
- else if (cur.modifiedTime.isNull())
- {
- OwnedIFile file = createIFile(targetFilename);
- file->getTime(NULL, &cur.modifiedTime, NULL);
- }
- }
- }
- protected:
- FileSprayer & sprayer;
- };
- void FileSprayer::addEmptyFilesToPartition(unsigned from, unsigned to)
- {
- for (unsigned i = from; i < to ; i++)
- {
- LOG(MCdebugProgressDetail, job, "Insert a dummy entry for target %d", i);
- PartitionPoint & next = createLiteral(0, NULL, 0);
- next.whichOutput = i;
- partition.append(next);
- }
- }
- void FileSprayer::addEmptyFilesToPartition()
- {
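- //Scan the partition for output part numbers that never occur and insert zero-length literal entries for them, so every target still has at least one partition entry.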
- unsigned lastOutput = (unsigned)-1;
- ForEachItemIn(idx, partition)
- {
- PartitionPoint & cur = partition.item(idx);
- if (cur.whichOutput != lastOutput)
- {
- if (cur.whichOutput != lastOutput+1)
- addEmptyFilesToPartition(lastOutput+1, cur.whichOutput);
- lastOutput = cur.whichOutput;
- }
- }
- if (lastOutput != targets.ordinality()-1)
- addEmptyFilesToPartition(lastOutput+1, targets.ordinality());
- }
- void FileSprayer::afterTransfer()
- {
- if (calcInputCRC())
- {
- LOG(MCdebugProgressDetail, job, "Checking input CRCs");
- CRC32Merger partCRC;
- unsigned startCurSource = 0;
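- //Merge the CRCs of all partitions read from each source part; at the boundary to the next input, compare the merged value against the CRC recorded for the source part.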
- ForEachItemIn(idx, partition)
- {
- PartitionPoint & curPartition = partition.item(idx);
- OutputProgress & curProgress = progress.item(idx);
- if (!curProgress.hasInputCRC)
- {
- LOG(MCdebugProgressDetail, job, "Could not calculate input CRCs - cannot check");
- break;
- }
- partCRC.addChildCRC(curProgress.inputLength, curProgress.inputCRC, false);
- StringBuffer errorText;
- bool failed = false;
- UnsignedArray failedOutputs;
- if (idx+1 == partition.ordinality() || partition.item(idx+1).whichInput != curPartition.whichInput)
- {
- FilePartInfo & curSource = sources.item(curPartition.whichInput);
- if (curSource.crc != partCRC.get())
- {
- StringBuffer name;
- if (!failed)
- errorText.append("Input CRCs do not match for part ");
- else
- errorText.append(", ");
- curSource.filename.getPath(errorText);
- failed = true;
- //Need to copy anything that involves this part of the file again.
- //If pulling, that means the whole file; if pushing, we can cope with single parts
- //in the middle of the partition.
- for (unsigned i = startCurSource; i <= idx; i++)
- {
- OutputProgress & cur = progress.item(i);
- cur.reset();
- if (cur.tree)
- cur.save(cur.tree);
- unsigned out = partition.item(i).whichOutput;
- if (failedOutputs.find(out) == NotFound)
- failedOutputs.append(out);
- }
- }
- partCRC.clear();
- startCurSource = idx+1;
- //If copying m to n, and not splitting, there may be some dummy text entries (containing nothing) on the end.
- //if so skip them, otherwise you'll get crc errors on part 1
- if (partition.isItem(startCurSource) && (partition.item(startCurSource).whichInput == 0))
- idx = partition.ordinality()-1;
- }
- if (failed)
- {
- if (usePullOperation())
- {
- //Need to clear progress for any partitions that copy to the same target file
- //However, need to do it after the crc checking, otherwise it will generate more errors...
- ForEachItemIn(idx, partition)
- {
- if (failedOutputs.find(partition.item(idx).whichOutput) != NotFound)
- {
- OutputProgress & cur = progress.item(idx);
- cur.reset();
- if (cur.tree)
- cur.save(cur.tree);
- }
- }
- }
- if (recoveryConnection)
- recoveryConnection->commit();
- throw MakeStringException(DFTERR_InputCrcMismatch, "%s", errorText.str());
- }
- }
- }
- //For safe mode and push mode the temporary files need to be renamed once everything has completed.
- if (isSafeMode || usePushOperation())
- {
- unsigned numTargets = targets.ordinality();
- AsyncAfterTransfer async(*this);
- async.For(numTargets, (unsigned)sqrt((float)numTargets));
- }
- else
- {
- ForEachItemIn(idx, progress)
- {
- OutputProgress & curProgress = progress.item(idx);
- if (!curProgress.resultTime.isNull())
- targets.item(partition.item(idx).whichOutput).modifiedTime.set(curProgress.resultTime);
- }
- }
- }
- bool FileSprayer::allowSplit()
- {
- return !(options->getPropBool(ANnosplit) || options->getPropBool(ANnosplit2) || options->queryProp(ANprefix));
- }
- void FileSprayer::assignPartitionFilenames()
- {
- ForEachItemIn(idx, partition)
- {
- PartitionPoint & cur = partition.item(idx);
- if (cur.whichInput != (unsigned)-1)
- {
- cur.inputName.set(sources.item(cur.whichInput).filename);
- setCanAccessDirectly(cur.inputName);
- }
- cur.outputName.set(targets.item(cur.whichOutput).filename);
- setCanAccessDirectly(cur.outputName);
- // NB: partition (cur) is serialized to ftslave, and it is this modifiedTime that is used if present
- if (replicate)
- cur.modifiedTime.set(targets.item(cur.whichOutput).modifiedTime);
- }
- }
- class CheckExists : public CAsyncFor
- {
- public:
- CheckExists(TargetLocationArray & _targets, IDFPartFilter * _filter) : targets(_targets) { filter = _filter; }
- virtual void Do(unsigned idx)
- {
- if (!filter || filter->includePart(idx))
- {
- const RemoteFilename & cur = targets.item(idx).filename;
- OwnedIFile file = createIFile(cur);
- if (file->exists())
- {
- StringBuffer s;
- throwError1(DFTERR_PhysicalExistsNoOverwrite, cur.getRemotePath(s).str());
- }
- }
- }
- public:
- TargetLocationArray & targets;
- IDFPartFilter * filter;
- };
- void FileSprayer::beforeTransfer()
- {
- if (!isRecovering && !options->getPropBool("@overwrite", true))
- {
- CheckExists checker(targets, filter);
- checker.For(targets.ordinality(), 25, true, true);
- }
- if (!isRecovering && writeFromMultipleSlaves())
- {
- try {
- //Should this be controlled by an option? It shouldn't be too inefficient since push is seldom used.
- ForEachItemIn(idx2, targets)
- {
- if (!filter || filter->includePart(idx2))
- {
- //MORE: This does not cope with creating directories on a solaris machine.
- StringBuffer remoteFilename, remoteDirectory;
- targets.item(idx2).filename.getRemotePath(remoteFilename);
- splitUNCFilename(remoteFilename.str(), &remoteDirectory, &remoteDirectory, NULL, NULL);
- Owned<IFile> dir = createIFile(remoteDirectory.str());
- if (!dir->exists())
- {
- dir->createDirectory();
- if (fileUmask != -1)
- dir->setFilePermissions(~fileUmask&0777);
- }
- }
- }
- }
- catch (IException *e) {
- FLLOG(MCexception(e), job, e, "Creating Directory");
- e->Release();
- LOG(MCdebugInfo, job, "Ignoring create directory error");
- }
- // If pushing files, and not recovering, we need to delete the target files because the slaves might write in any order.
- // For pull, the slave deletes the target when creating the file.
- unsigned curPartition = 0;
- ForEachItemIn(idxTarget, targets)
- {
- if (!filter || filter->includePart(idxTarget))
- {
- if (!isSafeMode)
- {
- OwnedIFile file = createIFile(targets.item(idxTarget).filename);
- file->remove();
- }
- //unsigned firstPartition = curPartition;
- while (partition.isItem(curPartition+1) && partition.item(curPartition+1).whichOutput == idxTarget)
- curPartition++;
- //MORE: If 1:N mapping then don't extend to the maximum length - it is a waste of time, and messes up
- //And should generate the file header on the push machine - would always be more efficient.
- //Possibly conditional on whether it is worth pre-extending on the target os.
- //if (curPartition == firstPartition)
- // continue;
- PartitionPoint & lastPartition = partition.item(curPartition);
- offset_t lastOutputOffset = lastPartition.outputOffset + lastPartition.outputLength;
- RemoteFilename remote;
- getDfuTempName(remote, targets.item(idxTarget).filename);
- OwnedIFile file = createIFile(remote);
- OwnedIFileIO io = file->open(IFOcreate);
- if (!io)
- {
- StringBuffer name;
- remote.getPath(name);
- throwError1(DFTERR_CouldNotCreateOutput, name.str());
- }
- if (fileUmask != -1)
- file->setFilePermissions(~fileUmask&0666);
- //Create the headers on the utf files.
- unsigned headerSize = getHeaderSize(tgtFormat.type);
- if (headerSize)
- io->write(0, headerSize, getHeaderText(tgtFormat.type));
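- //Pre-extend the target file to its final length by writing a single zero byte just before the end, so the slaves can later write their chunks at arbitrary offsets.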
- if ((lastOutputOffset != 0)&&!compressOutput)
- {
- char null = 0;
- io->write(lastOutputOffset-sizeof(null), sizeof(null), &null);
- }
- }
- }
- }
- throttleNicSpeed = options->getPropInt(ANthrottle, 0);
- #ifndef _CONTAINERIZED
- //MORE: This is very old windows support code. We could add support for per-plane throttling if it is required.
- if (throttleNicSpeed == 0 && !usePullOperation() && targets.ordinality() == 1 && sources.ordinality() > 1)
- {
- Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
- Owned<IConstEnvironment> env = factory->openEnvironment();
- StringBuffer ipText;
- targets.item(0).filename.queryIP().getIpText(ipText);
- Owned<IConstMachineInfo> machine = env->getMachineByAddress(ipText.str());
- if (machine)
- {
- if (machine->getOS() == MachineOsW2K)
- {
- throttleNicSpeed = machine->getNicSpeedMbitSec();
- LOG(MCdebugInfo, job, "Throttle target speed to %dMbit/sec", throttleNicSpeed);
- }
- }
- }
- #endif
- }
- bool FileSprayer::calcCRC()
- {
- return options->getPropBool(ANcrc, true) && !compressOutput && !copyCompressed;
- }
- bool FileSprayer::calcInputCRC()
- {
- if (!calcedInputCRC)
- {
- calcedInputCRC = true;
- cachedInputCRC = false;
- if (options->getPropBool(ANcrcCheck, true) && !compressedInput)
- {
- ForEachItemIn(idx, sources)
- {
- if (!sources.item(idx).hasCRC)
- return cachedInputCRC;
- }
- cachedInputCRC = true;
- //If keeping headers then we lose bits of the input files, so they can't be crc checked.
- bool canKeepHeader = srcFormat.equals(tgtFormat) || !needToCalcOutput();
- if (options->getPropBool("@keepHeader", canKeepHeader) && srcFormat.rowTag && sources.ordinality() > 1)
- cachedInputCRC = false;
- if (querySplitPrefix())
- cachedInputCRC = false;
- }
- }
- return cachedInputCRC;
- }
- void FileSprayer::calculateOne2OnePartition()
- {
- LOG(MCdebugProgressDetail, job, "Setting up one2One partition");
- if (sources.ordinality() != targets.ordinality())
- throwError(DFTERR_ReplicateNumPartsDiffer);
- if (!srcFormat.equals(tgtFormat))
- throwError(DFTERR_ReplicateSameFormat);
- if (compressedInput && compressOutput && (strcmp(encryptKey.str(),decryptKey.str())==0))
- setCopyCompressedRaw();
- ForEachItemIn(idx, sources)
- {
- FilePartInfo & cur = sources.item(idx);
- RemoteFilename curFilename;
- curFilename.set(cur.filename);
- setCanAccessDirectly(curFilename);
- partition.append(*new PartitionPoint(idx, idx, cur.headerSize, copyCompressed?cur.psize:cur.size, copyCompressed?cur.psize:cur.size)); // outputoffset == 0
- targets.item(idx).modifiedTime.set(cur.modifiedTime);
- }
- if (srcFormat.isCsv())
- examineCsvStructure();
- }
- class AsyncExtractBlobInfo : public CAsyncFor
- {
- friend class FileSprayer;
- public:
- AsyncExtractBlobInfo(const char * _splitPrefix, FileSprayer & _sprayer) : sprayer(_sprayer)
- {
- extracted = new ExtractedBlobArray[sprayer.sources.ordinality()];
- splitPrefix = _splitPrefix;
- }
- ~AsyncExtractBlobInfo()
- {
- delete [] extracted;
- }
- virtual void Do(unsigned i)
- {
- if (!sprayer.sources.item(i).filename.isLocal()) {
- try {
- remoteExtractBlobElements(splitPrefix, sprayer.sources.item(i).filename, extracted[i]);
- return;
- }
- catch (IException *e) {
- StringBuffer path;
- StringBuffer err;
- OWARNLOG("dafilesrv ExtractBlobElements(%s) failed with: %s",
- sprayer.sources.item(i).filename.getPath(path).str(),
- e->errorMessage(err).str());
- PROGLOG("Trying direct access (this may be slow)");
- e->Release();
- }
- }
- // try local
- extractBlobElements(splitPrefix, sprayer.sources.item(i).filename, extracted[i]);
- }
- protected:
- FileSprayer & sprayer;
- const char * splitPrefix;
- ExtractedBlobArray * extracted;
- };
- void FileSprayer::calculateSplitPrefixPartition(const char * splitPrefix)
- {
- if (targets.ordinality() != 1)
- throwError(DFTERR_SplitPrefixSingleTarget);
- if (!srcFormat.equals(tgtFormat))
- throwError(DFTERR_SplitPrefixSameFormat);
- LOG(MCdebugProgressDetail, job, "Setting up split prefix partition");
- Owned<TargetLocation> target = &targets.popGet(); // remove target, add lots of new ones
- RemoteFilename blobTarget;
- StringBuffer remoteTargetPath, remoteFilename;
- target->filename.getRemotePath(remoteTargetPath);
- char sepChar = target->filename.getPathSeparator();
- //Remove the tail name from the filename
- const char * temp = remoteTargetPath.str();
- remoteTargetPath.setLength(strrchr(temp, sepChar)-temp);
- AsyncExtractBlobInfo extractor(splitPrefix, *this);
- unsigned numSources = sources.ordinality();
- extractor.For(numSources, numParallelConnections(numSources), true, false);
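- //Each extracted blob becomes its own target file, written into the original target's directory and named after the blob.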
- ForEachItemIn(idx, sources)
- {
- FilePartInfo & cur = sources.item(idx);
- ExtractedBlobArray & extracted = extractor.extracted[idx];
- ForEachItemIn(i, extracted)
- {
- ExtractedBlobInfo & curBlob = extracted.item(i);
- remoteFilename.clear().append(remoteTargetPath);
- addPathSepChar(remoteFilename, sepChar);
- remoteFilename.append(curBlob.filename);
- blobTarget.clear();
- blobTarget.setRemotePath(remoteFilename);
- targets.append(* new TargetLocation(blobTarget));
- partition.append(*new PartitionPoint(idx, targets.ordinality()-1, curBlob.offset, curBlob.length, curBlob.length));
- }
- }
- }
- void FileSprayer::calculateMany2OnePartition()
- {
- LOG(MCdebugProgressDetail, job, "Setting up many2one partition");
- const char *partSeparator = srcFormat.getPartSeparatorString();
- offset_t partSeparatorLength = ( partSeparator == nullptr ? 0 : strlen(partSeparator));
- offset_t lastContentLength = 0;
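- //If the source format defines a part separator, emit a literal separator partition between consecutive non-empty source parts.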
- ForEachItemIn(idx, sources)
- {
- FilePartInfo & cur = sources.item(idx);
- RemoteFilename curFilename;
- curFilename.set(cur.filename);
- setCanAccessDirectly(curFilename);
- if (cur.size)
- {
- if (partSeparator)
- {
- if (lastContentLength)
- {
- PartitionPoint &part = createLiteral(1, partSeparator, (unsigned) -1);
- part.whichOutput = 0;
- partition.append(part);
- }
- lastContentLength = cur.size;
- }
- partition.append(*new PartitionPoint(idx, 0, cur.headerSize, cur.size, cur.size));
- }
- }
- if (srcFormat.isCsv())
- examineCsvStructure();
- }
- void FileSprayer::calculateNoSplitPartition()
- {
- LOG(MCdebugProgressDetail, job, "Setting up no split partition");
- if (!usePullOperation() && !srcFormat.equals(tgtFormat))
- throwError(DFTERR_NoSplitPushChangeFormat);
- #if 1
- //split by number
- unsigned numSources = sources.ordinality();
- unsigned numTargets = targets.ordinality();
- if (numSources < numTargets)
- numTargets = numSources;
- unsigned tally = 0;
- unsigned curTarget = 0;
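- //Assign whole source parts to targets as evenly as possible without splitting any part: the tally advances by numTargets per source, and the target index moves on each time it wraps past numSources (Bresenham-style).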
- ForEachItemIn(idx, sources)
- {
- FilePartInfo & cur = sources.item(idx);
- partition.append(*new PartitionPoint(idx, curTarget, cur.headerSize, copyCompressed?cur.psize:cur.size, copyCompressed?cur.psize:cur.size)); // outputoffset == 0
- tally += numTargets;
- if (tally >= numSources)
- {
- tally -= numSources;
- curTarget++;
- }
- }
- #else
- //split by size
- offset_t totalSize = 0;
- ForEachItemIn(i, sources)
- totalSize += sources.item(i).size;
- unsigned numTargets = targets.ordinality();
- offset_t chunkSize = (totalSize / numTargets);
- offset_t nextBoundary = chunkSize;
- offset_t sizeSoFar = 0;
- unsigned curTarget = 0;
- ForEachItemIn(idx, sources)
- {
- FilePartInfo & cur = sources.item(idx);
- offset_t nextSize = sizeSoFar + cur.size;
- if ((sizeSoFar >= nextBoundary) ||
- ((nextSize > nextBoundary) &&
- (nextBoundary - sizeSoFar < nextSize - nextBoundary)))
- {
- if (curTarget != numTargets-1)
- {
- curTarget++;
- nextBoundary += chunkSize;
- }
- }
- RemoteFilename curFilename;
- curFilename.set(cur.filename);
- setCanAccessDirectly(curFilename);
- partition.append(*new PartitionPoint(idx, curTarget, cur.headerSize, cur.size, cur.size)); // outputoffset == 0
- sizeSoFar = nextSize;
- }
- #endif
- if (srcFormat.isCsv())
- examineCsvStructure();
- }
- void FileSprayer::calculateSprayPartition()
- {
- LOG(MCdebugProgressDetail, job, "Calculating N:M partition");
- bool calcOutput = needToCalcOutput();
- FormatPartitionerArray partitioners;
- unsigned numParts = targets.ordinality();
- StringBuffer remoteFilename;
- ForEachItemIn(idx, sources)
- {
- IFormatPartitioner * partitioner = createPartitioner(idx, calcOutput, numParts);
- partitioner->setAbort(&fileSprayerAbortChecker);
- partitioners.append(*partitioner);
- }
- unsigned numProcessors = partitioners.ordinality();
- unsigned maxConnections = numParallelConnections(numProcessors);
- //Throttle maximum number of concurrent transfers by starting n threads, and
- //then waiting for one to complete before going on to the next
- Semaphore sem;
- unsigned goIndex;
- for (goIndex=0; goIndex < maxConnections; goIndex++)
- partitioners.item(goIndex).calcPartitions(&sem);
- for (; goIndex<numProcessors; goIndex++)
- {
- sem.wait();
- partitioners.item(goIndex).calcPartitions(&sem);
- }
- for (unsigned waitCount=0; waitCount < maxConnections;waitCount++)
- sem.wait();
- ForEachItemIn(idx2, partitioners)
- partitioners.item(idx2).getResults(partition);
- if ((partitioners.ordinality() > 0) && !srcAttr->hasProp("ECL"))
- {
- // Store discovered CSV record structure into target logical file.
- storeCsvRecordStructure(partitioners.item(0));
- }
- if (compressedInput && compressOutput && streq(encryptKey.str(),decryptKey.str()))
- copyCompressed = true;
- }
- void FileSprayer::storeCsvRecordStructure(IFormatPartitioner &partitioner)
- {
- StringBuffer recStru;
- partitioner.getRecordStructure(recStru);
- if ((recStru.length() > 0) && strstr(recStru.str(),"END;"))
- {
- if (distributedTarget)
- distributedTarget->setECL(recStru.str());
- }
- }
- IFormatPartitioner * FileSprayer::createPartitioner(aindex_t index, bool calcOutput, unsigned numParts)
- {
- StringBuffer remoteFilename;
- FilePartInfo & cur = sources.item(index);
- cur.filename.getRemotePath(remoteFilename.clear());
- LOG(MCdebugInfoDetail, job, "Partition %d(%s)", index, remoteFilename.str());
- srcFormat.quotedTerminator = options->getPropBool("@quotedTerminator", true);
- const SocketEndpoint & ep = cur.filename.queryEndpoint();
- IFormatPartitioner * partitioner = createFormatPartitioner(ep, srcFormat, tgtFormat, calcOutput, queryFixedSlave(), wuid);
- // CSV record structure discovery of the first source
- bool isRecordStructurePresent = options->getPropBool("@recordStructurePresent", false);
- partitioner->setRecordStructurePresent(isRecordStructurePresent);
- RemoteFilename name;
- name.set(cur.filename);
- setCanAccessDirectly(name);
- partitioner->setPartitionRange(totalSize, cur.offset, cur.size, cur.headerSize, numParts);
- partitioner->setSource(index, name, compressedInput, decryptKey);
- return partitioner;
- }
- void FileSprayer::examineCsvStructure()
- {
- if (srcAttr && srcAttr->hasProp("ECL"))
- // Already has an ECL record structure, keep it.
- return;
- bool calcOutput = needToCalcOutput();
- if (sources.ordinality())
- {
- Owned<IFormatPartitioner> partitioner = createPartitioner(0, calcOutput, targets.ordinality());
- storeCsvRecordStructure(*partitioner);
- }
- else
- LOG(MCdebugInfoDetail, job, "No source CSV file to examine.");
- }
- void FileSprayer::calculateOutputOffsets()
- {
- unsigned headerSize = getHeaderSize(tgtFormat.type);
- offset_t outputOffset = headerSize;
- unsigned curOutput = 0;
- ForEachItemIn(idx, partition)
- {
- PartitionPoint & cur = partition.item(idx);
- if (curOutput != cur.whichOutput)
- {
- outputOffset = headerSize;
- curOutput = cur.whichOutput;
- }
- cur.outputOffset = outputOffset;
- outputOffset += cur.outputLength;
- }
- }
- void FileSprayer::checkFormats()
- {
- if (unknownSourceFormat)
- {
- //If target format is specified, use that - not really very good, but...
- srcFormat.set(tgtFormat);
- //If the format is omitted and the numbers of parts are the same, then it is okay for the format to be omitted
- if (sources.ordinality() == targets.ordinality() && !disallowImplicitReplicate())
- copySource = true;
- bool noSplit = !allowSplit();
- if (!replicate && !copySource && !noSplit)
- {
- //copy to a single target => assume same format concatenated.
- if (targets.ordinality() != 1)
- {
- if (!unknownTargetFormat)
- throwError(DFTERR_TargetFormatUnknownSource);
- else
- throwError(DFTERR_FormatNotSpecified);
- }
- }
- }
- FileFormatType srcType = srcFormat.type;
- FileFormatType tgtType = tgtFormat.type;
- if (srcType != tgtType)
- {
- switch (srcType)
- {
- case FFTfixed:
- if ((tgtType != FFTvariable)&&(tgtType != FFTvariablebigendian))
- throwError(DFTERR_BadSrcTgtCombination);
- break;
- case FFTvariable:
- if ((tgtType != FFTfixed) && (tgtType != FFTblocked)&& (tgtType != FFTvariablebigendian))
- throwError(DFTERR_BadSrcTgtCombination);
- break;
- case FFTvariablebigendian:
- if ((tgtType != FFTfixed) && (tgtType != FFTblocked) && (tgtType != FFTvariable))
- throwError(DFTERR_BadSrcTgtCombination);
- break;
- case FFTblocked:
- if ((tgtType != FFTvariable)&&(tgtType != FFTvariablebigendian))
- throwError(DFTERR_BadSrcTgtCombination);
- break;
- case FFTcsv:
- throwError(DFTERR_BadSrcTgtCombination);
- case FFTutf: case FFTutf8: case FFTutf8n: case FFTutf16: case FFTutf16be: case FFTutf16le: case FFTutf32: case FFTutf32be: case FFTutf32le:
- switch (tgtFormat.type)
- {
- case FFTutf: case FFTutf8: case FFTutf8n: case FFTutf16: case FFTutf16be: case FFTutf16le: case FFTutf32: case FFTutf32be: case FFTutf32le:
- break;
- default:
- throwError(DFTERR_OnlyConvertUtfUtf);
- break;
- }
- break;
- }
- }
- switch (srcType)
- {
- case FFTutf: case FFTutf8: case FFTutf8n: case FFTutf16: case FFTutf16be: case FFTutf16le: case FFTutf32: case FFTutf32be: case FFTutf32le:
- if (srcFormat.rowTag)
- {
- srcFormat.maxRecordSize = srcFormat.maxRecordSize > DEFAULT_MAX_XML_RECORD_SIZE ? srcFormat.maxRecordSize : DEFAULT_MAX_XML_RECORD_SIZE;
- }
- break;
- default:
- break;
- }
- }
- void FileSprayer::calibrateProgress()
- {
- sizeToBeRead = 0;
- ForEachItemIn(idx, transferSlaves)
- sizeToBeRead += transferSlaves.item(idx).getInputSize();
- totalLengthRead = calcSizeReadAlready();
- }
- void FileSprayer::checkForOverlap()
- {
- unsigned num = std::min(sources.ordinality(), targets.ordinality());
- for (unsigned idx = 0; idx < num; idx++)
- {
- RemoteFilename & srcName = sources.item(idx).filename;
- RemoteFilename & tgtName = targets.item(idx).filename;
- if (srcName.equals(tgtName))
- {
- StringBuffer x;
- srcName.getPath(x);
- throwError1(DFTERR_CopyFileOntoSelf, x.str());
- }
- }
- }
- void FileSprayer::cleanupRecovery()
- {
- progressTree->setPropBool(ANcomplete, true);
- #ifdef CLEANUP_RECOVERY
- progressTree->removeProp(ANhasPartition);
- progressTree->removeProp(ANhasProgress);
- progressTree->removeProp(ANhasRecovery);
- progressTree->removeProp(PNpartition);
- progressTree->removeProp(PNprogress);
- progressTree->removeProp(ANpull);
- #endif
- }
- bool FileSprayer::usePushWholeOperation() const
- {
- return targets.item(0).filename.isUrl();
- }
- bool FileSprayer::canRenameOutput() const
- {
- return targets.item(0).filename.queryFileSystemProperties().canRename;
- }
- void FileSprayer::checkSprayOptions()
- {
- if (isSafeMode && !canRenameOutput())
- {
- isSafeMode = false;
- UWARNLOG("Safe mode is disabled because the target cannot be renamed");
- }
- }
- //Several files being pulled to the same machine - only run ftslave once...
- void FileSprayer::commonUpSlaves()
- {
- unsigned max = partition.ordinality();
- bool pull = usePullOperation();
- bool pushWhole = usePushWholeOperation();
- bool slaveMatchesOutput = pull || pushWhole; // One slave per target if a url
- for (unsigned idx = 0; idx < max; idx++)
- {
- PartitionPoint & cur = partition.item(idx);
- cur.whichSlave = slaveMatchesOutput ? cur.whichOutput : cur.whichInput;
- if (cur.whichSlave == -1)
- cur.whichSlave = 0;
- }
- if (options->getPropBool(ANnocommon, true) || pushWhole)
- return;
- //First work out which are the same slaves, and then map the partition.
- //Previously it was n^2 in partition, which is fine until you spray 100K files.
- unsigned numSlaves = pull ? targets.ordinality() : sources.ordinality();
- unsigned * slaveMapping = new unsigned [numSlaves];
- for (unsigned i = 0; i < numSlaves; i++)
- slaveMapping[i] = i;
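- //Map each slave to the lowest-numbered slave that shares its IP; partitions belonging to duplicates are re-pointed at that slave below, so only one ftslave is spawned per machine.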
- if (pull)
- {
- for (unsigned i1 = 1; i1 < numSlaves; i1++)
- {
- TargetLocation & cur = targets.item(i1);
- for (unsigned i2 = 0; i2 < i1; i2++)
- {
- if (targets.item(i2).filename.queryIP().ipequals(cur.filename.queryIP()))
- {
- slaveMapping[i1] = i2;
- break;
- }
- }
- }
- }
- else
- {
- for (unsigned i1 = 1; i1 < numSlaves; i1++)
- {
- FilePartInfo & cur = sources.item(i1);
- for (unsigned i2 = 0; i2 < i1; i2++)
- {
- if (sources.item(i2).filename.queryIP().ipequals(cur.filename.queryIP()))
- {
- slaveMapping[i1] = i2;
- break;
- }
- }
- }
- }
- for (unsigned i3 = 0; i3 < max; i3++)
- {
- PartitionPoint & cur = partition.item(i3);
- cur.whichSlave = slaveMapping[cur.whichSlave];
- }
- delete [] slaveMapping;
- }
- void FileSprayer::analyseFileHeaders(bool setcurheadersize)
- {
- FileFormatType defaultFormat = FFTunknown;
- switch (srcFormat.type)
- {
- case FFTutf:
- case FFTutf8:
- defaultFormat = FFTutf8n;
- break;
- case FFTutf16:
- defaultFormat = FFTutf16be;
- break;
- case FFTutf32:
- defaultFormat = FFTutf32be;
- break;
- default:
- if (!srcFormat.rowTag)
- return;
- break;
- }
- FileFormatType actualType = FFTunknown;
- unsigned numEmptyXml = 0;
- ForEachItemIn(idx, sources)
- {
- FilePartInfo & cur = sources.item(idx);
- StringBuffer s;
- cur.filename.getPath(s);
- LOG(MCdebugInfo, job, "Examine header of file %s", s.str());
- Owned<IFile> file = createIFile(cur.filename);
- Owned<IFileIO> io = file->open(IFOread);
- if (!io)
- {
- StringBuffer s;
- cur.filename.getRemotePath(s);
- throwError1(DFTERR_CouldNotOpenFilePart, s.str());
- }
- if (compressedInput) {
- Owned<IExpander> expander;
- if (!decryptKey.isEmpty()) {
- StringBuffer key;
- decrypt(key,decryptKey);
- expander.setown(createAESExpander256(key.length(),key.str()));
- }
- io.setown(createCompressedFileReader(io,expander));
- }
- if (defaultFormat != FFTunknown)
- {
- FileFormatType thisType;
- unsigned char header[4];
- memset(header, 255, sizeof(header)); // fill so don't get clashes if file is very small!
- unsigned numRead = io->read(0, 4, header);
- unsigned headerSize = 0;
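- //Sniff the Unicode byte-order mark to determine the concrete UTF variant; any BOM found is treated as a header and excluded from the data.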
- if ((memcmp(header, "\xEF\xBB\xBF", 3) == 0) && (srcFormat.type == FFTutf || srcFormat.type == FFTutf8))
- {
- thisType = FFTutf8n;
- headerSize = 3;
- }
- else if ((memcmp(header, "\xFF\xFE\x00\x00", 4) == 0) && (srcFormat.type == FFTutf || srcFormat.type == FFTutf32))
- {
- thisType = FFTutf32le;
- headerSize = 4;
- }
- else if ((memcmp(header, "\x00\x00\xFE\xFF", 4) == 0) && (srcFormat.type == FFTutf || srcFormat.type == FFTutf32))
- {
- thisType = FFTutf32be;
- headerSize = 4;
- }
- else if ((memcmp(header, "\xFF\xFE", 2) == 0) && (srcFormat.type == FFTutf || srcFormat.type == FFTutf16))
- {
- thisType = FFTutf16le;
- headerSize = 2;
- }
- else if ((memcmp(header, "\xFE\xFF", 2) == 0) && (srcFormat.type == FFTutf || srcFormat.type == FFTutf16))
- {
- thisType = FFTutf16be;
- headerSize = 2;
- }
- else
- {
- thisType = defaultFormat;
- headerSize = 0;
- }
- if (actualType == FFTunknown)
- actualType = thisType;
- else if (actualType != thisType)
- throwError(DFTERR_PartsDoNotHaveSameUtfFormat);
- if (setcurheadersize) {
- cur.headerSize = headerSize;
- cur.size -= headerSize;
- }
- }
- if (srcFormat.rowTag&&setcurheadersize)
- {
- try
- {
- if (distributedSource)
- {
- // Despray from distributed file
- // Check XML header/footer at file level
- DistributedFilePropertyLock lock(distributedSource);
- IPropertyTree &curProps = lock.queryAttributes();
- if (curProps.hasProp(FPheaderLength) && curProps.hasProp(FPfooterLength))
- {
- cur.xmlHeaderLength = curProps.getPropInt(FPheaderLength, 0);
- cur.xmlFooterLength = curProps.getPropInt(FPfooterLength, 0);
- }
- else
- {
- // Try it at file part level
- Owned<IDistributedFilePart> curPart = distributedSource->getPart(idx);
- IPropertyTree& curPartProps = curPart->queryAttributes();
- cur.xmlHeaderLength = curPartProps.getPropInt(FPheaderLength, 0);
- cur.xmlFooterLength = curPartProps.getPropInt(FPfooterLength, 0);
- }
- }
- else
- {
- // Spray from file
- if (srcFormat.headerLength == (unsigned)-1 || srcFormat.footerLength == (unsigned)-1)
- locateContentHeader(io, cur.headerSize, cur.xmlHeaderLength, cur.xmlFooterLength);
- else
- {
- cur.xmlHeaderLength = srcFormat.headerLength;
- cur.xmlFooterLength = srcFormat.footerLength;
- }
- }
- cur.headerSize += (unsigned)cur.xmlHeaderLength;
- if (cur.size >= cur.xmlHeaderLength + cur.xmlFooterLength)
- {
- cur.size -= (cur.xmlHeaderLength + cur.xmlFooterLength);
- if (cur.size <= srcFormat.rowTag.length()) // implies there's a header and footer but no rows (whitespace only)
- cur.size = 0;
- }
- else
- throwError3(DFTERR_InvalidXmlPartSize, cur.size, cur.xmlHeaderLength, cur.xmlFooterLength);
- }
- catch (IException * e)
- {
- if (e->errorCode() != DFTERR_CannotFindFirstXmlRecord)
- throw;
- e->Release();
- if (!replicate)
- {
- cur.headerSize = 0;
- cur.size = 0;
- }
- numEmptyXml++;
- }
- }
- }
- if (numEmptyXml == sources.ordinality())
- {
- if (numEmptyXml == 1)
- throwError(DFTERR_CannotFindFirstXmlRecord);
- // else
- // throwError(DFTERR_CannotFindAnyXmlRecord);
- }
- if (defaultFormat != FFTunknown)
- srcFormat.type = actualType;
- if (unknownTargetFormat)
- {
- tgtFormat.set(srcFormat);
- if (distributedTarget)
- {
- DistributedFilePropertyLock lock(distributedTarget);
- IPropertyTree &curProps = lock.queryAttributes();
- tgtFormat.save(&curProps);
- }
- }
- }
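- //Worked example for the BOM handling above (hypothetical part): a UTF part whose first bytes are
- //EF BB BF is typed FFTutf8n with headerSize=3, and those 3 bytes are deducted from cur.size;
- //FF FE gives FFTutf16le (2 bytes) and FF FE 00 00 gives FFTutf32le (4 bytes). Parts whose detected
- //types differ cause DFTERR_PartsDoNotHaveSameUtfFormat to be thrown.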
- void FileSprayer::locateXmlHeader(IFileIO * io, unsigned headerSize, offset_t & xmlHeaderLength, offset_t & xmlFooterLength)
- {
- Owned<IFileIOStream> in = createIOStream(io);
- XmlSplitter splitter(srcFormat);
- BufferedDirectReader reader;
- reader.set(in);
- reader.seek(headerSize);
- if (xmlHeaderLength == 0)
- {
- try
- {
- xmlHeaderLength = splitter.getHeaderLength(reader);
- }
- catch (IException * e)
- {
- if (e->errorCode() != DFTERR_CannotFindFirstXmlRecord)
- throw;
- e->Release();
- xmlHeaderLength = 0;
- }
- }
- offset_t size = io->size();
- offset_t endOffset = (size > srcFormat.maxRecordSize*2 + headerSize) ? size - srcFormat.maxRecordSize*2 : headerSize;
- reader.seek(endOffset);
- if (xmlFooterLength == 0)
- {
- try
- {
- xmlFooterLength = splitter.getFooterLength(reader, size);
- }
- catch (IException * e)
- {
- if (e->errorCode() != DFTERR_CannotFindLastXmlRecord)
- throw;
- e->Release();
- xmlFooterLength = 0;
- }
- }
- }
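- //Worked example of the footer search window above (hypothetical sizes): for a 10,000,000 byte
- //part with maxRecordSize=8192 and a 3 byte BOM, the footer scan starts at offset
- //10,000,000 - 2*8192 = 9,983,616; a part smaller than 2*maxRecordSize + headerSize is scanned
- //from just after the BOM instead.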
- void FileSprayer::locateJsonHeader(IFileIO * io, unsigned headerSize, offset_t & headerLength, offset_t & footerLength)
- {
- Owned<IFileIOStream> in = createIOStream(io);
- JsonSplitter jsplitter(srcFormat, *in);
- headerLength = jsplitter.getHeaderLength();
- footerLength = jsplitter.getFooterLength();
- }
- void FileSprayer::locateContentHeader(IFileIO * io, unsigned headerSize, offset_t & headerLength, offset_t & footerLength)
- {
- if (srcFormat.markup == FMTjson)
- locateJsonHeader(io, headerSize, headerLength, footerLength);
- else
- locateXmlHeader(io, headerSize, headerLength, footerLength);
- }
- void FileSprayer::derivePartitionExtra()
- {
- calculateOutputOffsets();
- assignPartitionFilenames();
- commonUpSlaves();
- if (isRecovering)
- {
- Owned<IPropertyTreeIterator> iter = progressTree->getElements(PNprogress);
- ForEach(*iter)
- {
- OutputProgress & next = * new OutputProgress;
- next.restore(&iter->query());
- next.tree.set(&iter->query());
- progress.append(next);
- }
- assertex(progress.ordinality() == partition.ordinality());
- }
- else
- {
- if (allowRecovery)
- progressTree->setPropBool(ANhasProgress, true);
- ForEachItemIn(idx, partition)
- {
- OutputProgress & next = * new OutputProgress;
- next.whichPartition=idx;
- if (allowRecovery)
- {
- IPropertyTree * progressInfo = progressTree->addPropTree(PNprogress, createPTree(PNprogress, ipt_caseInsensitive));
- next.tree.set(progressInfo);
- next.save(progressInfo);
- }
- progress.append(next);
- }
- }
- }
- void FileSprayer::displayPartition()
- {
- ForEachItemIn(idx, partition)
- {
- partition.item(idx).display();
- #ifdef _DEBUG
- if ((partition.item(idx).whichInput >= 0) && (partition.item(idx).whichInput < sources.ordinality()) )
- LOG(MCdebugInfoDetail, unknownJob,
- " Header size: %" I64F "u, XML header size: %" I64F "u, XML footer size: %" I64F "u",
- sources.item(partition.item(idx).whichInput).headerSize,
- sources.item(partition.item(idx).whichInput).xmlHeaderLength,
- sources.item(partition.item(idx).whichInput).xmlFooterLength
- );
- else
- LOG(MCdebugInfoDetail, unknownJob," No source file for this partition");
- #endif
- }
- }
- void FileSprayer::extractSourceFormat(IPropertyTree * props)
- {
- if (srcFormat.restore(props))
- unknownSourceFormat = false;
- else
- srcFormat.set(FFTfixed, 1);
- bool blockcompressed;
- if (isCompressed(*props, &blockcompressed))
- {
- if (!blockcompressed)
- throwError(DFTERR_RowCompressedNotSupported);
- compressedInput = true;
- }
- else if (!decryptKey.isEmpty())
- compressedInput = true;
- }
- void FileSprayer::gatherFileSizes(bool errorIfMissing)
- {
- FilePartInfoArray fileSizeQueue;
- LOG(MCdebugProgress, job, "Start gathering file sizes...");
- ForEachItemIn(idx, sources)
- {
- FilePartInfo & cur = sources.item(idx);
- if (cur.size == UNKNOWN_PART_SIZE)
- fileSizeQueue.append(OLINK(cur));
- }
- gatherFileSizes(fileSizeQueue, errorIfMissing);
- LOG(MCdebugProgress, job, "Finished gathering file sizes...");
- }
- void FileSprayer::afterGatherFileSizes()
- {
- if (!copyCompressed)
- {
- StringBuffer tailStr;
- ForEachItemIn(idx2, sources)
- {
- FilePartInfo & cur = sources.item(idx2);
- LOG(MCdebugProgress, job, "%9u:%s (size: %llu bytes)",
- idx2, cur.filename.getTail(tailStr.clear()).str(), cur.size
- );
- cur.offset = totalSize;
- totalSize += cur.size;
- if (cur.size % srcFormat.getUnitSize())
- {
- StringBuffer s;
- if (srcFormat.isUtf())
- throwError2(DFTERR_InputIsInvalidMultipleUtf, cur.filename.getRemotePath(s).str(), srcFormat.getUnitSize());
- else
- throwError2(DFTERR_InputIsInvalidMultiple, cur.filename.getRemotePath(s).str(), srcFormat.getUnitSize());
- }
- }
- LOG(MCdebugProgress, job, "----------------------------------------------");
- LOG(MCdebugProgress, job, "All together: %llu bytes in %u file(s)", totalSize, sources.ordinality());
- }
- }
- void FileSprayer::gatherFileSizes(FilePartInfoArray & fileSizeQueue, bool errorIfMissing)
- {
- if (fileSizeQueue.ordinality())
- {
- CIArrayOf<FileSizeThread> threads;
- CriticalSection fileSizeCS;
- //Is this a good guess? Start with sqrt(number of files) threads?
- unsigned numThreads = (unsigned)sqrt((float)fileSizeQueue.ordinality());
- if (numThreads>20)
- numThreads = 20;
- LOG(MCdebugProgress, job, "Gathering %d file sizes on %d threads", fileSizeQueue.ordinality(), numThreads);
- unsigned idx;
- for (idx = 0; idx < numThreads; idx++)
- threads.append(*new FileSizeThread(fileSizeQueue, fileSizeCS, compressedInput&&!copyCompressed, errorIfMissing));
- for (idx = 0; idx < numThreads; idx++)
- threads.item(idx).start();
- for (;;) {
- bool alldone = true;
- for (idx = 0; idx < numThreads; idx++) {
- bool ok = threads.item(idx).wait(10*1000);
- if (!ok)
- alldone = false;
- }
- if (alldone)
- break;
- }
- for (idx = 0; idx < numThreads; idx++)
- threads.item(idx).queryThrowError();
- }
- }
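- //Illustrative arithmetic for the thread-count heuristic above: sqrt(#files) capped at 20,
- //so 9 queued parts use 3 threads, 100 parts use 10, and 400 or more use the 20 thread cap.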
- void FileSprayer::gatherMissingSourceTarget(IFileDescriptor * source)
- {
- //First gather all the file sizes...
- RemoteFilename filename;
- FilePartInfoArray primparts;
- FilePartInfoArray secparts;
- UnsignedArray secstart;
- FilePartInfoArray queue;
- unsigned numParts = source->numParts();
- for (unsigned idx1=0; idx1 < numParts; idx1++)
- {
- if (!filter.get() || filter->includePart(idx1))
- {
- unsigned numCopies = source->numCopies(idx1);
- if (numCopies>=1) // only add if there is one or more replicates
- {
- for (unsigned copy=0; copy < numCopies; copy++)
- {
- FilePartInfo & next = * new FilePartInfo;
- source->getFilename(idx1, copy, next.filename);
- if (copy==0)
- primparts.append(next);
- else
- {
- if (copy==1)
- secstart.append(secparts.ordinality());
- secparts.append(next);
- }
- queue.append(OLINK(next));
- }
- }
- }
- }
- secstart.append(secparts.ordinality());
- gatherFileSizes(queue, false);
- //Now process the information...
- StringBuffer primaryPath, secondaryPath;
- for (unsigned idx=0; idx < primparts.ordinality(); idx++)
- {
- FilePartInfo & primary = primparts.item(idx);
- offset_t primarySize = primary.size;
- primary.filename.getRemotePath(primaryPath.clear());
- for (unsigned idx2=secstart.item(idx);idx2<secstart.item(idx+1);idx2++)
- {
- FilePartInfo & secondary = secparts.item(idx2);
- offset_t secondarySize = secondary.size;
- secondary.filename.getRemotePath(secondaryPath.clear());
- unsigned sourceCopy = 0;
- if (primarySize != secondarySize)
- {
- if (primarySize == -1)
- {
- sourceCopy = 1;
- }
- else if (secondarySize != -1)
- {
- LOG(MCwarning, unknownJob, "Replicate - primary and secondary copies have different sizes (%" I64F "d v %" I64F "d) for part %u", primarySize, secondarySize, idx);
- continue; // ignore copy
- }
- }
- else
- {
- if (primarySize == -1)
- {
- LOG(MCwarning, unknownJob, "Replicate - neither primary nor secondary copy exists for part %u", idx);
- primarySize = 0; // to stop later failure to gather the file size
- }
- continue; // ignore copy
- }
- RemoteFilename *dst= (sourceCopy == 0) ? &secondary.filename : &primary.filename;
- // check nothing else to same destination
- bool done = false;
- ForEachItemIn(dsti,targets) {
- TargetLocation &tgt = targets.item(dsti);
- if (tgt.filename.equals(*dst)) {
- done = true;
- break;
- }
- }
- if (!done) {
- sources.append(* new FilePartInfo(*((sourceCopy == 0)? &primary.filename : &secondary.filename)));
- targets.append(* new TargetLocation(*dst));
- sources.tos().size = (sourceCopy == 0) ? primarySize : secondarySize;
- }
- }
- }
- filter.clear(); // we have already filtered
- }
- unsigned __int64 FileSprayer::calcSizeReadAlready()
- {
- unsigned __int64 sizeRead = 0;
- ForEachItemIn(idx, progress)
- {
- OutputProgress & cur = progress.item(idx);
- sizeRead += cur.inputLength;
- }
- return sizeRead;
- }
- unsigned __int64 FileSprayer::getSizeReadAlready()
- {
- return totalLengthRead;
- }
- PartitionPoint & FileSprayer::createLiteral(size32_t len, const void * data, unsigned idx)
- {
- PartitionPoint & next = * new PartitionPoint;
- next.inputOffset = 0;
- next.inputLength = len;
- next.outputLength = len;
- next.fixedText.set(len, data);
- if (partition.isItem(idx))
- {
- PartitionPoint & cur = partition.item(idx);
- next.whichInput = cur.whichInput;
- next.whichOutput = cur.whichOutput;
- }
- else
- {
- next.whichInput = (unsigned)-1;
- next.whichOutput = (unsigned)-1;
- }
- return next;
- }
- void FileSprayer::addHeaderFooter(size32_t len, const void * data, unsigned idx, bool before)
- {
- PartitionPoint & next = createLiteral(len, data, idx);
- unsigned insertPos = before ? idx : idx+1;
- partition.add(next, insertPos);
- }
- //MORE: This should be moved to jlib....
- //MORE: I should really be doing this on unicode characters and supporting \u \U
- void replaceEscapeSequence(StringBuffer & out, const char * in, bool errorIfInvalid)
- {
- out.ensureCapacity(strlen(in)+1);
- while (*in)
- {
- char c = *in++;
- if (c == '\\')
- {
- char next = *in;
- if (next)
- {
- in++;
- switch (next)
- {
- case 'a': c = '\a'; break;
- case 'b': c = '\b'; break;
- case 'f': c = '\f'; break;
- case 'n': c = '\n'; break;
- case 'r': c = '\r'; break;
- case 't': c = '\t'; break;
- case 'v': c = '\v'; break;
- case '\\':
- case '\'':
- case '?':
- case '\"': break;
- case '0': case '1': case '2': case '3': case '4': case '5': case '6': case '7':
- {
- c = next - '0';
- if (*in >= '0' && *in <= '7')
- {
- c = c << 3 | (*in++-'0');
- if (*in >= '0' && *in <= '7')
- c = c << 3 | (*in++-'0');
- }
- break;
- }
- case 'x':
- c = 0;
- while (isxdigit(*in))
- {
- next = *in++;
- c = c << 4;
- if (next >= '0' && next <= '9') c |= (next - '0');
- else if (next >= 'A' && next <= 'F') c |= (next - 'A' + 10);
- else if (next >= 'a' && next <= 'f') c |= (next - 'a' + 10);
- }
- break;
- default:
- if (errorIfInvalid)
- throw MakeStringException(1, "unrecognised character escape sequence '\\%c'", next);
- in--; // keep it as is.
- break;
- }
- }
- }
- out.append(c);
- }
- }
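- //Usage sketch (not part of the build) for replaceEscapeSequence above: C-style escapes are
- //expanded, hex and octal forms included, and with errorIfInvalid=false an unknown escape such
- //as \q is kept verbatim. exampleEscapes is a hypothetical name.
- #if 0
- static void exampleEscapes()
- {
- StringBuffer out;
- replaceEscapeSequence(out, "col1\\tcol2\\n", true); // out holds "col1<TAB>col2<LF>"
- replaceEscapeSequence(out.clear(), "\\x41\\101", true); // out holds "AA" (hex 41 and octal 101)
- }
- #endif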
- void FileSprayer::addHeaderFooter(const char * data, unsigned idx, bool before)
- {
- StringBuffer expanded;
- //MORE: Should really expand as unicode, so can have unicode control characters.
- decodeCppEscapeSequence(expanded, data, true);
- MemoryBuffer translated;
- convertUtf(translated, getUtfFormatType(tgtFormat.type), expanded.length(), expanded.str(), UtfReader::Utf8);
- //MORE: Convert from utf-8 to target format.
- addHeaderFooter(translated.length(), translated.toByteArray(), idx, before);
- }
- void FileSprayer::cloneHeaderFooter(unsigned idx, bool isHeader)
- {
- PartitionPoint & cur = partition.item(idx);
- FilePartInfo & curSrc = sources.item(cur.whichInput);
- PartitionPoint & next = * new PartitionPoint;
- //NB: headerSize includes the size of the xmlHeader; size includes neither headers nor footers.
- if (isHeader)
- // Set offset to the XML header
- next.inputOffset = curSrc.headerSize - curSrc.xmlHeaderLength;
- else
- //Set offset to the XML footer
- next.inputOffset = curSrc.headerSize + curSrc.size;
- next.inputLength = isHeader ? curSrc.xmlHeaderLength : curSrc.xmlFooterLength;
- next.outputLength = needToCalcOutput() ? next.inputLength : 0;
- next.whichInput = cur.whichInput;
- next.whichOutput = cur.whichOutput;
- if (isHeader)
- partition.add(next, idx);
- else
- partition.add(next, idx+1);
- }
- void FileSprayer::addPrefix(size32_t len, const void * data, unsigned idx, PartitionPointArray & partitionWork)
- {
- //Merge header and original partition item into partitionWork array
- PartitionPoint & header = createLiteral(len, data, idx);
- partitionWork.append(header);
- PartitionPoint & partData = partition.item(idx);
- partitionWork.append(OLINK(partData));
- }
- void FileSprayer::insertHeaders()
- {
- const char * header = options->queryProp("@header");
- const char * footer = options->queryProp("@footer");
- const char * glue = options->queryProp("@glue");
- const char * prefix = options->queryProp(ANprefix);
- bool canKeepHeader = srcFormat.equals(tgtFormat) || !needToCalcOutput();
- bool keepHeader = options->getPropBool("@keepHeader", canKeepHeader) && srcFormat.rowTag;
- if (header || footer || prefix || glue)
- keepHeader = false;
- if (keepHeader && !canKeepHeader)
- throwError(DFTERR_CannotKeepHeaderChangeFormat);
- if (header || footer || keepHeader)
- {
- unsigned idx;
- unsigned curOutput = (unsigned)-1;
- bool footerPending = false;
- for (idx = 0; idx < partition.ordinality(); idx++)
- {
- PartitionPoint & cur = partition.item(idx);
- if (curOutput != cur.whichOutput)
- {
- if (keepHeader)
- {
- if (footerPending && (idx != 0))
- {
- footerPending = false;
- cloneHeaderFooter(idx-1, false);
- idx++;
- }
- //Don't add a header if there are no records in this part and the data comes from more than one source file.
- //If it comes from a single source we are guaranteed to have a correct header in that part.
- //If it comes from more than one (and we are not replicating) we won't know where the headers/footers are for this part.
- if ((cur.inputLength == 0) && (sources.ordinality() > 1))
- continue;
- cloneHeaderFooter(idx, true);
- footerPending = true;
- idx++;
- }
- if (footer && (idx != 0))
- {
- addHeaderFooter(footer, idx-1, false);
- idx++;
- }
- if (header)
- {
- addHeaderFooter(header, idx, true);
- idx++;
- }
- curOutput = cur.whichOutput;
- }
- }
- if (keepHeader && footerPending)
- {
- while (idx && partition.item(idx-1).inputLength == 0)
- idx--;
- if (idx)
- {
- cloneHeaderFooter(idx-1, false);
- idx++;
- }
- }
- if (footer)
- {
- addHeaderFooter(footer, idx-1, false);
- idx++;
- }
- }
- if (glue)
- {
- unsigned idx;
- unsigned curInput = 0;
- unsigned curOutput = 0;
- for (idx = 0; idx < partition.ordinality(); idx++)
- {
- PartitionPoint & cur = partition.item(idx);
- if ((curInput != cur.whichInput) && (curOutput == cur.whichOutput))
- {
- addHeaderFooter(glue, idx, true);
- idx++;
- }
- curInput = cur.whichInput;
- curOutput = cur.whichOutput;
- }
- }
- if (prefix)
- {
- if (!srcFormat.equals(tgtFormat))
- throwError(DFTERR_PrefixCannotTransform);
- if (glue || header || footer)
- throwError(DFTERR_PrefixCannotAlsoAddHeader);
- PartitionPointArray partitionWork;
- MemoryBuffer filePrefix;
- filePrefix.setEndian(__LITTLE_ENDIAN);
- for (unsigned idx = 0; idx < partition.ordinality(); idx++)
- {
- PartitionPoint & cur = partition.item(idx);
- filePrefix.clear();
- const char * finger = prefix;
- while (finger)
- {
- StringAttr command;
- const char * comma = strchr(finger, ',');
- if (comma)
- {
- command.set(finger, comma-finger);
- finger = comma+1;
- }
- else
- {
- command.set(finger);
- finger = NULL;
- }
- command.toUpperCase();
- if (memcmp(command, "FILENAME", 8) == 0)
- {
- StringBuffer filename;
- cur.inputName.split(NULL, NULL, &filename, &filename);
- if (command[8] == ':')
- {
- unsigned maxLen = atoi(command+9);
- filename.padTo(maxLen);
- filePrefix.append(maxLen, filename.str());
- }
- else
- {
- filePrefix.append((unsigned)filename.length());
- filePrefix.append(filename.length(), filename.str());
- }
- }
- else if ((memcmp(command, "FILESIZE", 8) == 0) || (command.length() == 2))
- {
- const char * format = command;
- if (memcmp(format, "FILESIZE", 8) == 0)
- {
- if (format[8] == ':')
- format = format+9;
- else
- format = "L4";
- }
- bool bigEndian;
- char c = format[0];
- if (c == 'B')
- bigEndian = true;
- else if (c == 'L')
- bigEndian = false;
- else
- throwError1(DFTERR_InvalidPrefixFormat, format);
- c = format[1];
- if ((c <= '0') || (c > '8'))
- throwError1(DFTERR_InvalidPrefixFormat, format);
- unsigned length = (c - '0');
- unsigned __int64 value = cur.inputLength;
- byte temp[8];
- for (unsigned i=0; i<length; i++)
- {
- temp[i] = (byte)value;
- value >>= 8;
- }
- if (value)
- throwError(DFTERR_PrefixTooSmall);
- if (bigEndian)
- {
- byte temp2[8];
- _cpyrevn(&temp2, &temp, length);
- filePrefix.append(length, &temp2);
- }
- else
- filePrefix.append(length, &temp);
- }
- else
- throwError1(DFTERR_InvalidPrefixFormat, command.get());
- }
- addPrefix(filePrefix.length(), filePrefix.toByteArray(), idx, partitionWork);
- }
- LOG(MCdebugProgress, job, "Publish headers");
- partition.swapWith(partitionWork);
- }
- }
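- //Worked example of the @prefix mini-format handled above (values hypothetical): with
- //prefix="FILESIZE:B4,FILENAME" a 1234 byte part read from "in._3_of_7" is preceded by
- //00 00 04 D2 (4 byte big-endian part length) then 0A 00 00 00 (little-endian name length)
- //followed by the 10 name bytes. FILENAME:16 pads the tail name to a fixed 16 bytes with no
- //length prefix, and a bare two character command such as "L8" is shorthand for FILESIZE:L8.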
- bool FileSprayer::needToCalcOutput()
- {
- return !usePullOperation() || options->getPropBool(ANverify);
- }
- unsigned FileSprayer::numParallelConnections(unsigned limit)
- {
- unsigned maxConnections = options->getPropInt(ANmaxConnections, limit);
- if ((maxConnections == 0) || (maxConnections > limit)) maxConnections = limit;
- return maxConnections;
- }
- unsigned FileSprayer::numParallelSlaves()
- {
- unsigned numPullers = transferSlaves.ordinality();
- unsigned maxConnections = DEFAULT_MAX_CONNECTIONS;
- unsigned connectOption = options->getPropInt(ANmaxConnections, 0);
- if (connectOption)
- maxConnections = connectOption;
- else if (mirroring && (maxConnections * 3 < numPullers))
- maxConnections = numPullers/3;
- if (maxConnections > numPullers) maxConnections = numPullers;
- return maxConnections;
- }
- void FileSprayer::performTransfer()
- {
- unsigned numSlaves = transferSlaves.ordinality();
- unsigned maxConnections = numParallelSlaves();
- unsigned failure = options->getPropInt("@fail", 0);
- if (failure) maxConnections = 1;
- calibrateProgress();
- numSlavesCompleted = 0;
- if (maxConnections > 1)
- shuffle(transferSlaves);
- if (progressReport)
- progressReport->setRange(getSizeReadAlready(), sizeToBeRead, transferSlaves.ordinality());
- LOG(MCdebugInfo, job, "Begin to transfer parts (%d threads)\n", maxConnections);
- //Throttle maximum number of concurrent transfers by starting n threads, and
- //then waiting for one to complete before going on to the next
- lastProgressTick = msTick();
- Semaphore sem;
- unsigned goIndex;
- for (goIndex=0; goIndex<maxConnections; goIndex++)
- transferSlaves.item(goIndex).go(sem);
- //MORE: Should abort early if we get an error on one of the transfers...
- // to do that we will need a queue of completed pullers.
- for (; !error && goIndex<numSlaves;goIndex++)
- {
- waitForTransferSem(sem);
- numSlavesCompleted++;
- transferSlaves.item(goIndex).go(sem);
- }
- for (unsigned waitCount=0; waitCount<maxConnections;waitCount++)
- {
- waitForTransferSem(sem);
- numSlavesCompleted++;
- }
- if (error)
- throw LINK(error);
- bool ok = true;
- ForEachItemIn(idx3, transferSlaves)
- {
- FileTransferThread & cur = transferSlaves.item(idx3);
- if (!cur.ok)
- ok = false;
- }
- if (!ok) {
- if (isAborting())
- throwError(DFTERR_CopyAborted);
- else
- throwError(DFTERR_CopyFailed);
- }
- }
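- //Minimal sketch (not part of the build) of the throttling pattern used in performTransfer()
- //above: start 'limit' jobs, start one more each time any running job signals the semaphore,
- //then drain the final 'limit' signals. startJob() is a hypothetical stand-in for
- //FileTransferThread::go(), which signals sem when the transfer finishes.
- #if 0
- static void exampleThrottle(Semaphore & sem, unsigned numJobs, unsigned limit)
- {
- unsigned next = 0;
- for (; (next < limit) && (next < numJobs); next++)
- startJob(next, sem);
- for (; next < numJobs; next++)
- {
- sem.wait(); // a slot has freed up
- startJob(next, sem);
- }
- for (unsigned i = 0; (i < limit) && (i < numJobs); i++)
- sem.wait(); // wait for the in-flight jobs to drain
- }
- #endif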
- void FileSprayer::pullParts()
- {
- bool needCalcCRC = calcCRC();
- LOG(MCdebugInfoDetail, job, "Calculate CRC = %d", needCalcCRC);
- ForEachItemIn(idx, targets)
- {
- FileTransferThread & next = * new FileTransferThread(*this, FTactionpull, targets.item(idx).filename.queryEndpoint(), needCalcCRC, wuid);
- transferSlaves.append(next);
- }
- ForEachItemIn(idx3, partition)
- {
- PartitionPoint & cur = partition.item(idx3);
- if (!filter || filter->includePart(cur.whichOutput))
- transferSlaves.item(cur.whichSlave).addPartition(cur, progress.item(idx3));
- }
- performTransfer();
- }
- //Execute a parallel write to a remote part, but each slave writes the entire contents of the file
- void FileSprayer::pushWholeParts()
- {
- bool needCalcCRC = calcCRC();
- LOG(MCdebugInfoDetail, job, "Calculate CRC = %d", needCalcCRC);
- //Create a slave for each of the target files, but execute it on the node corresponding to the first source file
- //For container mode this will need to execute on this node, or on a load balanced service
- ForEachItemIn(idx, targets)
- {
- SocketEndpoint ep;
- ForEachItemIn(idx3, partition)
- {
- PartitionPoint & cur = partition.item(idx3);
- if (cur.whichOutput == idx)
- {
- ep = sources.item(cur.whichInput).filename.queryEndpoint();
- break;
- }
- }
- FileTransferThread & next = * new FileTransferThread(*this, FTactionpull, ep, needCalcCRC, wuid);
- transferSlaves.append(next);
- }
- ForEachItemIn(idx3, partition)
- {
- PartitionPoint & cur = partition.item(idx3);
- if (!filter || filter->includePart(cur.whichOutput))
- transferSlaves.item(cur.whichSlave).addPartition(cur, progress.item(idx3));
- }
- performTransfer();
- }
- void FileSprayer::pushParts()
- {
- bool needCalcCRC = calcCRC();
- ForEachItemIn(idx, sources)
- {
- FileTransferThread & next = * new FileTransferThread(*this, FTactionpush, sources.item(idx).filename.queryEndpoint(), needCalcCRC, wuid);
- transferSlaves.append(next);
- }
- ForEachItemIn(idx3, partition)
- {
- PartitionPoint & cur = partition.item(idx3);
- if (!filter || filter->includePart(cur.whichOutput))
- transferSlaves.item(cur.whichSlave).addPartition(cur, progress.item(idx3));
- }
- performTransfer();
- }
- void FileSprayer::removeSource()
- {
- LOG(MCwarning, job, "Source file removal not yet implemented");
- }
- bool FileSprayer::restorePartition()
- {
- if (allowRecovery && progressTree->getPropBool(ANhasPartition))
- {
- IPropertyTreeIterator * iter = progressTree->getElements(PNpartition);
- ForEach(*iter)
- {
- PartitionPoint & next = * new PartitionPoint;
- next.restore(&iter->query());
- partition.append(next);
- }
- iter->Release();
- return (partition.ordinality() != 0);
- }
- return false;
- }
- void FileSprayer::savePartition()
- {
- if (allowRecovery)
- {
- ForEachItemIn(idx, partition)
- {
- IPropertyTree * child = createPTree(PNpartition, ipt_caseInsensitive);
- partition.item(idx).save(child);
- progressTree->addPropTree(PNpartition, child);
- }
- progressTree->setPropBool(ANhasPartition, true);
- }
- }
- void FileSprayer::setCopyCompressedRaw()
- {
- assertex(compressedInput && compressOutput);
- // encrypt/decrypt keys should be the same
- compressedInput = false;
- compressOutput = false;
- calcedInputCRC = true;
- cachedInputCRC = false;
- copyCompressed = true;
- }
- void FileSprayer::setError(const SocketEndpoint & ep, IException * e)
- {
- CriticalBlock lock(errorCS);
- if (!error)
- {
- StringBuffer url;
- ep.getUrlStr(url);
- error.setown(MakeStringException(e->errorCode(), "%s", e->errorMessage(url.append(": ")).str()));
- }
- }
- void FileSprayer::setPartFilter(IDFPartFilter * _filter)
- {
- filter.set(_filter);
- }
- void FileSprayer::setProgress(IDaftProgress * _progress)
- {
- progressReport = _progress;
- }
- void FileSprayer::setAbort(IAbortRequestCallback * _abort)
- {
- abortChecker = _abort;
- }
- void FileSprayer::setReplicate(bool _replicate)
- {
- replicate = _replicate;
- }
- void FileSprayer::setSource(IDistributedFile * source)
- {
- distributedSource.set(source);
- srcAttr.setown(createPTreeFromIPT(&source->queryAttributes()));
- IPropertyTree *history = source->queryHistory();
- if (history)
- srcHistory.setown(createPTreeFromIPT(history));
- compressedInput = source->isCompressed();
- extractSourceFormat(srcAttr);
- unsigned numParts = source->numParts();
- for (unsigned idx=0; idx < numParts; idx++)
- {
- Owned<IDistributedFilePart> curPart = source->getPart(idx);
- RemoteFilename rfn;
- FilePartInfo & next = * new FilePartInfo(curPart->getFilename(rfn));
- next.extractExtra(*curPart);
- if (curPart->numCopies()>1)
- next.mirrorFilename.set(curPart->getFilename(rfn,1));
- // don't set the following here - force to check disk
- //next.size = curPart->getFileSize(true,false);
- //next.psize = curPart->getDiskSize(true,false);
- sources.append(next);
- }
- gatherFileSizes(false);
- }
- void FileSprayer::setSource(IFileDescriptor * source)
- {
- setSource(source, 0, 1);
- //Now get the size of the files directly (to check they exist). If they don't exist then switch to the backup instead.
- gatherFileSizes(false);
- }
- void FileSprayer::setSource(IFileDescriptor * source, unsigned copy, unsigned mirrorCopy)
- {
- IPropertyTree *attr = &source->queryProperties();
- compressedInput = source->isCompressed();
- extractSourceFormat(attr);
- srcAttr.setown(createPTreeFromIPT(&source->queryProperties()));
- IPropertyTree *history = source->queryHistory();
- if (history)
- srcHistory.setown(createPTreeFromIPT(history));
- extractSourceFormat(srcAttr);
- RemoteFilename filename;
- unsigned numParts = source->numParts();
- for (unsigned idx=0; idx < numParts; idx++)
- {
- if (source->isMulti(idx))
- {
- RemoteMultiFilename multi;
- source->getMultiFilename(idx, copy, multi);
- multi.expandWild();
- ForEachItemIn(i, multi)
- {
- const RemoteFilename &rfn = multi.item(i);
- FilePartInfo & next = * new FilePartInfo(rfn);
- Owned<IPartDescriptor> part = source->getPart(idx);
- next.extractExtra(*part);
- // If the size isn't set here, it will be forced to check the file size on disk (expensive)
- next.size = multi.getSize(i);
- sources.append(next);
- }
- //MORE: Need to extract the backup filenames for mirror files.
- }
- else
- {
- source->getFilename(idx, copy, filename);
- FilePartInfo & next = * new FilePartInfo(filename);
- Owned<IPartDescriptor> part = source->getPart(idx);
- next.extractExtra(*part);
- if (mirrorCopy != (unsigned)-1)
- source->getFilename(idx, mirrorCopy, next.mirrorFilename);
- sources.append(next);
- }
- }
- if (sources.ordinality() == 0)
- LOG(MCuserWarning, unknownJob, "The wildcarded source did not match any filenames");
- // throwError(DFTERR_NoFilesMatchWildcard);
- //Now get the size of the files directly (to check they exist). If they don't exist then switch to the backup instead.
- gatherFileSizes(false);
- }
- void FileSprayer::setSource(IDistributedFilePart * part)
- {
- tgtFormat.set(FFTfixed, 1);
- unsigned copy = 0;
- RemoteFilename rfn;
- sources.append(* new FilePartInfo(part->getFilename(rfn,copy)));
- if (compressedInput)
- {
- calcedInputCRC = true;
- cachedInputCRC = false;
- }
- }
- void FileSprayer::setSourceTarget(IFileDescriptor * fd, DaftReplicateMode mode)
- {
- extractSourceFormat(&fd->queryProperties());
- tgtFormat.set(srcFormat);
- if (options->getPropBool(ANcrcDiffers, false))
- throwError1(DFTERR_ReplicateOptionNoSupported, "crcDiffers");
- if (options->getPropBool(ANsizedate, false))
- throwError1(DFTERR_ReplicateOptionNoSupported, "sizedate");
- switch (mode)
- {
- case DRMreplicatePrimary: // doesn't work for multi copies
- setSource(fd, 0);
- setTarget(fd, 1);
- break;
- case DRMreplicateSecondary: // doesn't work for multi copies
- setSource(fd, 1);
- setTarget(fd, 0);
- break;
- case DRMcreateMissing: // this does though (but I am not sure it works with multi-files)
- gatherMissingSourceTarget(fd);
- break;
- }
- isSafeMode = false;
- mirroring = true;
- replicate = true;
- //Optimize replicating compressed - copy it raw, but it means we can't check the input crc
- assertex(compressOutput == compressedInput);
- if (compressedInput)
- setCopyCompressedRaw();
- }
- void FileSprayer::setTarget(IDistributedFile * target)
- {
- distributedTarget.set(target);
- compressOutput = target->isCompressed();
- LOG(MCdebugInfo, unknownJob, "FileSprayer::setTarget: compressedInput:%s, compressOutput:%s",
- boolToStr(compressedInput),
- boolToStr(compressOutput));
- if (tgtFormat.restore(&target->queryAttributes()))
- unknownTargetFormat = false;
- else
- {
- const char* separator = srcFormat.separate.get();
- if (separator && (strcmp(separator, ",") == 0))
- srcFormat.separate.set("\\,");
- tgtFormat.set(srcFormat);
- if (!unknownSourceFormat)
- {
- DistributedFilePropertyLock lock(target);
- IPropertyTree &curProps = lock.queryAttributes();
- tgtFormat.save(&curProps);
- }
- }
- unsigned copy = 0;
- unsigned numParts = target->numParts();
- if (numParts == 0)
- throwError(DFTERR_NoPartsInDestination);
- for (unsigned idx=0; idx < numParts; idx++)
- {
- Owned<IDistributedFilePart> curPart(target->getPart(idx));
- RemoteFilename rfn;
- TargetLocation & next = * new TargetLocation(curPart->getFilename(rfn,copy));
- targets.append(next);
- }
- checkSprayOptions();
- }
- void FileSprayer::setTarget(IFileDescriptor * target, unsigned copy)
- {
- if (tgtFormat.restore(&target->queryProperties()))
- unknownTargetFormat = false;
- else
- tgtFormat.set(srcFormat);
- compressOutput = !encryptKey.isEmpty()||target->isCompressed();
- unsigned numParts = target->numParts();
- if (numParts == 0)
- throwError(DFTERR_NoPartsInDestination);
- RemoteFilename filename;
- for (unsigned idx=0; idx < numParts; idx++)
- {
- target->getFilename(idx, copy, filename);
- targets.append(*new TargetLocation(filename));
- }
- checkSprayOptions();
- }
- void FileSprayer::updateProgress(const OutputProgress & newProgress)
- {
- CriticalBlock block(soFarCrit);
- lastProgressTick = msTick();
- OutputProgress & curProgress = progress.item(newProgress.whichPartition);
- totalLengthRead += (newProgress.inputLength - curProgress.inputLength);
- totalNumReads += (newProgress.numReads - curProgress.numReads);
- totalNumWrites += (newProgress.numWrites - curProgress.numWrites);
- curProgress.set(newProgress);
- if (curProgress.tree)
- curProgress.save(curProgress.tree);
- if (newProgress.status != OutputProgress::StatusRenamed)
- updateSizeRead();
- }
- void FileSprayer::updateSizeRead()
- {
- if (progressDone)
- return;
- unsigned nowTick = msTick();
- //MORE: This call shouldn't need to be done so often...
- unsigned __int64 sizeReadSoFar = getSizeReadAlready();
- bool done = sizeReadSoFar == sizeToBeRead;
- if (progressReport)
- {
- // A cheat to get 100% saying all the slaves have completed - should really
- // pass completed information in the progress info, or return the last progress
- // info when a slave is done.
- unsigned numCompleted = (sizeReadSoFar == sizeToBeRead) ? transferSlaves.ordinality() : numSlavesCompleted;
- if (done || (nowTick - lastOperatorTick >= operatorUpdateFrequency))
- {
- progressReport->onProgress(sizeReadSoFar, sizeToBeRead, numCompleted, totalNumReads, totalNumWrites);
- lastOperatorTick = nowTick;
- progressDone = done;
- }
- }
- if (allowRecovery && recoveryConnection)
- {
- if (done || (nowTick - lastSDSTick >= sdsUpdateFrequency))
- {
- recoveryConnection->commit();
- lastSDSTick = nowTick;
- progressDone = done;
- }
- }
- }
- void FileSprayer::waitForTransferSem(Semaphore & sem)
- {
- while (!sem.wait(EXPECTED_RESPONSE_TIME))
- {
- unsigned timeSinceProgress = msTick() - lastProgressTick;
- if (timeSinceProgress > EXPECTED_RESPONSE_TIME)
- {
- LOG(MCwarning, unknownJob, "No response from any slaves in the last %u seconds.", timeSinceProgress/1000);
- CriticalBlock block(soFarCrit);
- StringBuffer list;
- ForEachItemIn(i, transferSlaves)
- transferSlaves.item(i).logIfRunning(list);
- if (timeSinceProgress>RESPONSE_TIME_TIMEOUT)
- {
- //Set an error - the transfer threads will check it after a couple of minutes, and then terminate gracefully
- CriticalBlock lock(errorCS);
- if (!error)
- error.setown(MakeStringException(RFSERR_TimeoutWaitSlave, RFSERR_TimeoutWaitSlave_Text, list.str()));
- }
- }
- }
- }
- void FileSprayer::addTarget(unsigned idx, INode * node)
- {
- RemoteFilename filename;
- filename.set(sources.item(idx).filename);
- filename.setEp(node->endpoint());
- targets.append(* new TargetLocation(filename));
- checkSprayOptions();
- }
- bool FileSprayer::isAborting()
- {
- if (aborting || error)
- return true;
- unsigned nowTick = msTick();
- if (abortChecker && (nowTick - lastAbortCheckTick >= abortCheckFrequency))
- {
- if (abortChecker->abortRequested())
- {
- LOG(MCdebugInfo, unknownJob, "Abort requested via callback");
- aborting = true;
- }
- lastAbortCheckTick = nowTick;
- }
- return aborting || error;
- }
- const char * FileSprayer::querySplitPrefix()
- {
- const char * ret = options->queryProp(ANsplitPrefix);
- if (ret && *ret)
- return ret;
- return NULL;
- }
- const char * FileSprayer::querySlaveExecutable(const IpAddress &ip, StringBuffer &ret) const
- {
- #ifdef _CONTAINERIZED
- return ret.append("ftslave").str();
- #else
- const char * slave = queryFixedSlave();
- try {
- queryFtSlaveExecutable(ip, ret);
- if (ret.length())
- return ret.str();
- }
- catch (IException * e) {
- if (!slave||!*slave)
- throw;
- e->Release();
- }
- if (slave)
- ret.append(slave);
- return ret.str();
- #endif
- }
- const char * FileSprayer::queryFixedSlave() const
- {
- return options->queryProp("@slave");
- }
- void FileSprayer::setTarget(IGroup * target)
- {
- tgtFormat.set(srcFormat);
- if (sources.ordinality() != target->ordinality())
- throwError(DFTERR_ReplicateNumPartsDiffer);
- ForEachItemIn(idx, sources)
- addTarget(idx, &target->queryNode(idx));
- }
- void FileSprayer::setTarget(INode * target)
- {
- tgtFormat.set(srcFormat);
- if (sources.ordinality() != 1)
- throwError(DFTERR_ReplicateNumPartsDiffer);
- addTarget(0, target);
- }
- inline bool nonempty(IPropertyTree *pt, const char *p) { const char *s = pt->queryProp(p); return s&&*s; }
- bool FileSprayer::disallowImplicitReplicate()
- {
- return options->getPropBool(ANsplit) ||
- options->getPropBool(ANnosplit) ||
- querySplitPrefix() ||
- nonempty(options,"@header") ||
- nonempty(options,"@footer") ||
- nonempty(options,"@glue") ||
- nonempty(options,ANprefix) ||
- nonempty(options,ANencryptKey) ||
- nonempty(options,ANdecryptKey);
- }
- void FileSprayer::spray()
- {
- if (!allowSplit() && querySplitPrefix())
- throwError(DFTERR_SplitNoSplitClash);
- aindex_t sourceSize = sources.ordinality();
- bool failIfNoSourceFile = options->getPropBool("@failIfNoSourceFile");
- if (sourceSize == 0)
- {
- if (failIfNoSourceFile)
- throwError(DFTERR_NoFilesMatchWildcard);
- else
- progressTree->setPropBool("@noFileMatch", true);
- }
- LOG(MCdebugInfo, job, "compressedInput:%d, compressOutput:%d", compressedInput, compressOutput);
- LocalAbortHandler localHandler(daftAbortHandler);
- if (allowRecovery && progressTree->getPropBool(ANcomplete))
- {
- LOG(MCdebugInfo, job, "Command completed successfully in previous invocation");
- return;
- }
- checkFormats();
- checkForOverlap();
- progressTree->setPropBool(ANpull, usePullOperation());
- const char * splitPrefix = querySplitPrefix();
- if (!replicate && (sources.ordinality() == targets.ordinality()))
- {
- if (srcFormat.equals(tgtFormat) && !disallowImplicitReplicate())
- copySource = true;
- }
- if (compressOutput&&!replicate&&!copySource)
- {
- PROGLOG("Compress output forcing pull");
- options->setPropBool(ANpull, true);
- allowRecovery = false;
- }
- gatherFileSizes(true);
- if (!replicate||copySource) // NB: When copySource=true, analyseFileHeaders mainly just sets srcFormat.type
- analyseFileHeaders(!copySource); // if pretending to replicate we don't want to remove the headers
- afterGatherFileSizes();
- if (compressOutput && !usePullOperation() && !replicate && !copySource)
- throwError(DFTERR_CannotPushAndCompress);
- if (restorePartition())
- {
- LOG(MCdebugProgress, job, "Partition restored from recovery information");
- }
- else
- {
- LOG(MCdebugProgress, job, "Calculate partition information");
- if (replicate || copySource)
- calculateOne2OnePartition();
- else if (!allowSplit())
- calculateNoSplitPartition();
- else if (splitPrefix && *splitPrefix)
- calculateSplitPrefixPartition(splitPrefix);
- else if ((targets.ordinality() == 1) && srcFormat.equals(tgtFormat))
- calculateMany2OnePartition();
- else
- calculateSprayPartition();
- if (partition.ordinality() > PARTITION_RECOVERY_LIMIT)
- allowRecovery = false;
- savePartition();
- }
- assignPartitionFilenames(); // assign source filenames - used in insertHeaders..
- if (!replicate && !copySource)
- {
- LOG(MCdebugProgress, job, "Insert headers");
- insertHeaders();
- }
- addEmptyFilesToPartition();
- derivePartitionExtra();
- if (partition.ordinality() < 1000)
- displayPartition();
- if (isRecovering)
- displayProgress(progress);
- throwExceptionIfAborting();
- beforeTransfer();
- if (usePushWholeOperation())
- pushWholeParts();
- else if (usePullOperation())
- pullParts();
- else
- pushParts();
- afterTransfer();
- //If got here then we have succeeded
- updateTargetProperties();
- //Calculate and store file access cost
- double fileAccessCost = 0.0;
- if (distributedTarget)
- {
- StringBuffer cluster;
- distributedTarget->getClusterName(0, cluster);
- if (!cluster.isEmpty())
- fileAccessCost += calcFileAccessCost(cluster, totalNumWrites, 0);
- }
- if (distributedSource && distributedSource->querySuperFile()==nullptr)
- {
- StringBuffer cluster;
- distributedSource->getClusterName(0, cluster);
- if (!cluster.isEmpty())
- fileAccessCost += calcFileAccessCost(cluster, 0, totalNumReads);
- }
- progressReport->setFileAccessCost(fileAccessCost);
- StringBuffer copyEventText; // [logical-source] > [logical-target]
- if (distributedSource)
- copyEventText.append(distributedSource->queryLogicalName());
- copyEventText.append(">");
- if (distributedTarget && distributedTarget->queryLogicalName())
- copyEventText.append(distributedTarget->queryLogicalName());
- //MORE: use new interface to send 'file copied' event
- //LOG(MCevent, unknownJob, EVENT_FILECOPIED, copyEventText.str());
- cleanupRecovery();
- }
- bool FileSprayer::isSameSizeHeaderFooter()
- {
- bool retVal = true;
- if (sources.ordinality() == 0)
- return retVal;
- unsigned whichHeaderInput = 0;
- headerSize = sources.item(whichHeaderInput).xmlHeaderLength;
- footerSize = sources.item(whichHeaderInput).xmlFooterLength;
- ForEachItemIn(idx, partition)
- {
- PartitionPoint & cur = partition.item(idx);
- if (cur.inputLength && (idx+1 == partition.ordinality() || partition.item(idx+1).whichOutput != cur.whichOutput))
- {
- if (headerSize != sources.item(whichHeaderInput).xmlHeaderLength)
- {
- retVal = false;
- break;
- }
- if (footerSize != sources.item(cur.whichInput).xmlFooterLength)
- {
- retVal = false;
- break;
- }
- if ( idx+1 != partition.ordinality() )
- whichHeaderInput = partition.item(idx+1).whichInput;
- }
- }
- return retVal;
- }
- void FileSprayer::updateTargetProperties()
- {
- TimeSection timer("FileSprayer::updateTargetProperties() time");
- Owned<IException> error;
- if (distributedTarget)
- {
- StringBuffer failedParts;
- CRC32Merger partCRC;
- offset_t partLength = 0;
- CRC32Merger totalCRC;
- offset_t totalLength = 0;
- offset_t totalCompressedSize = 0;
- unsigned whichHeaderInput = 0;
- bool sameSizeHeaderFooter = isSameSizeHeaderFooter();
- bool sameSizeSourceTarget = (sources.ordinality() == distributedTarget->numParts());
- offset_t partCompressedLength = 0;
- ForEachItemIn(idx, partition)
- {
- PartitionPoint & cur = partition.item(idx);
- OutputProgress & curProgress = progress.item(idx);
- partCRC.addChildCRC(curProgress.outputLength, curProgress.outputCRC, false);
- totalCRC.addChildCRC(curProgress.outputLength, curProgress.outputCRC, false);
- if (copyCompressed && sameSizeSourceTarget) {
- FilePartInfo & curSource = sources.item(cur.whichInput);
- partLength = curSource.size;
- totalLength += partLength;
- }
- else {
- partLength += curProgress.outputLength; // AFAICS this might as well be =
- totalLength += curProgress.outputLength;
- }
- if (compressOutput)
- partCompressedLength += curProgress.compressedPartSize;
- if (idx+1 == partition.ordinality() || partition.item(idx+1).whichOutput != cur.whichOutput)
- {
- Owned<IDistributedFilePart> curPart = distributedTarget->getPart(cur.whichOutput);
- // TODO: Create DistributedFilePropertyLock for parts
- curPart->lockProperties();
- IPropertyTree& curProps = curPart->queryAttributes();
- if (!sameSizeHeaderFooter)
- {
- FilePartInfo & curHeaderSource = sources.item(whichHeaderInput);
- curProps.setPropInt64(FPheaderLength, curHeaderSource.xmlHeaderLength);
- FilePartInfo & curFooterSource = sources.item(cur.whichInput);
- curProps.setPropInt64(FPfooterLength, curFooterSource.xmlFooterLength);
- if ( idx+1 != partition.ordinality() )
- whichHeaderInput = partition.item(idx+1).whichInput;
- }
- if (calcCRC())
- {
- curProps.setPropInt(FAcrc, partCRC.get());
- if (cur.whichInput != (unsigned)-1)
- {
- FilePartInfo & curSource = sources.item(cur.whichInput);
- if (replicate && curSource.hasCRC)
- {
- if ((partCRC.get() != curSource.crc)&&(compressedInput==compressOutput)) // if expanding will be different!
- {
- if (failedParts.length())
- failedParts.append(", ");
- else
- failedParts.append("Output CRC failed to match expected: ");
- curSource.filename.getPath(failedParts);
- failedParts.appendf("(%x,%x)",partCRC.get(),curSource.crc);
- }
- }
- }
- }
- else if (compressOutput || copyCompressed)
- curProps.setPropInt(FAcrc, (int)COMPRESSEDFILECRC);
- curProps.setPropInt64(FAsize, partLength);
- if (compressOutput)
- {
- curProps.setPropInt64(FAcompressedSize, partCompressedLength);
- totalCompressedSize += partCompressedLength;
- } else if (copyCompressed)
- {
- curProps.setPropInt64(FAcompressedSize, curProgress.outputLength);
- totalCompressedSize += curProgress.outputLength;
- }
- TargetLocation & curTarget = targets.item(cur.whichOutput);
- if (!curTarget.modifiedTime.isNull())
- {
- CDateTime temp;
- StringBuffer timestr;
- temp.set(curTarget.modifiedTime);
- unsigned hour, min, sec, nanosec;
- temp.getTime(hour, min, sec, nanosec);
- temp.setTime(hour, min, sec, 0);
- curProps.setProp("@modified", temp.getString(timestr).str());
- }
- if ((distributedSource != distributedTarget) && (cur.whichInput != (unsigned)-1))
- {
- FilePartInfo & curSource = sources.item(cur.whichInput);
- if (curSource.properties)
- {
- Owned<IAttributeIterator> aiter = curSource.properties->getAttributes();
- ForEach(*aiter)
- {
- const char *aname = aiter->queryName();
- if ( !( strieq(aname,"@fileCrc") ||
- strieq(aname,"@modified") ||
- strieq(aname,"@node") ||
- strieq(aname,"@num") ||
- strieq(aname,"@size") ||
- strieq(aname,"@name") ) ||
- ( strieq(aname,"@recordCount") && (sources.ordinality() == targets.ordinality()) )
- )
- curProps.setProp(aname,aiter->queryValue());
- }
- }
- }
- curPart->unlockProperties();
- partCRC.clear();
- partLength = 0;
- partCompressedLength = 0;
- }
- }
- if (failedParts.length())
- error.setown(MakeStringException(DFTERR_InputOutputCrcMismatch, "%s", failedParts.str()));
- DistributedFilePropertyLock lock(distributedTarget);
- IPropertyTree &curProps = lock.queryAttributes();
- curProps.setPropInt64("@numDiskWrites", totalNumWrites);
- if (calcCRC())
- curProps.setPropInt(FAcrc, totalCRC.get());
- curProps.setPropInt64(FAsize, totalLength);
- if (totalCompressedSize != 0)
- curProps.setPropInt64(FAcompressedSize, totalCompressedSize);
- unsigned rs = curProps.getPropInt(FArecordSize); // set by user
- bool gotrc = false;
- if (rs && (totalLength%rs == 0)) {
- curProps.setPropInt64(FArecordCount,totalLength/(offset_t)rs);
- gotrc = true;
- }
- if (sameSizeHeaderFooter && ((srcFormat.markup == FMTjson ) || (srcFormat.markup == FMTxml)))
- {
- curProps.setPropInt64(FPheaderLength, headerSize);
- curProps.setPropInt64(FPfooterLength, footerSize);
- }
- if (srcAttr.get() && !mirroring) {
- // copy some attributes (done as an iterator in case we want to change to *exclude* some)
- Owned<IAttributeIterator> aiter = srcAttr->getAttributes();
- ForEach(*aiter) {
- const char *aname = aiter->queryName();
- if (!curProps.hasProp(aname)&&
- ((stricmp(aname,"@job")==0)||
- (stricmp(aname,"@workunit")==0)||
- (stricmp(aname,"@description")==0)||
- (stricmp(aname,"@eclCRC")==0)||
- (stricmp(aname,"@formatCrc")==0)||
- (stricmp(aname,"@owner")==0)||
- ((stricmp(aname,FArecordCount)==0)&&!gotrc) ||
- ((stricmp(aname,"@blockCompressed")==0)&&copyCompressed) ||
- ((stricmp(aname,"@rowCompressed")==0)&&copyCompressed)||
- (stricmp(aname,"@local")==0)||
- (stricmp(aname,"@recordCount")==0)
- )
- )
- curProps.setProp(aname,aiter->queryValue());
- }
- // Keep source kind
- if (srcAttr->hasProp(FPkind))
- {
- curProps.setProp(FPkind, srcAttr->queryProp(FPkind));
- if (srcAttr->hasProp(FPformat))
- curProps.setProp(FPformat, srcAttr->queryProp(FPformat));
- }
- else
- {
- const char * targetKind = nullptr;
- if (tgtFormat.markup == FMTxml)
- targetKind = "xml";
- else if (tgtFormat.markup == FMTjson)
- targetKind = "json";
- const char * targetFormat = nullptr;
- switch (tgtFormat.type)
- {
- case FFTfixed:
- case FFTvariable:
- case FFTblocked:
- targetKind = "flat";
- break;
- case FFTcsv:
- targetKind = "csv";
- break;
- case FFTutf:
- targetFormat = "utf8n";
- break;
- case FFTutf8:
- targetFormat = "utf8";
- break;
- case FFTutf16:
- targetFormat = "utf16";
- break;
- case FFTutf16be:
- targetFormat = "utf16be";
- break;
- case FFTutf16le:
- targetFormat = "utf16le";
- break;
- case FFTutf32:
- targetFormat = "utf32";
- break;
- case FFTutf32be:
- targetFormat = "utf32be";
- break;
- case FFTutf32le:
- targetFormat = "utf32le";
- break;
- case FFTrecfmvb:
- targetFormat = "recfmvb";
- break;
- case FFTrecfmv:
- targetFormat = "recfmv";
- break;
- case FFTvariablebigendian:
- targetFormat = "variablebigendian";
- break;
- }
- if (targetKind)
- curProps.setProp(FPkind, targetKind);
- if (targetFormat)
- curProps.setProp(FPformat, targetFormat);
- }
- // and simple (top level) elements
- // History copied as well
- Owned<IPropertyTreeIterator> iter = srcAttr->getElements("*");
- ForEach(*iter)
- {
- const char *aname = iter->query().queryName();
- if (stricmp(aname, "Protect") != 0)
- curProps.addPropTree(aname, createPTreeFromIPT(&iter->query()));
- }
- //
- // Add new History record
- //
- IPropertyTree * curHistory = curProps.queryPropTree("History");
- // If there wasn't a previous History (e.g. for Spray/Import)
- if (!curHistory)
- curHistory = curProps.setPropTree("History", createPTree());
- // Add new record about this operation
- Owned<IPropertyTree> newRecord = createPTree();
- CDateTime temp;
- temp.setNow();
- unsigned hour, min, sec, nanosec;
- temp.getTime(hour, min, sec, nanosec);
- temp.setTime(hour, min, sec, 0);
- StringBuffer timestr;
- newRecord->setProp("@timestamp",temp.getString(timestr).str());
- newRecord->setProp("@owner", srcAttr->queryProp("@owner"));
- if (srcAttr->hasProp("@workunit"))
- newRecord->setProp("@workunit", srcAttr->queryProp("@workunit"));
- newRecord->setProp("@operation", getOperationTypeString());
- // In the Spray case there is no distributedSource
- if (distributedSource)
- {
- // add original file name from a single distributed source (like Copy)
- if (distributedSource->numParts())
- {
- RemoteFilename remoteFile;
- distributedSource->queryPart(0).getFilename(remoteFile, 0);
- splitAndCollectFileInfo(newRecord, remoteFile);
- }
- }
- else if (sources.ordinality())
- {
- FilePartInfo & firstSource = sources.item((aindex_t)0);
- RemoteFilename &remoteFile = firstSource.filename;
- splitAndCollectFileInfo(newRecord, remoteFile, false);
- }
- curHistory->addPropTree("Origin",newRecord.getClear());
- }
- int expireDays = options->getPropInt("@expireDays", -1);
- if (expireDays != -1)
- curProps.setPropInt("@expireDays", expireDays);
- }
- if (distributedSource)
- {
- if (distributedSource->querySuperFile()==nullptr)
- distributedSource->addAttrValue("@numDiskReads", totalNumReads);
- }
- if (error)
- throw error.getClear();
- }
- void FileSprayer::splitAndCollectFileInfo(IPropertyTree * newRecord, RemoteFilename &remoteFileName,
- bool isDistributedSource)
- {
- StringBuffer drive;
- StringBuffer path;
- StringBuffer tail;
- StringBuffer ext;
- remoteFileName.split(&drive, &path, &tail, &ext);
- if (drive.isEmpty())
- {
- remoteFileName.queryIP().getIpText(drive.clear());
- newRecord->setProp("@ip", drive.str());
- }
- else
- newRecord->setProp("@drive", drive.str());
- newRecord->setProp("@path", path.str());
- // We don't want to store the distributed file part's name extension
- if (!isDistributedSource && ext.length())
- tail.append(ext);
- if (sources.ordinality()>1)
- newRecord->setProp("@name", "[MULTI]");
- else
- newRecord->setProp("@name", tail.str());
- }
- void FileSprayer::setOperation(dfu_operation op)
- {
- operation = op;
- }
- dfu_operation FileSprayer::getOperation() const
- {
- return operation;
- }
- const char * FileSprayer::getOperationTypeString() const
- {
- return DfuOperationStr[operation];
- }
- bool FileSprayer::usePullOperation() const
- {
- if (!calcedPullPush)
- {
- calcedPullPush = true;
- cachedUsePull = calcUsePull();
- }
- return cachedUsePull;
- }
- bool FileSprayer::usePushOperation() const
- {
- return !usePullOperation() && !usePushWholeOperation();
- }
- bool FileSprayer::canLocateSlaveForNode(const IpAddress &ip) const
- {
- try
- {
- StringBuffer ret;
- querySlaveExecutable(ip, ret);
- return true;
- }
- catch (IException * e)
- {
- e->Release();
- }
- return false;
- }
- bool FileSprayer::calcUsePull() const
- {
- if (allowRecovery && progressTree->hasProp(ANpull))
- {
- bool usePull = progressTree->getPropBool(ANpull);
- LOG(MCdebugInfo, job, "Pull = %d from recovery", (int)usePull);
- return usePull;
- }
- if (sources.ordinality() == 0)
- return true;
- if (options->getPropBool(ANpull, false))
- {
- LOG(MCdebugInfo, job, "Use pull since explicitly specified");
- return true;
- }
- if (options->getPropBool(ANpush, false))
- {
- LOG(MCdebugInfo, job, "Use push since explicitly specified");
- return false;
- }
- ForEachItemIn(idx2, sources)
- {
- if (!sources.item(idx2).canPush())
- {
- StringBuffer s;
- sources.item(idx2).filename.queryIP().getIpText(s);
- LOG(MCdebugInfo, job, "Use pull operation because %s cannot push", s.str());
- return true;
- }
- }
- if (!canLocateSlaveForNode(sources.item(0).filename.queryIP()))
- {
- StringBuffer s;
- sources.item(0).filename.queryIP().getIpText(s);
- LOG(MCdebugInfo, job, "Use pull operation because %s doesn't appear to have an ftslave", s.str());
- return true;
- }
- ForEachItemIn(idx, targets)
- {
- if (!targets.item(idx).canPull())
- {
- StringBuffer s;
- targets.item(idx).queryIP().getIpText(s);
- LOG(MCdebugInfo, job, "Use push operation because %s cannot pull", s.str());
- return false;
- }
- }
- if (!canLocateSlaveForNode(targets.item(0).queryIP()))
- {
- StringBuffer s;
- targets.item(0).queryIP().getIpText(s);
- LOG(MCdebugInfo, job, "Use push operation because %s doesn't appear to have an ftslave", s.str());
- return false;
- }
- //Use push if going to a single node.
- if ((targets.ordinality() == 1) && (sources.ordinality() > 1))
- {
- LOG(MCdebugInfo, job, "Use push operation because going to a single node from many");
- return false;
- }
- LOG(MCdebugInfo, job, "Use pull operation as default");
- return true;
- }
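- //Summary of the decision order above: a recovered pull setting wins; an empty source list
- //defaults to pull; then an explicit pull or push option; otherwise pull if any source cannot
- //push or the first source has no reachable ftslave, push if any target cannot pull or the
- //first target has no reachable ftslave, push when many sources funnel into a single target,
- //and pull by default.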
- extern DALIFT_API IFileSprayer * createFileSprayer(IPropertyTree * _options, IPropertyTree * _progress, IRemoteConnection * recoveryConnection, const char *wuid)
- {
- return new FileSprayer(_options, _progress, recoveryConnection, wuid);
- }
- /*
- Parameters:
- 1. A list of target locations (machine+drive?) (and possibly a number for each)
- 2. A list of source locations [derived from logical file]
- 3. Information on the source and target formats
- 4. A mask for the parts that need to be copied. [recovery is a special case of this]
- Need to
- a) Start servers on machines that cannot be accessed directly [have to be running anyway]
- b) Work out how the file is going to be partitioned
- 1. Find out the sizes of all the files.
- 2. Calculate partition points -
- For each source file pass [thisoffset, totalsize, thissize, startPoint?]; this returns a list of
- numbered partition points.
- Two calls: calcPartition() and retrievePartition() to allow for multithreading on variable length.
- A. If variable length
- Start servers on each of the source machines
- Query each server for partition information (which walks the file).
- * If it's an N->N copy we don't need to calculate the partition; it can be done with a 1:1 mapping.
- E.g. copy variable to blocked format with one block per variable.
- c) Save partition information for quick/consistent recovery
- d) Start servers on each of the targets, or on the sources when pushing to non-accessible machines
- e) Start pulling/pushing
- Each saves flag when complete for recovery
- */
- //----------------------------------------------------------------------------
- void testPartitions()
- {
- unsigned sizes[] = { 100, 100, 100, 100, 100, 100, 100, 100, 100, 100,
- 100, 100, 100, 100, 100, 100, 100, 100, 100, 100,
- 100, 100, 100, 100, 100, 100, 100, 100, 100, 100,
- 100, 100, 100, 100, 100, 100, 100, 100, 100, 100,
- 10,
- };
- unsigned parts = _elements_in(sizes);
- unsigned offsets[_elements_in(sizes)];
- unsigned targetParts = 20;
- unsigned recordSize = 20;
- unsigned totalSize =0;
- unsigned idx;
- for (idx = 0; idx < parts; idx++)
- {
- offsets[idx] = totalSize;
- totalSize += sizes[idx];
- }
- PartitionPointArray results;
- for (idx = 0; idx < parts; idx++)
- {
- CFixedPartitioner partitioner(recordSize);
- partitioner.setPartitionRange(totalSize, offsets[idx], sizes[idx], 0, targetParts);
- partitioner.calcPartitions(NULL);
- partitioner.getResults(results);
- }
- ForEachItemIn(idx2, results)
- results.item(idx2).display();
- }
- /*
- MORE:
- * What about multiple parts for a source file - what should we do with them?
- Ignore? Try if
- * Pushing - how do we manage it?
- A. Copy all at once.
- 1. For simple non-translation easy to copy all at once.
- 2. For others, could hook up a translator so it only calculates the target size.
- Problem is it is a reasonably complex interaction with the partitioner.
- Easier to implement, but not as efficient, as a separate pass.
- - Optimize for variable to VBX.
- B. Copy a chunk at a time
- 1. The first source for each chunk write in parallel, followed by the next.
- - okay if not all writing to a single large file.
- * Unreachable machines
- 1. Can I use message passing?
- 2. Mock up + test code [ need multi threading access ].
- 3. Implement an exists primitive.
- 4. How do I distinguish machines?
- * Main program needs to survive if slave nodes die.
- * Asynchronous calls + avoiding the thread switching for notifications?
- * Code for replicating parts
- - set up as a copy from fixed1 to fixed1, with the partition matching the sources.
- */
|