dfurun.cpp 78 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. /* todo
  14. test multiclusteradd
  15. test multiclusteradd with replicate
  16. */
  17. #include <platform.h>
  18. #include <stdio.h>
  19. #include <limits.h>
  20. #include "jexcept.hpp"
  21. #include "jptree.hpp"
  22. #include "jsocket.hpp"
  23. #include "jstring.hpp"
  24. #include "jmisc.hpp"
  25. #include "jprop.hpp"
  26. #include "daclient.hpp"
  27. #include "dadfs.hpp"
  28. #include "dautils.hpp"
  29. #include "dasds.hpp"
  30. #include "dasess.hpp"
  31. #include "dalienv.hpp"
  32. #include "daaudit.hpp"
  33. #include "jio.hpp"
  34. #include "daft.hpp"
  35. #include "daftcfg.hpp"
  36. #include "fterror.hpp"
  37. #include "rmtfile.hpp"
  38. #include "daftprogress.hpp"
  39. #include "dfuwu.hpp"
  40. #include "dfurun.hpp"
  41. #include "eventqueue.hpp"
  42. #include "wujobq.hpp"
  43. #include "dameta.hpp"
  44. #define SDS_CONNECT_TIMEOUT (5*60*100)
  45. extern ILogMsgHandler * fileMsgHandler;
  46. extern void doKeyDiff(IFileDescriptor *oldf,IFileDescriptor *newf,IFileDescriptor *patchf); // patchf out
  47. extern void doKeyPatch(IFileDescriptor *oldf,IFileDescriptor *newf,IFileDescriptor *patchf);
  48. static void LOGXML(const char *trc,const IPropertyTree *pt)
  49. {
  50. StringBuffer s;
  51. toXML(pt,s);
  52. PROGLOG("%s:\n%s",trc,s.str());
  53. }
  54. class CDFUengine: public CInterface, implements IDFUengine
  55. {
  56. StringBuffer dfuServerName;
  57. size32_t defaultTransferBufferSize;
  58. void setDefaultTransferBufferSize(size32_t size)
  59. {
  60. defaultTransferBufferSize = size;
  61. }
  62. void setDFUServerName(const char* name)
  63. {
  64. dfuServerName = name;
  65. }
  66. void Audit(const char *func,IUserDescriptor *userdesc,const char *lfn1, const char *lfn2)
  67. {
  68. SocketEndpoint ep;
  69. ep.setLocalHost(0);
  70. StringBuffer aln;
  71. aln.append(",FileAccess,DfuPlus,").append(func).append(',');
  72. ep.getUrlStr(aln);
  73. aln.append(',');
  74. if (userdesc)
  75. userdesc->getUserName(aln);
  76. if (lfn1&&*lfn1) {
  77. aln.append(',').append(lfn1);
  78. if (lfn2&&*lfn2) {
  79. aln.append(',').append(lfn2);
  80. }
  81. }
  82. LOG(MCauditInfo,"%s",aln.str());
  83. }
  84. class cProgressReporter : public DaftProgress
  85. {
  86. IDFUprogress *progress;
  87. unsigned start;
  88. public:
  89. enum { REPnone, REPbefore, REPduring } repmode;
  90. cProgressReporter(IDFUprogress *_progress)
  91. {
  92. progress = _progress;
  93. repmode = REPnone;
  94. start = msTick();
  95. }
  96. void displayProgress(unsigned percentDone, unsigned secsLeft, const char * timeLeft,
  97. unsigned __int64 scaledDone, unsigned __int64 scaledTotal, const char * scale,
  98. unsigned kbPerSecondAve, unsigned kbPerSecondRate,
  99. unsigned slavesDone, unsigned __int64 numReads, unsigned __int64 numWrites)
  100. {
  101. if (repmode==REPbefore)
  102. percentDone /= 2;
  103. else
  104. if (repmode==REPduring)
  105. percentDone = percentDone/2+50;
  106. progress->setProgress(percentDone, secsLeft, timeLeft, scaledDone, scaledTotal, scale,
  107. kbPerSecondAve, kbPerSecondRate, slavesDone, repmode==REPduring, numReads, numWrites);
  108. }
  109. void displaySummary(const char * timeTaken, unsigned kbPerSecond)
  110. {
  111. // ignore time passed through - use own
  112. char tts[20];
  113. formatTime(tts, (msTick()-start)/1000);
  114. progress->setDone(tts, kbPerSecond, (repmode!=REPbefore));
  115. }
  116. void setRange(unsigned __int64 sizeReadBefore, unsigned __int64 totalSize, unsigned _totalNodes)
  117. {
  118. DaftProgress::setRange(sizeReadBefore,totalSize,_totalNodes);
  119. progress->setTotalNodes(_totalNodes);
  120. }
  121. void setFileAccessCost(double fileAccessCost)
  122. {
  123. progress->setFileAccessCost(fileAccessCost);
  124. }
  125. };
  126. class cAbortNotify : public CInterface, implements IAbortRequestCallback, implements IDFUabortSubscriber
  127. {
  128. bool abort;
  129. bool last;
  130. public:
  131. IMPLEMENT_IINTERFACE;
  132. cAbortNotify()
  133. {
  134. abort = false;
  135. last = abort;
  136. }
  137. virtual bool abortRequested()
  138. {
  139. if (abort&&!last)
  140. PROGLOG("ABORT checked");
  141. last = abort;
  142. return abort;
  143. }
  144. void notifyAbort()
  145. {
  146. if (!abort)
  147. PROGLOG("ABORT notified");
  148. abort = true;
  149. }
  150. };
  151. class cDFUlistener: public Thread
  152. {
  153. Owned<IJobQueue> queue;
  154. StringAttr queuename;
  155. unsigned timeout;
  156. bool ismon;
  157. CSDSServerStatus *serverstatus;
  158. protected:
  159. bool cancelling;
  160. CDFUengine *parent;
  161. void setRunningStatus(const char *qname,const char *wuid,bool set)
  162. {
  163. if (ismon||!serverstatus||!wuid||!*wuid) // monitor not in status
  164. return;
  165. StringBuffer mask;
  166. mask.appendf("Queue[@name=\"%s\"][1]",qname);
  167. IPropertyTree *t = serverstatus->queryProperties()->queryPropTree(mask.str());
  168. if (!t) {
  169. OWARNLOG("DFUWU: setRunningStatus queue %s not set",qname);
  170. return;
  171. }
  172. mask.clear().appendf("Job[@wuid=\"%s\"]",wuid);
  173. if (set&&!t->hasProp(mask.str())) {
  174. t->addPropTree("Job",createPTree())->setProp("@wuid",wuid);
  175. }
  176. else
  177. t->removeProp(mask.str());
  178. serverstatus->commitProperties();
  179. }
  180. public:
  181. cDFUlistener(CDFUengine *_parent,const char *_queuename, bool _ismon, CSDSServerStatus *_serverstatus, unsigned _timeout=WAIT_FOREVER)
  182. : queuename(_queuename)
  183. {
  184. serverstatus = _serverstatus;
  185. ismon = _ismon;
  186. timeout = _timeout;
  187. parent = _parent;
  188. queue.setown(createJobQueue(queuename));
  189. cancelling = false;
  190. }
  191. int run()
  192. {
  193. try {
  194. queue->connect(false);
  195. }
  196. catch (IException *e) {
  197. EXCLOG(e, "DFURUN Server Connect queue: ");
  198. e->Release();
  199. return -1;
  200. }
  201. const char *serv = ismon?"Monitor":"Server";
  202. try {
  203. // MemoryBuffer mb;
  204. unsigned start = msTick();
  205. if (ismon) {
  206. try {
  207. onCycle(); // first run
  208. }
  209. catch (IException *e) {
  210. EXCLOG(e, "DFURUN Monitor Exception(1): ");
  211. e->Release();
  212. }
  213. }
  214. for (;;) {
  215. unsigned char mode;
  216. StringAttr wuid;
  217. Owned<IJobQueueItem> item = queue->dequeue(timeout);
  218. if (item.get()) {
  219. wuid.set(item->queryWUID());
  220. if ((wuid.length()==0)||(stricmp(wuid,"!stop")==0))
  221. mode = DFUservermode_stop;
  222. else
  223. mode = DFUservermode_run;
  224. }
  225. else {
  226. if (cancelling||(timeout==WAIT_FOREVER)||isAborting())
  227. mode = DFUservermode_stop;
  228. else
  229. mode = DFUservermode_cycle;
  230. }
  231. try {
  232. switch ((DFUservermode)mode) {
  233. case DFUservermode_run: {
  234. PROGLOG("DFU %s running job: %s",serv,wuid.get());
  235. setRunningStatus(queuename.get(),wuid,true);
  236. try {
  237. parent->runWU(wuid);
  238. }
  239. catch (IException *) {
  240. setRunningStatus(queuename.get(),wuid,false);
  241. throw;
  242. }
  243. if (!ismon) {
  244. setRunningStatus(queuename.get(),wuid,false);
  245. PROGLOG("DFU %s finished job: %s",serv,wuid.get());
  246. }
  247. PROGLOG("DFU %s waiting on queue %s",serv,queuename.get());
  248. }
  249. break;
  250. case DFUservermode_stop:
  251. if (cancelling||(msTick()-start>5000))
  252. return 0;
  253. start = msTick(); // remove enqueued stops
  254. break;
  255. case DFUservermode_cycle:
  256. onCycle();
  257. break;
  258. default:
  259. OERRLOG("DFURUN Unknown mode");
  260. break;
  261. }
  262. }
  263. catch (IException *e) {
  264. EXCLOG(e, "DFURUN Server Exception(1): ");
  265. e->Release();
  266. }
  267. }
  268. }
  269. catch (IException *e) {
  270. EXCLOG(e, "DFURUN Server Exception(2): ");
  271. e->Release();
  272. PROGLOG("Exiting DFU Server");
  273. }
  274. try {
  275. queue->disconnect();
  276. }
  277. catch (IException *e) {
  278. EXCLOG(e, "DFURUN Server queue disconnect: ");
  279. e->Release();
  280. }
  281. return 0;
  282. }
  283. void cancel()
  284. {
  285. cancelling = true;
  286. queue->cancelAcceptConversation();
  287. }
  288. virtual void onCycle()
  289. {
  290. }
  291. };
  292. class cDFUmonitor: public cDFUlistener
  293. {
  294. public:
  295. cDFUmonitor(CDFUengine *_parent,const char *_queuename, CSDSServerStatus *serverstatus, unsigned _timeout)
  296. : cDFUlistener(_parent,_queuename,true,serverstatus,_timeout)
  297. {
  298. }
  299. virtual void onCycle()
  300. {
  301. parent->monitorCycle(cancelling);
  302. }
  303. };
  304. IRemoteConnection *setRunning(const char * path)
  305. {
  306. unsigned mode = RTM_CREATE_QUERY | RTM_LOCK_READ | RTM_DELETE_ON_DISCONNECT;
  307. IRemoteConnection *runningconn = querySDS().connect(path, myProcessSession(), mode, SDS_CONNECT_TIMEOUT);
  308. if (runningconn) {
  309. runningconn->queryRoot()->setPropBool("", true);
  310. runningconn->commit();
  311. }
  312. return runningconn;
  313. }
  314. void setFileRepeatOptions(IDistributedFile &file,const char *cluster,bool repeatlast,bool onlyrepeated)
  315. {
  316. if (!cluster||!*cluster)
  317. return;
  318. StringBuffer dir;
  319. GroupType groupType;
  320. Owned<IGroup> grp = queryNamedGroupStore().lookup(cluster, dir, groupType);
  321. if (!grp) {
  322. throw MakeStringException(-1,"setFileRepeatOptions cluster %s not found",cluster);
  323. return;
  324. }
  325. ClusterPartDiskMapSpec spec;
  326. unsigned cn = file.findCluster(cluster);
  327. if (cn!=NotFound)
  328. spec = file.queryPartDiskMapping(cn);
  329. if (repeatlast)
  330. spec.setRepeatedCopies(file.numParts()-1,onlyrepeated);
  331. if (dir.length())
  332. spec.setDefaultBaseDir(dir.str());
  333. if (cn==NotFound)
  334. file.addCluster(cluster,spec);
  335. else
  336. file.updatePartDiskMapping(cluster,spec);
  337. }
  338. bool testLocalCluster(const char *groupname)
  339. {
  340. if (!groupname)
  341. return false;
  342. if (isdigit(*groupname)) { // allow IPs
  343. const char *s = groupname+1;
  344. while (*s) {
  345. if (!isdigit(*s)&&(*s!='.')&&(*s!='-')&&(*s!=',')&&(*s!=':'))
  346. break;
  347. s++;
  348. }
  349. if (!*s)
  350. return true;
  351. }
  352. Owned<IRemoteConnection> conn = querySDS().connect("/Environment/Software", myProcessSession(), RTM_LOCK_READ, SDS_CONNECT_TIMEOUT);
  353. if (!conn)
  354. return false;
  355. IPropertyTree* root = conn->queryRoot();
  356. Owned<IPropertyTreeIterator> clusters;
  357. clusters.setown(root->getElements("ThorCluster"));
  358. ForEach(*clusters) {
  359. StringBuffer thorClusterGroupName;
  360. getClusterGroupName(clusters->query(), thorClusterGroupName);
  361. if (strcmp(thorClusterGroupName.str(),groupname)==0)
  362. return true;
  363. }
  364. clusters.setown(root->getElements("RoxieCluster"));
  365. ForEach(*clusters) {
  366. IPropertyTree& cluster = clusters->query();
  367. if (strcmp(cluster.queryProp("@name"),groupname)==0)
  368. return true;
  369. Owned<IPropertyTreeIterator> farms = cluster.getElements("RoxieFarmProcess"); // probably only one but...
  370. ForEach(*farms) {
  371. IPropertyTree& farm = farms->query();
  372. StringBuffer fgname(cluster.queryProp("@name"));
  373. fgname.append("__");
  374. fgname.append(farm.queryProp("@name"));
  375. if (strcmp(fgname.str(),groupname)==0)
  376. return true;
  377. }
  378. }
  379. clusters.setown(root->getElements("EclAgentProcess"));
  380. ForEach(*clusters) {
  381. unsigned ins = 0;
  382. IPropertyTree &pt = clusters->query();
  383. const char *hgname = pt.queryProp("@name");
  384. if (hgname&&*hgname) {
  385. Owned<IPropertyTreeIterator> insts = pt.getElements("Instance");
  386. ForEach(*insts) {
  387. const char *na = insts->query().queryProp("@netAddress");
  388. if (na&&*na) {
  389. SocketEndpoint ep(na);
  390. if (!ep.isNull()) {
  391. ins++;
  392. StringBuffer gname("hthor__");
  393. if (memicmp(groupname,gname.str(),gname.length())==0)
  394. gname.append(groupname+gname.length());
  395. else
  396. gname.append(groupname);
  397. if (ins>1)
  398. gname.append('_').append(ins);
  399. if (strcmp(gname.str(),groupname)==0)
  400. return true;
  401. }
  402. }
  403. }
  404. }
  405. }
  406. return false;
  407. }
  408. // DropZone check
  409. void checkFilePath(RemoteFilename & filename)
  410. {
  411. StringBuffer filePath;
  412. filename.getLocalPath(filePath);
  413. const char * pfilePath = filePath.str();
  414. #ifdef _DEBUG
  415. LOG(MCdebugInfo, unknownJob, "File path is '%s'", filePath.str());
  416. #endif
  417. const char pathSep = filename.getPathSeparator();
  418. const char dotString[] = {pathSep, '.', pathSep, '\0'};
  419. const char dotDotString[] = {pathSep, '.', '.', pathSep, '\0'};
  420. const char * isDotString = strstr(pfilePath, dotString);
  421. const char * isDotDotString = strstr(pfilePath, dotDotString);
  422. if ((isDotDotString != nullptr) || (isDotString != nullptr))
  423. throwError3(DFTERR_InvalidFilePath, pfilePath, dotDotString, dotString);
  424. StringBuffer netaddress;
  425. filename.queryIP().getIpText(netaddress);
  426. #ifdef _CONTAINERIZED
  427. Owned<IPropertyTreeIterator> planes = getDropZonePlanesIterator();
  428. ForEach(*planes)
  429. {
  430. IPropertyTree & plane = planes->query();
  431. const char * fullDropZoneDir = plane.queryProp("@prefix");
  432. assertex(fullDropZoneDir);
  433. if (startsWith(pfilePath, fullDropZoneDir))
  434. return;
  435. }
  436. throwError1(DFTERR_NoMatchingDropzonePlane, pfilePath);
  437. #else
  438. Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
  439. Owned<IConstEnvironment> env = factory->openEnvironment();
  440. Owned<IConstDropZoneInfo> dropZone = env->getDropZoneByAddressPath(netaddress.str(), pfilePath);
  441. if (!dropZone)
  442. {
  443. if (env->isDropZoneRestrictionEnabled())
  444. throwError2(DFTERR_NoMatchingDropzonePath, netaddress.str(), pfilePath);
  445. else
  446. LOG(MCdebugInfo, unknownJob, "No matching drop zone path on '%s' to file path: '%s'", netaddress.str(), pfilePath);
  447. }
  448. #ifdef _DEBUG
  449. else
  450. {
  451. SCMStringBuffer dropZoneName;
  452. dropZone->getName(dropZoneName);
  453. LOG(MCdebugInfo, unknownJob, "Drop zone path '%s' is %svisible in ECLWatch."
  454. , dropZoneName.str()
  455. , (dropZone->isECLWatchVisible() ? "" : "not ")
  456. );
  457. }
  458. #endif
  459. #endif
  460. }
  461. // Prepare DropZone check for file(s)
  462. void checkSourceTarget(IFileDescriptor * file)
  463. {
  464. unsigned numParts = file->numParts();
  465. for (unsigned idx=0; idx < numParts; idx++)
  466. {
  467. if (file->isMulti(idx))
  468. {
  469. // It expands wildcards and file list
  470. RemoteMultiFilename multi;
  471. file->getMultiFilename(idx, 0, multi);
  472. multi.expandWild();
  473. ForEachItemIn(i, multi)
  474. {
  475. RemoteFilename rfn2(multi.item(i));
  476. checkFilePath(rfn2);
  477. }
  478. }
  479. else
  480. {
  481. RemoteFilename filename;
  482. file->getFilename(idx, 0, filename);
  483. checkFilePath(filename);
  484. }
  485. }
  486. }
  487. Owned<IScheduleEventPusher> eventpusher;
  488. IArrayOf<cDFUlistener> listeners;
  489. CriticalSection monitorsect;
  490. CriticalSection subcopysect;
  491. std::atomic<unsigned> runningflag;
  492. public:
  493. IMPLEMENT_IINTERFACE;
  494. CDFUengine()
  495. {
  496. defaultTransferBufferSize = 0;
  497. runningflag = 1;
  498. eventpusher.setown(getScheduleEventPusher());
  499. }
  500. ~CDFUengine()
  501. {
  502. abortListeners();
  503. joinListeners();
  504. }
  505. void startListener(const char *queuename,CSDSServerStatus *serverstatus)
  506. {
  507. PROGLOG("DFU server waiting on queue %s",queuename);
  508. cDFUlistener *lt = new cDFUlistener(this,queuename,false,serverstatus);
  509. listeners.append(*lt);
  510. lt->start();
  511. }
  512. void startMonitor(const char *queuename,CSDSServerStatus *serverstatus,unsigned timeout)
  513. {
  514. if (timeout==0)
  515. return;
  516. PROGLOG("DFU monitor waiting on queue %s timeout %d",queuename,timeout);
  517. cDFUlistener *lt = new cDFUmonitor(this,queuename,serverstatus,timeout);
  518. listeners.append(*lt);
  519. lt->start();
  520. }
  521. void joinListeners()
  522. {
  523. unsigned i;
  524. for (i=0;i<listeners.ordinality();i++)
  525. listeners.item(i).join();
  526. }
  527. void abortListeners()
  528. {
  529. unsigned i;
  530. for (i=0;i<listeners.ordinality();i++)
  531. listeners.item(i).cancel();
  532. }
  533. void checkPhysicalFilePermissions(IFileDescriptor *fd,IUserDescriptor *user,bool write)
  534. {
  535. unsigned auditflags = (DALI_LDAP_AUDIT_REPORT|DALI_LDAP_READ_WANTED);
  536. if (write)
  537. auditflags |= DALI_LDAP_WRITE_WANTED;
  538. SecAccessFlags perm = queryDistributedFileDirectory().getFDescPermissions(fd,user,auditflags);
  539. IDFS_Exception *e = NULL;
  540. if (!HASREADPERMISSION(perm))
  541. throw MakeStringException(DFSERR_LookupAccessDenied,"Lookup permission denied for physical file(s)");
  542. if (write&&!HASWRITEPERMISSION(perm))
  543. throw MakeStringException(DFSERR_CreateAccessDenied,"Create permission denied for physical file(s)");
  544. }
  545. void monitorCycle(bool &cancelling)
  546. {
  547. CriticalBlock block(monitorsect);
  548. // scan all monitoring WUs
  549. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  550. Owned<IConstDFUWorkUnitIterator> iter = factory->getWorkUnitsByState(DFUstate_monitoring);
  551. StringBuffer wuid;
  552. if (iter) {
  553. StringAttrArray eventstriggered;
  554. StringAttrArray eventsfile;
  555. ForEach(*iter) {
  556. if (cancelling)
  557. break;
  558. // check for other owner here TBD
  559. iter->getId(wuid.clear());
  560. Owned<IDFUWorkUnit> wu = factory->updateWorkUnit(wuid.str(),true);
  561. if (!wu)
  562. continue;
  563. IDFUmonitor *monitor = wu->queryUpdateMonitor();
  564. if (!monitor)
  565. continue;
  566. SocketEndpoint handler;
  567. INode *me = queryMyNode();
  568. if (monitor->getHandlerEp(handler)) {
  569. if (!me->endpoint().equals(handler)) {
  570. Owned<IGroup> grp = createIGroup(1,&handler);
  571. Owned<ICommunicator> comm = createCommunicator(grp,true);
  572. if (comm->verifyConnection(0,1000*60)) // shouldn't take long
  573. continue; // other handler running
  574. monitor->setHandlerEp(me->endpoint());
  575. }
  576. }
  577. else
  578. monitor->setHandlerEp(me->endpoint());
  579. Owned<IUserDescriptor> userdesc = createUserDescriptor();
  580. {
  581. StringBuffer username;
  582. StringBuffer password;
  583. wu->getUser(username);
  584. wu->getPassword(password);
  585. userdesc->set(username.str(),password.str());
  586. }
  587. if (performMonitor(wu,monitor,wu->querySource(),false,&eventstriggered,&eventsfile,userdesc)) {
  588. wu->queryUpdateProgress()->setState(DFUstate_finished);
  589. }
  590. wu->commit();
  591. }
  592. pushEvents(eventstriggered,eventsfile);
  593. }
  594. }
  595. void pushEvents(StringAttrArray &eventstriggered, StringAttrArray &eventsfile)
  596. {
  597. ForEachItemIn(i,eventstriggered) {
  598. const char *ename = eventstriggered.item(i).text.get();
  599. const char *fname = eventsfile.item(i).text.get();
  600. bool dup = false;
  601. for (unsigned j=i;j>0;j--) { // n^2 but not many hopefully
  602. if ((stricmp(ename,eventstriggered.item(j-1).text.get())==0)&&
  603. (stricmp(fname,eventsfile.item(j-1).text.get())==0)) {
  604. dup = true;
  605. break;
  606. }
  607. }
  608. if (!dup) {
  609. eventpusher->push(ename,fname,NULL);
  610. PROGLOG("DFUMON Event Pushed: %s, %s",ename,fname);
  611. }
  612. }
  613. }
  614. bool performMonitor(IDFUWorkUnit *wu,IDFUmonitor *monitor,IConstDFUfileSpec *source, bool raiseexception, StringAttrArray *eventstriggered, StringAttrArray *eventsfile, IUserDescriptor *user)
  615. {
  616. bool sub = monitor->getSub();
  617. StringBuffer lfn;
  618. source->getLogicalName(lfn);
  619. StringAttrArray prev;
  620. StringAttrArray done;
  621. monitor->getTriggeredList(prev);
  622. monitor->setCycleCount(monitor->getCycleCount()+1);
  623. if (lfn.length()) { // no wild cards so only 0 or 1 prev
  624. if (queryDistributedFileDirectory().exists(lfn.str(),user)) {
  625. done.append(*new StringAttrItem(lfn.str()));
  626. bool isdone = ((prev.ordinality()!=0)&&
  627. (stricmp(prev.item(0).text.get(),lfn.str())==0));
  628. if (!isdone) {
  629. if (eventstriggered) {
  630. StringBuffer ename;
  631. monitor->getEventName(ename);
  632. if (!ename.length())
  633. ename.append("DfuLogicalFileMonitor");
  634. PROGLOG("MONITOR(%s): triggering event: %s, %s",wu->queryId(),ename.str(),lfn.str());
  635. eventstriggered->append(*new StringAttrItem(ename.str()));
  636. eventsfile->append(*new StringAttrItem(lfn.str()));
  637. unsigned shots = monitor->getShotCount()+1;
  638. monitor->setShotCount(shots);
  639. unsigned limit = monitor->getShotLimit();
  640. if (limit<1)
  641. limit = 1;
  642. unsigned pc = (shots*100)/limit;
  643. IDFUprogress *progress = wu->queryUpdateProgress();
  644. progress->setPercentDone(pc);
  645. if (shots>=limit) {
  646. PROGLOG("MONITOR(%s): Complete",wu->queryId());
  647. monitor->setTriggeredList(prev);
  648. return true; // all done
  649. }
  650. }
  651. }
  652. // could compare prev and done TBD
  653. monitor->setTriggeredList(done);
  654. }
  655. }
  656. else {
  657. Owned<IFileDescriptor> fdesc = source->getFileDescriptor();
  658. if (!fdesc)
  659. return false;
  660. StringBuffer path;
  661. StringBuffer dir;
  662. RemoteFilename rfn;
  663. if (fdesc->numParts()!=1) {
  664. OERRLOG("MONITOR: monitor file incorrectly specified");
  665. if (raiseexception)
  666. throw MakeStringException(-1,"MONITOR: monitor file incorrectly specified");
  667. return true;
  668. }
  669. fdesc->getFilename(0,0,rfn);
  670. rfn.getLocalPath(path);
  671. const char * filemask = splitDirTail(path.str(),dir);
  672. RemoteFilename dirfn;
  673. dirfn.setPath(rfn.queryEndpoint(),dir.str());
  674. Owned<IFile> dirf = createIFile(dirfn);
  675. if (!dirf||(dirf->isDirectory()!=fileBool::foundYes)) {
  676. OERRLOG("MONITOR: %s is not a directory in DFU WUID %s",dir.str(),wu->queryId());
  677. if (raiseexception)
  678. throw MakeStringException(-1,"MONITOR: %s is not a directory in DFU WUID %s",dir.str(),wu->queryId());
  679. return true;
  680. }
  681. Owned<IDirectoryIterator> iter = dirf->directoryFiles(filemask,sub);
  682. if (iter) {
  683. StringBuffer fname;
  684. StringBuffer fnamedate;
  685. CDateTime mod;
  686. ForEach(*iter) {
  687. fname.clear().append(iter->query().queryFilename()); // may need to adjust to match input
  688. fnamedate.clear().append(fname).append(';');
  689. iter->getModifiedTime(mod);
  690. mod.getString(fnamedate);
  691. done.append(*new StringAttrItem(fnamedate.str()));
  692. bool isdone = false;
  693. ForEachItemIn(i,prev) {
  694. if(strcmp(prev.item(i).text.get(),fnamedate.str())==0) {
  695. isdone = true;
  696. break;
  697. }
  698. }
  699. if (!isdone&&eventstriggered) {
  700. StringBuffer ename;
  701. monitor->getEventName(ename);
  702. if (!ename.length())
  703. ename.append("DfuFileMonitor");
  704. PROGLOG("MONITOR(%s): triggering event: %s, %s",wu->queryId(),ename.str(),fname.str());
  705. eventstriggered->append(*new StringAttrItem(ename.str()));
  706. eventsfile->append(*new StringAttrItem(fname.str()));
  707. unsigned shots = monitor->getShotCount()+1;
  708. monitor->setShotCount(shots);
  709. unsigned limit = monitor->getShotLimit();
  710. if (limit<1)
  711. limit = 1;
  712. unsigned pc = (shots*100)/limit;
  713. IDFUprogress *progress = wu->queryUpdateProgress();
  714. progress->setPercentDone(pc);
  715. if (shots>=limit) {
  716. monitor->setTriggeredList(done);
  717. PROGLOG("MONITOR(%s): Complete",wu->queryId());
  718. return true; // all done
  719. }
  720. }
  721. }
  722. // could compare prev and done TBD
  723. monitor->setTriggeredList(done);
  724. }
  725. }
  726. return false;
  727. }
  728. INode *getForeignDali(IConstDFUfileSpec *source)
  729. {
  730. SocketEndpoint ep;
  731. if (!source->getForeignDali(ep))
  732. return NULL;
  733. if (ep.port==0)
  734. ep.port= DALI_SERVER_PORT;
  735. return createINode(ep);
  736. }
  737. struct sSuperCopyContext
  738. {
  739. IDFUWorkUnitFactory *wufactory;
  740. IUserDescriptor *srcuser;
  741. IUserDescriptor *user;
  742. IConstDFUWorkUnit *superwu;
  743. IConstDFUfileSpec *superdestination;
  744. IConstDFUoptions *superoptions;
  745. IDFUprogress *superprogress;
  746. cProgressReporter *feedback;
  747. unsigned level;
  748. };
  749. void constructDestinationName(const char *dstlfn, const char *oldprefix, IConstDFUfileSpec *dest, CDfsLogicalFileName &dlfn, StringBuffer &roxieprefix)
  750. {
  751. dlfn.set(dstlfn);
  752. if (dlfn.isForeign()) // trying to confuse me again!
  753. throw MakeStringException(-1,"Destination cannot be foreign file");
  754. if (dest->getRoxiePrefix(roxieprefix).length()) {
  755. StringBuffer tmp;
  756. dstlfn = dlfn.get();
  757. tmp.append(roxieprefix).append("::");
  758. if (oldprefix&&*oldprefix) {
  759. size32_t l = strlen(oldprefix);
  760. if ((l+2<strlen(dstlfn))&&
  761. (memicmp(oldprefix,dstlfn,l)==0)&&
  762. (dstlfn[l]==':') && (dstlfn[l+1]==':'))
  763. dstlfn += l+2;
  764. }
  765. tmp.append(dstlfn);
  766. dlfn.set(tmp.str());
  767. }
  768. }
  769. bool doSubFileCopy(sSuperCopyContext &ctx,const char *dstlfn,INode *srcdali,const char *srclfn,StringAttr &wuid, bool iskey, const char *roxieprefix)
  770. {
  771. StringBuffer saveinprogress;
  772. {
  773. CriticalBlock block(subcopysect);
  774. Owned<IDFUWorkUnit> wu = ctx.wufactory->createWorkUnit();
  775. ctx.superprogress->getSubInProgress(saveinprogress);
  776. ctx.superprogress->setSubInProgress(wu->queryId());
  777. StringBuffer tmp;
  778. ctx.superwu->getClusterName(tmp);
  779. wu->setClusterName(tmp.str());
  780. ctx.superwu->getJobName(tmp.clear());
  781. wu->setJobName(tmp.str());
  782. ctx.superwu->getQueue(tmp.clear());
  783. wu->setQueue(tmp.str());
  784. if (ctx.user) {
  785. StringBuffer uname;
  786. ctx.user->getUserName(uname);
  787. wu->setUser(uname.str());
  788. StringBuffer pwd;
  789. wu->setPassword(ctx.user->getPassword(pwd).str());
  790. }
  791. IDFUfileSpec *source = wu->queryUpdateSource();
  792. IDFUfileSpec *destination = wu->queryUpdateDestination();
  793. IDFUoptions *options = wu->queryUpdateOptions();
  794. wu->setCommand(DFUcmd_copy);
  795. source->setLogicalName(srclfn);
  796. if (srcdali) {
  797. source->setForeignDali(srcdali->endpoint());
  798. if (ctx.srcuser) {
  799. StringBuffer uname;
  800. StringBuffer pwd;
  801. ctx.srcuser->getUserName(uname);
  802. ctx.srcuser->getPassword(pwd);
  803. source->setForeignUser(uname.str(),pwd.str());
  804. }
  805. }
  806. destination->setLogicalName(dstlfn);
  807. if (roxieprefix&&*roxieprefix)
  808. destination->setRoxiePrefix(roxieprefix);
  809. unsigned nc = ctx.superdestination->numClusters();
  810. for (unsigned i=0;i<nc;i++) {
  811. ctx.superdestination->getGroupName(i,tmp.clear());
  812. if (tmp.length()) {
  813. ClusterPartDiskMapSpec spec;
  814. if (ctx.superdestination->getClusterPartDiskMapSpec(tmp.str(),spec)) {
  815. destination->setClusterPartDiskMapSpec(tmp.str(),spec);
  816. }
  817. else
  818. destination->setGroupName(tmp.str());
  819. // StringBuffer basedir; // not needed as in spec
  820. // if (ctx.superdestination->getClusterPartDefaultBaseDir(tmp.str(),basedir))
  821. // destination->setClusterPartDefaultBaseDir(tmp.str(),basedir);
  822. }
  823. }
  824. options->setNoSplit(ctx.superoptions->getNoSplit());
  825. options->setOverwrite(ctx.superoptions->getOverwrite());
  826. options->setReplicate(ctx.superoptions->getReplicate());
  827. options->setNoRecover(ctx.superoptions->getNoRecover());
  828. options->setIfNewer(ctx.superoptions->getIfNewer());
  829. options->setIfModified(ctx.superoptions->getIfModified());
  830. options->setCrcCheck(ctx.superoptions->getCrcCheck());
  831. options->setmaxConnections(ctx.superoptions->getmaxConnections());
  832. options->setPush(ctx.superoptions->getPush());
  833. options->setRetry(ctx.superoptions->getRetry());
  834. options->setCrc(ctx.superoptions->getCrc());
  835. options->setThrottle(ctx.superoptions->getThrottle());
  836. options->setTransferBufferSize(ctx.superoptions->getTransferBufferSize());
  837. options->setVerify(ctx.superoptions->getVerify());
  838. StringBuffer slave;
  839. if (ctx.superoptions->getSlavePathOverride(slave))
  840. options->setSlavePathOverride(slave);
  841. options->setSubfileCopy(true);
  842. wu->queryUpdateProgress()->setState(DFUstate_queued); // well, good as
  843. // should be no need for overwrite
  844. wuid.set(wu->queryId());
  845. }
  846. if (wuid.isEmpty())
  847. return false;
  848. StringBuffer eps;
  849. PROGLOG("%s: Copy %s from %s to %s",wuid.get(),srclfn,srcdali?srcdali->endpoint().getUrlStr(eps).str():"(local)",dstlfn);
  850. DFUstate state = runWU(wuid);
  851. StringBuffer tmp;
  852. PROGLOG("%s: Done: %s",wuid.get(),encodeDFUstate(state,tmp).str());
  853. ctx.superprogress->setSubInProgress(saveinprogress);
  854. StringBuffer donewuids;
  855. ctx.superprogress->getSubDone(donewuids);
  856. if (donewuids.length())
  857. donewuids.append(',');
  858. donewuids.append(wuid.get());
  859. ctx.superprogress->setSubDone(donewuids.str());
  860. return (state==DFUstate_finished);
  861. }
  862. void doSuperForeignCopy(sSuperCopyContext &ctx,const char *dstlfn,INode *foreigndalinode,const char *srclfn, CDfsLogicalFileName &dlfn)
  863. {
  864. ctx.level++;
  865. Linked<INode> srcdali = foreigndalinode;
  866. CDfsLogicalFileName slfn;
  867. slfn.set(srclfn);
  868. if (slfn.isForeign()) { // trying to confuse me
  869. SocketEndpoint ep;
  870. slfn.getEp(ep);
  871. slfn.clearForeign();
  872. srcdali.setown(createINode(ep));
  873. }
  874. Owned<IPropertyTree> ftree = queryDistributedFileDirectory().getFileTree(srclfn,ctx.srcuser,srcdali, FOREIGN_DALI_TIMEOUT, GetFileTreeOpts::appendForeign);
  875. if (!ftree.get()) {
  876. StringBuffer s;
  877. throw MakeStringException(-1,"Source file %s could not be found in Dali %s",slfn.get(),srcdali?srcdali->endpoint().getUrlStr(s).str():"(local)");
  878. }
  879. // now we can create name
  880. StringBuffer newroxieprefix;
  881. constructDestinationName(dstlfn,ftree->queryProp("Attr/@roxiePrefix"),ctx.superdestination,dlfn,newroxieprefix);
  882. if (!srcdali.get()||queryCoven().inCoven(srcdali)) {
  883. // if dali is local and filenames same
  884. if (strcmp(slfn.get(),dlfn.get())==0) {
  885. PROGLOG("File copy of %s not done as file local",slfn.get());
  886. ctx.level--;
  887. return;
  888. }
  889. }
  890. // first see if target exists (and remove if does and overwrite specified)
  891. Owned<IDistributedFile> dfile = queryDistributedFileDirectory().lookup(dlfn,ctx.user,AccessMode::tbdWrite,false,false,nullptr,defaultPrivilegedUser);
  892. if (dfile) {
  893. if (!ctx.superoptions->getOverwrite())
  894. throw MakeStringException(-1,"Destination file %s already exists",dlfn.get());
  895. if (!dfile->querySuperFile())
  896. {
  897. if (ctx.superoptions->getIfModified()&&
  898. (ftree->hasProp("Attr/@fileCrc")&&ftree->getPropInt64("Attr/@size")&&
  899. ((unsigned)ftree->getPropInt64("Attr/@fileCrc")==(unsigned)dfile->queryAttributes().getPropInt64("@fileCrc"))&&
  900. (ftree->getPropInt64("Attr/@size")==dfile->getFileSize(false,false)))) {
  901. PROGLOG("File copy of %s not done as file unchanged",srclfn);
  902. return;
  903. }
  904. }
  905. dfile->detach();
  906. dfile.clear();
  907. }
  908. if (strcmp(ftree->queryName(),queryDfsXmlBranchName(DXB_File))==0) {
  909. StringAttr wuid;
  910. const char *kind = ftree->queryProp("@kind");
  911. bool iskey = kind&&(strcmp(kind,"key")==0);
  912. // note dstlfn doesn't have roxie prefix
  913. if (!doSubFileCopy(ctx,dstlfn,srcdali,srclfn,wuid,iskey,newroxieprefix.str()))
  914. throw MakeStringException(-1,"File %s could not be copied - see %s",dstlfn,wuid.isEmpty()?"unknown":wuid.get());
  915. }
  916. else if (strcmp(ftree->queryName(),queryDfsXmlBranchName(DXB_SuperFile))==0) {
  917. unsigned numtodo=0;
  918. StringArray subfiles;
  919. Owned<IPropertyTreeIterator> piter = ftree->getElements("SubFile");
  920. ForEach(*piter) {
  921. numtodo++;
  922. }
  923. unsigned numdone=0;
  924. ForEach(*piter) {
  925. const char *name = piter->query().queryProp("@name");
  926. CDfsLogicalFileName dlfnsub;
  927. dlfnsub.set(name);
  928. CDfsLogicalFileName dlfnres;
  929. doSuperForeignCopy(ctx,dlfnsub.get(true),foreigndalinode,name,dlfnres);
  930. numdone++;
  931. subfiles.append(dlfnres.get());
  932. if ((ctx.level==1)&&ctx.feedback)
  933. ctx.feedback->displayProgress(numtodo?(numdone*100/numtodo):0,0,"unknown",0,0,"",0,0,0,0,0);
  934. }
  935. // now construct the superfile
  936. Owned<IDistributedSuperFile> sfile = queryDistributedFileDirectory().createSuperFile(dlfn.get(),ctx.user,true,false);
  937. if (!sfile)
  938. throw MakeStringException(-1,"SuperFile %s could not be created",dlfn.get());
  939. ForEachItemIn(i,subfiles) {
  940. sfile->addSubFile(subfiles.item(i));
  941. }
  942. if (newroxieprefix.length()) {
  943. DistributedFilePropertyLock lock(sfile);
  944. lock.queryAttributes().setProp("@roxiePrefix",newroxieprefix.str());
  945. }
  946. }
  947. else {
  948. StringBuffer s;
  949. throw MakeStringException(-1,"Source file %s in Dali %s is not a file or superfile",srclfn,srcdali?srcdali->endpoint().getUrlStr(s).str():"(local)");
  950. }
  951. if ((ctx.level==1)&&ctx.feedback)
  952. ctx.feedback->displaySummary("0",0);
  953. ctx.level--;
  954. }
  955. void runSuperCopy(IConstDFUWorkUnit *wu, IConstDFUfileSpec *source,IConstDFUfileSpec *destination,
  956. IConstDFUoptions *options,IDFUprogress *progress,IUserDescriptor *userdesc,
  957. cProgressReporter &feedback)
  958. {
  959. Owned<IDFUWorkUnitFactory> wufactory = getDFUWorkUnitFactory();
  960. Owned<INode> foreigndalinode = getForeignDali(source);
  961. StringBuffer fu;
  962. StringBuffer fp;
  963. Owned<IUserDescriptor> foreignuserdesc;
  964. if (source->getForeignUser(fu,fp)) {
  965. foreignuserdesc.setown(createUserDescriptor());
  966. foreignuserdesc->set(fu.str(),fp.str());
  967. }
  968. else
  969. foreignuserdesc.set(userdesc);
  970. StringBuffer srcname;
  971. source->getLogicalName(srcname);
  972. if (!srcname.length())
  973. throw MakeStringException(-1,"Source file not specified");
  974. StringBuffer dstname;
  975. destination->getLogicalName(dstname);
  976. if (!dstname.length())
  977. throw MakeStringException(-1,"Destination not specified");
  978. sSuperCopyContext ctx;
  979. ctx.wufactory = wufactory;
  980. ctx.srcuser = foreignuserdesc;
  981. ctx.user = userdesc;
  982. ctx.superwu = wu;
  983. ctx.superdestination = destination;
  984. ctx.superoptions = options;
  985. ctx.superprogress = progress;
  986. ctx.feedback = &feedback;
  987. ctx.level = 0;
  988. ctx.superprogress->setSubInProgress("");
  989. ctx.superprogress->setSubDone("");
  990. CDfsLogicalFileName dlfn;
  991. doSuperForeignCopy(ctx,dstname.str(),foreigndalinode,srcname, dlfn);
  992. }
  993. DFUstate runWU(const char *dfuwuid)
  994. {
  995. StringBuffer runningpath;
  996. // only clear cache when nothing running (bit of a kludge)
  997. class CenvClear
  998. {
  999. std::atomic<unsigned> &running;
  1000. public:
  1001. CenvClear(std::atomic<unsigned> &_running)
  1002. : running(_running)
  1003. {
  1004. if (--running == 0) {
  1005. #ifndef _CONTAINERIZED
  1006. Owned<IEnvironmentFactory> envf = getEnvironmentFactory(false);
  1007. Owned<IConstEnvironment> env = envf->openEnvironment();
  1008. env->clearCache();
  1009. #endif
  1010. }
  1011. }
  1012. ~CenvClear()
  1013. {
  1014. ++running;
  1015. }
  1016. } cenvclear(runningflag);
  1017. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  1018. Owned<IDFUWorkUnit> wu = factory->updateWorkUnit(dfuwuid,false);
  1019. if (!wu) {
  1020. OWARNLOG("DFURUN: Workunit %s not found",dfuwuid);
  1021. return DFUstate_unknown;
  1022. }
  1023. if (dfuServerName.length())
  1024. wu->setDFUServerName(dfuServerName.str());
  1025. StringBuffer logname;
  1026. if (fileMsgHandler && fileMsgHandler->getLogName(logname))
  1027. wu->setDebugValue("dfulog", logname.str(), true);
  1028. IConstDFUfileSpec *source = wu->querySource();
  1029. IConstDFUfileSpec *destination = wu->queryDestination();
  1030. IConstDFUoptions *options = wu->queryOptions();
  1031. Owned<IPropertyTree> opttree = createPTreeFromIPT(options->queryTree());
  1032. StringAttr encryptkey;
  1033. StringAttr decryptkey;
  1034. if (options->getEncDec(encryptkey,decryptkey)) {
  1035. opttree->setProp("@encryptKey",encryptkey);
  1036. opttree->setProp("@decryptKey",decryptkey);
  1037. }
  1038. IDFUprogress *progress = wu->queryUpdateProgress();
  1039. IDistributedFileDirectory &fdir = queryDistributedFileDirectory();
  1040. IDistributedFileSystem &fsys = queryDistributedFileSystem();
  1041. Owned<IUserDescriptor> userdesc = createUserDescriptor();
  1042. Owned<IUserDescriptor> foreignuserdesc;
  1043. StringBuffer username;
  1044. {
  1045. StringBuffer password;
  1046. wu->getUser(username);
  1047. wu->getPassword(password);
  1048. userdesc->set(username.str(),password.str());
  1049. }
  1050. IPropertyTree *recovery;
  1051. IRemoteConnection *recoveryconn;
  1052. Owned<IRemoteConnection> runningconn;
  1053. wu->queryRecoveryStore(recoveryconn,recovery,runningpath.clear());
  1054. DFUstate s = progress->getState();
  1055. switch (s) {
  1056. case DFUstate_aborting:
  1057. case DFUstate_started: // not sure what this for
  1058. progress->setState(DFUstate_aborted);
  1059. /* no break */
  1060. case DFUstate_aborted:
  1061. IWARNLOG("DFURUN: Workunit %s aborted",dfuwuid);
  1062. return DFUstate_aborted;
  1063. case DFUstate_queued:
  1064. break;
  1065. default:
  1066. OWARNLOG("DFURUN: Workunit %s unexpected state %d",dfuwuid,(int)s);
  1067. return s;
  1068. }
  1069. bool replicating=false;
  1070. if (recovery&&recovery->getPropBool("@replicating",false))
  1071. replicating = true;
  1072. progress->setState(DFUstate_started);
  1073. Owned<IDFPartFilter> filter;
  1074. const char *fs = options->queryPartFilter();
  1075. if (fs)
  1076. filter.setown(createPartFilter(fs));
  1077. StringBuffer tmp;
  1078. cProgressReporter feedback(progress);
  1079. cAbortNotify abortnotify;
  1080. wu->subscribeAbort(&abortnotify);
  1081. bool iskey=false;
  1082. StringAttr kind;
  1083. bool multiclusterinsert = false;
  1084. bool multiclustermerge = false;
  1085. bool useserverreplicate = false;
  1086. Owned<IFileDescriptor> multifdesc;
  1087. Owned<IFileDescriptor> auxfdesc; // used for multicopy
  1088. Owned<IFileDescriptor> srcFdesc;
  1089. DFUstate finalstate = DFUstate_finished;
  1090. try {
  1091. DFUcmd cmd = wu->getCommand();
  1092. Owned<IDistributedFile> srcFile;
  1093. Owned<IDistributedFile> dstFile; // NB not attached
  1094. StringAttr dstName;
  1095. StringAttr srcName;
  1096. StringAttr diffNameSrc;
  1097. StringAttr diffNameDst;
  1098. Owned<INode> foreigndalinode;
  1099. StringAttr oldRoxiePrefix;
  1100. bool foreigncopy = false;
  1101. // first check for 'specials' (e.g. multi-cluster keydiff etc)
  1102. switch (cmd) {
  1103. case DFUcmd_copy:
  1104. {
  1105. source->getDiffKey(tmp.clear());
  1106. if (tmp.length())
  1107. diffNameSrc.set(tmp.str());
  1108. destination->getDiffKey(tmp.clear());
  1109. if (tmp.length())
  1110. diffNameDst.set(tmp.str());
  1111. source->getLogicalName(tmp.clear());
  1112. CDfsLogicalFileName srclfn;
  1113. if (tmp.length())
  1114. srclfn.set(tmp.str());
  1115. destination->getLogicalName(tmp.clear());
  1116. CDfsLogicalFileName dstlfn;
  1117. if (tmp.length())
  1118. dstlfn.set(tmp.str());
  1119. SocketEndpoint foreigndali;
  1120. if (srclfn.isSet()&&dstlfn.isSet()&&(strcmp(srclfn.get(),dstlfn.get())==0)&&(!source->getForeignDali(foreigndali))) {
  1121. if (!diffNameSrc.isEmpty()||!diffNameDst.isEmpty())
  1122. throw MakeStringException(-1,"Cannot add to multi-cluster file using keypatch");
  1123. multiclusterinsert = true;
  1124. }
  1125. break;
  1126. }
  1127. }
  1128. // now fill srcfile for commands that need
  1129. switch (cmd) {
  1130. case DFUcmd_copymerge:
  1131. multiclustermerge = true;
  1132. // fall through
  1133. case DFUcmd_copy:
  1134. case DFUcmd_move:
  1135. case DFUcmd_rename:
  1136. case DFUcmd_replicate:
  1137. case DFUcmd_export:
  1138. {
  1139. source->getLogicalName(tmp.clear());
  1140. if (!tmp.length())
  1141. throw MakeStringException(-1,"Source file not specified");
  1142. foreigncopy = false;
  1143. if ((cmd==DFUcmd_copy)||multiclustermerge)
  1144. {
  1145. foreigndalinode.setown(getForeignDali(source));
  1146. foreigncopy = foreigndalinode.get()!=NULL;
  1147. if (foreigncopy)
  1148. {
  1149. CDfsLogicalFileName lfn;
  1150. lfn.set(tmp);
  1151. lfn.setForeign(foreigndalinode->endpoint(),true);
  1152. tmp = lfn.get();
  1153. StringBuffer fu;
  1154. StringBuffer fp;
  1155. if (source->getForeignUser(fu,fp))
  1156. {
  1157. foreignuserdesc.setown(createUserDescriptor());
  1158. foreignuserdesc->set(fu.str(),fp.str());
  1159. }
  1160. else
  1161. foreignuserdesc.set(userdesc);
  1162. }
  1163. }
  1164. srcFile.setown(fdir.lookup(tmp.str(),userdesc,
  1165. (cmd==DFUcmd_move)||(cmd==DFUcmd_rename)||((cmd==DFUcmd_copy)&&multiclusterinsert) ? AccessMode::tbdWrite : AccessMode::tbdRead,
  1166. false,false,nullptr,true));
  1167. if (!srcFile)
  1168. throw MakeStringException(-1,"Source file %s could not be found",tmp.str());
  1169. srcName.set(tmp);
  1170. srcFdesc.setown(srcFile->getFileDescriptor());
  1171. iskey = isFileKey(srcFile);
  1172. if ((cmd==DFUcmd_copy) && (srcFile->querySuperFile() != nullptr) && (srcFile->querySuperFile()->numSubFiles() > 1) && iskey)
  1173. throwError1(DFTERR_InvalidSuperindexCopy, srcName.str());
  1174. oldRoxiePrefix.set(srcFile->queryAttributes().queryProp("@roxiePrefix"));
  1175. kind.set(srcFile->queryAttributes().queryProp("@kind"));
  1176. // keys default wrap for copy
  1177. if (destination->getWrap()||(iskey&&(cmd==DFUcmd_copy)))
  1178. destination->setNumPartsOverride(srcFile->numParts());
  1179. if (options->getSubfileCopy())
  1180. opttree->setPropBool("@compress",srcFile->isCompressed());
  1181. if (options->getNoCommon())
  1182. opttree->setPropBool("@noCommon", true);
  1183. if (foreigncopy)
  1184. {
  1185. if (options->getPush())
  1186. {
  1187. #ifdef _CONTAINERIZED
  1188. UNIMPLEMENTED_X("CONTAINERIZED(ForeignFileCopy:push)");
  1189. #else
  1190. // need to set ftslave location
  1191. StringBuffer progpath;
  1192. StringBuffer workdir;
  1193. INode *n = srcFdesc->queryNode(0);
  1194. if (n&&getRemoteRunInfo("FTSlaveProcess", "ftslave", NULL, n->endpoint(), progpath, workdir, foreigndalinode, 1000*60*5))
  1195. {
  1196. opttree->setProp("@slave",progpath.str());
  1197. }
  1198. #endif
  1199. }
  1200. }
  1201. if (destination->getMultiCopy()&&!destination->getWrap())
  1202. auxfdesc.setown(createMultiCopyFileDescriptor(srcFdesc,destination->getNumParts(0)));
  1203. }
  1204. break;
  1205. }
  1206. // fill dstfile for commands that need it
  1207. switch (cmd) {
  1208. case DFUcmd_copymerge:
  1209. case DFUcmd_copy:
  1210. case DFUcmd_move:
  1211. case DFUcmd_import:
  1212. case DFUcmd_add:
  1213. {
  1214. destination->getLogicalName(tmp.clear());
  1215. if (tmp.length())
  1216. {
  1217. CDfsLogicalFileName tmpdlfn;
  1218. StringBuffer newroxieprefix;
  1219. constructDestinationName(tmp.str(),oldRoxiePrefix,destination,tmpdlfn,newroxieprefix);
  1220. tmp.clear().append(tmpdlfn.get());
  1221. bool iswin;
  1222. if (!destination->getWindowsOS(iswin)) // would normally know!
  1223. {
  1224. // set default OS to cluster 0
  1225. Owned<IGroup> grp=destination->getGroup(0);
  1226. if (grp.get())
  1227. {
  1228. switch (queryOS(grp->queryNode(0).endpoint()))
  1229. {
  1230. case MachineOsW2K:
  1231. destination->setWindowsOS(true);
  1232. iswin = false;
  1233. break;
  1234. case MachineOsSolaris:
  1235. case MachineOsLinux:
  1236. destination->setWindowsOS(false);
  1237. iswin = false;
  1238. break;
  1239. };
  1240. }
  1241. }
  1242. #ifdef _CONTAINERIZED
  1243. StringBuffer clusterName;
  1244. destination->getGroupName(0, clusterName);
  1245. Owned<IPropertyTree> plane = getStoragePlane(clusterName);
  1246. if (plane)
  1247. {
  1248. if (plane->hasProp("@defaultSprayParts"))
  1249. destination->setNumPartsOverride(plane->getPropInt("@defaultSprayParts"));
  1250. }
  1251. #endif
  1252. if (destination->getWrap())
  1253. {
  1254. Owned<IFileDescriptor> fdesc = source?source->getFileDescriptor():NULL;
  1255. if (fdesc)
  1256. destination->setNumPartsOverride(fdesc->numParts());
  1257. }
  1258. if (options->getFailIfNoSourceFile())
  1259. opttree->setPropBool("@failIfNoSourceFile", true);
  1260. if (options->getRecordStructurePresent())
  1261. opttree->setPropBool("@recordStructurePresent", true);
  1262. opttree->setPropInt("@expireDays", options->getExpireDays());
  1263. opttree->setPropBool("@quotedTerminator", options->getQuotedTerminator());
  1264. opttree->setPropBool("@nosplit", options->getNoSplit());
  1265. Owned<IFileDescriptor> fdesc = destination->getFileDescriptor(iskey,options->getSuppressNonKeyRepeats()&&!iskey);
  1266. if (fdesc)
  1267. {
  1268. if (options->getSubfileCopy()) // need to set destination compressed or not
  1269. {
  1270. if (opttree->getPropBool("@compress"))
  1271. fdesc->queryProperties().setPropBool("@blockCompressed",true);
  1272. else
  1273. fdesc->queryProperties().removeProp("@blockCompressed");
  1274. }
  1275. if (!encryptkey.isEmpty())
  1276. {
  1277. fdesc->queryProperties().setPropBool("@encrypted",true);
  1278. fdesc->queryProperties().setPropBool("@blockCompressed",true);
  1279. }
  1280. else if (options->getPreserveCompression())
  1281. {
  1282. bool dstCompressed = false;
  1283. if (srcFile)
  1284. dstCompressed = srcFile->isCompressed();
  1285. else if (auxfdesc)
  1286. dstCompressed = auxfdesc->isCompressed();
  1287. if (dstCompressed)
  1288. fdesc->queryProperties().setPropBool("@blockCompressed",true);
  1289. }
  1290. if (multiclusterinsert)
  1291. {
  1292. if (foreigncopy)
  1293. throw MakeStringException(-1,"Cannot create multi cluster file in foreign file");
  1294. StringBuffer err;
  1295. if (!srcFile->checkClusterCompatible(*fdesc,err))
  1296. throw MakeStringException(-1,"Incompatible file for multicluster add - %s",err.str());
  1297. }
  1298. else if (multiclustermerge)
  1299. {
  1300. dstFile.setown(fdir.lookup(tmp.str(),userdesc,AccessMode::tbdWrite,false,false,nullptr,defaultPrivilegedUser));
  1301. if (!dstFile)
  1302. throw MakeStringException(-1,"Destination for merge %s does not exist",tmp.str());
  1303. StringBuffer err;
  1304. if (!dstFile->checkClusterCompatible(*fdesc,err))
  1305. throw MakeStringException(-1,"Incompatible file for multicluster merge - %s",err.str());
  1306. }
  1307. else
  1308. {
  1309. Owned<IDistributedFile> oldfile = fdir.lookup(tmp.str(),userdesc,AccessMode::tbdWrite,false,false,nullptr,defaultPrivilegedUser);
  1310. if (oldfile)
  1311. {
  1312. StringBuffer reason;
  1313. bool canRemove = oldfile->canRemove(reason);
  1314. oldfile.clear();
  1315. if (!canRemove)
  1316. throw MakeStringException(-1,"%s",reason.str());
  1317. if (!options->getOverwrite())
  1318. throw MakeStringException(-1,"Destination file %s already exists and overwrite not specified",tmp.str());
  1319. if (!fdir.removeEntry(tmp.str(),userdesc))
  1320. throw MakeStringException(-1,"Internal error in attempt to remove file %s",tmp.str());
  1321. }
  1322. }
  1323. StringBuffer jobname;
  1324. fdesc->queryProperties().setProp("@owner", username.str());
  1325. fdesc->queryProperties().setProp("@workunit", dfuwuid);
  1326. fdesc->queryProperties().setProp("@job", wu->getJobName(jobname).str());
  1327. StringBuffer tmpprefix;
  1328. if (newroxieprefix.length())
  1329. fdesc->queryProperties().setProp("@roxiePrefix", newroxieprefix.str());
  1330. if (iskey)
  1331. fdesc->queryProperties().setProp("@kind", "key");
  1332. else if (kind.length()) // JCSMORE may not really need separate "if (iskey)" line above
  1333. fdesc->queryProperties().setProp("@kind", kind);
  1334. if (multiclusterinsert||multiclustermerge)
  1335. multifdesc.setown(fdesc.getClear());
  1336. else
  1337. dstFile.setown(fdir.createNew(fdesc));
  1338. dstName.set(tmp.str());
  1339. }
  1340. }
  1341. if (!dstFile&&!multiclusterinsert)
  1342. {
  1343. throw MakeStringException(-1,"Destination file %s could not be created",tmp.str());
  1344. }
  1345. }
  1346. break;
  1347. }
  1348. if (defaultTransferBufferSize&&(opttree->getPropInt("@transferBufferSize",0)==0))
  1349. opttree->setPropInt("@transferBufferSize",defaultTransferBufferSize);
  1350. switch (cmd) {
  1351. case DFUcmd_none:
  1352. break;
  1353. case DFUcmd_copymerge:
  1354. case DFUcmd_copy:
  1355. {
  1356. if (!replicating)
  1357. {
  1358. Owned<IFileDescriptor> patchf;
  1359. Owned<IFileDescriptor> olddstf;
  1360. if (diffNameSrc.get()||diffNameDst.get())
  1361. {
  1362. Owned<IFileDescriptor> oldf;
  1363. oldf.setown(queryDistributedFileDirectory().getFileDescriptor(diffNameSrc,foreigncopy?foreignuserdesc:userdesc,foreigncopy?foreigndalinode:NULL));
  1364. if (!oldf.get())
  1365. {
  1366. StringBuffer s;
  1367. throw MakeStringException(-1,"Old key file %s could not be found in source",diffNameSrc.get());
  1368. }
  1369. olddstf.setown(queryDistributedFileDirectory().getFileDescriptor(diffNameDst,userdesc,NULL));
  1370. if (!olddstf.get())
  1371. {
  1372. StringBuffer s;
  1373. throw MakeStringException(-1,"Old key file %s could not be found in destination",diffNameDst.get());
  1374. }
  1375. patchf.setown(createFileDescriptor());
  1376. doKeyDiff(oldf,srcFdesc,patchf);
  1377. }
  1378. runningconn.setown(setRunning(runningpath.str()));
  1379. bool needrep = options->getReplicate();
  1380. ClusterPartDiskMapSpec mspec;
  1381. if (destination) {
  1382. if (destination->numClusters()==1) {
  1383. destination->getClusterPartDiskMapSpec(0,mspec);
  1384. if (!mspec.isReplicated())
  1385. needrep = false;
  1386. }
  1387. }
  1388. else if (multifdesc) {
  1389. if (multifdesc->numClusters()==1) {
  1390. if (!multifdesc->queryPartDiskMapping(0).isReplicated())
  1391. needrep = false;
  1392. }
  1393. }
  1394. if (needrep)
  1395. feedback.repmode=cProgressReporter::REPbefore;
  1396. if (foreigncopy)
  1397. checkPhysicalFilePermissions(srcFdesc,userdesc,false);
  1398. if (patchf) { // patch assumes only 1 cluster
  1399. // need to create dstpatchf
  1400. StringBuffer gname;
  1401. destination->getGroupName(0,gname);
  1402. if (!gname.length())
  1403. throw MakeStringException(-1,"No cluster specified for destination");
  1404. Owned<IGroup> grp = queryNamedGroupStore().lookup(gname.str());
  1405. if (!grp)
  1406. throw MakeStringException(-1,"Destination cluster %s not found",gname.str());
  1407. StringBuffer lname;
  1408. destination->getLogicalName(lname);
  1409. lname.append(".__patch__");
  1410. DFD_OS os;
  1411. switch (queryOS(grp->queryNode(0).endpoint())) {
  1412. case MachineOsW2K:
  1413. os = DFD_OSwindows; break;
  1414. case MachineOsSolaris:
  1415. case MachineOsLinux:
  1416. os = DFD_OSunix; break;
  1417. default:
  1418. os = DFD_OSdefault;
  1419. };
  1420. Owned<IFileDescriptor> dstpatchf = createFileDescriptor(lname.str(), gname.str(), patchf->numParts());
  1421. fsys.transfer(patchf, dstpatchf, NULL, NULL, NULL, opttree, &feedback, &abortnotify, dfuwuid);
  1422. removePartFiles(patchf);
  1423. Owned<IFileDescriptor> newf = dstFile->getFileDescriptor();
  1424. doKeyPatch(olddstf,newf,dstpatchf);
  1425. removePartFiles(dstpatchf);
  1426. if (!abortnotify.abortRequested()) {
  1427. if (needrep)
  1428. replicating = true;
  1429. else
  1430. dstFile->attach(dstName.get(), userdesc);
  1431. Audit("COPYDIFF",userdesc,srcName.get(),dstName.get());
  1432. }
  1433. }
  1434. else if (foreigncopy||auxfdesc)
  1435. {
  1436. IFileDescriptor * srcDesc = (auxfdesc.get() ? auxfdesc.get() : srcFdesc.get());
  1437. fsys.import(srcDesc, dstFile, recovery, recoveryconn, filter, opttree, &feedback, &abortnotify, dfuwuid);
  1438. if (!abortnotify.abortRequested())
  1439. {
  1440. if (needrep)
  1441. replicating = true;
  1442. else
  1443. dstFile->attach(dstName.get(), userdesc);
  1444. Audit("COPY",userdesc,srcName.get(),dstName.get());
  1445. }
  1446. }
  1447. else if (multiclusterinsert||multiclustermerge) {
  1448. fsys.exportFile(srcFile, multifdesc, recovery, recoveryconn, filter, opttree, &feedback, &abortnotify, dfuwuid);
  1449. if (!abortnotify.abortRequested()) {
  1450. if (needrep)
  1451. replicating = true;
  1452. else {
  1453. StringBuffer cname;
  1454. multifdesc->getClusterLabel(0,cname);
  1455. if (cname.length()==0)
  1456. multifdesc->getClusterGroupName(0,cname,&queryNamedGroupStore());
  1457. (multiclusterinsert?srcFile:dstFile)->addCluster(cname.str(),multifdesc->queryPartDiskMapping(0));
  1458. }
  1459. Audit(multiclusterinsert?"COPY":"COPYMERGE",userdesc,srcFile?srcName.str():NULL,dstName.get());
  1460. }
  1461. }
  1462. else {
  1463. fsys.copy(srcFile,dstFile,recovery, recoveryconn, filter, opttree, &feedback, &abortnotify, dfuwuid);
  1464. if (!abortnotify.abortRequested()) {
  1465. if (needrep)
  1466. replicating = true;
  1467. else
  1468. dstFile->attach(dstName.get(),userdesc);
  1469. Audit("COPY",userdesc,srcFile?srcName.str():NULL,dstName.get());
  1470. }
  1471. }
  1472. runningconn.clear();
  1473. }
  1474. }
  1475. break;
  1476. case DFUcmd_remove:
  1477. {
  1478. source->getLogicalName(tmp.clear());
  1479. if (tmp.length()) {
  1480. runningconn.setown(setRunning(runningpath.str()));;
  1481. fdir.removeEntry(tmp.str(),userdesc);
  1482. Audit("REMOVE",userdesc,tmp.clear(),NULL);
  1483. runningconn.clear();
  1484. }
  1485. else {
  1486. throw MakeStringException(-1,"No target name specified for remove");
  1487. }
  1488. }
  1489. break;
  1490. case DFUcmd_move:
  1491. {
  1492. runningconn.setown(setRunning(runningpath.str()));
  1493. fsys.move(srcFile,dstFile,recovery, recoveryconn, filter, opttree, &feedback, &abortnotify, dfuwuid);
  1494. runningconn.clear();
  1495. if (!abortnotify.abortRequested()) {
  1496. dstFile->attach(dstName.get(),userdesc);
  1497. Audit("MOVE",userdesc,srcFile?srcName.str():NULL,dstName.get());
  1498. }
  1499. }
  1500. break;
  1501. case DFUcmd_rename:
  1502. {
  1503. wu->subscribeAbort(NULL);
  1504. StringBuffer toname;
  1505. destination->getLogicalName(toname);
  1506. if (toname.length()) {
  1507. unsigned start = msTick();
  1508. Owned<IDistributedFile> newfile = fdir.lookup(toname.str(),userdesc,AccessMode::tbdWrite,false,false,nullptr,defaultPrivilegedUser);
  1509. if (newfile) {
  1510. // check for rename into multicluster
  1511. CDfsLogicalFileName dstlfn;
  1512. dstlfn.set(toname.str());
  1513. if (dstlfn.getCluster(tmp).length()==0)
  1514. throw MakeStringException(-1,"Target %s already exists",toname.str());
  1515. }
  1516. newfile.clear();
  1517. StringBuffer fromname(srcName);
  1518. srcFile.clear();
  1519. srcFdesc.clear();
  1520. queryDistributedFileDirectory().renamePhysical(fromname.str(),toname.str(),userdesc,NULL);
  1521. StringBuffer timetaken;
  1522. timetaken.appendf("%dms",msTick()-start);
  1523. progress->setDone(timetaken.str(),0,true);
  1524. Audit("RENAME",userdesc,fromname.str(),toname.str());
  1525. }
  1526. else {
  1527. throw MakeStringException(-1,"No target name specified for rename");
  1528. }
  1529. }
  1530. break;
  1531. case DFUcmd_replicate:
  1532. {
  1533. runningconn.setown(setRunning(runningpath.str()));
  1534. DaftReplicateMode mode = DRMreplicatePrimary;
  1535. StringBuffer repcluster;
  1536. bool repeatlast;
  1537. bool onlyrepeated;
  1538. switch (options->getReplicateMode(repcluster,repeatlast,onlyrepeated)) {
  1539. case DFURMprimary:
  1540. mode = DRMreplicatePrimary;
  1541. break;
  1542. case DFURMsecondary:
  1543. mode = DRMreplicateSecondary;
  1544. break;
  1545. case DFURMmissing:
  1546. mode = DRMcreateMissing;
  1547. break;
  1548. }
  1549. setFileRepeatOptions(*srcFile,repcluster.str(),repeatlast,onlyrepeated);
  1550. srcFdesc->ensureReplicate();
  1551. fsys.replicate(srcFdesc.get(), mode, recovery, recoveryconn, filter, opttree, &feedback, &abortnotify, dfuwuid);
  1552. runningconn.clear();
  1553. if (!abortnotify.abortRequested()) {
  1554. Audit("REPLICATE",userdesc,srcFile?srcName.str():NULL,NULL);
  1555. // srcFile->queryPartDiskMapping(0).maxCopies = 2; // ** TBD ?
  1556. }
  1557. }
  1558. break;
  1559. case DFUcmd_import:
  1560. {
  1561. if (!replicating) {
  1562. runningconn.setown(setRunning(runningpath.str()));
  1563. Owned<IFileDescriptor> fdesc = source->getFileDescriptor();
  1564. checkPhysicalFilePermissions(fdesc,userdesc,false);
  1565. checkSourceTarget(fdesc);
  1566. bool needrep = options->getReplicate();
  1567. ClusterPartDiskMapSpec mspec;
  1568. if (destination) {
  1569. if (destination->numClusters()==1) {
  1570. destination->getClusterPartDiskMapSpec(0,mspec);
  1571. if (!mspec.isReplicated())
  1572. needrep = false;
  1573. }
  1574. #if !defined(_DEBUG) && !defined(_CONTAINERIZED)
  1575. StringBuffer gname;
  1576. if (!destination->getRemoteGroupOverride()&&!testLocalCluster(destination->getGroupName(0,gname).str())) {
  1577. throw MakeStringException(-1,"IMPORT cluster %s is not recognized locally",gname.str());
  1578. }
  1579. #endif
  1580. }
  1581. if (needrep)
  1582. feedback.repmode=cProgressReporter::REPbefore;
  1583. fsys.import(fdesc, dstFile, recovery, recoveryconn, filter, opttree, &feedback, &abortnotify, dfuwuid);
  1584. if (!abortnotify.abortRequested())
  1585. {
  1586. if (needrep && !recovery->getPropBool("@noFileMatch"))
  1587. replicating = true;
  1588. else
  1589. dstFile->attach(dstName.get(), userdesc);
  1590. Audit("IMPORT",userdesc,dstName.get(),NULL);
  1591. }
  1592. runningconn.clear();
  1593. }
  1594. }
  1595. break;
  1596. case DFUcmd_export:
  1597. {
  1598. runningconn.setown(setRunning(runningpath.str()));
  1599. Owned<IFileDescriptor> fdesc = destination->getFileDescriptor(iskey);
  1600. checkPhysicalFilePermissions(fdesc,userdesc,true);
  1601. checkSourceTarget(fdesc);
  1602. fsys.exportFile(srcFile, fdesc, recovery, recoveryconn, filter, opttree, &feedback, &abortnotify, dfuwuid);
  1603. if (!abortnotify.abortRequested()) {
  1604. Audit("EXPORT",userdesc,srcFile?srcName.str():NULL,NULL);
  1605. }
  1606. runningconn.clear();
  1607. }
  1608. break;
  1609. case DFUcmd_add:
  1610. {
  1611. dstFile->attach(dstName.get(),userdesc);
  1612. Audit("ADD",userdesc,dstName.get(),NULL);
  1613. }
  1614. break;
  1615. case DFUcmd_transfer:
  1616. {
  1617. runningconn.setown(setRunning(runningpath.str()));
  1618. Owned<IFileDescriptor> srcdesc = source->getFileDescriptor();
  1619. checkPhysicalFilePermissions(srcdesc,userdesc,true);
  1620. Owned<IFileDescriptor> dstdesc = destination->getFileDescriptor();
  1621. checkPhysicalFilePermissions(dstdesc,userdesc,true);
  1622. fsys.transfer(srcdesc, dstdesc, recovery, recoveryconn, filter, opttree, &feedback, &abortnotify, dfuwuid);
  1623. if (!abortnotify.abortRequested()) {
  1624. Audit("TRANSFER",userdesc,NULL,NULL);
  1625. }
  1626. runningconn.clear();
  1627. }
  1628. break;
  1629. case DFUcmd_monitor:
  1630. {
  1631. CriticalBlock block(monitorsect);
  1632. // first check done when WU received
  1633. IDFUmonitor *monitor = wu->queryUpdateMonitor();
  1634. if (!monitor)
  1635. break;
  1636. INode *me = queryMyNode();
  1637. monitor->setHandlerEp(me->endpoint());
  1638. StringAttrArray eventstriggered;
  1639. StringAttrArray eventsfile;
  1640. if (performMonitor(wu,monitor,source,true,&eventstriggered,&eventsfile,userdesc))
  1641. finalstate = DFUstate_finished;
  1642. else
  1643. finalstate = DFUstate_monitoring;
  1644. pushEvents(eventstriggered,eventsfile);
  1645. }
  1646. break;
  1647. case DFUcmd_supercopy:
  1648. runSuperCopy(wu,source,destination,options,progress,userdesc,feedback);
  1649. break;
  1650. default:
  1651. throw MakeStringException(-1,"DFURUN: Unsupported command (%d)",(int)cmd);
  1652. }
  1653. if (replicating) {
  1654. switch (cmd) {
  1655. case DFUcmd_copymerge:
  1656. case DFUcmd_copy:
  1657. case DFUcmd_import:{
  1658. if (feedback.repmode==cProgressReporter::REPbefore)
  1659. feedback.repmode=cProgressReporter::REPduring;
  1660. runningconn.setown(setRunning(runningpath.str()));
  1661. Owned<IFileDescriptor> fdesc = multiclusterinsert?multifdesc.getLink():dstFile->getFileDescriptor();
  1662. DaftReplicateMode mode = DRMreplicatePrimary;
  1663. // bit of a kludge here until filecopy supports multi copies
  1664. for (unsigned i=fdesc->numParts();i>0;i--) {
  1665. if (fdesc->numCopies(i-1)>2) {
  1666. mode = DRMcreateMissing;
  1667. break;
  1668. }
  1669. }
  1670. wu->removeRecoveryStore();
  1671. wu->queryRecoveryStore(recoveryconn,recovery,runningpath.clear());
  1672. if (recoveryconn&&recovery) {
  1673. recovery->setPropBool("@replicating",true);
  1674. recoveryconn->commit();
  1675. }
  1676. fsys.replicate(fdesc.get(), mode, recovery, recoveryconn, filter, opttree, &feedback, &abortnotify, dfuwuid);
  1677. if (!abortnotify.abortRequested()) {
  1678. if (multiclusterinsert||multiclustermerge) {
  1679. StringBuffer cname;
  1680. multifdesc->getClusterLabel(0,cname);
  1681. if (cname.length()==0)
  1682. multifdesc->getClusterGroupName(0,cname,&queryNamedGroupStore());
  1683. (multiclusterinsert?srcFile:dstFile)->addCluster(cname.str(),multifdesc->queryPartDiskMapping(0));
  1684. }
  1685. else {
  1686. //dstFile->queryPartDiskMapping(0).maxCopies = 2; // dont think this is right ** TBD
  1687. dstFile->attach(dstName.get(),userdesc);
  1688. }
  1689. progress->setDone(NULL,0,true);
  1690. Audit("REPLICATE",userdesc,dstName.get(),NULL);
  1691. }
  1692. runningconn.clear();
  1693. }
  1694. break;
  1695. }
  1696. }
  1697. wu->removeRecoveryStore();
  1698. wu->subscribeAbort(NULL);
  1699. }
  1700. catch(IException *e) {
  1701. runningconn.clear();
  1702. wu->subscribeAbort(NULL);
  1703. wu->addException(e);
  1704. EXCLOG(e, "DFURUN Exception: ");
  1705. finalstate = DFUstate_failed;
  1706. }
  1707. if ((finalstate != DFUstate_aborted)&&abortnotify.abortRequested())
  1708. finalstate = DFUstate_aborted;
  1709. progress->setState(finalstate);
  1710. wu.clear();
  1711. return finalstate;
  1712. }
  1713. };
  1714. IDFUengine *createDFUengine()
  1715. {
  1716. return new CDFUengine;
  1717. }
  1718. void stopDFUserver(const char *qname)
  1719. {
  1720. Owned<IJobQueue> queue = createJobQueue(qname);
  1721. if (!queue.get()) {
  1722. throw MakeStringException(-1, "Cound not create queue");
  1723. }
  1724. IJobQueueItem *item = createJobQueueItem("!STOP");
  1725. item->setEndpoint(queryMyNode()->endpoint());
  1726. queue->enqueue(item);
  1727. }