ccdsnmp.cpp 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include <algorithm>
  14. #include "jlog.hpp"
  15. #include "jtime.hpp"
  16. #include "jptree.hpp"
  17. #include "jqueue.tpp"
  18. #include "mpbase.hpp"
  19. #include "math.h"
  20. #include "ccdsnmp.hpp"
  21. #include "jhtree.hpp"
  22. #include "thirdparty.h"
  23. #include "roxiemem.hpp"
  24. #include "udplib.hpp"
  // Default pulse interval; not referenced anywhere in the visible part of this file.
  25. #define DEFAULT_PULSE_INTERVAL 30
  // ---------------------------------------------------------------------------
  // Global counters. Nothing in this section increments them; the
  // CRoxieMetricsManager constructor below registers most of them by name so
  // that they can be read (and, for cumulative ones, reset) as metrics.
  // ---------------------------------------------------------------------------
  26. atomic_t queryCount;
  // Query statistics groups; the CRoxieMetricsManager constructor registers them
  // under the prefixes "unknown", "lo", "hi", "sla" and "all" respectively.
  27. RoxieQueryStats unknownQueryStats;
  28. RoxieQueryStats loQueryStats;
  29. RoxieQueryStats hiQueryStats;
  30. RoxieQueryStats slaQueryStats;
  31. RoxieQueryStats combinedQueryStats;
  // Atomic counters, each exposed through an AtomicMetric wrapper.
  32. atomic_t retriesIgnoredPrm;
  33. atomic_t retriesIgnoredSec;
  34. atomic_t retriesNeeded;
  35. atomic_t retriesReceivedPrm;
  36. atomic_t retriesReceivedSec;
  37. atomic_t retriesSent;
  38. atomic_t rowsIn;
  39. atomic_t ibytiPacketsFromSelf;
  40. atomic_t ibytiPacketsSent;
  41. atomic_t ibytiPacketsWorked;
  42. atomic_t ibytiPacketsHalfWorked;
  43. atomic_t ibytiPacketsReceived;
  44. atomic_t ibytiPacketsTooLate;
  // The two ibytiNoDelays counters are only registered as metrics when
  // NO_IBYTI_DELAYS_COUNT is not defined.
  45. atomic_t ibytiNoDelaysPrm;
  46. atomic_t ibytiNoDelaysSec;
  47. atomic_t packetsSent;
  48. atomic_t packetsReceived;
  49. atomic_t resultsReceived;
  50. atomic_t indexRecordsRead;
  51. atomic_t postFiltered;
  52. atomic_t abortsSent;
  53. atomic_t activitiesStarted;
  54. atomic_t activitiesCompleted;
  55. atomic_t diskReadStarted;
  56. atomic_t diskReadCompleted;
  57. atomic_t globalSignals;
  58. atomic_t globalLocks;
  59. atomic_t numFilesToProcess;
  // Taken by CounterMetric, RatioMetric and UnsignedRatioMetric while they read
  // or reset the values they wrap.
  60. CriticalSection counterCrit;
  // Plain (non-atomic) counters; those registered as metrics are wrapped in a
  // CounterMetric, which accesses them under counterCrit.
  61. unsigned queueLength;
  62. unsigned maxQueueLength;
  63. unsigned rowsOut;
  64. unsigned maxScanLength;
  65. unsigned totScanLength;
  66. unsigned totScans;
  67. unsigned meanScanLength;
  // Packet timing statistics, only compiled when TIME_PACKETS is defined.
  // The *Elapsed / *Count pairs are registered as ratio metrics
  // ("packetWaitAverage" and "packetRunAverage").
  68. #ifdef TIME_PACKETS
  69. unsigned __int64 packetWaitElapsed;
  70. unsigned packetWaitMax;
  71. atomic_t packetWaitCount;
  72. unsigned __int64 packetRunElapsed;
  73. unsigned packetRunMax;
  74. atomic_t packetRunCount;
  75. #endif
  76. unsigned lastQueryDate = 0;
  77. unsigned lastQueryTime = 0;
  78. unsigned slavesActive = 0;
  79. unsigned maxSlavesActive = 0;
  // Registers a counter (or accessor function) under its own identifier: the
  // stringized first argument becomes the metric name passed to doAddMetric().
  80. #define addMetric(a, b) doAddMetric(a, #a, b)
  // A single metric that can be read and reset. Instances are stored by name in
  // CRoxieMetricsManager::metricMap.
  81. interface INamedMetric : extends IInterface
  82. {
  // Returns the metric's current value.
  83. virtual long getValue() = 0;
  // Reports whether the metric is a running total; CRoxieMetricsManager::dumpMetrics()
  // only logs metrics for which this returns true.
  84. virtual bool isCumulative() = 0;
  // Resets the metric; what (if anything) is cleared is up to the implementation.
  85. virtual void resetValue() = 0;
  86. };
  // Callback interface for objects registered with TickProvider::addListener().
  87. interface ITimerCallback : extends IInterface
  88. {
  // Invoked by TickProvider each time its wait on the stop semaphore times out.
  89. virtual void onTimer() = 0;
  90. };
  91. class AtomicMetric : implements INamedMetric, public CInterface
  92. {
  93. atomic_t &counter;
  94. bool cumulative;
  95. public:
  96. IMPLEMENT_IINTERFACE;
  97. AtomicMetric(atomic_t &_counter, bool _cumulative)
  98. : counter(_counter), cumulative(_cumulative)
  99. {
  100. }
  101. virtual long getValue()
  102. {
  103. return atomic_read(&counter);
  104. }
  105. virtual bool isCumulative()
  106. {
  107. return cumulative;
  108. }
  109. virtual void resetValue()
  110. {
  111. if (cumulative)
  112. atomic_set(&counter, 0);
  113. }
  114. };
  115. class RelaxedAtomicMetric : implements INamedMetric, public CInterface
  116. {
  117. RelaxedAtomic<unsigned> &counter;
  118. const bool cumulative;
  119. public:
  120. IMPLEMENT_IINTERFACE;
  121. RelaxedAtomicMetric(RelaxedAtomic<unsigned> &_counter, bool _cumulative)
  122. : counter(_counter), cumulative(_cumulative)
  123. {
  124. }
  125. virtual long getValue()
  126. {
  127. return counter.load();
  128. }
  129. virtual bool isCumulative()
  130. {
  131. return cumulative;
  132. }
  133. virtual void resetValue()
  134. {
  135. if (cumulative)
  136. counter.store(0);
  137. }
  138. };
  139. class CounterMetric : implements INamedMetric, public CInterface
  140. {
  141. protected:
  142. unsigned &counter;
  143. bool cumulative;
  144. public:
  145. IMPLEMENT_IINTERFACE;
  146. CounterMetric(unsigned &_counter, bool _cumulative)
  147. : counter(_counter), cumulative(_cumulative)
  148. {
  149. }
  150. virtual long getValue()
  151. {
  152. CriticalBlock c(counterCrit);
  153. return counter;
  154. }
  155. virtual bool isCumulative()
  156. {
  157. return cumulative;
  158. }
  159. virtual void resetValue()
  160. {
  161. if (cumulative)
  162. {
  163. CriticalBlock c(counterCrit);
  164. counter = 0;
  165. }
  166. }
  167. };
  168. typedef unsigned (*AccessorFunction)();
  169. class FunctionMetric : implements INamedMetric, public CInterface
  170. {
  171. AccessorFunction accessor;
  172. public:
  173. IMPLEMENT_IINTERFACE;
  174. FunctionMetric(AccessorFunction _accessor)
  175. : accessor(_accessor)
  176. {
  177. }
  178. virtual long getValue()
  179. {
  180. return accessor();
  181. }
  182. virtual bool isCumulative() { return false; }
  183. virtual void resetValue() { }
  184. };
  185. class UserMetric : implements INamedMetric, public CInterface
  186. {
  187. protected:
  188. Owned <IUserMetric> metric;
  189. public:
  190. IMPLEMENT_IINTERFACE;
  191. UserMetric(const char *name, const char *regex)
  192. {
  193. metric.setown(createUserMetric(name, regex));
  194. }
  195. virtual long getValue()
  196. {
  197. return (long) metric->queryCount();
  198. }
  199. virtual bool isCumulative() { return true; }
  200. virtual void resetValue()
  201. {
  202. metric->reset();
  203. }
  204. };
  205. class TickProvider : public Thread
  206. {
  207. IArrayOf<ITimerCallback> listeners;
  208. CriticalSection crit;
  209. Semaphore stopped;
  210. void doTicks()
  211. {
  212. CriticalBlock c(crit);
  213. ForEachItemIn(idx, listeners)
  214. {
  215. listeners.item(idx).onTimer();
  216. }
  217. }
  218. public:
  219. TickProvider() : Thread("TickProvider")
  220. {
  221. }
  222. int run()
  223. {
  224. for (;;)
  225. {
  226. if (stopped.wait(10000))
  227. break;
  228. doTicks();
  229. }
  230. return 0;
  231. }
  232. void addListener(ITimerCallback *l)
  233. {
  234. listeners.append(*LINK(l));
  235. }
  236. void stop()
  237. {
  238. stopped.signal();
  239. join();
  240. }
  241. };
  242. class IntervalMetric : implements INamedMetric, implements ITimerCallback, public CInterface
  243. {
  244. Linked<INamedMetric> base;
  245. CriticalSection crit;
  246. unsigned lastSnapshotTime;
  247. long lastSnapshotValue;
  248. unsigned minInterval;
  249. long value;
  250. void takeSnapshot()
  251. {
  252. CriticalBlock c(crit);
  253. unsigned now = msTick();
  254. unsigned period = now - lastSnapshotTime;
  255. if (period >= minInterval)
  256. {
  257. long newValue = base->getValue();
  258. value = ((newValue - lastSnapshotValue) * 1000) / period;
  259. lastSnapshotTime = now;
  260. lastSnapshotValue = newValue;
  261. }
  262. }
  263. public:
  264. IMPLEMENT_IINTERFACE;
  265. IntervalMetric(INamedMetric *_base, unsigned _minInterval=1000) : base(_base), minInterval(_minInterval)
  266. {
  267. lastSnapshotTime = msTick();
  268. lastSnapshotValue = 0;
  269. value = 0;
  270. }
  271. virtual void onTimer()
  272. {
  273. takeSnapshot();
  274. }
  275. virtual long getValue()
  276. {
  277. takeSnapshot();
  278. return value;
  279. }
  280. virtual bool isCumulative() { return false; }
  281. virtual void resetValue()
  282. {
  283. CriticalBlock c(crit);
  284. lastSnapshotTime = msTick();
  285. lastSnapshotValue = 0;
  286. value = 0;
  287. }
  288. };
  289. class RatioMetric : implements INamedMetric, public CInterface
  290. {
  291. atomic_t &counter;
  292. unsigned __int64 &elapsed;
  293. public:
  294. IMPLEMENT_IINTERFACE;
  295. RatioMetric(atomic_t &_counter, unsigned __int64 &_elapsed) : counter(_counter) , elapsed(_elapsed) {}
  296. virtual long getValue()
  297. {
  298. CriticalBlock c(counterCrit);
  299. unsigned count = atomic_read(&counter);
  300. if (count)
  301. return (unsigned) (elapsed / count);
  302. else
  303. return 0;
  304. }
  305. virtual bool isCumulative() { return true; }
  306. virtual void resetValue()
  307. {
  308. CriticalBlock c(counterCrit);
  309. atomic_set(&counter, 0);
  310. }
  311. };
  312. class UnsignedRatioMetric : implements INamedMetric, public CInterface
  313. {
  314. unsigned &counter;
  315. unsigned __int64 &elapsed;
  316. public:
  317. IMPLEMENT_IINTERFACE;
  318. UnsignedRatioMetric(unsigned &_counter, unsigned __int64 &_elapsed) : counter(_counter) , elapsed(_elapsed) {}
  319. virtual long getValue()
  320. {
  321. CriticalBlock c(counterCrit);
  322. if (counter)
  323. return (unsigned) (elapsed / counter);
  324. else
  325. return 0;
  326. }
  327. virtual bool isCumulative() { return true; }
  328. virtual void resetValue()
  329. {
  330. CriticalBlock c(counterCrit);
  331. counter = 0;
  332. }
  333. };
  // Registry of named metrics. The constructor registers the global counters
  // declared at the top of this file and starts a TickProvider thread that
  // drives the per-interval ("<name>/s") metrics; the destructor stops it.
  334. class CRoxieMetricsManager : implements IRoxieMetricsManager, public CInterface
  335. {
  336. public:
  337. IMPLEMENT_IINTERFACE;
  338. CRoxieMetricsManager();
  339. ~CRoxieMetricsManager();
  // Returns the value of the named metric, or 0 if no metric has that name.
  340. virtual long getValue(const char * name);
  // Logs every cumulative metric as "TOTALS: name = value".
  341. void dumpMetrics();
  // Appends a <Metrics> XML fragment listing every metric to xml and returns it.
  342. StringBuffer &getMetrics(StringBuffer &xml);
  // Calls resetValue() on every registered metric.
  343. void resetMetrics();
  // doAddMetric overloads: wrap the given counter/function in the matching
  // INamedMetric and register it under 'name'. A non-zero interval additionally
  // registers an IntervalMetric under "<name>/s" and marks the wrapped counter
  // as cumulative.
  344. void doAddMetric(atomic_t &counter, const char *name, unsigned interval);
  345. void doAddMetric(RelaxedAtomic<unsigned> &counter, const char *name, unsigned interval);
  346. void doAddMetric(unsigned &counter, const char *name, unsigned interval);
  // Takes ownership of the caller's reference on n (it is released after being stored).
  347. void doAddMetric(INamedMetric *n, const char *name, unsigned interval);
  // interval must be 0 for function-backed metrics (asserted).
  348. void doAddMetric(AccessorFunction function, const char *name, unsigned interval);
  // Register an average (elapsed / counter) metric under 'name'.
  // NOTE(review): only the 'unsigned &' overload is defined in the visible part of this file.
  349. void addRatioMetric(atomic_t &counter, const char *name, unsigned __int64 &elapsed);
  350. void addRatioMetric(unsigned &counter, const char *name, unsigned __int64 &elapsed);
  // Registers a UserMetric built from (name, regex) under 'name'.
  351. void addUserMetric(const char *name, const char *regex);
  352. private:
  // Metric name -> metric; the map holds its own link on each entry.
  353. MapStringToMyClassViaBase<INamedMetric, INamedMetric> metricMap;
  // Set to false by the constructor and never set to true in the visible code;
  // the destructor only dumps metrics when it is true.
  354. bool started;
  // Timer thread that drives IntervalMetric snapshots.
  355. TickProvider ticker;
  356. };
  357. void RoxieQueryStats::addMetrics(CRoxieMetricsManager *snmpManager, const char *prefix, unsigned interval)
  358. {
  359. StringBuffer name;
  360. snmpManager->doAddMetric(count, name.clear().append(prefix).append("QueryCount"), interval);
  361. snmpManager->doAddMetric(failedCount, name.clear().append(prefix).append("QueryFailed"), interval);
  362. snmpManager->doAddMetric(active, name.clear().append(prefix).append("QueryActive"), 0);
  363. snmpManager->doAddMetric(maxTime, name.clear().append(prefix).append("QueryMaxTime"), 0);
  364. snmpManager->doAddMetric(minTime, name.clear().append(prefix).append("QueryMinTime"), 0);
  365. snmpManager->addRatioMetric(count, name.clear().append(prefix).append("QueryAverageTime"), totalTime);
  366. }
  367. using roxiemem::getHeapAllocated;
  368. using roxiemem::getHeapPercentAllocated;
  369. using roxiemem::getDataBufferPages;
  370. using roxiemem::getDataBuffersActive;
  371. CRoxieMetricsManager::CRoxieMetricsManager()
  372. {
  373. started = false;
  374. addMetric(maxQueueLength, 0);
  375. addMetric(queryCount, 1000);
  376. unknownQueryStats.addMetrics(this, "unknown", 1000);
  377. loQueryStats.addMetrics(this, "lo", 1000);
  378. hiQueryStats.addMetrics(this, "hi", 1000);
  379. slaQueryStats.addMetrics(this, "sla", 1000);
  380. combinedQueryStats.addMetrics(this, "all", 1000);
  381. addMetric(retriesIgnoredPrm, 1000);
  382. addMetric(retriesIgnoredSec, 1000);
  383. addMetric(retriesNeeded, 1000);
  384. addMetric(retriesReceivedPrm, 1000);
  385. addMetric(retriesReceivedSec, 1000);
  386. addMetric(retriesSent, 1000);
  387. addMetric(rowsIn, 1000);
  388. addMetric(rowsOut, 1000);
  389. addMetric(ibytiPacketsFromSelf, 1000);
  390. addMetric(ibytiPacketsSent, 1000);
  391. addMetric(ibytiPacketsWorked, 1000);
  392. addMetric(ibytiPacketsHalfWorked, 1000);
  393. addMetric(ibytiPacketsReceived, 1000);
  394. addMetric(ibytiPacketsTooLate, 1000);
  395. #ifndef NO_IBYTI_DELAYS_COUNT
  396. addMetric(ibytiNoDelaysPrm, 1000);
  397. addMetric(ibytiNoDelaysSec, 1000);
  398. #endif
  399. addMetric(packetsReceived, 1000);
  400. addMetric(packetsSent, 1000);
  401. addMetric(resultsReceived, 1000);
  402. addMetric(slavesActive, 0);
  403. addMetric(maxSlavesActive, 0);
  404. addMetric(indexRecordsRead, 1000);
  405. addMetric(postFiltered, 1000);
  406. addMetric(abortsSent, 0);
  407. addMetric(activitiesStarted, 1000);
  408. addMetric(activitiesCompleted, 1000);
  409. addMetric(diskReadStarted, 0);
  410. addMetric(diskReadCompleted, 0);
  411. addMetric(globalSignals, 0);
  412. addMetric(globalLocks, 0);
  413. addMetric(restarts, 0);
  414. addMetric(nodesLoaded, 1000);
  415. addMetric(cacheHits, 1000);
  416. addMetric(cacheAdds, 1000);
  417. addMetric(leafCacheHits, 1000);
  418. addMetric(leafCacheAdds, 1000);
  419. addMetric(nodeCacheHits, 1000);
  420. addMetric(nodeCacheAdds, 1000);
  421. addMetric(preloadCacheHits, 0);
  422. addMetric(preloadCacheAdds, 0);
  423. addMetric(unwantedDiscarded, 1000);
  424. addMetric(getHeapAllocated, 0);
  425. addMetric(getHeapPercentAllocated, 0);
  426. addMetric(getDataBufferPages, 0);
  427. addMetric(getDataBuffersActive, 0);
  428. addMetric(maxScanLength, 0);
  429. addMetric(totScanLength, 0);
  430. addMetric(totScans, 0);
  431. addMetric(meanScanLength, 0);
  432. addMetric(lastQueryDate, 0);
  433. addMetric(lastQueryTime, 0);
  434. #ifdef TIME_PACKETS
  435. addMetric(packetWaitMax, 0);
  436. addMetric(packetRunMax, 0);
  437. addRatioMetric(packetRunCount, "packetRunAverage", packetRunElapsed);
  438. addRatioMetric(packetWaitCount, "packetWaitAverage", packetWaitElapsed);
  439. #endif
  440. addMetric(numFilesToProcess, 0);
  441. ticker.start();
  442. }
  443. void CRoxieMetricsManager::doAddMetric(atomic_t &counter, const char *name, unsigned interval)
  444. {
  445. doAddMetric(new AtomicMetric(counter, interval != 0), name, interval);
  446. }
  447. void CRoxieMetricsManager::doAddMetric(RelaxedAtomic<unsigned> &counter, const char *name, unsigned interval)
  448. {
  449. doAddMetric(new RelaxedAtomicMetric(counter, interval != 0), name, interval);
  450. }
  451. void CRoxieMetricsManager::doAddMetric(unsigned &counter, const char *name, unsigned interval)
  452. {
  453. doAddMetric(new CounterMetric(counter, interval != 0), name, interval);
  454. }
  455. void CRoxieMetricsManager::doAddMetric(INamedMetric *n, const char *name, unsigned interval)
  456. {
  457. if (interval)
  458. {
  459. StringBuffer fname(name);
  460. fname.append("/s");
  461. IntervalMetric *im = new IntervalMetric(n, interval);
  462. ticker.addListener(im);
  463. metricMap.setValue(fname.str(), im);
  464. im->Release();
  465. }
  466. metricMap.setValue(name, n);
  467. n->Release();
  468. }
  469. void CRoxieMetricsManager::doAddMetric(AccessorFunction function, const char *name, unsigned interval)
  470. {
  471. assertex(interval==0);
  472. doAddMetric(new FunctionMetric(function), name, interval);
  473. }
  474. void CRoxieMetricsManager::addRatioMetric(unsigned &counter, const char *name, unsigned __int64 &elapsed)
  475. {
  476. doAddMetric(new UnsignedRatioMetric(counter, elapsed), name, 0);
  477. }
  478. void CRoxieMetricsManager::addUserMetric(const char *name, const char *regex)
  479. {
  480. doAddMetric(new UserMetric(name, regex), name, 0);
  481. }
  482. long CRoxieMetricsManager::getValue(const char * name)
  483. {
  484. long ret = 0;
  485. INamedMetric *m = metricMap.getValue(name);
  486. if (m)
  487. ret = m->getValue();
  488. #ifdef _DEBUG
  489. DBGLOG("getValue(%s) returning %ld", name, ret);
  490. #endif
  491. return ret;
  492. }
  493. CRoxieMetricsManager::~CRoxieMetricsManager()
  494. {
  495. ticker.stop();
  496. if (started)
  497. dumpMetrics();
  498. }
  499. void CRoxieMetricsManager::dumpMetrics()
  500. {
  501. HashIterator metrics(metricMap);
  502. ForEach(metrics)
  503. {
  504. IMapping &cur = metrics.query();
  505. INamedMetric *m = (INamedMetric *) *metricMap.mapToValue(&cur);
  506. if (m->isCumulative())
  507. {
  508. const char *name = (const char *) cur.getKey();
  509. long val = m->getValue();
  510. DBGLOG("TOTALS: %s = %ld", name, val);
  511. }
  512. }
  513. }
  514. StringBuffer &CRoxieMetricsManager::getMetrics(StringBuffer &xml)
  515. {
  516. xml.append("<Metrics>\n");
  517. HashIterator metrics(metricMap);
  518. ForEach(metrics)
  519. {
  520. IMapping &cur = metrics.query();
  521. INamedMetric *m = (INamedMetric *) *metricMap.mapToValue(&cur);
  522. const char *name = (const char *) cur.getKey();
  523. long val = m->getValue();
  524. xml.appendf(" <Metric name=\"%s\" value=\"%ld\"/>\n", name, val);
  525. }
  526. xml.append("</Metrics>\n");
  527. return xml;
  528. }
  529. void CRoxieMetricsManager::resetMetrics()
  530. {
  531. HashIterator metrics(metricMap);
  532. ForEach(metrics)
  533. {
  534. IMapping &cur = metrics.query();
  535. INamedMetric *m = (INamedMetric *) *metricMap.mapToValue(&cur);
  536. m->resetValue();
  537. }
  538. }
  539. IRoxieMetricsManager *createRoxieMetricsManager()
  540. {
  541. return new CRoxieMetricsManager();
  542. }
  543. Owned<IRoxieMetricsManager> roxieMetrics;
  544. // Requirements for query stats: Want to be able to ask:
  545. // 1. Average response time for query since ....??last restart of roxie??(or
  546. // maybe last 24 hours)
  547. // 2. Average response time of query in last hour
  548. // 3. Quantity of searches on each query since........??last restart of roxie??
  549. // (or maybe last 24 hours)
  550. // 4. Quantity of searches in last hour
  551. // 5. Breakout of number of searches for each query with certain standard deviations
  552. // Implementation notes: In production usage at present there may be up to 100000 queries on a given node per day,
  553. // - it is likely that this will increase and that roxie may be restarted less frequently if we start loading data
  554. // on-the-fly.
  555. // We probably need a mechanism of concentric ring buffers to ensure that we don't end up using excessive memory
  556. // OR we could log the info to disk (roll every hour?) (so restarts would not destroy) and scan it when asked... can cache results if we want.
  557. // The info is already in the log file...
  558. // A separate process tailing the log file?
  559. // If we do the internal concentric buffers route:
  560. // If hour (or minute, or minute/5, or whatever minimum granularity we pick) has changed since last noted a query,
  561. // shuffle...
  562. // we will be able to give results for any hour (x:00 to x+1:00), day (midnight to midnight), 5 minute period, etc
  563. // as well as current hour-so-far, current day-so-far
  564. // and also last 5 minutes, last 60 minutes.
  565. // OR - just use a fixed size circular buffer and discard info once full. Anyone needing to know aggregated info
  566. // about more than the last (say) 64k queries will have to start grepping logs.
  567. // Probably better to use expanding array with max size as many queries will not get anywhere near.
  568. // A binchop to find start-time/end-time ...
  569. // OR each time we fill up we aggregate all records over 2 hours old into a hour-based array. 8760 elements if stay up for a year...
  570. // Anything asking about a time period that starts > 2 hours ago will give approximate answer (and indicate that it did so).
  571. // We can arrange so that all info < 2 hours old is in one circular buffer, all info > 2 hours old is aggregated into hourly slices
  572. // The last of the hourly slices is then incomplete. OR is it easier to aggregate as we go and discard data > 1 hour old?
  573. // Probably.
  574. // If we have per-hour aggregate data available indefinitely, and last-hour data available in full, we should be ok
  575. // If we want to be able to see other stats with same detail as time, we will need to abstract this out a bit more.
  576. #define NUM_BUCKETS 16 //<32,<64,<125,<250,<500,<1s,<2,<4,<8,<16,<32,<64s,<128s,<256s,more
  577. #define SLOT_LENGTH 3600 // in seconds. Granularity of aggregation
  578. class CQueryStatsAggregator : public CInterface, implements IQueryStatsAggregator
  579. {
  580. class QueryStatsRecord : public CInterface
  581. {
  582. // one of these per query in last hour...
  583. private:
  584. time_t startTime; // more interesting than end-time
  585. unsigned elapsedTimeMs;
  586. unsigned memUsed;
  587. unsigned slavesReplyLen;
  588. unsigned bytesOut;
  589. bool failed;
  590. public:
  591. QueryStatsRecord(time_t _startTime, bool _failed, unsigned _elapsedTimeMs, unsigned _memused, unsigned _slavesReplyLen, unsigned _bytesOut)
  592. {
  593. startTime = _startTime;
  594. failed = _failed;
  595. elapsedTimeMs = _elapsedTimeMs;
  596. memUsed = _memused;
  597. slavesReplyLen = _slavesReplyLen;
  598. bytesOut = _bytesOut;
  599. }
  600. bool expired(time_t timeNow, unsigned expirySeconds)
  601. {
  602. return difftime(timeNow, startTime) > expirySeconds;
  603. }
  604. bool inRange(time_t from, time_t to)
  605. {
  606. return difftime(startTime, from) >= 0 && difftime(to, startTime) > 0;
  607. }
  608. inline bool isOlderThan(time_t otherTime)
  609. {
  610. return difftime(startTime, otherTime) < 0;
  611. }
  612. static int compareTime(CInterface * const *_l, CInterface* const *_r)
  613. {
  614. QueryStatsRecord *l = (QueryStatsRecord *)*_l;
  615. QueryStatsRecord *r = (QueryStatsRecord *)*_r;
  616. return l->elapsedTimeMs - r->elapsedTimeMs;
  617. }
  618. static void getStats(IPropertyTree &result, CIArrayOf<QueryStatsRecord> &useStats, time_t from, time_t to)
  619. {
  620. QueryStatsAggregateRecord aggregator(from, to);
  621. ForEachItemIn(idx, useStats)
  622. {
  623. QueryStatsRecord &r = useStats.item(idx);
  624. aggregator.noteQuery(r.failed, r.elapsedTimeMs, r.memUsed, r.slavesReplyLen, r.bytesOut);
  625. }
  626. aggregator.getStats(result, false);
  627. // Add in the exact percentiles
  628. if (useStats.length())
  629. {
  630. unsigned percentile97Pos = (useStats.length() * 97) / 100;
  631. useStats.sort(QueryStatsRecord::compareTime);
  632. result.setPropInt("percentile97", useStats.item(percentile97Pos).elapsedTimeMs);
  633. }
  634. else
  635. result.setPropInt("percentile97", 0);
  636. }
  637. static bool checkOlder(const void *_left, const void *_right)
  638. {
  639. QueryStatsRecord *left = (QueryStatsRecord *) _left;
  640. QueryStatsRecord *right = (QueryStatsRecord *) _right;
  641. return left->isOlderThan(right->startTime);
  642. }
  643. };
  644. class QueryStatsAggregateRecord : public CInterface
  645. {
  646. // one of these per hour...
  647. private:
  648. unsigned countTotal;
  649. unsigned countFailed;
  650. unsigned __int64 totalTimeMs;
  651. unsigned __int64 totalTimeMsSquared;
  652. unsigned __int64 totalMemUsed;
  653. unsigned __int64 totalSlavesReplyLen;
  654. unsigned __int64 totalBytesOut;
  655. unsigned maxTimeMs;
  656. unsigned minTimeMs;
  657. time_t startTime;
  658. time_t endTime;
  659. unsigned buckets[NUM_BUCKETS];
  660. public:
  661. QueryStatsAggregateRecord(time_t _startTime, time_t _endTime)
  662. {
  663. startTime = _startTime;
  664. endTime = _endTime;
  665. countTotal = 0;
  666. countFailed = 0;
  667. totalTimeMs = 0;
  668. totalTimeMsSquared = 0;
  669. totalMemUsed = 0;
  670. totalSlavesReplyLen = 0;
  671. totalBytesOut = 0;
  672. maxTimeMs = 0;
  673. minTimeMs = 0;
  674. memset(buckets, 0, sizeof(buckets));
  675. }
  676. bool inRange(time_t from, time_t to)
  677. {
  678. return (difftime(startTime, from) >= 0 && difftime(to, endTime) > 0);
  679. }
  680. bool matches(time_t queryTime)
  681. {
  682. return (difftime(queryTime, startTime) >= 0 && difftime(queryTime, endTime) < 0);
  683. }
  684. bool older(time_t queryTime)
  685. {
  686. return difftime(queryTime, endTime) >= 0;
  687. }
  688. void mergeStats(QueryStatsAggregateRecord &other)
  689. {
  690. // NOTE - we could (if we understood stats) try to interpolate when requested time ranges do not include
  691. // the whole of the block being merged. But I think it's better and easier to return stats for the full time periods
  692. // and return indication of what the time period actually being reported is.
  693. startTime = std::min(startTime, other.startTime);
  694. endTime = std::max(endTime, other.endTime);
  695. totalTimeMs += other.totalTimeMs;
  696. totalTimeMsSquared += other.totalTimeMsSquared;
  697. totalMemUsed += other.totalMemUsed;
  698. totalSlavesReplyLen += other.totalSlavesReplyLen;
  699. totalBytesOut += other.totalBytesOut;
  700. maxTimeMs = std::max(maxTimeMs, other.maxTimeMs);
  701. if (other.countTotal)
  702. minTimeMs = countTotal ? std::min(minTimeMs, other.minTimeMs) : other.minTimeMs;
  703. // NOTE - update coutTotal AFTER minTimeMs or the check for zero is wrong.
  704. countTotal += other.countTotal;
  705. countFailed += other.countFailed;
  706. for (unsigned bucketIndex = 0; bucketIndex < NUM_BUCKETS; bucketIndex++)
  707. {
  708. buckets[bucketIndex] += other.buckets[bucketIndex];
  709. }
  710. }
  711. void noteQuery(bool failed, unsigned elapsedTimeMs, unsigned memUsed, unsigned slavesReplyLen, unsigned bytesOut)
  712. {
  713. totalTimeMs += elapsedTimeMs;
  714. unsigned __int64 timeSquared = elapsedTimeMs;
  715. timeSquared *= timeSquared;
  716. totalTimeMsSquared += timeSquared;
  717. totalMemUsed += memUsed;
  718. totalSlavesReplyLen += slavesReplyLen;
  719. totalBytesOut += bytesOut;
  720. if (elapsedTimeMs > maxTimeMs)
  721. maxTimeMs = elapsedTimeMs;
  722. if (countTotal==0 || elapsedTimeMs < minTimeMs)
  723. minTimeMs = elapsedTimeMs;
  724. unsigned bucketIdx;
  725. if (elapsedTimeMs <= 32)
  726. bucketIdx = 0;
  727. else if (elapsedTimeMs <= 64)
  728. bucketIdx = 1;
  729. else
  730. {
  731. bucketIdx = 2;
  732. unsigned bucketMax = 125;
  733. while (elapsedTimeMs > bucketMax)
  734. {
  735. bucketIdx++;
  736. if (bucketIdx == NUM_BUCKETS-1)
  737. break;
  738. bucketMax *= 2;
  739. }
  740. }
  741. buckets[bucketIdx]++;
  742. countTotal++;
  743. if (failed)
  744. countFailed++;
  745. }
  746. void getStats(IPropertyTree &result, bool estimatePercentiles)
  747. {
  748. CDateTime dt;
  749. StringBuffer s;
  750. dt.set(startTime);
  751. result.setProp("startTime", dt.getString(s.clear(), true).str());
  752. dt.set(endTime);
  753. result.setProp("endTime", dt.getString(s.clear(), true).str());
  754. result.setPropInt64("countTotal", countTotal);
  755. result.setPropInt64("countFailed", countFailed);
  756. result.setPropInt64("averageTimeMs", countTotal ? totalTimeMs/countTotal : 0);
  757. result.setPropInt64("averageMemUsed", countTotal ? totalMemUsed/countTotal : 0);
  758. result.setPropInt64("averageSlavesReplyLen", countTotal ? totalSlavesReplyLen/countTotal : 0);
  759. result.setPropInt64("averageBytesOut", countTotal ? totalBytesOut/countTotal : 0);
  760. // MORE - do something funky and statistical using totalTimeMsSquared
  761. result.setPropInt("maxTimeMs", maxTimeMs);
  762. result.setPropInt("minTimeMs", minTimeMs);
  763. if (estimatePercentiles)
  764. {
  765. // We can tell which bucket the 97th percentile is in...
  766. unsigned percentile97 = (unsigned) (((countTotal * 97.0) / 100.0)+0.5);
  767. unsigned belowMe = 0;
  768. unsigned bucketLimit = 32;
  769. for (unsigned bucketIndex = 0; bucketIndex < NUM_BUCKETS; bucketIndex++)
  770. {
  771. belowMe += buckets[bucketIndex];
  772. if (belowMe >= percentile97)
  773. {
  774. if (bucketLimit > maxTimeMs)
  775. bucketLimit = maxTimeMs;
  776. result.setPropInt("percentile97", bucketLimit);
  777. result.setPropBool("percentile97/@estimate", true);
  778. break;
  779. }
  780. if (bucketLimit == 64)
  781. bucketLimit = 125;
  782. else
  783. bucketLimit += bucketLimit;
  784. }
  785. }
  786. }
  787. };
  788. QueueOf<QueryStatsRecord, false> recent;
  789. CIArrayOf<QueryStatsAggregateRecord> aggregated; // stored with most recent first
  790. unsigned expirySeconds; // time to keep exact info (rather than just aggregated)
  791. StringAttr queryName;
  792. SpinLock lock; // MORE: This could be held this for a while. Is this significant? Should it be a CriticalSection?
  793. QueryStatsAggregateRecord &findAggregate(time_t startTime)
  794. {
  795. unsigned idx = 0;
  796. while (aggregated.isItem(idx))
  797. {
  798. QueryStatsAggregateRecord &thisSlot = aggregated.item(idx);
  799. if (thisSlot.matches(startTime))
  800. return thisSlot; // This is the most common case!
  801. else if (thisSlot.older(startTime))
  802. break;
  803. idx++;
  804. }
  805. time_t slotStart;
  806. time_t slotEnd;
  807. calcSlotStartTime(startTime, SLOT_LENGTH, slotStart, slotEnd);
  808. QueryStatsAggregateRecord *newSlot = new QueryStatsAggregateRecord(slotStart, slotEnd);
  809. aggregated.add(*newSlot, idx);
  810. return *newSlot;
  811. }
  812. static void calcSlotStartTime(time_t queryTime, unsigned slotLengthSeconds, time_t &slotStart, time_t &slotEnd)
  813. {
  814. assertex (slotLengthSeconds == 3600); // Haven't written any code to support anything else yet!
  815. struct tm queryTimeExpanded;
  816. localtime_r(&queryTime, &queryTimeExpanded);
  817. queryTimeExpanded.tm_min = 0;
  818. queryTimeExpanded.tm_sec = 0;
  819. slotStart = mktime(&queryTimeExpanded);
  820. queryTimeExpanded.tm_sec = slotLengthSeconds;
  821. slotEnd = mktime(&queryTimeExpanded);
  822. }
  823. static CQueryStatsAggregator globalStatsAggregator;
  824. static SpinLock queryStatsCrit;
  825. public:
  826. static CIArrayOf<CQueryStatsAggregator> queryStatsAggregators;
  827. virtual void Link(void) const { CInterface::Link(); }
  828. virtual bool Release(void) const
  829. {
  830. if (CInterface::Release())
  831. return true;
  832. SpinBlock b(queryStatsCrit);
  833. if (!IsShared())
  834. {
  835. queryStatsAggregators.zap(* const_cast<CQueryStatsAggregator*>(this));
  836. return true;
  837. }
  838. return false;
  839. }
  840. CQueryStatsAggregator(const char *_queryName, unsigned _expirySeconds)
  841. : queryName(_queryName)
  842. {
  843. expirySeconds = _expirySeconds;
  844. SpinBlock b(queryStatsCrit); // protect the global list
  845. queryStatsAggregators.append(*LINK(this));
  846. }
  847. ~CQueryStatsAggregator()
  848. {
  849. while (recent.ordinality())
  850. {
  851. recent.dequeue()->Release();
  852. }
  853. }
  854. static IPropertyTree *getAllQueryStats(bool includeQueries, time_t from, time_t to)
  855. {
  856. Owned<IPTree> result = createPTree("QueryStats", ipt_fast);
  857. if (includeQueries)
  858. {
  859. SpinBlock b(queryStatsCrit);
  860. ForEachItemIn(idx, queryStatsAggregators)
  861. {
  862. CQueryStatsAggregator &thisQuery = queryStatsAggregators.item(idx);
  863. result->addPropTree("Query", thisQuery.getStats(from, to));
  864. }
  865. }
  866. result->addPropTree("Global", globalStatsAggregator.getStats(from, to));
  867. return result.getClear();
  868. }
  869. virtual void noteQuery(time_t startTime, bool failed, unsigned elapsedTimeMs, unsigned memUsed, unsigned slavesReplyLen, unsigned bytesOut)
  870. {
  871. time_t timeNow;
  872. time(&timeNow);
  873. SpinBlock b(lock);
  874. if (expirySeconds)
  875. {
  876. QueryStatsRecord *statsRec = new QueryStatsRecord(startTime, failed, elapsedTimeMs, memUsed, slavesReplyLen, bytesOut);
  877. recent.enqueue(statsRec, QueryStatsRecord::checkOlder);
  878. }
  879. // Now remove any that have expired
  880. if (expirySeconds != (unsigned) -1)
  881. {
  882. while (recent.ordinality() && recent.head()->expired(timeNow, expirySeconds))
  883. {
  884. recent.dequeue()->Release();
  885. }
  886. }
  887. QueryStatsAggregateRecord &aggregator = findAggregate(startTime);
  888. aggregator.noteQuery(failed, elapsedTimeMs, memUsed, slavesReplyLen, bytesOut);
  889. }
  890. virtual IPropertyTree *getStats(time_t from, time_t to)
  891. {
  892. time_t timeNow;
  893. time(&timeNow);
  894. Owned<IPropertyTree> result = createPTree("Query", ipt_fast);
  895. result->setProp("@id", queryName);
  896. if (expirySeconds && difftime(timeNow, from) <= expirySeconds)
  897. {
  898. // we can calculate exactly
  899. CIArrayOf<QueryStatsRecord> useStats;
  900. {
  901. SpinBlock b(lock); // be careful not to take too much time in here! If it gets to take a while, we will have to rethink
  902. ForEachQueueItemIn(idx, recent)
  903. {
  904. QueryStatsRecord *rec = recent.item(idx);
  905. if (rec->inRange(from, to))
  906. {
  907. rec->Link();
  908. useStats.append(*rec);
  909. }
  910. }
  911. // Spinlock is released here, and we process the useStats array at our leisure...
  912. }
  913. QueryStatsRecord::getStats(*result, useStats, from, to);
  914. }
  915. else // use aggregate stats - result will be inexact
  916. {
  917. QueryStatsAggregateRecord aggregator(from, to);
  918. {
  919. SpinBlock b(lock);
  920. ForEachItemInRev(idx, aggregated)
  921. {
  922. QueryStatsAggregateRecord &thisSlot = aggregated.item(idx);
  923. if (thisSlot.inRange(from, to))
  924. aggregator.mergeStats(thisSlot);
  925. else if (thisSlot.older(from))
  926. break;
  927. }
  928. // Spinlock is released here, and we process the aggregator at our leisure...
  929. }
  930. aggregator.getStats(*result, true);
  931. }
  932. return result.getClear();
  933. }
  934. static inline IQueryStatsAggregator *queryGlobalStatsAggregator()
  935. {
  936. return &globalStatsAggregator;
  937. }
  938. };
  939. CIArrayOf<CQueryStatsAggregator> CQueryStatsAggregator::queryStatsAggregators;
  940. CQueryStatsAggregator CQueryStatsAggregator::globalStatsAggregator(NULL, SLOT_LENGTH);
  941. SpinLock CQueryStatsAggregator::queryStatsCrit; //MORE: Should probably be a critical section
  942. IQueryStatsAggregator *queryGlobalQueryStatsAggregator()
  943. {
  944. return CQueryStatsAggregator::queryGlobalStatsAggregator();
  945. }
  946. IQueryStatsAggregator *createQueryStatsAggregator(const char *_queryName, unsigned _expirySeconds)
  947. {
  948. return new CQueryStatsAggregator(_queryName, _expirySeconds);
  949. }
  950. IPropertyTree *getAllQueryStats(bool includeQueries, time_t from, time_t to)
  951. {
  952. return CQueryStatsAggregator::getAllQueryStats(includeQueries, from, to);
  953. }
  954. //=======================================================================================================
  955. #ifdef _USE_CPPUNIT
  956. #include <cppunit/extensions/HelperMacros.h>
  957. class StatsAggregatorTest : public CppUnit::TestFixture
  958. {
  959. CPPUNIT_TEST_SUITE( StatsAggregatorTest );
  960. CPPUNIT_TEST(test1);
  961. CPPUNIT_TEST_SUITE_END();
  962. protected:
  963. void test1()
  964. {
  965. struct tm tmStruct;
  966. tmStruct.tm_isdst = 0;
  967. tmStruct.tm_hour = 12;
  968. tmStruct.tm_min = 34;
  969. tmStruct.tm_sec = 56;
  970. tmStruct.tm_mday = 14;
  971. tmStruct.tm_mon = 3;
  972. tmStruct.tm_year = 2005-1900;
  973. Owned<IQueryStatsAggregator> s = createQueryStatsAggregator("TestQuery", 0);
  974. // MORE - scope for much more testing here...
  975. for (unsigned i = 0; i < 100; i++)
  976. {
  977. s->noteQuery(mktime(&tmStruct), false, i, 8000, 10000, 55);
  978. tmStruct.tm_sec++;
  979. }
  980. tmStruct.tm_hour = 11;
  981. s->noteQuery(mktime(&tmStruct), false, 80000, 4000, 5000, 66);
  982. tmStruct.tm_hour = 0;
  983. tmStruct.tm_min = 0;
  984. tmStruct.tm_sec = 0;
  985. time_t start = mktime(&tmStruct);
  986. tmStruct.tm_hour = 24;
  987. time_t end = mktime(&tmStruct);
  988. {
  989. Owned<IPropertyTree> p = s->getStats(start, end);
  990. StringBuffer stats;
  991. toXML(p, stats);
  992. DBGLOG("%s", stats.str());
  993. }
  994. {
  995. Owned<IPropertyTree> p = getAllQueryStats(true, start, end);
  996. StringBuffer stats;
  997. toXML(p, stats);
  998. DBGLOG("%s", stats.str());
  999. }
  1000. s.clear();
  1001. }
  1002. };
  1003. CPPUNIT_TEST_SUITE_REGISTRATION( StatsAggregatorTest );
  1004. CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( StatsAggregatorTest, "StatsAggregatorTest" );
  1005. #endif