|
@@ -86,7 +86,7 @@ void CWSDecoupledLogEx::init(IPropertyTree* cfg, const char* process, const char
|
|
|
if(!logThread)
|
|
|
throw MakeStringException(-1, "Failed to create update log thread for %s", agentName);
|
|
|
|
|
|
- CLogRequestReader* logRequestReader = logThread->getLogRequestReader();
|
|
|
+ ILogRequestReader* logRequestReader = logThread->getLogRequestReader();
|
|
|
if (!logRequestReader)
|
|
|
throw MakeStringException(-1, "CLogRequestReader not found for %s.", agentName);
|
|
|
|
|
@@ -100,40 +100,65 @@ void CWSDecoupledLogEx::init(IPropertyTree* cfg, const char* process, const char
|
|
|
|
|
|
bool CWSDecoupledLogEx::onGetLogAgentSetting(IEspContext& context, IEspGetLogAgentSettingRequest& req, IEspGetLogAgentSettingResponse& resp)
|
|
|
{
|
|
|
+ LogAgentAction action;
|
|
|
+ action.type = CLogAgentActions_GetSettings;
|
|
|
+
|
|
|
+ CLogAgentActionResults results;
|
|
|
+ WSDecoupledLogGetSettings act(action, results);
|
|
|
+ act.doAction(context, logGroups, req.getGroups());
|
|
|
+ resp.setSettings(results.queryGroupSettings());
|
|
|
+
|
|
|
+ return true;
|
|
|
+}
|
|
|
+
|
|
|
+bool CWSDecoupledLogEx::onPauseLog(IEspContext& context, IEspPauseLogRequest& req, IEspPauseLogResponse& resp)
|
|
|
+{
|
|
|
+ LogAgentAction action;
|
|
|
+ action.type = req.getPause() ? CLogAgentActions_Pause : CLogAgentActions_Resume;
|
|
|
+
|
|
|
+ CLogAgentActionResults results;
|
|
|
+ WSDecoupledLogPause act(action, results);
|
|
|
+ act.doAction(context, logGroups, req.getGroups());
|
|
|
+
|
|
|
+ resp.setStatuses(results.queryGroupStatus());
|
|
|
+
|
|
|
+ return true;
|
|
|
+}
|
|
|
+
|
|
|
+bool CWSDecoupledLogEx::onGetAckedLogFiles(IEspContext& context, IEspGetAckedLogFilesRequest& req, IEspGetAckedLogFilesResponse& resp)
|
|
|
+{
|
|
|
+ LogAgentAction action;
|
|
|
+ action.type = CLogAgentActions_GetAckedLogFileNames;
|
|
|
+
|
|
|
+ CLogAgentActionResults results;
|
|
|
+ WSDecoupledLogGetAckedLogFileNames act(action, results);
|
|
|
+ act.doAction(context, logGroups, req.getGroups());
|
|
|
+
|
|
|
+ resp.setAckedLogFilesInGroups(results.queryTankFilesInGroup());
|
|
|
+
|
|
|
+ return true;
|
|
|
+}
|
|
|
+
|
|
|
+bool CWSDecoupledLogEx::onCleanAckedFiles(IEspContext& context, IEspCleanAckedFilesRequest& req, IEspCleanAckedFilesResponse& resp)
|
|
|
+{
|
|
|
try
|
|
|
{
|
|
|
- IArrayOf<IEspLogAgentGroupSetting> groupSettingResp;
|
|
|
- IArrayOf<IConstLogAgentGroup>& groups = req.getGroups();
|
|
|
- if (!groups.ordinality())
|
|
|
- {
|
|
|
- for (auto ml : logGroups)
|
|
|
- getSettingsForLoggingAgentsInGroup(ml.second, nullptr, groupSettingResp);
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- ForEachItemIn(i, groups)
|
|
|
- {
|
|
|
- IConstLogAgentGroup& g = groups.item(i);
|
|
|
- const char* gName = g.getGroupName();
|
|
|
- if (isEmptyString(gName))
|
|
|
- throw MakeStringException(ECLWATCH_INVALID_INPUT, "Group name not specified.");
|
|
|
-
|
|
|
- auto match = logGroups.find(gName);
|
|
|
- if (match != logGroups.end())
|
|
|
- {
|
|
|
- StringArray& agentNames = g.getAgentNames();
|
|
|
- getSettingsForLoggingAgentsInGroup(match->second, &agentNames, groupSettingResp);
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- Owned<IEspLogAgentGroupSetting> groupSetting = createLogAgentGroupSetting();
|
|
|
- groupSetting->setGroupName(gName);
|
|
|
- groupSetting->setGroupStatus("NotFound");
|
|
|
- groupSettingResp.append(*groupSetting.getClear());
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- resp.setSettings(groupSettingResp);
|
|
|
+ const char* groupName = req.getGroupName();
|
|
|
+ if (isEmptyString(groupName))
|
|
|
+ throw makeStringException(ECLWATCH_INVALID_INPUT, "Group name not specified.");
|
|
|
+ auto match = logGroups.find(groupName);
|
|
|
+ if (match == logGroups.end())
|
|
|
+ throw MakeStringException(ECLWATCH_INVALID_INPUT, "Group %s not found.", groupName);
|
|
|
+
|
|
|
+ LogAgentAction action;
|
|
|
+ action.type = CLogAgentActions_CleanAckedLogFiles;
|
|
|
+ action.fileNames = &req.getFileNames();
|
|
|
+ if (!action.fileNames->length())
|
|
|
+ throw makeStringException(ECLWATCH_INVALID_INPUT, "File name not specified.");
|
|
|
+
|
|
|
+ CLogAgentActionResults results;
|
|
|
+ WSDecoupledLogCleanAckedLogFiles act(action, results);
|
|
|
+ act.doActionInGroup(match->second, nullptr);
|
|
|
}
|
|
|
catch(IException* e)
|
|
|
{
|
|
@@ -142,157 +167,184 @@ bool CWSDecoupledLogEx::onGetLogAgentSetting(IEspContext& context, IEspGetLogAge
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
-bool CWSDecoupledLogEx::onPauseLog(IEspContext& context, IEspPauseLogRequest& req, IEspPauseLogResponse& resp)
|
|
|
+void WSDecoupledLogAction::doAction(IEspContext& context, std::map<std::string, Owned<WSDecoupledLogAgentGroup>>& allGroups,
|
|
|
+ IArrayOf<IConstLogAgentGroup>& groupsReq)
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
- IArrayOf<IEspLogAgentGroupStatus> groupStatusResp;
|
|
|
- bool pause = req.getPause();
|
|
|
- IArrayOf<IConstLogAgentGroup>& groups = req.getGroups();
|
|
|
- if (!groups.ordinality())
|
|
|
+ if (groupsReq.ordinality())
|
|
|
{
|
|
|
- for (auto ml : logGroups)
|
|
|
- pauseLoggingAgentsInGroup(ml.second, nullptr, pause, groupStatusResp);
|
|
|
+ checkGroupInput(allGroups, groupsReq);
|
|
|
+ ForEachItemIn(i, groupsReq)
|
|
|
+ {
|
|
|
+ IConstLogAgentGroup& g = groupsReq.item(i);
|
|
|
+ auto match = allGroups.find(g.getGroupName());
|
|
|
+ StringArray& agentNames = g.getAgentNames();
|
|
|
+ doActionInGroup(match->second, &agentNames);
|
|
|
+ }
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- ForEachItemIn(i, groups)
|
|
|
+ for (auto ml : allGroups)
|
|
|
{
|
|
|
- IConstLogAgentGroup& g = groups.item(i);
|
|
|
- const char* gName = g.getGroupName();
|
|
|
- if (isEmptyString(gName))
|
|
|
- throw MakeStringException(ECLWATCH_INVALID_INPUT, "Group name not specified.");
|
|
|
-
|
|
|
- auto match = logGroups.find(gName);
|
|
|
- if (match != logGroups.end())
|
|
|
- {
|
|
|
- StringArray& agentNames = g.getAgentNames();
|
|
|
- pauseLoggingAgentsInGroup(match->second, &agentNames, pause, groupStatusResp);
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- Owned<IEspLogAgentGroupStatus> groupStatus = createLogAgentGroupStatus();
|
|
|
- groupStatus->setGroupName(gName);
|
|
|
- groupStatus->setGroupStatus("NotFound");
|
|
|
- groupStatusResp.append(*groupStatus.getClear());
|
|
|
- }
|
|
|
+ doActionInGroup(ml.second, nullptr);
|
|
|
}
|
|
|
}
|
|
|
- resp.setStatuses(groupStatusResp);
|
|
|
}
|
|
|
catch(IException* e)
|
|
|
{
|
|
|
FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
|
|
|
}
|
|
|
- return true;
|
|
|
}
|
|
|
|
|
|
-void CWSDecoupledLogEx::getSettingsForLoggingAgentsInGroup(WSDecoupledLogAgentGroup* group, StringArray* agentNames,
|
|
|
- IArrayOf<IEspLogAgentGroupSetting>& groupSettingResp)
|
|
|
+void WSDecoupledLogAction::checkGroupInput(std::map<std::string, Owned<WSDecoupledLogAgentGroup>>& allGroups,
|
|
|
+ IArrayOf<IConstLogAgentGroup>& groupsReq)
|
|
|
+{
|
|
|
+ ForEachItemIn(i, groupsReq)
|
|
|
+ {
|
|
|
+ IConstLogAgentGroup& g = groupsReq.item(i);
|
|
|
+ const char* gName = g.getGroupName();
|
|
|
+ if (isEmptyString(gName))
|
|
|
+ throw makeStringException(ECLWATCH_INVALID_INPUT, "Group name not specified.");
|
|
|
+
|
|
|
+ auto match = allGroups.find(gName);
|
|
|
+ if (match == allGroups.end())
|
|
|
+ throw makeStringExceptionV(ECLWATCH_INVALID_INPUT, "Group %s not found.", gName);
|
|
|
+
|
|
|
+ StringArray& agentNames = g.getAgentNames();
|
|
|
+ ForEachItemIn(j, agentNames)
|
|
|
+ {
|
|
|
+ const char* agentName = agentNames.item(j);
|
|
|
+ if (isEmptyString(agentName))
|
|
|
+ throw makeStringExceptionV(ECLWATCH_INVALID_INPUT, "%s: logging agent name not specified.", gName);
|
|
|
+
|
|
|
+ if (!match->second->getLoggingAgentThread(agentName))
|
|
|
+ throw makeStringExceptionV(ECLWATCH_INVALID_INPUT, "%s: logging agent %s not found.", gName, agentName);
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+void WSDecoupledLogAction::doActionInGroup(WSDecoupledLogAgentGroup* group, StringArray* agentNames)
|
|
|
{
|
|
|
- IArrayOf<IEspLogAgentSetting> agentSettingResp;
|
|
|
if (!agentNames || !agentNames->ordinality())
|
|
|
- getSettingsForAllLoggingAgentsInGroup(group, agentSettingResp);
|
|
|
+ {
|
|
|
+ std::map<std::string, Owned<IUpdateLogThread>>& agentThreadMap = group->getLoggingAgentThreads();
|
|
|
+ for (auto mt : agentThreadMap)
|
|
|
+ {
|
|
|
+ if (!doActionForAgent(mt.first.c_str(), mt.second))
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
else
|
|
|
{
|
|
|
ForEachItemIn(j, *agentNames)
|
|
|
{
|
|
|
const char* agentName = agentNames->item(j);
|
|
|
- if (isEmptyString(agentName))
|
|
|
- throw MakeStringException(ECLWATCH_INVALID_INPUT, "%s: logging agent name not specified.", group->getName());
|
|
|
-
|
|
|
- getLoggingAgentSettings(agentName, group->getLoggingAgentThread(agentName), agentSettingResp);
|
|
|
+ if (!doActionForAgent(agentName, group->getLoggingAgentThread(agentName)))
|
|
|
+ break;
|
|
|
}
|
|
|
}
|
|
|
+}
|
|
|
|
|
|
- Owned<IEspLogAgentGroupSetting> groupSetting = createLogAgentGroupSetting();
|
|
|
+void WSDecoupledLogGetSettings::doActionInGroup(WSDecoupledLogAgentGroup* group, StringArray* agentNames)
|
|
|
+{
|
|
|
+ groupSetting.setown(createLogAgentGroupSetting());
|
|
|
groupSetting->setGroupName(group->getName());
|
|
|
- groupSetting->setGroupStatus("Found");
|
|
|
const char* tankFileDir = group->getTankFileDir();
|
|
|
const char* tankFileMask = group->getTankFileMask();
|
|
|
groupSetting->setTankFileDir(tankFileDir);
|
|
|
if (!isEmptyString(tankFileMask))
|
|
|
groupSetting->setTankFileMask(tankFileMask);
|
|
|
- groupSetting->setAgentSettings(agentSettingResp);
|
|
|
- groupSettingResp.append(*groupSetting.getClear());
|
|
|
+
|
|
|
+ WSDecoupledLogAction::doActionInGroup(group, agentNames);
|
|
|
+
|
|
|
+ results.appendGroupSetting(groupSetting.getClear());
|
|
|
}
|
|
|
|
|
|
-void CWSDecoupledLogEx::pauseLoggingAgentsInGroup(WSDecoupledLogAgentGroup* group, StringArray* agentNames, bool pause,
|
|
|
- IArrayOf<IEspLogAgentGroupStatus>& groupStatusResp)
|
|
|
+bool WSDecoupledLogGetSettings::doActionForAgent(const char* agentName, IUpdateLogThread* agentThread)
|
|
|
{
|
|
|
- IArrayOf<IEspLogAgentStatus> agentStatusResp;
|
|
|
- if (!agentNames || !agentNames->ordinality())
|
|
|
- pauseAllLoggingAgentsInGroup(group, pause, agentStatusResp);
|
|
|
+ Owned<IEspLogAgentSetting> agentSetting = createLogAgentSetting();
|
|
|
+ agentSetting->setAgentName(agentName);
|
|
|
+
|
|
|
+ CLogRequestReaderSettings* settings = agentThread->getLogRequestReader()->getSettings();
|
|
|
+ if (!settings)
|
|
|
+ agentSetting->setAgentStatus("SettingsNotFound");
|
|
|
else
|
|
|
{
|
|
|
- ForEachItemIn(j, *agentNames)
|
|
|
- {
|
|
|
- const char* agentName = agentNames->item(j);
|
|
|
- if (isEmptyString(agentName))
|
|
|
- throw MakeStringException(ECLWATCH_INVALID_INPUT, "%s: logging agent name not specified.", group->getName());
|
|
|
-
|
|
|
- pauseLoggingAgent(agentName, group->getLoggingAgentThread(agentName), pause, agentStatusResp);
|
|
|
- }
|
|
|
+ agentSetting->setAgentStatus("SettingsFound");
|
|
|
+ agentSetting->setAckedFileList(settings->ackedFileList);
|
|
|
+ agentSetting->setAckedLogRequestFile(settings->ackedLogRequestFile);
|
|
|
+ agentSetting->setWaitSeconds(settings->waitSeconds);
|
|
|
+ agentSetting->setPendingLogBufferSize(settings->pendingLogBufferSize);
|
|
|
}
|
|
|
|
|
|
- Owned<IEspLogAgentGroupStatus> groupStatus = createLogAgentGroupStatus();
|
|
|
- groupStatus->setGroupName(group->getName());
|
|
|
- groupStatus->setGroupStatus("Found");
|
|
|
- groupStatus->setAgentStatuses(agentStatusResp);
|
|
|
- groupStatusResp.append(*groupStatus.getClear());
|
|
|
+ IArrayOf<IConstLogAgentSetting>& agentSettings = groupSetting->getAgentSettings();
|
|
|
+ agentSettings.append(*agentSetting.getClear());
|
|
|
+ return true;
|
|
|
}
|
|
|
|
|
|
-void CWSDecoupledLogEx::pauseAllLoggingAgentsInGroup(WSDecoupledLogAgentGroup* group, bool pause, IArrayOf<IEspLogAgentStatus>& agentStatusResp)
|
|
|
+void WSDecoupledLogPause::doActionInGroup(WSDecoupledLogAgentGroup* group, StringArray* agentNames)
|
|
|
{
|
|
|
- std::map<std::string, Owned<IUpdateLogThread>>& agentThreadMap = group->getLoggingAgentThreads();
|
|
|
- for (auto mt : agentThreadMap)
|
|
|
- pauseLoggingAgent(mt.first.c_str(), mt.second, pause, agentStatusResp);
|
|
|
+ groupStatus.setown(createLogAgentGroupStatus());
|
|
|
+ groupStatus->setGroupName(group->getName());
|
|
|
+
|
|
|
+ WSDecoupledLogAction::doActionInGroup(group, agentNames);
|
|
|
+
|
|
|
+ results.appendGroupStatus(groupStatus.getClear());
|
|
|
}
|
|
|
|
|
|
-void CWSDecoupledLogEx::pauseLoggingAgent(const char* agentName, IUpdateLogThread* agentThread, bool pause, IArrayOf<IEspLogAgentStatus>& agentStatusResp)
|
|
|
+bool WSDecoupledLogPause::doActionForAgent(const char* agentName, IUpdateLogThread* agentThread)
|
|
|
{
|
|
|
- Owned<IEspLogAgentStatus> agentStatus = createLogAgentStatus();
|
|
|
- agentStatus->setAgentName(agentName);
|
|
|
+ agentThread->getLogRequestReader()->setPause((action.type == CLogAgentActions_Pause) ? true : false);
|
|
|
|
|
|
- if (!agentThread)
|
|
|
- agentStatus->setStatus("NotFound");
|
|
|
+ IArrayOf<IConstLogAgentStatus>& agentStatusInGroup = groupStatus->getAgentStatuses();
|
|
|
+ Owned<IEspLogAgentStatus> aStatus = createLogAgentStatus();
|
|
|
+ aStatus->setAgentName(agentName);
|
|
|
+ if (action.type == CLogAgentActions_Pause)
|
|
|
+ aStatus->setStatus("Pausing");
|
|
|
else
|
|
|
- {
|
|
|
- agentThread->getLogRequestReader()->setPause(pause);
|
|
|
- agentStatus->setStatus(pause ? "Pausing" : "Resuming");
|
|
|
- }
|
|
|
- agentStatusResp.append(*agentStatus.getClear());
|
|
|
+ aStatus->setStatus("Resuming");
|
|
|
+ agentStatusInGroup.append(*aStatus.getClear());
|
|
|
+ return true;
|
|
|
}
|
|
|
|
|
|
-void CWSDecoupledLogEx::getSettingsForAllLoggingAgentsInGroup(WSDecoupledLogAgentGroup* group, IArrayOf<IEspLogAgentSetting>& agentSettingResp)
|
|
|
+void WSDecoupledLogGetAckedLogFileNames::doActionInGroup(WSDecoupledLogAgentGroup* group, StringArray* agentNames)
|
|
|
{
|
|
|
- std::map<std::string, Owned<IUpdateLogThread>>& agentThreadMap = group->getLoggingAgentThreads();
|
|
|
- for (auto mt : agentThreadMap)
|
|
|
- getLoggingAgentSettings(mt.first.c_str(), mt.second, agentSettingResp);
|
|
|
+ tankFilesInGroup.setown(createLogAgentGroupTankFiles());
|
|
|
+ tankFilesInGroup->setGroupName(group->getName());
|
|
|
+ tankFilesInGroup->setTankFileDir(group->getTankFileDir());
|
|
|
+
|
|
|
+ WSDecoupledLogAction::doActionInGroup(group, agentNames);
|
|
|
+
|
|
|
+ results.appendGroupTankFiles(tankFilesInGroup.getClear());
|
|
|
}
|
|
|
|
|
|
-void CWSDecoupledLogEx::getLoggingAgentSettings(const char* agentName, IUpdateLogThread* agentThread, IArrayOf<IEspLogAgentSetting>& agentSettingResp)
|
|
|
+bool WSDecoupledLogGetAckedLogFileNames::doActionForAgent(const char* agentName, IUpdateLogThread* agentThread)
|
|
|
{
|
|
|
- Owned<IEspLogAgentSetting> agentSetting = createLogAgentSetting();
|
|
|
- agentSetting->setAgentName(agentName);
|
|
|
-
|
|
|
- if (!agentThread)
|
|
|
- agentSetting->setAgentStatus("NotFound");
|
|
|
+ StringArray& ackedFiles = tankFilesInGroup->getTankFileNames();
|
|
|
+ //The ackedFiles stores the tank files which have been acked for all logging agents
|
|
|
+ //in an agent group. At the beginning, it is empty. The reportAckedLogFiles() will
|
|
|
+ //be called for the 1st logging agent. The ackedFiles is filled with the acked tank
|
|
|
+ //files in the 1st logging agent. If the ackedFiles is still empty, this method
|
|
|
+ //returns false and the outside loop for other logging agents in the group will be
|
|
|
+ //stopped. If the ackedFiles is not empty, the outside loop calls this method for
|
|
|
+ //the rest of logging agents in the group. For those logging agents, the
|
|
|
+ //removeUnknownAckedLogFiles() will be called because ackedFiles.length() != 0.
|
|
|
+ //In the removeUnknownAckedLogFiles(), if any file inside the ackedFiles has not
|
|
|
+ //been acked in that agent, the file should be removed from the ackedFiles. After
|
|
|
+ //the removeUnknownAckedLogFiles() call, if the ackedFiles is empty, the outside
|
|
|
+ //loop for the rest of logging agents in the group will be stopped.
|
|
|
+ if (!ackedFiles.length())
|
|
|
+ agentThread->getLogRequestReader()->reportAckedLogFiles(ackedFiles);
|
|
|
else
|
|
|
- {
|
|
|
- CLogRequestReaderSettings* settings = agentThread->getLogRequestReader()->getSettings();
|
|
|
- if (!settings)
|
|
|
- agentSetting->setAgentStatus("SettingsNotFound");
|
|
|
- else
|
|
|
- {
|
|
|
- agentSetting->setAgentStatus("Found");
|
|
|
- agentSetting->setAckedFileList(settings->ackedFileList);
|
|
|
- agentSetting->setAckedLogRequestFile(settings->ackedLogRequestFile);
|
|
|
- agentSetting->setWaitSeconds(settings->waitSeconds);
|
|
|
- agentSetting->setPendingLogBufferSize(settings->pendingLogBufferSize);
|
|
|
- }
|
|
|
- }
|
|
|
- agentSettingResp.append(*agentSetting.getClear());
|
|
|
+ agentThread->getLogRequestReader()->removeUnknownAckedLogFiles(ackedFiles);
|
|
|
+ return !ackedFiles.empty();
|
|
|
+}
|
|
|
+
|
|
|
+bool WSDecoupledLogCleanAckedLogFiles::doActionForAgent(const char* agentName, IUpdateLogThread* agentThread)
|
|
|
+{
|
|
|
+ agentThread->getLogRequestReader()->cleanAckedLogFiles(*action.fileNames);
|
|
|
+ return true;
|
|
|
}
|
|
|
|
|
|
IUpdateLogThread* WSDecoupledLogAgentGroup::getLoggingAgentThread(const char* name)
|