// logthread.hpp
/*##############################################################################

    HPCC SYSTEMS software Copyright (C) 2014 HPCC Systems.

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
############################################################################## */
  13. #ifndef _LOGTHREAD_HPP__
  14. #define _LOGTHREAD_HPP__
  15. #include "jthread.hpp"
  16. #include "jqueue.tpp"
  17. #include "loggingagentbase.hpp"
  18. #include "LogFailSafe.hpp"
  // Default number of seconds between reads of log requests from a tank file.
  #define DEFAULTREADLOGREQUESTWAITSECOND 15
  // Default maximum number of log requests held in the pending log buffer
  // before it is flushed out.
  #define DEFAULTPENDINGLOGBUFFERSIZE 100
  21. class CLogRequestReaderSettings : public CSimpleInterface
  22. {
  23. public:
  24. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  25. CLogRequestReaderSettings() { };
  26. StringAttr tankFileDir;
  27. StringBuffer ackedFileList, ackedLogRequestFile;
  28. unsigned waitSeconds = DEFAULTREADLOGREQUESTWAITSECOND;
  29. unsigned pendingLogBufferSize = DEFAULTPENDINGLOGBUFFERSIZE;
  30. };
  31. class CLogThread;
  32. class CLogRequestReader : public CInterface, implements IThreaded
  33. {
  34. Owned<CLogRequestReaderSettings> settings;
  35. StringArray newAckedLogFiles;
  36. StringAttr lastTankFile;
  37. offset_t lastTankFilePos = 0;
  38. std::set<std::string> ackedLogFileCheckList, ackedLogRequests;
  39. GuidSet pendingLogGUIDs;
  40. GuidMap pendingLogs; //used every time when go through tank files to avoid duplicated log requests
  41. Linked<CLogThread> logThread;
  42. CThreaded threaded;
  43. bool stopping = false;
  44. bool paused = false;
  45. Semaphore sem;
  46. CriticalSection crit;
  47. void readAcked(const char* fileName, std::set<std::string>& acked);
  48. void readLogRequest();
  49. void findTankFileNotFinished(StringAttr& tankFileNotFinished);
  50. StringBuffer& getTankFileTimeString(const char* fileName, StringBuffer& timeString);
  51. bool readLogRequestsFromTankFile(const char* fileName, StringAttr& tankFileNotFinished, offset_t& tankFileNotFinishedPos);
  52. offset_t getReadFilePos(const char* fileName);
  53. bool parseLogRequest(MemoryBuffer& rawdata, StringBuffer& GUID, StringBuffer& data);
  54. void addToAckedLogFileList(const char* fileName, const char* fileNameWithPath);
  55. void addPendingLogsToQueue();
  56. void updateAckedFileList();
  57. void updateAckedLogRequestList();
  58. public:
  59. CLogRequestReader(CLogRequestReaderSettings* _settings, CLogThread* _logThread)
  60. : settings(_settings), logThread(_logThread), threaded("LogRequestReader")
  61. {
  62. threaded.init(this);
  63. };
  64. ~CLogRequestReader();
  65. virtual void threadmain() override;
  66. void addACK(const char* GUID);
  67. virtual CLogRequestReaderSettings* getSettings() { return settings; };
  68. void setPause(bool pause)
  69. {
  70. paused = pause;
  71. };
  72. };
  73. interface IUpdateLogThread : extends IInterface
  74. {
  75. virtual int run() = 0;
  76. virtual void start() = 0;
  77. virtual void stop() = 0;
  78. virtual IEspLogAgent* getLogAgent() = 0;
  79. virtual bool hasService(LOGServiceType service) = 0;
  80. virtual bool queueLog(IEspUpdateLogRequest* logRequest) = 0;
  81. virtual bool queueLog(IEspUpdateLogRequestWrap* logRequest) = 0;
  82. virtual void sendLog() = 0;
  83. virtual CLogRequestReader* getLogRequestReader() = 0;
  84. };
  85. class CLogThread : public Thread , implements IUpdateLogThread
  86. {
  87. bool stopping;
  88. StringAttr agentName;
  89. int maxLogQueueLength;
  90. int signalGrowingQueueAt;
  91. unsigned maxLogRetries; // Max. # of attempts to send log message
  92. Owned<IEspLogAgent> logAgent;
  93. QueueOf<IInterface, false> logQueue;
  94. CriticalSection logQueueCrit;
  95. Semaphore m_sem;
  96. bool ensureFailSafe;
  97. Owned<ILogFailSafe> logFailSafe;
  98. struct tm m_startTime;
  99. StringAttr tankFileDir;
  100. Owned<CLogRequestReader> logRequestReader;
  101. unsigned serializeLogRequestContent(IEspUpdateLogRequestWrap* request, StringBuffer& logData);
  102. bool enqueue(IEspUpdateLogRequestWrap* logRequest, const char* guid);
  103. void writeJobQueue(IEspUpdateLogRequestWrap* jobToWrite);
  104. IEspUpdateLogRequestWrap* readJobQueue();
  105. IEspUpdateLogRequestWrap* checkAndReadLogRequestFromSharedTankFile(IEspUpdateLogRequestWrap* logRequest);
  106. void checkAndCreateFile(const char* fileName);
  107. public:
  108. IMPLEMENT_IINTERFACE;
  109. CLogThread();
  110. CLogThread(IPropertyTree* _agentConfig, const char* _service, const char* _agentName, IEspLogAgent* _logAgent = nullptr, const char* _tankFile = nullptr);
  111. virtual ~CLogThread();
  112. IEspLogAgent* getLogAgent() {return logAgent;};
  113. virtual CLogRequestReader* getLogRequestReader() {return logRequestReader;};
  114. bool hasService(LOGServiceType service)
  115. {
  116. return logAgent->hasService(service);
  117. }
  118. int run();
  119. void start();
  120. void stop();
  121. bool queueLog(IEspUpdateLogRequest* logRequest);
  122. bool queueLog(IEspUpdateLogRequestWrap* logRequest);
  123. void sendLog();
  124. IEspUpdateLogRequestWrap* unserializeLogRequestContent(const char* logData, bool decompress);
  125. void checkPendingLogs(bool oneRecordOnly=false);
  126. void checkRollOver();
  127. };
  128. extern LOGGINGCOMMON_API IUpdateLogThread* createUpdateLogThread(IPropertyTree* _cfg, const char* _service, const char* _agentName, const char* _tankFile, IEspLogAgent* _logAgent);
  129. #endif // _LOGTHREAD_HPP__