123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #define da_decl DECL_EXPORT
- #include "platform.h"
- #include "jlib.hpp"
- #include "jmisc.hpp"
- #include "jsuperhash.hpp"
- #include "daclient.hpp"
- // TBD local Coven subscriptions
- //#define SUPRESS_REMOVE_ABORTED
- #define TRACE_QWAITING
- #include "dacoven.hpp"
- #include "mpbuff.hpp"
- #include "mpcomm.hpp"
- #include "mputil.hpp"
- #include "daserver.hpp"
- #include "dasubs.ipp"
- #ifdef _MSC_VER
- #pragma warning (disable : 4355)
- #endif
- enum MSubscriptionRequestKind {
- MSR_REMOVE_SUBSCRIPTION_PRIMARY,
- MSR_ADD_SUBSCRIPTION_PRIMARY,
- MSR_REMOVE_SUBSCRIPTION_SECONDARY,
- MSR_ADD_SUBSCRIPTION_SECONDARY
- };
- class CSubscriptionStub: implements ISubscription, public CInterface
- { // Server (Coven) side
- unsigned tag;
- MemoryAttr data;
- SubscriptionId sid;
- INode *dst;
- bool hasaborted;
- public:
- IMPLEMENT_IINTERFACE;
- CSubscriptionStub(unsigned _tag,SubscriptionId _sid,size32_t _datalen, const byte *_data,INode *_dst)
- : data(_datalen,_data)
- {
- tag = _tag;
- sid = _sid;
- dst = LINK(_dst);
- hasaborted = false;
- }
- virtual ~CSubscriptionStub()
- {
- unlink();
- dst->Release();
- }
- const MemoryAttr &queryData()
- {
- return data;
- }
-
- void notify(MemoryBuffer &returndata) // if returns false should unsubscribe
- {
- if (hasaborted) {
- throw MakeStringException(-1,"Subscription notification aborted");
- return;
- }
- size32_t dlen = returndata.length();
- CMessageBuffer mb;
- mb.append(tag).append(sid).append(dlen).append(returndata);
- try {
- if (!queryWorldCommunicator().send(mb,dst,MPTAG_DALI_SUBSCRIPTION_FULFILL,1000*60*3)) {
- // Must reply in 3 Minutes
- // Kludge to avoid locking SDS on blocked client
- hasaborted = true;
- StringBuffer tmp;
- throw MakeStringException(-1,"Subscription notification to %s timed out",dst->endpoint().getUrlStr(tmp).str());
- return;
- }
- }
- catch (IMP_Exception *e) {
- PrintExceptionLog(e,"Dali CSubscriptionStub");
- hasaborted = true;
- throw;
- }
- }
- void abort()
- {
- hasaborted = true;
- }
- bool aborted()
- {
- return hasaborted;
- }
- void unlink();
- INode &queryNode() { return *dst; }
- unsigned queryTag() { return tag; }
- SubscriptionId querySubscriptionId() { return sid; }
- StringBuffer &getDetails(StringBuffer &buf)
- {
- StringBuffer ep;
- return buf.appendf("%16" I64F "X: %s %s",sid,dst->endpoint().getUrlStr(ep).str(),hasaborted?"aborted":"");
- }
- };
- static class CDaliPublisher
- {
- public:
- virtual ISubscriptionManager *queryManager(unsigned tag) = 0;
- virtual void stop() = 0;
- virtual ~CDaliPublisher() {}
- } *DaliPublisher;
- class CDaliPublisherServer: public IDaliServer, public Thread, implements CDaliPublisher, implements IConnectionMonitor
- {
- ICopyArrayOf<CSubscriptionStub> stubs;
- IArrayOf<ISubscriptionManager> managers;
- UnsignedArray tags;
- CheckedCriticalSection tagsect;
- CheckedCriticalSection stubsect;
- bool stopped;
- ReadWriteLock processlock;
- public:
- IMPLEMENT_IINTERFACE;
- CDaliPublisherServer()
- : Thread("CDaliPublisherServer")
- {
- stopped = true;
- }
- ~CDaliPublisherServer()
- {
- stopped = true;
- managers.kill();
- }
- void start()
- {
- Thread::start();
- }
- void ready()
- {
- addMPConnectionMonitor(this);
- }
- void suspend()
- {
- PROGLOG("Suspending subscriptions");
- removeMPConnectionMonitor(this);
- processlock.lockWrite();
- PROGLOG("Suspended subscriptions");
- }
- void stop()
- {
- if (!stopped) {
- stopped = true;
- queryCoven().cancel(RANK_ALL,MPTAG_DALI_SUBSCRIPTION_REQUEST);
- }
- processlock.unlockWrite();
- join();
- }
- int run()
- {
- ICoven &coven=queryCoven();
- CMessageHandler<CDaliPublisherServer> handler("CDaliPublisherServer",this,&CDaliPublisherServer::processMessage,NULL, 100);
- CMessageBuffer mb;
- stopped = false;
- while (!stopped) {
- try {
- mb.clear();
- #ifdef TRACE_QWAITING
- unsigned waiting = coven.probe(RANK_ALL,MPTAG_DALI_SUBSCRIPTION_REQUEST,NULL);
- if ((waiting!=0)&&(waiting%10==0))
- DBGLOG("QPROBE: MPTAG_DALI_SUBSCRIPTION_REQUEST has %d waiting",waiting);
- #endif
- if (coven.recv(mb,RANK_ALL,MPTAG_DALI_SUBSCRIPTION_REQUEST,NULL))
- handler.handleMessage(mb);
- else
- stopped = true;
- }
- catch (IException *e)
- {
- EXCLOG(e, "CDaliPublisherServer");
- e->Release();
- }
- }
- return 0;
- }
- void processMessage(CMessageBuffer &mb)
- {
- ReadLockBlock block(processlock);
- if (stopped)
- return;
- ICoven &coven=queryCoven();
- int fn;
- mb.read(fn);
- SubscriptionId sid;
- unsigned subtag;
- ISubscriptionManager *manager;
- switch (fn) {
- case MSR_ADD_SUBSCRIPTION_PRIMARY:
- case MSR_ADD_SUBSCRIPTION_SECONDARY:
- {
- Owned<IException> exception;
- Owned<CSubscriptionStub> sub;
- try
- {
- SubscriptionId sid;
- mb.read(subtag).read(sid);
- Owned<INode> subscriber = deserializeINode(mb);
- size32_t dsize;
- mb.read(dsize);
- sub.setown(new CSubscriptionStub(subtag,sid,dsize,mb.readDirect(dsize),subscriber));
- mb.clear();
- {
- CHECKEDCRITICALBLOCK(stubsect,60000);
- removeAborted();
- }
- manager = queryManager(subtag);
- if (manager) {
- if (fn==MSR_ADD_SUBSCRIPTION_PRIMARY) {
- rank_t n = coven.queryGroup().ordinality();
- rank_t mr = coven.queryGroup().rank();
- for (rank_t r = 0;r<n;r++) {
- if (r!=mr) {
- int fn = MSR_ADD_SUBSCRIPTION_SECONDARY;
- mb.clear().append(fn).append(subtag).append(sid);
- subscriber->serialize(mb);
- mb.append(dsize).append(dsize,sub->queryData().get());
- coven.sendRecv(mb,r,MPTAG_DALI_SUBSCRIPTION_REQUEST);
- // should check for server failure here
- }
- }
- }
- manager->add(sub.getLink(),sid);
- }
- }
- catch (IException *e) {
- exception.setown(e);
- sub.clear();
- }
- unsigned retry=0;
- if (exception)
- serializeException(exception, mb);
- while (!coven.reply(mb,60000)) {
- StringBuffer eps;
- DBGLOG("MSR_ADD_SUBSCRIPTION_PRIMARY reply timed out to %s try %d",mb.getSender().getUrlStr(eps).str(),retry+1);
- if (retry++==3)
- return;
- }
- if (sub)
- {
- CHECKEDCRITICALBLOCK(stubsect,60000);
- stubs.append(*sub);
- }
- }
- break;
- case MSR_REMOVE_SUBSCRIPTION_PRIMARY:
- case MSR_REMOVE_SUBSCRIPTION_SECONDARY:
- {
- unsigned tstart = msTick();
- {
- CHECKEDCRITICALBLOCK(stubsect,60000);
- removeAborted();
- mb.read(subtag);
- mb.read(sid);
- manager = queryManager(subtag);
- if (manager) {
- if (fn==MSR_REMOVE_SUBSCRIPTION_PRIMARY) {
- rank_t n = coven.queryGroup().ordinality();
- rank_t mr = coven.queryGroup().rank();
- for (rank_t r = 0;r<n;r++) {
- if (r!=mr) {
- mb.clear().append(MSR_REMOVE_SUBSCRIPTION_SECONDARY).append(subtag).append(sid);
- coven.sendRecv(mb,r,MPTAG_DALI_SUBSCRIPTION_REQUEST);
- // should check for server failure here
- }
- }
- }
- manager->remove(sid);
- }
- mb.clear();
- }
- coven.reply(mb);
- unsigned telapsed=msTick()-tstart;
- if (telapsed>1000)
- LOG(MCerror, unknownJob, "MSR_REMOVE_SUBSCRIPTION_PRIMARY.1 took %dms",telapsed);
- }
- break;
- }
- }
- void nodeDown(rank_t rank)
- {
- assertex(!"TBD");
- }
- ISubscriptionManager *queryManager(unsigned tag)
- {
- CHECKEDCRITICALBLOCK(tagsect,60000);
- unsigned i = tags.find(tag);
- if (i==NotFound)
- return NULL;
- return &managers.item(i);
- }
- void registerSubscriptionManager(unsigned tag, ISubscriptionManager *manager)
- {
- CHECKEDCRITICALBLOCK(tagsect,60000);
- tags.append(tag);
- manager->Link();
- managers.append(*manager);
- }
- void unlink(CSubscriptionStub *stub)
- {
- unsigned tstart = msTick();
- {
- CHECKEDCRITICALBLOCK(stubsect,60000);
- stubs.zap(*stub);
- }
- unsigned telapsed=msTick()-tstart;
- if (telapsed>1000)
- LOG(MCerror, unknownJob, "CDaliPublisherServer::unlink took %dms",telapsed);
- }
-
- void onClose(SocketEndpoint &ep)
- {
- // mark stub closed
- unsigned tstart = msTick();
- {
- CHECKEDCRITICALBLOCK(stubsect,60000);
- ForEachItemIn(i, stubs)
- {
- CSubscriptionStub &stub = stubs.item(i);
- if (stub.queryNode().endpoint().equals(ep)) {
- stub.abort();
- }
- }
- unsigned telapsed=msTick()-tstart;
- if (telapsed>1000)
- LOG(MCerror, unknownJob, "CDaliPublisherServer::onClose took %dms",telapsed);
- }
- }
- void removeAborted()
- {
- #ifdef SUPRESS_REMOVE_ABORTED
- return;
- #endif
- // called from critical section
- CIArrayOf<CSubscriptionStub> toremove;
- ForEachItemIn(i, stubs)
- {
- CSubscriptionStub &stub = stubs.item(i);
- if (stub.aborted()) {
- stub.Link();
- toremove.append(stub);
- }
- }
- if (toremove.ordinality()) {
- CHECKEDCRITICALUNBLOCK(stubsect,60000);
- ForEachItemIn(i2, toremove) {
- CSubscriptionStub &stub = toremove.item(i2);
- queryManager(stub.queryTag())->remove(stub.querySubscriptionId());
- }
- }
- }
- StringBuffer &getSubscriptionList(StringBuffer &buf)
- {
- unsigned tstart = msTick();
- {
- CHECKEDCRITICALBLOCK(stubsect,60000);
- ForEachItemIn(i, stubs)
- {
- CSubscriptionStub &stub = stubs.item(i);
- stub.getDetails(buf).append('\n');
- }
- }
- unsigned telapsed=msTick()-tstart;
- if (telapsed>1000)
- LOG(MCerror, unknownJob, "CDaliPublisherServer::getSubscriptionList took %dms",telapsed);
- return buf;
- }
- } *daliPublisherServer = NULL;
- StringBuffer &getSubscriptionList(StringBuffer &buf)
- {
- if (daliPublisherServer)
- daliPublisherServer->getSubscriptionList(buf);
- return buf;
- }
- void CSubscriptionStub::unlink()
- {
- if (daliPublisherServer)
- daliPublisherServer->unlink(this);
- }
- class CDaliSubscriptionManagerStub: implements ISubscriptionManager, public CInterface
- {
- // Client side
- unsigned tag;
- IArrayOf<ISubscription> subscriptions;
- Int64Array ids;
- CriticalSection subscriptionsect;
- public:
- IMPLEMENT_IINTERFACE;
- CDaliSubscriptionManagerStub(unsigned _tag)
- {
- tag = _tag;
- }
- ~CDaliSubscriptionManagerStub()
- {
- subscriptions.kill();
- }
- void add(ISubscription *subs,SubscriptionId id)
- {
- {
- CriticalBlock block(subscriptionsect);
- ids.append(id);
- subscriptions.append(*subs);
- }
- int fn = MSR_ADD_SUBSCRIPTION_PRIMARY;
- CMessageBuffer mb;
- mb.append(fn).append(tag).append(id);
- queryMyNode()->serialize(mb);
- const MemoryAttr &data = subs->queryData();
- size32_t dlen = (size32_t)data.length();
- mb.append(dlen);
- mb.append(dlen,data.get());
- try
- {
- queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_SUBSCRIPTION_REQUEST);
- if (mb.length())
- throw deserializeException(mb);
- }
- catch (IException *e)
- {
- PrintExceptionLog(e,"Dali CDaliSubscriptionManagerStub::add");
- {
- CriticalBlock block(subscriptionsect);
- unsigned idx = ids.find(id);
- if (NotFound != idx)
- {
- ids.remove(idx);
- subscriptions.remove(idx);
- }
- }
- throw;
- }
- }
- void remove(SubscriptionId id)
- {
- CriticalBlock block(subscriptionsect);
- unsigned idx = ids.find(id);
- if (idx == NotFound)
- return;
- int fn = MSR_REMOVE_SUBSCRIPTION_PRIMARY;
- CMessageBuffer mb;
- mb.append(fn);
- mb.append(tag);
- mb.append(id);
- try {
- queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_SUBSCRIPTION_REQUEST);
- }
- catch (IDaliClient_Exception *e) {
- PrintExceptionLog(e,"Dali CDaliSubscriptionManagerStub::remove");
- e->Release();
- }
- subscriptions.remove(idx);
- ids.remove(idx);
- }
- void notify(SubscriptionId id,MemoryBuffer &mb)
- {
- Linked<ISubscription> item;
- {
- CriticalBlock block(subscriptionsect);
- unsigned i = ids.find(id);
- if (i == NotFound)
- return;
- item.set(&subscriptions.item(i));
- }
- item->notify(mb);
- }
- void abort()
- {
- PrintLog("CDaliSubscriptionManagerStub aborting");
- CriticalBlock block(subscriptionsect);
- ForEachItemIn(i,subscriptions) {
- subscriptions.item(i).abort();
- }
- subscriptions.kill();
- ids.kill();
- PrintLog("CDaliSubscriptionManagerStub aborted");
- }
- };
- class CDaliPublisherClient: public Thread, public CDaliPublisher
- {
- CIArrayOf<CDaliSubscriptionManagerStub> managers;
- UnsignedArray tags;
- CheckedCriticalSection tagsect;
- bool stopped;
- public:
- CDaliPublisherClient()
- : Thread("CDaliPublisherClient")
- {
- stopped = true;
- start();
- }
- ~CDaliPublisherClient()
- {
- managers.kill();
- }
- ISubscriptionManager *queryManager(unsigned tag)
- {
- CHECKEDCRITICALBLOCK(tagsect,60000);
- unsigned i = tags.find(tag);
- if (i!=NotFound)
- return &managers.item(i);
- CDaliSubscriptionManagerStub *stub = new CDaliSubscriptionManagerStub(tag);
- tags.append(tag);
- managers.append(*stub);
- return stub;
- }
- int run()
- {
- ICoven &coven=queryCoven();
- CMessageHandler<CDaliPublisherClient> handler("CDaliPublisherClientMessages",this,&CDaliPublisherClient::processMessage);
- stopped = false;
- CMessageBuffer mb;
- stopped = false;
- while (!stopped) {
- mb.clear();
- try {
- #ifdef TRACE_QWAITING
- unsigned waiting = coven.probe(RANK_ALL,MPTAG_DALI_SUBSCRIPTION_FULFILL,NULL);
- if ((waiting!=0)&&(waiting%10==0))
- DBGLOG("QPROBE: MPTAG_DALI_SUBSCRIPTION_REQUEST has %d waiting",waiting);
- #endif
- if (coven.recv(mb,RANK_ALL,MPTAG_DALI_SUBSCRIPTION_FULFILL,NULL))
- handler.handleMessage(mb);
- else
- stopped = true;
- }
- catch (IException *e) {
- EXCLOG(e,"CDaliPublisherClient::run");
- e->Release();
- stopped = true;
- }
- }
- return 0;
- }
- void processMessage(CMessageBuffer &mb)
- {
- //ICoven &coven=queryCoven();
- //ICommunicator &comm=coven.queryComm();
- unsigned tag;
- mb.read(tag);
- SubscriptionId id;
- mb.read(id);
- unsigned i = tags.find(tag);
- if (i!=NotFound) {
- MemoryBuffer qb;
- size32_t dlen;
- mb.read(dlen);
- qb.append(dlen,mb.readDirect(dlen)); // this is bit inefficient - perhaps could be improved
- managers.item(i).notify(id,qb);
- }
- }
- void ready()
- {
- }
- void stop()
- {
- if (!stopped) {
- stopped = true;
- queryCoven().cancel(RANK_ALL,MPTAG_DALI_SUBSCRIPTION_FULFILL);
- }
- join();
- }
- };
- IDaliServer *createDaliPublisherServer()
- {
- assertex(!daliPublisherServer); // initialization problem
- daliPublisherServer = new CDaliPublisherServer();
- DaliPublisher = daliPublisherServer;
- return daliPublisherServer;
- }
- static CriticalSection subscriptionCrit;
- ISubscriptionManager *querySubscriptionManager(unsigned tag)
- {
- CriticalBlock block(subscriptionCrit);
- if (!DaliPublisher) {
- ICoven &coven=queryCoven();
- assertex(!coven.inCoven()); // Check not Coven server (if occurs - not initialized correctly;
- DaliPublisher = new CDaliPublisherClient();
- }
- return DaliPublisher->queryManager(tag);
- }
- void closeSubscriptionManager()
- {
- CriticalBlock block(subscriptionCrit);
- if (DaliPublisher) {
- try {
- DaliPublisher->stop();
- }
- catch (IMP_Exception *e)
- {
- if (e->errorCode()!=MPERR_link_closed)
- throw;
- e->Release();
- }
- catch (IDaliClient_Exception *e) {
- if (e->errorCode()!=DCERR_server_closed)
- throw;
- e->Release();
- }
- delete DaliPublisher;
- DaliPublisher = NULL;
- }
- }
- void registerSubscriptionManager(unsigned tag, ISubscriptionManager *manager)
- {
- assertex(daliPublisherServer); // initialization order check
- daliPublisherServer->registerSubscriptionManager(tag,manager);
- }
|