123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include "schedulectrl.hpp"
- #include "scheduleread.hpp"
- #include "eventqueue.hpp"
- #include "daclient.hpp"
- void usage(int exitval = 2)
- {
- printf("Usage:\n"
- "scheduleadmin help\n"
- "scheduleadmin <DALI> add <WUID>\n"
- "scheduleadmin <DALI> remove <WUID>\n"
- "scheduleadmin <DALI> removeall\n"
- "scheduleadmin <DALI> servers\n"
- "scheduleadmin <DALI> list <eclserver> (<event name>)\n"
- "scheduleadmin <DALI> monitor <eclserver> (<event name>)\n"
- "scheduleadmin <DALI> cleanup\n"
- "scheduleadmin <DALI> push eventname eventtext (<wuid>)\n"
- #if _DEBUG
- "scheduleadmin <DALI> testpull <eclserver queue>\n"
- #endif
- );
- releaseAtoms();
- ExitModuleObjects();
- exit(exitval);
- }
- class AdminScheduleMonitor : public CInterface
- {
- private:
- class SubscriptionProxy : public CInterface, implements IScheduleSubscriber
- {
- public:
- SubscriptionProxy(AdminScheduleMonitor const * _owner) : owner(_owner) {}
- IMPLEMENT_IINTERFACE;
- virtual void notify() { owner->notify(); }
- private:
- AdminScheduleMonitor const * owner;
- };
- public:
- AdminScheduleMonitor(char const * serverName, char const * eventName, bool subscribe)
- {
- if(subscribe)
- {
- subsProxy.setown(new SubscriptionProxy(this));
- reader.setown(getSubscribingScheduleReader(serverName, LINK(subsProxy), eventName));
- }
- else
- {
- reader.setown(getScheduleReader(serverName, eventName));
- }
- }
- ~AdminScheduleMonitor() { if(subsProxy) printf("...done\n"); }
- void dump() const
- {
- dumpTime();
- Owned<IScheduleReaderIterator> iter(reader->getIterator());
- StringBuffer buff;
- while(iter->isValidEventName())
- {
- printf("%s\n", iter->getEventName(buff.clear()).str());
- while(iter->isValidEventText())
- {
- printf(" %s\n", iter->getEventText(buff.clear()).str());
- while(iter->isValidWuid())
- {
- printf(" %s\n", iter->getWuid(buff.clear()).str());
- iter->nextWuid();
- }
- iter->nextEventText();
- }
- iter->nextEventName();
- }
- if(subsProxy) printf("monitoring...\n");
- }
- void notify() const
- {
- printf("\n----------------------------------------\n\n");
- dump();
- }
- private:
- void dumpTime() const
- {
- CDateTime time;
- StringBuffer tstr;
- time.setNow();
- time.getString(tstr, false);
- printf("%s\n\n", tstr.str());
- }
-
- private:
- Owned<SubscriptionProxy> subsProxy;
- Owned<IScheduleReader> reader;
- };
- class AdminScheduleEventTester : public CInterface, implements IExceptionHandler
- {
- public:
- class EventExecutor : public CInterface, implements IScheduleEventExecutor
- {
- public:
- EventExecutor() {}
- IMPLEMENT_IINTERFACE;
- virtual void execute(char const * wuid, char const * name, char const * text)
- {
- CDateTime nowdt;
- StringBuffer nowstr;
- nowdt.setNow();
- nowdt.getString(nowstr);
- printf("Pass event to workunit %s: name=%s text=%s (at %s)\n", wuid, name, text, nowstr.str());
- }
- };
- AdminScheduleEventTester(char const * _serverName) : serverName(_serverName) {}
- void run()
- {
- printf("watching event queue...\n");
- Owned<EventExecutor> executor(new EventExecutor);
- Owned<IScheduleEventProcessor> processor(getScheduleEventProcessor(serverName.get(), LINK(executor), this));
- processor->start();
- getchar();
- processor->stop();
- printf("...done\n");
- }
- virtual bool fireException(IException *e) { StringBuffer msg; IERRLOG("Scheduler error (skipping event): %d: %s", e->errorCode(), e->errorMessage(msg).str()); e->Release(); return true; }
- private:
- StringAttr serverName;
- };
- interface IScheduleTask : extends IInterface
- {
- public:
- virtual void doit() = 0;
- };
- class AddScheduleTask : public CInterface, implements IScheduleTask
- {
- public:
- AddScheduleTask(char const * _wuid) : wuid(_wuid) {}
- IMPLEMENT_IINTERFACE;
- virtual void doit () { scheduleWorkUnit(wuid.get()); }
- private:
- StringAttr wuid;
- };
- class RemoveScheduleTask : public CInterface, implements IScheduleTask
- {
- public:
- RemoveScheduleTask(char const * _wuid) : wuid(_wuid) {}
- IMPLEMENT_IINTERFACE;
- virtual void doit () { descheduleWorkUnit(wuid.get()); }
- private:
- StringAttr wuid;
- };
- class RemoveAllScheduleTask : public CInterface, implements IScheduleTask
- {
- public:
- RemoveAllScheduleTask() {}
- IMPLEMENT_IINTERFACE;
- virtual void doit () { descheduleAllWorkUnits(); }
- };
- class ListServersScheduleTask : public CInterface, implements IScheduleTask
- {
- public:
- ListServersScheduleTask() {}
- IMPLEMENT_IINTERFACE;
- virtual void doit()
- {
- Owned<ISchedulerListIterator> iter = getSchedulerList();
- for(iter->first(); iter->isValid(); iter->next())
- printf("%s\n", iter->query());
- }
- };
- class ReadNamedScheduleTask : public CInterface, implements IScheduleTask
- {
- public:
- ReadNamedScheduleTask(char const * _serverName, char const * _eventName, bool _subscribe) : serverName(_serverName), eventName(_eventName), subscribe(_subscribe) {}
- IMPLEMENT_IINTERFACE;
- virtual void doit() { Owned<AdminScheduleMonitor> monitor(new AdminScheduleMonitor(serverName.get(), eventName.get(), subscribe)); monitor->dump(); if(subscribe) getchar(); }
- private:
- StringAttr serverName;
- StringAttr eventName;
- bool subscribe;
- };
- class ReadFullScheduleTask : public CInterface, implements IScheduleTask
- {
- public:
- ReadFullScheduleTask(char const * _serverName, bool _subscribe) : serverName(_serverName), subscribe(_subscribe) {}
- IMPLEMENT_IINTERFACE;
- virtual void doit() { Owned<AdminScheduleMonitor> monitor(new AdminScheduleMonitor(serverName.get(), NULL, subscribe)); monitor->dump(); if(subscribe) getchar(); }
- private:
- StringAttr serverName;
- bool subscribe;
- };
- class CleanupScheduleTask : public CInterface, implements IScheduleTask
- {
- public:
- CleanupScheduleTask() {}
- IMPLEMENT_IINTERFACE;
- virtual void doit() { cleanupWorkUnitSchedule(); }
- };
- class PushScheduleTask : public CInterface, implements IScheduleTask
- {
- public:
- PushScheduleTask(char const * _name, char const * _text, const char * _target)
- : name(_name), text(_text), target(_target) {}
- IMPLEMENT_IINTERFACE;
- virtual void doit()
- {
- Owned<IScheduleEventPusher> pusher(getScheduleEventPusher());
- unsigned count = pusher->push(name.get(), text.get(), target.get());
- PROGLOG("Pushed event to %u active schedulers", count);
- }
- private:
- StringAttr name;
- StringAttr text;
- StringAttr target;
- };
#ifdef _DEBUG
/**
 * "testpull" command (compiled only when _DEBUG is defined): runs an
 * AdminScheduleEventTester against the given server name.
 */
class TestPullScheduleTask : public CInterface, implements IScheduleTask
{
public:
    TestPullScheduleTask(char const * server) : serverName(server) {}
    IMPLEMENT_IINTERFACE;
    virtual void doit()
    {
        Owned<AdminScheduleEventTester> tester(new AdminScheduleEventTester(serverName.get()));
        tester->run();
    }
private:
    StringAttr serverName;
};
#endif
- int main(int argc, char * const * argv)
- {
- InitModuleObjects();
- if((argc==2) && (stricmp(argv[1], "help")==0))
- usage(0);
- if(argc<3) usage();
- Owned<IScheduleTask> task;
- try
- {
- char const * cmd = argv[2];
- if(stricmp(cmd, "add")==0)
- if(argc==4)
- task.setown(new AddScheduleTask(argv[3]));
- else
- usage();
- else if(stricmp(cmd, "remove")==0)
- if(argc==4)
- task.setown(new RemoveScheduleTask(argv[3]));
- else
- usage();
- else if(stricmp(cmd, "removeall")==0)
- if(argc==3)
- task.setown(new RemoveAllScheduleTask());
- else
- usage();
- else if(stricmp(cmd, "servers")==0)
- if(argc==3)
- task.setown(new ListServersScheduleTask());
- else
- usage();
- else if(stricmp(cmd, "list")==0)
- {
- if(argc==5)
- task.setown(new ReadNamedScheduleTask(argv[3], argv[4], false));
- else if(argc==4)
- task.setown(new ReadFullScheduleTask(argv[3], false));
- else
- usage();
- }
- else if(stricmp(cmd, "monitor")==0)
- {
- if(argc==5)
- task.setown(new ReadNamedScheduleTask(argv[3], argv[4], true));
- else if(argc==4)
- task.setown(new ReadFullScheduleTask(argv[3], true));
- else
- usage();
- }
- else if(stricmp(cmd, "cleanup")==0)
- if(argc==3)
- task.setown(new CleanupScheduleTask());
- else
- usage();
- else if(stricmp(cmd, "push")==0)
- if(argc==5)
- task.setown(new PushScheduleTask(argv[3], argv[4], NULL));
- else if(argc==6)
- task.setown(new PushScheduleTask(argv[3], argv[4], argv[5]));
- else
- usage();
- #ifdef _DEBUG
- else if(stricmp(cmd, "testpull")==0)
- if(argc==4)
- task.setown(new TestPullScheduleTask(argv[3]));
- else
- usage();
- #endif
- else
- usage();
- Owned<IGroup> serverGroup = createIGroup(argv[1], DALI_SERVER_PORT);
- initClientProcess(serverGroup, DCR_ScheduleAdmin);
- task->doit();
- closedownClientProcess();
- }
- catch(IException * e)
- {
- EXCLOG(e);
- e->Release();
- }
- releaseAtoms();
- ExitModuleObjects();
- return 0;
- }
|