1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #define da_decl DECL_EXPORT
- #include "platform.h"
- #include <typeinfo>
- #include "jlib.hpp"
- #include "jfile.hpp"
- #include "javahash.hpp"
- #include "javahash.tpp"
- #include "jptree.ipp"
- #include "mpbuff.hpp"
- #include "mpcomm.hpp"
- #include "mputil.hpp"
- #include "dacoven.hpp"
- #include "daserver.hpp"
- #include "dasess.hpp"
- #include "daclient.hpp"
- #include "dadfs.hpp"
- #include "dautils.hpp"
- #include "dasds.ipp" // common header for client/server sds
- #include "dacsds.ipp"
- static unsigned clientThrottleLimit;
- static unsigned clientThrottleDelay;
- static ISDSManager *SDSManager=NULL;
- static CriticalSection SDScrit;
- #define CHECK_CONNECTED(XSTR) \
- if (!connected) \
- { \
- LOG(MCerror, unknownJob, XSTR": Closed connection (xpath=%s, sessionId=%" I64F "d)", xpath.get(), sessionId); \
- return; \
- }
- ////////////////////
- class MonitoredChildMap : public ChildMap
- {
- CClientRemoteTree &owner;
- public:
- MonitoredChildMap(CClientRemoteTree &_owner) : ChildMap(), owner(_owner) { }
-
- virtual bool replace(const char *name, IPropertyTree *tree)
- {
- // suppress notification of old node - old node has been preserved.
- bool changes = owner.queryConnection().queryStateChanges();
- owner.queryConnection().setStateChanges(false);
- bool res = ChildMap::replace(name, tree);
- owner.queryConnection().setStateChanges(changes);
- return res;
- }
- virtual void onRemove(void *e)
- {
- if (owner.queryStateChanges())
- {
- CClientRemoteTree *child = (CClientRemoteTree *)((IPropertyTree *)e);
- assertex(child);
- __int64 sId = child->queryServerId();
- if (sId)
- owner.registerDeleted(child->queryName(), 0, sId);
- else
- {
- IPTArrayValue *value = child->queryValue();
- if (value)
- {
- if (value->isArray())
- {
- unsigned i = value->elements();
- while (i--)
- {
- CClientRemoteTree &child = *(CClientRemoteTree *)value->queryElement(i);
- sId = child.queryServerId();
- if (sId)
- owner.registerDeleted(child.queryName(), i, sId);
- }
- }
- }
- }
- }
- ChildMap::onRemove(e);
- }
- };
- bool collectChildless(CClientRemoteTree *parent, const char *xpath, CRTArray *childLessList, StringArray *headArr, StringArray *tailArr)
- {
- bool res = false;
- StringBuffer head;
- const char *tail;
- if (xpath && '/' == *xpath && '/' == *(xpath+1))
- {
- head.append("*");
- tail = xpath;
- }
- else
- {
- tail = xpath?queryHead(xpath, head):NULL;
- if (!tail && xpath)
- head.append(xpath);
- }
- if (parent->queryChildren())
- {
- if (head.length())
- {
- Owned<IPropertyTreeIterator> iter = parent->getElements(head.str());
- ForEach (*iter)
- {
- res |= collectChildless((CClientRemoteTree *)&iter->query(), tail, childLessList, headArr, tailArr);
- if (NULL == childLessList && res)
- break;
- }
- }
- else
- {
- Owned<IPropertyTreeIterator> iter = parent->getElements("*");
- ForEach (*iter)
- {
- res |= collectChildless((CClientRemoteTree *)&iter->query(), NULL, childLessList, headArr, tailArr);
- if (NULL == childLessList && res)
- break;
- }
- }
- }
- else if (parent->hasChildren()) // i.e. no local children, but server flagged as having
- {
- if (childLessList)
- {
- childLessList->append(*parent);
- if (headArr)
- headArr->append(head.length()?head.str():"");
- if (tailArr)
- tailArr->append(tail?tail:"");
- }
- res = true;
- }
- return res;
- }
- ////////////////////
- CRemoteConnection::CRemoteConnection(ISDSConnectionManager &manager, ConnectionId connectionId, const char *xpath, SessionId sessionId, unsigned mode, unsigned timeout)
- : CConnectionBase(manager, connectionId, xpath, sessionId, mode, timeout)
- {
- INIT_NAMEDCOUNT;
- lazyFetch = true;
- stateChanges = true;
- connected = true;
- serverIterAvailable = querySDS().queryProperties().getPropBool("Client/@serverIterAvailable");
- serverIter = querySDS().queryProperties().getPropBool("Client/@serverIter");
- useAppendOpt = querySDS().queryProperties().getPropBool("Client/@useAppendOpt");
- serverGetIdsAvailable = querySDS().queryProperties().getPropBool("Client/@serverGetIdsAvailable");
- lockCount = 0;
- }
- void CRemoteConnection::clearCommitChanges()
- {
- CClientRemoteTree *tree = (CClientRemoteTree *) queryRoot();
- bool lazyFetch = setLazyFetch(false);
- tree->clearCommitChanges();
- setLazyFetch(lazyFetch);
- }
- void CRemoteConnection::getDetails(MemoryBuffer &mb)
- {
- mb.append(connectionId);
- mb.append(sessionId);
- mb.append(mode);
- mb.append(timeout);
- }
- // IRemoteConnection impl.
- void CRemoteConnection::changeMode(unsigned mode, unsigned timeout, bool suppressReloads)
- {
- CHECK_CONNECTED("changeMode");
- manager.changeMode(*this, mode, timeout, suppressReloads);
- }
- void CRemoteConnection::rollback()
- {
- CConnectionLock b(*this);
- CHECK_CONNECTED("rollback");
- CDisableFetchChangeBlock block(*this);
- if (((CClientRemoteTree *)root.get())->queryState())
- reload(); // all
- else
- {
- class Cop : implements IIteratorOperator
- {
- public:
- virtual bool applyTop(IPropertyTree &_tree) { return true; }
- virtual bool applyChild(IPropertyTree &parent, IPropertyTree &_child, bool &levelBreak)
- {
- CClientRemoteTree &child = (CClientRemoteTree &)_child;
- if (child.queryState())
- {
- ((CClientRemoteTree &)parent).clearChildren(); // wipe children - SDS will lazy fetch them again as needed.
- levelBreak = true;
- return false;
- }
- return true;
- }
- } op;
- CIterationOperation iop(op);
- iop.iterate(*root);
- }
- }
- void CRemoteConnection::_rollbackChildren(IPropertyTree *_parent, bool force)
- {
- if (force)
- {
- CRemoteTreeBase *parent = QUERYINTERFACE(_parent, CRemoteTreeBase);
- if (parent)
- parent->clearChildren();
- }
- else
- {
- class CRollback
- {
- public:
- void apply(IPropertyTree &_parent)
- {
- CClientRemoteTree *parent = QUERYINTERFACE(&_parent, CClientRemoteTree);
- if (!parent)
- return;
- if (parent->queryState())
- parent->clearChildren();
- else
- {
- Owned<IPropertyTreeIterator> iter = parent->getElements("*");
- doit(*iter);
- }
- }
- void doit(IPropertyTreeIterator &iter)
- {
- ForEach (iter)
- apply(iter.query());
- }
- } op;
- op.apply(*_parent);
- }
- }
- void CRemoteConnection::rollbackChildren(IPropertyTree *parent, bool force)
- {
- CConnectionLock b(*this);
- CHECK_CONNECTED("rollbackChildren");
- CDisableFetchChangeBlock block(*this);
- _rollbackChildren(parent, force);
- }
- void CRemoteConnection::rollbackChildren(const char *_xpath, bool force)
- {
- CConnectionLock b(*this);
- CHECK_CONNECTED("rollbackChildren");
- CDisableFetchChangeBlock block(*this);
- Owned<IPropertyTreeIterator> iter = root->getElements(_xpath);
- ForEach (*iter)
- _rollbackChildren(&iter->query(), force);
- }
- void CRemoteConnection::reload(const char *_xpath)
- {
- CConnectionLock b(*this);
- CHECK_CONNECTED("close");
- CDisableFetchChangeBlock block(*this);
- // NB: any linked client trees will still be active.
- if (_xpath == NULL || '\0' == *_xpath)
- {
- clearChanges(*root);
- __int64 serverId = root->queryServerId();
- CRemoteTreeBase *newTree = manager.get(*this, serverId);
- if (NULL == newTree) throw MakeSDSException(SDSExcpt_Reload);
- root.setown(newTree);
- }
- else
- {
- ICopyArrayOf<CClientRemoteTree> parents;
- IArrayOf<CClientRemoteTree> children;
- if ('/' == *_xpath) ++_xpath;
- StringBuffer head;
- const char *tail = splitXPath(_xpath, head);
- Owned<IPropertyTreeIterator> iter = root->getElements(head.str());
- ForEach (*iter)
- {
- CClientRemoteTree &parent = (CClientRemoteTree &)iter->query();
- Owned<IPropertyTreeIterator> childIter = parent.getElements(tail);
- ForEach (*childIter)
- {
- parents.append(parent);
- children.append(*LINK((CClientRemoteTree *)&childIter->query()));
- }
- ForEachItemIn(c, children)
- {
- CClientRemoteTree &child = children.item(c);
- clearChanges(child);
- parent.removeTree(&child);
- }
- }
- ForEachItemIn(e, children)
- {
- CClientRemoteTree &child = (CClientRemoteTree &)children.item(e);
- CClientRemoteTree &parent = (CClientRemoteTree &)parents.item(e);
- if (child.queryServerId())
- {
- IPropertyTree *newChild = manager.get(*this, child.queryServerId());
- if (newChild)
- parent.addPropTree(child.queryName(), newChild);
- }
- }
- }
- }
- void CRemoteConnection::commit()
- {
- CConnectionLock b(*this);
- CHECK_CONNECTED("commit");
- manager.commit(*this, NULL);
- }
- void CRemoteConnection::close(bool deleteRoot)
- {
- CConnectionLock b(*this);
- CHECK_CONNECTED("close");
- manager.commit(*this, &deleteRoot);
- connected=false;
- }
- SubscriptionId CRemoteConnection::subscribe(ISDSConnectionSubscription ¬ify)
- {
- CSDSConnectionSubscriberProxy *subscriber = new CSDSConnectionSubscriberProxy(notify, connectionId);
- querySubscriptionManager(SDSCONN_PUBLISHER)->add(subscriber, subscriber->getId());
- return subscriber->getId();
- }
- void CRemoteConnection::unsubscribe(SubscriptionId id)
- {
- querySubscriptionManager(SDSCONN_PUBLISHER)->remove(id);
- }
- class CClientXPathIterator : public CXPathIterator
- {
- CRemoteConnection &connection;
- public:
- CClientXPathIterator(CRemoteConnection &_connection, IPropertyTree *root, IPropertyTree *matchTree, IPTIteratorCodes flags) : CXPathIterator(root, matchTree, flags), connection(_connection)
- {
- connection.Link();
- }
- ~CClientXPathIterator()
- {
- connection.Release();
- }
- virtual IPropertyTree *queryChild(IPropertyTree *parent, const char *path)
- {
- // NB: this is going to fetch into local cache what is necessary to satisfy prop[X]
- CSetServerIterBlock b(connection, false);
- return parent->queryPropTree(path);
- }
- };
- void mergeXPathPTree(IPropertyTree *target, IPropertyTree *toMerge)
- {
- Owned<IPropertyTreeIterator> iter = toMerge->getElements("*");
- ForEach (*iter)
- {
- IPropertyTree &e = iter->query();
- StringBuffer path(e.queryName());
- IPropertyTree *existing = target->queryPropTree(path.append("[@pos=\"").append(e.queryProp("@pos")).append("\"]").str());
- if (existing)
- mergeXPathPTree(existing, &e);
- else
- target->addPropTree(e.queryName(), LINK(&e));
- }
- }
- bool removeLocals(CRemoteConnection &connection, CRemoteTreeBase *tree, IPropertyTree *match)
- {
- if (0 == tree->queryServerId())
- return false;
- Owned<IPropertyTreeIterator> iter = match->getElements("*");
- bool res = false;
- StringArray toDelete;
- ForEach (*iter)
- {
- IPropertyTree &child = iter->query();
- StringBuffer childPath(child.queryName());
- CRemoteTreeBase *storeChild;
- {
- CSetServerIterBlock b(connection, false);
- CDisableLazyFetchBlock b2(connection);
- storeChild = (CRemoteTreeBase *)tree->queryPropTree(childPath.append('[').append(child.queryProp("@pos")).append(']').str());
- }
- if (storeChild)
- {
- if (0 != storeChild->queryServerId())
- {
- bool childRes = removeLocals(connection, storeChild, &child);
- if (childRes)
- {
- unsigned c = 0;
- Owned<IPropertyTreeIterator> iter = child.getElements("*");
- ForEach (*iter) c++;
- if (0 == c)
- {
- toDelete.append(childPath.str());
- res = true;
- }
- }
- }
- else
- {
- toDelete.append(childPath.str());
- res = true;
- }
- }
- }
- ForEachItemIn(d, toDelete)
- match->removeProp(toDelete.item(d));
- return res;
- }
- void extractServerIds(IPropertyTree &tree, MemoryBuffer &mb, bool completeTailBranch)
- {
- __int64 serverId = tree.getPropInt64("@serverId");
- assertex(serverId);
- mb.append(serverId);
- Owned<IPropertyTreeIterator> iter = tree.getElements("*");
- if (iter->first())
- {
- mb.append((unsigned) 1);
- do
- {
- extractServerIds(iter->query(), mb, completeTailBranch);
- }
- while (iter->next());
- }
- else
- mb.append(completeTailBranch ? (unsigned)0 : (unsigned)1);
- }
- static void walkAndFill(IPropertyTree &tree, CClientRemoteTree &parent, MemoryBuffer &mb, bool childrenCanBeMissing)
- {
- parent.createChildMap();
- bool r;
- if (childrenCanBeMissing)
- mb.read(r);
- else
- r = true;
- if (r)
- parent.deserializeChildrenRT(mb);
- Owned<IPropertyTreeIterator> iter = tree.getElements("*");
- ForEach (*iter)
- {
- IPropertyTree &elem = iter->query();
- StringBuffer path(elem.queryName());
- path.append("[").append(elem.queryProp("@pos")).append("]");
- CClientRemoteTree *child = (CClientRemoteTree *)parent.queryPropTree(path.str());
- assertex(child);
- walkAndFill(elem, *child, mb, childrenCanBeMissing);
- }
- }
- IPropertyTreeIterator *CRemoteConnection::doGetElements(CClientRemoteTree *tree, const char *xpath, IPTIteratorCodes flags)
- {
- CConnectionLock b(*this);
- CSetServerIterBlock b2(*this, false);
- Owned<IPropertyTree> matchTree, serverMatchTree;
- StringAttr path;
- if (xpath)
- {
- unsigned l = strlen(xpath);
- if ('/' == *(xpath+l-1))
- path.set(xpath, l-1);
- else
- path.set(xpath);
- }
- bool remoteGet = queryServerGetIdsAvailable() && iptiter_remoteget == (flags & iptiter_remoteget);
- {
- CDisableLazyFetchBlock b(*this);
- if (collectChildless(tree, path.get(), NULL, NULL, NULL))
- {
- serverMatchTree.setown(queryManager().getXPaths(tree->queryServerId(), xpath, remoteGet));
- if (serverMatchTree && removeLocals(*this, tree, serverMatchTree))
- serverMatchTree.clear();
- }
- // if all nodes had server-side children, then there'd be no point in this
- matchTree.setown(getXPathMatchTree(*tree, xpath));
- }
- if (matchTree && serverMatchTree)
- mergeXPathPTree(matchTree, serverMatchTree);
- else if (serverMatchTree)
- matchTree.setown(LINK(serverMatchTree));
- else if (!matchTree)
- return createNullPTreeIterator();
- if (remoteGet && serverMatchTree)
- queryManager().ensureLocal(*this, *tree, serverMatchTree, flags);
- return new CClientXPathIterator(*this, tree, matchTree, flags & ~iptiter_remote);
- }
- IPropertyTreeIterator *CRemoteConnection::getElements(const char *xpath, IPTIteratorCodes flags)
- {
- if (!serverIterAvailable)
- throw MakeSDSException(SDSExcpt_VersionMismatch, "Server-side getElements not supported by server versions prior to " SDS_SVER_MIN_GETXPATHS_CONNECT);
- flags |= iptiter_remote;
- return root->getElements(xpath, flags);
- }
- /////////////////
- CClientRemoteTree::CClientRemoteTree(CRemoteConnection &conn, CPState _state)
- : CRemoteTreeBase(NULL, NULL, NULL), connection(conn), serverTreeInfo(0), state(_state)
- {
- INIT_NAMEDCOUNT;
- assertex(!isnocase());
- }
- CClientRemoteTree::CClientRemoteTree(const char *name, IPTArrayValue *value, ChildMap *children, CRemoteConnection &conn, CPState _state)
- : CRemoteTreeBase(name, value, children), connection(conn), serverTreeInfo(0), state(_state)
- {
- INIT_NAMEDCOUNT;
- assertex(!isnocase());
- }
- void CClientRemoteTree::beforeDispose()
- {
- if (queryStateChanges())
- connection.clearChanges(*this);
- }
- void CClientRemoteTree::Link() const
- {
- connection.Link(); // inc ref count on connection
- CRemoteTreeBase::Link();
- }
- bool CClientRemoteTree::Release() const
- {
- //Note: getLinkCount() is not thread safe.
- if (1 < getLinkCount()) //NH -> JCS - you sure this is best way to do this?
- {
- bool res = CRemoteTreeBase::Release();
- connection.Release(); // if this tree is not being destroyed then decrement usage count on connection
- return res;
- }
- else
- return CRemoteTreeBase::Release();
- }
- void CClientRemoteTree::deserializeSelfRT(MemoryBuffer &mb)
- {
- CRemoteTreeBase::deserializeSelfRT(mb);
- mb.read(serverTreeInfo);
- }
- void CClientRemoteTree::deserializeChildrenRT(MemoryBuffer &src)
- {
- // if and only if there are children, must create monitored map here otherwise a non-monitored map could be create in base
- if (!children)
- {
- StringAttr eName;
- size32_t pos = src.getPos();
- src.read(eName);
- if (eName.length())
- createChildMap();
- src.reset(pos);
- }
- CRemoteTreeBase::deserializeChildrenRT(src);
- }
- bool CClientRemoteTree::renameTree(IPropertyTree *child, const char *newName)
- {
- class DisableStateChanges // supress reset that would result from ownPTree via addPropTree below
- {
- bool changes;
- CRemoteConnection &c;
- public:
- DisableStateChanges(CRemoteConnection &_c) : c(_c) { changes = c.queryStateChanges(); c.setStateChanges(false); }
- ~DisableStateChanges() { reset(); }
- void reset() { c.setStateChanges(changes); }
- } dc(connection);
-
- Linked<IPropertyTree> tmp = child;
- StringAttr oldName = child->queryName();
- if (removeTree(child))
- {
- addPropTree(newName, child);
- tmp.getClear(); // addPropTree has taken ownership.
- dc.reset();
- __int64 id = ((CClientRemoteTree *)child)->queryServerId();
- if (id)
- {
- unsigned pos = findChild(child);
- registerRenamed(((CClientRemoteTree *)child)->queryName(), oldName, pos+1, id); // flag new element as changed.
- }
- return true;
- }
- return false;
- }
- IPropertyTree *CClientRemoteTree::queryBranch(const char *xpath) const
- {
- return const_cast<CClientRemoteTree *>(this)->_queryBranch(xpath);
- }
- IPropertyTree *CClientRemoteTree::_queryBranch(const char *xpath)
- {
- if (queryLazyFetch())
- {
- CRTArray childLessList;
- StringArray headArr, tailArr;
- StringAttr path;
- if (xpath)
- {
- unsigned l = strlen(xpath);
- if ('/' == *(xpath+l-1))
- path.set(xpath, l-1);
- else
- path.set(xpath);
- }
-
- CConnectionLock b(connection);
- bool r;
- { CDisableLazyFetchBlock b2(connection);
- r = collectChildless(this, xpath, &childLessList, &headArr, &tailArr);
- }
- if (r)
- {
- bool getAll = true;
- ForEachItemIn(c, childLessList)
- {
- IPropertyTree &child = childLessList.item(c);
- const char *tail = tailArr.item(c);
- if (!*tail) tail = NULL;
- const char *head = headArr.item(c);
- if (!*head) head = NULL;
- if (head || tail)
- {
- getAll = false;
- break;
- }
- }
- if (getAll)
- queryManager().getChildrenFor(childLessList, connection, 0);
- else
- {
- bool useServerIter = connection.queryServerGetIdsAvailable();
- // bug in matching tree creation caused failure if path pointed to self
- if (useServerIter && queryDaliServerVersion().compare("3.7") < 0)
- {
- CDisableLazyFetchBlock b2(connection);
- const IPropertyTree *me = queryPropTree(xpath);
- if (me && me==this)
- useServerIter = false;
- }
- if (useServerIter)
- {
- CDisableLazyFetchBlock b2(connection);
- Owned<IPropertyTree> serverMatchTree = queryManager().getXPaths(queryServerId(), xpath, true);
- if (serverMatchTree && !removeLocals(connection, this, serverMatchTree))
- queryManager().ensureLocal(connection, *this, serverMatchTree);
- return queryPropTree(xpath); // intentionally inside disabled lazy fetching block, as now all local
- }
- else
- {
- queryManager().getChildrenFor(childLessList, connection, 1); // get 1 level of all parents without children that matched xpath
- ForEachItemIn(c, childLessList)
- {
- IPropertyTree &child = childLessList.item(c);
- const char *tail = tailArr.item(c);
- const char *head = headArr.item(c);
- if (!tail) // no children _below_ the tail or no match for tail portion of xpath
- {
- // get all nodes below
- CRTArray list;
- Owned<IPropertyTreeIterator> iter = child.getElements(head);
- ForEach (*iter)
- {
- IPropertyTree &c = iter->query();
- list.append((CRemoteTreeBase &)iter->query());
- }
- queryManager().getChildrenFor(list, connection, 0);
- }
- else
- {
- // couldn't fully match match against partial cache, request more missing children
- Owned<IPropertyTreeIterator> iter = child.getElements(head);
- ForEach (*iter)
- {
- IPropertyTree &e = iter->query();
- e.queryBranch(tail);
- }
- }
- }
- }
- }
- }
- }
-
- return queryPropTree(xpath);
- }
- ChildMap *CClientRemoteTree::checkChildren() const
- {
- return const_cast<CClientRemoteTree *>(this)->_checkChildren();
- }
- ChildMap *CClientRemoteTree::_checkChildren()
- {
- CConnectionLock b(connection);
- if (!children && STI_HaveChildren & serverTreeInfo)
- {
- if (queryLazyFetch())
- {
- serverTreeInfo &= ~STI_HaveChildren;
- createChildMap();
- if (serverId)
- queryManager().getChildren(*this, connection);
- }
- }
- return children;
- }
- IPropertyTree *CClientRemoteTree::ownPTree(IPropertyTree *tree)
- {
- // if taking ownership of an orphaned clientremote tree need to reset it's attributes.
- if ((connection.queryStateChanges()) && isEquivalent(tree) && (!QUERYINTERFACE(tree, CClientRemoteTree)->IsShared()))
- {
- CClientRemoteTree *_tree = QUERYINTERFACE(tree, CClientRemoteTree);
- if (_tree->queryServerId())
- ((CClientRemoteTree *)tree)->resetState(CPS_Changed, true);
- return tree;
- }
- else
- return PARENT::ownPTree(tree);
- }
- IPropertyTree *CClientRemoteTree::create(const char *name, IPTArrayValue *value, ChildMap *children, bool existing)
- {
- CClientRemoteTree *newTree = new CClientRemoteTree(name, value, children, connection);
- if (existing)
- {
- newTree->setServerId(queryServerId());
- setServerId(0);
- }
- return newTree;
- }
- IPropertyTree *CClientRemoteTree::create(MemoryBuffer &mb)
- {
- unsigned pos = mb.getPos();
- StringAttr name;
- mb.read(name);
- mb.reset(pos);
- CClientRemoteTree *tree = new CClientRemoteTree(connection);
- tree->deserializeSelfRT(mb);
- return tree;
- }
- void CClientRemoteTree::createChildMap()
- {
- children = new MonitoredChildMap(*this);
- }
- ChangeInfo *CClientRemoteTree::queryChanges()
- {
- return connection.queryChangeInfo(*this);
- }
- void CClientRemoteTree::setLocal(size32_t size, const void *data, bool _binary)
- {
- clearState(CPS_PropAppend);
- mergeState(CPS_Changed);
- PARENT::setLocal(size, data, _binary);
- }
- void CClientRemoteTree::appendLocal(size32_t size, const void *data, bool binary)
- {
- if (0 == size) return;
- if (0 != serverId)
- {
- if (0 != (CPS_PropAppend & state))
- {
- PARENT::appendLocal(size, data, binary);
- return;
- }
- else if (0 == (CPS_Changed & state))
- {
- if (value)
- {
- size32_t sz = value->queryValueSize();
- if (!binary && sz) --sz;
- if (sz)
- {
- mergeState(CPS_PropAppend);
- registerPropAppend(sz);
- PARENT::appendLocal(size, data, binary);
- return;
- }
- }
- else
- {
- if (STI_External & serverTreeInfo) // if it has, change will only be fetched on a get call
- {
- mergeState(CPS_PropAppend);
- registerPropAppend(0); // whole value on commit to be sent for external append.
- PARENT::appendLocal(size, data, binary);
- return;
- }
- }
- }
- }
- mergeState(CPS_Changed);
- PARENT::appendLocal(size, data, binary);
- }
- void CClientRemoteTree::addingNewElement(IPropertyTree &child, int pos)
- {
- ((CClientRemoteTree &)child).setState(CPS_New);
- #ifdef ENABLE_INSPOS
- if (pos >= 0)
- ((CRemoteTreeBase &)child).mergeState(CPS_InsPos);
- #endif
- PARENT::addingNewElement(child, pos);
- }
- void CClientRemoteTree::removingElement(IPropertyTree *tree, unsigned pos)
- {
- CRemoteTreeBase *child = QUERYINTERFACE(tree, CRemoteTreeBase); assertex(child);
- registerDeleted(child->queryName(), pos, child->queryServerId());
- PARENT::removingElement(tree, pos);
- }
- void CClientRemoteTree::setAttribute(const char *attr, const char *val)
- {
- PARENT::setAttribute(attr, val);
- mergeState(CPS_AttrChanges);
- registerAttrChange(attr);
- }
- bool CClientRemoteTree::removeAttribute(const char *attr)
- {
- if (PARENT::removeAttribute(attr))
- {
- registerDeletedAttr(attr);
- return true;
- }
- else
- return false;
- }
- void CClientRemoteTree::registerRenamed(const char *newName, const char *oldName, unsigned pos, __int64 id)
- {
- mergeState(CPS_Renames);
- if (queryStateChanges())
- connection.registerRenamed(*this, newName, oldName, pos, id);
- }
- void CClientRemoteTree::registerDeleted(const char *name, unsigned position, __int64 id)
- {
- if (id)
- {
- mergeState(CPS_Deletions);
- if (queryStateChanges())
- connection.registerDeleted(*this, name, position, id);
- }
- }
- void CClientRemoteTree::registerAttrChange(const char *attr)
- {
- mergeState(CPS_AttrChanges);
- if (queryStateChanges())
- connection.registerAttrChange(*this, attr);
- }
- void CClientRemoteTree::registerDeletedAttr(const char *attr)
- {
- mergeState(CPS_AttrDeletions);
- if (queryStateChanges())
- connection.registerDeletedAttr(*this, attr);
- }
- void CClientRemoteTree::registerPropAppend(size32_t l)
- {
- mergeState(CPS_PropAppend);
- if (queryStateChanges())
- connection.registerPropAppend(*this, l);
- }
- void CClientRemoteTree::clearChanges()
- {
- if (0 != (STI_External & serverTreeInfo) && 0 != (CPS_PropAppend & state))
- setProp(NULL, (char *)NULL);
- connection.clearChanges(*this);
- }
- // block these ops on other threads during a commit, otherwise can lead to internal sds inconsistency.
- void CClientRemoteTree::addProp(const char *xpath, const char *val)
- {
- CConnectionLock b(connection);
- CRemoteTreeBase::addProp(xpath, val);
- }
- void CClientRemoteTree::setProp(const char *xpath, const char *val)
- {
- CConnectionLock b(connection);
- CRemoteTreeBase::setProp(xpath, val);
- }
- void CClientRemoteTree::addPropInt64(const char *xpath, __int64 val)
- {
- CConnectionLock b(connection);
- CRemoteTreeBase::addPropInt64(xpath, val);
- }
- void CClientRemoteTree::setPropInt64(const char *xpath, __int64 val)
- {
- CConnectionLock b(connection);
- CRemoteTreeBase::setPropInt64(xpath, val);
- }
- void CClientRemoteTree::setPropBin(const char *xpath, size32_t size, const void *data)
- {
- CConnectionLock b(connection);
- CRemoteTreeBase::setPropBin(xpath, size, data);
- }
- IPropertyTree *CClientRemoteTree::setPropTree(const char *xpath, IPropertyTree *val)
- {
- CConnectionLock b(connection);
- return CRemoteTreeBase::setPropTree(xpath, val);
- }
- IPropertyTree *CClientRemoteTree::addPropTree(const char *xpath, IPropertyTree *val)
- {
- CConnectionLock b(connection);
- return CRemoteTreeBase::addPropTree(xpath, val);
- }
- bool CClientRemoteTree::removeProp(const char *xpath)
- {
- CConnectionLock b(connection);
- return CRemoteTreeBase::removeProp(xpath);
- }
- bool CClientRemoteTree::removeTree(IPropertyTree *child)
- {
- CConnectionLock b(connection);
- return CRemoteTreeBase::removeTree(child);
- }
- void CClientRemoteTree::checkExt() const
- {
- if (!connection.queryUseAppendOpt()) return;
- if (!value)
- {
- if (STI_External & serverTreeInfo)
- {
- MemoryBuffer mb;
- queryManager().getExternalValueFromServerId(serverId, mb);
- if (mb.length())
- {
- bool binary = IptFlagTst(flags, ipt_binary);
- const_cast<CClientRemoteTree *>(this)->setValue(new CPTValue(mb), binary);
- }
- else
- serverTreeInfo &= ~STI_External;
- }
- }
- else if (0 != (CPS_PropAppend & state))
- {
- if (STI_External & serverTreeInfo)
- {
- MemoryBuffer mb;
- bool binary = IptFlagTst(flags, ipt_binary);
- queryManager().getExternalValueFromServerId(serverId, mb);
- if (mb.length())
- {
- const_cast<CClientRemoteTree *>(this)->setValue(new CPTValue(mb), binary);
- assertex(queryStateChanges());
- connection.registerPropAppend(*const_cast<CClientRemoteTree *>(this), mb.length());
- if (value)
- value->getValue(mb, binary);
- }
- else
- serverTreeInfo &= ~STI_External;
- }
- }
- }
- bool CClientRemoteTree::isCompressed(const char *xpath) const
- {
- if (!xpath) checkExt();
- return CRemoteTreeBase::isCompressed(xpath);
- }
- bool CClientRemoteTree::getProp(const char *xpath, StringBuffer &ret) const
- {
- if (!xpath) checkExt();
- return CRemoteTreeBase::getProp(xpath, ret);
- }
- const char *CClientRemoteTree::queryProp(const char * xpath) const
- {
- if (!xpath) checkExt();
- return CRemoteTreeBase::queryProp(xpath);
- }
- bool CClientRemoteTree::getPropBool(const char *xpath, bool dft) const
- {
- if (!xpath) checkExt();
- return CRemoteTreeBase::getPropBool(xpath, dft);
- }
- __int64 CClientRemoteTree::getPropInt64(const char *xpath, __int64 dft) const
- {
- if (!xpath) checkExt();
- return CRemoteTreeBase::getPropInt64(xpath, dft);
- }
- bool CClientRemoteTree::getPropBin(const char *xpath, MemoryBuffer &ret) const
- {
- if (!xpath) checkExt();
- return CRemoteTreeBase::getPropBin(xpath, ret);
- }
- IPropertyTreeIterator *CClientRemoteTree::getElements(const char *xpath, IPTIteratorCodes flags) const
- {
- if (!serverId || !queryLazyFetch()
- || !xpath || '\0' == *xpath || ('*' == *xpath && '\0' == *(xpath+1)) || !connection.queryServerIterAvailable() || (0 == (flags & iptiter_remote) && !connection.queryServerIter()) )
- return CRemoteTreeBase::getElements(xpath, flags);
- if (!hasChildren()) // not necessarily local yet.
- return createNullPTreeIterator();
- // if it's a single id, then not worth getting matches from server as level either present or needed.
- const char *xxpath = xpath;
- if (isValidXPathStartChr(*xxpath))
- {
- do { ++xxpath; }
- while (isValidXPathChr(*xxpath));
- }
- if ('\0' == *xxpath || ('/' == *xxpath && '/' != *(xxpath+1)))
- return CRemoteTreeBase::getElements(xpath, flags);
- return connection.doGetElements(const_cast<CClientRemoteTree *>(this), xpath, flags);
- }
- void CClientRemoteTree::localizeElements(const char *xpath, bool allTail)
- {
- if (!serverId || !queryLazyFetch() || !connection.queryServerGetIdsAvailable()
- || !xpath || '\0' == *xpath || ('*' == *xpath && '\0' == *(xpath+1)) || !hasChildren())
- return;
- IPTIteratorCodes flags = iptiter_remoteget;
- if (allTail)
- flags = iptiter_remotegetbranch;
- Owned<IPropertyTreeIterator> iter = connection.doGetElements(this, xpath, flags);
- return;
- }
- void CClientRemoteTree::resetState(unsigned _state, bool sub)
- {
- state = _state;
- serverId = 0;
- if (sub)
- {
- IPropertyTreeIterator *iter = getElements("*");
- ForEach(*iter)
- {
- CClientRemoteTree &child = (CClientRemoteTree &)iter->query();
- child.resetState(state, sub);
- }
- iter->Release();
- }
- }
- IPropertyTree *CClientRemoteTree::collateData()
- {
- ChangeInfo *changes = queryChanges();
- struct ChangeTree
- {
- ChangeTree(IPropertyTree *donor=NULL) { ptree = LINK(donor); }
- ~ChangeTree() { ::Release(ptree); }
- inline void createTree() { assertex(!ptree); ptree = createPTree(RESERVED_CHANGE_NODE); }
- inline IPropertyTree *queryTree() { return ptree; }
- inline IPropertyTree *getTree() { return LINK(ptree); }
- inline IPropertyTree *queryCreateTree()
- {
- if (!ptree)
- ptree = createPTree(RESERVED_CHANGE_NODE);
- return ptree;
- }
- private:
- StringAttr name;
- IPropertyTree *ptree;
- } ct(changes?changes->tree:NULL);
- if (changes) changes->tree.clear();
- if (0 == serverId)
- {
- ct.createTree();
- Owned<IAttributeIterator> iter = getAttributes();
- if (iter->count())
- {
- IPropertyTree *t = createPTree();
- ForEach(*iter)
- t->setProp(iter->queryName(), queryProp(iter->queryName()));
- ct.queryTree()->addPropTree(ATTRCHANGE_TAG, t);
- }
- ct.queryTree()->setPropBool("@new", true);
- }
- else
- {
- if (ct.queryTree())
- {
- Linked<IPropertyTree> ac = ct.queryTree()->queryPropTree(ATTRCHANGE_TAG);
- if (ac)
- {
- ct.queryTree()->removeTree(ac);
- Owned<IAttributeIterator> iter = ac->getAttributes();
- IPropertyTree *t = createPTree();
- ForEach(*iter)
- t->setProp(iter->queryName(), queryProp(iter->queryName()));
- ct.queryTree()->addPropTree(ATTRCHANGE_TAG, t);
- }
- }
- }
- if ((CPS_Changed & state) || (0 == serverId && queryValue()))
- {
- ct.queryCreateTree()->setPropBool("@localValue", true);
- if (queryValue())
- {
- bool binary=isBinary(NULL);
- ((PTree *)ct.queryTree())->setValue(new CPTValue(queryValue()->queryValueRawSize(), queryValue()->queryValueRaw(), binary, true, isCompressed(NULL)), binary);
- }
- else
- ((PTree *)ct.queryTree())->setValue(new CPTValue(0, NULL, false, true, false), false);
- }
- else if (CPS_PropAppend & state)
- {
- assertex(serverId);
- IPropertyTree *pa = ct.queryTree()->queryPropTree(APPEND_TAG);
- assertex(pa);
- unsigned from = pa->getPropInt(NULL);
- ct.queryTree()->removeTree(pa);
- ct.queryCreateTree()->setPropBool("@appendValue", true);
- MemoryBuffer mb;
- bool binary=isBinary(NULL);
- queryValue()->getValue(mb, true);
- ((PTree *)ct.queryTree())->setValue(new CPTValue(mb.length()-from, mb.toByteArray()+from, binary), binary);
- }
- Owned<IPropertyTree> childTree;
- Owned<IPropertyTreeIterator> _iter = getElements("*");
- IPropertyTreeIterator *iter = _iter;
- if (iter->first())
- {
- while (iter->isValid())
- {
- CClientRemoteTree *child = (CClientRemoteTree *) &iter->query();
- childTree.setown(child->collateData());
- if (childTree)
- {
- if (0 == child->queryServerId())
- {
- if (CPS_InsPos & child->queryState())
- {
- int pos = findChild(child);
- assertex(NotFound != pos);
- childTree->setPropInt("@pos", pos+1);
- }
- }
- else
- {
- int pos = findChild(child);
- assertex(NotFound != pos);
- childTree->setPropInt("@pos", pos+1);
- childTree->setPropInt64("@id", child->queryServerId());
- }
- }
- if (childTree)
- ct.queryCreateTree()->addPropTree(RESERVED_CHANGE_NODE, childTree.getClear());
- iter->next();
- }
- }
- if (ct.queryTree())
- ct.queryTree()->setProp("@name", queryName());
- return ct.getTree();
- }
- void CClientRemoteTree::clearCommitChanges(MemoryBuffer *mb)
- {
- class Cop : implements IIteratorOperator
- {
- public:
- Cop(MemoryBuffer *_mb=NULL) : mb(_mb) { }
- virtual bool applyTop(IPropertyTree &_tree)
- {
- CClientRemoteTree &tree = (CClientRemoteTree &) _tree;
- tree.clearChanges();
- if (tree.queryState())
- tree.setState(0);
- return true;
- }
- virtual bool applyChild(IPropertyTree &parent, IPropertyTree &child, bool &levelBreak)
- {
- CClientRemoteTree &tree = (CClientRemoteTree &) child;
- if (mb && 0==tree.queryServerId())
- {
- __int64 serverId;
- mb->read(serverId);
- tree.setServerId(serverId);
- }
- return true;
- }
- private:
- MemoryBuffer *mb;
- } op(mb);
- CIterationOperation iop(op);
- iop.iterate(*this);
- }
- /////////////////////
- CClientSDSManager::CClientSDSManager()
- {
- CDaliVersion serverVersionNeeded("2.1"); // to ensure backward compatibility
- childrenCanBeMissing = queryDaliServerVersion().compare(serverVersionNeeded) >= 0;
- CDaliVersion serverVersionNeeded2("3.4"); // to ensure backward compatibility
- lazyExtFlag = queryDaliServerVersion().compare(serverVersionNeeded2) >= 0 ? DAMP_SDSCMD_LAZYEXT : 0;
- properties = NULL;
- IPropertyTree &props = queryProperties();
- CDaliVersion serverVersionNeeded3(SDS_SVER_MIN_GETXPATHS_CONNECT);
- if (queryDaliServerVersion().compare(serverVersionNeeded3) < 0)
- props.removeProp("Client/@serverIter");
- else
- props.setPropBool("Client/@serverIterAvailable", true);
- clientThrottleLimit = props.getPropInt("Client/Throttle/@limit", CLIENT_THROTTLE_LIMIT);
- clientThrottleDelay = props.getPropInt("Client/Throttle/@delay", CLIENT_THROTTLE_DELAY);
- CDaliVersion appendOptVersionNeeded(SDS_SVER_MIN_APPEND_OPT); // min version for append optimization
- props.setPropBool("Client/@useAppendOpt", queryDaliServerVersion().compare(appendOptVersionNeeded) >= 0);
- CDaliVersion serverVersionNeeded4(SDS_SVER_MIN_GETIDS); // min version for get xpath with server ids
- if (queryDaliServerVersion().compare(serverVersionNeeded4) >= 0)
- props.setPropBool("Client/@serverGetIdsAvailable", true);
- concurrentRequests.signal(clientThrottleLimit);
- }
- CClientSDSManager::~CClientSDSManager()
- {
- CriticalBlock block(connections.crit);
- SuperHashIteratorOf<CConnectionBase> iter(connections.queryBaseTable());
- ForEach(iter)
- {
- CRemoteConnection &conn = (CRemoteConnection &) iter.query();
- conn.setConnected(false);
- }
- ::Release(properties);
- }
- bool CClientSDSManager::sendRequest(CMessageBuffer &mb, bool throttle)
- {
- if (throttle)
- {
- bool avail = concurrentRequests.wait(clientThrottleDelay);
- if (!avail)
- WARNLOG("Excessive concurrent Dali SDS client transactions. Transaction delayed.");
- bool res;
- try { res = queryCoven().sendRecv(mb, RANK_RANDOM, MPTAG_DALI_SDS_REQUEST); }
- catch (IException *)
- {
- if (avail)
- concurrentRequests.signal();
- throw;
- }
- if (avail)
- concurrentRequests.signal();
- return res;
- }
- else
- return queryCoven().sendRecv(mb, RANK_RANDOM, MPTAG_DALI_SDS_REQUEST);
- }
- CRemoteTreeBase *CClientSDSManager::get(CRemoteConnection &connection, __int64 serverId)
- {
- CMessageBuffer mb;
- if (childrenCanBeMissing)
- mb.append((int)DAMP_SDSCMD_GET2 | lazyExtFlag);
- else
- mb.append((int)DAMP_SDSCMD_GET | lazyExtFlag);
- mb.append(connection.queryConnectionId());
- mb.append(serverId);
- if (!sendRequest(mb))
- throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer);
- SdsReply replyMsg;
-
- mb.read((int &)replyMsg);
-
- CClientRemoteTree *tree = NULL;
- switch (replyMsg)
- {
- case DAMP_SDSREPLY_OK:
- {
- CDisableFetchChangeBlock block(connection);
- tree = new CClientRemoteTree(connection);
- tree->deserializeSelfRT(mb);
- break;
- }
- case DAMP_SDSREPLY_EMPTY:
- break;
- default:
- throwMbException("SDS Reply Error ", mb);
- }
- return tree;
- }
- void CClientSDSManager::getChildren(CRemoteTreeBase &parent, CRemoteConnection &connection, unsigned levels)
- {
- CMessageBuffer mb;
- if (childrenCanBeMissing)
- mb.append((int)DAMP_SDSCMD_GETCHILDREN2 | lazyExtFlag);
- else
- mb.append((int)DAMP_SDSCMD_GETCHILDREN | lazyExtFlag);
- mb.append(connection.queryConnectionId());
- mb.append(parent.queryServerId());
- mb.append(levels);
- mb.append((__int64)0); // terminator
- if (!sendRequest(mb))
- throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer, "fetching SDS branch");
- SdsReply replyMsg;
- mb.read((int &)replyMsg);
-
- switch (replyMsg)
- {
- case DAMP_SDSREPLY_OK:
- break;
- case DAMP_SDSREPLY_EMPTY:
- return;
- case DAMP_SDSREPLY_ERROR:
- throwMbException("SDS Reply Error ", mb);
- default:
- assertex(false);
- }
- CDisableFetchChangeBlock block(connection);
- if (childrenCanBeMissing)
- {
- bool r;
- mb.read(r);
- if (!r) return;
- }
- parent.deserializeChildrenRT(mb);
- }
- static void matchServerTree(CClientRemoteTree *local, IPropertyTree &matchTree, ICopyArrayOf<CClientRemoteTree> &matchedLocals, ICopyArrayOf<IPropertyTree> &matched, bool allTail, MemoryBuffer &mb)
- {
- Owned<IPropertyTreeIterator> matchIter = matchTree.getElements("*");
- if (matchIter->first())
- {
- if (!local || (local->hasChildren() && NULL == local->queryChildren()))
- {
- if (local)
- {
- matchedLocals.append(*local);
- matched.append(matchTree);
- }
- mb.append(matchTree.getPropInt64("@serverId"));
- mb.append((unsigned)1);
- }
- do
- {
- IPropertyTree &elem = matchIter->query();
- StringBuffer path(elem.queryName());
- path.append('[').append(elem.getPropInt("@pos")).append(']');
- CClientRemoteTree *child = local ? (CClientRemoteTree *)local->queryPropTree(path.str()) : NULL;
- matchServerTree(child, elem, matchedLocals, matched, allTail, mb);
- }
- while (matchIter->next());
- }
- else
- {
- if (!local || (local->hasChildren() && NULL == local->queryChildren()))
- {
- if (local)
- {
- matchedLocals.append(*local);
- matched.append(matchTree);
- }
- mb.append(matchTree.getPropInt64("@serverId"));
- mb.append(allTail ? (unsigned)0 : (unsigned)1);
- }
- }
- }
- void CClientSDSManager::ensureLocal(CRemoteConnection &connection, CRemoteTreeBase &_parent, IPropertyTree *serverMatchTree, IPTIteratorCodes flags)
- {
- CClientRemoteTree &parent = (CClientRemoteTree &)_parent;
- CMessageBuffer remoteGetMb;
- if (childrenCanBeMissing)
- remoteGetMb.append((int)DAMP_SDSCMD_GETCHILDREN2 | lazyExtFlag);
- else
- remoteGetMb.append((int)DAMP_SDSCMD_GETCHILDREN | lazyExtFlag);
- remoteGetMb.append(connection.queryConnectionId());
- ICopyArrayOf<CClientRemoteTree> matchedLocals;
- ICopyArrayOf<IPropertyTree> matched;
- bool getLeaves = iptiter_remotegetbranch == (flags & iptiter_remotegetbranch);
- CDisableFetchChangeBlock block(connection);
- matchServerTree(&parent, *serverMatchTree, matchedLocals, matched, getLeaves, remoteGetMb);
- if (0 == matched.ordinality())
- return;
- remoteGetMb.append((__int64)0);
- if (!sendRequest(remoteGetMb))
- throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer, "ensureLocal");
- SdsReply replyMsg;
- remoteGetMb.read((int &)replyMsg);
-
- switch (replyMsg)
- {
- case DAMP_SDSREPLY_OK:
- break;
- case DAMP_SDSREPLY_EMPTY:
- return;
- case DAMP_SDSREPLY_ERROR:
- throwMbException("SDS Reply Error ", remoteGetMb);
- default:
- assertex(false);
- }
- ForEachItemIn(m, matched)
- walkAndFill(matched.item(m), matchedLocals.item(m), remoteGetMb, childrenCanBeMissing);
- }
- void CClientSDSManager::getChildrenFor(CRTArray &childLessList, CRemoteConnection &connection, unsigned levels)
- {
- CMessageBuffer mb;
- if (childrenCanBeMissing)
- mb.append((int)DAMP_SDSCMD_GETCHILDREN2 | lazyExtFlag);
- else
- mb.append((int)DAMP_SDSCMD_GETCHILDREN | lazyExtFlag);
- mb.append(connection.queryConnectionId());
- ForEachItemIn(f, childLessList)
- {
- CRemoteTreeBase &parent = childLessList.item(f);
- if (parent.queryServerId())
- {
- mb.append(parent.queryServerId());
- mb.append(levels);
- }
- }
- mb.append((__int64)0); // terminator
- if (!sendRequest(mb))
- throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer, "getChildrenFor");
- SdsReply replyMsg;
- mb.read((int &)replyMsg);
-
- switch (replyMsg)
- {
- case DAMP_SDSREPLY_OK:
- break;
- case DAMP_SDSREPLY_EMPTY:
- return;
- case DAMP_SDSREPLY_ERROR:
- throwMbException("SDS Reply Error ", mb);
- default:
- assertex(false);
- }
- CDisableFetchChangeBlock block(connection);
- ForEachItemIn(f2, childLessList)
- {
- CRemoteTreeBase &parent = childLessList.item(f2);
- parent.createChildMap();
- if (parent.queryServerId())
- {
- bool r;
- if (childrenCanBeMissing)
- mb.read(r);
- else
- r = true;
- if (r)
- parent.deserializeChildrenRT(mb);
- }
- }
- }
- IPropertyTreeIterator *CClientSDSManager::getElements(CRemoteConnection &connection, const char *xpath)
- {
- CMessageBuffer mb;
- mb.append((int)DAMP_SDSCMD_GETELEMENTS | lazyExtFlag);
- mb.append(connection.queryConnectionId());
- mb.append(xpath);
- if (!sendRequest(mb))
- throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer);
- SdsReply replyMsg;
- mb.read((int &)replyMsg);
-
- CClientRemoteTree *tree = NULL;
- switch (replyMsg)
- {
- case DAMP_SDSREPLY_OK:
- {
- unsigned count;
- mb.read(count);
- CDisableFetchChangeBlock block(connection);
- Owned<DaliPTArrayIterator> iter = new DaliPTArrayIterator();
- while (count--)
- {
- CClientRemoteTree *tree = new CClientRemoteTree(connection);
- iter->array.append(*tree);
- tree->deserializeSelfRT(mb);
- }
- return LINK(iter);
- }
- default:
- throwMbException("SDS Reply Error ", mb);
- }
- return NULL;
- }
- void CClientSDSManager::noteDisconnected(CRemoteConnection &connection)
- {
- connection.setConnected(false);
- connections.removeExact(&connection);
- }
- void CClientSDSManager::commit(CRemoteConnection &connection, bool *disconnectDeleteRoot)
- {
- CriticalBlock b(crit); // if >1 commit per client concurrently would cause problems with serverId.
- CClientRemoteTree *tree = (CClientRemoteTree *) connection.queryRoot();
- try
- {
- CMessageBuffer mb;
- mb.append((int)DAMP_SDSCMD_DATA);
- mb.append(connection.queryConnectionId());
- if (disconnectDeleteRoot)
- {
- mb.append((byte)(0x80 + 1)); // kludge, high bit to indicate new client format. (for backward compat.)
- mb.append(*disconnectDeleteRoot);
- }
- else
- mb.append((byte)0x80); // kludge, high bit to indicate new client format. (for backward compat.)
- bool lazyFetch = connection.setLazyFetch(false);
- Owned<IPropertyTree> changes = tree->collateData();
- connection.setLazyFetch(lazyFetch);
- if (NULL == disconnectDeleteRoot && !changes) return;
- if (changes) changes->serialize(mb);
- try
- {
- if (!sendRequest(mb))
- throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer, "committing");
- }
- catch (IDaliClient_Exception *e)
- {
- if (DCERR_server_closed == e->errorCode())
- {
- if (changes)
- WARNLOG("Dali server disconnect, failed to commit data");
- e->Release();
- if (disconnectDeleteRoot)
- noteDisconnected(connection);
- return; // JCSMORE does this really help, shouldn't it just throw?
- }
- else
- throw;
- }
- SdsReply replyMsg;
- mb.read((int &)replyMsg);
- switch (replyMsg)
- {
- case DAMP_SDSREPLY_OK:
- {
- bool lazyFetch = connection.setLazyFetch(false);
- // NOTE: this means that send collated data order and the following order have to match!
- // JCSMORE - true but.. hmm.. (could possibly have alternative lookup scheme)
- tree->clearCommitChanges(&mb);
- assertex(mb.getPos() == mb.length()); // must have read it all
- connection.setLazyFetch(lazyFetch);
- break;
- }
- case DAMP_SDSREPLY_EMPTY:
- break;
- case DAMP_SDSREPLY_ERROR:
- throwMbException("SDS Reply Error ", mb);
- default:
- assertex(false);
- }
- }
- catch (IException *)
- {
- if (disconnectDeleteRoot)
- noteDisconnected(connection);
- throw;
- }
- if (disconnectDeleteRoot)
- noteDisconnected(connection);
- }
- void CClientSDSManager::changeMode(CRemoteConnection &connection, unsigned mode, unsigned timeout, bool suppressReloads)
- {
- CConnectionLock b(connection);
- if (mode & RTM_CREATE_MASK)
- throw MakeSDSException(SDSExcpt_BadMode, "calling changeMode");
- unsigned prevMode = connection.queryMode();
- if (RTM_MODE(prevMode, RTM_LOCK_WRITE) && !RTM_MODE(mode, RTM_LOCK_WRITE))
- commit(connection, NULL);
- CMessageBuffer mb;
- mb.append((int)DAMP_SDSCMD_CHANGEMODE);
- mb.append(connection.queryConnectionId());
- mb.append(mode).append(timeout);
- if (!sendRequest(mb))
- throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer, "changing mode");
- SdsReply replyMsg;
-
- mb.read((int &)replyMsg);
- switch (replyMsg)
- {
- case DAMP_SDSREPLY_OK:
- {
- connection.setMode(mode);
- if (!suppressReloads)
- {
- if (RTM_MODE(mode, RTM_LOCK_WRITE) && !RTM_MODE(prevMode, RTM_LOCK_WRITE) && !RTM_MODE(prevMode, RTM_LOCK_READ))
- connection.reload();
- }
- break;
- }
- case DAMP_SDSREPLY_ERROR:
- throwMbException("SDS Reply Error ", mb);
- default:
- assertex(false);
- }
- }
- // ISDSManager impl.
- #define MIN_MCONNECT_SVER "1.5"
- IRemoteConnections *CClientSDSManager::connect(IMultipleConnector *mConnect, SessionId id, unsigned timeout)
- {
- CDaliVersion serverVersionNeeded(MIN_MCONNECT_SVER);
- if (queryDaliServerVersion().compare(serverVersionNeeded) < 0)
- throw MakeSDSException(SDSExcpt_VersionMismatch, "Multiple connect not supported by server versions prior to " MIN_MCONNECT_SVER);
- if (0 == id || id != myProcessSession())
- throw MakeSDSException(SDSExcpt_InvalidSessionId, ", in multi connect, sessionid=%" I64F "x", id);
- CMessageBuffer mb;
- mb.append((unsigned)DAMP_SDSCMD_MCONNECT | lazyExtFlag);
- mb.append(id).append(timeout);
- mConnect->serialize(mb);
- if (!sendRequest(mb, true))
- {
- StringBuffer s(", making multiple connect to ");
- getMConnectString(mConnect, s);
- throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer, "%s", s.str());
- }
- SdsReply replyMsg;
-
- Owned<CRemoteConnections> remoteConnections = new CRemoteConnections();
- unsigned c;
- for (c=0; c<mConnect->queryConnections(); c++)
- {
- mb.read((int &)replyMsg);
- switch (replyMsg)
- {
- case DAMP_SDSREPLY_OK:
- {
- ConnectionId connId;
- mb.read(connId);
- StringAttr xpath;
- unsigned mode;
- mConnect->getConnectionDetails(c, xpath, mode);
- Owned<CRemoteConnection> conn = new CRemoteConnection(*this, connId, xpath, id, mode & ~RTM_CREATE_MASK, timeout);
- assertex(conn.get());
- if (queryProperties().getPropBool("Client/@LogConnection"))
- DBGLOG("SDSManager::connect() - IMultipleConnector: RemoteConnection ID<%" I64F "x>, timeout<%d>", connId, timeout);
- CClientRemoteTree *tree;
- { CDisableFetchChangeBlock block(*conn);
- tree = new CClientRemoteTree(*conn);
- tree->deserializeRT(mb);
- }
- conn->setRoot(tree);
- connections.replace(*conn);
- remoteConnections->add(LINK(conn));
- break;
- }
- case DAMP_SDSREPLY_ERROR:
- throwMbException("SDS Reply Error ", mb);
- default:
- assertex(false);
- }
- }
- return LINK(remoteConnections);
- }
- IRemoteConnection *CClientSDSManager::connect(const char *xpath, SessionId id, unsigned mode, unsigned timeout)
- {
- if (0 == id || id != myProcessSession())
- throw MakeSDSException(SDSExcpt_InvalidSessionId, ", connecting to %s, sessionid=%" I64F "x", xpath, id);
- CMessageBuffer mb;
- mb.append((int)DAMP_SDSCMD_CONNECT | lazyExtFlag);
- mb.append(id).append(mode).append(timeout);
- mb.append(xpath);
- if (!sendRequest(mb, true))
- throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer, ", connecting to %s", xpath);
- SdsReply replyMsg;
-
- mb.read((int &)replyMsg);
-
- CRemoteConnection *conn = NULL;
- switch (replyMsg)
- {
- case DAMP_SDSREPLY_OK:
- {
- ConnectionId connId;
- mb.read(connId);
- conn = new CRemoteConnection(*this, connId, xpath, id, mode & ~RTM_CREATE_MASK, timeout);
- assertex(conn);
- if (queryProperties().getPropBool("Client/@LogConnection"))
- DBGLOG("SDSManager::connect(): xpath<%s>, RemoteConnection ID<%" I64F "x>, mode<%x>, timeout<%d>", xpath, connId, mode, timeout);
- CClientRemoteTree *tree;
- { CDisableFetchChangeBlock block(*conn);
- tree = new CClientRemoteTree(*conn);
- tree->deserializeRT(mb);
- }
- conn->setRoot(tree);
- connections.replace(*conn);
- break;
- }
- case DAMP_SDSREPLY_EMPTY:
- break;
- case DAMP_SDSREPLY_ERROR:
- throwMbException("SDS Reply Error ", mb);
- default:
- assertex(false);
- }
- return conn;
- }
- SubscriptionId CClientSDSManager::subscribe(const char *xpath, ISDSSubscription ¬ify, bool sub, bool sendValue)
- {
- assertex(xpath);
- if (sub && sendValue)
- throw MakeSDSException(SDSExcpt_Unsupported, "Subscription to sub elements, with sendValue option unsupported");
- StringBuffer s;
- if ('/' != *xpath)
- {
- s.append('/').append(xpath);
- xpath = s.str();
- }
- CSDSSubscriberProxy *subscriber = new CSDSSubscriberProxy(xpath, sub, sendValue, notify);
- querySubscriptionManager(SDS_PUBLISHER)->add(subscriber, subscriber->getId());
- return subscriber->getId();
- }
- SubscriptionId CClientSDSManager::subscribeExact(const char *xpath, ISDSNodeSubscription ¬ify, bool sendValue)
- {
- if (queryDaliServerVersion().compare(SDS_SVER_MIN_NODESUBSCRIBE) < 0)
- throw MakeSDSException(SDSExcpt_VersionMismatch, "Requires dali server version >= " SDS_SVER_MIN_NODESUBSCRIBE " for subscribeExact");
- assertex(xpath);
- StringBuffer s;
- if ('/' != *xpath)
- {
- s.append('/').append(xpath);
- xpath = s.str();
- }
- CSDSNodeSubscriberProxy *subscriber = new CSDSNodeSubscriberProxy(xpath, sendValue, notify);
- querySubscriptionManager(SDSNODE_PUBLISHER)->add(subscriber, subscriber->getId());
- return subscriber->getId();
- }
- void CClientSDSManager::unsubscribe(SubscriptionId id)
- {
- querySubscriptionManager(SDS_PUBLISHER)->remove(id);
- }
- void CClientSDSManager::unsubscribeExact(SubscriptionId id)
- {
- if (queryDaliServerVersion().compare(SDS_SVER_MIN_NODESUBSCRIBE) < 0)
- throw MakeSDSException(SDSExcpt_VersionMismatch, "Requires dali server version >= " SDS_SVER_MIN_NODESUBSCRIBE " for unsubscribeExact");
- querySubscriptionManager(SDSNODE_PUBLISHER)->remove(id);
- }
- StringBuffer &CClientSDSManager::getInfo(SdsDiagCommand cmd, StringBuffer &out)
- {
- CMessageBuffer mb;
- mb.append((int)DAMP_SDSCMD_DIAGNOSTIC);
- mb.append((int)cmd);
- if (!queryCoven().sendRecv(mb, RANK_RANDOM, MPTAG_DALI_SDS_REQUEST))
- throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer, "querying sds diagnositc info");
- SdsReply replyMsg;
- mb.read((int &)replyMsg);
- switch (replyMsg)
- {
- case DAMP_SDSREPLY_OK:
- {
- switch (cmd)
- {
- case DIAG_CMD_STATS:
- formatUsageStats(mb, out);
- break;
- case DIAG_CMD_CONNECTIONS:
- formatConnections(mb, out);
- break;
- case DIAG_CMD_SUBSCRIBERS:
- formatSubscribers(mb, out);
- break;
- }
- break;
- }
- case DAMP_SDSREPLY_ERROR:
- throwMbException("SDS Reply Error ", mb);
- default:
- assertex(false);
- }
- return out;
- }
- ILockInfoCollection *CClientSDSManager::getLocks(const char *ipPattern, const char *xpathPattern)
- {
- CMessageBuffer msg;
- msg.append((int)DAMP_SDSCMD_DIAGNOSTIC);
- msg.append((int)DIAG_CMD_LOCKINFO);
- msg.append(ipPattern?ipPattern:"");
- msg.append(xpathPattern?xpathPattern:"");
- if (!queryCoven().sendRecv(msg, RANK_RANDOM, MPTAG_DALI_SDS_REQUEST))
- throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer, "getLocks");
- SdsReply replyMsg;
- msg.read((int &)replyMsg);
- switch (replyMsg)
- {
- case DAMP_SDSREPLY_OK:
- break;
- case DAMP_SDSREPLY_ERROR:
- throwMbException("SDS Reply Error ", msg);
- default:
- assertex(false);
- }
- return deserializeLockInfoCollection(msg);
- }
- StringBuffer &CClientSDSManager::getUsageStats(StringBuffer &out)
- {
- return getInfo(DIAG_CMD_STATS, out);
- }
- StringBuffer &CClientSDSManager::getConnections(StringBuffer &out)
- {
- return getInfo(DIAG_CMD_CONNECTIONS, out);
- }
- StringBuffer &CClientSDSManager::getSubscribers(StringBuffer &out)
- {
- return getInfo(DIAG_CMD_SUBSCRIBERS, out);
- }
- // TODO
- StringBuffer &CClientSDSManager::getExternalReport(StringBuffer &out)
- {
- return out;
- }
- IPropertyTree &CClientSDSManager::queryProperties() const
- {
- if (properties) return *properties;
- CDaliVersion serverVersionNeeded("3.1");
- if (queryDaliServerVersion().compare(serverVersionNeeded) < 0)
- throw MakeSDSException(SDSExcpt_VersionMismatch, "Requires dali server version >= 3.1 for getProperties usage");
- CMessageBuffer mb;
- mb.append((int)DAMP_SDSCMD_GETPROPS);
- if (!queryCoven().sendRecv(mb, RANK_RANDOM, MPTAG_DALI_SDS_REQUEST))
- throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer, "querying sds diagnostic info");
- SdsReply replyMsg;
- mb.read((int &)replyMsg);
- switch (replyMsg)
- {
- case DAMP_SDSREPLY_OK:
- break;
- case DAMP_SDSREPLY_ERROR:
- throwMbException("SDS Reply Error ", mb);
- default:
- assertex(false);
- }
- properties = createPTree(mb);
- if (!properties->hasProp("Client"))
- properties->setPropTree("Client", createPTree());
- return *properties;
- }
- IPropertyTree *CClientSDSManager::getXPaths(__int64 serverId, const char *xpath, bool getServerIds)
- {
- CMessageBuffer mb;
- mb.append((int)(getServerIds?DAMP_SDSCMD_GETXPATHSPLUSIDS:DAMP_SDSCMD_GETXPATHS));
- mb.append(serverId);
- mb.append(xpath);
- if (!sendRequest(mb))
- throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer);
- SdsReply replyMsg;
- mb.read((int &)replyMsg);
-
- switch (replyMsg)
- {
- case DAMP_SDSREPLY_OK:
- return createPTree(mb);
- case DAMP_SDSREPLY_EMPTY:
- return NULL;
- case DAMP_SDSREPLY_ERROR:
- throwMbException("SDS Reply Error ", mb);
- }
- throwUnexpected();
- }
- IPropertyTreeIterator *CClientSDSManager::getXPathsSortLimit(const char *baseXPath, const char *matchXPath, const char *sortBy, bool caseinsensitive, bool ascending, unsigned from, unsigned limit)
- {
- CMessageBuffer mb;
- mb.append((int)DAMP_SDSCMD_GETXPATHSCRITERIA);
- mb.append(baseXPath);
- mb.append(matchXPath);
- mb.append(sortBy);
- mb.append(caseinsensitive);
- mb.append(ascending);
- mb.append(from);
- mb.append(limit);
- if (!sendRequest(mb))
- throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer);
- SdsReply replyMsg;
- mb.read((int &)replyMsg);
-
- switch (replyMsg)
- {
- case DAMP_SDSREPLY_OK:
- break;
- case DAMP_SDSREPLY_EMPTY:
- return createNullPTreeIterator();
- case DAMP_SDSREPLY_ERROR:
- throwMbException("SDS Reply Error ", mb);
- default:
- throwUnexpected();
- }
- Owned<IPropertyTree> matchTree = createPTree(mb);
- Owned<CRemoteConnection> conn = (CRemoteConnection *)connect(baseXPath, myProcessSession(), RTM_LOCK_READ, INFINITE);
- if (!conn)
- return createNullPTreeIterator();
- ensureLocal(*conn, (CRemoteTreeBase &)*conn->queryRoot(), matchTree);
- return new CXPathIterator(conn->queryRoot(), matchTree, iptiter_null);
- }
- void CClientSDSManager::getExternalValueFromServerId(__int64 serverId, MemoryBuffer &resMb)
- {
- CMessageBuffer mb;
- mb.append((int)DAMP_SDSCMD_GETEXTVALUE);
- mb.append(serverId);
- if (!sendRequest(mb))
- throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer);
- SdsReply replyMsg;
- mb.read((int &)replyMsg);
-
- switch (replyMsg)
- {
- case DAMP_SDSREPLY_OK:
- resMb.append(mb.length()-mb.getPos(), mb.toByteArray()+mb.getPos());
- return;
- case DAMP_SDSREPLY_EMPTY:
- return;
- case DAMP_SDSREPLY_ERROR:
- throwMbException("SDS Reply Error ", mb);
- }
- throwUnexpected();
- }
- IPropertyTreeIterator *CClientSDSManager::getElementsRaw(const char *xpath, INode *remotedali, unsigned timeout)
- {
- CMessageBuffer mb;
- mb.append((int)DAMP_SDSCMD_GETELEMENTSRAW);
- mb.append(xpath);
- bool ok;
- if (remotedali) {
- Owned<IGroup> grp = createIGroup(1,&remotedali);
- Owned<ICommunicator> comm = createCommunicator(grp,false);
- try {
- ok = comm->sendRecv(mb,RANK_RANDOM, MPTAG_DALI_SDS_REQUEST, timeout);
- }
- catch (IMP_Exception *e)
- {
- if (e->errorCode()!=MPERR_link_closed)
- throw;
- e->Release();
- throw createClientException(DCERR_server_closed);
- }
- }
- else
- ok = sendRequest(mb);
- if (!ok)
- throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer);
- SdsReply replyMsg;
- mb.read((int &)replyMsg);
-
- switch (replyMsg)
- {
- case DAMP_SDSREPLY_OK:
- {
- Owned<DaliPTArrayIterator> resultIterator = new DaliPTArrayIterator;
- unsigned count, c;
- mb.read(count);
- for (c=0; c<count; c++)
- {
- Owned<IPropertyTree> item = createPTree(mb);
- resultIterator->array.append(*LINK(item));
- }
- return LINK(resultIterator);
- }
- case DAMP_SDSREPLY_ERROR:
- throwMbException("SDS Reply Error ", mb);
- }
- return NULL;
- }
- void CClientSDSManager::setConfigOpt(const char *opt, const char *value)
- {
- IPropertyTree &props = queryProperties();
- if (props.hasProp(opt) && (0 == strcmp(value, props.queryProp(opt))))
- return;
- ensurePTree(&queryProperties(), opt);
- queryProperties().setProp(opt, value);
- if (0 == strcmp("Client/Throttle/@limit", opt))
- {
- unsigned newV = props.getPropInt(opt);
- int diff = clientThrottleLimit-newV;
- if (diff)
- {
- if (diff>0) // new limit is lower than old
- {
- PROGLOG("Reducing concurrentThrottleLimit from %d to %d", clientThrottleLimit, newV);
- unsigned c=0;
- for (;;)
- {
- // generally won't be waiting, as would expect this option to typically be called just after component startup time.
- if (!concurrentRequests.wait(clientThrottleDelay))
- WARNLOG("Waiting on active requests to lower clientThrottleLimit");
- else
- {
- ++c;
- if (c == diff)
- break;
- }
- }
- }
- else
- {
- PROGLOG("Increasing clientThrottleLimit from %d to %d", clientThrottleLimit, newV);
- concurrentRequests.signal(-diff); // new limit is higher than old
- }
- clientThrottleLimit = newV;
- }
- }
- }
- #define MIN_QUERYCOUNT_SVER "3.8"
- unsigned CClientSDSManager::queryCount(const char *xpath)
- {
- CDaliVersion serverVersionNeeded(MIN_QUERYCOUNT_SVER);
- if (queryDaliServerVersion().compare(serverVersionNeeded) < 0)
- throw MakeSDSException(SDSExcpt_VersionMismatch, "Requires dali server version >= " MIN_QUERYCOUNT_SVER " for queryCount(<xpath>)");
- CMessageBuffer mb;
- mb.append((int)DAMP_SDSCMD_GETCOUNT);
- mb.append(xpath);
- if (!sendRequest(mb, true))
- throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer, ", queryCount(%s)", xpath);
- unsigned count=0;
- SdsReply replyMsg;
- mb.read((int &)replyMsg);
- if (DAMP_SDSREPLY_OK == replyMsg)
- mb.read(count);
- else
- {
- assertex(replyMsg == DAMP_SDSREPLY_ERROR);
- throwMbException("SDS Reply Error ", mb);
- }
- return count;
- }
- #define MIN_UPDTENV_SVER "3.9"
- bool CClientSDSManager::updateEnvironment(IPropertyTree *newEnv, bool forceGroupUpdate, StringBuffer &response)
- {
- CDaliVersion serverVersionNeeded(MIN_QUERYCOUNT_SVER);
- if (queryDaliServerVersion().compare(serverVersionNeeded) < 0)
- {
- // have to do the old fashioned way, from client
- Owned<IRemoteConnection> conn = querySDS().connect("/",myProcessSession(),0, INFINITE);
- if (conn)
- {
- Owned<IPropertyTree> root = conn->getRoot();
- Owned<IPropertyTree> oldEnvironment = root->getPropTree("Environment");
- if (oldEnvironment.get())
- {
- StringBuffer bakname;
- Owned<IFileIO> io = createUniqueFile(NULL, "environment", "bak", bakname);
- Owned<IFileIOStream> fstream = createBufferedIOStream(io);
- toXML(oldEnvironment, *fstream); // formatted (default)
- root->removeTree(oldEnvironment);
- }
- root->addPropTree("Environment", LINK(newEnv));
- root.clear();
- conn->commit();
- conn->close();
- StringBuffer messages;
- initClusterGroups(forceGroupUpdate, messages, oldEnvironment);
- if (messages.length())
- PROGLOG("CClientSDSManager::updateEnvironment: %s", messages.str());
- PROGLOG("Environment and node groups updated");
- }
- return true;
- }
- CMessageBuffer mb;
- mb.append((int)DAMP_SDSCMD_UPDTENV);
- newEnv->serialize(mb);
- mb.append(forceGroupUpdate);
- if (!queryCoven().sendRecv(mb, RANK_RANDOM, MPTAG_DALI_SDS_REQUEST))
- throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer, "querying sds diagnositc info");
- bool result = false;
- StringAttr resultStr;
- SdsReply replyMsg;
- mb.read((int &)replyMsg);
- switch (replyMsg)
- {
- case DAMP_SDSREPLY_OK:
- {
- mb.read(result);
- mb.read(resultStr);
- response.append(resultStr);
- break;
- }
- case DAMP_SDSREPLY_ERROR:
- throwMbException("SDS Reply Error ", mb);
- default:
- assertex(false);
- }
- return result;
- }
- //////////////
- ISDSManager &querySDS()
- {
- CriticalBlock block(SDScrit);
- if (SDSManager)
- return *SDSManager;
- else if (!queryCoven().inCoven())
- {
- if (!SDSManager)
- SDSManager = new CClientSDSManager();
- return *SDSManager;
- }
- else
- {
- SDSManager = &querySDSServer();
- return *SDSManager;
- }
- }
- void closeSDS()
- {
- CriticalBlock block(SDScrit);
- if (SDSManager) {
- assertex(!queryCoven().inCoven()); // only called by client
- try {
- delete SDSManager;
- }
- catch (IMP_Exception *e)
- {
- if (e->errorCode()!=MPERR_link_closed)
- throw;
- EXCLOG(e, "closeSDS");
- e->Release();
- }
- catch (IDaliClient_Exception *e) {
- if (e->errorCode()!=DCERR_server_closed)
- throw;
- e->Release();
- }
- SDSManager = NULL;
- }
- }
|