123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575 |
- /*##############################################################################
- Copyright (C) 2011 HPCC Systems.
- All rights reserved. This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU Affero General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Affero General Public License for more details.
- You should have received a copy of the GNU Affero General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
- ############################################################################## */
- #pragma warning (disable : 4786)
- #include "ws_packageprocessService.hpp"
- #include "daclient.hpp"
- #include "dalienv.hpp"
- #include "dadfs.hpp"
- #include "dfuutil.hpp"
- #include "ws_fs.hpp"
- #include "ws_workunits.hpp"
- #include "packageprocess_errors.h"
- #define SDS_LOCK_TIMEOUT (5*60*1000) // 5mins, 30s a bit short
- void CWsPackageProcessEx::init(IPropertyTree *cfg, const char *process, const char *service)
- {
- }
- bool CWsPackageProcessEx::onEcho(IEspContext &context, IEspEchoRequest &req, IEspEchoResponse &resp)
- {
- StringBuffer respMsg;
- ISecUser* user = context.queryUser();
- if(user != NULL)
- {
- const char* name = user->getName();
- if (name && *name)
- respMsg.appendf("%s: ", name);
- }
- const char* reqMsg = req.getRequest();
- if (reqMsg && *reqMsg)
- respMsg.append(reqMsg);
- else
- respMsg.append("??");
- resp.setResponse(respMsg.str());
- return true;
- }
- IPropertyTree *getPkgSetRegistry(const char *setName, bool readonly)
- {
- Owned<IRemoteConnection> globalLock = querySDS().connect("/PackageSets/", myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT);
- //Only lock the branch for the target we're interested in.
- StringBuffer xpath;
- xpath.append("/PackageSets/PackageSet[@id=\"").append(setName).append("\"]");
- Owned<IRemoteConnection> conn = querySDS().connect(xpath.str(), myProcessSession(), readonly ? RTM_LOCK_READ : RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT);
- if (!conn)
- {
- if (readonly)
- return NULL;
- Owned<IPropertyTree> querySet = createPTree();
- querySet->setProp("@id", setName);
- globalLock->queryRoot()->addPropTree("PackageSet", querySet.getClear());
- globalLock->commit();
- conn.setown(querySDS().connect(xpath.str(), myProcessSession(), RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT));
- if (!conn)
- throw MakeStringException(PKG_DALI_LOOKUP_ERROR, "Unable to retrieve package information from dali %s", xpath.str());
- }
- return conn->getRoot();
- }
- ////////////////////////////////////////////////////////////////////////////////////////
// Timeout value for Roxie queries.
// NOTE(review): presumably milliseconds (60s); no use of this constant is
// visible in this part of the file - confirm before changing.
const unsigned roxieQueryRoxieTimeOut = 60000;

// SDS_LOCK_TIMEOUT is also defined near the top of this file. Guard this
// second definition so the two can never drift into a conflicting redefinition.
#ifndef SDS_LOCK_TIMEOUT
#define SDS_LOCK_TIMEOUT (5*60*1000) // 5mins, 30s a bit short
#endif
- bool isRoxieProcess(const char *process)
- {
- if (!process)
- return false;
- Owned<IRemoteConnection> conn = querySDS().connect("Environment", myProcessSession(), RTM_LOCK_READ, SDS_LOCK_TIMEOUT);
- if (!conn)
- return false;
- VStringBuffer xpath("Software/RoxieCluster[@name=\"%s\"]", process);
- return conn->queryRoot()->hasProp(xpath.str());
- }
- bool isFileKnownOnCluster(const char *logicalname, const char *lookupDaliIp, const char *process, IUserDescriptor* userdesc)
- {
- Owned<IDistributedFile> dst = queryDistributedFileDirectory().lookup(logicalname, userdesc, true);
- if (dst)
- {
- if (dst->findCluster(process) != NotFound)
- return true; // file already known for this cluster
- }
- return false;
- }
- bool addFileInfoToDali(const char *logicalname, const char *lookupDaliIp, const char *process, bool overwrite, IUserDescriptor* userdesc, StringBuffer &host, short port, StringBuffer &msg)
- {
- bool retval = true;
- try
- {
- if (!overwrite)
- {
- if (isFileKnownOnCluster(logicalname, lookupDaliIp, process, userdesc))
- return true;
- }
- StringBuffer user;
- StringBuffer password;
- if (userdesc)
- {
- userdesc->getUserName(user);
- userdesc->getPassword(password);
- }
- Owned<IClientFileSpray> fs;
- fs.setown(createFileSprayClient());
- fs->setUsernameToken(user.str(), password.str(), NULL);
- VStringBuffer url("http://%s:%d/FileSpray", host.str(), port);
- fs->addServiceUrl(url.str());
- bool isRoxie = isRoxieProcess(process);
- Owned<IClientCopy> req = fs->createCopyRequest();
- req->setSourceLogicalName(logicalname);
- req->setDestLogicalName(logicalname);
- req->setDestGroup(process);
- req->setSuperCopy(false);
- if (isRoxie)
- req->setDestGroupRoxie("Yes");
- req->setSourceDali(lookupDaliIp);
- req->setSrcusername(user);
- req->setSrcpassword(password);
- req->setOverwrite(overwrite);
- Owned<IClientCopyResponse> resp = fs->Copy(req);
- }
- catch(IException *e)
- {
- e->errorMessage(msg);
- DBGLOG("ERROR = %s", msg.str());
- e->Release(); // report the error later if needed
- retval = false;
- }
- catch(...)
- {
- retval = false;
- }
- return retval;
- }
- //////////////////////////////////////////////////////////
- void addPackageMapInfo(IPropertyTree *pkgSetRegistry, const char *setName, const char *packageSetName, IPropertyTree *packageInfo, bool active, bool overWrite)
- {
- Owned<IRemoteConnection> globalLock = querySDS().connect("/PackageMaps/", myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT);
- StringBuffer lcName(packageSetName);
- lcName.toLowerCase();
- StringBuffer xpath;
- xpath.append("PackageMap[@id='").append(lcName).append("']");
- IPropertyTree *pkgRegTree = pkgSetRegistry->queryPropTree(xpath.str());
- IPropertyTree *root = globalLock->queryRoot();
- IPropertyTree *mapTree = root->queryPropTree(xpath);
- if (!overWrite && (pkgRegTree || mapTree))
- throw MakeStringException(PKG_NAME_EXISTS, "Package name %s already exists, either delete it or specify overwrite", lcName.str());
- if (mapTree)
- root->removeTree(mapTree);
- if (pkgRegTree)
- pkgSetRegistry->removeTree(pkgRegTree);
- mapTree = root->addPropTree("PackageMap", createPTree());
- mapTree->addProp("@id", packageSetName);
- IPropertyTree *baseInfo = createPTree();
- Owned<IPropertyTreeIterator> iter = packageInfo->getElements("Package");
- ForEach(*iter)
- {
- IPropertyTree &item = iter->query();
- Owned<IPropertyTreeIterator> super_iter = item.getElements("SuperFile");
- if (super_iter->first())
- {
- ForEach(*super_iter)
- {
- IPropertyTree &supertree = super_iter->query();
- StringAttr id(supertree.queryProp("@id"));
- if (id.length() && id[0] == '~')
- supertree.setProp("@id", id+1);
- Owned<IPropertyTreeIterator> sub_iter = supertree.getElements("SubFile");
- ForEach(*sub_iter)
- {
- IPropertyTree &subtree = sub_iter->query();
- StringAttr subid = subtree.queryProp("@value");
- if (subid.length())
- {
- if (subid[0] == '~')
- subtree.setProp("@value", subid+1);
- }
- }
- mapTree->addPropTree("Package", LINK(&item));
- }
- }
- else
- {
- baseInfo->addPropTree("Package", LINK(&item));
- }
- }
- mergePTree(mapTree, baseInfo);
- globalLock->commit();
- IPropertyTree *pkgSetTree = pkgSetRegistry->addPropTree("PackageMap", createPTree("PackageMap"));
- pkgSetTree->setProp("@id", lcName);
- pkgSetTree->setProp("@querySet", setName);
- pkgSetTree->setPropBool("@active", active);
- }
- void copyPackageSubFiles(IPropertyTree *packageInfo, const char *process, const char *defaultLookupDaliIp, bool overwrite, IUserDescriptor* userdesc, StringBuffer &host, short port)
- {
- Owned<IPropertyTreeIterator> iter = packageInfo->getElements("Package");
- ForEach(*iter)
- {
- IPropertyTree &item = iter->query();
- StringBuffer lookupDaliIp;
- lookupDaliIp.append(item.queryProp("@daliip"));
- if (lookupDaliIp.length() == 0)
- lookupDaliIp.append(defaultLookupDaliIp);
- if (lookupDaliIp.length() == 0)
- {
- StringAttr superfile(item.queryProp("@id"));
- throw MakeStringException(PKG_MISSING_DALI_LOOKUP_IP, "Could not lookup SubFiles in package %s because no remote dali ip was specified", superfile.get());
- }
- Owned<IPropertyTreeIterator> super_iter = item.getElements("SuperFile");
- ForEach(*super_iter)
- {
- IPropertyTree &supertree = super_iter->query();
- Owned<IPropertyTreeIterator> sub_iter = supertree.getElements("SubFile");
- ForEach(*sub_iter)
- {
- IPropertyTree &subtree = sub_iter->query();
- StringAttr subid = subtree.queryProp("@value");
- if (subid.length())
- {
- StringBuffer msg;
- addFileInfoToDali(subid.get(), lookupDaliIp, process, overwrite, userdesc, host, port, msg);
- }
- }
- }
- }
- }
- void getPackageListInfo(IPropertyTree *mapTree, IEspPackageListMapData *pkgList)
- {
- pkgList->setId(mapTree->queryProp("@id"));
- Owned<IPropertyTreeIterator> iter = mapTree->getElements("Package");
- IArrayOf<IConstPackageListData> results;
- ForEach(*iter)
- {
- IPropertyTree &item = iter->query();
- Owned<IEspPackageListData> res = createPackageListData("", "");
- res->setId(item.queryProp("@id"));
- if (item.hasProp("@queries"))
- res->setQueries(item.queryProp("@queries"));
- results.append(*res.getClear());
- }
- pkgList->setPkgListData(results);
- }
- void getAllPackageListInfo(IPropertyTree *mapTree, StringBuffer &info)
- {
- info.append("<PackageMap id='").append(mapTree->queryProp("@id")).append("'");
- Owned<IPropertyTreeIterator> iter = mapTree->getElements("Package");
- ForEach(*iter)
- {
- IPropertyTree &item = iter->query();
- info.append("<Package id='").append(item.queryProp("@id")).append("'");
- if (item.hasProp("@queries"))
- info.append(" queries='").append(item.queryProp("@queries")).append("'");
- info.append("></Package>");
- }
- info.append("</PackageMap>");
- }
- void listPkgInfo(const char *cluster, IArrayOf<IConstPackageListMapData>* results)
- {
- StringBuffer info;
- Owned<IRemoteConnection> globalLock = querySDS().connect("/PackageMaps/", myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT);
- if (!globalLock)
- throw MakeStringException(PKG_DALI_LOOKUP_ERROR, "Unable to retrieve package information from dali /PackageMaps");
- IPropertyTree *root = globalLock->queryRoot();
- if (!cluster || !*cluster)
- {
- info.append("<PackageMaps>");
- Owned<IPropertyTreeIterator> iter = root->getElements("PackageMap");
- ForEach(*iter)
- {
- Owned<IEspPackageListMapData> res = createPackageListMapData("", "");
- IPropertyTree &item = iter->query();
- getPackageListInfo(&item, res);
- results->append(*res.getClear());
- }
- info.append("</PackageMaps>");
- }
- else
- {
- Owned<IPropertyTree> pkgSetRegistry = getPkgSetRegistry(cluster, true);
- if (!pkgSetRegistry)
- throw MakeStringException(PKG_DALI_LOOKUP_ERROR, "Unable to retrieve package information from dali for cluster %s", cluster);
- Owned<IPropertyTreeIterator> iter = pkgSetRegistry->getElements("PackageMap");
- info.append("<PackageMaps>");
- ForEach(*iter)
- {
- IPropertyTree &item = iter->query();
- const char *id = item.queryProp("@id");
- if (id)
- {
- StringBuffer xpath;
- xpath.append("PackageMap[@id='").append(id).append("']");
- IPropertyTree *mapTree = root->queryPropTree(xpath);
- Owned<IEspPackageListMapData> res = createPackageListMapData("", "");
- getPackageListInfo(mapTree, res);
- results->append(*res.getClear());
- }
- }
- info.append("</PackageMaps>");
- }
- }
- void getPkgInfo(const char *cluster, const char *package, StringBuffer &info)
- {
- Owned<IRemoteConnection> globalLock = querySDS().connect("/PackageMaps/", myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT);
- if (!globalLock)
- throw MakeStringException(PKG_DALI_LOOKUP_ERROR, "Unable to retrieve package information from dali /PackageMaps");
- IPropertyTree *root = globalLock->queryRoot();
- Owned<IPropertyTree> tree = createPTree("PackageMaps");
- if (cluster)
- {
- Owned<IPropertyTree> pkgSetRegistry = getPkgSetRegistry(cluster, true);
- Owned<IPropertyTreeIterator> iter = pkgSetRegistry->getElements("PackageMap[@active='1']");
- ForEach(*iter)
- {
- IPropertyTree &item = iter->query();
- const char *id = item.queryProp("@id");
- if (id)
- {
- StringBuffer xpath;
- xpath.append("PackageMap[@id='").append(id).append("']");
- IPropertyTree *mapTree = root->queryPropTree(xpath);
- if (mapTree)
- mergePTree(tree, mapTree);
- }
- }
- }
- else
- {
- StringBuffer xpath;
- xpath.append("PackageMap[@id='").append(package).append("']");
- Owned<IPropertyTreeIterator> iter = root->getElements(xpath.str());
- ForEach(*iter)
- {
- IPropertyTree &item = iter->query();
- mergePTree(tree, &item);
- }
- }
- toXML(tree, info);
- }
- bool deletePkgInfo(const char *packageSetName, const char *queryset)
- {
- Owned<IRemoteConnection> pkgSet = querySDS().connect("/PackageSets/", myProcessSession(), RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT);
- if (!pkgSet)
- throw MakeStringException(PKG_SET_NOT_DEFINED, "No package sets defined");
- IPropertyTree* packageSets = pkgSet->queryRoot();
- VStringBuffer pkgSet_xpath("PackageSet[@id='%s']", queryset);
- IPropertyTree *pkgSetRegistry = packageSets->queryPropTree(pkgSet_xpath.str());
- if (!pkgSetRegistry)
- throw MakeStringException(PKG_SET_NOT_DEFINED, "No package sets defined for %s", queryset);
- StringBuffer lcName(packageSetName);
- lcName.toLowerCase();
- VStringBuffer xpath("PackageMap[@id='%s'][@querySet='%s']", lcName.str(), queryset);
- IPropertyTree *pm = pkgSetRegistry->getPropTree(xpath.str());
- if (pm)
- pkgSetRegistry->removeTree(pm);
- else
- throw MakeStringException(PKG_DELETE_NOT_FOUND, "Unable to delete %s - information not found", lcName.str());
- VStringBuffer ps_xpath("PackageSet/PackageMap[@id='%s']", lcName.str());
- if (!packageSets->hasProp(ps_xpath))
- {
- Owned<IRemoteConnection> pkgMap = querySDS().connect("/PackageMaps/", myProcessSession(), RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT);
- if (pkgMap)
- {
- VStringBuffer map_xpath("PackageMap[@id='%s']", lcName.str());
- IPropertyTree *pkgMaproot = pkgMap->queryRoot();
- IPropertyTree *pm = pkgMaproot->getPropTree(map_xpath.str());
- if (pm)
- pkgMaproot->removeTree(pm);
- }
- }
- return true;
- }
- void activatePackageMapInfo(const char *packageSetName, const char *packageMap, bool activate)
- {
- if (!packageSetName || !*packageSetName)
- throw MakeStringException(PKG_SET_NOT_DEFINED, "No package sets defined");
- Owned<IRemoteConnection> globalLock = querySDS().connect("PackageSets", myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT);
- if (!globalLock)
- throw MakeStringException(PKG_DALI_LOOKUP_ERROR, "Unable to retrieve PackageSets information from dali /PackageSets");
- StringBuffer lcName(packageSetName);
- lcName.toLowerCase();
- VStringBuffer xpath("PackageSet[@id=\"%s\"]", lcName.str());
- IPropertyTree *root = globalLock->queryRoot();
- if (!root)
- throw MakeStringException(PKG_ACTIVATE_NOT_FOUND, "Unable to retrieve PackageSet information for %s", lcName.str());
- IPropertyTree *pkgSetTree = root->queryPropTree(xpath);
- if (pkgSetTree)
- {
- if (packageMap && *packageMap)
- {
- StringBuffer lcMapName(packageMap);
- lcMapName.toLowerCase();
- VStringBuffer xpath_map("PackageMap[@id=\"%s\"]", lcMapName.str());
- IPropertyTree *mapTree = pkgSetTree->queryPropTree(xpath_map);
- mapTree->setPropBool("@active", activate);
- }
- else
- {
- Owned<IPropertyTreeIterator> iter = pkgSetTree->getElements("PackageMap");
- ForEach(*iter)
- {
- IPropertyTree &item = iter->query();
- item.setPropBool("@active", activate);
- }
- }
- }
- }
- bool CWsPackageProcessEx::onAddPackage(IEspContext &context, IEspAddPackageRequest &req, IEspAddPackageResponse &resp)
- {
- resp.updateStatus().setCode(0);
- StringBuffer info(req.getInfo());
- bool activate = req.getActivate();
- bool overWrite = req.getOverWrite();
- StringAttr querySet(req.getQuerySet());
- StringAttr pkgName(req.getPackageName());
- Owned<IPropertyTree> packageTree = createPTreeFromXMLString(info.str());
- Owned<IPropertyTree> pkgSetRegistry = getPkgSetRegistry(querySet.get(), false);
- addPackageMapInfo(pkgSetRegistry, querySet.get(), pkgName.get(), LINK(packageTree), activate, overWrite);
- StringBuffer msg;
- msg.append("Successfully loaded ").append(pkgName.get());
- resp.updateStatus().setDescription(msg.str());
- return true;
- }
- bool CWsPackageProcessEx::onDeletePackage(IEspContext &context, IEspDeletePackageRequest &req, IEspDeletePackageResponse &resp)
- {
- resp.updateStatus().setCode(0);
- StringAttr pkgName(req.getPackageName());
- bool ret = deletePkgInfo(pkgName.get(), req.getQuerySet());
- StringBuffer msg;
- (ret) ? msg.append("Successfully ") : msg.append("Unsuccessfully ");
- msg.append("deleted").append(pkgName.get());
- resp.updateStatus().setDescription(msg.str());
- return true;
- }
- bool CWsPackageProcessEx::onActivatePackage(IEspContext &context, IEspActivatePackageRequest &req, IEspActivatePackageResponse &resp)
- {
- resp.updateStatus().setCode(0);
- StringBuffer pkgName(req.getPackageName());
- StringBuffer pkgMapName(req.getPackageMapName());
- activatePackageMapInfo(pkgName.str(), pkgMapName.str(), true);
- return true;
- }
- bool CWsPackageProcessEx::onDeActivatePackage(IEspContext &context, IEspDeActivatePackageRequest &req, IEspDeActivatePackageResponse &resp)
- {
- resp.updateStatus().setCode(0);
- StringBuffer pkgName(req.getPackageName());
- StringBuffer pkgMapName(req.getPackageMapName());
- activatePackageMapInfo(pkgName.str(), pkgMapName.str(), false);
- return true;
- }
- bool CWsPackageProcessEx::onListPackage(IEspContext &context, IEspListPackageRequest &req, IEspListPackageResponse &resp)
- {
- resp.updateStatus().setCode(0);
- IArrayOf<IConstPackageListMapData> results;
- listPkgInfo(req.getCluster(), &results);
- resp.setPkgListMapData(results);
- return true;
- }
- bool CWsPackageProcessEx::onGetPackage(IEspContext &context, IEspGetPackageRequest &req, IEspGetPackageResponse &resp)
- {
- resp.updateStatus().setCode(0);
- StringAttr cluster(req.getCluster());
- StringAttr pkgName(req.getPackageName());
- StringBuffer info;
- getPkgInfo(cluster.length() ? cluster.get() : NULL, pkgName.length() ? pkgName.get() : NULL, info);
- resp.setInfo(info);
- return true;
- }
- bool CWsPackageProcessEx::onCopyFiles(IEspContext &context, IEspCopyFilesRequest &req, IEspCopyFilesResponse &resp)
- {
- resp.updateStatus().setCode(0);
- StringBuffer info(req.getInfo());
- StringAttr process(req.getProcess());
- StringAttr pkgName(req.getPackageName());
- StringAttr lookupDaliIp(req.getDaliIp());
- if (process.length() == 0)
- throw MakeStringException(PKG_MISSING_PARAM, "CWsPackageProcessEx::onCopyFiles process parameter not set.");
- Owned<IUserDescriptor> userdesc;
- const char *user = context.queryUserId();
- const char *password = context.queryPassword();
- if (user && *user && *password && *password)
- {
- userdesc.setown(createUserDescriptor());
- userdesc->set(user, password);
- }
- StringBuffer host;
- short port;
- context.getServAddress(host, port);
- Owned<IPropertyTree> packageTree = createPTreeFromXMLString(info.str());
- copyPackageSubFiles(LINK(packageTree), process, lookupDaliIp, req.getOverWrite(), userdesc, host, port);
- StringBuffer msg;
- msg.append("Successfully loaded ").append(pkgName.get());
- resp.updateStatus().setDescription(msg.str());
- return true;
- }
|