/*############################################################################## Copyright (C) 2011 HPCC Systems. All rights reserved. This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . ############################################################################## */ #pragma warning (disable : 4786) #pragma warning (disable : 4129) #include #include "jsocket.hpp" #include "dasds.hpp" #include "dadfs.hpp" #include "dautils.hpp" #include "daclient.hpp" #include "wshelpers.hpp" #include "dfuwu.hpp" #include "ws_fsService.hpp" #ifdef _WIN32 #include "windows.h" #endif #include "TpWrapper.hpp" #include "LogicFileWrapper.hpp" #include "dfuutil.hpp" #include "portlist.h" #include "sacmd.hpp" #include "exception_util.hpp" #define DFU_WU_URL "DfuWorkunitsAccess" #define DFU_EX_URL "DfuExceptionsAccess" #define FILE_SPRAY_URL "FileSprayAccess" #define FILE_DESPRAY_URL "FileDesprayAccess" #define WUDETAILS_REFRESH_MINS 1 void SetResp(StringBuffer &resp, IConstDFUWorkUnit * wu, bool array); int Schedule::run() { try { while(true) { { Owned factory = getDFUWorkUnitFactory(); Owned itr = factory->getWorkUnitsByState(DFUstate_scheduled); itr->first(); while(itr->isValid()) { Owned wu = itr->get(); CDateTime dt, now; now.setNow(); try { wu->getTimeScheduled(dt); if (now.compare(dt) > 0) { StringAttr wuid(wu->queryId()); wu.clear(); submitDFUWorkUnit(wuid.get()); } } catch(IException *e) { StringBuffer msg; ERRLOG("Exception %d:%s in WsWorkunits Schedule::run", e->errorCode(), e->errorMessage(msg).str()); e->Release(); } itr->next(); } } sleep(60); } } catch(IException *e) { StringBuffer msg; ERRLOG("Exception %d:%s in WS_FS Schedule::run", e->errorCode(), e->errorMessage(msg).str()); e->Release(); } catch(...) { ERRLOG("Unknown exception in WS_FS Schedule::run"); } return 0; } void CFileSprayEx::init(IPropertyTree *cfg, const char *process, const char *service) { StringBuffer xpath; xpath.clear().appendf("Software/EspProcess[@name=\"%s\"]/EspService[@name=\"%s\"]/QueueLabel", process, service); cfg->getProp(xpath.str(), m_QueueLabel); xpath.clear().appendf("Software/EspProcess[@name=\"%s\"]/EspService[@name=\"%s\"]/MonitorQueueLabel", process, service); cfg->getProp(xpath.str(), m_MonitorQueueLabel); xpath.clear().appendf("Software/EspProcess[@name=\"%s\"]/EspService[@name=\"%s\"]/RootFolder", process, service); cfg->getProp(xpath.str(), m_RootFolder); directories.set(cfg->queryPropTree("Software/Directories")); StringBuffer prop; prop.appendf("queueLabel=%s", m_QueueLabel.str()); PrintLog(prop.str()); prop.clear(); prop.appendf("monitorQueueLabel=%s", m_MonitorQueueLabel.str()); PrintLog(prop.str()); prop.clear(); prop.appendf("rootFolder=%s", m_RootFolder.str()); PrintLog(prop.str()); if (!daliClientActive()) { ERRLOG("No Dali Connection Active."); throw MakeStringException(-1, "No Dali Connection Active. Please Specify a Dali to connect to in you configuration file"); } m_sched.start(); } void ParsePath(const char * fullPath, StringBuffer &ip, StringBuffer &filePath, StringBuffer &title) { ip.clear(); filePath.clear(); title.clear(); if(fullPath == NULL || *fullPath == '\0') return; const char* ptr = fullPath; if(*ptr == '\\' && *(ptr+1) == '\\') { ptr += 2; while(*ptr != '\0' && *ptr != '\\') ptr++; ip.append(ptr - fullPath - 2, fullPath + 2); } filePath.append(ptr); ptr = fullPath + strlen(fullPath) - 1; while(ptr > fullPath && *ptr != '\\') ptr--; title.append(ptr + 1); } const char * const NODATETIME="1970-01-01T00:00:00Z"; // Assign from a dfuwu workunit structure to an esp request workunit structure. static void DeepAssign(IConstDFUWorkUnit *src, IEspDFUWorkunit &dest) { if(src == NULL) throw MakeStringException(ECLWATCH_MISSING_PARAMS, "'Source DFU workunit' doesn't exist."); if(&dest == NULL) throw MakeStringException(ECLWATCH_MISSING_PARAMS, "'Destination DFU workunit' not valid."); Owned envFactory = getEnvironmentFactory(); Owned constEnv = envFactory->openEnvironmentByFile(); Owned root = &constEnv->getPTree(); if (!root) throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO, "Failed to get environment information."); StringBuffer tmp, encoded; dest.setID(src->queryId()); if (src->getClusterName(tmp.clear()).length()!=0) { char *clusterName = (char *)tmp.str(); if (clusterName && *clusterName) { StringBuffer clusterNameForDisplay(clusterName); Owned clusters= root->getElements("Software/Topology/Cluster"); if (clusters->first()) { do { IPropertyTree &cluster = clusters->query(); const char* name = cluster.queryProp("@name"); if (!name || !*name) continue; Owned thorClusters= cluster.getElements(eqThorCluster); Owned roxieClusters= cluster.getElements(eqRoxieCluster); if (thorClusters->first() || roxieClusters->first()) { if (thorClusters->first()) { IPropertyTree &thorCluster = thorClusters->query(); const char* process = thorCluster.queryProp("@process"); if (process && *process) { if (clusterName && !stricmp(clusterName, process)) { clusterNameForDisplay.clear().append(name); break; } } } if (roxieClusters->first()) { IPropertyTree &roxieCluster = roxieClusters->query(); const char* process = roxieCluster.queryProp("@process"); if (process && *process) { if (clusterName && !stricmp(clusterName, name)) { clusterNameForDisplay.clear().append(name); break; } } } } } while (clusters->next()); } dest.setClusterName(clusterNameForDisplay.str()); } } if (src->getJobName(tmp.clear()).length()!=0) dest.setJobName(tmp.str()); else dest.setJobName(""); if (src->getQueue(tmp.clear()).length()!=0) dest.setQueue(tmp.str()); if (src->getUser(tmp.clear()).length()!=0) dest.setUser(tmp.str()); dest.setIsProtected(src->isProtected()); dest.setCommand(src->getCommand()); IConstDFUprogress *prog = src->queryProgress(); if (prog != NULL) { DFUstate state = prog->getState(); dest.setState(state); StringBuffer statemsg; encodeDFUstate(state,statemsg); dest.setStateMessage(statemsg.str()); CDateTime startAt; CDateTime stoppAt; prog->getTimeStarted(startAt); prog->getTimeStopped(stoppAt); StringBuffer tmpstr; startAt.getDateString(tmpstr); tmpstr.append(" "); startAt.getTimeString(tmpstr); dest.setTimeStarted(tmpstr.str()); tmpstr.clear(); stoppAt.getDateString(tmpstr); tmpstr.append(" "); stoppAt.getTimeString(tmpstr); dest.setTimeStopped(tmpstr.str()); StringBuffer prgmsg; prog->formatProgressMessage(prgmsg); dest.setProgressMessage(prgmsg.str()); prog->formatSummaryMessage(prgmsg.clear()); dest.setSummaryMessage(prgmsg.str()); unsigned secs = prog->getSecsLeft(); if(secs > 0) dest.setSecsLeft(secs); dest.setPercentDone(prog->getPercentDone()); } IConstDFUoptions *options = src->queryOptions(); if(options) { dest.setReplicate(options->getReplicate()); dest.setOverwrite(options->getOverwrite()); } IConstDFUfileSpec * file = src->querySource(); if (file != NULL) { //if (file->getTitle(tmp.clear()).length()!=0) // dest.setSourceTitle(tmp.str()); StringBuffer lfn; file->getLogicalName(lfn); if (lfn.length() != 0) dest.setSourceLogicalName(lfn.str()); else dest.setSourceFormat(file->getFormat()); if (file->getRawDirectory(tmp.clear()).length()!=0) dest.setSourceDirectory(tmp.str()); SocketEndpoint srcdali; StringBuffer srcdaliip; file->getForeignDali(srcdali); srcdali.getIpText(srcdaliip); if(srcdaliip.length() > 0 && strcmp(srcdaliip.str(), "0.0.0.0") != 0) dest.setSourceDali(srcdaliip.str()); StringBuffer diffkeyname; file->getDiffKey(diffkeyname); if(diffkeyname.length() > 0) dest.setSourceDiffKeyName(diffkeyname.str()); StringBuffer socket, dir, title; unsigned np = file->getNumParts(0); // should handle multiple clusters? if (lfn.length() == 0) { // no logical name if (np == 1) { Owned info; try { info.setown(file->getFileDescriptor()); if(info) { info->getNode(0)->endpoint().getIpText(socket); dest.setSourceIP(socket.str()); const char *defaultdir = info->queryDefaultDir(); if (defaultdir&&*defaultdir) addPathSepChar(dir.append(defaultdir)); file->getRawFileMask(dir); dest.setSourceFilePath(dir.str()); } } catch(IException *e) { EXCLOG(e,"DeepAssign getFileDescriptor"); e->Release(); } } } if (np) dest.setSourceNumParts(np); unsigned rs = file->getRecordSize(); if (rs) dest.setSourceRecordSize(rs); StringBuffer rowtag; file->getRowTag(rowtag); if(rowtag.length() > 0) dest.setRowTag(rowtag.str()); } file = src->queryDestination(); if (file != NULL) { StringBuffer lfn; file->getLogicalName(lfn); if (lfn.length() != 0) dest.setDestLogicalName(lfn.str()); else dest.setDestFormat(file->getFormat()); if (file->getRawDirectory(tmp.clear()).length()!=0) dest.setDestDirectory(tmp.str()); if (file->getGroupName(0,tmp.clear()).length()!=0) // should handle multiple clusters? { char *clusterName = (char *)tmp.str(); if (clusterName) dest.setDestGroupName(clusterName); } StringBuffer socket, dir, title; unsigned np = file->getNumParts(0); // should handle multiple clusters? if (lfn.length() == 0) { // no logical name if (np == 1) { Owned info; try { info.setown(file->getFileDescriptor()); if(info) { info->getNode(0)->endpoint().getIpText(socket); dest.setDestIP(socket.str()); const char *defaultdir = info->queryDefaultDir(); if (defaultdir&&*defaultdir) addPathSepChar(dir.append(defaultdir)); file->getRawFileMask(dir); dest.setDestFilePath(dir.str()); } } catch(IException *e) { EXCLOG(e,"DeepAssign getFileDescriptor dest"); e->Release(); } } } if (np) dest.setDestNumParts(np); unsigned rs = file->getRecordSize(); if (rs) dest.setDestRecordSize(rs); dest.setCompress(file->isCompressed()); } // monitor stuff IConstDFUmonitor *monitor = src->queryMonitor(); if (monitor) { monitor->getEventName(tmp.clear()); if (tmp.length()) dest.setMonitorEventName(tmp.str()); bool sub = monitor->getSub(); dest.setMonitorSub(sub); unsigned sl = monitor->getShotLimit(); if (sl) dest.setMonitorShotLimit(sl); } } bool CFileSprayEx::ParseLogicalPath(const char * pLogicalPath, const char* cluster, StringBuffer &folder, StringBuffer &title, StringBuffer &defaultFolder, StringBuffer &defaultReplicateFolder) { if(!pLogicalPath || !*pLogicalPath) return false; folder.clear(); title.clear(); defaultFolder.clear(); defaultReplicateFolder.clear(); DFD_OS os = DFD_OSdefault; if(cluster != NULL && *cluster != '\0') { Owned group = queryNamedGroupStore().lookup(cluster); if (group) { switch (queryOS(group->queryNode(0).endpoint())) { case MachineOsW2K: os = DFD_OSwindows; break; case MachineOsSolaris: case MachineOsLinux: os = DFD_OSunix; break; } if (directories.get()) { getConfigurationDirectory(directories, "data", "thor", cluster, defaultFolder); getConfigurationDirectory(directories, "mirror", "thor", cluster, defaultReplicateFolder); } } else { // Error here? } } makePhysicalPartName(pLogicalPath,0,0,folder,false,os,defaultFolder.str()); const char *n = pLogicalPath; const char* p; do { p = strstr(n,"::"); if(p) n = p+2; } while(p); title.append(n); return true; } bool CFileSprayEx::ParseLogicalPath(const char * pLogicalPath, StringBuffer &title) { if(!pLogicalPath || !*pLogicalPath) return false; title.clear(); const char *n = pLogicalPath; const char* p; do { p = strstr(n,"::"); if(p) n = p+2; } while(p); title.append(n); return true; } DFUclusterPartDiskMapping readClusterMappingSettings(const char *cluster, StringBuffer &dir, int& offset) { DFUclusterPartDiskMapping mapping = DFUcpdm_c_only; Owned envFactory = getEnvironmentFactory(); envFactory->validateCache(); StringBuffer dirxpath; dirxpath.appendf("Software/RoxieCluster[@name=\"%s\"]",cluster); Owned constEnv = envFactory->openEnvironmentByFile(); Owned pEnvRoot = &constEnv->getPTree(); Owned processes = pEnvRoot->getElements(dirxpath); if (processes->first()) { IPropertyTree &processe = processes->query(); const char *slaveConfig = processe.queryProp("@slaveConfig"); if (slaveConfig && *slaveConfig) { if (!stricmp(slaveConfig, "simple")) { mapping = DFUcpdm_c_only; } else if (!stricmp(slaveConfig, "overloaded")) { mapping = DFUcpdm_c_then_d; } else if (!stricmp(slaveConfig, "full_redundancy")) { ; } else //circular redundancy { mapping = DFUcpdm_c_replicated_by_d; offset = processe.getPropInt("@cyclicOffset"); } dir = processe.queryProp("@slaveDataDir"); } else { DBGLOG("Failed to get RoxieCluster settings"); throw MakeStringException(ECLWATCH_INVALID_CLUSTER_INFO, "Failed to get RoxieCluster settings. The workunit will not be created."); } } else { DBGLOG("Failed to get RoxieCluster settings"); throw MakeStringException(ECLWATCH_INVALID_CLUSTER_INFO, "Failed to get RoxieCluster settings. The workunit will not be created."); } return mapping; } void getClusterFromLFN(const char* lfn, StringBuffer& cluster, const char* username, const char* passwd) { Owned udesc; if(username != NULL && *username != '\0') { udesc.setown(createUserDescriptor()); udesc->set(username, passwd); } LogicFileWrapper lfw; lfw.FindClusterName(lfn, cluster, udesc); } StringBuffer& constructFileMask(const char* filename, StringBuffer& filemask) { filemask.clear().append(filename).toLowerCase().append("._$P$_of_$N$"); return filemask; } bool CFileSprayEx::onDFUWUSearch(IEspContext &context, IEspDFUWUSearchRequest & req, IEspDFUWUSearchResponse & resp) { try { if (!context.validateFeatureAccess(DFU_WU_URL, SecAccess_Read, false)) throw MakeStringException(ECLWATCH_DFU_WU_ACCESS_DENIED, "Access to DFU workunit is denied."); StringArray dfuclusters; Owned factory = getEnvironmentFactory(); Owned environment = factory->openEnvironmentByFile(); Owned root = &environment->getPTree(); if (!root) throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO, "Failed to get environment information."); Owned clusterIterator = root->getElements("Software/Topology/Cluster"); if (clusterIterator->first()) { do { IPropertyTree &cluster = clusterIterator->query(); const char *clusterName = cluster.queryProp("@name"); if (!clusterName || !*clusterName) continue; dfuclusters.append(clusterName); } while (clusterIterator->next()); } resp.setClusterNames(dfuclusters); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } int readFromCommaSeparatedString(const char *commaSeparatedString, StringBuffer* output) { int numOfItems = 0; if (commaSeparatedString && *commaSeparatedString) { char *pStr = (char *) commaSeparatedString; while (pStr) { char item[1024]; bool bFoundComma = false; int len = strlen(pStr); for (int i = 0; i < len; i++) { char *pStr1 = pStr + i; if (pStr1[0] != ',') continue; strncpy(item, pStr, pStr1 - pStr); item[pStr1 - pStr] = 0; bFoundComma = true; if (i < len - 1) pStr = pStr1 + 1; else pStr = NULL; break; } if (!bFoundComma && len > 0) { strcpy(item, pStr); pStr = NULL; } output[numOfItems] = item; numOfItems++; } } return numOfItems; } bool CFileSprayEx::GetArchivedDFUWorkunits(IEspContext &context, IEspGetDFUWorkunits &req, IEspGetDFUWorkunitsResponse &resp) { StringBuffer user; context.getUserID(user); StringBuffer sashaAddress; IArrayOf sashaservers; CTpWrapper dummy; dummy.getTpSashaServers(sashaservers); ForEachItemIn(i, sashaservers) { IConstTpSashaServer& sashaserver = sashaservers.item(i); IArrayOf &sashaservermachine = sashaserver.getTpMachines(); sashaAddress.append(sashaservermachine.item(0).getNetaddress()); } SocketEndpoint ep; ep.set(sashaAddress,DEFAULT_SASHA_PORT); Owned sashaserver = createINode(ep); __int64 count=req.getPageSize(); if(count < 1) count=100; __int64 begin=req.getPageStartFrom(); if (begin < 0) begin = 0; Owned cmd = createSashaCommand(); cmd->setAction(SCA_LIST); cmd->setOnline(false); cmd->setArchived(true); cmd->setDFU(true); cmd->setLimit((int) count+1); cmd->setStart((int)begin); if(req.getCluster() && *req.getCluster()) cmd->setCluster(req.getCluster()); if(req.getOwner() && *req.getOwner()) cmd->setOwner(req.getOwner()); if(req.getJobname() && *req.getJobname()) cmd->setJobName(req.getJobname()); if(req.getStateReq() && *req.getStateReq()) cmd->setState(req.getStateReq()); cmd->setOutputFormat("owner,jobname,cluster,state,command");//date range/owner/jobname/state*/ if (!cmd->send(sashaserver)) { StringBuffer msg; msg.appendf("Cannot connect to archive server at %s",sashaAddress.str()); throw MakeStringException(ECLWATCH_CANNOT_CONNECT_ARCHIVE_SERVER, "%s", msg.str()); } IArrayOf results; __int64 actualCount = cmd->numIds(); StringBuffer s; for (unsigned j=0;jqueryId(j); if (!wuidStr) continue; StringBuffer strArray[6]; readFromCommaSeparatedString(wuidStr, strArray); //skip any workunits without access Owned resultWU = createDFUWorkunit("", ""); resultWU->setArchived(true); if (strArray[0].length() > 0) resultWU->setID(strArray[0].str()); if (strArray[1].length() > 0) resultWU->setUser(strArray[1].str()); if (strArray[2].length() > 0) resultWU->setJobName(strArray[2].str()); if (strArray[3].length() > 0) resultWU->setClusterName(strArray[3].str()); if (strArray[4].length() > 0) resultWU->setStateMessage(strArray[4].str()); if (strArray[5].length() > 0) resultWU->setCommand(atoi(strArray[5].str())); results.append(*resultWU.getLink()); } resp.setPageStartFrom(begin+1); resp.setNextPage(-1); if(count < actualCount) { if (results.length() > count) { results.pop(); } resp.setNextPage(begin + count); resp.setPageEndAt(begin + count); } else { resp.setPageEndAt(begin + actualCount); } if(begin > 0) { resp.setFirst(false); if (begin - count > 0) resp.setPrevPage(begin - count); else resp.setPrevPage(0); } resp.setPageSize(count); resp.setResults(results); StringBuffer basicQuery; if (req.getStateReq() && *req.getStateReq()) { resp.setStateReq(req.getStateReq()); addToQueryString(basicQuery, "StateReq", req.getStateReq()); } if (req.getCluster() && *req.getCluster()) { resp.setCluster(req.getCluster()); addToQueryString(basicQuery, "Cluster", req.getCluster()); } if (req.getOwner() && *req.getOwner()) { resp.setOwner(req.getOwner()); addToQueryString(basicQuery, "Owner", req.getOwner()); } if (req.getType() && *req.getType()) { resp.setType(req.getType()); addToQueryString(basicQuery, "Type", req.getType()); } resp.setFilters(basicQuery.str()); resp.setBasicQuery(basicQuery.str()); return true; } bool CFileSprayEx::onGetDFUWorkunits(IEspContext &context, IEspGetDFUWorkunits &req, IEspGetDFUWorkunitsResponse &resp) { try { if (!context.validateFeatureAccess(DFU_WU_URL, SecAccess_Read, false)) throw MakeStringException(ECLWATCH_DFU_WU_ACCESS_DENIED, "Access to DFU workunit is denied."); double version = context.getClientVersion(); if (version > 1.02) { const char *type = req.getType(); if (type && *type && !stricmp(type, "archived workunits")) { return GetArchivedDFUWorkunits(context, req, resp); } } StringBuffer clusterReq; const char *clusterName = req.getCluster(); if(clusterName && *clusterName) { clusterReq.append(clusterName); } Owned envFactory = getEnvironmentFactory(); Owned constEnv = envFactory->openEnvironmentByFile(); Owned root = &constEnv->getPTree(); if (!root) throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO, "Failed to get environment information."); StringArray targetClusters, clusterProcesses; Owned clusters= root->getElements("Software/Topology/Cluster"); if (clusters->first()) { do { IPropertyTree &cluster = clusters->query(); const char* name = cluster.queryProp("@name"); if (!name || !*name) continue; Owned thorClusters= cluster.getElements(eqThorCluster); Owned roxieClusters= cluster.getElements(eqRoxieCluster); if (thorClusters->first() || roxieClusters->first()) { bool bFound = false; if (thorClusters->first()) { IPropertyTree &thorCluster = thorClusters->query(); const char* process = thorCluster.queryProp("@process"); if (process && *process) { targetClusters.append(name); clusterProcesses.append(process); if (clusterName && !stricmp(clusterName, name)) { clusterReq.clear().append(process); } } } if (!bFound && roxieClusters->first()) { IPropertyTree &roxieCluster = roxieClusters->query(); const char* process = roxieCluster.queryProp("@process"); if (process && *process) { targetClusters.append(name); clusterProcesses.append(process); if (clusterName && !stricmp(clusterName, name)) { clusterReq.clear().append(process); } } } } } while (clusters->next()); } __int64 cachehint=0; __int64 pagesize = req.getPageSize(); __int64 pagefrom = req.getPageStartFrom(); __int64 displayFrom = 0; if (pagesize < 1) { pagesize = 100; } if (pagefrom > 0) { displayFrom = pagefrom; } DFUsortfield sortorder[2] = {DFUsf_wuid, DFUsf_term}; sortorder[0] = (DFUsortfield) (DFUsf_wuid + DFUsf_reverse); if(req.getSortby() && *req.getSortby()) { const char *sortby = req.getSortby(); if (!stricmp(sortby, "Owner")) sortorder[0] = DFUsf_user; else if (!stricmp(sortby, "JobName")) sortorder[0] = DFUsf_job; else if (!stricmp(sortby, "Cluster")) sortorder[0] = DFUsf_cluster; else if (!stricmp(sortby, "State")) sortorder[0] = DFUsf_state; else if (!stricmp(sortby, "Type")) sortorder[0] = DFUsf_command; else if (!stricmp(sortby, "Protected")) sortorder[0] = DFUsf_protected; else if (!stricmp(sortby, "PCTDone")) sortorder[0] = (DFUsortfield) (DFUsf_pcdone | DFUsf_numeric); else sortorder[0] = DFUsf_wuid; bool descending = req.getDescending(); if (descending) sortorder[0] = (DFUsortfield) (sortorder[0] | DFUsf_reverse); } DFUsortfield filters[10]; unsigned short filterCount = 0; MemoryBuffer filterbuf; if(req.getStateReq() && *req.getStateReq()) { filters[filterCount] = DFUsf_state; filterCount++; if (stricmp(req.getStateReq(), "unknown") != 0) filterbuf.append(req.getStateReq()); else filterbuf.append(""); } if(clusterName && *clusterName) { filters[filterCount] = DFUsf_cluster; filterCount++; filterbuf.append(clusterReq.str()); } if(req.getOwner() && *req.getOwner()) { filters[filterCount] = DFUsortfield (DFUsf_user | DFUsf_nocase); filterCount++; filterbuf.append(req.getOwner()); } if(req.getJobname() && *req.getJobname()) { filters[filterCount] = DFUsortfield (DFUsf_job | DFUsf_nocase); filterCount++; filterbuf.append(req.getJobname()); } filters[filterCount] = DFUsf_term; IArrayOf result; Owned factory = getDFUWorkUnitFactory(); unsigned numWUs = factory->numWorkUnitsFiltered(filters, filterbuf.bufferBase()); Owned itr = factory->getWorkUnitsSorted(sortorder, filters, filterbuf.bufferBase(), (int) displayFrom, (int) pagesize+1, req.getOwner(), &cachehint); //unsigned actualCount = 0; itr->first(); while(itr->isValid()) { Owned wu = itr->get(); //actualCount++; Owned resultWU = createDFUWorkunit("", ""); resultWU->setID(wu->queryId()); StringBuffer jobname, user, cluster; resultWU->setJobName(wu->getJobName(jobname).str()); resultWU->setCommand(wu->getCommand()); resultWU->setUser(wu->getUser(user).str()); const char* clusterName = wu->getClusterName(cluster).str(); if (clusterName) { StringBuffer clusterForDisplay(clusterName); if (clusterProcesses.ordinality()) { for (unsigned i = 0; i < clusterProcesses.length(); i++) { const char* clusterProcessName = clusterProcesses.item(i); if (!stricmp(clusterProcessName, clusterName)) { clusterForDisplay.clear().append(targetClusters.item(i)); break; } } } resultWU->setClusterName(clusterForDisplay.str()); } resultWU->setIsProtected(wu->isProtected()); IConstDFUprogress *prog = wu->queryProgress(); if (prog != NULL) { DFUstate state = prog->getState(); resultWU->setState(state); StringBuffer statemsg; encodeDFUstate(state,statemsg); resultWU->setStateMessage(statemsg.str()); resultWU->setPercentDone(prog->getPercentDone()); } result.append(*LINK(resultWU.getClear())); itr->next(); } if (result.length() > pagesize) result.pop(); resp.setPageSize(pagesize); resp.setNumWUs(numWUs); resp.setPageStartFrom(displayFrom + 1); if(displayFrom + pagesize < numWUs) { resp.setNextPage(displayFrom + pagesize); resp.setPageEndAt(pagefrom + pagesize); __int64 last = displayFrom + pagesize; while (last + pagesize < numWUs) { last += pagesize; } resp.setLastPage(last); } else { resp.setNextPage(-1); resp.setPageEndAt(numWUs); } if(displayFrom > 0) { resp.setFirst(false); if (displayFrom - pagesize > 0) resp.setPrevPage(displayFrom - pagesize); else resp.setPrevPage(0); } StringBuffer basicQuery; if (req.getStateReq() && *req.getStateReq()) { resp.setStateReq(req.getStateReq()); addToQueryString(basicQuery, "StateReq", req.getStateReq()); } if (req.getCluster() && *req.getCluster()) { resp.setCluster(req.getCluster()); addToQueryString(basicQuery, "Cluster", req.getCluster()); } if (req.getOwner() && *req.getOwner()) { resp.setOwner(req.getOwner()); addToQueryString(basicQuery, "Owner", req.getOwner()); } resp.setFilters(basicQuery.str()); if (req.getSortby() && *req.getSortby()) { resp.setSortby(req.getSortby()); if (req.getDescending()) resp.setDescending(req.getDescending()); StringBuffer strbuf = req.getSortby(); strbuf.append("="); String str1(strbuf.str()); String str(basicQuery.str()); if (str.indexOf(str1) < 0) { addToQueryString(basicQuery, "Sortby", req.getSortby()); if (req.getDescending()) addToQueryString(basicQuery, "Descending", "1"); } } resp.setBasicQuery(basicQuery.str()); resp.setResults(result); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } void CFileSprayEx::addToQueryString(StringBuffer &queryString, const char *name, const char *value) { if (queryString.length() > 0) { queryString.append("&"); } queryString.append(name); queryString.append("="); queryString.append(value); } void CFileSprayEx::getInfoFromSasha(IEspContext &context, const char *sashaServer, const char* wuid, IEspDFUWorkunit *info) { Owned cmd = createSashaCommand(); cmd->addId(wuid); cmd->setAction(SCA_GET); cmd->setArchived(true); cmd->setDFU(true); SocketEndpoint ep(sashaServer, DEFAULT_SASHA_PORT); Owned node = createINode(ep); if (!cmd->send(node,1*60*1000)) { DBGLOG("Cannot connect to Sasha server at %s",sashaServer); throw MakeStringException(ECLWATCH_CANNOT_CONNECT_ARCHIVE_SERVER,"Cannot connect to archive server at %s.",sashaServer); } if (cmd->numIds()==0) { DBGLOG("Could not read archived %s",wuid); throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT,"Cannot read workunit %s.",wuid); } unsigned num = cmd->numResults(); if (num < 1) return; StringBuffer res; cmd->getResult(0,res); if(res.length() < 1) return; Owned wu = createPTreeFromXMLString(res.str()); if (!wu) return; const char * command = wu->queryProp("@command"); const char * submitID = wu->queryProp("@submitID"); const char * cluster = wu->queryProp("@clusterName"); const char * queue = wu->queryProp("@queue"); const char * jobName = wu->queryProp("@jobName"); const char * protectedWU = wu->queryProp("@protected"); info->setID(wuid); info->setArchived(true); if (command && *command) info->setCommandMessage(command); if (cluster && *cluster) info->setClusterName(cluster); if (submitID && *submitID) info->setUser(submitID); if (queue && *queue) info->setQueue(queue); if (jobName && *jobName) info->setJobName(jobName); if (protectedWU && stricmp(protectedWU, "0")) info->setIsProtected(true); else info->setIsProtected(false); IPropertyTree *source = wu->queryPropTree("Source"); if(source) { const char * directory = source->queryProp("@directory"); const char * name = source->queryProp("@name"); if (directory && *directory) info->setSourceDirectory(directory); if (name && *name) info->setSourceLogicalName(name); } IPropertyTree *dest = wu->queryPropTree("Destination"); if(dest) { const char * directory = dest->queryProp("@directory"); int numParts = dest->getPropInt("@numparts", -1); if (directory && *directory) info->setDestDirectory(directory); if (numParts > 0) info->setDestNumParts(numParts); } IPropertyTree *progress = wu->queryPropTree("Progress"); if(progress) { const char * state = progress->queryProp("@state"); const char * timeStarted = progress->queryProp("@timestarted"); const char * timeStopped = progress->queryProp("@timestopped"); if (state && *state) info->setStateMessage(state); if (timeStarted && *timeStarted) { StringBuffer startStr = timeStarted; startStr.replace('T', ' '); info->setTimeStarted(startStr.str()); } if (timeStopped && *timeStopped) { StringBuffer stopStr = timeStopped; stopStr.replace('T', ' '); info->setTimeStopped(stopStr.str()); } } return; } bool CFileSprayEx::getArchivedWUInfo(IEspContext &context, IEspGetDFUWorkunit &req, IEspGetDFUWorkunitResponse &resp) { const char *wuid = req.getWuid(); if (wuid && *wuid) { StringBuffer sashaAddress; IArrayOf sashaservers; CTpWrapper dummy; dummy.getTpSashaServers(sashaservers); ForEachItemIn(i, sashaservers) { IConstTpSashaServer& sashaserver = sashaservers.item(i); IArrayOf &sashaservermachine = sashaserver.getTpMachines(); sashaAddress.append(sashaservermachine.item(0).getNetaddress()); } if (sashaAddress.length() < 1) { throw MakeStringException(ECLWATCH_ARCHIVE_SERVER_NOT_FOUND,"Archive server not found."); } getInfoFromSasha(context, sashaAddress.str(), wuid, &resp.updateResult()); resp.setAutoRefresh(WUDETAILS_REFRESH_MINS); return true; } return false; } bool CFileSprayEx::onGetDFUWorkunit(IEspContext &context, IEspGetDFUWorkunit &req, IEspGetDFUWorkunitResponse &resp) { try { if (!context.validateFeatureAccess(DFU_WU_URL, SecAccess_Read, false)) throw MakeStringException(ECLWATCH_DFU_WU_ACCESS_DENIED, "Access to DFU workunit is denied."); const char* wuid = req.getWuid(); if (!wuid || !*wuid) throw MakeStringException(ECLWATCH_INVALID_INPUT, "Dfu workunit ID not specified."); bool found = false; double version = context.getClientVersion(); Owned factory = getDFUWorkUnitFactory(); Owned wu = factory->openWorkUnit(wuid, false); if(wu) { IEspDFUWorkunit &result = resp.updateResult(); DeepAssign(wu, result); int n = resp.getResult().getState(); if (n == DFUstate_scheduled || n == DFUstate_queued || n == DFUstate_started) { resp.setAutoRefresh(WUDETAILS_REFRESH_MINS); } found = true; } else if ((version > 1.02) && getArchivedWUInfo(context, req, resp)) { found = true; } if (!found) throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT, "Dfu workunit %s not found.", req.getWuid()); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CFileSprayEx::onGetDFUProgress(IEspContext &context, IEspProgressRequest &req, IEspProgressResponse &resp) { try { if (!context.validateFeatureAccess(DFU_WU_URL, SecAccess_Read, false)) throw MakeStringException(ECLWATCH_DFU_WU_ACCESS_DENIED, "Access to DFU workunit is denied."); const char* wuid = req.getWuid(); if(!wuid || !*wuid) throw MakeStringException(ECLWATCH_INVALID_INPUT, "Workunit ID not specified."); Owned factory = getDFUWorkUnitFactory(); Owned wu = factory->openWorkUnit(req.getWuid(), false); if(!wu) throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT, "Dfu workunit %s not found.", req.getWuid()); resp.setWuid(req.getWuid()); IConstDFUprogress *prog = wu->queryProgress(); if (prog) { resp.setPercentDone(prog->getPercentDone()); resp.setKbPerSec(prog->getKbPerSec()); resp.setKbPerSecAve(prog->getKbPerSecAve()); resp.setSecsLeft(prog->getSecsLeft()); StringBuffer statestr; encodeDFUstate(prog->getState(), statestr); resp.setState(statestr.str()); resp.setSlavesDone(prog->getSlavesDone()); StringBuffer msg; prog->formatProgressMessage(msg); resp.setProgressMessage(msg.str()); prog->formatSummaryMessage(msg.clear()); resp.setSummaryMessage(msg.str()); prog->getTimeTaken(msg.clear()); resp.setTimeTaken(msg.str()); } } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CFileSprayEx::onCreateDFUWorkunit(IEspContext &context, IEspCreateDFUWorkunit &req, IEspCreateDFUWorkunitResponse &resp) { try { if (!context.validateFeatureAccess(DFU_WU_URL, SecAccess_Write, false)) throw MakeStringException(ECLWATCH_DFU_WU_ACCESS_DENIED, "Failed to create DFU workunit. Permission denied."); Owned factory = getDFUWorkUnitFactory(); Owned wu = factory->createWorkUnit(); wu->setQueue(m_QueueLabel.str()); StringBuffer user, passwd; wu->setUser(context.getUserID(user).str()); wu->setPassword(context.getPassword(passwd).str()); wu->commit(); const char * d = wu->queryId(); IEspDFUWorkunit &result = resp.updateResult(); DeepAssign(wu, result); result.setOverwrite(false); result.setReplicate(true); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CFileSprayEx::onUpdateDFUWorkunit(IEspContext &context, IEspUpdateDFUWorkunit &req, IEspUpdateDFUWorkunitResponse &resp) { try { if (!context.validateFeatureAccess(DFU_WU_URL, SecAccess_Write, false)) throw MakeStringException(ECLWATCH_DFU_WU_ACCESS_DENIED, "Failed to update DFU workunit. Permission denied."); IConstDFUWorkunit & reqWU = req.getWu(); Owned factory = getDFUWorkUnitFactory(); Owned wu = factory->updateWorkUnit(reqWU.getID()); if(!wu) throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT, "Dfu workunit %s not found.", reqWU.getID()); IDFUprogress *prog = wu->queryUpdateProgress(); if (prog && req.getStateOrig() != reqWU.getState()) { if (prog->getState() != req.getStateOrig()) throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot update DFU workunit %s because its state has been changed internally. Please refresh the page and try again.",reqWU.getID()); prog->setState((enum DFUstate)reqWU.getState()); } const char* clusterOrig = req.getClusterOrig(); const char* cluster = reqWU.getClusterName(); if(cluster && (!clusterOrig || stricmp(clusterOrig, cluster))) { wu->setClusterName(reqWU.getClusterName()); } const char* jobNameOrig = req.getJobNameOrig(); const char* jobName = reqWU.getJobName(); if(jobName && (!jobNameOrig || stricmp(jobNameOrig, jobName))) { wu->setJobName(jobName); } if (reqWU.getIsProtected() != req.getIsProtectedOrig()) wu->protect(reqWU.getIsProtected()); wu->commit(); resp.setRedirectUrl(StringBuffer("/FileSpray/GetDFUWorkunit?wuid=").append(reqWU.getID()).str()); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool markWUFailed(IDFUWorkUnitFactory *f, const char *wuid) { Owned wu = f->updateWorkUnit(wuid); if(!wu) throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT, "Dfu workunit %s not found.", wuid); IDFUprogress *prog = wu->queryUpdateProgress(); if(!prog) throw MakeStringException(ECLWATCH_PROGRESS_INFO_NOT_FOUND, "progress information not found for workunit %s.", wuid); else if(prog->getState() == DFUstate_started) throw MakeStringException(ECLWATCH_CANNOT_DELETE_WORKUNIT, "Cannot delete workunit %s because its state is Started.", wuid); else { prog->setState(DFUstate_failed); return true; } return false; } bool CFileSprayEx::onDFUWorkunitsAction(IEspContext &context, IEspDFUWorkunitsActionRequest &req, IEspDFUWorkunitsActionResponse &resp) { try { if (!context.validateFeatureAccess(DFU_WU_URL, SecAccess_Write, false)) throw MakeStringException(ECLWATCH_DFU_WU_ACCESS_DENIED, "Failed to update DFU workunit. Permission denied."); bool bAllSuccess = true; IArrayOf results; const char* action = req.getType(); if(!action || !*action || !strcmp(action, "Delete")) { Owned factory = getDFUWorkUnitFactory(); StringArray & wuids = req.getWuids(); for(unsigned i = 0; i < wuids.ordinality(); ++i) { Owned res = createDFUActionResult("", ""); res->setID(wuids.item(i)); res->setAction("Delete"); res->setResult("Success"); try { if (markWUFailed(factory, wuids.item(i))) { if (!factory->deleteWorkUnit(wuids.item(i))) throw MakeStringException(ECLWATCH_CANNOT_DELETE_WORKUNIT, "Failed in deleting workunit %s.", wuids.item(i)); } } catch (IException *e) { bAllSuccess = false; StringBuffer eMsg; eMsg = e->errorMessage(eMsg); e->Release(); StringBuffer failedMsg = "Failed: "; failedMsg.append(eMsg); res->setResult(failedMsg.str()); } results.append(*LINK(res.getClear())); } } else if (!strcmp(action, "Restore")) { StringBuffer sashaAddress; IArrayOf sashaservers; CTpWrapper dummy; dummy.getTpSashaServers(sashaservers); ForEachItemIn(i, sashaservers) { IConstTpSashaServer& sashaserver = sashaservers.item(i); IArrayOf &sashaservermachine = sashaserver.getTpMachines(); sashaAddress.append(sashaservermachine.item(0).getNetaddress()); } if (sashaAddress.length() < 1) { throw MakeStringException(ECLWATCH_ARCHIVE_SERVER_NOT_FOUND,"Archive server not found."); } SocketEndpoint ep(sashaAddress.str(), DEFAULT_SASHA_PORT); Owned node = createINode(ep); Owned cmd = createSashaCommand(); cmd->setAction(SCA_RESTORE); cmd->setDFU(true); StringArray & wuids = req.getWuids(); for(unsigned ii = 0; ii < wuids.ordinality(); ++ii) { StringBuffer msg; const char *wuid = wuids.item(ii); cmd->addId(wuid); if (!cmd->send(node,1*60*1000)) { throw MakeStringException(ECLWATCH_CANNOT_CONNECT_ARCHIVE_SERVER,"Cannot connect to archive server at %s.",sashaAddress.str()); } if (cmd->numIds()==0) { bAllSuccess = false; msg.appendf("Restore failed for %s", wuid); } else { StringBuffer reply; cmd->getId(0,reply); msg.appendf("Restore: %s, reply: %s", wuid, reply.str()); } Owned res = createDFUActionResult("", ""); res->setID(wuid); res->setAction("Restore"); res->setResult(msg.str()); results.append(*LINK(res.getClear())); } } else if(!strcmp(action, "Protect")) { Owned factory = getDFUWorkUnitFactory(); StringArray & wuids = req.getWuids(); for(unsigned i = 0; i < wuids.ordinality(); ++i) { Owned res = createDFUActionResult("", ""); res->setID(wuids.item(i)); res->setAction("Protect"); res->setResult("Success"); try { Owned factory = getDFUWorkUnitFactory(); Owned wu = factory->updateWorkUnit(wuids.item(i)); if(!wu.get()) continue; wu->protect(true); wu->commit(); } catch (IException *e) { bAllSuccess = false; StringBuffer eMsg; eMsg = e->errorMessage(eMsg); e->Release(); StringBuffer failedMsg = "Failed: "; failedMsg.append(eMsg); res->setResult(failedMsg.str()); } results.append(*LINK(res.getClear())); } } else if(!strcmp(action, "Unprotect")) { Owned factory = getDFUWorkUnitFactory(); StringArray & wuids = req.getWuids(); for(unsigned i = 0; i < wuids.ordinality(); ++i) { Owned res = createDFUActionResult("", ""); res->setID(wuids.item(i)); res->setAction("Unprotect"); res->setResult("Success"); try { Owned factory = getDFUWorkUnitFactory(); Owned wu = factory->updateWorkUnit(wuids.item(i)); if(!wu.get()) continue; wu->protect(false); wu->commit(); } catch (IException *e) { bAllSuccess = false; StringBuffer eMsg; eMsg = e->errorMessage(eMsg); e->Release(); StringBuffer failedMsg = "Failed: "; failedMsg.append(eMsg); res->setResult(failedMsg.str()); } results.append(*LINK(res.getClear())); } } else if(!strcmp(action, "SetToFailed")) { Owned factory = getDFUWorkUnitFactory(); StringArray & wuids = req.getWuids(); for(unsigned i = 0; i < wuids.ordinality(); ++i) { Owned res = createDFUActionResult("", ""); res->setID(wuids.item(i)); res->setAction("SetToFailed"); res->setResult("Success"); try { Owned wu = factory->updateWorkUnit(wuids.item(i)); if(wu) { IDFUprogress *prog = wu->queryUpdateProgress(); if (prog) { prog->setState(DFUstate_failed); wu->commit(); } } } catch (IException *e) { bAllSuccess = false; StringBuffer eMsg; eMsg = e->errorMessage(eMsg); e->Release(); StringBuffer failedMsg = "Failed: "; failedMsg.append(eMsg); res->setResult(failedMsg.str()); } results.append(*LINK(res.getClear())); } } else throw MakeStringException(ECLWATCH_INVALID_ACTION, "Unknown action type %s", action); if (bAllSuccess && strcmp(action, "Delete")) { if (!strcmp(action, "Restore")) resp.setRedirectUrl("/FileSpray/GetDFUWorkunits?Type=archived workunits"); else resp.setRedirectUrl("/FileSpray/GetDFUWorkunits"); } else { resp.setDFUActionResults(results); } } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CFileSprayEx::onDeleteDFUWorkunits(IEspContext &context, IEspDeleteDFUWorkunits &req, IEspDeleteDFUWorkunitsResponse &resp) { try { if (!context.validateFeatureAccess(DFU_WU_URL, SecAccess_Write, false)) throw MakeStringException(ECLWATCH_DFU_WU_ACCESS_DENIED, "Failed to delete DFU workunit. Permission denied."); Owned factory = getDFUWorkUnitFactory(); StringArray & wuids = req.getWuids(); for(unsigned i = 0; i < wuids.ordinality(); ++i) { if (markWUFailed(factory, wuids.item(i))) factory->deleteWorkUnit(wuids.item(i)); } resp.setRedirectUrl("/FileSpray/GetDFUWorkunits"); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CFileSprayEx::onDeleteDFUWorkunit(IEspContext &context, IEspDeleteDFUWorkunit &req, IEspDeleteDFUWorkunitResponse &resp) { try { if (!context.validateFeatureAccess(DFU_WU_URL, SecAccess_Write, false)) throw MakeStringException(ECLWATCH_DFU_WU_ACCESS_DENIED, "Failed to delete DFU workunit. Permission denied."); Owned factory = getDFUWorkUnitFactory(); if (markWUFailed(factory, req.getWuid())) resp.setResult(factory->deleteWorkUnit(req.getWuid())); else resp.setResult(false); resp.setRedirectUrl("/FileSpray/GetDFUWorkunits"); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CFileSprayEx::onSubmitDFUWorkunit(IEspContext &context, IEspSubmitDFUWorkunit &req, IEspSubmitDFUWorkunitResponse &resp) { try { if (!context.validateFeatureAccess(DFU_WU_URL, SecAccess_Write, false)) throw MakeStringException(ECLWATCH_DFU_WU_ACCESS_DENIED, "Failed to submit DFU workunit. Permission denied."); submitDFUWorkUnit(req.getWuid()); resp.setRedirectUrl(StringBuffer("/FileSpray/GetDFUWorkunit?wuid=").append(req.getWuid()).str()); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CFileSprayEx::onAbortDFUWorkunit(IEspContext &context, IEspAbortDFUWorkunit &req, IEspAbortDFUWorkunitResponse &resp) { try { if (!context.validateFeatureAccess(DFU_WU_URL, SecAccess_Write, false)) throw MakeStringException(ECLWATCH_DFU_WU_ACCESS_DENIED, "Failed to abort DFU workunit. Permission denied."); Owned factory = getDFUWorkUnitFactory(); Owned wu = factory->updateWorkUnit(req.getWuid()); if(!wu) throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT, "Dfu workunit %s not found.", req.getWuid()); wu->requestAbort(); resp.setRedirectUrl(StringBuffer("/FileSpray/GetDFUWorkunit?wuid=").append(req.getWuid()).str()); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CFileSprayEx::onGetDFUExceptions(IEspContext &context, IEspGetDFUExceptions &req, IEspGetDFUExceptionsResponse &resp) { try { if (!context.validateFeatureAccess(DFU_EX_URL, SecAccess_Read, false)) throw MakeStringException(ECLWATCH_DFU_EX_ACCESS_DENIED, "Failed to get DFU Exceptions. Permission denied."); IArrayOf result; Owned factory = getDFUWorkUnitFactory(); Owned wu = factory->updateWorkUnit(req.getWuid()); if(!wu) throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT, "Dfu workunit %s not found.", req.getWuid()); Owned itr = wu->getExceptionIterator(); itr->first(); while(itr->isValid()) { Owned resultE = createDFUException("", ""); IException &e = itr->query(); resultE->setCode(e.errorCode()); StringBuffer msg; resultE->setMessage(e.errorMessage(msg).str()); result.append(*LINK(resultE.getClear())); itr->next(); } resp.setResult(result); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CFileSprayEx::onSprayFixed(IEspContext &context, IEspSprayFixed &req, IEspSprayFixedResponse &resp) { try { if (!context.validateFeatureAccess(FILE_SPRAY_URL, SecAccess_Write, false)) throw MakeStringException(ECLWATCH_FILE_SPRAY_ACCESS_DENIED, "Failed to do Spray. Permission denied."); StringBuffer destFolder, destTitle, defaultFolder, defaultReplicateFolder; const char* destCluster = req.getDestGroup(); if(destCluster == NULL || *destCluster == '\0') throw MakeStringException(ECLWATCH_INVALID_INPUT, "Destination cluster/group not specified."); MemoryBuffer& srcxml = (MemoryBuffer&)req.getSrcxml(); const char* srcip = req.getSourceIP(); const char* srcfile = req.getSourcePath(); if(srcxml.length() == 0) { if(!srcip || !*srcip) throw MakeStringException(ECLWATCH_INVALID_INPUT, "Source network IP not specified."); if(!srcfile || !*srcfile) throw MakeStringException(ECLWATCH_INVALID_INPUT, "Source file not specified."); } bool nosplit = req.getNosplit(); int recordsize = req.getSourceRecordSize(); if(recordsize == 0 && !nosplit) // -ve record sizes for blocked throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid record size"); const char* destname = req.getDestLogicalName(); if(!destname || !*destname) throw MakeStringException(ECLWATCH_INVALID_INPUT, "Destination file not specified."); CDfsLogicalFileName lfn; if (!lfn.setValidate(destname)) throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid destination filename"); destname = lfn.get(); StringBuffer gName, ipAddr; const char *pTr = strchr(destCluster, ' '); if (pTr) { gName.append(pTr - destCluster, destCluster); ipAddr.append(pTr+1); } else gName.append(destCluster); if (ipAddr.length() > 0) ParseLogicalPath(destname, ipAddr.str(), destFolder, destTitle, defaultFolder, defaultReplicateFolder); else ParseLogicalPath(destname, destCluster, destFolder, destTitle, defaultFolder, defaultReplicateFolder); Owned factory = getDFUWorkUnitFactory(); Owned wu = factory->createWorkUnit(); wu->setClusterName(gName.str()); wu->setJobName(destTitle.str()); wu->setQueue(m_QueueLabel.str()); StringBuffer user, passwd; wu->setUser(context.getUserID(user).str()); wu->setPassword(context.getPassword(passwd).str()); wu->setCommand(DFUcmd_import); IDFUfileSpec *source = wu->queryUpdateSource(); if(srcxml.length() == 0) { RemoteMultiFilename rmfn; SocketEndpoint ep(srcip); rmfn.setEp(ep); StringBuffer fnamebuf(srcfile); fnamebuf.trim(); rmfn.append(fnamebuf.str()); // handles comma separated files source->setMultiFilename(rmfn); } else { srcxml.append('\0'); source->setFromXML((const char*)srcxml.toByteArray()); } IDFUfileSpec *destination = wu->queryUpdateDestination(); if(recordsize > 0) source->setRecordSize(recordsize); else if (recordsize == RECFMVB_RECSIZE_ESCAPE) { source->setFormat(DFUff_recfmvb); destination->setFormat(DFUff_variable); } else if (recordsize == RECFMV_RECSIZE_ESCAPE) { source->setFormat(DFUff_recfmv); destination->setFormat(DFUff_variable); } else if (recordsize == PREFIX_VARIABLE_RECSIZE_ESCAPE) { source->setFormat(DFUff_variable); destination->setFormat(DFUff_variable); } else if (recordsize == PREFIX_VARIABLE_BIGENDIAN_RECSIZE_ESCAPE) { source->setFormat(DFUff_variablebigendian); destination->setFormat(DFUff_variable); } destination->setLogicalName(destname); destination->setDirectory(destFolder.str()); StringBuffer fileMask; constructFileMask(destTitle.str(), fileMask); destination->setFileMask(fileMask.str()); destination->setGroupName(gName.str()); const char * encryptkey = req.getEncrypt(); if(req.getCompress()||(encryptkey&&*encryptkey)) destination->setCompressed(true); ClusterPartDiskMapSpec mspec; destination->getClusterPartDiskMapSpec(gName.str(), mspec); mspec.setDefaultBaseDir(defaultFolder.str()); mspec.setDefaultReplicateDir(defaultReplicateFolder.str()); destination->setClusterPartDiskMapSpec(gName.str(), mspec); int repo = req.getReplicateOffset(); bool isNull = req.getReplicateOffset_isNull(); if (!isNull && (repo!=1)) destination->setReplicateOffset(repo); if (req.getWrap()) destination->setWrap(true); IDFUoptions *options = wu->queryUpdateOptions(); const char * decryptkey = req.getDecrypt(); if ((encryptkey&&*encryptkey)||(decryptkey&&*decryptkey)) options->setEncDec(encryptkey,decryptkey); options->setReplicate(req.getReplicate()); options->setOverwrite(req.getOverwrite()); // needed if target already exists const char* prefix = req.getPrefix(); if(prefix && *prefix) options->setLengthPrefix(prefix); if(req.getNosplit()) options->setNoSplit(true); if(req.getNorecover()) options->setNoRecover(true); if(req.getMaxConnections() > 0) options->setmaxConnections(req.getMaxConnections()); if(req.getThrottle() > 0) options->setThrottle(req.getThrottle()); if(req.getTransferBufferSize() > 0) options->setTransferBufferSize(req.getTransferBufferSize()); if (req.getPull()) options->setPull(true); if (req.getPush()) options->setPush(true); resp.setWuid(wu->queryId()); resp.setRedirectUrl(StringBuffer("/FileSpray/GetDFUWorkunit?wuid=").append(wu->queryId()).str()); submitDFUWorkUnit(wu.getClear()); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CFileSprayEx::onSprayVariable(IEspContext &context, IEspSprayVariable &req, IEspSprayResponse &resp) { try { if (!context.validateFeatureAccess(FILE_SPRAY_URL, SecAccess_Write, false)) throw MakeStringException(ECLWATCH_FILE_SPRAY_ACCESS_DENIED, "Failed to do Spray. Permission denied."); StringBuffer destFolder, destTitle, defaultFolder, defaultReplicateFolder; const char* destCluster = req.getDestGroup(); if(destCluster == NULL || *destCluster == '\0') throw MakeStringException(ECLWATCH_INVALID_INPUT, "Destination cluster/group not specified."); StringBuffer gName, ipAddr; const char *pTr = strchr(destCluster, ' '); if (pTr) { gName.append(pTr - destCluster, destCluster); ipAddr.append(pTr+1); } else gName.append(destCluster); MemoryBuffer& srcxml = (MemoryBuffer&)req.getSrcxml(); const char* srcip = req.getSourceIP(); const char* srcfile = req.getSourcePath(); if(srcxml.length() == 0) { if(!srcip || !*srcip) throw MakeStringException(ECLWATCH_INVALID_INPUT, "Source network IP not specified."); if(!srcfile || !*srcfile) throw MakeStringException(ECLWATCH_INVALID_INPUT, "Source file not specified."); } const char* destname = req.getDestLogicalName(); if(!destname || !*destname) throw MakeStringException(ECLWATCH_INVALID_INPUT, "Destination file not specified."); CDfsLogicalFileName lfn; if (!lfn.setValidate(destname)) throw MakeStringException(ECLWATCH_INVALID_INPUT, "invalid destination filename"); destname = lfn.get(); if (ipAddr.length() > 0) ParseLogicalPath(destname, ipAddr.str(), destFolder, destTitle, defaultFolder, defaultReplicateFolder); else ParseLogicalPath(destname, destCluster, destFolder, destTitle, defaultFolder, defaultReplicateFolder); Owned factory = getDFUWorkUnitFactory(); Owned wu = factory->createWorkUnit(); wu->setClusterName(gName.str()); wu->setJobName(destTitle.str()); wu->setQueue(m_QueueLabel.str()); StringBuffer user, passwd; wu->setUser(context.getUserID(user).str()); wu->setPassword(context.getPassword(passwd).str()); wu->setCommand(DFUcmd_import); IDFUfileSpec *source = wu->queryUpdateSource(); IDFUfileSpec *destination = wu->queryUpdateDestination(); IDFUoptions *options = wu->queryUpdateOptions(); if(srcxml.length() == 0) { RemoteMultiFilename rmfn; SocketEndpoint ep(srcip); rmfn.setEp(ep); StringBuffer fnamebuf(srcfile); fnamebuf.trim(); rmfn.append(fnamebuf.str()); // handles comma separated files source->setMultiFilename(rmfn); } else { srcxml.append('\0'); source->setFromXML((const char*)srcxml.toByteArray()); } source->setMaxRecordSize(req.getSourceMaxRecordSize()); source->setFormat((DFUfileformat)req.getSourceFormat()); // if rowTag specified, it means it's xml format, otherwise it's csv const char* rowtag = req.getSourceRowTag(); if(rowtag != NULL && *rowtag != '\0') { source->setRowTag(rowtag); options->setKeepHeader(true); } else { const char* cs = req.getSourceCsvSeparate(); if (req.getNoSourceCsvSeparator()) { cs = ""; } else if(cs == NULL || *cs == '\0') cs = "\\,"; const char* ct = req.getSourceCsvTerminate(); if(ct == NULL || *ct == '\0') ct = "\\n,\\r\\n"; const char* cq = req.getSourceCsvQuote(); if(cq== NULL) cq = "'"; source->setCsvOptions(cs, ct, cq); } destination->setLogicalName(destname); destination->setDirectory(destFolder.str()); StringBuffer fileMask; constructFileMask(destTitle.str(), fileMask); destination->setFileMask(fileMask.str()); destination->setGroupName(gName.str()); ClusterPartDiskMapSpec mspec; destination->getClusterPartDiskMapSpec(gName.str(), mspec); mspec.setDefaultBaseDir(defaultFolder.str()); mspec.setDefaultReplicateDir(defaultReplicateFolder.str()); destination->setClusterPartDiskMapSpec(gName.str(), mspec); const char * encryptkey = req.getEncrypt(); if(req.getCompress()||(encryptkey&&*encryptkey)) destination->setCompressed(true); const char * decryptkey = req.getDecrypt(); if ((encryptkey&&*encryptkey)||(decryptkey&&*decryptkey)) options->setEncDec(encryptkey,decryptkey); int repo = req.getReplicateOffset(); bool isNull = req.getReplicateOffset_isNull(); if (!isNull && (repo!=1)) destination->setReplicateOffset(repo); options->setReplicate(req.getReplicate()); options->setOverwrite(req.getOverwrite()); // needed if target already exists const char* prefix = req.getPrefix(); if(prefix && *prefix) options->setLengthPrefix(prefix); if(req.getNosplit()) options->setNoSplit(true); if(req.getNorecover()) options->setNoRecover(true); if(req.getMaxConnections() > 0) options->setmaxConnections(req.getMaxConnections()); if(req.getThrottle() > 0) options->setThrottle(req.getThrottle()); if(req.getTransferBufferSize() > 0) options->setTransferBufferSize(req.getTransferBufferSize()); if (req.getPull()) options->setPull(true); if (req.getPush()) options->setPush(true); resp.setWuid(wu->queryId()); resp.setRedirectUrl(StringBuffer("/FileSpray/GetDFUWorkunit?wuid=").append(wu->queryId()).str()); submitDFUWorkUnit(wu.getClear()); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CFileSprayEx::onReplicate(IEspContext &context, IEspReplicate &req, IEspReplicateResponse &resp) { try { if (!context.validateFeatureAccess(FILE_SPRAY_URL, SecAccess_Write, false)) throw MakeStringException(ECLWATCH_FILE_SPRAY_ACCESS_DENIED, "Failed to do Replicate. Permission denied."); const char* srcname = req.getSourceLogicalName(); if(!srcname || !*srcname) throw MakeStringException(ECLWATCH_INVALID_INPUT, "Source logical file not specified."); Owned factory = getDFUWorkUnitFactory(); Owned wu = factory->createWorkUnit(); StringBuffer jobname = "Replicate: "; jobname.append(srcname); wu->setJobName(jobname.str()); wu->setQueue(m_QueueLabel.str()); StringBuffer user, passwd; wu->setUser(context.getUserID(user).str()); wu->setPassword(context.getPassword(passwd).str()); wu->setCommand(DFUcmd_replicate); IDFUfileSpec *source = wu->queryUpdateSource(); if (source) { source->setLogicalName(srcname); int repo = req.getReplicateOffset(); if (repo!=1) source->setReplicateOffset(repo); } const char* cluster = req.getCluster(); if(cluster && *cluster) { IDFUoptions *opt = wu->queryUpdateOptions(); opt->setReplicateMode(DFURMmissing,cluster,req.getRepeatLast(),req.getOnlyRepeated()); } resp.setWuid(wu->queryId()); resp.setRedirectUrl(StringBuffer("/FileSpray/GetDFUWorkunit?wuid=").append(wu->queryId()).str()); submitDFUWorkUnit(wu.getClear()); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CFileSprayEx::onDespray(IEspContext &context, IEspDespray &req, IEspDesprayResponse &resp) { try { if (!context.validateFeatureAccess(FILE_DESPRAY_URL, SecAccess_Write, false)) throw MakeStringException(ECLWATCH_FILE_DESPRAY_ACCESS_DENIED, "Failed to do Despray. Permission denied."); const char* srcname = req.getSourceLogicalName(); if(!srcname || !*srcname) throw MakeStringException(ECLWATCH_INVALID_INPUT, "Source logical file not specified."); const char* destip = req.getDestIP(); StringBuffer fnamebuf(req.getDestPath()); const char* destfile = fnamebuf.trim().str(); MemoryBuffer& dstxml = (MemoryBuffer&)req.getDstxml(); if(dstxml.length() == 0) { if(!destip || !*destip) throw MakeStringException(ECLWATCH_INVALID_INPUT, "Destination network IP not specified."); if(!destfile || !*destfile) throw MakeStringException(ECLWATCH_INVALID_INPUT, "Destination file not specified."); } StringBuffer srcTitle; ParseLogicalPath(srcname, srcTitle); Owned factory = getDFUWorkUnitFactory(); Owned wu = factory->createWorkUnit(); wu->setJobName(srcTitle.str()); wu->setQueue(m_QueueLabel.str()); StringBuffer user, passwd; wu->setUser(context.getUserID(user).str()); wu->setPassword(context.getPassword(passwd).str()); wu->setCommand(DFUcmd_export); IDFUfileSpec *source = wu->queryUpdateSource(); IDFUfileSpec *destination = wu->queryUpdateDestination(); IDFUoptions *options = wu->queryUpdateOptions(); source->setLogicalName(srcname); if(dstxml.length() == 0) { RemoteFilename rfn; SocketEndpoint ep(destip); rfn.setPath(ep, destfile); destination->setSingleFilename(rfn); } else { dstxml.append('\0'); destination->setFromXML((const char*)dstxml.toByteArray()); } destination->setTitle(srcTitle.str()); options->setKeepHeader(true); options->setOverwrite(req.getOverwrite()); // needed if target already exists const char* splitprefix = req.getSplitprefix(); if(splitprefix && *splitprefix) options->setSplitPrefix(splitprefix); double version = context.getClientVersion(); if (version > 1.01) { if(req.getMaxConnections() > 0) options->setmaxConnections(req.getMaxConnections()); else if(req.getSingleConnection()) options->setmaxConnections(1); } else { if(req.getMaxConnections() > 0) options->setmaxConnections(req.getMaxConnections()); } if(req.getThrottle() > 0) options->setThrottle(req.getThrottle()); if(req.getTransferBufferSize() > 0) options->setTransferBufferSize(req.getTransferBufferSize()); if(req.getNorecover()) options->setNoRecover(true); if (req.getWrap()) { options->setPush(); // I think needed for a despray destination->setWrap(true); } if (req.getMultiCopy()) destination->setMultiCopy(true); const char * encryptkey = req.getEncrypt(); if(req.getCompress()||(encryptkey&&*encryptkey)) destination->setCompressed(true); const char * decryptkey = req.getDecrypt(); if ((encryptkey&&*encryptkey)||(decryptkey&&*decryptkey)) options->setEncDec(encryptkey,decryptkey); resp.setWuid(wu->queryId()); resp.setRedirectUrl(StringBuffer("/FileSpray/GetDFUWorkunit?wuid=").append(wu->queryId()).str()); submitDFUWorkUnit(wu.getClear()); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CFileSprayEx::doCopyForRoxie(IEspContext &context, const char * srcName, const char * srcDali, const char * srcUser, const char * srcPassword, const char * dstName, const char * destCluster, bool compressed, bool overwrite, bool supercopy, DFUclusterPartDiskMapping val, StringBuffer baseDir, StringBuffer fileMask, IEspCopyResponse &resp) { StringBuffer user, passwd; Owned factory = getDFUWorkUnitFactory(); Owned wu = factory->createWorkUnit(); if (supercopy) { wu->setJobName(dstName); wu->setQueue(m_QueueLabel.str()); wu->setUser(context.getUserID(user).str()); wu->setPassword(context.getPassword(passwd).str()); wu->setClusterName(destCluster); IDFUfileSpec *source = wu->queryUpdateSource(); wu->setCommand(DFUcmd_supercopy); // **** super copy source->setLogicalName(srcName); if (srcDali) // remote copy { SocketEndpoint ep(srcDali); source->setForeignDali(ep); source->setForeignUser(srcUser, srcPassword); } IDFUfileSpec *destination = wu->queryUpdateDestination(); destination->setLogicalName(dstName); destination->setFileMask(fileMask); destination->setClusterPartDiskMapping(val, baseDir, destCluster); // roxie destination->setRoxiePrefix(destCluster); // added to start of each file and sub file name if(compressed) destination->setCompressed(true); destination->setWrap(true); // roxie always wraps IDFUoptions *options = wu->queryUpdateOptions(); options->setOverwrite(overwrite); options->setReplicate(val==DFUcpdm_c_replicated_by_d); // roxie } else { wu->setJobName(dstName); wu->setQueue(m_QueueLabel.str()); wu->setUser(context.getUserID(user).str()); wu->setPassword(context.getPassword(passwd).str()); wu->setClusterName(destCluster); wu->setCommand(DFUcmd_copy); IDFUfileSpec *source = wu->queryUpdateSource(); source->setLogicalName(srcName); if (srcDali) // remote copy { SocketEndpoint ep(srcDali); source->setForeignDali(ep); source->setForeignUser(srcUser, srcPassword); } IDFUfileSpec *destination = wu->queryUpdateDestination(); destination->setLogicalName(dstName); destination->setFileMask(fileMask); destination->setRoxiePrefix(destCluster); // added to start of each file name destination->setClusterPartDiskMapping(val, baseDir, destCluster, true); // **** repeat last part if(compressed) destination->setCompressed(true); destination->setWrap(true); // roxie always wraps IDFUoptions *options = wu->queryUpdateOptions(); options->setOverwrite(overwrite); options->setReplicate(val==DFUcpdm_c_replicated_by_d); // roxie options->setSuppressNonKeyRepeats(true); // **** only repeat last part when src kind = key } resp.setResult(wu->queryId()); resp.setRedirectUrl(StringBuffer("/FileSpray/GetDFUWorkunit?wuid=").append(wu->queryId()).str()); submitDFUWorkUnit(wu.getClear()); return true; } bool CFileSprayEx::onCopy(IEspContext &context, IEspCopy &req, IEspCopyResponse &resp) { try { if (!context.validateFeatureAccess(FILE_SPRAY_URL, SecAccess_Write, false)) throw MakeStringException(ECLWATCH_FILE_SPRAY_ACCESS_DENIED, "Failed to do Copy. Permission denied."); const char* srcname = req.getSourceLogicalName(); const char* dstname = req.getDestLogicalName(); if(!srcname || !*srcname) throw MakeStringException(ECLWATCH_INVALID_INPUT, "Source logical file not specified."); if(!dstname || !*dstname) throw MakeStringException(ECLWATCH_INVALID_INPUT, "Destination logical file not specified."); StringBuffer destFolder, destTitle, defaultFolder, defaultReplicateFolder; StringBuffer srcCluster, destCluster, destClusterName; bool bRoxie = false; const char* destCluster0 = req.getDestGroup(); if(destCluster0 == NULL || *destCluster0 == '\0') { getClusterFromLFN(srcname, srcCluster, context.queryUserId(), context.queryPassword()); DBGLOG("Destination cluster/group not specified, using source cluster %s", srcCluster.str()); destCluster = srcCluster.str(); destClusterName = srcCluster.str(); } else { destCluster = destCluster0; destClusterName = destCluster0; const char* destClusterRoxie = req.getDestGroupRoxie(); if (destClusterRoxie && !stricmp(destClusterRoxie, "Yes")) { bRoxie = true; } } int offset; StringBuffer sbf, baseDir; DFUclusterPartDiskMapping val; CDfsLogicalFileName lfn; if (!bRoxie) { if (!lfn.setValidate(dstname)) throw MakeStringException(ECLWATCH_INVALID_INPUT, "invalid destination filename"); dstname = lfn.get(); } else { val = readClusterMappingSettings(destCluster.str(), baseDir, offset); } ParseLogicalPath(dstname, destCluster.str(), destFolder, destTitle, defaultFolder, defaultReplicateFolder); StringBuffer fileMask; constructFileMask(destTitle.str(), fileMask); const char* srcDali = req.getSourceDali(); bool supercopy = req.getSuperCopy(); if (supercopy) { StringBuffer user, passwd; context.getUserID(user); context.getPassword(passwd); StringBuffer u(user); StringBuffer p(passwd); Owned foreigndali; if (srcDali) { SocketEndpoint ep(srcDali); foreigndali.setown(createINode(ep)); const char* srcu = req.getSrcusername(); if(srcu && *srcu) { u.clear().append(srcu); p.clear().append(req.getSrcpassword()); } } Owned udesc=createUserDescriptor(); udesc->set(u.str(),p.str()); if (!queryDistributedFileDirectory().isSuperFile(srcname,foreigndali,udesc)) supercopy = false; } if (bRoxie) { bool compressRoxieCopy = false; bool overwriteRoxieCopy = false; if(req.getCompress()) compressRoxieCopy = true; if(req.getOverwrite()) overwriteRoxieCopy = true; return doCopyForRoxie(context, srcname, req.getSourceDali(), req.getSrcusername(), req.getSrcpassword(), dstname, destCluster, req.getCompress(), req.getOverwrite(), supercopy, val, baseDir, fileMask, resp); } Owned factory = getDFUWorkUnitFactory(); Owned wu = factory->createWorkUnit(); wu->setJobName(dstname); wu->setQueue(m_QueueLabel.str()); StringBuffer user, passwd; wu->setUser(context.getUserID(user).str()); wu->setPassword(context.getPassword(passwd).str()); if(destCluster.length() > 0) { wu->setClusterName(destCluster.str()); } const char* srcDiffKeyName = req.getSourceDiffKeyName(); const char* destDiffKeyName = req.getDestDiffKeyName(); IDFUfileSpec *source = wu->queryUpdateSource(); IDFUfileSpec *destination = wu->queryUpdateDestination(); IDFUoptions *options = wu->queryUpdateOptions(); if (supercopy) wu->setCommand(DFUcmd_supercopy); else wu->setCommand(DFUcmd_copy); source->setLogicalName(srcname); if(srcDali && *srcDali) { SocketEndpoint ep(srcDali); source->setForeignDali(ep); const char* srcusername = req.getSrcusername(); if(srcusername && *srcusername) { const char* srcpasswd = req.getSrcpassword(); source->setForeignUser(srcusername, srcpasswd); } } if (bRoxie) { destination->setClusterPartDiskMapping(val, baseDir.str(), destCluster.str()); if (val != DFUcpdm_c_replicated_by_d) { options->setReplicate(false); } else { options->setReplicate(true); destination->setReplicateOffset(offset); } } if (srcDiffKeyName&&*srcDiffKeyName) source->setDiffKey(srcDiffKeyName); if (destDiffKeyName&&*destDiffKeyName) destination->setDiffKey(destDiffKeyName); if (!bRoxie) { destination->setDirectory(destFolder.str()); ClusterPartDiskMapSpec mspec; destination->getClusterPartDiskMapSpec(destCluster.str(), mspec); mspec.setDefaultBaseDir(defaultFolder.str()); mspec.setDefaultReplicateDir(defaultReplicateFolder.str()); destination->setClusterPartDiskMapSpec(destCluster.str(), mspec); } destination->setFileMask(fileMask.str()); destination->setGroupName(destCluster.str()); destination->setLogicalName(dstname); const char * encryptkey = req.getEncrypt(); if(req.getCompress()||(encryptkey&&*encryptkey)) destination->setCompressed(true); if (!bRoxie) { options->setReplicate(req.getReplicate()); destination->setWrap(req.getWrap()); } else { destination->setWrap(true); } const char * decryptkey = req.getDecrypt(); if ((encryptkey&&*encryptkey)||(decryptkey&&*decryptkey)) options->setEncDec(encryptkey,decryptkey); options->setOverwrite(req.getOverwrite()); if(req.getNorecover()) options->setNoRecover(true); if(!req.getNosplit_isNull()) options->setNoSplit(req.getNosplit()); if(req.getMaxConnections() > 0) options->setmaxConnections(req.getMaxConnections()); if(req.getThrottle() > 0) options->setThrottle(req.getThrottle()); if(req.getTransferBufferSize() > 0) options->setTransferBufferSize(req.getTransferBufferSize()); if (req.getPull()) options->setPull(true); if (req.getPush()) options->setPush(true); if (req.getIfnewer()) options->setIfNewer(true); resp.setResult(wu->queryId()); resp.setRedirectUrl(StringBuffer("/FileSpray/GetDFUWorkunit?wuid=").append(wu->queryId()).str()); submitDFUWorkUnit(wu.getClear()); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CFileSprayEx::onRename(IEspContext &context, IEspRename &req, IEspRenameResponse &resp) { try { if (!context.validateFeatureAccess(FILE_SPRAY_URL, SecAccess_Write, false)) throw MakeStringException(ECLWATCH_FILE_SPRAY_ACCESS_DENIED, "Failed to do Rename. Permission denied."); const char* srcname = req.getSrcname(); const char* dstname = req.getDstname(); if(!srcname || !*srcname) throw MakeStringException(ECLWATCH_INVALID_INPUT, "Source logical file not specified."); if(!dstname || !*dstname) throw MakeStringException(ECLWATCH_INVALID_INPUT, "Destination logical file not specified."); Owned factory = getDFUWorkUnitFactory(); Owned wu = factory->createWorkUnit(); StringBuffer destTitle; ParseLogicalPath(req.getDstname(), destTitle); wu->setJobName(destTitle.str()); wu->setQueue(m_QueueLabel.str()); StringBuffer user, passwd; wu->setUser(context.getUserID(user).str()); wu->setPassword(context.getPassword(passwd).str()); wu->setCommand(DFUcmd_rename); #if 0 // TBD - Handling for multiple clusters? the cluster should be specified by user if needed Owned udesc; if(user.length() > 0) { const char* passwd = context.queryPassword(); udesc.setown(createUserDescriptor()); udesc->set(user.str(), passwd); Owned df = queryDistributedFileDirectory().lookup(srcname, udesc); if(df) { StringBuffer cluster0; df->getClusterName(0,cluster0); // TBD - Handling for multiple clusters? if (cluster0.length()!=0) { wu->setClusterName(cluster0.str()); } else { const char *cluster = df->queryAttributes().queryProp("@group"); if (cluster && *cluster) { wu->setClusterName(cluster); } } } } #endif IDFUfileSpec *source = wu->queryUpdateSource(); source->setLogicalName(srcname); IDFUfileSpec *destination = wu->queryUpdateDestination(); destination->setLogicalName(dstname); IDFUoptions *options = wu->queryUpdateOptions(); options->setOverwrite(req.getOverwrite()); resp.setWuid(wu->queryId()); resp.setRedirectUrl(StringBuffer("/FileSpray/GetDFUWorkunit?wuid=").append(wu->queryId()).str()); submitDFUWorkUnit(wu.getClear()); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CFileSprayEx::onDFUWUFile(IEspContext &context, IEspDFUWUFileRequest &req, IEspDFUWUFileResponse &resp) { try { if (!context.validateFeatureAccess(DFU_WU_URL, SecAccess_Read, false)) throw MakeStringException(ECLWATCH_DFU_WU_ACCESS_DENIED, "Access to DFU workunit is denied."); if (*req.getWuid()) { Owned factory = getDFUWorkUnitFactory(); Owned wu = factory->openWorkUnit(req.getWuid(), false); if(!wu) throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT, "Dfu workunit %s not found.", req.getWuid()); StringBuffer xmlbuf; xmlbuf.append(""); const char* plainText = req.getPlainText(); if (plainText && (!stricmp(plainText, "yes"))) { wu->toXML(xmlbuf); resp.setFile(xmlbuf.str()); resp.setFile_mimetype(HTTP_TYPE_TEXT_PLAIN); } else { xmlbuf.append(""); wu->toXML(xmlbuf); resp.setFile(xmlbuf.str()); resp.setFile_mimetype(HTTP_TYPE_TEXT_XML); } } } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } int CFileSprayEx::doFileCheck(const char* mask, const char* netaddr, const char* osStr, const char* path) { int iRet = 1; if (mask && *mask) { char *str = (char *) mask + strlen(mask) - 4; if (!stricmp(str, ".cfg") || !stricmp(str, ".log")) iRet = 0; } else if (netaddr && *netaddr && path && *path) { iRet = 2; Owned factory = getEnvironmentFactory(); factory->validateCache(); Owned env = factory->openEnvironmentByFile(); Owned pEnvRoot = &env->getPTree(); IPropertyTree* pEnvSoftware = pEnvRoot->queryPropTree("Software"); IPropertyTree* pRoot = createPTreeFromXMLString(""); IPropertyTree* pSoftware = pRoot->addPropTree("Software", createPTree("Software")); if (pEnvSoftware && pSoftware) { Owned it = pEnvSoftware->getElements("DropZone"); ForEach(*it) { const char* pszComputer = it->query().queryProp("@computer"); if (!strcmp(pszComputer, ".")) pszComputer = "localhost"; StringBuffer xpath, sNetAddr; xpath.appendf("Hardware/Computer[@name='%s']/@netAddress", pszComputer); const char* pszNetAddr = pEnvRoot->queryProp(xpath.str()); if (strcmp(pszNetAddr, ".")) { sNetAddr.append(pszNetAddr); } else { StringBuffer ipStr; IpAddress ipaddr = queryHostIP(); ipaddr.getIpText(ipStr); if (ipStr.length() > 0) { #ifdef MACHINE_IP sNetAddr.append(MACHINE_IP); #else sNetAddr.append(ipStr.str()); #endif } } #ifdef MACHINE_IP if ((sNetAddr.length() > 0) && !stricmp(sNetAddr.str(), MACHINE_IP)) #else if ((sNetAddr.length() > 0) && !stricmp(sNetAddr.str(), netaddr)) #endif { StringBuffer dir; IPropertyTree* pDropZone = pSoftware->addPropTree("DropZone", &it->get()); pDropZone->getProp("@directory", dir); if (osStr && *osStr) { int os = atoi(osStr); const char pathSep = (os == OS_WINDOWS) ? '\\' : '/'; dir.replace(pathSep=='\\'?'/':'\\', pathSep); } if ((dir.length() > 0) && !strnicmp(path, dir.str(), dir.length())) { iRet = 0; break; } } } } } return iRet; } bool CFileSprayEx::onFileList(IEspContext &context, IEspFileListRequest &req, IEspFileListResponse &resp) { try { if (!context.validateFeatureAccess(FILE_SPRAY_URL, SecAccess_Read, false)) throw MakeStringException(ECLWATCH_FILE_SPRAY_ACCESS_DENIED, "Failed to do FileList. Permission denied."); const char* path = req.getPath(); if (!path || !*path) throw MakeStringException(ECLWATCH_INVALID_INPUT, "Path not specified."); const char* netaddr = req.getNetaddr(); const char* mask = req.getMask(); bool directoryOnly = req.getDirectoryOnly(); StringBuffer sPath(path); const char* osStr = req.getOS(); if (osStr && *osStr) { int os = atoi(osStr); const char pathSep = (os == OS_WINDOWS) ? '\\' : '/'; sPath.replace(pathSep=='\\'?'/':'\\', pathSep); if (*(sPath.str() + sPath.length() -1) != pathSep) sPath.append( pathSep ); } int checkReturn = doFileCheck(mask, netaddr, osStr, sPath.str()); if (checkReturn > 1) throw MakeStringException(ECLWATCH_DROP_ZONE_NOT_FOUND, "Dropzone is not found in the environment settings."); else if (checkReturn > 0) throw MakeStringException(ECLWATCH_ACCESS_TO_FILE_DENIED, "Access to the file path denied."); RemoteFilename rfn; SocketEndpoint ep; #ifdef MACHINE_IP ep.set(MACHINE_IP); #else ep.set(netaddr); #endif rfn.setPath(ep, sPath.str()); Owned f = createIFile(rfn); if(!f->isDirectory()) throw MakeStringException(ECLWATCH_INVALID_DIRECTORY, "%s is not a directory.", path); IArrayOf files; if (mask && !*mask) mask = NULL; Owned di = f->directoryFiles(NULL, false, true); if(di.get() != NULL) { ForEach(*di) { StringBuffer fname; di->getName(fname); if (fname.length() == 0 || (directoryOnly && !di->isDir()) || (!di->isDir() && mask && !WildMatch(fname.str(), mask, true))) continue; Owned onefile = createPhysicalFileStruct(); onefile->setName(fname.str()); onefile->setIsDir(di->isDir()); onefile->setFilesize(di->getFileSize()); CDateTime modtime; StringBuffer timestr; di->getModifiedTime(modtime); unsigned y,m,d,h,min,sec,nsec; modtime.getDate(y,m,d,true); modtime.getTime(h,min,sec,nsec,true); timestr.appendf("%04d-%02d-%02d %02d:%02d:%02d", y,m,d,h,min,sec); onefile->setModifiedtime(timestr.str()); files.append(*onefile.getLink()); } } sPath.replace('\\', '/');//XSLT cannot handle backslashes resp.setPath(sPath); resp.setFiles(files); resp.setNetaddr(netaddr); if (osStr && *osStr) { int os = atoi(osStr); resp.setOS(os); } if (mask && *mask) resp.setMask(mask); resp.setDirectoryOnly(directoryOnly); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CFileSprayEx::onDfuMonitor(IEspContext &context, IEspDfuMonitorRequest &req, IEspDfuMonitorResponse &resp) { try { if (!context.validateFeatureAccess(FILE_SPRAY_URL, SecAccess_Read, false)) throw MakeStringException(ECLWATCH_FILE_SPRAY_ACCESS_DENIED, "Failed to do DfuMonitor. Permission denied."); Owned factory = getDFUWorkUnitFactory(); Owned wu = factory->createWorkUnit(); wu->setQueue(m_MonitorQueueLabel.str()); StringBuffer user, passwd; wu->setUser(context.getUserID(user).str()); wu->setPassword(context.getPassword(passwd).str()); wu->setCommand(DFUcmd_monitor); IDFUmonitor *monitor = wu->queryUpdateMonitor(); IDFUfileSpec *source = wu->queryUpdateSource(); const char *eventname = req.getEventName(); const char *lname = req.getLogicalName(); if (lname&&*lname) source->setLogicalName(lname); else { const char *ip = req.getIp(); const char *filename = req.getFilename(); if (filename&&*filename) { RemoteFilename rfn; if (ip&&*ip) { SocketEndpoint ep; ep.set(ip); rfn.setPath(ep,filename); } else rfn.setRemotePath(filename); source->setSingleFilename(rfn); } else throw MakeStringException(ECLWATCH_INVALID_INPUT, "Neither logical name nor network ip/file specified for monitor."); } if (eventname) monitor->setEventName(eventname); monitor->setShotLimit(req.getShotLimit()); monitor->setSub(req.getSub()); resp.setWuid(wu->queryId()); resp.setRedirectUrl(StringBuffer("/FileSpray/GetDFUWorkunit?wuid=").append(wu->queryId()).str()); submitDFUWorkUnit(wu.getClear()); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CFileSprayEx::onOpenSave(IEspContext &context, IEspOpenSaveRequest &req, IEspOpenSaveResponse &resp) { try { if (!context.validateFeatureAccess(FILE_SPRAY_URL, SecAccess_Read, false)) throw MakeStringException(ECLWATCH_FILE_SPRAY_ACCESS_DENIED, "Permission denied."); const char* location = req.getLocation(); const char* path = req.getPath(); const char* name = req.getName(); const char* type = req.getType(); const char* dateTime = req.getDateTime(); if (location && *location) resp.setLocation(location); if (path && *path) resp.setPath(path); if (name && *name) resp.setName(name); if (type && *type) resp.setType(type); if (dateTime && *dateTime) resp.setDateTime(dateTime); if (req.getBinaryFile()) resp.setViewable(false); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CFileSprayEx::getDropZoneFiles(IEspContext &context, const char* netaddr, const char* osStr, const char* path, IEspDropZoneFilesRequest &req, IEspDropZoneFilesResponse &resp) { bool directoryOnly = req.getDirectoryOnly(); int checkReturn = doFileCheck(NULL, netaddr, osStr, path); if (checkReturn > 1) throw MakeStringException(ECLWATCH_DROP_ZONE_NOT_FOUND, "Dropzone is not found in the environment settings."); else if (checkReturn > 0) throw MakeStringException(ECLWATCH_ACCESS_TO_FILE_DENIED, "Access to the file path denied."); RemoteFilename rfn; SocketEndpoint ep; #ifdef MACHINE_IP ep.set(MACHINE_IP); #else ep.set(netaddr); #endif rfn.setPath(ep, path); Owned f = createIFile(rfn); if(!f->isDirectory()) throw MakeStringException(ECLWATCH_INVALID_DIRECTORY, "%s is not a directory.", path); IArrayOf files; Owned di = f->directoryFiles(NULL, false, true); if(di.get() != NULL) { ForEach(*di) { StringBuffer fname; di->getName(fname); if (fname.length() == 0 || (directoryOnly && !di->isDir())) continue; Owned onefile = createPhysicalFileStruct(); onefile->setName(fname.str()); onefile->setIsDir(di->isDir()); onefile->setFilesize(di->getFileSize()); CDateTime modtime; StringBuffer timestr; di->getModifiedTime(modtime); unsigned y,m,d,h,min,sec,nsec; modtime.getDate(y,m,d,true); modtime.getTime(h,min,sec,nsec,true); timestr.appendf("%04d-%02d-%02d %02d:%02d:%02d", y,m,d,h,min,sec); onefile->setModifiedtime(timestr.str()); files.append(*onefile.getLink()); } } resp.setFiles(files); return true; } bool CFileSprayEx::onDropZoneFiles(IEspContext &context, IEspDropZoneFilesRequest &req, IEspDropZoneFilesResponse &resp) { try { if (!context.validateFeatureAccess(FILE_SPRAY_URL, SecAccess_Read, false)) throw MakeStringException(ECLWATCH_FILE_SPRAY_ACCESS_DENIED, "Permission denied."); const char* netAddress = req.getNetAddress(); const char* directory = req.getPath(); const char* subfolder = req.getSubfolder(); StringBuffer netAddressStr, directoryStr, osStr; if (netAddress && *netAddress && directory && *directory) { netAddressStr.append(netAddress); directoryStr.append(directory); } IArrayOf dropZoneList; Owned factory = getEnvironmentFactory(); Owned m_constEnv = factory->openEnvironmentByFile(); Owned pEnvRoot = &m_constEnv->getPTree(); IPropertyTree* pEnvSoftware = pEnvRoot->queryPropTree("Software"); if (pEnvSoftware) { Owned it = pEnvSoftware->getElements("DropZone"); ForEach(*it) { IPropertyTree& pDropZone = it->query(); //get IP Address of the computer associated with this drop zone const char* pszName = pDropZone.queryProp("@name"); const char* pszComputer = pDropZone.queryProp("@computer"); if (!strcmp(pszComputer, ".")) pszComputer = "localhost"; StringBuffer xpath; xpath.appendf("Hardware/Computer[@name='%s']/@netAddress", pszComputer); StringBuffer sNetAddr; const char* pszNetAddr = pEnvRoot->queryProp(xpath.str()); if (strcmp(pszNetAddr, ".")) { sNetAddr.append(pszNetAddr); } else { StringBuffer ipStr; IpAddress ipaddr = queryHostIP(); ipaddr.getIpText(ipStr); if (ipStr.length() > 0) { #ifdef MACHINE_IP sNetAddr.append(MACHINE_IP); #else sNetAddr.append(ipStr.str()); #endif } } Owned machine; if (strcmp(pszNetAddr, ".")) machine.setown(m_constEnv->getMachineByAddress(sNetAddr.str())); else { machine.setown(m_constEnv->getMachineByAddress(pszNetAddr)); if (!machine) machine.setown(m_constEnv->getMachineByAddress(sNetAddr.str())); } StringBuffer dir; pDropZone.getProp("@directory", dir); Owned aDropZone= createDropZone("",""); if (machine) { if (machine->getOS() == MachineOsLinux || machine->getOS() == MachineOsSolaris) { dir.replace('\\', '/');//replace all '\\' by '/' aDropZone->setLinux("true"); osStr = "1"; } else { dir.replace('/', '\\'); dir.replace('$', ':'); osStr = "0"; } } aDropZone->setComputer(pszComputer); aDropZone->setPath(dir.str()); aDropZone->setName(pszName); aDropZone->setNetAddress(sNetAddr.str()); if (netAddressStr.length() < 1) { netAddressStr = sNetAddr; directoryStr = dir; } dropZoneList.append(*aDropZone.getClear()); } } if (dropZoneList.ordinality()) resp.setDropZones(dropZoneList); char pathSep = '/'; if (osStr && *osStr) { int os = atoi(osStr); if (os == OS_WINDOWS) pathSep = '\\'; } directoryStr.replace(pathSep=='\\'?'/':'\\', pathSep); if (subfolder && *subfolder) { if (*(directoryStr.str() + directoryStr.length() -1) != pathSep) directoryStr.append( pathSep ); directoryStr.append(subfolder); } if (*(directoryStr.str() + directoryStr.length() -1) != pathSep) directoryStr.append( pathSep ); getDropZoneFiles(context, netAddressStr.str(), osStr.str(), directoryStr.str(), req, resp); if (pathSep=='\\') directoryStr.replaceString("\\", "\\\\"); resp.setNetAddress(netAddressStr.str()); resp.setPath(directoryStr.str()); resp.setOS(atoi(osStr.str())); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CFileSprayEx::onDeleteDropZoneFiles(IEspContext &context, IEspDeleteDropZoneFilesRequest &req, IEspDFUWorkunitsActionResponse &resp) { try { if (!context.validateFeatureAccess(FILE_SPRAY_URL, SecAccess_Full, false)) throw MakeStringException(ECLWATCH_FILE_SPRAY_ACCESS_DENIED, "Permission denied."); const char* netAddress = req.getNetAddress(); const char* directory = req.getPath(); const char* osStr = req.getOS(); StringArray & files = req.getNames(); if (!netAddress || !*netAddress || !directory || !*directory) throw MakeStringException(ECLWATCH_DROP_ZONE_NOT_FOUND, "Dropzone not specified."); if (!files.ordinality()) throw MakeStringException(ECLWATCH_INVALID_INPUT, "File not specified."); char pathSep = '/'; StringBuffer sPath(directory); if (osStr && *osStr) { int os = atoi(osStr); pathSep = (os == OS_WINDOWS) ? '\\' : '/'; sPath.replace(pathSep=='\\'?'/':'\\', pathSep); if (*(sPath.str() + sPath.length() -1) != pathSep) sPath.append( pathSep ); } int checkReturn = doFileCheck(NULL, netAddress, osStr, sPath.str()); if (checkReturn > 1) throw MakeStringException(ECLWATCH_DROP_ZONE_NOT_FOUND, "Dropzone is not found in the environment settings."); else if (checkReturn > 0) throw MakeStringException(ECLWATCH_ACCESS_TO_FILE_DENIED, "Access to the file path denied."); RemoteFilename rfn; SocketEndpoint ep; #ifdef MACHINE_IP ep.set(MACHINE_IP); #else ep.set(netAddress); #endif rfn.setPath(ep, sPath.str()); Owned f = createIFile(rfn); if(!f->isDirectory()) throw MakeStringException(ECLWATCH_INVALID_DIRECTORY, "%s is not a directory.", directory); bool bAllSuccess = true; IArrayOf results; for(unsigned i = 0; i < files.ordinality(); ++i) { const char* file = files.item(i); if (!file || !*file) continue; Owned res = createDFUActionResult("", ""); res->setID(files.item(i)); res->setAction("Delete"); res->setResult("Success"); try { StringBuffer fileToDelete = sPath; if (*(fileToDelete.str() + fileToDelete.length() -1) != pathSep) fileToDelete.append( pathSep ); fileToDelete.append(file); rfn.setPath(ep, fileToDelete.str()); Owned rFile = createIFile(rfn); if (!rFile->exists()) res->setResult("Warning: this file does not exist."); else rFile->remove(); } catch (IException *e) { bAllSuccess = false; StringBuffer eMsg; eMsg = e->errorMessage(eMsg); e->Release(); StringBuffer failedMsg = "Failed: "; failedMsg.append(eMsg); res->setResult(failedMsg.str()); } results.append(*LINK(res.getClear())); } resp.setFirstColumn("File"); resp.setDFUActionResults(results); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; }