ws_fsService.cpp 122 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #pragma warning (disable : 4786)
  14. #pragma warning (disable : 4129)
  15. #include <math.h>
  16. #include "jsocket.hpp"
  17. #include "dasds.hpp"
  18. #include "dadfs.hpp"
  19. #include "dautils.hpp"
  20. #include "daclient.hpp"
  21. #include "wshelpers.hpp"
  22. #include "dfuwu.hpp"
  23. #include "workunit.hpp"
  24. #include "ws_fsService.hpp"
  25. #ifdef _WIN32
  26. #include "windows.h"
  27. #endif
  28. #include "dalienv.hpp"
  29. #include "dfuutil.hpp"
  30. #include "portlist.h"
  31. #include "sacmd.hpp"
  32. #include "exception_util.hpp"
  33. #define DFU_WU_URL "DfuWorkunitsAccess"
  34. #define DFU_EX_URL "DfuExceptionsAccess"
  35. #define FILE_SPRAY_URL "FileSprayAccess"
  36. #define FILE_DESPRAY_URL "FileDesprayAccess"
  37. #define WUDETAILS_REFRESH_MINS 1
  38. void SetResp(StringBuffer &resp, IConstDFUWorkUnit * wu, bool array);
  39. int Schedule::run()
  40. {
  41. try
  42. {
  43. PROGLOG("DfuWorkunit WUSchedule Thread started.");
  44. while(!stopping)
  45. {
  46. {
  47. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  48. Owned<IConstDFUWorkUnitIterator> itr = factory->getWorkUnitsByState(DFUstate_scheduled);
  49. itr->first();
  50. while(itr->isValid())
  51. {
  52. Owned<IConstDFUWorkUnit> wu = itr->get();
  53. CDateTime dt, now;
  54. now.setNow();
  55. try
  56. {
  57. wu->getTimeScheduled(dt);
  58. if (now.compare(dt) > 0)
  59. {
  60. StringAttr wuid(wu->queryId());
  61. wu.clear();
  62. submitDFUWorkUnit(wuid.get());
  63. }
  64. }
  65. catch(IException *e)
  66. {
  67. StringBuffer msg;
  68. ERRLOG("Exception %d:%s in WsWorkunits Schedule::run", e->errorCode(), e->errorMessage(msg).str());
  69. e->Release();
  70. }
  71. itr->next();
  72. }
  73. }
  74. semSchedule.wait(1000*60);
  75. }
  76. }
  77. catch(IException *e)
  78. {
  79. StringBuffer msg;
  80. ERRLOG("Exception %d:%s in WS_FS Schedule::run", e->errorCode(), e->errorMessage(msg).str());
  81. e->Release();
  82. }
  83. catch(...)
  84. {
  85. ERRLOG("Unknown exception in WS_FS Schedule::run");
  86. }
  87. return 0;
  88. }
  89. void CFileSprayEx::init(IPropertyTree *cfg, const char *process, const char *service)
  90. {
  91. StringBuffer xpath;
  92. xpath.clear().appendf("Software/EspProcess[@name=\"%s\"]/EspService[@name=\"%s\"]/QueueLabel", process, service);
  93. cfg->getProp(xpath.str(), m_QueueLabel);
  94. StringArray qlist;
  95. getDFUServerQueueNames(qlist, nullptr);
  96. if (qlist.ordinality())
  97. {
  98. if (!m_QueueLabel.length())
  99. m_QueueLabel.append(qlist.item(0));
  100. else
  101. {
  102. bool found = false;
  103. ForEachItemIn(i, qlist)
  104. {
  105. const char* qname = qlist.item(i);
  106. if (qname && strieq(qname, m_QueueLabel.str()))
  107. {
  108. found = true;
  109. break;
  110. }
  111. }
  112. if (!found)
  113. throw MakeStringException(-1, "Invalid DFU Queue Label %s in configuration file", m_QueueLabel.str());
  114. }
  115. }
  116. xpath.clear().appendf("Software/EspProcess[@name=\"%s\"]/EspService[@name=\"%s\"]/MonitorQueueLabel", process, service);
  117. cfg->getProp(xpath.str(), m_MonitorQueueLabel);
  118. xpath.clear().appendf("Software/EspProcess[@name=\"%s\"]/EspService[@name=\"%s\"]/RootFolder", process, service);
  119. cfg->getProp(xpath.str(), m_RootFolder);
  120. directories.set(cfg->queryPropTree("Software/Directories"));
  121. StringBuffer prop;
  122. prop.appendf("queueLabel=%s", m_QueueLabel.str());
  123. PrintLog(prop.str());
  124. prop.clear();
  125. prop.appendf("monitorQueueLabel=%s", m_MonitorQueueLabel.str());
  126. PrintLog(prop.str());
  127. prop.clear();
  128. prop.appendf("rootFolder=%s", m_RootFolder.str());
  129. PrintLog(prop.str());
  130. if (!daliClientActive())
  131. {
  132. ERRLOG("No Dali Connection Active.");
  133. throw MakeStringException(-1, "No Dali Connection Active. Please Specify a Dali to connect to in you configuration file");
  134. }
  135. m_sched.start();
  136. }
  137. StringBuffer& CFileSprayEx::getAcceptLanguage(IEspContext& context, StringBuffer& acceptLanguage)
  138. {
  139. context.getAcceptLanguage(acceptLanguage);
  140. if (!acceptLanguage.length())
  141. {
  142. acceptLanguage.set("en");
  143. return acceptLanguage;
  144. }
  145. acceptLanguage.setLength(2);
  146. VStringBuffer languageFile("%ssmc_xslt/nls/%s/hpcc.xml", getCFD(), acceptLanguage.str());
  147. if (!checkFileExists(languageFile.str()))
  148. acceptLanguage.set("en");
  149. return acceptLanguage;
  150. }
  151. void ParsePath(const char * fullPath, StringBuffer &ip, StringBuffer &filePath, StringBuffer &title)
  152. {
  153. ip.clear();
  154. filePath.clear();
  155. title.clear();
  156. if(fullPath == NULL || *fullPath == '\0')
  157. return;
  158. const char* ptr = fullPath;
  159. if(*ptr == '\\' && *(ptr+1) == '\\')
  160. {
  161. ptr += 2;
  162. while(*ptr != '\0' && *ptr != '\\')
  163. ptr++;
  164. ip.append(ptr - fullPath - 2, fullPath + 2);
  165. }
  166. filePath.append(ptr);
  167. ptr = fullPath + strlen(fullPath) - 1;
  168. while(ptr > fullPath && *ptr != '\\')
  169. ptr--;
  170. title.append(ptr + 1);
  171. }
  172. const char * const NODATETIME="1970-01-01T00:00:00Z";
  173. // Assign from a dfuwu workunit structure to an esp request workunit structure.
  174. static void DeepAssign(IEspContext &context, IConstDFUWorkUnit *src, IEspDFUWorkunit &dest)
  175. {
  176. if(src == NULL)
  177. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "'Source DFU workunit' doesn't exist.");
  178. Owned<IEnvironmentFactory> envFactory = getEnvironmentFactory();
  179. Owned<IConstEnvironment> constEnv = envFactory->openEnvironment();
  180. Owned<IPropertyTree> root = &constEnv->getPTree();
  181. if (!root)
  182. throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO, "Failed to get environment information.");
  183. double version = context.getClientVersion();
  184. StringBuffer tmp, encoded;
  185. dest.setID(src->queryId());
  186. if (src->getClusterName(tmp.clear()).length()!=0)
  187. {
  188. char *clusterName = (char *)tmp.str();
  189. if (clusterName && *clusterName)
  190. {
  191. StringBuffer clusterNameForDisplay(clusterName);
  192. Owned<IPropertyTreeIterator> clusters= root->getElements("Software/Topology/Cluster");
  193. if (clusters->first())
  194. {
  195. do {
  196. IPropertyTree &cluster = clusters->query();
  197. const char* name = cluster.queryProp("@name");
  198. if (!name || !*name)
  199. continue;
  200. Owned<IPropertyTreeIterator> thorClusters= cluster.getElements(eqThorCluster);
  201. Owned<IPropertyTreeIterator> roxieClusters= cluster.getElements(eqRoxieCluster);
  202. if (thorClusters->first() || roxieClusters->first())
  203. {
  204. if (thorClusters->first())
  205. {
  206. IPropertyTree &thorCluster = thorClusters->query();
  207. const char* process = thorCluster.queryProp("@process");
  208. if (process && *process)
  209. {
  210. if (clusterName && !stricmp(clusterName, process))
  211. {
  212. clusterNameForDisplay.clear().append(name);
  213. break;
  214. }
  215. }
  216. }
  217. if (roxieClusters->first())
  218. {
  219. IPropertyTree &roxieCluster = roxieClusters->query();
  220. const char* process = roxieCluster.queryProp("@process");
  221. if (process && *process)
  222. {
  223. if (clusterName && !stricmp(clusterName, name))
  224. {
  225. clusterNameForDisplay.clear().append(name);
  226. break;
  227. }
  228. }
  229. }
  230. }
  231. } while (clusters->next());
  232. }
  233. dest.setClusterName(clusterNameForDisplay.str());
  234. }
  235. }
  236. if ((version > 1.05) && src->getDFUServerName(tmp.clear()).length())
  237. dest.setDFUServerName(tmp.str());
  238. if (src->getJobName(tmp.clear()).length()!=0)
  239. dest.setJobName(tmp.str());
  240. else
  241. dest.setJobName("");
  242. if (src->getQueue(tmp.clear()).length()!=0)
  243. dest.setQueue(tmp.str());
  244. if (src->getUser(tmp.clear()).length()!=0)
  245. dest.setUser(tmp.str());
  246. dest.setIsProtected(src->isProtected());
  247. dest.setCommand(src->getCommand());
  248. IConstDFUprogress *prog = src->queryProgress();
  249. if (prog != NULL)
  250. {
  251. DFUstate state = prog->getState();
  252. dest.setState(state);
  253. StringBuffer statemsg;
  254. encodeDFUstate(state,statemsg);
  255. dest.setStateMessage(statemsg.str());
  256. CDateTime startAt;
  257. CDateTime stoppAt;
  258. prog->getTimeStarted(startAt);
  259. prog->getTimeStopped(stoppAt);
  260. StringBuffer tmpstr;
  261. startAt.getDateString(tmpstr);
  262. tmpstr.append(" ");
  263. startAt.getTimeString(tmpstr);
  264. dest.setTimeStarted(tmpstr.str());
  265. tmpstr.clear();
  266. stoppAt.getDateString(tmpstr);
  267. tmpstr.append(" ");
  268. stoppAt.getTimeString(tmpstr);
  269. dest.setTimeStopped(tmpstr.str());
  270. StringBuffer prgmsg;
  271. prog->formatProgressMessage(prgmsg);
  272. dest.setProgressMessage(prgmsg.str());
  273. prog->formatSummaryMessage(prgmsg.clear());
  274. dest.setSummaryMessage(prgmsg.str());
  275. unsigned secs = prog->getSecsLeft();
  276. if(secs > 0)
  277. dest.setSecsLeft(secs);
  278. dest.setPercentDone(prog->getPercentDone());
  279. }
  280. IConstDFUoptions *options = src->queryOptions();
  281. if(options)
  282. {
  283. dest.setReplicate(options->getReplicate());
  284. dest.setOverwrite(options->getOverwrite());
  285. }
  286. IConstDFUfileSpec * file = src->querySource();
  287. if (file != NULL)
  288. {
  289. //if (file->getTitle(tmp.clear()).length()!=0)
  290. // dest.setSourceTitle(tmp.str());
  291. StringBuffer lfn;
  292. file->getLogicalName(lfn);
  293. if (lfn.length() != 0)
  294. dest.setSourceLogicalName(lfn.str());
  295. else
  296. dest.setSourceFormat(file->getFormat());
  297. if (file->getRawDirectory(tmp.clear()).length()!=0)
  298. dest.setSourceDirectory(tmp.str());
  299. SocketEndpoint srcdali;
  300. StringBuffer srcdaliip;
  301. file->getForeignDali(srcdali);
  302. srcdali.getIpText(srcdaliip);
  303. if(srcdaliip.length() > 0 && strcmp(srcdaliip.str(), "0.0.0.0") != 0)
  304. dest.setSourceDali(srcdaliip.str());
  305. StringBuffer diffkeyname;
  306. file->getDiffKey(diffkeyname);
  307. if(diffkeyname.length() > 0)
  308. dest.setSourceDiffKeyName(diffkeyname.str());
  309. StringBuffer socket, dir, title;
  310. unsigned np = file->getNumParts(0); // should handle multiple clusters?
  311. if (lfn.length() == 0) { // no logical name
  312. if (np == 1)
  313. {
  314. Owned<IFileDescriptor> info;
  315. try
  316. {
  317. info.setown(file->getFileDescriptor());
  318. if(info)
  319. {
  320. Owned<INode> node = info->getNode(0);
  321. if (node)
  322. {
  323. node->endpoint().getIpText(socket);
  324. dest.setSourceIP(socket.str());
  325. }
  326. const char *defaultdir = info->queryDefaultDir();
  327. if (defaultdir&&*defaultdir)
  328. addPathSepChar(dir.append(defaultdir));
  329. file->getRawFileMask(dir);
  330. dest.setSourceFilePath(dir.str());
  331. }
  332. }
  333. catch(IException *e)
  334. {
  335. EXCLOG(e,"DeepAssign getFileDescriptor");
  336. e->Release();
  337. }
  338. }
  339. }
  340. if (np)
  341. dest.setSourceNumParts(np);
  342. unsigned rs = file->getRecordSize();
  343. if (rs)
  344. dest.setSourceRecordSize(rs);
  345. StringBuffer rowtag;
  346. file->getRowTag(rowtag);
  347. if(rowtag.length() > 0)
  348. dest.setRowTag(rowtag.str());
  349. if (version >= 1.04 && (file->getFormat() == DFUff_csv))
  350. {
  351. StringBuffer separate, terminate, quote, escape;
  352. bool quotedTerminator;
  353. file->getCsvOptions(separate,terminate,quote, escape, quotedTerminator);
  354. if(separate.length() > 0)
  355. dest.setSourceCsvSeparate(separate.str());
  356. if(terminate.length() > 0)
  357. dest.setSourceCsvTerminate(terminate.str());
  358. if(quote.length() > 0)
  359. dest.setSourceCsvQuote(quote.str());
  360. if((version >= 1.05) && (escape.length() > 0))
  361. dest.setSourceCsvEscape(escape.str());
  362. if(version >=1.10)
  363. dest.setQuotedTerminator(quotedTerminator);
  364. }
  365. }
  366. file = src->queryDestination();
  367. if (file != NULL)
  368. {
  369. StringBuffer lfn;
  370. file->getLogicalName(lfn);
  371. if (lfn.length() != 0)
  372. dest.setDestLogicalName(lfn.str());
  373. else
  374. dest.setDestFormat(file->getFormat());
  375. if (file->getRawDirectory(tmp.clear()).length()!=0)
  376. dest.setDestDirectory(tmp.str());
  377. if (file->getGroupName(0,tmp.clear()).length()!=0) // should handle multiple clusters?
  378. {
  379. char *clusterName = (char *)tmp.str();
  380. if (clusterName)
  381. dest.setDestGroupName(clusterName);
  382. }
  383. StringBuffer socket, dir, title;
  384. unsigned np = file->getNumParts(0); // should handle multiple clusters?
  385. if (lfn.length() == 0) { // no logical name
  386. if (np == 1)
  387. {
  388. Owned<IFileDescriptor> info;
  389. try
  390. {
  391. info.setown(file->getFileDescriptor());
  392. if(info)
  393. {
  394. Owned<INode> node = info->getNode(0);
  395. if (node)
  396. {
  397. node->endpoint().getIpText(socket);
  398. dest.setDestIP(socket.str());
  399. }
  400. const char *defaultdir = info->queryDefaultDir();
  401. if (defaultdir&&*defaultdir)
  402. addPathSepChar(dir.append(defaultdir));
  403. file->getRawFileMask(dir);
  404. dest.setDestFilePath(dir.str());
  405. }
  406. }
  407. catch(IException *e)
  408. {
  409. EXCLOG(e,"DeepAssign getFileDescriptor dest");
  410. e->Release();
  411. }
  412. }
  413. }
  414. if (np)
  415. dest.setDestNumParts(np);
  416. unsigned rs = file->getRecordSize();
  417. if (rs)
  418. dest.setDestRecordSize(rs);
  419. dest.setCompress(file->isCompressed());
  420. }
  421. // monitor stuff
  422. IConstDFUmonitor *monitor = src->queryMonitor();
  423. if (monitor) {
  424. monitor->getEventName(tmp.clear());
  425. if (tmp.length())
  426. dest.setMonitorEventName(tmp.str());
  427. bool sub = monitor->getSub();
  428. dest.setMonitorSub(sub);
  429. unsigned sl = monitor->getShotLimit();
  430. if (sl)
  431. dest.setMonitorShotLimit(sl);
  432. }
  433. }
  434. bool CFileSprayEx::ParseLogicalPath(const char * pLogicalPath, const char* groupName, const char* cluster,
  435. StringBuffer &folder, StringBuffer &title, StringBuffer &defaultFolder, StringBuffer &defaultReplicateFolder)
  436. {
  437. if(!pLogicalPath || !*pLogicalPath)
  438. return false;
  439. folder.clear();
  440. title.clear();
  441. defaultFolder.clear();
  442. defaultReplicateFolder.clear();
  443. DFD_OS os = DFD_OSdefault;
  444. if(groupName != NULL && *groupName != '\0')
  445. {
  446. StringBuffer basedir;
  447. GroupType groupType;
  448. Owned<IGroup> group = queryNamedGroupStore().lookup(groupName, basedir, groupType);
  449. if (group) {
  450. switch (queryOS(group->queryNode(0).endpoint())) {
  451. case MachineOsW2K:
  452. os = DFD_OSwindows; break;
  453. case MachineOsSolaris:
  454. case MachineOsLinux:
  455. os = DFD_OSunix; break;
  456. }
  457. if (directories.get())
  458. {
  459. switch (groupType)
  460. {
  461. case grp_roxie:
  462. getConfigurationDirectory(directories, "data", "roxie", cluster, defaultFolder);
  463. getConfigurationDirectory(directories, "data2", "roxie", cluster, defaultReplicateFolder);
  464. // MORE - should extend to systems with higher redundancy
  465. break;
  466. case grp_hthor:
  467. getConfigurationDirectory(directories, "data", "hthor", cluster, defaultFolder);
  468. break;
  469. case grp_thor:
  470. default:
  471. getConfigurationDirectory(directories, "data", "thor", cluster, defaultFolder);
  472. getConfigurationDirectory(directories, "mirror", "thor", cluster, defaultReplicateFolder);
  473. }
  474. }
  475. }
  476. else
  477. {
  478. // Error here?
  479. }
  480. }
  481. makePhysicalPartName(pLogicalPath,0,0,folder,false,os,defaultFolder.str());
  482. const char *n = pLogicalPath;
  483. const char* p;
  484. do {
  485. p = strstr(n,"::");
  486. if(p)
  487. n = p+2;
  488. } while(p);
  489. title.append(n);
  490. return true;
  491. }
  492. bool CFileSprayEx::ParseLogicalPath(const char * pLogicalPath, StringBuffer &title)
  493. {
  494. if(!pLogicalPath || !*pLogicalPath)
  495. return false;
  496. title.clear();
  497. const char *n = pLogicalPath;
  498. const char* p;
  499. do {
  500. p = strstr(n,"::");
  501. if(p)
  502. n = p+2;
  503. } while(p);
  504. title.append(n);
  505. return true;
  506. }
  507. void setRoxieClusterPartDiskMapping(const char *clusterName, const char *defaultFolder, const char *defaultReplicateFolder,
  508. bool supercopy, IDFUfileSpec *wuFSpecDest, IDFUoptions *wuOptions)
  509. {
  510. Owned<IEnvironmentFactory> envFactory = getEnvironmentFactory();
  511. envFactory->validateCache();
  512. StringBuffer dirxpath;
  513. dirxpath.appendf("Software/RoxieCluster[@name=\"%s\"]",clusterName);
  514. Owned<IConstEnvironment> constEnv = envFactory->openEnvironment();
  515. Owned<IPropertyTree> pEnvRoot = &constEnv->getPTree();
  516. Owned<IPropertyTreeIterator> processes = pEnvRoot->getElements(dirxpath);
  517. if (!processes->first())
  518. {
  519. DBGLOG("Failed to get RoxieCluster settings");
  520. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_INFO, "Failed to get RoxieCluster settings. The workunit will not be created.");
  521. }
  522. IPropertyTree &process = processes->query();
  523. const char *slaveConfig = process.queryProp("@slaveConfig");
  524. if (!slaveConfig || !*slaveConfig)
  525. {
  526. DBGLOG("Failed to get RoxieCluster settings");
  527. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_INFO, "Failed to get RoxieCluster settings. The workunit will not be created.");
  528. }
  529. bool replicate = false;
  530. unsigned redundancy = 0; // Number of "spare" copies of the data
  531. unsigned channelsPerNode = 1; // Overloaded and cyclic modes
  532. int replicateOffset = 1; // Used In cyclic mode only
  533. unsigned numDataCopies = process.getPropInt("@numDataCopies", 1);
  534. ClusterPartDiskMapSpec spec;
  535. spec.setDefaultBaseDir(defaultFolder);
  536. if (strieq(slaveConfig, "overloaded"))
  537. {
  538. channelsPerNode = process.getPropInt("@channelsPernode", 1);
  539. spec.setDefaultReplicateDir(defaultReplicateFolder);
  540. }
  541. else if (strieq(slaveConfig, "full redundancy"))
  542. {
  543. redundancy = numDataCopies-1;
  544. replicateOffset = 0;
  545. replicate = true;
  546. }
  547. else if (strieq(slaveConfig, "cyclic redundancy"))
  548. {
  549. redundancy = numDataCopies-1;
  550. channelsPerNode = numDataCopies;
  551. replicateOffset = process.getPropInt("@cyclicOffset", 1);
  552. spec.setDefaultReplicateDir(defaultReplicateFolder);
  553. replicate = true;
  554. }
  555. spec.setRoxie (redundancy, channelsPerNode, replicateOffset);
  556. if (!supercopy)
  557. spec.setRepeatedCopies(CPDMSRP_lastRepeated,false);
  558. wuFSpecDest->setClusterPartDiskMapSpec(clusterName,spec);
  559. wuOptions->setReplicate(replicate);
  560. }
  561. StringBuffer& getNodeGroupFromLFN(StringBuffer& nodeGroup, const char* lfn, const char* username, const char* passwd)
  562. {
  563. Owned<IUserDescriptor> udesc;
  564. if(username != NULL && *username != '\0')
  565. {
  566. udesc.setown(createUserDescriptor());
  567. udesc->set(username, passwd);
  568. }
  569. Owned<IDistributedFile> df = queryDistributedFileDirectory().lookup(lfn, udesc);
  570. if (!df)
  571. throw MakeStringException(ECLWATCH_FILE_NOT_EXIST, "Failed to find file: %s", lfn);
  572. return df->getClusterGroupName(0, nodeGroup);
  573. }
  574. StringBuffer& constructFileMask(const char* filename, StringBuffer& filemask)
  575. {
  576. filemask.clear().append(filename).toLowerCase().append("._$P$_of_$N$");
  577. return filemask;
  578. }
  579. bool CFileSprayEx::onDFUWUSearch(IEspContext &context, IEspDFUWUSearchRequest & req, IEspDFUWUSearchResponse & resp)
  580. {
  581. try
  582. {
  583. if (!context.validateFeatureAccess(DFU_WU_URL, SecAccess_Read, false))
  584. throw MakeStringException(ECLWATCH_DFU_WU_ACCESS_DENIED, "Access to DFU workunit is denied.");
  585. StringArray dfuclusters;
  586. Owned<IEnvironmentFactory> factory = getEnvironmentFactory();
  587. Owned<IConstEnvironment> environment = factory->openEnvironment();
  588. Owned<IPropertyTree> root = &environment->getPTree();
  589. if (!root)
  590. throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO, "Failed to get environment information.");
  591. Owned<IPropertyTreeIterator> clusterIterator = root->getElements("Software/Topology/Cluster");
  592. if (clusterIterator->first())
  593. {
  594. do {
  595. IPropertyTree &cluster = clusterIterator->query();
  596. const char *clusterName = cluster.queryProp("@name");
  597. if (!clusterName || !*clusterName)
  598. continue;
  599. dfuclusters.append(clusterName);
  600. } while (clusterIterator->next());
  601. }
  602. resp.setClusterNames(dfuclusters);
  603. }
  604. catch(IException* e)
  605. {
  606. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  607. }
  608. return true;
  609. }
  610. int readFromCommaSeparatedString(const char *commaSeparatedString, StringBuffer* output)
  611. {
  612. int numOfItems = 0;
  613. if (commaSeparatedString && *commaSeparatedString)
  614. {
  615. char *pStr = (char *) commaSeparatedString;
  616. while (pStr)
  617. {
  618. char item[1024];
  619. bool bFoundComma = false;
  620. int len = strlen(pStr);
  621. for (int i = 0; i < len; i++)
  622. {
  623. char *pStr1 = pStr + i;
  624. if (pStr1[0] != ',')
  625. continue;
  626. strncpy(item, pStr, pStr1 - pStr);
  627. item[pStr1 - pStr] = 0;
  628. bFoundComma = true;
  629. if (i < len - 1)
  630. pStr = pStr1 + 1;
  631. else
  632. pStr = NULL;
  633. break;
  634. }
  635. if (!bFoundComma && len > 0)
  636. {
  637. strcpy(item, pStr);
  638. pStr = NULL;
  639. }
  640. output[numOfItems] = item;
  641. numOfItems++;
  642. }
  643. }
  644. return numOfItems;
  645. }
  646. bool CFileSprayEx::GetArchivedDFUWorkunits(IEspContext &context, IEspGetDFUWorkunits &req, IEspGetDFUWorkunitsResponse &resp)
  647. {
  648. StringBuffer user;
  649. context.getUserID(user);
  650. StringBuffer sashaAddress;
  651. IArrayOf<IConstTpSashaServer> sashaservers;
  652. CTpWrapper dummy;
  653. dummy.getTpSashaServers(sashaservers);
  654. ForEachItemIn(i, sashaservers)
  655. {
  656. IConstTpSashaServer& sashaserver = sashaservers.item(i);
  657. IArrayOf<IConstTpMachine> &sashaservermachine = sashaserver.getTpMachines();
  658. sashaAddress.append(sashaservermachine.item(0).getNetaddress());
  659. }
  660. SocketEndpoint ep;
  661. ep.set(sashaAddress,DEFAULT_SASHA_PORT);
  662. Owned<INode> sashaserver = createINode(ep);
  663. __int64 count=req.getPageSize();
  664. if(count < 1)
  665. count=100;
  666. __int64 begin=req.getPageStartFrom();
  667. if (begin < 0)
  668. begin = 0;
  669. Owned<ISashaCommand> cmd = createSashaCommand();
  670. cmd->setAction(SCA_LIST);
  671. cmd->setOnline(false);
  672. cmd->setArchived(true);
  673. cmd->setDFU(true);
  674. cmd->setLimit((int) count+1);
  675. cmd->setStart((int)begin);
  676. if(req.getCluster() && *req.getCluster())
  677. cmd->setCluster(req.getCluster());
  678. if(req.getOwner() && *req.getOwner())
  679. cmd->setOwner(req.getOwner());
  680. if(req.getJobname() && *req.getJobname())
  681. cmd->setJobName(req.getJobname());
  682. if(req.getStateReq() && *req.getStateReq())
  683. cmd->setState(req.getStateReq());
  684. cmd->setOutputFormat("owner,jobname,cluster,state,command");//date range/owner/jobname/state*/
  685. if (!cmd->send(sashaserver))
  686. {
  687. StringBuffer msg;
  688. msg.appendf("Cannot connect to archive server at %s",sashaAddress.str());
  689. throw MakeStringException(ECLWATCH_CANNOT_CONNECT_ARCHIVE_SERVER, "%s", msg.str());
  690. }
  691. IArrayOf<IEspDFUWorkunit> results;
  692. __int64 actualCount = cmd->numIds();
  693. StringBuffer s;
  694. for (unsigned j=0;j<actualCount;j++)
  695. {
  696. const char *wuidStr = cmd->queryId(j);
  697. if (!wuidStr)
  698. continue;
  699. StringBuffer strArray[6];
  700. readFromCommaSeparatedString(wuidStr, strArray);
  701. //skip any workunits without access
  702. Owned<IEspDFUWorkunit> resultWU = createDFUWorkunit("", "");
  703. resultWU->setArchived(true);
  704. if (strArray[0].length() > 0)
  705. resultWU->setID(strArray[0].str());
  706. if (strArray[1].length() > 0)
  707. resultWU->setUser(strArray[1].str());
  708. if (strArray[2].length() > 0)
  709. resultWU->setJobName(strArray[2].str());
  710. if (strArray[3].length() > 0)
  711. resultWU->setClusterName(strArray[3].str());
  712. if (strArray[4].length() > 0)
  713. resultWU->setStateMessage(strArray[4].str());
  714. if (strArray[5].length() > 0)
  715. resultWU->setCommand(atoi(strArray[5].str()));
  716. results.append(*resultWU.getLink());
  717. }
  718. resp.setPageStartFrom(begin+1);
  719. resp.setNextPage(-1);
  720. if(count < actualCount)
  721. {
  722. if (results.length() > count)
  723. {
  724. results.pop();
  725. }
  726. resp.setNextPage(begin + count);
  727. resp.setPageEndAt(begin + count);
  728. }
  729. else
  730. {
  731. resp.setPageEndAt(begin + actualCount);
  732. }
  733. if(begin > 0)
  734. {
  735. resp.setFirst(false);
  736. if (begin - count > 0)
  737. resp.setPrevPage(begin - count);
  738. else
  739. resp.setPrevPage(0);
  740. }
  741. resp.setPageSize(count);
  742. resp.setResults(results);
  743. StringBuffer basicQuery;
  744. if (req.getStateReq() && *req.getStateReq())
  745. {
  746. resp.setStateReq(req.getStateReq());
  747. addToQueryString(basicQuery, "StateReq", req.getStateReq());
  748. }
  749. if (req.getCluster() && *req.getCluster())
  750. {
  751. resp.setCluster(req.getCluster());
  752. addToQueryString(basicQuery, "Cluster", req.getCluster());
  753. }
  754. if (req.getOwner() && *req.getOwner())
  755. {
  756. resp.setOwner(req.getOwner());
  757. addToQueryString(basicQuery, "Owner", req.getOwner());
  758. }
  759. if (req.getType() && *req.getType())
  760. {
  761. resp.setType(req.getType());
  762. addToQueryString(basicQuery, "Type", req.getType());
  763. }
  764. resp.setFilters(basicQuery.str());
  765. resp.setBasicQuery(basicQuery.str());
  766. return true;
  767. }
  768. bool CFileSprayEx::getOneDFUWorkunit(IEspContext& context, const char* wuid, IEspGetDFUWorkunitsResponse& resp)
  769. {
  770. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  771. Owned<IConstDFUWorkUnit> wu = factory->openWorkUnit(wuid, false);
  772. if (!wu)
  773. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT, "Dfu workunit %s not found.", wuid);
  774. Owned<IEspDFUWorkunit> resultWU = createDFUWorkunit();
  775. resultWU->setID(wuid);
  776. resultWU->setCommand(wu->getCommand());
  777. resultWU->setIsProtected(wu->isProtected());
  778. StringBuffer jobname, user, cluster;
  779. resultWU->setJobName(wu->getJobName(jobname).str());
  780. resultWU->setUser(wu->getUser(user).str());
  781. const char* clusterName = wu->getClusterName(cluster).str();
  782. if (clusterName && *clusterName)
  783. {
  784. Owned<IStringIterator> targets = getTargetClusters(NULL, clusterName);
  785. if (!targets->first())
  786. resultWU->setClusterName(clusterName);
  787. else
  788. {
  789. SCMStringBuffer targetCluster;
  790. targets->str(targetCluster);
  791. resultWU->setClusterName(targetCluster.str());
  792. }
  793. }
  794. IConstDFUprogress* prog = wu->queryProgress();
  795. if (prog)
  796. {
  797. StringBuffer statemsg;
  798. DFUstate state = prog->getState();
  799. encodeDFUstate(state, statemsg);
  800. resultWU->setState(state);
  801. resultWU->setStateMessage(statemsg.str());
  802. resultWU->setPercentDone(prog->getPercentDone());
  803. }
  804. IArrayOf<IEspDFUWorkunit> result;
  805. result.append(*resultWU.getClear());
  806. resp.setResults(result);
  807. return true;
  808. }
  809. bool CFileSprayEx::onGetDFUWorkunits(IEspContext &context, IEspGetDFUWorkunits &req, IEspGetDFUWorkunitsResponse &resp)
  810. {
  811. try
  812. {
  813. if (!context.validateFeatureAccess(DFU_WU_URL, SecAccess_Read, false))
  814. throw MakeStringException(ECLWATCH_DFU_WU_ACCESS_DENIED, "Access to DFU workunit is denied.");
  815. StringBuffer wuidStr = req.getWuid();
  816. const char* wuid = wuidStr.trim().str();
  817. if (wuid && *wuid && looksLikeAWuid(wuid, 'D'))
  818. return getOneDFUWorkunit(context, wuid, resp);
  819. double version = context.getClientVersion();
  820. if (version > 1.02)
  821. {
  822. const char *type = req.getType();
  823. if (type && *type && !stricmp(type, "archived workunits"))
  824. {
  825. return GetArchivedDFUWorkunits(context, req, resp);
  826. }
  827. }
  828. StringBuffer clusterReq;
  829. const char *clusterName = req.getCluster();
  830. if(clusterName && *clusterName)
  831. {
  832. clusterReq.append(clusterName);
  833. }
  834. Owned<IEnvironmentFactory> envFactory = getEnvironmentFactory();
  835. Owned<IConstEnvironment> constEnv = envFactory->openEnvironment();
  836. Owned<IPropertyTree> root = &constEnv->getPTree();
  837. if (!root)
  838. throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO, "Failed to get environment information.");
  839. StringArray targetClusters, clusterProcesses;
  840. Owned<IPropertyTreeIterator> clusters= root->getElements("Software/Topology/Cluster");
  841. if (clusters->first())
  842. {
  843. do {
  844. IPropertyTree &cluster = clusters->query();
  845. const char* name = cluster.queryProp("@name");
  846. if (!name || !*name)
  847. continue;
  848. Owned<IPropertyTreeIterator> thorClusters= cluster.getElements(eqThorCluster);
  849. Owned<IPropertyTreeIterator> roxieClusters= cluster.getElements(eqRoxieCluster);
  850. if (thorClusters->first() || roxieClusters->first())
  851. {
  852. bool bFound = false;
  853. if (thorClusters->first())
  854. {
  855. IPropertyTree &thorCluster = thorClusters->query();
  856. const char* process = thorCluster.queryProp("@process");
  857. if (process && *process)
  858. {
  859. targetClusters.append(name);
  860. clusterProcesses.append(process);
  861. if (clusterName && !stricmp(clusterName, name))
  862. {
  863. clusterReq.clear().append(process);
  864. }
  865. }
  866. }
  867. if (!bFound && roxieClusters->first())
  868. {
  869. IPropertyTree &roxieCluster = roxieClusters->query();
  870. const char* process = roxieCluster.queryProp("@process");
  871. if (process && *process)
  872. {
  873. targetClusters.append(name);
  874. clusterProcesses.append(process);
  875. if (clusterName && !stricmp(clusterName, name))
  876. {
  877. clusterReq.clear().append(process);
  878. }
  879. }
  880. }
  881. }
  882. } while (clusters->next());
  883. }
  884. __int64 pagesize = req.getPageSize();
  885. __int64 pagefrom = req.getPageStartFrom();
  886. __int64 displayFrom = 0;
  887. if (pagesize < 1)
  888. {
  889. pagesize = 100;
  890. }
  891. if (pagefrom > 0)
  892. {
  893. displayFrom = pagefrom;
  894. }
  895. DFUsortfield sortorder[2] = {DFUsf_wuid, DFUsf_term};
  896. sortorder[0] = (DFUsortfield) (DFUsf_wuid + DFUsf_reverse);
  897. if(req.getSortby() && *req.getSortby())
  898. {
  899. const char *sortby = req.getSortby();
  900. if (!stricmp(sortby, "Owner"))
  901. sortorder[0] = DFUsf_user;
  902. else if (!stricmp(sortby, "JobName"))
  903. sortorder[0] = DFUsf_job;
  904. else if (!stricmp(sortby, "Cluster"))
  905. sortorder[0] = DFUsf_cluster;
  906. else if (!stricmp(sortby, "State"))
  907. sortorder[0] = DFUsf_state;
  908. else if (!stricmp(sortby, "Type"))
  909. sortorder[0] = DFUsf_command;
  910. else if (!stricmp(sortby, "Protected"))
  911. sortorder[0] = DFUsf_protected;
  912. else if (!stricmp(sortby, "PCTDone"))
  913. sortorder[0] = (DFUsortfield) (DFUsf_pcdone | DFUsf_numeric);
  914. else
  915. sortorder[0] = DFUsf_wuid;
  916. bool descending = req.getDescending();
  917. if (descending)
  918. sortorder[0] = (DFUsortfield) (sortorder[0] | DFUsf_reverse);
  919. }
  920. DFUsortfield filters[10];
  921. unsigned short filterCount = 0;
  922. MemoryBuffer filterbuf;
  923. if(req.getStateReq() && *req.getStateReq())
  924. {
  925. filters[filterCount] = DFUsf_state;
  926. filterCount++;
  927. if (stricmp(req.getStateReq(), "unknown") != 0)
  928. filterbuf.append(req.getStateReq());
  929. else
  930. filterbuf.append("");
  931. }
  932. if(wuid && *wuid)
  933. {
  934. filters[filterCount] = DFUsf_wildwuid;
  935. filterCount++;
  936. filterbuf.append(wuid);
  937. }
  938. if(clusterName && *clusterName)
  939. {
  940. filters[filterCount] = DFUsf_cluster;
  941. filterCount++;
  942. filterbuf.append(clusterReq.str());
  943. }
  944. if(req.getOwner() && *req.getOwner())
  945. {
  946. filters[filterCount] = DFUsortfield (DFUsf_user | DFUsf_nocase);
  947. filterCount++;
  948. filterbuf.append(req.getOwner());
  949. }
  950. if(req.getJobname() && *req.getJobname())
  951. {
  952. filters[filterCount] = DFUsortfield (DFUsf_job | DFUsf_nocase);
  953. filterCount++;
  954. filterbuf.append(req.getJobname());
  955. }
  956. filters[filterCount] = DFUsf_term;
  957. __int64 cacheHint = req.getCacheHint();
  958. if (cacheHint < 0) //Not set yet
  959. cacheHint = 0;
  960. IArrayOf<IEspDFUWorkunit> result;
  961. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  962. unsigned numWUs;
  963. PROGLOG("GetDFUWorkunits: getWorkUnitsSorted");
  964. Owned<IConstDFUWorkUnitIterator> itr = factory->getWorkUnitsSorted(sortorder, filters, filterbuf.bufferBase(), (int) displayFrom, (int) pagesize+1, req.getOwner(), &cacheHint, &numWUs);
  965. if (version >= 1.07)
  966. resp.setCacheHint(cacheHint);
  967. PROGLOG("GetDFUWorkunits: getWorkUnitsSorted done");
  968. //unsigned actualCount = 0;
  969. itr->first();
  970. while(itr->isValid())
  971. {
  972. Owned<IConstDFUWorkUnit> wu = itr->get();
  973. //actualCount++;
  974. Owned<IEspDFUWorkunit> resultWU = createDFUWorkunit("", "");
  975. resultWU->setID(wu->queryId());
  976. StringBuffer jobname, user, cluster;
  977. resultWU->setJobName(wu->getJobName(jobname).str());
  978. resultWU->setCommand(wu->getCommand());
  979. resultWU->setUser(wu->getUser(user).str());
  980. const char* clusterName = wu->getClusterName(cluster).str();
  981. if (clusterName)
  982. {
  983. StringBuffer clusterForDisplay(clusterName);
  984. if (clusterProcesses.ordinality())
  985. {
  986. for (unsigned i = 0; i < clusterProcesses.length(); i++)
  987. {
  988. const char* clusterProcessName = clusterProcesses.item(i);
  989. if (!stricmp(clusterProcessName, clusterName))
  990. {
  991. clusterForDisplay.clear().append(targetClusters.item(i));
  992. break;
  993. }
  994. }
  995. }
  996. resultWU->setClusterName(clusterForDisplay.str());
  997. }
  998. resultWU->setIsProtected(wu->isProtected());
  999. IConstDFUprogress *prog = wu->queryProgress();
  1000. if (prog != NULL)
  1001. {
  1002. DFUstate state = prog->getState();
  1003. resultWU->setState(state);
  1004. StringBuffer statemsg;
  1005. encodeDFUstate(state,statemsg);
  1006. resultWU->setStateMessage(statemsg.str());
  1007. resultWU->setPercentDone(prog->getPercentDone());
  1008. }
  1009. result.append(*resultWU.getLink());
  1010. itr->next();
  1011. }
  1012. if (result.length() > pagesize)
  1013. result.pop();
  1014. resp.setPageSize(pagesize);
  1015. resp.setNumWUs(numWUs);
  1016. resp.setPageStartFrom(displayFrom + 1);
  1017. if(displayFrom + pagesize < numWUs)
  1018. {
  1019. resp.setNextPage(displayFrom + pagesize);
  1020. resp.setPageEndAt(pagefrom + pagesize);
  1021. __int64 last = displayFrom + pagesize;
  1022. while (last + pagesize < numWUs)
  1023. {
  1024. last += pagesize;
  1025. }
  1026. resp.setLastPage(last);
  1027. }
  1028. else
  1029. {
  1030. resp.setNextPage(-1);
  1031. resp.setPageEndAt(numWUs);
  1032. }
  1033. if(displayFrom > 0)
  1034. {
  1035. resp.setFirst(false);
  1036. if (displayFrom - pagesize > 0)
  1037. resp.setPrevPage(displayFrom - pagesize);
  1038. else
  1039. resp.setPrevPage(0);
  1040. }
  1041. StringBuffer basicQuery;
  1042. if (req.getStateReq() && *req.getStateReq())
  1043. {
  1044. resp.setStateReq(req.getStateReq());
  1045. addToQueryString(basicQuery, "StateReq", req.getStateReq());
  1046. }
  1047. if (req.getCluster() && *req.getCluster())
  1048. {
  1049. resp.setCluster(req.getCluster());
  1050. addToQueryString(basicQuery, "Cluster", req.getCluster());
  1051. }
  1052. if (req.getOwner() && *req.getOwner())
  1053. {
  1054. resp.setOwner(req.getOwner());
  1055. addToQueryString(basicQuery, "Owner", req.getOwner());
  1056. }
  1057. resp.setFilters(basicQuery.str());
  1058. if (req.getSortby() && *req.getSortby())
  1059. {
  1060. resp.setSortby(req.getSortby());
  1061. if (req.getDescending())
  1062. resp.setDescending(req.getDescending());
  1063. StringBuffer strbuf = req.getSortby();
  1064. strbuf.append("=");
  1065. String str1(strbuf.str());
  1066. String str(basicQuery.str());
  1067. if (str.indexOf(str1) < 0)
  1068. {
  1069. addToQueryString(basicQuery, "Sortby", req.getSortby());
  1070. if (req.getDescending())
  1071. addToQueryString(basicQuery, "Descending", "1");
  1072. }
  1073. }
  1074. resp.setBasicQuery(basicQuery.str());
  1075. resp.setResults(result);
  1076. }
  1077. catch(IException* e)
  1078. {
  1079. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1080. }
  1081. return true;
  1082. }
  1083. void CFileSprayEx::addToQueryString(StringBuffer &queryString, const char *name, const char *value)
  1084. {
  1085. if (queryString.length() > 0)
  1086. {
  1087. queryString.append("&amp;");
  1088. }
  1089. queryString.append(name);
  1090. queryString.append("=");
  1091. queryString.append(value);
  1092. }
  1093. void CFileSprayEx::getInfoFromSasha(IEspContext &context, const char *sashaServer, const char* wuid, IEspDFUWorkunit *info)
  1094. {
  1095. Owned<ISashaCommand> cmd = createSashaCommand();
  1096. cmd->addId(wuid);
  1097. cmd->setAction(SCA_GET);
  1098. cmd->setArchived(true);
  1099. cmd->setDFU(true);
  1100. SocketEndpoint ep(sashaServer, DEFAULT_SASHA_PORT);
  1101. Owned<INode> node = createINode(ep);
  1102. if (!cmd->send(node,1*60*1000))
  1103. {
  1104. DBGLOG("Cannot connect to Sasha server at %s",sashaServer);
  1105. throw MakeStringException(ECLWATCH_CANNOT_CONNECT_ARCHIVE_SERVER,"Cannot connect to archive server at %s.",sashaServer);
  1106. }
  1107. if (cmd->numIds()==0)
  1108. {
  1109. DBGLOG("Could not read archived %s",wuid);
  1110. throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT,"Cannot read workunit %s.",wuid);
  1111. }
  1112. unsigned num = cmd->numResults();
  1113. if (num < 1)
  1114. return;
  1115. StringBuffer res;
  1116. cmd->getResult(0,res);
  1117. if(res.length() < 1)
  1118. return;
  1119. Owned<IPropertyTree> wu = createPTreeFromXMLString(res.str());
  1120. if (!wu)
  1121. return;
  1122. const char * command = wu->queryProp("@command");
  1123. const char * submitID = wu->queryProp("@submitID");
  1124. const char * cluster = wu->queryProp("@clusterName");
  1125. const char * queue = wu->queryProp("@queue");
  1126. const char * jobName = wu->queryProp("@jobName");
  1127. const char * protectedWU = wu->queryProp("@protected");
  1128. info->setID(wuid);
  1129. info->setArchived(true);
  1130. if (command && *command)
  1131. info->setCommandMessage(command);
  1132. if (cluster && *cluster)
  1133. info->setClusterName(cluster);
  1134. if (submitID && *submitID)
  1135. info->setUser(submitID);
  1136. if (queue && *queue)
  1137. info->setQueue(queue);
  1138. if (jobName && *jobName)
  1139. info->setJobName(jobName);
  1140. if (protectedWU && stricmp(protectedWU, "0"))
  1141. info->setIsProtected(true);
  1142. else
  1143. info->setIsProtected(false);
  1144. IPropertyTree *source = wu->queryPropTree("Source");
  1145. if(source)
  1146. {
  1147. const char * directory = source->queryProp("@directory");
  1148. const char * name = source->queryProp("@name");
  1149. if (directory && *directory)
  1150. info->setSourceDirectory(directory);
  1151. if (name && *name)
  1152. info->setSourceLogicalName(name);
  1153. }
  1154. IPropertyTree *dest = wu->queryPropTree("Destination");
  1155. if(dest)
  1156. {
  1157. const char * directory = dest->queryProp("@directory");
  1158. int numParts = dest->getPropInt("@numparts", -1);
  1159. if (directory && *directory)
  1160. info->setDestDirectory(directory);
  1161. if (numParts > 0)
  1162. info->setDestNumParts(numParts);
  1163. }
  1164. IPropertyTree *progress = wu->queryPropTree("Progress");
  1165. if(progress)
  1166. {
  1167. const char * state = progress->queryProp("@state");
  1168. const char * timeStarted = progress->queryProp("@timestarted");
  1169. const char * timeStopped = progress->queryProp("@timestopped");
  1170. if (state && *state)
  1171. info->setStateMessage(state);
  1172. if (timeStarted && *timeStarted)
  1173. {
  1174. StringBuffer startStr = timeStarted;
  1175. startStr.replace('T', ' ');
  1176. info->setTimeStarted(startStr.str());
  1177. }
  1178. if (timeStopped && *timeStopped)
  1179. {
  1180. StringBuffer stopStr = timeStopped;
  1181. stopStr.replace('T', ' ');
  1182. info->setTimeStopped(stopStr.str());
  1183. }
  1184. }
  1185. return;
  1186. }
  1187. bool CFileSprayEx::getArchivedWUInfo(IEspContext &context, IEspGetDFUWorkunit &req, IEspGetDFUWorkunitResponse &resp)
  1188. {
  1189. const char *wuid = req.getWuid();
  1190. if (wuid && *wuid)
  1191. {
  1192. StringBuffer sashaAddress;
  1193. IArrayOf<IConstTpSashaServer> sashaservers;
  1194. CTpWrapper dummy;
  1195. dummy.getTpSashaServers(sashaservers);
  1196. ForEachItemIn(i, sashaservers)
  1197. {
  1198. IConstTpSashaServer& sashaserver = sashaservers.item(i);
  1199. IArrayOf<IConstTpMachine> &sashaservermachine = sashaserver.getTpMachines();
  1200. sashaAddress.append(sashaservermachine.item(0).getNetaddress());
  1201. }
  1202. if (sashaAddress.length() < 1)
  1203. {
  1204. throw MakeStringException(ECLWATCH_ARCHIVE_SERVER_NOT_FOUND,"Archive server not found.");
  1205. }
  1206. getInfoFromSasha(context, sashaAddress.str(), wuid, &resp.updateResult());
  1207. return true;
  1208. }
  1209. return false;
  1210. }
  1211. bool CFileSprayEx::onGetDFUWorkunit(IEspContext &context, IEspGetDFUWorkunit &req, IEspGetDFUWorkunitResponse &resp)
  1212. {
  1213. try
  1214. {
  1215. if (!context.validateFeatureAccess(DFU_WU_URL, SecAccess_Read, false))
  1216. throw MakeStringException(ECLWATCH_DFU_WU_ACCESS_DENIED, "Access to DFU workunit is denied.");
  1217. const char* wuid = req.getWuid();
  1218. if (!wuid || !*wuid)
  1219. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Dfu workunit ID not specified.");
  1220. bool found = false;
  1221. double version = context.getClientVersion();
  1222. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  1223. Owned<IConstDFUWorkUnit> wu = factory->openWorkUnit(wuid, false);
  1224. if(wu)
  1225. {
  1226. IEspDFUWorkunit &result = resp.updateResult();
  1227. PROGLOG("GetDFUWorkunit: %s", wuid);
  1228. DeepAssign(context, wu, result);
  1229. int n = resp.getResult().getState();
  1230. if (n == DFUstate_scheduled || n == DFUstate_queued || n == DFUstate_started)
  1231. {
  1232. resp.setAutoRefresh(WUDETAILS_REFRESH_MINS);
  1233. }
  1234. found = true;
  1235. }
  1236. else if ((version > 1.02) && getArchivedWUInfo(context, req, resp))
  1237. {
  1238. found = true;
  1239. }
  1240. if (!found)
  1241. throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT, "Dfu workunit %s not found.", req.getWuid());
  1242. }
  1243. catch(IException* e)
  1244. {
  1245. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1246. }
  1247. return true;
  1248. }
  1249. bool CFileSprayEx::onGetDFUProgress(IEspContext &context, IEspProgressRequest &req, IEspProgressResponse &resp)
  1250. {
  1251. try
  1252. {
  1253. if (!context.validateFeatureAccess(DFU_WU_URL, SecAccess_Read, false))
  1254. throw MakeStringException(ECLWATCH_DFU_WU_ACCESS_DENIED, "Access to DFU workunit is denied.");
  1255. const char* wuid = req.getWuid();
  1256. if(!wuid || !*wuid)
  1257. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Workunit ID not specified.");
  1258. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  1259. Owned<IConstDFUWorkUnit> wu = factory->openWorkUnit(req.getWuid(), false);
  1260. if(!wu)
  1261. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT, "Dfu workunit %s not found.", req.getWuid());
  1262. resp.setWuid(req.getWuid());
  1263. PROGLOG("GetDFUProgress: %s", wuid);
  1264. IConstDFUprogress *prog = wu->queryProgress();
  1265. if (prog)
  1266. {
  1267. resp.setPercentDone(prog->getPercentDone());
  1268. resp.setKbPerSec(prog->getKbPerSec());
  1269. resp.setKbPerSecAve(prog->getKbPerSecAve());
  1270. resp.setSecsLeft(prog->getSecsLeft());
  1271. StringBuffer statestr;
  1272. encodeDFUstate(prog->getState(), statestr);
  1273. resp.setState(statestr.str());
  1274. resp.setSlavesDone(prog->getSlavesDone());
  1275. StringBuffer msg;
  1276. prog->formatProgressMessage(msg);
  1277. resp.setProgressMessage(msg.str());
  1278. prog->formatSummaryMessage(msg.clear());
  1279. resp.setSummaryMessage(msg.str());
  1280. prog->getTimeTaken(msg.clear());
  1281. resp.setTimeTaken(msg.str());
  1282. }
  1283. }
  1284. catch(IException* e)
  1285. {
  1286. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1287. }
  1288. return true;
  1289. }
  1290. bool CFileSprayEx::onCreateDFUWorkunit(IEspContext &context, IEspCreateDFUWorkunit &req, IEspCreateDFUWorkunitResponse &resp)
  1291. {
  1292. try
  1293. {
  1294. if (!context.validateFeatureAccess(DFU_WU_URL, SecAccess_Write, false))
  1295. throw MakeStringException(ECLWATCH_DFU_WU_ACCESS_DENIED, "Failed to create DFU workunit. Permission denied.");
  1296. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  1297. Owned<IDFUWorkUnit> wu = factory->createWorkUnit();
  1298. setDFUServerQueueReq(req.getDFUServerQueue(), wu);
  1299. setUserAuth(context, wu);
  1300. wu->commit();
  1301. const char * d = wu->queryId();
  1302. IEspDFUWorkunit &result = resp.updateResult();
  1303. DeepAssign(context, wu, result);
  1304. result.setOverwrite(false);
  1305. result.setReplicate(true);
  1306. }
  1307. catch(IException* e)
  1308. {
  1309. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1310. }
  1311. return true;
  1312. }
  1313. bool CFileSprayEx::onUpdateDFUWorkunit(IEspContext &context, IEspUpdateDFUWorkunit &req, IEspUpdateDFUWorkunitResponse &resp)
  1314. {
  1315. try
  1316. {
  1317. if (!context.validateFeatureAccess(DFU_WU_URL, SecAccess_Write, false))
  1318. throw MakeStringException(ECLWATCH_DFU_WU_ACCESS_DENIED, "Failed to update DFU workunit. Permission denied.");
  1319. IConstDFUWorkunit & reqWU = req.getWu();
  1320. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  1321. Owned<IDFUWorkUnit> wu = factory->updateWorkUnit(reqWU.getID());
  1322. if(!wu)
  1323. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT, "Dfu workunit %s not found.", reqWU.getID());
  1324. PROGLOG("UpdateDFUWorkunit: %s", reqWU.getID());
  1325. IDFUprogress *prog = wu->queryUpdateProgress();
  1326. if (prog && req.getStateOrig() != reqWU.getState())
  1327. {
  1328. if (prog->getState() != req.getStateOrig())
  1329. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot update DFU workunit %s because its state has been changed internally. Please refresh the page and try again.",reqWU.getID());
  1330. prog->setState((enum DFUstate)reqWU.getState());
  1331. }
  1332. const char* clusterOrig = req.getClusterOrig();
  1333. const char* cluster = reqWU.getClusterName();
  1334. if(cluster && (!clusterOrig || stricmp(clusterOrig, cluster)))
  1335. {
  1336. wu->setClusterName(reqWU.getClusterName());
  1337. }
  1338. const char* jobNameOrig = req.getJobNameOrig();
  1339. const char* jobName = reqWU.getJobName();
  1340. if(jobName && (!jobNameOrig || stricmp(jobNameOrig, jobName)))
  1341. {
  1342. wu->setJobName(jobName);
  1343. }
  1344. if (reqWU.getIsProtected() != req.getIsProtectedOrig())
  1345. wu->protect(reqWU.getIsProtected());
  1346. wu->commit();
  1347. resp.setRedirectUrl(StringBuffer("/FileSpray/GetDFUWorkunit?wuid=").append(reqWU.getID()).str());
  1348. }
  1349. catch(IException* e)
  1350. {
  1351. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1352. }
  1353. return true;
  1354. }
  1355. bool markWUFailed(IDFUWorkUnitFactory *f, const char *wuid)
  1356. {
  1357. Owned<IDFUWorkUnit> wu = f->updateWorkUnit(wuid);
  1358. if(!wu)
  1359. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT, "Dfu workunit %s not found.", wuid);
  1360. IDFUprogress *prog = wu->queryUpdateProgress();
  1361. if(!prog)
  1362. throw MakeStringException(ECLWATCH_PROGRESS_INFO_NOT_FOUND, "progress information not found for workunit %s.", wuid);
  1363. else if(prog->getState() == DFUstate_started)
  1364. throw MakeStringException(ECLWATCH_CANNOT_DELETE_WORKUNIT, "Cannot delete workunit %s because its state is Started.", wuid);
  1365. else
  1366. {
  1367. prog->setState(DFUstate_failed);
  1368. return true;
  1369. }
  1370. return false;
  1371. }
  1372. static unsigned NumOfDFUWUActionNames = 5;
  1373. static const char *DFUWUActionNames[] = { "Delete", "Protect" , "Unprotect" , "Restore" , "SetToFailed" };
  1374. bool CFileSprayEx::onDFUWorkunitsAction(IEspContext &context, IEspDFUWorkunitsActionRequest &req, IEspDFUWorkunitsActionResponse &resp)
  1375. {
  1376. try
  1377. {
  1378. if (!context.validateFeatureAccess(DFU_WU_URL, SecAccess_Write, false))
  1379. throw MakeStringException(ECLWATCH_DFU_WU_ACCESS_DENIED, "Failed to update DFU workunit. Permission denied.");
  1380. CDFUWUActions action = req.getType();
  1381. if (action == DFUWUActions_Undefined)
  1382. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Action not defined.");
  1383. StringArray& wuids = req.getWuids();
  1384. if (!wuids.ordinality())
  1385. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Workunit not defined.");
  1386. Owned<INode> node;
  1387. Owned<ISashaCommand> cmd;
  1388. StringBuffer sashaAddress;
  1389. if (action == CDFUWUActions_Restore)
  1390. {
  1391. IArrayOf<IConstTpSashaServer> sashaservers;
  1392. CTpWrapper dummy;
  1393. dummy.getTpSashaServers(sashaservers);
  1394. ForEachItemIn(i, sashaservers)
  1395. {
  1396. IConstTpSashaServer& sashaserver = sashaservers.item(i);
  1397. IArrayOf<IConstTpMachine> &sashaservermachine = sashaserver.getTpMachines();
  1398. sashaAddress.append(sashaservermachine.item(0).getNetaddress());
  1399. }
  1400. if (sashaAddress.length() < 1)
  1401. {
  1402. throw MakeStringException(ECLWATCH_ARCHIVE_SERVER_NOT_FOUND,"Archive server not found.");
  1403. }
  1404. SocketEndpoint ep(sashaAddress.str(), DEFAULT_SASHA_PORT);
  1405. node.setown(createINode(ep));
  1406. cmd.setown(createSashaCommand());
  1407. cmd->setAction(SCA_RESTORE);
  1408. cmd->setDFU(true);
  1409. }
  1410. IArrayOf<IEspDFUActionResult> results;
  1411. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  1412. for(unsigned i = 0; i < wuids.ordinality(); ++i)
  1413. {
  1414. const char* wuid = wuids.item(i);
  1415. const char* actionStr = (action < NumOfDFUWUActionNames) ? DFUWUActionNames[action] : "Unknown";
  1416. Owned<IEspDFUActionResult> res = createDFUActionResult("", "");
  1417. res->setID(wuid);
  1418. res->setAction(actionStr);
  1419. try
  1420. {
  1421. PROGLOG("%s %s", actionStr, wuid);
  1422. switch (action)
  1423. {
  1424. case CDFUWUActions_Delete:
  1425. if (!markWUFailed(factory, wuid))
  1426. throw MakeStringException(ECLWATCH_CANNOT_DELETE_WORKUNIT, "Failed to mark workunit failed.");
  1427. if (!factory->deleteWorkUnit(wuid))
  1428. throw MakeStringException(ECLWATCH_CANNOT_DELETE_WORKUNIT, "Failed in deleting workunit.");
  1429. res->setResult("Success");
  1430. break;
  1431. case CDFUWUActions_Restore:
  1432. cmd->addId(wuid);
  1433. if (!cmd->send(node,1*60*1000))
  1434. throw MakeStringException(ECLWATCH_CANNOT_CONNECT_ARCHIVE_SERVER,"Cannot connect to archive server at %s.",sashaAddress.str());
  1435. {
  1436. StringBuffer reply = "Restore ID: ";
  1437. if (!cmd->getId(0, reply))
  1438. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT, "Failed to get ID.");
  1439. res->setResult(reply.str());
  1440. }
  1441. break;
  1442. case CDFUWUActions_Protect:
  1443. case CDFUWUActions_Unprotect:
  1444. case CDFUWUActions_SetToFailed:
  1445. Owned<IDFUWorkUnit> wu = factory->updateWorkUnit(wuid);
  1446. if(!wu.get())
  1447. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT, "Failed in calling updateWorkUnit().");
  1448. switch (action)
  1449. {
  1450. case CDFUWUActions_Protect:
  1451. wu->protect(true);
  1452. break;
  1453. case CDFUWUActions_Unprotect:
  1454. wu->protect(false);
  1455. break;
  1456. case CDFUWUActions_SetToFailed:
  1457. IDFUprogress *prog = wu->queryUpdateProgress();
  1458. if (!prog)
  1459. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT, "Failed in calling queryUpdateProgress().");
  1460. prog->setState(DFUstate_failed);
  1461. break;
  1462. }
  1463. wu->commit();
  1464. res->setResult("Success");
  1465. break;
  1466. }
  1467. PROGLOG("%s %s done", actionStr, wuid);
  1468. }
  1469. catch (IException *e)
  1470. {
  1471. StringBuffer eMsg, failedMsg("Failed: ");
  1472. res->setResult(failedMsg.append(e->errorMessage(eMsg)).str());
  1473. e->Release();
  1474. }
  1475. results.append(*res.getLink());
  1476. }
  1477. resp.setDFUActionResults(results);
  1478. }
  1479. catch(IException* e)
  1480. {
  1481. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1482. }
  1483. return true;
  1484. }
  1485. bool CFileSprayEx::onDeleteDFUWorkunits(IEspContext &context, IEspDeleteDFUWorkunits &req, IEspDeleteDFUWorkunitsResponse &resp)
  1486. {
  1487. try
  1488. {
  1489. if (!context.validateFeatureAccess(DFU_WU_URL, SecAccess_Write, false))
  1490. throw MakeStringException(ECLWATCH_DFU_WU_ACCESS_DENIED, "Failed to delete DFU workunit. Permission denied.");
  1491. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  1492. StringArray & wuids = req.getWuids();
  1493. for(unsigned i = 0; i < wuids.ordinality(); ++i)
  1494. {
  1495. const char* wuid = wuids.item(i);
  1496. if (markWUFailed(factory, wuid))
  1497. {
  1498. factory->deleteWorkUnit(wuid);
  1499. PROGLOG("DeleteDFUWorkunits: %s deleted", wuid);
  1500. }
  1501. }
  1502. resp.setRedirectUrl("/FileSpray/GetDFUWorkunits");
  1503. }
  1504. catch(IException* e)
  1505. {
  1506. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1507. }
  1508. return true;
  1509. }
  1510. bool CFileSprayEx::onDeleteDFUWorkunit(IEspContext &context, IEspDeleteDFUWorkunit &req, IEspDeleteDFUWorkunitResponse &resp)
  1511. {
  1512. try
  1513. {
  1514. if (!context.validateFeatureAccess(DFU_WU_URL, SecAccess_Write, false))
  1515. throw MakeStringException(ECLWATCH_DFU_WU_ACCESS_DENIED, "Failed to delete DFU workunit. Permission denied.");
  1516. const char* wuid = req.getWuid();
  1517. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  1518. if (markWUFailed(factory, wuid))
  1519. {
  1520. resp.setResult(factory->deleteWorkUnit(wuid));
  1521. PROGLOG("DeleteDFUWorkunit: %s deleted", wuid);
  1522. }
  1523. else
  1524. resp.setResult(false);
  1525. resp.setRedirectUrl("/FileSpray/GetDFUWorkunits");
  1526. }
  1527. catch(IException* e)
  1528. {
  1529. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1530. }
  1531. return true;
  1532. }
  1533. bool CFileSprayEx::onSubmitDFUWorkunit(IEspContext &context, IEspSubmitDFUWorkunit &req, IEspSubmitDFUWorkunitResponse &resp)
  1534. {
  1535. try
  1536. {
  1537. if (!context.validateFeatureAccess(DFU_WU_URL, SecAccess_Write, false))
  1538. throw MakeStringException(ECLWATCH_DFU_WU_ACCESS_DENIED, "Failed to submit DFU workunit. Permission denied.");
  1539. if (!req.getWuid() || !*req.getWuid())
  1540. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Workunit ID required");
  1541. PROGLOG("SubmitDFUWorkunit: %s", req.getWuid());
  1542. submitDFUWorkUnit(req.getWuid());
  1543. resp.setRedirectUrl(StringBuffer("/FileSpray/GetDFUWorkunit?wuid=").append(req.getWuid()).str());
  1544. }
  1545. catch(IException* e)
  1546. {
  1547. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1548. }
  1549. return true;
  1550. }
  1551. bool CFileSprayEx::onAbortDFUWorkunit(IEspContext &context, IEspAbortDFUWorkunit &req, IEspAbortDFUWorkunitResponse &resp)
  1552. {
  1553. try
  1554. {
  1555. if (!context.validateFeatureAccess(DFU_WU_URL, SecAccess_Write, false))
  1556. throw MakeStringException(ECLWATCH_DFU_WU_ACCESS_DENIED, "Failed to abort DFU workunit. Permission denied.");
  1557. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  1558. Owned<IDFUWorkUnit> wu = factory->updateWorkUnit(req.getWuid());
  1559. if(!wu)
  1560. throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT, "Dfu workunit %s not found.", req.getWuid());
  1561. PROGLOG("AbortDFUWorkunit: %s", req.getWuid());
  1562. wu->requestAbort();
  1563. resp.setRedirectUrl(StringBuffer("/FileSpray/GetDFUWorkunit?wuid=").append(req.getWuid()).str());
  1564. }
  1565. catch(IException* e)
  1566. {
  1567. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1568. }
  1569. return true;
  1570. }
  1571. bool CFileSprayEx::onGetDFUExceptions(IEspContext &context, IEspGetDFUExceptions &req, IEspGetDFUExceptionsResponse &resp)
  1572. {
  1573. try
  1574. {
  1575. if (!context.validateFeatureAccess(DFU_EX_URL, SecAccess_Read, false))
  1576. throw MakeStringException(ECLWATCH_DFU_EX_ACCESS_DENIED, "Failed to get DFU Exceptions. Permission denied.");
  1577. IArrayOf<IEspDFUException> result;
  1578. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  1579. Owned<IDFUWorkUnit> wu = factory->updateWorkUnit(req.getWuid());
  1580. if(!wu)
  1581. throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT, "Dfu workunit %s not found.", req.getWuid());
  1582. PROGLOG("GetDFUExceptions: %s", req.getWuid());
  1583. Owned<IExceptionIterator> itr = wu->getExceptionIterator();
  1584. itr->first();
  1585. while(itr->isValid())
  1586. {
  1587. Owned<IEspDFUException> resultE = createDFUException("", "");
  1588. IException &e = itr->query();
  1589. resultE->setCode(e.errorCode());
  1590. StringBuffer msg;
  1591. resultE->setMessage(e.errorMessage(msg).str());
  1592. result.append(*resultE.getLink());
  1593. itr->next();
  1594. }
  1595. resp.setResult(result);
  1596. }
  1597. catch(IException* e)
  1598. {
  1599. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1600. }
  1601. return true;
  1602. }
  1603. bool CFileSprayEx::onSprayFixed(IEspContext &context, IEspSprayFixed &req, IEspSprayFixedResponse &resp)
  1604. {
  1605. try
  1606. {
  1607. if (!context.validateFeatureAccess(FILE_SPRAY_URL, SecAccess_Write, false))
  1608. throw MakeStringException(ECLWATCH_FILE_SPRAY_ACCESS_DENIED, "Failed to do Spray. Permission denied.");
  1609. StringBuffer destFolder, destTitle, defaultFolder, defaultReplicateFolder;
  1610. const char* destNodeGroup = req.getDestGroup();
  1611. if(destNodeGroup == NULL || *destNodeGroup == '\0')
  1612. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Destination node group not specified.");
  1613. MemoryBuffer& srcxml = (MemoryBuffer&)req.getSrcxml();
  1614. const char* srcip = req.getSourceIP();
  1615. const char* srcfile = req.getSourcePath();
  1616. if(srcxml.length() == 0)
  1617. {
  1618. if(!srcip || !*srcip)
  1619. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Source network IP not specified.");
  1620. if(!srcfile || !*srcfile)
  1621. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Source file not specified.");
  1622. }
  1623. const char* destname = req.getDestLogicalName();
  1624. if(!destname || !*destname)
  1625. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Destination file not specified.");
  1626. CDfsLogicalFileName lfn;
  1627. if (!lfn.setValidate(destname))
  1628. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid destination filename");
  1629. destname = lfn.get();
  1630. PROGLOG("SprayFixed: DestLogicalName %s, DestGroup %s", destname, destNodeGroup);
  1631. StringBuffer gName, ipAddr;
  1632. const char *pTr = strchr(destNodeGroup, ' ');
  1633. if (pTr)
  1634. {
  1635. gName.append(pTr - destNodeGroup, destNodeGroup);
  1636. ipAddr.append(pTr+1);
  1637. }
  1638. else
  1639. gName.append(destNodeGroup);
  1640. if (ipAddr.length() > 0)
  1641. ParseLogicalPath(destname, ipAddr.str(), NULL, destFolder, destTitle, defaultFolder, defaultReplicateFolder);
  1642. else
  1643. ParseLogicalPath(destname, destNodeGroup, NULL, destFolder, destTitle, defaultFolder, defaultReplicateFolder);
  1644. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  1645. Owned<IDFUWorkUnit> wu = factory->createWorkUnit();
  1646. wu->setClusterName(gName.str());
  1647. wu->setJobName(destTitle.str());
  1648. setDFUServerQueueReq(req.getDFUServerQueue(), wu);
  1649. setUserAuth(context, wu);
  1650. wu->setCommand(DFUcmd_import);
  1651. IDFUfileSpec *source = wu->queryUpdateSource();
  1652. if(srcxml.length() == 0)
  1653. {
  1654. RemoteMultiFilename rmfn;
  1655. SocketEndpoint ep(srcip);
  1656. rmfn.setEp(ep);
  1657. StringBuffer fnamebuf(srcfile);
  1658. fnamebuf.trim();
  1659. rmfn.append(fnamebuf.str()); // handles comma separated files
  1660. source->setMultiFilename(rmfn);
  1661. }
  1662. else
  1663. {
  1664. srcxml.append('\0');
  1665. source->setFromXML((const char*)srcxml.toByteArray());
  1666. }
  1667. IDFUfileSpec *destination = wu->queryUpdateDestination();
  1668. bool nosplit = req.getNosplit();
  1669. int recordsize = req.getSourceRecordSize();
  1670. const char* format = req.getSourceFormat();
  1671. if ((recordsize == RECFMVB_RECSIZE_ESCAPE) || (format && strieq(format, "recfmvb")))
  1672. {//recordsize may be set by dfuplus; format may be set by EclWatch
  1673. source->setFormat(DFUff_recfmvb);
  1674. destination->setFormat(DFUff_variable);
  1675. }
  1676. else if ((recordsize == RECFMV_RECSIZE_ESCAPE) || (format && strieq(format, "recfmv")))
  1677. {
  1678. source->setFormat(DFUff_recfmv);
  1679. destination->setFormat(DFUff_variable);
  1680. }
  1681. else if ((recordsize == PREFIX_VARIABLE_RECSIZE_ESCAPE) || (format && strieq(format, "variable")))
  1682. {
  1683. source->setFormat(DFUff_variable);
  1684. destination->setFormat(DFUff_variable);
  1685. }
  1686. else if((recordsize == PREFIX_VARIABLE_BIGENDIAN_RECSIZE_ESCAPE) || (format && strieq(format, "variablebigendian")))
  1687. {
  1688. source->setFormat(DFUff_variablebigendian);
  1689. destination->setFormat(DFUff_variable);
  1690. }
  1691. else if(recordsize == 0 && !nosplit) // -ve record sizes for blocked
  1692. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid record size");
  1693. else
  1694. source->setRecordSize(recordsize);
  1695. destination->setLogicalName(destname);
  1696. destination->setDirectory(destFolder.str());
  1697. StringBuffer fileMask;
  1698. constructFileMask(destTitle.str(), fileMask);
  1699. destination->setFileMask(fileMask.str());
  1700. destination->setGroupName(gName.str());
  1701. const char * encryptkey = req.getEncrypt();
  1702. if(req.getCompress()||(encryptkey&&*encryptkey))
  1703. destination->setCompressed(true);
  1704. ClusterPartDiskMapSpec mspec;
  1705. destination->getClusterPartDiskMapSpec(gName.str(), mspec);
  1706. mspec.setDefaultBaseDir(defaultFolder.str());
  1707. mspec.setDefaultReplicateDir(defaultReplicateFolder.str());
  1708. if (!req.getReplicate())
  1709. mspec.defaultCopies = DFD_NoCopies;
  1710. destination->setClusterPartDiskMapSpec(gName.str(), mspec);
  1711. int repo = req.getReplicateOffset();
  1712. bool isNull = req.getReplicateOffset_isNull();
  1713. if (!isNull && (repo!=1))
  1714. destination->setReplicateOffset(repo);
  1715. if (req.getWrap())
  1716. destination->setWrap(true);
  1717. IDFUoptions *options = wu->queryUpdateOptions();
  1718. const char * decryptkey = req.getDecrypt();
  1719. if ((encryptkey&&*encryptkey)||(decryptkey&&*decryptkey))
  1720. options->setEncDec(encryptkey,decryptkey);
  1721. options->setReplicate(req.getReplicate());
  1722. options->setOverwrite(req.getOverwrite()); // needed if target already exists
  1723. const char* prefix = req.getPrefix();
  1724. if(prefix && *prefix)
  1725. options->setLengthPrefix(prefix);
  1726. if(req.getNosplit())
  1727. options->setNoSplit(true);
  1728. if(req.getNorecover())
  1729. options->setNoRecover(true);
  1730. if(req.getMaxConnections() > 0)
  1731. options->setmaxConnections(req.getMaxConnections());
  1732. if(req.getThrottle() > 0)
  1733. options->setThrottle(req.getThrottle());
  1734. if(req.getTransferBufferSize() > 0)
  1735. options->setTransferBufferSize(req.getTransferBufferSize());
  1736. if (req.getPull())
  1737. options->setPull(true);
  1738. if (req.getPush())
  1739. options->setPush(true);
  1740. if (req.getFailIfNoSourceFile())
  1741. options->setFailIfNoSourceFile(true);
  1742. if (req.getRecordStructurePresent())
  1743. options->setRecordStructurePresent(true);
  1744. resp.setWuid(wu->queryId());
  1745. resp.setRedirectUrl(StringBuffer("/FileSpray/GetDFUWorkunit?wuid=").append(wu->queryId()).str());
  1746. submitDFUWorkUnit(wu.getClear());
  1747. }
  1748. catch(IException* e)
  1749. {
  1750. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1751. }
  1752. return true;
  1753. }
  1754. bool CFileSprayEx::onSprayVariable(IEspContext &context, IEspSprayVariable &req, IEspSprayResponse &resp)
  1755. {
  1756. try
  1757. {
  1758. if (!context.validateFeatureAccess(FILE_SPRAY_URL, SecAccess_Write, false))
  1759. throw MakeStringException(ECLWATCH_FILE_SPRAY_ACCESS_DENIED, "Failed to do Spray. Permission denied.");
  1760. StringBuffer destFolder, destTitle, defaultFolder, defaultReplicateFolder;
  1761. const char* destNodeGroup = req.getDestGroup();
  1762. if(destNodeGroup == NULL || *destNodeGroup == '\0')
  1763. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Destination node group not specified.");
  1764. StringBuffer gName, ipAddr;
  1765. const char *pTr = strchr(destNodeGroup, ' ');
  1766. if (pTr)
  1767. {
  1768. gName.append(pTr - destNodeGroup, destNodeGroup);
  1769. ipAddr.append(pTr+1);
  1770. }
  1771. else
  1772. gName.append(destNodeGroup);
  1773. MemoryBuffer& srcxml = (MemoryBuffer&)req.getSrcxml();
  1774. const char* srcip = req.getSourceIP();
  1775. const char* srcfile = req.getSourcePath();
  1776. if(srcxml.length() == 0)
  1777. {
  1778. if(!srcip || !*srcip)
  1779. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Source network IP not specified.");
  1780. if(!srcfile || !*srcfile)
  1781. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Source file not specified.");
  1782. }
  1783. const char* destname = req.getDestLogicalName();
  1784. if(!destname || !*destname)
  1785. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Destination file not specified.");
  1786. CDfsLogicalFileName lfn;
  1787. if (!lfn.setValidate(destname))
  1788. throw MakeStringException(ECLWATCH_INVALID_INPUT, "invalid destination filename");
  1789. destname = lfn.get();
  1790. PROGLOG("SprayVariable: DestLogicalName %s, DestGroup %s", destname, destNodeGroup);
  1791. if (ipAddr.length() > 0)
  1792. ParseLogicalPath(destname, ipAddr.str(), NULL, destFolder, destTitle, defaultFolder, defaultReplicateFolder);
  1793. else
  1794. ParseLogicalPath(destname, destNodeGroup, NULL, destFolder, destTitle, defaultFolder, defaultReplicateFolder);
  1795. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  1796. Owned<IDFUWorkUnit> wu = factory->createWorkUnit();
  1797. wu->setClusterName(gName.str());
  1798. wu->setJobName(destTitle.str());
  1799. setDFUServerQueueReq(req.getDFUServerQueue(), wu);
  1800. setUserAuth(context, wu);
  1801. wu->setCommand(DFUcmd_import);
  1802. IDFUfileSpec *source = wu->queryUpdateSource();
  1803. IDFUfileSpec *destination = wu->queryUpdateDestination();
  1804. IDFUoptions *options = wu->queryUpdateOptions();
  1805. if(srcxml.length() == 0)
  1806. {
  1807. RemoteMultiFilename rmfn;
  1808. SocketEndpoint ep(srcip);
  1809. rmfn.setEp(ep);
  1810. StringBuffer fnamebuf(srcfile);
  1811. fnamebuf.trim();
  1812. rmfn.append(fnamebuf.str()); // handles comma separated files
  1813. source->setMultiFilename(rmfn);
  1814. }
  1815. else
  1816. {
  1817. srcxml.append('\0');
  1818. source->setFromXML((const char*)srcxml.toByteArray());
  1819. }
  1820. source->setMaxRecordSize(req.getSourceMaxRecordSize());
  1821. source->setFormat((DFUfileformat)req.getSourceFormat());
  1822. StringBuffer rowtag;
  1823. if (req.getIsJSON())
  1824. {
  1825. const char *srcRowPath = req.getSourceRowPath();
  1826. if (!srcRowPath || *srcRowPath != '/')
  1827. rowtag.append("/");
  1828. rowtag.append(srcRowPath);
  1829. }
  1830. else
  1831. rowtag.append(req.getSourceRowTag());
  1832. // if rowTag specified, it means it's xml or json format, otherwise it's csv
  1833. if(rowtag.length())
  1834. {
  1835. source->setRowTag(rowtag);
  1836. options->setKeepHeader(true);
  1837. }
  1838. else
  1839. {
  1840. const char* cs = req.getSourceCsvSeparate();
  1841. if (req.getNoSourceCsvSeparator())
  1842. {
  1843. cs = "";
  1844. }
  1845. else if(cs == NULL || *cs == '\0')
  1846. cs = "\\,";
  1847. const char* ct = req.getSourceCsvTerminate();
  1848. if(ct == NULL || *ct == '\0')
  1849. ct = "\\n,\\r\\n";
  1850. const char* cq = req.getSourceCsvQuote();
  1851. if(cq== NULL)
  1852. cq = "\"";
  1853. source->setCsvOptions(cs, ct, cq, req.getSourceCsvEscape(), req.getQuotedTerminator());
  1854. options->setQuotedTerminator(req.getQuotedTerminator());
  1855. }
  1856. destination->setLogicalName(destname);
  1857. destination->setDirectory(destFolder.str());
  1858. StringBuffer fileMask;
  1859. constructFileMask(destTitle.str(), fileMask);
  1860. destination->setFileMask(fileMask.str());
  1861. destination->setGroupName(gName.str());
  1862. ClusterPartDiskMapSpec mspec;
  1863. destination->getClusterPartDiskMapSpec(gName.str(), mspec);
  1864. mspec.setDefaultBaseDir(defaultFolder.str());
  1865. mspec.setDefaultReplicateDir(defaultReplicateFolder.str());
  1866. if (!req.getReplicate())
  1867. mspec.defaultCopies = DFD_NoCopies;
  1868. destination->setClusterPartDiskMapSpec(gName.str(), mspec);
  1869. const char * encryptkey = req.getEncrypt();
  1870. if(req.getCompress()||(encryptkey&&*encryptkey))
  1871. destination->setCompressed(true);
  1872. const char * decryptkey = req.getDecrypt();
  1873. if ((encryptkey&&*encryptkey)||(decryptkey&&*decryptkey))
  1874. options->setEncDec(encryptkey,decryptkey);
  1875. int repo = req.getReplicateOffset();
  1876. bool isNull = req.getReplicateOffset_isNull();
  1877. if (!isNull && (repo!=1))
  1878. destination->setReplicateOffset(repo);
  1879. options->setReplicate(req.getReplicate());
  1880. options->setOverwrite(req.getOverwrite()); // needed if target already exists
  1881. const char* prefix = req.getPrefix();
  1882. if(prefix && *prefix)
  1883. options->setLengthPrefix(prefix);
  1884. if(req.getNosplit())
  1885. options->setNoSplit(true);
  1886. if(req.getNorecover())
  1887. options->setNoRecover(true);
  1888. if(req.getMaxConnections() > 0)
  1889. options->setmaxConnections(req.getMaxConnections());
  1890. if(req.getThrottle() > 0)
  1891. options->setThrottle(req.getThrottle());
  1892. if(req.getTransferBufferSize() > 0)
  1893. options->setTransferBufferSize(req.getTransferBufferSize());
  1894. if (req.getPull())
  1895. options->setPull(true);
  1896. if (req.getPush())
  1897. options->setPush(true);
  1898. if (req.getFailIfNoSourceFile())
  1899. options->setFailIfNoSourceFile(true);
  1900. if (req.getRecordStructurePresent())
  1901. options->setRecordStructurePresent(true);
  1902. resp.setWuid(wu->queryId());
  1903. resp.setRedirectUrl(StringBuffer("/FileSpray/GetDFUWorkunit?wuid=").append(wu->queryId()).str());
  1904. submitDFUWorkUnit(wu.getClear());
  1905. }
  1906. catch(IException* e)
  1907. {
  1908. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1909. }
  1910. return true;
  1911. }
  1912. bool CFileSprayEx::onReplicate(IEspContext &context, IEspReplicate &req, IEspReplicateResponse &resp)
  1913. {
  1914. try
  1915. {
  1916. if (!context.validateFeatureAccess(FILE_SPRAY_URL, SecAccess_Write, false))
  1917. throw MakeStringException(ECLWATCH_FILE_SPRAY_ACCESS_DENIED, "Failed to do Replicate. Permission denied.");
  1918. const char* srcname = req.getSourceLogicalName();
  1919. if(!srcname || !*srcname)
  1920. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Source logical file not specified.");
  1921. PROGLOG("Replicate %s", srcname);
  1922. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  1923. Owned<IDFUWorkUnit> wu = factory->createWorkUnit();
  1924. StringBuffer jobname = "Replicate: ";
  1925. jobname.append(srcname);
  1926. wu->setJobName(jobname.str());
  1927. setDFUServerQueueReq(req.getDFUServerQueue(), wu);
  1928. setUserAuth(context, wu);
  1929. wu->setCommand(DFUcmd_replicate);
  1930. IDFUfileSpec *source = wu->queryUpdateSource();
  1931. if (source)
  1932. {
  1933. source->setLogicalName(srcname);
  1934. int repo = req.getReplicateOffset();
  1935. if (repo!=1)
  1936. source->setReplicateOffset(repo);
  1937. }
  1938. const char* cluster = req.getCluster();
  1939. if(cluster && *cluster)
  1940. {
  1941. IDFUoptions *opt = wu->queryUpdateOptions();
  1942. opt->setReplicateMode(DFURMmissing,cluster,req.getRepeatLast(),req.getOnlyRepeated());
  1943. }
  1944. resp.setWuid(wu->queryId());
  1945. resp.setRedirectUrl(StringBuffer("/FileSpray/GetDFUWorkunit?wuid=").append(wu->queryId()).str());
  1946. submitDFUWorkUnit(wu.getClear());
  1947. }
  1948. catch(IException* e)
  1949. {
  1950. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1951. }
  1952. return true;
  1953. }
  1954. void CFileSprayEx::getDropZoneInfoByIP(double clientVersion, const char* ip, const char* destFileIn, StringBuffer& destFileOut, StringBuffer& umask)
  1955. {
  1956. if (destFileIn && *destFileIn)
  1957. destFileOut.set(destFileIn);
  1958. if (!ip || !*ip)
  1959. throw MakeStringExceptionDirect(ECLWATCH_INVALID_IP, "Network address must be specified for a dropzone!");
  1960. Owned<IEnvironmentFactory> factory = getEnvironmentFactory();
  1961. Owned<IConstEnvironment> constEnv = factory->openEnvironment();
  1962. if (!constEnv)
  1963. throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO, "Failed to get environment information.");
  1964. StringBuffer destFile;
  1965. if (isAbsolutePath(destFileIn))
  1966. {
  1967. destFile.set(destFileIn);
  1968. Owned<IConstDropZoneInfo> dropZone = constEnv->getDropZoneByAddressPath(ip, destFile.str());
  1969. if (!dropZone)
  1970. throw MakeStringException(ECLWATCH_DROP_ZONE_NOT_FOUND, "Dropzone not found for network address %s.", ip);
  1971. SCMStringBuffer directory, maskBuf;
  1972. dropZone->getDirectory(directory);
  1973. destFileOut.set(destFile.str());
  1974. dropZone->getUMask(maskBuf);
  1975. if (maskBuf.length())
  1976. umask.set(maskBuf.str());
  1977. return;
  1978. }
  1979. Owned<IConstDropZoneInfoIterator> dropZoneItr = constEnv->getDropZoneIteratorByAddress(ip);
  1980. if (dropZoneItr->count() < 1)
  1981. throw MakeStringException(ECLWATCH_DROP_ZONE_NOT_FOUND, "Dropzone not found for network address %s.", ip);
  1982. bool dzFound = false;
  1983. ForEach(*dropZoneItr)
  1984. {
  1985. IConstDropZoneInfo& dropZoneInfo = dropZoneItr->query();
  1986. SCMStringBuffer dropZoneDirectory, dropZoneUMask;
  1987. dropZoneInfo.getDirectory(dropZoneDirectory);
  1988. dropZoneInfo.getUMask(dropZoneUMask);
  1989. if (!dropZoneDirectory.length())
  1990. continue;
  1991. if (!dzFound)
  1992. {
  1993. dzFound = true;
  1994. destFileOut.set(dropZoneDirectory.str());
  1995. addPathSepChar(destFileOut);
  1996. destFileOut.append(destFileIn);
  1997. if (dropZoneUMask.length())
  1998. umask.set(dropZoneUMask.str());
  1999. }
  2000. else
  2001. throw MakeStringException(ECLWATCH_INVALID_INPUT, "> 1 dropzones found for network address %s.", ip);
  2002. }
  2003. if (!dzFound)
  2004. throw MakeStringException(ECLWATCH_DROP_ZONE_NOT_FOUND, "No valid dropzone found for network address %s.", ip);
  2005. }
  2006. bool CFileSprayEx::onDespray(IEspContext &context, IEspDespray &req, IEspDesprayResponse &resp)
  2007. {
  2008. try
  2009. {
  2010. if (!context.validateFeatureAccess(FILE_DESPRAY_URL, SecAccess_Write, false))
  2011. throw MakeStringException(ECLWATCH_FILE_DESPRAY_ACCESS_DENIED, "Failed to do Despray. Permission denied.");
  2012. const char* srcname = req.getSourceLogicalName();
  2013. if(!srcname || !*srcname)
  2014. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Source logical file not specified.");
  2015. PROGLOG("Despray %s", srcname);
  2016. double version = context.getClientVersion();
  2017. const char* destip = req.getDestIP();
  2018. StringBuffer fnamebuf(req.getDestPath());
  2019. const char* destfile = fnamebuf.trim().str();
  2020. MemoryBuffer& dstxml = (MemoryBuffer&)req.getDstxml();
  2021. if(dstxml.length() == 0)
  2022. {
  2023. if(!destip || !*destip)
  2024. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Destination network IP not specified.");
  2025. if(!destfile || !*destfile)
  2026. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Destination file not specified.");
  2027. }
  2028. StringBuffer srcTitle;
  2029. ParseLogicalPath(srcname, srcTitle);
  2030. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  2031. Owned<IDFUWorkUnit> wu = factory->createWorkUnit();
  2032. wu->setJobName(srcTitle.str());
  2033. setDFUServerQueueReq(req.getDFUServerQueue(), wu);
  2034. setUserAuth(context, wu);
  2035. wu->setCommand(DFUcmd_export);
  2036. IDFUfileSpec *source = wu->queryUpdateSource();
  2037. IDFUfileSpec *destination = wu->queryUpdateDestination();
  2038. IDFUoptions *options = wu->queryUpdateOptions();
  2039. source->setLogicalName(srcname);
  2040. if(dstxml.length() == 0)
  2041. {
  2042. RemoteFilename rfn;
  2043. SocketEndpoint ep(destip);
  2044. StringBuffer destfileWithPath, umask;
  2045. getDropZoneInfoByIP(version, destip, destfile, destfileWithPath, umask);
  2046. rfn.setPath(ep, destfileWithPath.str());
  2047. if (umask.length())
  2048. options->setUMask(umask.str());
  2049. destination->setSingleFilename(rfn);
  2050. }
  2051. else
  2052. {
  2053. dstxml.append('\0');
  2054. destination->setFromXML((const char*)dstxml.toByteArray());
  2055. }
  2056. destination->setTitle(srcTitle.str());
  2057. options->setKeepHeader(true);
  2058. options->setOverwrite(req.getOverwrite()); // needed if target already exists
  2059. const char* splitprefix = req.getSplitprefix();
  2060. if(splitprefix && *splitprefix)
  2061. options->setSplitPrefix(splitprefix);
  2062. if (version > 1.01)
  2063. {
  2064. if(req.getMaxConnections() > 0)
  2065. options->setmaxConnections(req.getMaxConnections());
  2066. else if(req.getSingleConnection())
  2067. options->setmaxConnections(1);
  2068. }
  2069. else
  2070. {
  2071. if(req.getMaxConnections() > 0)
  2072. options->setmaxConnections(req.getMaxConnections());
  2073. }
  2074. if(req.getThrottle() > 0)
  2075. options->setThrottle(req.getThrottle());
  2076. if(req.getTransferBufferSize() > 0)
  2077. options->setTransferBufferSize(req.getTransferBufferSize());
  2078. if(req.getNorecover())
  2079. options->setNoRecover(true);
  2080. if (req.getWrap()) {
  2081. options->setPush(); // I think needed for a despray
  2082. destination->setWrap(true);
  2083. }
  2084. if (req.getMultiCopy())
  2085. destination->setMultiCopy(true);
  2086. const char * encryptkey = req.getEncrypt();
  2087. if(req.getCompress()||(encryptkey&&*encryptkey))
  2088. destination->setCompressed(true);
  2089. const char * decryptkey = req.getDecrypt();
  2090. if ((encryptkey&&*encryptkey)||(decryptkey&&*decryptkey))
  2091. options->setEncDec(encryptkey,decryptkey);
  2092. resp.setWuid(wu->queryId());
  2093. resp.setRedirectUrl(StringBuffer("/FileSpray/GetDFUWorkunit?wuid=").append(wu->queryId()).str());
  2094. submitDFUWorkUnit(wu.getClear());
  2095. }
  2096. catch(IException* e)
  2097. {
  2098. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  2099. }
  2100. return true;
  2101. }
  2102. bool CFileSprayEx::onCopy(IEspContext &context, IEspCopy &req, IEspCopyResponse &resp)
  2103. {
  2104. try
  2105. {
  2106. if (!context.validateFeatureAccess(FILE_SPRAY_URL, SecAccess_Write, false))
  2107. throw MakeStringException(ECLWATCH_FILE_SPRAY_ACCESS_DENIED, "Failed to do Copy. Permission denied.");
  2108. const char* srcname = req.getSourceLogicalName();
  2109. const char* dstname = req.getDestLogicalName();
  2110. if(!srcname || !*srcname)
  2111. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Source logical file not specified.");
  2112. if(!dstname || !*dstname)
  2113. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Destination logical file not specified.");
  2114. PROGLOG("Copy from %s to %s", srcname, dstname);
  2115. StringBuffer destFolder, destTitle, defaultFolder, defaultReplicateFolder;
  2116. StringBuffer srcNodeGroup, destNodeGroup;
  2117. bool bRoxie = false;
  2118. const char* destNodeGroupReq = req.getDestGroup();
  2119. if(!destNodeGroupReq || !*destNodeGroupReq)
  2120. {
  2121. getNodeGroupFromLFN(destNodeGroup, srcname, context.queryUserId(), context.queryPassword());
  2122. DBGLOG("Destination node group not specified, using source node group %s", destNodeGroup.str());
  2123. }
  2124. else
  2125. {
  2126. destNodeGroup = destNodeGroupReq;
  2127. const char* destRoxie = req.getDestGroupRoxie();
  2128. if (destRoxie && !stricmp(destRoxie, "Yes"))
  2129. {
  2130. bRoxie = true;
  2131. }
  2132. }
  2133. CDfsLogicalFileName lfn; // NOTE: must not be moved into block below, or dstname will point to invalid memory
  2134. if (!bRoxie)
  2135. {
  2136. if (!lfn.setValidate(dstname))
  2137. throw MakeStringException(ECLWATCH_INVALID_INPUT, "invalid destination filename");
  2138. dstname = lfn.get();
  2139. }
  2140. ParseLogicalPath(dstname, destNodeGroup.str(), NULL, destFolder, destTitle, defaultFolder, defaultReplicateFolder);
  2141. StringBuffer fileMask;
  2142. constructFileMask(destTitle.str(), fileMask);
  2143. const char* srcDali = req.getSourceDali();
  2144. bool supercopy = req.getSuperCopy();
  2145. if (supercopy)
  2146. {
  2147. StringBuffer user, passwd;
  2148. context.getUserID(user);
  2149. context.getPassword(passwd);
  2150. StringBuffer u(user);
  2151. StringBuffer p(passwd);
  2152. Owned<INode> foreigndali;
  2153. if (srcDali)
  2154. {
  2155. SocketEndpoint ep(srcDali);
  2156. foreigndali.setown(createINode(ep));
  2157. const char* srcu = req.getSrcusername();
  2158. if(srcu && *srcu)
  2159. {
  2160. u.clear().append(srcu);
  2161. p.clear().append(req.getSrcpassword());
  2162. }
  2163. }
  2164. Owned<IUserDescriptor> udesc=createUserDescriptor();
  2165. udesc->set(u.str(),p.str());
  2166. if (!queryDistributedFileDirectory().isSuperFile(srcname,udesc,foreigndali))
  2167. supercopy = false;
  2168. }
  2169. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  2170. Owned<IDFUWorkUnit> wu = factory->createWorkUnit();
  2171. wu->setJobName(dstname);
  2172. setDFUServerQueueReq(req.getDFUServerQueue(), wu);
  2173. setUserAuth(context, wu);
  2174. if(destNodeGroup.length() > 0)
  2175. wu->setClusterName(destNodeGroup.str());
  2176. if (supercopy)
  2177. wu->setCommand(DFUcmd_supercopy);
  2178. else
  2179. wu->setCommand(DFUcmd_copy);
  2180. IDFUfileSpec *wuFSpecSource = wu->queryUpdateSource();
  2181. IDFUfileSpec *wuFSpecDest = wu->queryUpdateDestination();
  2182. IDFUoptions *wuOptions = wu->queryUpdateOptions();
  2183. wuFSpecSource->setLogicalName(srcname);
  2184. if(srcDali && *srcDali)
  2185. {
  2186. SocketEndpoint ep(srcDali);
  2187. wuFSpecSource->setForeignDali(ep);
  2188. const char* srcusername = req.getSrcusername();
  2189. if(srcusername && *srcusername)
  2190. {
  2191. const char* srcpasswd = req.getSrcpassword();
  2192. wuFSpecSource->setForeignUser(srcusername, srcpasswd);
  2193. }
  2194. }
  2195. wuFSpecDest->setLogicalName(dstname);
  2196. wuFSpecDest->setFileMask(fileMask.str());
  2197. wuOptions->setOverwrite(req.getOverwrite());
  2198. wuOptions->setPreserveCompression(req.getPreserveCompression());
  2199. if (bRoxie)
  2200. {
  2201. setRoxieClusterPartDiskMapping(destNodeGroup.str(), defaultFolder.str(), defaultReplicateFolder.str(), supercopy, wuFSpecDest, wuOptions);
  2202. wuFSpecDest->setWrap(true); // roxie always wraps
  2203. if(req.getCompress())
  2204. wuFSpecDest->setCompressed(true);
  2205. if (!supercopy)
  2206. wuOptions->setSuppressNonKeyRepeats(true); // **** only repeat last part when src kind = key
  2207. }
  2208. else
  2209. {
  2210. const char* srcDiffKeyName = req.getSourceDiffKeyName();
  2211. const char* destDiffKeyName = req.getDestDiffKeyName();
  2212. if (srcDiffKeyName&&*srcDiffKeyName)
  2213. wuFSpecSource->setDiffKey(srcDiffKeyName);
  2214. if (destDiffKeyName&&*destDiffKeyName)
  2215. wuFSpecDest->setDiffKey(destDiffKeyName);
  2216. wuFSpecDest->setDirectory(destFolder.str());
  2217. wuFSpecDest->setGroupName(destNodeGroup.str());
  2218. wuFSpecDest->setWrap(req.getWrap());
  2219. const char * encryptkey = req.getEncrypt();
  2220. if(req.getCompress()||(encryptkey&&*encryptkey))
  2221. wuFSpecDest->setCompressed(true);
  2222. wuOptions->setReplicate(req.getReplicate());
  2223. const char * decryptkey = req.getDecrypt();
  2224. if ((encryptkey&&*encryptkey)||(decryptkey&&*decryptkey))
  2225. wuOptions->setEncDec(encryptkey,decryptkey);
  2226. if(req.getNorecover())
  2227. wuOptions->setNoRecover(true);
  2228. if(!req.getNosplit_isNull())
  2229. wuOptions->setNoSplit(req.getNosplit());
  2230. if(req.getMaxConnections() > 0)
  2231. wuOptions->setmaxConnections(req.getMaxConnections());
  2232. if(req.getThrottle() > 0)
  2233. wuOptions->setThrottle(req.getThrottle());
  2234. if(req.getTransferBufferSize() > 0)
  2235. wuOptions->setTransferBufferSize(req.getTransferBufferSize());
  2236. if (req.getPull())
  2237. wuOptions->setPull(true);
  2238. if (req.getPush())
  2239. wuOptions->setPush(true);
  2240. if (req.getIfnewer())
  2241. wuOptions->setIfNewer(true);
  2242. ClusterPartDiskMapSpec mspec;
  2243. wuFSpecDest->getClusterPartDiskMapSpec(destNodeGroup.str(), mspec);
  2244. mspec.setDefaultBaseDir(defaultFolder.str());
  2245. mspec.setDefaultReplicateDir(defaultReplicateFolder.str());
  2246. if (!req.getReplicate())
  2247. mspec.defaultCopies = DFD_NoCopies;
  2248. wuFSpecDest->setClusterPartDiskMapSpec(destNodeGroup.str(), mspec);
  2249. }
  2250. resp.setResult(wu->queryId());
  2251. resp.setRedirectUrl(StringBuffer("/FileSpray/GetDFUWorkunit?wuid=").append(wu->queryId()).str());
  2252. submitDFUWorkUnit(wu.getClear());
  2253. }
  2254. catch(IException* e)
  2255. {
  2256. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  2257. }
  2258. return true;
  2259. }
  2260. bool CFileSprayEx::onRename(IEspContext &context, IEspRename &req, IEspRenameResponse &resp)
  2261. {
  2262. try
  2263. {
  2264. if (!context.validateFeatureAccess(FILE_SPRAY_URL, SecAccess_Write, false))
  2265. throw MakeStringException(ECLWATCH_FILE_SPRAY_ACCESS_DENIED, "Failed to do Rename. Permission denied.");
  2266. const char* srcname = req.getSrcname();
  2267. const char* dstname = req.getDstname();
  2268. if(!srcname || !*srcname)
  2269. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Source logical file not specified.");
  2270. if(!dstname || !*dstname)
  2271. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Destination logical file not specified.");
  2272. PROGLOG("Rename from %s to %s", srcname, dstname);
  2273. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  2274. Owned<IDFUWorkUnit> wu = factory->createWorkUnit();
  2275. StringBuffer destTitle;
  2276. ParseLogicalPath(req.getDstname(), destTitle);
  2277. wu->setJobName(destTitle.str());
  2278. setDFUServerQueueReq(req.getDFUServerQueue(), wu);
  2279. setUserAuth(context, wu);
  2280. wu->setCommand(DFUcmd_rename);
  2281. #if 0 // TBD - Handling for multiple clusters? the cluster should be specified by user if needed
  2282. Owned<IUserDescriptor> udesc;
  2283. if(user.length() > 0)
  2284. {
  2285. const char* passwd = context.queryPassword();
  2286. udesc.setown(createUserDescriptor());
  2287. udesc->set(user.str(), passwd);
  2288. Owned<IDistributedFile> df = queryDistributedFileDirectory().lookup(srcname, udesc);
  2289. if(df)
  2290. {
  2291. StringBuffer cluster0;
  2292. df->getClusterName(0,cluster0); // TBD - Handling for multiple clusters?
  2293. if (cluster0.length()!=0)
  2294. {
  2295. wu->setClusterName(cluster0.str());
  2296. }
  2297. else
  2298. {
  2299. const char *cluster = df->queryAttributes().queryProp("@group");
  2300. if (cluster && *cluster)
  2301. {
  2302. wu->setClusterName(cluster);
  2303. }
  2304. }
  2305. }
  2306. }
  2307. #endif
  2308. IDFUfileSpec *source = wu->queryUpdateSource();
  2309. source->setLogicalName(srcname);
  2310. IDFUfileSpec *destination = wu->queryUpdateDestination();
  2311. destination->setLogicalName(dstname);
  2312. IDFUoptions *options = wu->queryUpdateOptions();
  2313. options->setOverwrite(req.getOverwrite());
  2314. resp.setWuid(wu->queryId());
  2315. resp.setRedirectUrl(StringBuffer("/FileSpray/GetDFUWorkunit?wuid=").append(wu->queryId()).str());
  2316. submitDFUWorkUnit(wu.getClear());
  2317. }
  2318. catch(IException* e)
  2319. {
  2320. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  2321. }
  2322. return true;
  2323. }
  2324. bool CFileSprayEx::onDFUWUFile(IEspContext &context, IEspDFUWUFileRequest &req, IEspDFUWUFileResponse &resp)
  2325. {
  2326. try
  2327. {
  2328. if (!context.validateFeatureAccess(DFU_WU_URL, SecAccess_Read, false))
  2329. throw MakeStringException(ECLWATCH_DFU_WU_ACCESS_DENIED, "Access to DFU workunit is denied.");
  2330. if (*req.getWuid())
  2331. {
  2332. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  2333. Owned<IConstDFUWorkUnit> wu = factory->openWorkUnit(req.getWuid(), false);
  2334. if(!wu)
  2335. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT, "Dfu workunit %s not found.", req.getWuid());
  2336. PROGLOG("DFUWUFile: %s", req.getWuid());
  2337. StringBuffer xmlbuf;
  2338. xmlbuf.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
  2339. const char* plainText = req.getPlainText();
  2340. if (plainText && (!stricmp(plainText, "yes")))
  2341. {
  2342. wu->toXML(xmlbuf);
  2343. resp.setFile(xmlbuf.str());
  2344. resp.setFile_mimetype(HTTP_TYPE_TEXT_PLAIN);
  2345. }
  2346. else
  2347. {
  2348. xmlbuf.append("<?xml-stylesheet href=\"../esp/xslt/xmlformatter.xsl\" type=\"text/xsl\"?>");
  2349. wu->toXML(xmlbuf);
  2350. resp.setFile(xmlbuf.str());
  2351. resp.setFile_mimetype(HTTP_TYPE_APPLICATION_XML);
  2352. }
  2353. }
  2354. }
  2355. catch(IException* e)
  2356. {
  2357. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  2358. }
  2359. return true;
  2360. }
  2361. bool CFileSprayEx::onFileList(IEspContext &context, IEspFileListRequest &req, IEspFileListResponse &resp)
  2362. {
  2363. try
  2364. {
  2365. if (!context.validateFeatureAccess(FILE_SPRAY_URL, SecAccess_Read, false))
  2366. throw MakeStringException(ECLWATCH_FILE_SPRAY_ACCESS_DENIED, "Failed to do FileList. Permission denied.");
  2367. const char* path = req.getPath();
  2368. if (!path || !*path)
  2369. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Path not specified.");
  2370. double version = context.getClientVersion();
  2371. const char* netaddr = req.getNetaddr();
  2372. if (!netaddr || !*netaddr)
  2373. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Network address not specified.");
  2374. const char* fileNameMask = req.getMask();
  2375. bool directoryOnly = req.getDirectoryOnly();
  2376. PROGLOG("FileList: Netaddr %s, Path %s", netaddr, path);
  2377. StringBuffer sPath(path);
  2378. const char* osStr = req.getOS();
  2379. if (osStr && *osStr)
  2380. {
  2381. int os = atoi(osStr);
  2382. const char pathSep = (os == OS_WINDOWS) ? '\\' : '/';
  2383. sPath.replace(pathSep=='\\'?'/':'\\', pathSep);
  2384. if (*(sPath.str() + sPath.length() -1) != pathSep)
  2385. sPath.append( pathSep );
  2386. }
  2387. if (!isEmptyString(fileNameMask))
  2388. {
  2389. const char* ext = pathExtension(sPath.str());
  2390. if (!strieq(ext, "cfg") && !strieq(ext, "log"))
  2391. throw MakeStringException(ECLWATCH_ACCESS_TO_FILE_DENIED, "Only cfg or log file allowed.");
  2392. }
  2393. RemoteFilename rfn;
  2394. SocketEndpoint ep;
  2395. #ifdef MACHINE_IP
  2396. ep.set(MACHINE_IP);
  2397. #else
  2398. ep.set(netaddr);
  2399. #endif
  2400. rfn.setPath(ep, sPath.str());
  2401. Owned<IFile> f = createIFile(rfn);
  2402. if(!f->isDirectory())
  2403. throw MakeStringException(ECLWATCH_INVALID_DIRECTORY, "%s is not a directory.", path);
  2404. IArrayOf<IEspPhysicalFileStruct> files;
  2405. Owned<IDirectoryIterator> di = f->directoryFiles(NULL, false, true);
  2406. if(di.get() != NULL)
  2407. {
  2408. ForEach(*di)
  2409. {
  2410. StringBuffer fname;
  2411. di->getName(fname);
  2412. if (fname.length() == 0 || (directoryOnly && !di->isDir()) || (!di->isDir() && !isEmptyString(fileNameMask) && !WildMatch(fname.str(), fileNameMask, true)))
  2413. continue;
  2414. Owned<IEspPhysicalFileStruct> onefile = createPhysicalFileStruct();
  2415. onefile->setName(fname.str());
  2416. onefile->setIsDir(di->isDir());
  2417. onefile->setFilesize(di->getFileSize());
  2418. CDateTime modtime;
  2419. StringBuffer timestr;
  2420. di->getModifiedTime(modtime);
  2421. unsigned y,m,d,h,min,sec,nsec;
  2422. modtime.getDate(y,m,d,true);
  2423. modtime.getTime(h,min,sec,nsec,true);
  2424. timestr.appendf("%04d-%02d-%02d %02d:%02d:%02d", y,m,d,h,min,sec);
  2425. onefile->setModifiedtime(timestr.str());
  2426. files.append(*onefile.getLink());
  2427. }
  2428. }
  2429. sPath.replace('\\', '/');//XSLT cannot handle backslashes
  2430. resp.setPath(sPath);
  2431. resp.setFiles(files);
  2432. resp.setNetaddr(netaddr);
  2433. if (osStr && *osStr)
  2434. {
  2435. int os = atoi(osStr);
  2436. resp.setOS(os);
  2437. }
  2438. if (!isEmptyString(fileNameMask))
  2439. resp.setMask(fileNameMask);
  2440. if (version >= 1.10)
  2441. {
  2442. StringBuffer acceptLanguage;
  2443. resp.setAcceptLanguage(getAcceptLanguage(context, acceptLanguage).str());
  2444. }
  2445. resp.setDirectoryOnly(directoryOnly);
  2446. }
  2447. catch(IException* e)
  2448. {
  2449. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  2450. }
  2451. return true;
  2452. }
  2453. bool CFileSprayEx::checkDropZoneIPAndPath(double clientVersion, const char* dropZoneName, const char* netAddr, const char* path)
  2454. {
  2455. if (isEmptyString(netAddr) || isEmptyString(path))
  2456. throw MakeStringException(ECLWATCH_INVALID_INPUT, "NetworkAddress or Path not defined.");
  2457. Owned<IEnvironmentFactory> envFactory = getEnvironmentFactory();
  2458. Owned<IConstEnvironment> constEnv = envFactory->openEnvironment();
  2459. Owned<IConstDropZoneInfoIterator> dropZoneItr = constEnv->getDropZoneIteratorByAddress(netAddr);
  2460. ForEach(*dropZoneItr)
  2461. {
  2462. SCMStringBuffer directory, name;
  2463. IConstDropZoneInfo& dropZoneInfo = dropZoneItr->query();
  2464. dropZoneInfo.getDirectory(directory);
  2465. if (directory.length() && (strnicmp(path, directory.str(), directory.length()) == 0))
  2466. {
  2467. if (isEmptyString(dropZoneName))
  2468. return true;
  2469. dropZoneInfo.getName(name);
  2470. if (strieq(name.str(), dropZoneName))
  2471. return true;
  2472. }
  2473. }
  2474. return false;
  2475. }
  2476. void CFileSprayEx::addDropZoneFile(IEspContext& context, IDirectoryIterator* di, const char* name, const char* dropZonePath,
  2477. StringBuffer& relativePath, const char pathSep, IArrayOf<IEspPhysicalFileStruct>& filesInFolder, IArrayOf<IEspPhysicalFileStruct>& files)
  2478. {
  2479. StringBuffer path = dropZonePath;
  2480. if (!relativePath.isEmpty())
  2481. path.append(pathSep).append(relativePath.str());
  2482. Owned<IEspPhysicalFileStruct> aFile = createPhysicalFileStruct();
  2483. aFile->setName(name);
  2484. aFile->setPath(path);
  2485. aFile->setIsDir(di->isDir());
  2486. CDateTime modtime;
  2487. StringBuffer timestr;
  2488. di->getModifiedTime(modtime);
  2489. unsigned y,m,d,h,min,sec,nsec;
  2490. modtime.getDate(y,m,d,true);
  2491. modtime.getTime(h,min,sec,nsec,true);
  2492. timestr.appendf("%04d-%02d-%02d %02d:%02d:%02d", y,m,d,h,min,sec);
  2493. aFile->setModifiedtime(timestr.str());
  2494. aFile->setFilesize(di->getFileSize());
  2495. if (di->isDir() && filesInFolder.ordinality())
  2496. aFile->setFiles(filesInFolder);
  2497. files.append(*aFile.getLink());
  2498. }
  2499. bool CFileSprayEx::searchDropZoneFileInFolder(IEspContext& context, IFile* f, const char* nameFilter,
  2500. bool returnAll, const char* dropZonePath, StringBuffer& relativePath, const char pathSep, IArrayOf<IEspPhysicalFileStruct>& files)
  2501. {
  2502. bool foundMatch = false;
  2503. Owned<IDirectoryIterator> di = f->directoryFiles(NULL, false, true);
  2504. ForEach(*di)
  2505. {
  2506. StringBuffer fname;
  2507. di->getName(fname);
  2508. if (!fname.length())
  2509. continue;
  2510. StringBuffer newPath = relativePath;
  2511. if (!newPath.isEmpty())
  2512. newPath.append(pathSep);
  2513. newPath.append(fname.str());
  2514. IArrayOf<IEspPhysicalFileStruct> filesInFolder;
  2515. if (returnAll) //Every files and subfolders in this folder have to be returned
  2516. {
  2517. if (di->isDir())
  2518. searchDropZoneFileInFolder(context, &di->get(), NULL, returnAll, dropZonePath, newPath, pathSep, filesInFolder);
  2519. addDropZoneFile(context, di, fname.str(), dropZonePath, relativePath, pathSep, filesInFolder, files);
  2520. continue;
  2521. }
  2522. bool foundMatchNew = WildMatch(fname.str(), nameFilter, true);
  2523. if (di->isDir() && searchDropZoneFileInFolder(context, &di->get(), nameFilter, foundMatchNew, dropZonePath,
  2524. newPath, pathSep, filesInFolder))
  2525. {
  2526. foundMatchNew = true;
  2527. }
  2528. if (foundMatchNew)
  2529. {
  2530. addDropZoneFile(context, di, fname.str(), dropZonePath, relativePath, pathSep, filesInFolder, files);
  2531. foundMatch = true;
  2532. }
  2533. }
  2534. return foundMatch;
  2535. }
  2536. void CFileSprayEx::appendDropZoneFiles(IEspContext& context, IpAddress& ip, const char* dir, const char* nameFilter, IArrayOf<IEspPhysicalFileStruct>& files)
  2537. {
  2538. RemoteFilename rfn;
  2539. SocketEndpoint ep;
  2540. ep.ipset(ip);
  2541. rfn.setPath(ep, dir);
  2542. Owned<IFile> f = createIFile(rfn);
  2543. if(!f->isDirectory())
  2544. throw MakeStringException(ECLWATCH_INVALID_DIRECTORY, "%s is not a directory.", dir);
  2545. StringBuffer relativePath;
  2546. searchDropZoneFileInFolder(context, f, nameFilter, !nameFilter || !*nameFilter, dir,
  2547. relativePath, getPathSepChar(dir), files);
  2548. }
  2549. bool CFileSprayEx::onDropZoneFileSearch(IEspContext &context, IEspDropZoneFileSearchRequest &req, IEspDropZoneFileSearchResponse &resp)
  2550. {
  2551. try
  2552. {
  2553. if (!context.validateFeatureAccess(FILE_SPRAY_URL, SecAccess_Access, false))
  2554. throw MakeStringException(ECLWATCH_FILE_SPRAY_ACCESS_DENIED, "Failed to do FileList. Permission denied.");
  2555. const char* dropZoneName = req.getDropZoneName();
  2556. if (isEmptyString(dropZoneName))
  2557. throw MakeStringException(ECLWATCH_INVALID_INPUT, "DropZone not specified.");
  2558. Owned<IEnvironmentFactory> envFactory = getEnvironmentFactory();
  2559. Owned<IConstEnvironment> constEnv = envFactory->openEnvironment();
  2560. Owned<IConstDropZoneInfo> dropZoneInfo = constEnv->getDropZone(dropZoneName);
  2561. if (!dropZoneInfo || (req.getECLWatchVisibleOnly() && !dropZoneInfo->isECLWatchVisible()))
  2562. throw MakeStringException(ECLWATCH_INVALID_INPUT, "DropZone %s not found.", dropZoneName);
  2563. SCMStringBuffer directory, computer;
  2564. dropZoneInfo->getDirectory(directory);
  2565. if (!directory.length())
  2566. throw MakeStringException(ECLWATCH_INVALID_INPUT, "DropZone Directory not found for %s.", dropZoneName);
  2567. IpAddress ipToMatch;
  2568. const char* dropZoneServerReq = req.getServer(); //IP or hostname
  2569. if (!isEmptyString(dropZoneServerReq))
  2570. {
  2571. ipToMatch.ipset(dropZoneServerReq);
  2572. if (ipToMatch.isNull())
  2573. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid server %s specified.", dropZoneServerReq);
  2574. }
  2575. IArrayOf<IEspPhysicalFileStruct> files;
  2576. Owned<IConstDropZoneServerInfoIterator> dropZoneServerItr = dropZoneInfo->getServers();
  2577. ForEach(*dropZoneServerItr)
  2578. {
  2579. StringBuffer server, networkAddress;
  2580. IConstDropZoneServerInfo& dropZoneServer = dropZoneServerItr->query();
  2581. dropZoneServer.getServer(server);
  2582. if (server.isEmpty())
  2583. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid server for dropzone %s.", dropZoneName);
  2584. IpAddress ipAddr;
  2585. ipAddr.ipset(server.str());
  2586. if (isEmptyString(dropZoneServerReq) || ipAddr.ipequals(ipToMatch))
  2587. appendDropZoneFiles(context, ipAddr, directory.str(), req.getNameFilter(), files);
  2588. }
  2589. resp.setFiles(files);
  2590. }
  2591. catch(IException* e)
  2592. {
  2593. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  2594. }
  2595. return true;
  2596. }
  2597. bool CFileSprayEx::onDfuMonitor(IEspContext &context, IEspDfuMonitorRequest &req, IEspDfuMonitorResponse &resp)
  2598. {
  2599. try
  2600. {
  2601. if (!context.validateFeatureAccess(FILE_SPRAY_URL, SecAccess_Read, false))
  2602. throw MakeStringException(ECLWATCH_FILE_SPRAY_ACCESS_DENIED, "Failed to do DfuMonitor. Permission denied.");
  2603. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  2604. Owned<IDFUWorkUnit> wu = factory->createWorkUnit();
  2605. wu->setQueue(m_MonitorQueueLabel.str());
  2606. StringBuffer user, passwd;
  2607. wu->setUser(context.getUserID(user).str());
  2608. wu->setPassword(context.getPassword(passwd).str());
  2609. wu->setCommand(DFUcmd_monitor);
  2610. IDFUmonitor *monitor = wu->queryUpdateMonitor();
  2611. IDFUfileSpec *source = wu->queryUpdateSource();
  2612. const char *eventname = req.getEventName();
  2613. const char *lname = req.getLogicalName();
  2614. if (lname&&*lname)
  2615. source->setLogicalName(lname);
  2616. else {
  2617. const char *ip = req.getIp();
  2618. const char *filename = req.getFilename();
  2619. if (filename&&*filename) {
  2620. RemoteFilename rfn;
  2621. if (ip&&*ip) {
  2622. SocketEndpoint ep;
  2623. ep.set(ip);
  2624. rfn.setPath(ep,filename);
  2625. }
  2626. else
  2627. rfn.setRemotePath(filename);
  2628. source->setSingleFilename(rfn);
  2629. }
  2630. else
  2631. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Neither logical name nor network ip/file specified for monitor.");
  2632. }
  2633. if (eventname)
  2634. monitor->setEventName(eventname);
  2635. monitor->setShotLimit(req.getShotLimit());
  2636. monitor->setSub(req.getSub());
  2637. resp.setWuid(wu->queryId());
  2638. resp.setRedirectUrl(StringBuffer("/FileSpray/GetDFUWorkunit?wuid=").append(wu->queryId()).str());
  2639. submitDFUWorkUnit(wu.getClear());
  2640. }
  2641. catch(IException* e)
  2642. {
  2643. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  2644. }
  2645. return true;
  2646. }
  2647. bool CFileSprayEx::onOpenSave(IEspContext &context, IEspOpenSaveRequest &req, IEspOpenSaveResponse &resp)
  2648. {
  2649. try
  2650. {
  2651. if (!context.validateFeatureAccess(FILE_SPRAY_URL, SecAccess_Read, false))
  2652. throw MakeStringException(ECLWATCH_FILE_SPRAY_ACCESS_DENIED, "Permission denied.");
  2653. const char* location = req.getLocation();
  2654. const char* path = req.getPath();
  2655. const char* name = req.getName();
  2656. const char* type = req.getType();
  2657. const char* dateTime = req.getDateTime();
  2658. if (location && *location)
  2659. resp.setLocation(location);
  2660. if (path && *path)
  2661. resp.setPath(path);
  2662. if (name && *name)
  2663. resp.setName(name);
  2664. if (type && *type)
  2665. resp.setType(type);
  2666. if (dateTime && *dateTime)
  2667. resp.setDateTime(dateTime);
  2668. if (req.getBinaryFile())
  2669. resp.setViewable(false);
  2670. }
  2671. catch(IException* e)
  2672. {
  2673. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  2674. }
  2675. return true;
  2676. }
  2677. bool CFileSprayEx::getDropZoneFiles(IEspContext &context, const char* dropZone, const char* netaddr, const char* path,
  2678. IEspDropZoneFilesRequest &req, IEspDropZoneFilesResponse &resp)
  2679. {
  2680. if (!checkDropZoneIPAndPath(context.getClientVersion(), dropZone, netaddr, path))
  2681. throw MakeStringException(ECLWATCH_DROP_ZONE_NOT_FOUND, "Dropzone is not found in the environment settings.");
  2682. bool directoryOnly = req.getDirectoryOnly();
  2683. RemoteFilename rfn;
  2684. SocketEndpoint ep;
  2685. #ifdef MACHINE_IP
  2686. ep.set(MACHINE_IP);
  2687. #else
  2688. ep.set(netaddr);
  2689. #endif
  2690. rfn.setPath(ep, path);
  2691. Owned<IFile> f = createIFile(rfn);
  2692. if(!f->isDirectory())
  2693. throw MakeStringException(ECLWATCH_INVALID_DIRECTORY, "%s is not a directory.", path);
  2694. IArrayOf<IEspPhysicalFileStruct> files;
  2695. Owned<IDirectoryIterator> di = f->directoryFiles(NULL, false, true);
  2696. if(di.get() != NULL)
  2697. {
  2698. ForEach(*di)
  2699. {
  2700. StringBuffer fname;
  2701. di->getName(fname);
  2702. if (fname.length() == 0 || (directoryOnly && !di->isDir()))
  2703. continue;
  2704. Owned<IEspPhysicalFileStruct> onefile = createPhysicalFileStruct();
  2705. onefile->setName(fname.str());
  2706. onefile->setIsDir(di->isDir());
  2707. onefile->setFilesize(di->getFileSize());
  2708. CDateTime modtime;
  2709. StringBuffer timestr;
  2710. di->getModifiedTime(modtime);
  2711. unsigned y,m,d,h,min,sec,nsec;
  2712. modtime.getDate(y,m,d,true);
  2713. modtime.getTime(h,min,sec,nsec,true);
  2714. timestr.appendf("%04d-%02d-%02d %02d:%02d:%02d", y,m,d,h,min,sec);
  2715. onefile->setModifiedtime(timestr.str());
  2716. files.append(*onefile.getLink());
  2717. }
  2718. }
  2719. resp.setFiles(files);
  2720. return true;
  2721. }
  2722. //This method returns all dropzones and, if NetAddress and Path specified, returns filtered list of files.
  2723. bool CFileSprayEx::onDropZoneFiles(IEspContext &context, IEspDropZoneFilesRequest &req, IEspDropZoneFilesResponse &resp)
  2724. {
  2725. try
  2726. {
  2727. if (!context.validateFeatureAccess(FILE_SPRAY_URL, SecAccess_Read, false))
  2728. throw MakeStringException(ECLWATCH_FILE_SPRAY_ACCESS_DENIED, "Permission denied.");
  2729. IpAddress ipToMatch;
  2730. const char* netAddress = req.getNetAddress();
  2731. if (!isEmptyString(netAddress))
  2732. {
  2733. ipToMatch.ipset(netAddress);
  2734. if (ipToMatch.isNull())
  2735. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid server %s specified.", netAddress);
  2736. }
  2737. bool filesFromALinux = false;
  2738. IArrayOf<IEspDropZone> dropZoneList;
  2739. bool ECLWatchVisibleOnly = req.getECLWatchVisibleOnly();
  2740. Owned<IEnvironmentFactory> envFactory = getEnvironmentFactory();
  2741. Owned<IConstEnvironment> constEnv = envFactory->openEnvironment();
  2742. Owned<IConstDropZoneInfoIterator> dropZoneItr = constEnv->getDropZoneIterator();
  2743. ForEach(*dropZoneItr)
  2744. {
  2745. IConstDropZoneInfo& dropZoneInfo = dropZoneItr->query();
  2746. if (ECLWatchVisibleOnly && !dropZoneInfo.isECLWatchVisible())
  2747. continue;
  2748. SCMStringBuffer dropZoneName, directory, computerName;
  2749. dropZoneInfo.getName(dropZoneName);
  2750. dropZoneInfo.getDirectory(directory);
  2751. dropZoneInfo.getComputerName(computerName); //legacy env
  2752. if (!dropZoneName.length() || !directory.length())
  2753. continue;
  2754. bool isLinux = getPathSepChar(directory.str()) == '/' ? true : false;
  2755. Owned<IConstDropZoneServerInfoIterator> dropZoneServerItr = dropZoneInfo.getServers();
  2756. ForEach(*dropZoneServerItr)
  2757. {
  2758. IConstDropZoneServerInfo& dropZoneServer = dropZoneServerItr->query();
  2759. StringBuffer name, server, networkAddress;
  2760. dropZoneServer.getName(name);
  2761. dropZoneServer.getServer(server);
  2762. if (name.isEmpty() || server.isEmpty())
  2763. continue;
  2764. IpAddress ipAddr;
  2765. ipAddr.ipset(server.str());
  2766. ipAddr.getIpText(networkAddress);
  2767. Owned<IEspDropZone> aDropZone = createDropZone();
  2768. aDropZone->setName(dropZoneName.str());
  2769. aDropZone->setComputer(name.str());
  2770. aDropZone->setNetAddress(networkAddress.str());
  2771. aDropZone->setPath(directory.str());
  2772. if (isLinux)
  2773. aDropZone->setLinux("true");
  2774. if (!isEmptyString(netAddress) && ipAddr.ipequals(ipToMatch))
  2775. filesFromALinux = isLinux;
  2776. dropZoneList.append(*aDropZone.getClear());
  2777. }
  2778. }
  2779. if (dropZoneList.ordinality())
  2780. resp.setDropZones(dropZoneList);
  2781. const char* dzName = req.getDropZoneName();
  2782. const char* directory = req.getPath();
  2783. const char* subfolder = req.getSubfolder();
  2784. if (isEmptyString(netAddress) || (isEmptyString(directory) && isEmptyString(subfolder)))
  2785. return true;
  2786. StringBuffer netAddressStr, directoryStr, osStr;
  2787. netAddressStr.set(netAddress);
  2788. if (!isEmptyString(directory))
  2789. directoryStr.set(directory);
  2790. if (!isEmptyString(subfolder))
  2791. {
  2792. if (directoryStr.length())
  2793. addPathSepChar(directoryStr);
  2794. directoryStr.append(subfolder);
  2795. }
  2796. addPathSepChar(directoryStr);
  2797. getDropZoneFiles(context, dzName, netAddress, directoryStr.str(), req, resp);
  2798. resp.setDropZoneName(dzName);
  2799. resp.setNetAddress(netAddress);
  2800. resp.setPath(directoryStr.str());
  2801. resp.setOS(filesFromALinux);
  2802. resp.setECLWatchVisibleOnly(ECLWatchVisibleOnly);
  2803. }
  2804. catch(IException* e)
  2805. {
  2806. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  2807. }
  2808. return true;
  2809. }
  2810. bool CFileSprayEx::onDeleteDropZoneFiles(IEspContext &context, IEspDeleteDropZoneFilesRequest &req, IEspDFUWorkunitsActionResponse &resp)
  2811. {
  2812. try
  2813. {
  2814. if (!context.validateFeatureAccess(FILE_SPRAY_URL, SecAccess_Full, false))
  2815. throw MakeStringException(ECLWATCH_FILE_SPRAY_ACCESS_DENIED, "Permission denied.");
  2816. double version = context.getClientVersion();
  2817. const char* dzName = req.getDropZoneName();
  2818. const char* netAddress = req.getNetAddress();
  2819. const char* directory = req.getPath();
  2820. const char* osStr = req.getOS();
  2821. StringArray & files = req.getNames();
  2822. if (!files.ordinality())
  2823. throw MakeStringException(ECLWATCH_INVALID_INPUT, "File not specified.");
  2824. StringBuffer path(directory);
  2825. if (!isEmptyString(osStr))
  2826. {
  2827. char pathSep = (atoi(osStr) == OS_WINDOWS) ? '\\' : '/';
  2828. path.replace(pathSep=='\\' ? '/' : '\\', pathSep);
  2829. }
  2830. addPathSepChar(path, getPathSepChar(path.str()));
  2831. if (!checkDropZoneIPAndPath(version, dzName, netAddress, path.str()))
  2832. throw MakeStringException(ECLWATCH_DROP_ZONE_NOT_FOUND, "Dropzone is not found in the environment settings.");
  2833. RemoteFilename rfn;
  2834. SocketEndpoint ep;
  2835. ep.set(netAddress);
  2836. rfn.setPath(ep, path.str());
  2837. Owned<IFile> f = createIFile(rfn);
  2838. if(!f->isDirectory())
  2839. throw MakeStringException(ECLWATCH_INVALID_DIRECTORY, "%s is not a directory.", directory);
  2840. bool bAllSuccess = true;
  2841. IArrayOf<IEspDFUActionResult> results;
  2842. for(unsigned i = 0; i < files.ordinality(); ++i)
  2843. {
  2844. const char* file = files.item(i);
  2845. if (!file || !*file)
  2846. continue;
  2847. PROGLOG("DeleteDropZoneFiles: netAddress %s, path %s, file %s", netAddress, directory, file);
  2848. Owned<IEspDFUActionResult> res = createDFUActionResult("", "");
  2849. res->setID(files.item(i));
  2850. res->setAction("Delete");
  2851. res->setResult("Success");
  2852. try
  2853. {
  2854. StringBuffer fileToDelete = path;
  2855. fileToDelete.append(file);
  2856. rfn.setPath(ep, fileToDelete.str());
  2857. Owned<IFile> rFile = createIFile(rfn);
  2858. if (!rFile->exists())
  2859. res->setResult("Warning: this file does not exist.");
  2860. else
  2861. rFile->remove();
  2862. }
  2863. catch (IException *e)
  2864. {
  2865. bAllSuccess = false;
  2866. StringBuffer eMsg;
  2867. eMsg = e->errorMessage(eMsg);
  2868. e->Release();
  2869. StringBuffer failedMsg = "Failed: ";
  2870. failedMsg.append(eMsg);
  2871. res->setResult(failedMsg.str());
  2872. }
  2873. results.append(*res.getLink());
  2874. }
  2875. resp.setFirstColumn("File");
  2876. resp.setDFUActionResults(results);
  2877. }
  2878. catch(IException* e)
  2879. {
  2880. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  2881. }
  2882. return true;
  2883. }
  2884. void CFileSprayEx::appendGroupNode(IArrayOf<IEspGroupNode>& groupNodes, const char* nodeName, const char* clusterType,
  2885. bool replicateOutputs)
  2886. {
  2887. Owned<IEspGroupNode> node = createGroupNode();
  2888. node->setName(nodeName);
  2889. node->setClusterType(clusterType);
  2890. if (replicateOutputs)
  2891. node->setReplicateOutputs(replicateOutputs);
  2892. groupNodes.append(*node.getClear());
  2893. }
  2894. bool CFileSprayEx::onGetSprayTargets(IEspContext &context, IEspGetSprayTargetsRequest &req, IEspGetSprayTargetsResponse &resp)
  2895. {
  2896. try
  2897. {
  2898. if (!context.validateFeatureAccess(FILE_SPRAY_URL, SecAccess_Read, false))
  2899. throw MakeStringException(ECLWATCH_FILE_SPRAY_ACCESS_DENIED, "Permission denied.");
  2900. Owned<IEnvironmentFactory> factory = getEnvironmentFactory();
  2901. Owned<IConstEnvironment> environment = factory->openEnvironment();
  2902. Owned<IPropertyTree> root = &environment->getPTree();
  2903. if (!root)
  2904. throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO, "Failed to get environment information.");
  2905. IArrayOf<IEspGroupNode> sprayTargets;
  2906. //Fetch all the group names for all the thor instances (and dedup them)
  2907. BoolHash uniqueThorClusterGroupNames;
  2908. Owned<IPropertyTreeIterator> it = root->getElements("Software/ThorCluster");
  2909. ForEach(*it)
  2910. {
  2911. IPropertyTree& cluster = it->query();
  2912. StringBuffer thorClusterGroupName;
  2913. getClusterGroupName(cluster, thorClusterGroupName);
  2914. if (!thorClusterGroupName.length())
  2915. continue;
  2916. bool* found = uniqueThorClusterGroupNames.getValue(thorClusterGroupName.str());
  2917. if (!found || !*found)
  2918. appendGroupNode(sprayTargets, thorClusterGroupName.str(), "thor", cluster.getPropBool("@replicateOutputs", false));
  2919. }
  2920. //Fetch all the group names for all the hthor instances
  2921. it.setown(root->getElements("Software/EclAgentProcess"));
  2922. ForEach(*it)
  2923. {
  2924. IPropertyTree &cluster = it->query();
  2925. const char* name = cluster.queryProp("@name");
  2926. if (!name || !*name)
  2927. continue;
  2928. unsigned ins = 0;
  2929. Owned<IPropertyTreeIterator> insts = cluster.getElements("Instance");
  2930. ForEach(*insts)
  2931. {
  2932. const char *na = insts->query().queryProp("@netAddress");
  2933. if (!na || !*na)
  2934. continue;
  2935. SocketEndpoint ep(na);
  2936. if (ep.isNull())
  2937. continue;
  2938. ins++;
  2939. VStringBuffer gname("hthor__%s", name);
  2940. if (ins>1)
  2941. gname.append('_').append(ins);
  2942. appendGroupNode(sprayTargets, gname.str(), "hthor", false);
  2943. }
  2944. }
  2945. resp.setGroupNodes(sprayTargets);
  2946. }
  2947. catch(IException* e)
  2948. {
  2949. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  2950. }
  2951. return true;
  2952. }
  2953. void CFileSprayEx::setDFUServerQueueReq(const char* dfuServerQueue, IDFUWorkUnit* wu)
  2954. {
  2955. wu->setQueue((dfuServerQueue && *dfuServerQueue) ? dfuServerQueue : m_QueueLabel.str());
  2956. }
  2957. void CFileSprayEx::setUserAuth(IEspContext &context, IDFUWorkUnit* wu)
  2958. {
  2959. StringBuffer user, passwd;
  2960. wu->setUser(context.getUserID(user).str());
  2961. wu->setPassword(context.getPassword(passwd).str());
  2962. }
  2963. bool CFileSprayEx::onGetDFUServerQueues(IEspContext &context, IEspGetDFUServerQueuesRequest &req, IEspGetDFUServerQueuesResponse &resp)
  2964. {
  2965. try
  2966. {
  2967. if (!context.validateFeatureAccess(FILE_SPRAY_URL, SecAccess_Read, false))
  2968. throw MakeStringException(ECLWATCH_FILE_SPRAY_ACCESS_DENIED, "Permission denied.");
  2969. StringArray qlist;
  2970. getDFUServerQueueNames(qlist, req.getDFUServerName());
  2971. resp.setNames(qlist);
  2972. }
  2973. catch(IException* e)
  2974. {
  2975. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  2976. }
  2977. return true;
  2978. }