ccd.hpp 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef _CCD_INCL
  14. #define _CCD_INCL
  15. #include "jexcept.hpp"
  16. #include "jsocket.hpp"
  17. #include "jptree.hpp"
  18. #include "udplib.hpp"
  19. #include "portlist.h"
  20. #include "thorsoapcall.hpp"
  21. #include "thorxmlwrite.hpp"
  22. #include "jlog.hpp"
  23. #include "jstats.h"
  24. #include "roxie.hpp"
  25. #include "roxiedebug.ipp"
  26. #include "eclrtl.hpp"
  27. #include "workunit.hpp"
  // CCD_API: symbol-visibility decoration for the ccd module. On Windows it is dllexport when
  // CCD_EXPORTS is defined (building the module) and dllimport otherwise; elsewhere it is empty.
  28. #ifdef _WIN32
  29. #ifdef CCD_EXPORTS
  30. #define CCD_API __declspec(dllexport)
  31. #else
  32. #define CCD_API __declspec(dllimport)
  33. #endif
  34. #else
  35. #define CCD_API
  36. #endif
  // Build switches, error codes and sizing constants.
  37. #define PARALLEL_EXECUTE // NOTE(review): feature switch, presumably tested with #ifdef elsewhere - confirm
  38. #define RE_FailedToLoadProcedure 0x1000
  39. #define RE_FailedToLoadSharedObject 0x2000
  40. #define MAXTRACELEVEL 100 // don't want traceLevel+1 to wrap to 0 in lsb
  41. #define MAX_CLUSTER_SIZE 1024 // dimension of the per-channel arrays (channels[], subChannels[], numSlaves[], ...) declared below
  42. #define UDP_QUEUE_SIZE 100
  43. #define UDP_SEND_QUEUE_SIZE 50
  44. #define ROXIE_STATEFILE_VERSION 2
  // Roxie exception factory (printf-style format, checked by the format attribute) and helpers
  // that, judging by their names, manage the multicast socket used to reach channel members.
  // All are defined in another translation unit.
  45. extern IException *MakeRoxieException(int code, const char *format, ...) __attribute__((format(printf, 2, 3)));
  46. extern Owned<ISocket> multicastSocket;
  47. extern size32_t channelWrite(unsigned channel, void const* buf, size32_t size);
  48. void addEndpoint(unsigned channel, const IpAddress &slaveIp, unsigned port);
  49. void openMulticastSocket();
  50. void joinMulticastChannel(unsigned channel);
  // Channel state for this node; each array holds MAX_CLUSTER_SIZE entries.
  51. extern unsigned channels[MAX_CLUSTER_SIZE]; // list of all channel numbers for this node
  52. extern unsigned channelCount; // number of channels this node is doing
  53. extern unsigned subChannels[MAX_CLUSTER_SIZE]; // maps channel numbers to subChannels for this node (1-based: RoxiePacketHeader subtracts 1 before use)
  54. extern bool suspendedChannels[MAX_CLUSTER_SIZE];// indicates suspended channels for this node
  55. extern unsigned numSlaves[MAX_CLUSTER_SIZE]; // number of slaves listening on this channel
  56. extern unsigned replicationLevel[MAX_CLUSTER_SIZE]; // Which copy of the data this channel uses on this slave
  57. extern unsigned myNodeIndex; // this node's index; RoxiePacketHeader::init() stores it in serverIdx
  // Flags kept in the top bit of RoxiePacketHeader's 16-bit sequence fields; the counters
  // themselves therefore wrap at the *_MAX values.
  58. #define OUTOFBAND_SEQUENCE 0x8000 // indicates an out-of-band reply
  59. #define OVERFLOWSEQUENCE_MAX 0x7fffu // Max value before we want to wrap (to avoid collision with flag)
  60. #define CONTINUE_SEQUENCE_SKIPTO 0x8000 // flag in continueSequence field indicating presence of skipTo data
  61. #define CONTINUESEQUENCE_MAX 0x7fffu // Max value before we want to wrap (to avoid collision with flag)
  // Queue-priority bits carried in activityId. ROXIE_LOW_PRIORITY is 0, so it adds nothing to
  // ROXIE_PRIORITY_MASK. Unless ROXIE_SLA_LOGIC is defined, the SLA bit is excluded from the mask,
  // so RemoteActivityId::isSLAPriority() can never be true.
  62. #define ROXIE_SLA_PRIORITY 0x40000000 // mask in activityId indicating it goes SLA priority queue
  63. #define ROXIE_HIGH_PRIORITY 0x80000000 // mask in activityId indicating it goes on the fast queue
  64. #define ROXIE_LOW_PRIORITY 0x00000000 // mask in activityId indicating it goes on the slow queue (= default)
  65. #ifdef ROXIE_SLA_LOGIC
  66. #define ROXIE_PRIORITY_MASK (ROXIE_SLA_PRIORITY | ROXIE_HIGH_PRIORITY | ROXIE_LOW_PRIORITY)
  67. #else
  68. #define ROXIE_PRIORITY_MASK (ROXIE_HIGH_PRIORITY | ROXIE_LOW_PRIORITY )
  69. #endif
  70. #define ROXIE_ACTIVITY_FETCH 0x20000000 // or'ed into activityId for fetch part of full keyed join activities
  71. // Status information returned in the activityId field of the header:
  72. // note - any of these also set sequence top bit to ensure not regarded as dup.
  // (RoxiePacketHeader's (source, activityId) constructor ORs OUTOFBAND_SEQUENCE into overflowSequence
  //  for every activityId in [ROXIE_ACTIVITY_SPECIAL_FIRST, ROXIE_ACTIVITY_SPECIAL_LAST].)
  73. #define ROXIE_ACTIVITY_SPECIAL_FIRST 0x3ffffff0u
  74. #define ROXIE_UNLOAD 0x3ffffff6u
  75. #define ROXIE_DEBUGREQUEST 0x3ffffff7u
  76. #define ROXIE_DEBUGCALLBACK 0x3ffffff8u
  77. #define ROXIE_PING 0x3ffffff9u
  78. #define ROXIE_TRACEINFO 0x3ffffffau
  79. #define ROXIE_FILECALLBACK 0x3ffffffbu
  80. #define ROXIE_ALIVE 0x3ffffffcu
  81. #define ROXIE_KEYEDLIMIT_EXCEEDED 0x3ffffffdu
  82. #define ROXIE_LIMIT_EXCEEDED 0x3ffffffeu
  83. #define ROXIE_EXCEPTION 0x3fffffffu
  84. #define ROXIE_ACTIVITY_SPECIAL_LAST 0x3fffffffu
  // Layout of RoxiePacketHeader::retries: one SUBCHANNEL_BITS-wide retry counter per subchannel,
  // with subchannel 0 in the lowest bits. A counter equal to SUBCHANNEL_MASK marks that subchannel
  // as failed / no longer worth retrying (see RoxiePacketHeader::retry()).
  85. #define SUBCHANNEL_MASK 3
  86. #define SUBCHANNEL_BITS 2 // allows for up to 7-way redundancy in a 16-bit short retries flag, high bits used for indicators/flags
  // Uncomment to add a 'tick' field to RoxiePacketHeader.
  87. //#define TIME_PACKETS
  88. #define ROXIE_FASTLANE 0x8000u // mask in retries indicating slave reply goes on the fast queue
  89. #define ROXIE_BROADCAST 0x4000u // mask in retries indicating original request was a broadcast
  90. #define ROXIE_RETRIES_MASK (~(ROXIE_FASTLANE|ROXIE_BROADCAST)) // retries bits mask
  91. #define QUERY_ABORTED 0xffffu // special value for retries to indicate abandoned query
  // _DEBUG builds give debug requests a single retry and a 100x longer timeout
  // (units not visible here; presumably ms, and presumably to allow time in a debugger - TODO confirm).
  92. #ifdef _DEBUG
  93. #define MAX_DEBUGREQUEST_RETRIES 1
  94. #define DEBUGREQUEST_TIMEOUT 500000
  95. #else
  96. #define MAX_DEBUGREQUEST_RETRIES 3
  97. #define DEBUGREQUEST_TIMEOUT 5000
  98. #endif
  99. #define ROXIE_DALI_CONNECT_TIMEOUT 30000
  100. #define ABORT_POLL_PERIOD 5000
  101. class RemoteActivityId
  102. {
  103. public:
  104. hash64_t queryHash;
  105. unsigned activityId;
  106. inline bool isHighPriority() const { return (activityId & ROXIE_PRIORITY_MASK) == ROXIE_HIGH_PRIORITY; }
  107. inline bool isSLAPriority() const { return (activityId & ROXIE_PRIORITY_MASK) == ROXIE_SLA_PRIORITY; }
  108. inline RemoteActivityId(unsigned _activityId, hash64_t _queryHash)
  109. : activityId(_activityId), queryHash(_queryHash)
  110. {
  111. }
  112. inline MemoryBuffer &serialize(MemoryBuffer &out) const
  113. {
  114. return out.append(activityId).append(queryHash);
  115. }
  116. inline RemoteActivityId(MemoryBuffer &in)
  117. {
  118. in.read(activityId);
  119. in.read(queryHash);
  120. }
  121. };
  122. class RoxiePacketHeader
  123. {
  124. private:
  125. RoxiePacketHeader(const RoxiePacketHeader &source);
  126. public:
  127. unsigned short packetlength;
  128. unsigned short retries; // how many retries on this query, the high bits are used as flags, see above
  129. unsigned short overflowSequence;// Used if more than one packet-worth of data from server - eg keyed join. We don't mind if we wrap...
  130. unsigned short continueSequence;// Used if more than one chunk-worth of data from slave. We don't mind if we wrap
  131. unsigned activityId; // identifies the helper factory to be used (activityId in graph)
  132. hash64_t queryHash; // identifies the query
  133. ruid_t uid; // unique id
  134. unsigned serverIdx; // final result (server) destination
  135. #ifdef TIME_PACKETS
  136. unsigned tick;
  137. #endif
  138. unsigned short channel; // multicast family to send on
  139. inline RoxiePacketHeader(const RemoteActivityId &_remoteId, ruid_t _uid, unsigned _channel, unsigned _overflowSequence)
  140. {
  141. packetlength = sizeof(RoxiePacketHeader);
  142. #ifdef TIME_PACKETS
  143. tick = 0;
  144. #endif
  145. init(_remoteId, _uid, _channel, _overflowSequence);
  146. }
  147. RoxiePacketHeader(const RoxiePacketHeader &source, unsigned _activityId)
  148. {
  149. // Used to create the header to send a callback to originating server or an IBYTI to a buddy
  150. activityId = _activityId;
  151. uid = source.uid;
  152. queryHash = source.queryHash;
  153. serverIdx = source.serverIdx;
  154. channel = source.channel;
  155. overflowSequence = source.overflowSequence;
  156. continueSequence = source.continueSequence;
  157. if (_activityId >= ROXIE_ACTIVITY_SPECIAL_FIRST && _activityId <= ROXIE_ACTIVITY_SPECIAL_LAST)
  158. overflowSequence |= OUTOFBAND_SEQUENCE; // Need to make sure it is not treated as dup of actual reply in the udp layer
  159. retries = getSubChannelMask(channel) | (source.retries & ~ROXIE_RETRIES_MASK);
  160. #ifdef TIME_PACKETS
  161. tick = source.tick;
  162. #endif
  163. packetlength = sizeof(RoxiePacketHeader);
  164. }
  165. static unsigned getSubChannelMask(unsigned channel)
  166. {
  167. unsigned subChannel = subChannels[channel] - 1;
  168. return SUBCHANNEL_MASK << (SUBCHANNEL_BITS * subChannel);
  169. }
  170. inline unsigned getSequenceId() const
  171. {
  172. return (((unsigned) overflowSequence) << 16) | (unsigned) continueSequence;
  173. }
  174. inline unsigned priorityHash() const
  175. {
  176. // Used to determine which slave to act as primary and which as secondary for a given packet (thus spreading the load)
  177. // It's important that we do NOT include channel (since that would result in different values for the different slaves responding to a broadcast)
  178. // We also don't include continueSequence since we'd prefer continuations to go the same way as original
  179. unsigned hash = hashc((const unsigned char *) &serverIdx, sizeof(serverIdx), 0);
  180. hash = hashc((const unsigned char *) &uid, sizeof(uid), hash);
  181. hash += overflowSequence; // MORE - is this better than hashing?
  182. if (traceLevel > 9)
  183. {
  184. StringBuffer s;
  185. DBGLOG("Calculating hash: %s hash was %d", toString(s).str(), hash);
  186. }
  187. return hash;
  188. }
  189. inline bool matchPacket(const RoxiePacketHeader &oh) const
  190. {
  191. // used when matching up a kill packet against a pending one...
  192. // DO NOT compare activityId - they are not supposed to match, since 0 in activityid identifies ibyti!
  193. return
  194. oh.uid==uid &&
  195. (oh.overflowSequence & ~OUTOFBAND_SEQUENCE) == (overflowSequence & ~OUTOFBAND_SEQUENCE) &&
  196. oh.continueSequence == continueSequence &&
  197. oh.serverIdx==serverIdx &&
  198. oh.channel==channel;
  199. }
  200. void init(const RemoteActivityId &_remoteId, ruid_t _uid, unsigned _channel, unsigned _overflowSequence)
  201. {
  202. retries = 0;
  203. activityId = _remoteId.activityId;
  204. queryHash = _remoteId.queryHash;
  205. uid = _uid;
  206. serverIdx = myNodeIndex;
  207. channel = _channel;
  208. overflowSequence = _overflowSequence;
  209. continueSequence = 0;
  210. }
  211. StringBuffer &toString(StringBuffer &ret) const;
  212. bool allChannelsFailed()
  213. {
  214. unsigned mask = (1 << (numSlaves[channel] * SUBCHANNEL_BITS)) - 1;
  215. return (retries & mask) == mask;
  216. }
  217. bool retry()
  218. {
  219. bool worthRetrying = false;
  220. unsigned mask = SUBCHANNEL_MASK;
  221. for (unsigned subChannel = 0; subChannel < numSlaves[channel]; subChannel++)
  222. {
  223. unsigned subRetries = (retries & mask) >> (subChannel * SUBCHANNEL_BITS);
  224. if (subRetries != SUBCHANNEL_MASK)
  225. subRetries++;
  226. if (subRetries != SUBCHANNEL_MASK)
  227. worthRetrying = true;
  228. retries = (retries & ~mask) | (subRetries << (subChannel * SUBCHANNEL_BITS));
  229. mask <<= SUBCHANNEL_BITS;
  230. }
  231. return worthRetrying;
  232. }
  233. inline void noteAlive(unsigned mask)
  234. {
  235. retries = (retries & ~mask);
  236. }
  237. inline void noteException(unsigned mask)
  238. {
  239. retries = (retries | mask);
  240. }
  241. inline void setException()
  242. {
  243. unsigned subChannel = subChannels[channel] - 1;
  244. retries |= SUBCHANNEL_MASK << (SUBCHANNEL_BITS * subChannel);
  245. }
  246. unsigned thisChannelRetries()
  247. {
  248. unsigned shift = SUBCHANNEL_BITS * (subChannels[channel] - 1);
  249. unsigned mask = SUBCHANNEL_MASK << shift;
  250. return (retries & mask) >> shift;
  251. }
  252. };
  // IRoxieQueryPacket - accessors for a Roxie query packet: its RoxiePacketHeader plus variable-length
  // sections (continuation data, smart-step info, trace info, context data). Each query*() accessor
  // presumably pairs with the matching get*Length() - TODO confirm.
  // NOTE: the order of the virtual methods defines the vtable layout; do not reorder.
  253. interface IRoxieQueryPacket : extends IInterface
  254. {
  // NOTE(review): returns a non-const reference from a const method, so callers can modify the header of a const packet.
  255. virtual RoxiePacketHeader &queryHeader() const = 0;
  256. virtual const void *queryContinuationData() const = 0;
  257. virtual unsigned getContinuationLength() const = 0;
  258. virtual const byte *querySmartStepInfoData() const = 0;
  259. virtual unsigned getSmartStepInfoLength() const = 0;
  260. virtual const byte *queryTraceInfo() const = 0;
  261. virtual unsigned getTraceLength() const = 0;
  262. virtual const void *queryContextData() const = 0;
  263. virtual unsigned getContextLength() const = 0;
  // Presumably returns a new packet addressed to another channel - confirm with the implementation.
  264. virtual IRoxieQueryPacket *clonePacket(unsigned channel) const = 0;
  265. virtual unsigned hash() const = 0;
  266. virtual bool cacheMatch(const IRoxieQueryPacket *) const = 0; // note - this checks whether it's a repeat from server's point-of-view
  // const method returning a packet: presumably a new packet with skip data added (cf. CONTINUE_SEQUENCE_SKIPTO) - confirm.
  267. virtual IRoxieQueryPacket *insertSkipData(size32_t skipDataLen, const void *skipData) const = 0;
  268. };
  // Forward declaration only; IQueryDll is declared elsewhere.
  269. interface IQueryDll;
  270. // Global configuration info (process-wide settings; defined in another translation unit)
  271. extern bool shuttingDown;
  272. extern unsigned numChannels;
  273. extern unsigned callbackRetries;
  274. extern unsigned callbackTimeout;
  275. extern unsigned lowTimeout;
  276. extern unsigned highTimeout;
  277. extern unsigned slaTimeout;
  278. extern unsigned headRegionSize;
  279. extern unsigned ccdMulticastPort;
  280. extern CriticalSection ccdChannelsCrit; // NOTE(review): presumably guards ccdChannels - confirm
  281. extern IPropertyTree *ccdChannels;
  282. extern IPropertyTree *topology;
  283. extern MapStringTo<int> *preferredClusters;
  284. extern StringArray allQuerySetNames;
  285. extern bool allFilesDynamic;
  286. extern bool lockSuperFiles;
  287. extern bool crcResources;
  288. extern bool logFullQueries;
  289. extern bool blindLogging;
  290. extern bool debugPermitted;
  291. extern bool useRemoteResources;
  292. extern bool checkFileDate;
  293. extern bool lazyOpen;
  294. extern bool localSlave;
  295. extern bool ignoreOrphans;
  296. extern bool doIbytiDelay;
  297. extern unsigned initIbytiDelay;
  298. extern unsigned minIbytiDelay;
  299. extern bool copyResources;
  300. extern bool chunkingHeap;
  301. extern unsigned perChannelFlowLimit;
  302. extern unsigned parallelLoopFlowLimit;
  303. extern unsigned numServerThreads;
  304. extern unsigned numRequestArrayThreads;
  305. extern unsigned readTimeout;
  306. extern unsigned indexReadChunkSize;
  307. extern SocketEndpoint ownEP;
  308. extern unsigned maxBlockSize;
  309. extern unsigned maxLockAttempts;
  310. extern bool enableHeartBeat;
  311. extern bool checkVersion;
  312. extern unsigned memoryStatsInterval;
  313. extern unsigned pingInterval;
  314. extern unsigned socketCheckInterval;
  315. extern memsize_t defaultMemoryLimit;
  // NOTE(review): 3 entries each, presumably one per priority (low/high/SLA) - confirm.
  316. extern unsigned defaultTimeLimit[3];
  317. extern unsigned defaultWarnTimeLimit[3];
  318. extern unsigned defaultThorConnectTimeout;
  319. extern bool pretendAllOpt;
  320. extern ClientCertificate clientCert;
  321. extern bool useHardLink;
  // NOTE(review): 2 entries each; the meaning of the index is not visible here.
  322. extern unsigned maxFileAge[2];
  323. extern unsigned minFilesOpen[2];
  324. extern unsigned maxFilesOpen[2];
  325. extern unsigned restarts;
  326. extern bool checkCompleted;
  327. extern unsigned preabortKeyedJoinsThreshold;
  328. extern unsigned preabortIndexReadsThreshold;
  329. extern bool traceStartStop;
  330. extern bool traceServerSideCache;
  331. extern bool defaultTimeActivities;
  332. extern unsigned watchActivityId;
  333. extern unsigned testSlaveFailure;
  334. extern unsigned dafilesrvLookupTimeout;
  335. extern bool fastLaneQueue;
  336. extern unsigned mtu_size;
  337. extern StringBuffer fileNameServiceDali;
  338. extern StringBuffer roxieName;
  339. extern bool trapTooManyActiveQueries;
  340. extern unsigned maxEmptyLoopIterations;
  341. extern unsigned maxGraphLoopIterations;
  342. extern HardwareInfo hdwInfo;
  343. extern unsigned parallelAggregate;
  344. extern bool inMemoryKeysEnabled;
  345. extern unsigned __int64 minFreeDiskSpace;
  346. extern unsigned serverSideCacheSize;
  347. extern bool probeAllRows;
  348. extern bool steppingEnabled;
  349. extern bool simpleLocalKeyedJoins;
  350. extern bool enableKeyDiff;
  351. extern bool useTreeCopy;
  352. extern PTreeReaderOptions defaultXmlReadFlags;
  353. extern bool mergeSlaveStatistics;
  354. extern bool roxieMulticastEnabled; // enable use of multicast for sending requests to slaves
  355. extern bool preloadOnceData;
  356. extern bool reloadRetriesFailed;
  357. extern unsigned roxiePort; // If listening on multiple, this is the first. Used for lock cascading
  358. extern unsigned udpMulticastBufferSize;
  359. extern size32_t diskReadBufferSize;
  360. extern bool nodeCachePreload;
  361. extern unsigned nodeCacheMB;
  362. extern unsigned leafCacheMB;
  363. extern unsigned blobCacheMB;
  // PartNoType - a (part, file) pair: a part number plus, for superkeys, the subfile number.
  // NOTE(review): two unsigned shorts; the layout may be relied on elsewhere (e.g. packed into 32 bits) - confirm before changing.
  364. struct PartNoType
  365. {
  366. unsigned short partNo; // _n_of_400
  367. unsigned short fileNo; // superkey file number
  368. };
  // More process-wide settings and directory paths, defined in another translation unit.
  369. extern unsigned statsExpiryTime;
  370. extern time_t startupTime;
  371. extern unsigned miscDebugTraceLevel;
  372. extern bool fieldTranslationEnabled;
  // Default preload counts for the activities named.
  373. extern unsigned defaultParallelJoinPreload;
  374. extern unsigned defaultConcatPreload;
  375. extern unsigned defaultFetchPreload;
  376. extern unsigned defaultFullKeyedJoinPreload;
  377. extern unsigned defaultKeyedJoinPreload;
  378. extern unsigned defaultPrefetchProjectPreload;
  379. extern bool defaultCheckingHeap;
  380. extern unsigned slaveQueryReleaseDelaySeconds;
  381. extern StringBuffer logDirectory;
  382. extern StringBuffer pluginDirectory;
  383. extern StringBuffer pluginsList;
  384. extern StringBuffer queryDirectory;
  385. extern StringBuffer codeDirectory;
  386. extern StringBuffer tempDirectory;
  // Replace any earlier definitions of these macros (presumably from headers included above) with
  // Roxie versions that throw Roxie error codes. UNIMPLEMENTED first passes the source location to
  // doUNIMPLEMENTED() (defined elsewhere); throwUnexpected() embeds file and line in the message.
  387. #undef UNIMPLEMENTED
  388. #undef throwUnexpected
  389. extern void doUNIMPLEMENTED(unsigned line, const char *file);
  390. #define UNIMPLEMENTED { doUNIMPLEMENTED(__LINE__, __FILE__); throw MakeStringException(ROXIE_UNIMPLEMENTED_ERROR, "UNIMPLEMENTED"); }
  391. #define throwUnexpected() throw MakeStringException(ROXIE_INTERNAL_ERROR, "Internal Error at %s(%d)", __FILE__, __LINE__)
  // Packet factories and a buffer-dump helper, all defined elsewhere.
  // NOTE(review): whether the (data, length) overload takes ownership of data is not visible here - confirm.
  392. extern IRoxieQueryPacket *createRoxiePacket(void *data, unsigned length);
  393. extern IRoxieQueryPacket *createRoxiePacket(MemoryBuffer &donor); // note: donor is empty after call
  394. extern void dumpBuffer(const char *title, const void *buf, unsigned recSize);
  395. inline unsigned getBondedChannel(unsigned partNo)
  396. {
  397. return ((partNo - 1) % numChannels) + 1;
  398. }
  // Process-wide helpers defined elsewhere; FatalError takes a printf-style format (checked by the attribute).
  399. extern void FatalError(const char *format, ...) __attribute__((format(printf, 1, 2)));
  400. extern unsigned getNextInstanceId();
  401. extern void closedown();
  402. extern void saveTopology();
  // Single-bit logging flags (0x01..0x80, so they fit in one byte). Presumably OR'ed together to
  // describe a query's logging options - TODO confirm where they are consumed.
  403. #define LOGGING_INTERCEPTED 0x01
  404. #define LOGGING_TIMEACTIVITIES 0x02
  405. #define LOGGING_DEBUGGERACTIVE 0x04
  406. #define LOGGING_BLIND 0x08
  407. #define LOGGING_TRACELEVELSET 0x10
  408. #define LOGGING_CHECKINGHEAP 0x20
  409. #define LOGGING_FLAGSPRESENT 0x40
  410. #define LOGGING_WUID 0x80
  411. class LogItem : public CInterface
  412. {
  413. friend class SlaveContextLogger;
  414. TracingCategory category;
  415. StringAttr prefix;
  416. StringAttr text;
  417. unsigned time;
  418. unsigned channel;
  419. unsigned statCode;
  420. unsigned __int64 statValue;
  421. unsigned statCount;
  422. public:
  423. LogItem(TracingCategory _category, const char *_prefix, unsigned _time, unsigned _channel, const char *_text)
  424. : category(_category), prefix(_prefix), time(_time), channel(_channel), text(_text)
  425. {
  426. statCode = 0;
  427. statValue = 0;
  428. statCount = 0;
  429. }
  430. LogItem(TracingCategory _category, unsigned _channel, unsigned _statCode, unsigned __int64 _statValue, unsigned _count)
  431. : category(_category), channel(_channel), statCode(_statCode), statValue(_statValue), statCount(_count)
  432. {
  433. time = 0;
  434. }
  435. inline bool isStatistics() const
  436. {
  437. return category==LOG_STATVALUES;
  438. }
  439. inline unsigned getStatCode() const
  440. {
  441. return statCode;
  442. }
  443. inline unsigned __int64 getStatValue() const
  444. {
  445. return statValue;
  446. }
  447. inline unsigned __int64 getStatCount() const
  448. {
  449. return statCount;
  450. }
  451. LogItem(MemoryBuffer &buf)
  452. {
  453. char c; buf.read(c); category = (TracingCategory) c;
  454. buf.read(channel);
  455. if (category==LOG_STATVALUES)
  456. {
  457. time = 0;
  458. buf.read(statCode);
  459. buf.read(statValue);
  460. buf.read(statCount);
  461. }
  462. else
  463. {
  464. buf.read(prefix);
  465. buf.read(text);
  466. buf.read(time);
  467. statCode = 0;
  468. statValue = 0;
  469. statCount = 0;
  470. }
  471. }
  472. void serialize(MemoryBuffer &buf)
  473. {
  474. buf.append((char) category);
  475. buf.append(channel);
  476. if (category==LOG_STATVALUES)
  477. {
  478. buf.append(statCode);
  479. buf.append(statValue);
  480. buf.append(statCount);
  481. }
  482. else
  483. {
  484. buf.append(prefix);
  485. buf.append(text);
  486. buf.append(time);
  487. }
  488. }
  489. static const char *getCategoryString(TracingCategory c)
  490. {
  491. switch (c)
  492. {
  493. case LOG_TRACING: return "TRACE";
  494. case LOG_ERROR: return "ERROR";
  495. case LOG_TIMING: return "TIMING";
  496. case LOG_STATISTICS: return "STATISTICS";
  497. case LOG_STATVALUES: return "STATVALUES";
  498. default: return "UNKNOWN";
  499. }
  500. }
  501. void toXML(StringBuffer &out)
  502. {
  503. out.append("<Log><Category>").append(getCategoryString(category)).append("</Category>");
  504. out.append("<Channel>").append(channel).append("</Channel>");
  505. out.append("<Time>").append(time/1000).append('.').appendf("%03d", time % 1000).append("</Time>");
  506. if (prefix)
  507. {
  508. out.append("<Prefix>");
  509. encodeXML(prefix, out);
  510. out.append("</Prefix>");
  511. }
  512. if (text)
  513. {
  514. out.append("<Text>");
  515. encodeXML(text, out);
  516. out.append("</Text>");
  517. }
  518. out.append("</Log>\n");
  519. }
  520. void outputXML(IXmlStreamFlusher &out)
  521. {
  522. StringBuffer b;
  523. toXML(b);
  524. out.flushXML(b, true);
  525. }
  526. };
  // Record a named statistic value and its combine type, either into a property tree or into an
  // XML reply buffer. StatsCollector passes "sum", "max" or "min" as statType. Defined elsewhere.
  527. extern void putStatsValue(IPropertyTree *node, const char *statName, const char *statType, unsigned __int64 val);
  528. extern void putStatsValue(StringBuffer &reply, const char *statName, const char *statType, unsigned __int64 val);
  529. class StatsCollector : public CInterface, implements IInterface
  530. {
  531. unsigned __int64 *cumulative;
  532. unsigned *counts;
  533. mutable SpinLock lock;
  534. bool aborted;
  535. inline void init()
  536. {
  537. if (!cumulative)
  538. {
  539. cumulative = new unsigned __int64[STATS_SIZE];
  540. counts = new unsigned [STATS_SIZE];
  541. memset(cumulative, 0, STATS_SIZE * sizeof(cumulative[0]));
  542. memset(counts, 0, STATS_SIZE * sizeof(counts[0]));
  543. }
  544. }
  545. inline static const char *getStatCombineModeName(StatisticCombineType type)
  546. {
  547. switch(type)
  548. {
  549. case STATSMODE_COMBINE_SUM: return "sum";
  550. case STATSMODE_COMBINE_MAX: return "max";
  551. case STATSMODE_COMBINE_MIN: return "min";
  552. default:
  553. throwUnexpected();
  554. }
  555. }
  556. public:
  557. IMPLEMENT_IINTERFACE;
  558. StatsCollector()
  559. {
  560. // CAUTION: this object is reused by threadpooling - so be sure to update reset() method too!
  561. cumulative = NULL;
  562. counts = NULL;
  563. aborted = false;
  564. // CAUTION: this object is reused by threadpooling - so be sure to update reset() method too!
  565. }
  566. ~StatsCollector()
  567. {
  568. if (cumulative) delete [] cumulative;
  569. if (counts) delete [] counts;
  570. }
  571. void noteStatistic(unsigned statIdx, unsigned __int64 value, unsigned count)
  572. {
  573. SpinBlock b(lock);
  574. if (aborted)
  575. throw MakeStringException(ROXIE_ABORT_ERROR, "Roxie server requested abort for running activity");
  576. init();
  577. assert (statIdx < STATS_SIZE);
  578. switch (getStatCombineMode(statIdx))
  579. {
  580. case STATSMODE_COMBINE_SUM:
  581. cumulative[statIdx] += value;
  582. break;
  583. case STATSMODE_COMBINE_MAX:
  584. if (!counts[statIdx] || cumulative[statIdx] <= value)
  585. cumulative[statIdx] = value;
  586. break;
  587. case STATSMODE_COMBINE_MIN:
  588. if (!counts[statIdx] || cumulative[statIdx] >= value)
  589. cumulative[statIdx] = value;
  590. break;
  591. }
  592. counts[statIdx] += count;
  593. }
  594. void merge(const StatsCollector &from)
  595. {
  596. SpinBlock b(from.lock);
  597. if (from.cumulative)
  598. {
  599. for (unsigned i = 0; i < STATS_SIZE; i++)
  600. {
  601. if (from.counts[i])
  602. noteStatistic(i, from.cumulative[i], from.counts[i]);
  603. }
  604. }
  605. }
  606. void dumpStats(const IRoxieContextLogger &logctx) const
  607. {
  608. SpinBlock b(lock);
  609. if (cumulative)
  610. {
  611. for (unsigned i = 0; i < STATS_SIZE; i++)
  612. {
  613. if (counts[i])
  614. {
  615. StringBuffer prefix, text;
  616. logctx.getLogPrefix(prefix);
  617. text.appendf("%s - %"I64F"d (%d instances)", getStatName(i), cumulative[i], counts[i]);
  618. logctx.CTXLOGa(LOG_STATISTICS, prefix.str(), text.str());
  619. }
  620. }
  621. }
  622. }
  623. void dumpStats(IWorkUnit *wu) const
  624. {
  625. SpinBlock b(lock);
  626. if (cumulative)
  627. {
  628. for (unsigned i = 0; i < STATS_SIZE; i++)
  629. {
  630. if (counts[i])
  631. wu->setStatistic("roxie", "workunit", getStatShortName(i), getStatName(i), getStatMeasure(i), cumulative[i], counts[i], 0, false);
  632. }
  633. }
  634. }
  635. void toXML(StringBuffer &reply) const
  636. {
  637. SpinBlock b(lock);
  638. if (cumulative)
  639. {
  640. for (unsigned i = 0; i < STATS_SIZE; i++)
  641. {
  642. if (counts[i])
  643. {
  644. putStatsValue(reply, getStatName(i), getStatCombineModeName(getStatCombineMode(i)), counts[i]);
  645. }
  646. }
  647. }
  648. }
  649. void getNodeProgressInfo(IPropertyTree &node) const
  650. {
  651. SpinBlock b(lock);
  652. if (cumulative)
  653. {
  654. for (unsigned i = 0; i < STATS_SIZE; i++)
  655. {
  656. if (counts[i])
  657. {
  658. putStatsValue(&node, getStatShortName(i), getStatCombineModeName(getStatCombineMode(i)), counts[i]);
  659. }
  660. }
  661. }
  662. }
  663. void cascade(unsigned channel, const IRoxieContextLogger &logctx) const
  664. {
  665. SpinBlock b(lock);
  666. if (cumulative)
  667. {
  668. for (unsigned i = 0; i < STATS_SIZE; i++)
  669. {
  670. if (counts[i])
  671. {
  672. logctx.CTXLOGl(new LogItem(LOG_STATVALUES, channel, i, cumulative[i], counts[i]));
  673. }
  674. }
  675. }
  676. }
  677. void reset()
  678. {
  679. SpinBlock b(lock);
  680. if (cumulative) delete [] cumulative;
  681. if (counts) delete [] counts;
  682. cumulative = NULL;
  683. counts = NULL;
  684. aborted = false;
  685. }
  686. void requestAbort()
  687. {
  688. SpinBlock b(lock);
  689. aborted = true;
  690. }
  691. };
  692. class ContextLogger : public CInterface, implements IRoxieContextLogger
  693. {
  694. protected:
  695. mutable CriticalSection crit;
  696. unsigned start;
  697. unsigned ctxTraceLevel;
  698. mutable StatsCollector stats;
  699. mutable ITimeReporter *timeReporter;
  700. unsigned channel;
  701. public: // Not very clean but I don't care
  702. bool intercept;
  703. bool blind;
  704. mutable CIArrayOf<LogItem> log;
  705. private:
  706. ContextLogger(const ContextLogger &); // Disable copy constructor
  707. public:
  708. IMPLEMENT_IINTERFACE;
  709. ContextLogger()
  710. {
  711. ctxTraceLevel = traceLevel;
  712. intercept = false;
  713. blind = false;
  714. timeReporter = createStdTimeReporter();
  715. start = msTick();
  716. channel = 0;
  717. }
  718. ~ContextLogger()
  719. {
  720. ::Release(timeReporter);
  721. }
  722. void outputXML(IXmlStreamFlusher &out)
  723. {
  724. CriticalBlock b(crit);
  725. ForEachItemIn(idx, log)
  726. {
  727. log.item(idx).outputXML(out);
  728. }
  729. };
  730. virtual void CTXLOG(const char *format, ...) const __attribute__((format(printf, 2, 3)))
  731. {
  732. va_list args;
  733. va_start(args, format);
  734. CTXLOGva(format, args);
  735. va_end(args);
  736. }
  737. virtual void CTXLOGva(const char *format, va_list args) const
  738. {
  739. StringBuffer prefix, text;
  740. getLogPrefix(prefix);
  741. text.valist_appendf(format, args);
  742. DBGLOG("[%s] %s", prefix.str(), text.str());
  743. if (intercept)
  744. {
  745. CriticalBlock b(crit);
  746. log.append(* new LogItem(LOG_TRACING, prefix, msTick() - start, channel, text));
  747. flush(false, false);
  748. }
  749. }
  750. virtual void CTXLOGa(TracingCategory category, const char *prefix, const char *text) const
  751. {
  752. if (category == LOG_TRACING)
  753. DBGLOG("[%s] %s", prefix, text);
  754. else
  755. DBGLOG("[%s] %s: %s", prefix, LogItem::getCategoryString(category), text);
  756. if (intercept)
  757. {
  758. CriticalBlock b(crit);
  759. log.append(* new LogItem(category, prefix, msTick() - start, channel, text));
  760. flush(false, false);
  761. }
  762. }
  763. virtual void logOperatorException(IException *E, const char *file, unsigned line, const char *format, ...) const __attribute__((format(printf, 5, 6)))
  764. {
  765. va_list args;
  766. va_start(args, format);
  767. CTXLOGaeva(E, file, line, 0, format, args);
  768. va_end(args);
  769. }
  770. virtual void logOperatorExceptionVA(IException *E, const char *file, unsigned line, const char *format, va_list args) const
  771. {
  772. CTXLOGaeva(E, file, line, 0, format, args);
  773. }
  774. virtual void CTXLOGae(IException *E, const char *file, unsigned line, const char *prefix, const char *format, ...) const __attribute__((format(printf, 6, 7)))
  775. {
  776. va_list args;
  777. va_start(args, format);
  778. CTXLOGaeva(E, file, line, prefix, format, args);
  779. va_end(args);
  780. }
  781. virtual void CTXLOGaeva(IException *E, const char *file, unsigned line, const char *prefix, const char *format, va_list args) const
  782. {
  783. StringBuffer text;
  784. text.append("ERROR");
  785. if (E)
  786. text.append(": ").append(E->errorCode());
  787. if (file)
  788. text.appendf(": %s(%d) ", file, line);
  789. if (E)
  790. E->errorMessage(text.append(": "));
  791. if (format)
  792. {
  793. text.append(": ").valist_appendf(format, args);
  794. }
  795. LOG(MCoperatorProgress, unknownJob, "[%s] %s", prefix, text.str());
  796. if (intercept)
  797. {
  798. CriticalBlock b(crit);
  799. log.append(* new LogItem(LOG_ERROR, prefix, msTick() - start, channel, text));
  800. flush(false, false);
  801. }
  802. }
  803. virtual void CTXLOGl(LogItem *logItem) const
  804. {
  805. // NOTE - we don't actually print anything to logfile here - was already printed on slave
  806. CriticalBlock b(crit);
  807. log.append(*logItem);
  808. flush(false, false);
  809. }
  810. void setIntercept(bool _intercept)
  811. {
  812. intercept = _intercept;
  813. }
  814. void setBlind(bool _blind)
  815. {
  816. blind = _blind;
  817. }
  818. void setTraceLevel(unsigned _traceLevel)
  819. {
  820. ctxTraceLevel = _traceLevel;
  821. }
  822. virtual void flush(bool closing, bool aborted) const
  823. {
  824. }
  825. void dumpStats() const
  826. {
  827. stats.dumpStats(*this);
  828. }
  829. virtual void dumpStats(IWorkUnit *wu) const
  830. {
  831. stats.dumpStats(wu);
  832. }
  833. virtual bool isIntercepted() const
  834. {
  835. return intercept;
  836. }
  837. virtual bool isBlind() const
  838. {
  839. return blind;
  840. }
  841. virtual void noteStatistic(unsigned statCode, unsigned __int64 value, unsigned count) const
  842. {
  843. stats.noteStatistic(statCode, value, count);
  844. }
  845. virtual unsigned queryTraceLevel() const
  846. {
  847. return ctxTraceLevel;
  848. }
  849. inline ITimeReporter *queryTimer() const
  850. {
  851. return timeReporter;
  852. }
  853. void reset()
  854. {
  855. stats.reset();
  856. timer->reset();
  857. }
  858. };
  859. class StringContextLogger : public ContextLogger
  860. {
  861. StringAttr id;
  862. public:
  863. StringContextLogger(const char *_id) : id(_id)
  864. {
  865. }
  866. StringContextLogger()
  867. {
  868. }
  869. virtual StringBuffer &getLogPrefix(StringBuffer &ret) const
  870. {
  871. return ret.append(id);
  872. }
  873. void set(const char *_id)
  874. {
  875. reset();
  876. id.set(_id);
  877. }
  878. };
  879. class SimpleContextLogger : public ContextLogger
  880. {
  881. unsigned instanceId;
  882. public:
  883. SimpleContextLogger(unsigned _instanceId) : instanceId(_instanceId)
  884. {
  885. }
  886. virtual StringBuffer &getLogPrefix(StringBuffer &ret) const
  887. {
  888. return ret.append(instanceId);
  889. }
  890. };
  891. class SlaveContextLogger : public StringContextLogger
  892. {
  893. mutable Owned<IMessagePacker> output;
  894. mutable bool anyOutput;
  895. bool traceActivityTimes;
  896. bool debuggerActive;
  897. bool checkingHeap;
  898. IpAddress ip;
  899. StringAttr wuid;
  900. public:
  901. SlaveContextLogger();
  902. SlaveContextLogger(IRoxieQueryPacket *packet);
  903. void set(IRoxieQueryPacket *packet);
  904. virtual void flush(bool closing, bool aborted) const;
  905. inline bool queryTraceActivityTimes() const { return traceActivityTimes; }
  906. inline bool queryDebuggerActive() const { return debuggerActive; }
  907. inline bool queryCheckingHeap() const { return checkingHeap; }
  908. inline void setDebuggerActive(bool _active) { debuggerActive = _active; }
  909. inline const StatsCollector &queryStats() const
  910. {
  911. return stats;
  912. }
  913. inline void requestAbort()
  914. {
  915. stats.requestAbort();
  916. }
  917. inline const char *queryWuid()
  918. {
  919. return wuid.get();
  920. }
  921. };
  922. #endif