|
@@ -2365,7 +2365,7 @@ public:
|
|
|
};
|
|
|
|
|
|
void TestSDS3(IGroup *group)
|
|
|
-{
|
|
|
+{
|
|
|
class TSDS1 : public CInterface, implements IThreadFactory
|
|
|
{
|
|
|
public:
|
|
@@ -2389,6 +2389,129 @@ void TestSDS3(IGroup *group)
|
|
|
pool.clear();
|
|
|
}
|
|
|
|
|
|
+void TestNodeSubs()
|
|
|
+{
|
|
|
+ class CNodeSubPool : public CSimpleInterfaceOf<IThreadFactory>
|
|
|
+ {
|
|
|
+ class CNodeSubscriber : public CSimpleInterfaceOf<ISDSNodeSubscription>
|
|
|
+ {
|
|
|
+ public:
|
|
|
+ virtual void notify(SubscriptionId id, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
|
|
|
+ {
|
|
|
+ PROGLOG("CNodeSubscriber notified");
|
|
|
+ }
|
|
|
+ };
|
|
|
+ SubscriptionId sid;
|
|
|
+ CriticalSection sidCrit;
|
|
|
+ Owned<ISDSNodeSubscription> subscriber;
|
|
|
+
|
|
|
+ void test()
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ unsigned t = getRandom()%5;
|
|
|
+ switch (t)
|
|
|
+ {
|
|
|
+ case 0:
|
|
|
+ {
|
|
|
+ // connect thread
|
|
|
+ PROGLOG("Creating SDS node");
|
|
|
+ Owned<IRemoteConnection> conn = querySDS().connect("/nodesubtest", myProcessSession(), RTM_CREATE|RTM_LOCK_WRITE, INFINITE);
|
|
|
+ MilliSleep(5+getRandom()%50);
|
|
|
+ conn.clear();
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ case 1:
|
|
|
+ {
|
|
|
+ // node sub test
|
|
|
+ CriticalBlock b(sidCrit);
|
|
|
+ if (!sid)
|
|
|
+ {
|
|
|
+ PROGLOG("Subscribing to node");
|
|
|
+ sid = querySDS().subscribeExact("/nodesubtest", *subscriber, false);
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ case 2:
|
|
|
+ {
|
|
|
+ // node sub test
|
|
|
+ CriticalBlock b(sidCrit);
|
|
|
+ if (sid)
|
|
|
+ {
|
|
|
+ PROGLOG("Unsubscribing to node");
|
|
|
+ querySDS().unsubscribeExact(sid);
|
|
|
+ sid = 0;
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ case 3:
|
|
|
+ {
|
|
|
+ PROGLOG("Deleting node");
|
|
|
+ Owned<IRemoteConnection> conn = querySDS().connect("/nodesubtest", myProcessSession(), RTM_LOCK_WRITE, INFINITE);
|
|
|
+ if (conn)
|
|
|
+ conn->close(true);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ case 4:
|
|
|
+ {
|
|
|
+ PROGLOG("Gathering subscriber info");
|
|
|
+ StringBuffer info;
|
|
|
+ querySDS().getSubscribers(info);
|
|
|
+ if (info.length())
|
|
|
+ PROGLOG("Subscribers: \n%s", info.str());
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (IException *e)
|
|
|
+ {
|
|
|
+ PrintExceptionLog(e, NULL);
|
|
|
+ e->Release();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ class CNodeSubThread : public CInterface, implements IPooledThread
|
|
|
+ {
|
|
|
+ CNodeSubPool &owner;
|
|
|
+ public:
|
|
|
+ IMPLEMENT_IINTERFACE;
|
|
|
+
|
|
|
+ CNodeSubThread(CNodeSubPool &_owner) : owner(_owner) { }
|
|
|
+ virtual void init(void *param)
|
|
|
+ {
|
|
|
+ }
|
|
|
+ virtual void main()
|
|
|
+ {
|
|
|
+ owner.test();
|
|
|
+ }
|
|
|
+ virtual bool stop() { return true; }
|
|
|
+ virtual bool canReuse() { return true; }
|
|
|
+ };
|
|
|
+ public:
|
|
|
+ CNodeSubPool()
|
|
|
+ {
|
|
|
+ sid = 0;
|
|
|
+ subscriber.setown(new CNodeSubscriber());
|
|
|
+ }
|
|
|
+ virtual IPooledThread *createNew()
|
|
|
+ {
|
|
|
+ return new CNodeSubThread(*this);
|
|
|
+ }
|
|
|
+ } poolFactory;
|
|
|
+
|
|
|
+ Owned<IThreadPool> pool = createThreadPool("TSDSTest", &poolFactory, NULL, 100, 100000);
|
|
|
+
|
|
|
+ unsigned tests = testParams.ordinality() ? atoi(testParams.item(0)) : 10;
|
|
|
+ for (unsigned t=0; t<tests; t++)
|
|
|
+ {
|
|
|
+ pool->start(NULL);
|
|
|
+ }
|
|
|
+
|
|
|
+ PrintLog("Joining all TSDSThread running threads");
|
|
|
+ pool->joinAll();
|
|
|
+ pool.clear();
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
void TestSDSXPaths()
|
|
|
{
|
|
|
const char *testXML =
|
|
@@ -2901,7 +3024,7 @@ void usage(const char *error=NULL)
|
|
|
{
|
|
|
if (error) printf("%s\n", error);
|
|
|
printf("usage: DATEST <server_ip:port>* [/test <name> [<test params...>] [/NITER <iterations>]\n");
|
|
|
- printf("where name = RANDTEST | DFS | QTEST | QTEST2 | SESSION | LOCKS | SDS1 | SDS2 | XPATHS| STRESS | STRESS2 | SHUTDOWN | EXTERNAL | SUBLOCKS | SUBSCRIPTION | CONNECTIONSUBS | MULTIFILE\n");
|
|
|
+ printf("where name = RANDTEST | DFS | QTEST | QTEST2 | SESSION | LOCKS | SDS1 | SDS2 | XPATHS| STRESS | STRESS2 | SHUTDOWN | EXTERNAL | SUBLOCKS | SUBSCRIPTION | CONNECTIONSUBS | MULTIFILE | NODESUBS\n");
|
|
|
printf("eg: datest . /test QTEST put -- one coven server running locally, running qtest with param \"put\"\n");
|
|
|
printf(" datest eq0001016 eq0001017 -- two coven servers, use default test %s\n", DEFAULT_TEST);
|
|
|
}
|
|
@@ -3048,6 +3171,7 @@ int main(int argc, char* argv[])
|
|
|
case 9: TestExternal(); break;
|
|
|
case 10: TestSubLocks(); break;
|
|
|
case 11: TestSDS3(group); break;
|
|
|
+ case 12: TestNodeSubs(); break;
|
|
|
}
|
|
|
}
|
|
|
else if (TEST("DFS"))
|
|
@@ -3074,6 +3198,8 @@ int main(int argc, char* argv[])
|
|
|
TestSDS2();
|
|
|
else if (TEST("SDS3"))
|
|
|
TestSDS3(group);
|
|
|
+ else if (TEST("NODESUBS"))
|
|
|
+ TestNodeSubs();
|
|
|
else if (TEST("XPATHS"))
|
|
|
TestSDSXPaths();
|
|
|
else if (TEST("STRESS"))
|