|
@@ -5380,3 +5380,344 @@ bool CWsWorkunitsEx::onWUGetArchiveFile(IEspContext &context, IEspWUGetArchiveFi
|
|
|
}
|
|
|
return true;
|
|
|
}
|
|
|
+
|
|
|
+const char *CWsWorkunitsEx::gatherQueryFileCopyErrors(IArrayOf<IConstLogicalFileError> &errors, StringBuffer &errorMsg)
|
|
|
+{
|
|
|
+ if (!errors.ordinality())
|
|
|
+ return errorMsg.str();
|
|
|
+
|
|
|
+ errorMsg.append("Query File Copy Error(s):");
|
|
|
+ ForEachItemIn(i, errors)
|
|
|
+ {
|
|
|
+ IConstLogicalFileError &error = errors.item(i);
|
|
|
+ errorMsg.append(" ").append(error.getLogicalName()).append(": ");
|
|
|
+ errorMsg.append(error.getError()).append(";");
|
|
|
+ }
|
|
|
+ return errorMsg.str();
|
|
|
+}
|
|
|
+
|
|
|
+const char *CWsWorkunitsEx::gatherExceptionMessage(const IMultiException &me, StringBuffer &exceptionMsg)
|
|
|
+{
|
|
|
+ exceptionMsg.append("Exception(s):");
|
|
|
+ aindex_t count = me.ordinality();
|
|
|
+ for (aindex_t i=0; i<count; i++)
|
|
|
+ {
|
|
|
+ IException& e = me.item(i);
|
|
|
+ StringBuffer errMsg;
|
|
|
+ exceptionMsg.append(" ").append(e.errorCode()).append(": ");
|
|
|
+ exceptionMsg.append(e.errorMessage(errMsg).str()).append(";");
|
|
|
+ }
|
|
|
+ exceptionMsg.append("\n");
|
|
|
+ return exceptionMsg.str();
|
|
|
+}
|
|
|
+
|
|
|
+const char *CWsWorkunitsEx::gatherWUException(IConstWUExceptionIterator &it, StringBuffer &exceptionMsg)
|
|
|
+{
|
|
|
+ unsigned numErr = 0, numWRN = 0, numInf = 0, numAlert = 0;
|
|
|
+ ForEach(it)
|
|
|
+ {
|
|
|
+ IConstWUException & cur = it.query();
|
|
|
+ SCMStringBuffer src, msg, file;
|
|
|
+ exceptionMsg.append(" Exception: Code: ").append(cur.getExceptionCode());
|
|
|
+ exceptionMsg.append(" Source: ").append(cur.getExceptionSource(src).str());
|
|
|
+ exceptionMsg.append(" Message: ").append(cur.getExceptionMessage(msg).str());
|
|
|
+ exceptionMsg.append(" FileName: ").append(cur.getExceptionFileName(file).str());
|
|
|
+ exceptionMsg.append(" LineNo: ").append(cur.getExceptionLineNo());
|
|
|
+ exceptionMsg.append(" Column: ").append(cur.getExceptionColumn());
|
|
|
+ if (cur.getActivityId())
|
|
|
+ exceptionMsg.append(" ActivityId: ").append(cur.getActivityId());
|
|
|
+ if (cur.getPriority())
|
|
|
+ exceptionMsg.append(" Priority: ").append(cur.getPriority());
|
|
|
+ exceptionMsg.append(" Scope: ").append(cur.queryScope());
|
|
|
+
|
|
|
+ const char * label = "";
|
|
|
+ switch (cur.getSeverity())
|
|
|
+ {
|
|
|
+ default:
|
|
|
+ case SeverityError: label = "Error"; numErr++; break;
|
|
|
+ case SeverityWarning: label = "Warning"; numWRN++; break;
|
|
|
+ case SeverityInformation: label = "Info"; numInf++; break;
|
|
|
+ case SeverityAlert: label = "Alert"; numAlert++; break;
|
|
|
+ }
|
|
|
+ exceptionMsg.append(" Severity: ").append(label);
|
|
|
+ }
|
|
|
+ exceptionMsg.append(" Total error: ").append(numErr);
|
|
|
+ exceptionMsg.append(" warning: ").append(numWRN);
|
|
|
+ exceptionMsg.append(" info: ").append(numInf);
|
|
|
+ exceptionMsg.append(" alert: ").append(numAlert);
|
|
|
+ exceptionMsg.append("\n");
|
|
|
+ return exceptionMsg.str();
|
|
|
+}
|
|
|
+
|
|
|
+const char *CWsWorkunitsEx::gatherECLException(IArrayOf<IConstECLException> &exceptions, StringBuffer &exceptionMsg)
|
|
|
+{
|
|
|
+ unsigned errorCount = 0, warningCount = 0;
|
|
|
+ ForEachItemIn(i, exceptions)
|
|
|
+ {
|
|
|
+ IConstECLException &e = exceptions.item(i);
|
|
|
+ if (strieq(e.getSeverity(), "warning"))
|
|
|
+ {
|
|
|
+ warningCount++;
|
|
|
+ exceptionMsg.append(" Warning: ");
|
|
|
+ }
|
|
|
+ else if (strieq(e.getSeverity(), "error"))
|
|
|
+ {
|
|
|
+ errorCount++;
|
|
|
+ exceptionMsg.append(" Error: ");
|
|
|
+ }
|
|
|
+
|
|
|
+ if (e.getSource())
|
|
|
+ exceptionMsg.append(e.getSource()).append(": ");
|
|
|
+ if (e.getFileName())
|
|
|
+ exceptionMsg.append(e.getFileName());
|
|
|
+ if (!e.getLineNo_isNull() && !e.getColumn_isNull())
|
|
|
+ exceptionMsg.appendf("(%d,%d): ", e.getLineNo(), e.getColumn());
|
|
|
+
|
|
|
+ exceptionMsg.appendf("%s C%d: %s;", e.getSeverity(), e.getCode(), e.getMessage());
|
|
|
+ }
|
|
|
+ exceptionMsg.append(" Total error: ").append(errorCount);
|
|
|
+ exceptionMsg.append(" warning: ").append(warningCount);
|
|
|
+ exceptionMsg.append("\n");
|
|
|
+ return exceptionMsg.str();
|
|
|
+}
|
|
|
+
|
|
|
+bool CWsWorkunitsEx::readDeployWUResponse(CWUDeployWorkunitResponse* deployResponse, StringBuffer &wuid, StringBuffer &result)
|
|
|
+{
|
|
|
+ const IMultiException &me = deployResponse->getExceptions();
|
|
|
+ if (me.ordinality())
|
|
|
+ gatherExceptionMessage(me, result);
|
|
|
+
|
|
|
+ const char *w = deployResponse->getWorkunit().getWuid();
|
|
|
+ if (isEmptyString(w))
|
|
|
+ {
|
|
|
+ result.appendf("Error: no workunit ID!");
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ wuid.set(w);
|
|
|
+ const char *state = deployResponse->getWorkunit().getState();
|
|
|
+ bool isCompiled = (strieq(state, "compiled") || strieq(state, "completed"));
|
|
|
+ if (!isCompiled)
|
|
|
+ result.appendf("state: %s;", state);
|
|
|
+
|
|
|
+ gatherECLException(deployResponse->getWorkunit().getExceptions(), result);
|
|
|
+
|
|
|
+ return isCompiled;
|
|
|
+}
|
|
|
+
|
|
|
+void CWsWorkunitsEx::addEclDefinitionActionResult(const char *eclDefinition, const char *result, const char *wuid,
|
|
|
+ const char *queryID, const char* strAction, bool logResult, IArrayOf<IConstWUEclDefinitionActionResult> &results)
|
|
|
+{
|
|
|
+ Owned<IEspWUEclDefinitionActionResult> res = createWUEclDefinitionActionResult();
|
|
|
+ if (!isEmptyString(eclDefinition))
|
|
|
+ res->setEclDefinition(eclDefinition);
|
|
|
+ res->setAction(strAction);
|
|
|
+ res->setResult(result);
|
|
|
+ if (!isEmptyString(wuid))
|
|
|
+ res->setWUID(wuid);
|
|
|
+ if (!isEmptyString(queryID))
|
|
|
+ res->setQueryID(queryID);
|
|
|
+ results.append(*res.getClear());
|
|
|
+ if (logResult)
|
|
|
+ PROGLOG("%s", result);
|
|
|
+}
|
|
|
+
|
|
|
+void CWsWorkunitsEx::checkEclDefinitionSyntax(IEspContext &context, const char *target, const char *eclDefinition,
|
|
|
+ int msToWait, IArrayOf<IConstWUEclDefinitionActionResult> &results)
|
|
|
+{
|
|
|
+ Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
|
|
|
+ NewWsWorkunit wu(factory, context);
|
|
|
+ wu->setAction(WUActionCheck);
|
|
|
+ wu.setQueryMain(eclDefinition);
|
|
|
+
|
|
|
+ StringAttr wuid(wu->queryWuid()); // NB queryWuid() not valid after workunit.clear()
|
|
|
+ wu->commit();
|
|
|
+ wu.clear();
|
|
|
+
|
|
|
+ WsWuHelpers::submitWsWorkunit(context, wuid.str(), target, nullptr, 0, true, false, false);
|
|
|
+ waitForWorkUnitToComplete(wuid.str(), msToWait);
|
|
|
+
|
|
|
+ Owned<IConstWorkUnit> cw(factory->openWorkUnit(wuid.str()));
|
|
|
+ WUState st = cw->getState();
|
|
|
+ bool wuTimeout = (st != WUStateAborted) && (st != WUStateCompleted) && (st != WUStateFailed);
|
|
|
+ VStringBuffer result(" WUSyntaxCheckECL for %s:", eclDefinition);
|
|
|
+ if (wuTimeout)
|
|
|
+ result.append(" timed out.");
|
|
|
+
|
|
|
+ gatherWUException(cw->getExceptions(), result);
|
|
|
+ addEclDefinitionActionResult(eclDefinition, result.str(), wuid.str(), nullptr, "SyntaxCheck", true, results);
|
|
|
+ cw.clear();
|
|
|
+
|
|
|
+ if (wuTimeout)
|
|
|
+ abortWorkUnit(wuid.str(), context.querySecManager(), context.queryUser());
|
|
|
+ if (!factory->deleteWorkUnit(wuid.str()))
|
|
|
+ {
|
|
|
+ result.setf(" Workunit %s cannot be deleted now. You may delete it when its status changes.", wuid.str());
|
|
|
+ addEclDefinitionActionResult(eclDefinition, result.str(), wuid.str(), nullptr, "SyntaxCheck", true, results);
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+bool CWsWorkunitsEx::deployEclDefinition(IEspContext &context, const char *target, const char *eclDefinition,
|
|
|
+ int msToWait, StringBuffer &wuid, StringBuffer &result)
|
|
|
+{
|
|
|
+ Owned<CWUDeployWorkunitRequest> deployReq = new CWUDeployWorkunitRequest("WsWorkunits");
|
|
|
+ deployReq->setName(eclDefinition);
|
|
|
+ deployReq->setQueryMainDefinition(eclDefinition);
|
|
|
+ deployReq->setObjType("compressed_ecl_text");
|
|
|
+ deployReq->setCluster(target);
|
|
|
+ deployReq->setWait(msToWait);
|
|
|
+ deployReq->setFileName("");
|
|
|
+
|
|
|
+ Owned<CWUDeployWorkunitResponse> deployResponse = new CWUDeployWorkunitResponse("WsWorkunits");
|
|
|
+ onWUDeployWorkunit(context, *deployReq, *deployResponse);
|
|
|
+ return readDeployWUResponse(deployResponse, wuid, result);
|
|
|
+}
|
|
|
+
|
|
|
+void CWsWorkunitsEx::deployEclDefinition(IEspContext &context, const char *target, const char *eclDefinition,
|
|
|
+ int msToWait, IArrayOf<IConstWUEclDefinitionActionResult> &results)
|
|
|
+{
|
|
|
+ StringBuffer wuid, finalResult;
|
|
|
+ deployEclDefinition(context, target, eclDefinition, msToWait, wuid, finalResult);
|
|
|
+ addEclDefinitionActionResult(eclDefinition, finalResult.str(), wuid.str(), nullptr, "Deploy", true, results);
|
|
|
+}
|
|
|
+
|
|
|
+void CWsWorkunitsEx::publishEclDefinition(IEspContext &context, const char *target, const char *eclDefinition,
|
|
|
+ int msToWait, IEspWUEclDefinitionActionRequest &req, IArrayOf<IConstWUEclDefinitionActionResult> &results)
|
|
|
+{
|
|
|
+ StringBuffer priorityReq = req.getPriority();
|
|
|
+ if (priorityReq.trim().length() && !isValidPriorityValue(priorityReq.str()))
|
|
|
+ {
|
|
|
+ VStringBuffer msg("Invalid Priority: %s", priorityReq.str());
|
|
|
+ addEclDefinitionActionResult(eclDefinition, msg.str(), nullptr, nullptr, "Publish", true, results);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ StringBuffer memoryLimitReq = req.getMemoryLimit();
|
|
|
+ if (memoryLimitReq.trim().length() && !isValidMemoryValue(memoryLimitReq.str()))
|
|
|
+ {
|
|
|
+ VStringBuffer msg("Invalid MemoryLimit: %s", memoryLimitReq.str());
|
|
|
+ addEclDefinitionActionResult(eclDefinition, msg.str(), nullptr, nullptr, "Publish", true, results);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ time_t timenow;
|
|
|
+ int startTime = time(&timenow);
|
|
|
+
|
|
|
+ //Do deploy first
|
|
|
+ StringBuffer wuid, finalResult;
|
|
|
+ if (!deployEclDefinition(context, target, eclDefinition, msToWait, wuid, finalResult))
|
|
|
+ {
|
|
|
+ addEclDefinitionActionResult(eclDefinition, finalResult.str(), wuid.str(), nullptr, "Publish", true, results);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ int timeLeft = msToWait - (time(&timenow) - startTime);
|
|
|
+ if (timeLeft <= 0)
|
|
|
+ {
|
|
|
+ addEclDefinitionActionResult(eclDefinition, "Timed out after deployment", wuid.str(), nullptr, "Publish", true, results);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ //Do publish now
|
|
|
+ StringBuffer comment = req.getComment();
|
|
|
+ StringBuffer remoteDali = req.getRemoteDali();
|
|
|
+ StringBuffer sourceProcess = req.getSourceProcess();
|
|
|
+ int timeLimit = req.getTimeLimit();
|
|
|
+ int warnTimeLimit = req.getWarnTimeLimit();
|
|
|
+
|
|
|
+ Owned<CWUPublishWorkunitRequest> publishReq = new CWUPublishWorkunitRequest("WsWorkunits");
|
|
|
+ publishReq->setWuid(wuid.str());
|
|
|
+ publishReq->setCluster(target);
|
|
|
+ publishReq->setJobName(eclDefinition);
|
|
|
+
|
|
|
+ if (!remoteDali.trim().isEmpty())
|
|
|
+ publishReq->setRemoteDali(remoteDali.str());
|
|
|
+ if (!sourceProcess.trim().isEmpty())
|
|
|
+ publishReq->setSourceProcess(sourceProcess.str());
|
|
|
+ if (!priorityReq.isEmpty())
|
|
|
+ publishReq->setPriority(priorityReq.str());
|
|
|
+ if (comment.str()) //allow empty
|
|
|
+ publishReq->setComment(comment.str());
|
|
|
+
|
|
|
+ if (req.getDeletePrevious())
|
|
|
+ publishReq->setActivate(CWUQueryActivationMode_ActivateDeletePrevious);
|
|
|
+ else if (req.getSuspendPrevious())
|
|
|
+ publishReq->setActivate(CWUQueryActivationMode_ActivateSuspendPrevious);
|
|
|
+ else
|
|
|
+ publishReq->setActivate(req.getNoActivate() ? CWUQueryActivationMode_NoActivate : CWUQueryActivationMode_Activate);
|
|
|
+
|
|
|
+ publishReq->setWait(timeLeft);
|
|
|
+ publishReq->setNoReload(req.getNoReload());
|
|
|
+ publishReq->setDontCopyFiles(req.getDontCopyFiles());
|
|
|
+ publishReq->setAllowForeignFiles(req.getAllowForeign());
|
|
|
+ publishReq->setUpdateDfs(req.getUpdateDfs());
|
|
|
+ publishReq->setUpdateSuperFiles(req.getUpdateSuperfiles());
|
|
|
+ publishReq->setUpdateCloneFrom(req.getUpdateCloneFrom());
|
|
|
+ publishReq->setAppendCluster(!req.getDontAppendCluster());
|
|
|
+ publishReq->setIncludeFileErrors(true);
|
|
|
+
|
|
|
+ if (timeLimit != -1)
|
|
|
+ publishReq->setTimeLimit(timeLimit);
|
|
|
+ if (warnTimeLimit != (unsigned) -1)
|
|
|
+ publishReq->setWarnTimeLimit(warnTimeLimit);
|
|
|
+ if (!memoryLimitReq.isEmpty())
|
|
|
+ publishReq->setMemoryLimit(memoryLimitReq.str());
|
|
|
+
|
|
|
+ Owned<CWUPublishWorkunitResponse> publishResponse = new CWUPublishWorkunitResponse("WsWorkunits");
|
|
|
+ onWUPublishWorkunit(context, *publishReq, *publishResponse);
|
|
|
+
|
|
|
+ const char *id = publishResponse->getQueryId();
|
|
|
+ if (!isEmptyString(id))
|
|
|
+ {
|
|
|
+ const char *qs = publishResponse->getQuerySet();
|
|
|
+ finalResult.append(" ").append(qs ? qs : "").append('/').append(id).append(" published. ");
|
|
|
+ }
|
|
|
+ if (publishResponse->getReloadFailed())
|
|
|
+ finalResult.append(" Added to target, but request to reload queries on cluster failed.");
|
|
|
+
|
|
|
+ const IMultiException &me = publishResponse->getExceptions();
|
|
|
+ if (me.ordinality())
|
|
|
+ gatherExceptionMessage(me, finalResult);
|
|
|
+
|
|
|
+ gatherQueryFileCopyErrors(publishResponse->getFileErrors(), finalResult);
|
|
|
+ addEclDefinitionActionResult(eclDefinition, finalResult.str(), wuid.str(), id, "Publish", true, results);
|
|
|
+}
|
|
|
+
|
|
|
+bool CWsWorkunitsEx::onWUEclDefinitionAction(IEspContext &context, IEspWUEclDefinitionActionRequest &req, IEspWUEclDefinitionActionResponse &resp)
|
|
|
+{
|
|
|
+ try
|
|
|
+ {
|
|
|
+ CEclDefinitionActions action = req.getActionType();
|
|
|
+ if (action == EclDefinitionActions_Undefined)
|
|
|
+ throw MakeStringException(ECLWATCH_INVALID_INPUT,"Action not defined in onWUEclDefinitionAction.");
|
|
|
+
|
|
|
+ if (!context.validateFeatureAccess(OWN_WU_ACCESS, SecAccess_Write, false))
|
|
|
+ throw MakeStringException(ECLWATCH_ECL_WU_ACCESS_DENIED, "Failed to do WUEclDefinitionAction %s. Permission denied.", req.getActionTypeAsString());
|
|
|
+
|
|
|
+ StringBuffer target = req.getTarget();
|
|
|
+ if (target.trim().isEmpty())
|
|
|
+ throw MakeStringException(ECLWATCH_INVALID_INPUT,"Target not defined in onWUEclDefinitionAction.");
|
|
|
+
|
|
|
+ IArrayOf<IConstWUEclDefinitionActionResult> results;
|
|
|
+ StringArray &eclDefinitions = req.getEclDefinitions();
|
|
|
+ int msToWait = req.getMsToWait();
|
|
|
+ for (aindex_t i = 0; i < eclDefinitions.length(); i++)
|
|
|
+ {
|
|
|
+ StringBuffer eclDefinitionName = eclDefinitions.item(i);
|
|
|
+ if (eclDefinitionName.trim().isEmpty())
|
|
|
+ WARNLOG("Empty ECL Definition name in WUEclDefinitionAction request");
|
|
|
+ else if (action == CEclDefinitionActions_SyntaxCheck)
|
|
|
+ checkEclDefinitionSyntax(context, target.str(), eclDefinitionName.str(), msToWait, results);
|
|
|
+ else if (action == CEclDefinitionActions_Deploy)
|
|
|
+ deployEclDefinition(context, target.str(), eclDefinitionName.str(), msToWait, results);
|
|
|
+ else
|
|
|
+ publishEclDefinition(context, target.str(), eclDefinitionName.str(), msToWait, req, results);
|
|
|
+ }
|
|
|
+
|
|
|
+ resp.setActionResults(results);
|
|
|
+ }
|
|
|
+ catch(IException* e)
|
|
|
+ {
|
|
|
+ FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+}
|