// daaudit.cpp
/*##############################################################################
    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at
       http://www.apache.org/licenses/LICENSE-2.0
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
############################################################################## */
  13. #include "platform.h"
  14. #include "jlib.hpp"
  15. #include "jfile.hpp"
  16. #include "jmisc.hpp"
  17. #include "jtime.hpp"
  18. #include "jregexp.hpp"
  19. #include "jexcept.hpp"
  20. #include "jsort.hpp"
  21. #include "jptree.hpp"
  22. #include "mpbuff.hpp"
  23. #include "mpcomm.hpp"
  24. #include "mputil.hpp"
  25. #include "mputil.hpp"
  26. #include "daserver.hpp"
  27. #include "daclient.hpp"
  28. #include "daaudit.hpp"
  29. #ifdef _MSC_VER
  30. #pragma warning (disable : 4355)
  31. #endif
  32. LogMsgCategory const daliAuditLogCat(MSGAUD_audit, MSGCLS_information, DefaultDetail);
  33. enum MAuditRequestKind {
  34. MAR_QUERY
  35. };
  36. #define SDS_CONNECT_TIMEOUT (1000*60*60*2) // better than infinite
  37. #define BUFFSIZE (0x4000)
  38. #define MAXLINESIZE (0x1000)
  39. #define DATEEND 10
  40. #define TIMEEND 19
  41. class CDaliAuditServer: public IDaliServer, public Thread
  42. { // Server side
  43. bool stopped;
  44. CriticalSection handlemessagesect;
  45. StringAttr auditdir;
  46. static int compfile(IInterface * const *v1, IInterface * const *v2) // for bAdd only
  47. {
  48. IFile *e1 = (IFile *)*v1;
  49. IFile *e2 = (IFile *)*v2;
  50. return strcmp(e1->queryFilename(),e2->queryFilename());
  51. }
  52. bool matchrow(const char *row,const char *match)
  53. {
  54. loop {
  55. char m = *match;
  56. if (!m)
  57. break;
  58. match++;
  59. char c = *row;
  60. row++;
  61. if (c=='\n')
  62. return false;
  63. if (m==',') { // skip fields
  64. while (c!=',') {
  65. c = *row;
  66. if (c=='\n')
  67. return (*match==0);
  68. row++;
  69. }
  70. }
  71. else {
  72. loop {
  73. if (m!=c)
  74. return false;
  75. m = *(match++);
  76. c = *(row++);
  77. if (c=='\n')
  78. return (m==0);
  79. if (!m)
  80. return (c==',');
  81. if (m==',') {
  82. if (c!=',')
  83. return false;
  84. break;
  85. }
  86. }
  87. }
  88. }
  89. return true;
  90. }
  91. public:
  92. void testmatch()
  93. {
  94. assertex(matchrow("Aa,Bb,Cc\n","Aa,Bb,Cc"));
  95. assertex(matchrow("Aa,Bb,Cc\n","Aa,,Cc"));
  96. assertex(matchrow("Aa,Bb,Cc\n","Aa,Bb,"));
  97. assertex(matchrow("Aa,Bb,Cc\n",",Bb,Cc"));
  98. assertex(matchrow("Aa,Bb,Cc\n",",Bb,"));
  99. assertex(matchrow("Aa,Bb,Cc\n",",Bb"));
  100. assertex(matchrow("Aa,Bb,Cc\n",",,Cc"));
  101. assertex(matchrow("A\n","A"));
  102. assertex(!matchrow("A\n","A,"));
  103. assertex(!matchrow("A\n",",A"));
  104. assertex(!matchrow("Aa,Bb,Cc\n","Aa,bBb,Cc"));
  105. assertex(!matchrow("Aa,Bb,Cc\n","Aa,Bbb,Cc"));
  106. assertex(!matchrow("Aa,Bcb,Cc\n","Aa,Bb,Cc"));
  107. assertex(!matchrow("Aa\n","Aa,Bb,Cc"));
  108. assertex(!matchrow("Aa,Bb,Ccd\n","Aa,Bb,Cc"));
  109. }
  110. unsigned scan(const CDateTime &from,const CDateTime &to,const char *match,unsigned start,unsigned maxn,MemoryBuffer &res, bool fixlocal)
  111. {
  112. if (!match)
  113. match = "";
  114. StringBuffer fromstr;
  115. from.getString(fromstr,fixlocal);
  116. fromstr.setCharAt(DATEEND,' ');
  117. const char *frommatch = fromstr.str();
  118. StringBuffer tostr;
  119. to.getString(tostr,fixlocal);
  120. tostr.setCharAt(DATEEND,' ');
  121. const char *tomatch = tostr.str();
  122. CDateTime dt;
  123. Owned<IFile> dir = createIFile(auditdir.get());
  124. Owned<IDirectoryIterator> files = dir->directoryFiles("DaAudit.*");
  125. IArrayOf<IFile> filelist;
  126. StringBuffer fname;
  127. ForEach(*files) {
  128. files->getName(fname.clear());
  129. unsigned fyear = atoi(fname.str()+8);
  130. unsigned fmonth = atoi(fname.str()+13);
  131. unsigned fday = atoi(fname.str()+16);
  132. if (fday&&fmonth&&fyear) {
  133. dt.setDate(fyear,fmonth,fday);
  134. if ((dt.compareDate(to)<=0)&&(dt.compareDate(from)>=0)) {
  135. IInterface *e = &files->get();
  136. bool added;
  137. filelist.bAdd(e,compfile,added);
  138. }
  139. }
  140. }
  141. unsigned n = 0;
  142. ForEachItemIn(fi,filelist) {
  143. Owned<IFileIO> fio = filelist.item(fi).openShared(IFOread,IFSHfull);
  144. if (!fio) {
  145. StringBuffer fn;
  146. files->getName(fn);
  147. WARNLOG("Could not open %s",fn.str());
  148. continue;
  149. }
  150. bool eof=false;
  151. offset_t fpos = 0;
  152. MemoryAttr mba;
  153. char *buf = (char *)mba.allocate(BUFFSIZE+MAXLINESIZE+1);
  154. size32_t lbsize = 0;
  155. char *p=NULL;
  156. loop {
  157. Retry:
  158. if (lbsize<MAXLINESIZE) {
  159. if (!eof) {
  160. if (lbsize&&(p!=buf))
  161. memmove(buf,p,lbsize);
  162. p = buf;
  163. size32_t rd = fio->read(fpos,BUFFSIZE,buf+lbsize);
  164. if (rd==0) {
  165. eof = true;
  166. while (lbsize&&(buf[lbsize-1]!='\n'))
  167. lbsize--; // remove unfinished line
  168. if (lbsize==0)
  169. break;
  170. }
  171. else {
  172. lbsize += rd;
  173. fpos += rd;
  174. }
  175. }
  176. else if (lbsize==0)
  177. break;
  178. }
  179. if ((p[4]!='-')&&(p[13]=='-')) {
  180. p+=9; // kludge while msgid prefix about
  181. lbsize-=9;
  182. goto Retry;
  183. }
  184. size32_t len = 0;
  185. while ((len<MAXLINESIZE)&&(p[len]!='\n'))
  186. len++;
  187. if ((len>TIMEEND+1)&&(memcmp(p,frommatch,TIMEEND)>=0)&&(memcmp(p,tomatch,TIMEEND)<0)) {
  188. if (!*match||matchrow(p+TIMEEND+2,match)) {
  189. if (start)
  190. start--;
  191. else {
  192. res.append(len+1,p);
  193. n++;
  194. if (n>=maxn)
  195. return n;
  196. }
  197. }
  198. }
  199. len++;
  200. lbsize-=len;
  201. p+=len;
  202. }
  203. }
  204. return n;
  205. }
  206. public:
  207. IMPLEMENT_IINTERFACE;
  208. CDaliAuditServer(const char *_auditdir)
  209. : auditdir(_auditdir),Thread("CDaliAuditServer")
  210. {
  211. stopped = true;
  212. }
  213. ~CDaliAuditServer()
  214. {
  215. }
  216. void start()
  217. {
  218. Thread::start();
  219. }
  220. void ready()
  221. {
  222. }
  223. void suspend()
  224. {
  225. }
  226. void stop()
  227. {
  228. if (!stopped) {
  229. stopped = true;
  230. queryCoven().cancel(RANK_ALL,MPTAG_DALI_AUDIT_REQUEST);
  231. }
  232. join();
  233. }
  234. int run()
  235. {
  236. ICoven &coven=queryCoven();
  237. CMessageBuffer mb;
  238. stopped = false;
  239. while (!stopped) {
  240. try {
  241. mb.clear();
  242. if (coven.recv(mb,RANK_ALL,MPTAG_DALI_AUDIT_REQUEST,NULL)) {
  243. processMessage(mb); // not synchronous to ensure queue operations handled in correct order
  244. }
  245. else
  246. stopped = true;
  247. }
  248. catch (IException *e)
  249. {
  250. EXCLOG(e, "CDaliAuditServer:run");
  251. e->Release();
  252. stopped = true;
  253. }
  254. }
  255. return 0;
  256. }
  257. void processMessage(CMessageBuffer &mb)
  258. {
  259. CriticalBlock block(handlemessagesect);
  260. ICoven &coven=queryCoven();
  261. int fn;
  262. mb.read(fn);
  263. unsigned ret = 0;
  264. try {
  265. switch (fn) {
  266. case MAR_QUERY:
  267. {
  268. CDateTime from;
  269. CDateTime to;
  270. StringAttr match;
  271. unsigned start;
  272. unsigned maxn;
  273. from.deserialize(mb);
  274. to.deserialize(mb);
  275. mb.read(match).read(start).read(maxn);
  276. bool fixlocal = false;
  277. if (mb.remaining()>sizeof(bool))
  278. mb.read(fixlocal);
  279. mb.clear().append((unsigned)0).append((unsigned)0);
  280. unsigned n = scan(from,to,match,start,maxn,mb,fixlocal);
  281. mb.writeDirect(sizeof(unsigned),sizeof(unsigned),&n);
  282. }
  283. break;
  284. }
  285. }
  286. catch (IException *e)
  287. {
  288. StringBuffer s;
  289. e->errorMessage(s);
  290. EXCLOG(e, "Audit Request Server - handleMessage");
  291. mb.clear().append(e->errorCode()).append(s.str());
  292. e->Release();
  293. }
  294. coven.reply(mb);
  295. }
  296. void nodeDown(rank_t rank)
  297. {
  298. assertex(!"TBD");
  299. }
  300. };
  301. IDaliServer *createDaliAuditServer(const char *auditdir)
  302. {
  303. return new CDaliAuditServer(auditdir);
  304. }
  305. unsigned queryAuditLogs(const CDateTime &from,const CDateTime &to, const char *match,StringAttrArray &out,unsigned start, unsigned max)
  306. {
  307. CMessageBuffer mb;
  308. mb.append((int)MAR_QUERY);
  309. from.serialize(mb);
  310. to.serialize(mb);
  311. if (!match||(strcmp(match,"*")==0))
  312. match = "";
  313. bool fixlocal = true;
  314. mb.append(match).append(start).append(max).append(fixlocal);
  315. queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_AUDIT_REQUEST);
  316. unsigned ret;
  317. mb.read(ret);
  318. if (ret) {
  319. StringAttr except;
  320. mb.read(except);
  321. throw MakeStringException(ret,"Audit Request Server Exception: %s",except.get());
  322. }
  323. unsigned n;
  324. mb.read(n);
  325. const char *base = mb.toByteArray()+mb.getPos();
  326. const char *p = base;
  327. unsigned l = mb.length()-mb.getPos();
  328. for (unsigned i=0;i<n;i++) {
  329. const char *q=p;
  330. while (((unsigned)(q-base)<l)&&(*q!='\n'))
  331. q++;
  332. out.append(*new StringAttrItem(p,q-p));
  333. p = q+1;
  334. }
  335. return n;
  336. }