// logthread.hpp
/*##############################################################################
    HPCC SYSTEMS software Copyright (C) 2014 HPCC Systems.
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at
       http://www.apache.org/licenses/LICENSE-2.0
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
############################################################################## */
  13. #ifndef _LOGTHREAD_HPP__
  14. #define _LOGTHREAD_HPP__
#include <set>
#include <string>

#include "jthread.hpp"
#include "jqueue.tpp"
#include "loggingagentbase.hpp"
#include "LogFailSafe.hpp"
  19. #define DEFAULTREADLOGREQUESTWAITSECOND 15 //How often to read log request from a tank file
  20. #define DEFAULTPENDINGLOGBUFFERSIZE 100 //Max. # of log requests the pending log buffer store before flushing out.
  21. class CLogRequestReaderSettings : public CSimpleInterface
  22. {
  23. public:
  24. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  25. CLogRequestReaderSettings() { };
  26. StringAttr tankFileDir;
  27. StringBuffer ackedFileList, ackedLogRequestFile;
  28. unsigned waitSeconds = DEFAULTREADLOGREQUESTWAITSECOND;
  29. unsigned pendingLogBufferSize = DEFAULTPENDINGLOGBUFFERSIZE;
  30. };
  31. class CLogThread;
  32. interface ILogRequestReader : extends IThreaded
  33. {
  34. virtual CLogRequestReaderSettings* getSettings() = 0;
  35. virtual void setPause(bool pause) = 0;
  36. virtual void reportAckedLogFiles(StringArray& ackedLogFiles) = 0;
  37. virtual void removeUnknownAckedLogFiles(StringArray& ackedLogFiles) = 0;
  38. virtual void cleanAckedLogFiles(StringArray& fileNames) = 0;
  39. };
  40. class CLogRequestReader : public CInterface, implements ILogRequestReader
  41. {
  42. Owned<CLogRequestReaderSettings> settings;
  43. StringArray newAckedLogFiles;
  44. StringAttr lastTankFile;
  45. offset_t lastTankFilePos = 0;
  46. std::set<std::string> ackedLogFileCheckList, ackedLogRequests;
  47. GuidSet pendingLogGUIDs;
  48. GuidMap pendingLogs; //used every time when go through tank files to avoid duplicated log requests
  49. Linked<CLogThread> logThread;
  50. CThreaded threaded;
  51. bool stopping = false;
  52. bool paused = false;
  53. Semaphore sem;
  54. CriticalSection crit;
  55. void readAcked(const char* fileName, std::set<std::string>& acked);
  56. void readLogRequest();
  57. void findTankFileNotFinished(StringAttr& tankFileNotFinished);
  58. StringBuffer& getTankFileTimeString(const char* fileName, StringBuffer& timeString);
  59. bool readLogRequestsFromTankFile(const char* fileName, StringAttr& tankFileNotFinished, offset_t& tankFileNotFinishedPos);
  60. offset_t getReadFilePos(const char* fileName);
  61. bool parseLogRequest(MemoryBuffer& rawdata, StringBuffer& GUID, StringBuffer& data);
  62. void addToAckedLogFileList(const char* fileName, const char* fileNameWithPath);
  63. void addPendingLogsToQueue();
  64. void updateAckedFileList();
  65. void updateAckedLogRequestList();
  66. void addNewAckedFileList(const char* list, StringArray& fileNames);
  67. public:
  68. CLogRequestReader(CLogRequestReaderSettings* _settings, CLogThread* _logThread)
  69. : settings(_settings), logThread(_logThread), threaded("LogRequestReader")
  70. {
  71. threaded.init(this);
  72. };
  73. ~CLogRequestReader();
  74. virtual void threadmain() override;
  75. void addACK(const char* GUID);
  76. virtual CLogRequestReaderSettings* getSettings() override { return settings; };
  77. virtual void setPause(bool pause) override { paused = pause; };
  78. virtual void reportAckedLogFiles(StringArray& ackedLogFiles) override;
  79. virtual void removeUnknownAckedLogFiles(StringArray& ackedLogFiles) override;
  80. virtual void cleanAckedLogFiles(StringArray& fileNames) override;
  81. };
  82. interface IUpdateLogThread : extends IInterface
  83. {
  84. virtual int run() = 0;
  85. virtual void start() = 0;
  86. virtual void stop() = 0;
  87. virtual IEspLogAgent* getLogAgent() = 0;
  88. virtual bool hasService(LOGServiceType service) = 0;
  89. virtual bool queueLog(IEspUpdateLogRequest* logRequest) = 0;
  90. virtual bool queueLog(IEspUpdateLogRequestWrap* logRequest) = 0;
  91. virtual void sendLog() = 0;
  92. virtual ILogRequestReader* getLogRequestReader() = 0;
  93. };
  94. class CLogThread : public Thread , implements IUpdateLogThread
  95. {
  96. bool stopping;
  97. StringAttr agentName;
  98. int maxLogQueueLength;
  99. int signalGrowingQueueAt;
  100. unsigned maxLogRetries; // Max. # of attempts to send log message
  101. Owned<IEspLogAgent> logAgent;
  102. QueueOf<IInterface, false> logQueue;
  103. CriticalSection logQueueCrit;
  104. Semaphore m_sem;
  105. bool ensureFailSafe;
  106. Owned<ILogFailSafe> logFailSafe;
  107. struct tm m_startTime;
  108. StringAttr tankFileDir;
  109. Owned<CLogRequestReader> logRequestReader;
  110. unsigned serializeLogRequestContent(IEspUpdateLogRequestWrap* request, StringBuffer& logData);
  111. bool enqueue(IEspUpdateLogRequestWrap* logRequest, const char* guid);
  112. void writeJobQueue(IEspUpdateLogRequestWrap* jobToWrite);
  113. IEspUpdateLogRequestWrap* readJobQueue();
  114. IEspUpdateLogRequestWrap* checkAndReadLogRequestFromSharedTankFile(IEspUpdateLogRequestWrap* logRequest);
  115. void checkAndCreateFile(const char* fileName);
  116. public:
  117. IMPLEMENT_IINTERFACE;
  118. CLogThread();
  119. CLogThread(IPropertyTree* _agentConfig, const char* _service, const char* _agentName, IEspLogAgent* _logAgent = nullptr, const char* _tankFile = nullptr);
  120. virtual ~CLogThread();
  121. IEspLogAgent* getLogAgent() {return logAgent;};
  122. virtual CLogRequestReader* getLogRequestReader() {return logRequestReader;};
  123. bool hasService(LOGServiceType service)
  124. {
  125. return logAgent->hasService(service);
  126. }
  127. int run();
  128. void start();
  129. void stop();
  130. bool queueLog(IEspUpdateLogRequest* logRequest);
  131. bool queueLog(IEspUpdateLogRequestWrap* logRequest);
  132. void sendLog();
  133. IEspUpdateLogRequestWrap* unserializeLogRequestContent(const char* logData, bool decompress);
  134. void checkPendingLogs(bool oneRecordOnly=false);
  135. void checkRollOver();
  136. };
  137. extern LOGGINGCOMMON_API IUpdateLogThread* createUpdateLogThread(IPropertyTree* _cfg, const char* _service, const char* _agentName, const char* _tankFile, IEspLogAgent* _logAgent);
  138. #endif // _LOGTHREAD_HPP__