|
@@ -70,8 +70,6 @@ using roxiemem::OwnedRoxieString;
|
|
|
|
|
|
#define MONITOR_ECLAGENT_STATUS
|
|
|
|
|
|
-static const char XMLHEADER[] = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>";
|
|
|
-
|
|
|
//#define ROOT_DRIVE "c:"
|
|
|
|
|
|
//#define DEFAULT_REALTHOR_HOST "localhost"
|
|
@@ -90,6 +88,7 @@ constexpr LogMsgCategory MCresolve = MCprogress(100); // Category used to
|
|
|
constexpr LogMsgCategory MCrunlock = MCprogress(100); // Category used to inform about run lock progress
|
|
|
|
|
|
Owned<IPropertyTree> agentTopology;
|
|
|
+Owned<IProperties> cmdLineArgs;
|
|
|
|
|
|
MODULE_INIT(INIT_PRIORITY_STANDARD)
|
|
|
{
|
|
@@ -98,6 +97,7 @@ MODULE_INIT(INIT_PRIORITY_STANDARD)
|
|
|
MODULE_EXIT()
|
|
|
{
|
|
|
agentTopology.clear();
|
|
|
+ cmdLineArgs.clear();
|
|
|
}
|
|
|
|
|
|
//-----------------------------------------------------------------------------------------------------
|
|
@@ -156,6 +156,7 @@ public:
|
|
|
CHThorDebugSocketListener(CHThorDebugContext * _debugContext)
|
|
|
: Thread("CHThorDebugSocketListener"), debugContext(_debugContext)
|
|
|
{
|
|
|
+ port = 0;
|
|
|
running = false;
|
|
|
suspended = false;
|
|
|
unsigned poolSize = 10;//MORE : What is a good threadcoutn?
|
|
@@ -509,8 +510,8 @@ public:
|
|
|
|
|
|
//=======================================================================================
|
|
|
|
|
|
-EclAgent::EclAgent(IConstWorkUnit *wu, const char *_wuid, bool _checkVersion, bool _resetWorkflow, bool _noRetry, char const * _logname, const char *_allowedPipeProgs, IPropertyTree *_queryXML, IProperties *_globals, IPropertyTree *_config, ILogMsgHandler * _logMsgHandler)
|
|
|
- : wuRead(wu), wuid(_wuid), checkVersion(_checkVersion), resetWorkflow(_resetWorkflow), noRetry(_noRetry), allowedPipeProgs(_allowedPipeProgs), globals(_globals), config(_config), logMsgHandler(_logMsgHandler)
|
|
|
+EclAgent::EclAgent(IConstWorkUnit *wu, const char *_wuid, bool _checkVersion, bool _resetWorkflow, bool _noRetry, char const * _logname, const char *_allowedPipeProgs, IPropertyTree *_queryXML, ILogMsgHandler * _logMsgHandler)
|
|
|
+ : wuRead(wu), wuid(_wuid), checkVersion(_checkVersion), resetWorkflow(_resetWorkflow), noRetry(_noRetry), allowedPipeProgs(_allowedPipeProgs), logMsgHandler(_logMsgHandler)
|
|
|
{
|
|
|
isAborting = false;
|
|
|
isStandAloneExe = false;
|
|
@@ -532,7 +533,7 @@ EclAgent::EclAgent(IConstWorkUnit *wu, const char *_wuid, bool _checkVersion, bo
|
|
|
StringAttrAdaptor adaptor(clusterType);
|
|
|
wuRead->getDebugValue("targetClusterType", adaptor);
|
|
|
pluginMap = NULL;
|
|
|
- stopAfter = globals->getPropInt("-limit",-1);
|
|
|
+ stopAfter = agentTopology->getPropInt("@limit",-1);
|
|
|
|
|
|
RemoteFilename logfile;
|
|
|
logfile.setLocalPath(_logname);
|
|
@@ -700,7 +701,7 @@ RecordTranslationMode EclAgent::getLayoutTranslationMode() const
|
|
|
if(wu->hasDebugValue("layoutTranslation"))
|
|
|
wu->getDebugValue("layoutTranslation", val);
|
|
|
else
|
|
|
- config->getProp("@fieldTranslationEnabled", val.s);
|
|
|
+ agentTopology->getProp("@fieldTranslationEnabled", val.s);
|
|
|
return getTranslationMode(val.str());
|
|
|
}
|
|
|
|
|
@@ -1431,26 +1432,6 @@ ILocalOrDistributedFile *EclAgent::resolveLFN(const char *fname, const char *err
|
|
|
return ldFile.getClear();
|
|
|
}
|
|
|
|
|
|
-static void getFileSize(IFile * file, unsigned __int64 & gotSize)
|
|
|
-{
|
|
|
- gotSize = file->size();
|
|
|
-}
|
|
|
-
|
|
|
-static unsigned __int64 getFileSize(IFile * file)
|
|
|
-{
|
|
|
- unsigned __int64 size;
|
|
|
- try
|
|
|
- {
|
|
|
- getFileSize(file, size);
|
|
|
- }
|
|
|
- catch (IException * e)
|
|
|
- {
|
|
|
- e->Release();
|
|
|
- size = (unsigned __int64)-1;
|
|
|
- }
|
|
|
- return size;
|
|
|
-}
|
|
|
-
|
|
|
bool EclAgent::fileExists(const char *name)
|
|
|
{
|
|
|
unsigned __int64 size = 0;
|
|
@@ -1573,7 +1554,7 @@ char *EclAgent::getPlatform()
|
|
|
|
|
|
char *EclAgent::getEnv(const char *name, const char *defaultValue) const
|
|
|
{
|
|
|
- const char *val = globals->queryProp(name);
|
|
|
+ const char *val = cmdLineArgs->queryProp(name);
|
|
|
if (!val)
|
|
|
val = getenv(name);
|
|
|
if (val)
|
|
@@ -2020,12 +2001,6 @@ void EclAgent::doProcess()
|
|
|
UWARNLOG(errCode, "%s", rmMsg.str());
|
|
|
}
|
|
|
|
|
|
- if (globals->getPropBool("DUMPFINALWU", false))
|
|
|
- {
|
|
|
- StringBuffer xml;
|
|
|
- exportWorkUnitToXML(wuRead, xml, true, false, true);
|
|
|
- fprintf(stdout, "%s", xml.str());
|
|
|
- }
|
|
|
wuRead.clear(); // have a write lock still, but don't want to leave dangling unlocked wuRead after releasing write lock
|
|
|
// or else something can delete whilst still referenced (e.g. on complete signal)
|
|
|
w.clear();
|
|
@@ -2078,20 +2053,16 @@ void EclAgent::runProcess(IEclProcess *process)
|
|
|
|
|
|
//Get memory limit. Workunit specified value takes precedence over config file
|
|
|
int memLimitMB = agentTopology->getPropInt("@defaultMemoryLimitMB", DEFAULT_MEM_LIMIT);
|
|
|
- memLimitMB = globals->getPropInt("defaultMemoryLimitMB", memLimitMB);
|
|
|
memLimitMB = queryWorkUnit()->getDebugValueInt("hthorMemoryLimit", memLimitMB);
|
|
|
|
|
|
bool allowHugePages = agentTopology->getPropBool("@heapUseHugePages", false);
|
|
|
- allowHugePages = globals->getPropBool("heapUseHugePages", allowHugePages);
|
|
|
|
|
|
bool allowTransparentHugePages = agentTopology->getPropBool("@heapUseTransparentHugePages", true);
|
|
|
- allowTransparentHugePages = globals->getPropBool("heapUseTransparentHugePages", allowTransparentHugePages);
|
|
|
|
|
|
bool retainMemory = agentTopology->getPropBool("@heapRetainMemory", false);
|
|
|
- retainMemory = globals->getPropBool("heapRetainMemory", retainMemory);
|
|
|
|
|
|
- if (globals->hasProp("@httpGlobalIdHeader"))
|
|
|
- updateDummyContextLogger().setHttpIdHeaders(globals->queryProp("@httpGlobalIdHeader"), globals->queryProp("@httpCallerIdHeader"));
|
|
|
+ if (agentTopology->hasProp("@httpGlobalIdHeader"))
|
|
|
+ updateDummyContextLogger().setHttpIdHeaders(agentTopology->queryProp("@httpGlobalIdHeader"), agentTopology->queryProp("@httpCallerIdHeader"));
|
|
|
|
|
|
if (queryWorkUnit()->hasDebugValue("GlobalId"))
|
|
|
{
|
|
@@ -2733,7 +2704,7 @@ bool EclAgent::isPersistUptoDate(Owned<IRemoteConnection> &persistLock, IRuntime
|
|
|
{
|
|
|
//Loop trying to get a write lock - if it fails, then release the read lock, otherwise
|
|
|
//you can get a deadlock with several things waiting to read, and none being able to write.
|
|
|
- bool rebuildAllPersists = globals->getPropBool("REBUILDPERSISTS", false); // Useful for debugging purposes
|
|
|
+ bool rebuildAllPersists = agentTopology->getPropBool("@rebuildPersists", false); // Useful for debugging purposes
|
|
|
for (;;)
|
|
|
{
|
|
|
StringBuffer dummy;
|
|
@@ -3307,6 +3278,17 @@ int myhook(int alloctype, void *, size_t nSize, int p1, long allocSeq, const uns
|
|
|
}
|
|
|
#endif
|
|
|
//--------------------------------------------------------------
|
|
|
+
|
|
|
+void usage()
|
|
|
+{
|
|
|
+ printf("USAGE: eclagent --wuid=wuid options\n"
|
|
|
+ "options include:\n"
|
|
|
+ " --daliServers=daliEp\n"
|
|
|
+ " --traceLevel=n\n"
|
|
|
+ " --resetWorkflow (performs workflow reset on starting)\n"
|
|
|
+ " --noRetry (immediately fails if workunit is in failed state)\n");
|
|
|
+}
|
|
|
+
|
|
|
extern int HTHOR_API eclagent_main(int argc, const char *argv[], StringBuffer * wuXML, bool standAloneExe)
|
|
|
{
|
|
|
#ifdef _DEBUG
|
|
@@ -3316,9 +3298,18 @@ extern int HTHOR_API eclagent_main(int argc, const char *argv[], StringBuffer *
|
|
|
#endif
|
|
|
int retcode = 0;
|
|
|
addAbortHandler(ControlHandler);
|
|
|
- Owned<IProperties> globals = createProperties(true); // cmdline props only
|
|
|
- for (int i = 1; i < argc; i++)
|
|
|
- globals->loadProp(argv[i], true);
|
|
|
+ cmdLineArgs.setown(createProperties(true));
|
|
|
+ if (argc==1)
|
|
|
+ {
|
|
|
+ usage();
|
|
|
+ return 2;
|
|
|
+ }
|
|
|
+ for (int i = 1; i < argc; i++)
|
|
|
+ {
|
|
|
+ const char *arg = argv[i];
|
|
|
+ if (arg && arg[0] != '-')
|
|
|
+ cmdLineArgs->loadProp(arg, true);
|
|
|
+ }
|
|
|
|
|
|
//get logfile location from agentexec.xml config file
|
|
|
if (!standAloneExe)
|
|
@@ -3334,7 +3325,7 @@ extern int HTHOR_API eclagent_main(int argc, const char *argv[], StringBuffer *
|
|
|
}
|
|
|
}
|
|
|
else
|
|
|
- agentTopology.setown(createPTree("AGENTEXEC"));
|
|
|
+ agentTopology.setown(createPTree("AGENTEXEC")); // MORE - this needs thought!
|
|
|
|
|
|
//Build log file specification
|
|
|
StringBuffer logfilespec;
|
|
@@ -3357,50 +3348,12 @@ extern int HTHOR_API eclagent_main(int argc, const char *argv[], StringBuffer *
|
|
|
PROGLOG("Logging to %s", logfilespec.str());
|
|
|
}
|
|
|
|
|
|
- if (wuXML && wuXML->length())
|
|
|
- {
|
|
|
- if (const char * fn = globals->queryProp("-wu"))
|
|
|
- {
|
|
|
- //Write workunit to file and exit
|
|
|
- if (0==strcmp(fn,"1"))
|
|
|
- fn = "stdout:";
|
|
|
- Owned<IFile> file = createIFile(fn);
|
|
|
- OwnedIFileIO io;
|
|
|
- try
|
|
|
- {
|
|
|
- io.setown(file->open(IFOcreate));
|
|
|
- }
|
|
|
- catch(IException * e)
|
|
|
- {
|
|
|
- StringBuffer sb;
|
|
|
- e->errorMessage(sb);
|
|
|
- throw MakeStringException(errno, "Failed to create WU XML file %s : %s", fn, sb.str());
|
|
|
- }
|
|
|
-
|
|
|
- Owned<IFileIOStream> out = createIOStream(io);
|
|
|
-
|
|
|
- StringBuffer str;
|
|
|
- str.appendf("%s\n",XMLHEADER);// "<?xml version=\"1.0\" encoding=\"UTF-8\"?>";
|
|
|
- out->write(str.length(), str.str());
|
|
|
-
|
|
|
- out->write(wuXML->length(), wuXML->str());
|
|
|
- return 0;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
#ifdef _DEBUG
|
|
|
traceLevel = 10;
|
|
|
-#ifdef _WIN32
|
|
|
- if (globals->getPropInt("ALLOCBREAK"))
|
|
|
- _CrtSetBreakAlloc(globals->getPropInt("ALLOCBREAK"));
|
|
|
- if (globals->getPropInt("BREAKATSTART"))
|
|
|
- DebugBreak();
|
|
|
-#endif
|
|
|
#else
|
|
|
traceLevel = 0;
|
|
|
#endif
|
|
|
traceLevel = agentTopology->getPropInt("@traceLevel", traceLevel);
|
|
|
- traceLevel = globals->getPropInt("TRACELEVEL", traceLevel);
|
|
|
|
|
|
if (traceLevel)
|
|
|
{
|
|
@@ -3412,7 +3365,7 @@ extern int HTHOR_API eclagent_main(int argc, const char *argv[], StringBuffer *
|
|
|
Owned<IPropertyTree> query;
|
|
|
try
|
|
|
{
|
|
|
- const char *queryXML = globals->queryProp("query");
|
|
|
+ const char *queryXML = agentTopology->queryProp("@query");
|
|
|
if (queryXML)
|
|
|
{
|
|
|
if (queryXML[0]=='@')
|
|
@@ -3420,7 +3373,7 @@ extern int HTHOR_API eclagent_main(int argc, const char *argv[], StringBuffer *
|
|
|
else
|
|
|
query.setown(createPTreeFromXMLString(queryXML));
|
|
|
}
|
|
|
- Owned<IPropertyIterator> it = globals->getIterator();
|
|
|
+ Owned<IPropertyIterator> it = cmdLineArgs->getIterator();
|
|
|
ForEach(*it)
|
|
|
{
|
|
|
const char * key = it->getPropKey();
|
|
@@ -3428,7 +3381,7 @@ extern int HTHOR_API eclagent_main(int argc, const char *argv[], StringBuffer *
|
|
|
{
|
|
|
if (!query)
|
|
|
query.setown(createPTree("Query"));
|
|
|
- const char *val = globals->queryProp(key);
|
|
|
+ const char *val = cmdLineArgs->queryProp(key);
|
|
|
if (val[0]=='<')
|
|
|
{
|
|
|
Owned<IPropertyTree> valtree = createPTreeFromXMLString(val);
|
|
@@ -3448,15 +3401,13 @@ extern int HTHOR_API eclagent_main(int argc, const char *argv[], StringBuffer *
|
|
|
}
|
|
|
|
|
|
StringBuffer wuid;
|
|
|
- StringBuffer daliServers;
|
|
|
- if (!globals->getProp("DALISERVERS", daliServers) && !globals->getProp("-DALISERVERS", daliServers))
|
|
|
- daliServers.append(agentTopology->queryProp("@daliServers"));
|
|
|
+ const char *daliServers = agentTopology->queryProp("@daliServers");
|
|
|
|
|
|
#ifdef LEAK_FILE
|
|
|
enableMemLeakChecking(true);
|
|
|
logLeaks(LEAK_FILE);
|
|
|
#endif
|
|
|
- if (globals->getPropInt("DAFILESRVCACHE", 1))
|
|
|
+ if (agentTopology->getPropBool("@dafilesrvCache", true))
|
|
|
setDaliServixSocketCaching(true);
|
|
|
|
|
|
enableForceRemoteReads(); // forces file reads to be remote reads if they match environment setting 'forceRemotePattern' pattern.
|
|
@@ -3476,7 +3427,7 @@ extern int HTHOR_API eclagent_main(int argc, const char *argv[], StringBuffer *
|
|
|
}
|
|
|
|
|
|
Owned<IUserDescriptor> standAloneUDesc;
|
|
|
- if (daliServers.length())
|
|
|
+ if (daliServers)
|
|
|
{
|
|
|
SocketEndpoint daliEp(daliServers, DALI_SERVER_PORT);
|
|
|
{
|
|
@@ -3546,10 +3497,10 @@ extern int HTHOR_API eclagent_main(int argc, const char *argv[], StringBuffer *
|
|
|
IExtendedWUInterface * extendedWu = queryExtendedWU(daliWu);
|
|
|
extendedWu->copyWorkUnit(standAloneWorkUnit, true, true);
|
|
|
wuid.set(daliWu->queryWuid());
|
|
|
- globals->setProp("WUID", wuid.str());
|
|
|
+ cmdLineArgs->setProp("WUID", wuid.str()); // So it can be retrieved by getenv
|
|
|
|
|
|
standAloneUDesc.setown(createUserDescriptor());
|
|
|
- if (const char * userpwd = globals->queryProp("-USER"))
|
|
|
+ if (const char * userpwd = agentTopology->queryProp("@user"))
|
|
|
{
|
|
|
StringBuffer usr(userpwd);
|
|
|
usr.replace(':',(char)NULL);
|
|
@@ -3574,10 +3525,10 @@ extern int HTHOR_API eclagent_main(int argc, const char *argv[], StringBuffer *
|
|
|
}
|
|
|
|
|
|
StringBuffer sb;
|
|
|
- sb.append("//").append(daliServers.str()).append(':').append(appName);
|
|
|
+ sb.append("//").append(daliServers).append(':').append(appName);
|
|
|
daliWu->setJobName(sb.str());
|
|
|
|
|
|
- sb.clear().append("//").append(daliServers.str()).append(":StandAloneHThor");
|
|
|
+ sb.clear().append("//").append(daliServers).append(":StandAloneHThor");
|
|
|
daliWu->setClusterName(sb.str());
|
|
|
|
|
|
standAloneWorkUnit.clear();
|
|
@@ -3586,9 +3537,9 @@ extern int HTHOR_API eclagent_main(int argc, const char *argv[], StringBuffer *
|
|
|
|
|
|
if (!standAloneWorkUnit)
|
|
|
{
|
|
|
- if (!globals->hasProp("WUID"))
|
|
|
- throw MakeStringException(0, "WUID not specified");
|
|
|
- wuid.set(globals->queryProp("WUID"));
|
|
|
+ agentTopology->getProp("@wuid", wuid);
|
|
|
+ if (!wuid.length())
|
|
|
+ throw MakeStringException(0, "wuid not specified");
|
|
|
}
|
|
|
else
|
|
|
{
|
|
@@ -3623,19 +3574,21 @@ extern int HTHOR_API eclagent_main(int argc, const char *argv[], StringBuffer *
|
|
|
|
|
|
if (w)
|
|
|
{
|
|
|
- EclAgent agent(w, wuid.str(), globals->getPropInt("IGNOREVERSION", 0)==0, globals->getPropBool("WFRESET", false), globals->getPropBool("NORETRY", false), logfilespec.str(), globals->queryProp("allowedPipePrograms"), query.getClear(), globals, agentTopology, logMsgHandler);
|
|
|
- const bool isRemoteWorkunit = (daliServers.length() != 0);
|
|
|
- const bool resolveFilesLocally = standAloneExe && (!isRemoteWorkunit || globals->getPropBool("USELOCALFILES", false));
|
|
|
- const bool writeResultsToStdout = standAloneExe && (!isRemoteWorkunit || globals->getPropBool("RESULTSTOSTDOUT", true));
|
|
|
+ EclAgent agent(w, wuid.str(), agentTopology->getPropBool("@ignoreVersion", false), agentTopology->getPropBool("@resetWorkflow", false), agentTopology->getPropBool("@noRetry", false), logfilespec.str(),
|
|
|
+ agentTopology->queryProp("@allowedPipePrograms"), query.getClear(), logMsgHandler);
|
|
|
+ const bool isRemoteWorkunit = !isEmptyString(daliServers);
|
|
|
+ const bool resolveFilesLocally = standAloneExe && (!isRemoteWorkunit || agentTopology->getPropBool("@useLocalFiles", false));
|
|
|
+ const bool writeResultsToStdout = standAloneExe && (!isRemoteWorkunit || agentTopology->getPropBool("@resultsToStdout", true));
|
|
|
|
|
|
outputFmts outputFmt = ofSTD;
|
|
|
if (writeResultsToStdout)
|
|
|
{
|
|
|
- if (globals->getPropBool("-xml", false))
|
|
|
+ const char *format = agentTopology->queryProp("@format");
|
|
|
+ if (strsame(format, "xml") || agentTopology->getPropBool("@xml", false))
|
|
|
outputFmt = ofXML;
|
|
|
- else if (globals->getPropBool("-raw", false))
|
|
|
+ else if (strsame(format, "raw") || agentTopology->getPropBool("@raw", false))
|
|
|
outputFmt = ofRAW;
|
|
|
- else if (globals->getPropBool("-csv", false))
|
|
|
+ else if (strsame(format, "csv") || agentTopology->getPropBool("@csv", false))
|
|
|
{
|
|
|
fprintf(stdout,"\nCSV output format not supported\n");
|
|
|
return false;
|
|
@@ -3698,21 +3651,20 @@ extern int HTHOR_API eclagent_main(int argc, const char *argv[], StringBuffer *
|
|
|
}
|
|
|
|
|
|
//=======================================================================================
|
|
|
-void usage(const char * exeName)
|
|
|
-{
|
|
|
- fprintf(stdout,"\nUsage:\n"
|
|
|
- " %s <options>\n"
|
|
|
- "\nGeneral options:\n"
|
|
|
- " -wu=<file> Write XML formatted workunit to given filespec and exit\n"
|
|
|
- " -xml Display output as XML\n"
|
|
|
- " -raw Display output as binary\n"
|
|
|
- " -limit=x Limit number of output rows\n"
|
|
|
- " -DALISERVERS=daliEp Connect to the specified Dali(s)\n"
|
|
|
- " -USER=user:password Dali credentials\n"
|
|
|
- " --help Display this message\n",
|
|
|
- exeName
|
|
|
+
|
|
|
+void standalone_usage(const char * exeName)
|
|
|
+{
|
|
|
+ printf("Usage:\n"
|
|
|
+ " %s <options>\n"
|
|
|
+ "\nGeneral options:\n"
|
|
|
+ " --xml Display output as XML\n"
|
|
|
+ " --raw Display output as binary\n"
|
|
|
+ " --limit=x Limit number of output rows\n"
|
|
|
+ " --daliServers=ep Connect to the specified Dali(s)\n",
|
|
|
+ exeName
|
|
|
);
|
|
|
}
|
|
|
+
|
|
|
//=======================================================================================
|
|
|
|
|
|
|
|
@@ -3723,10 +3675,10 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
|
|
|
|
|
|
for (int idx = 1; idx < argc; idx++)
|
|
|
{
|
|
|
- if (strstr(argv[idx], "-help" ))
|
|
|
+ if (strstr(argv[idx], "--help" ))
|
|
|
{
|
|
|
const char * p = strrchr(argv[0],PATHSEPCHAR);
|
|
|
- usage(p ? ++p : argv[0]);
|
|
|
+ standalone_usage(p ? ++p : argv[0]);
|
|
|
return false;
|
|
|
}
|
|
|
}
|
|
@@ -3800,6 +3752,7 @@ public:
|
|
|
everStarted = false;
|
|
|
rowCount = 0;
|
|
|
maxRowSize = 0;
|
|
|
+ totalTime = 0;
|
|
|
inMeta = NULL;
|
|
|
}
|
|
|
|
|
@@ -4318,8 +4271,6 @@ public:
|
|
|
|
|
|
};
|
|
|
|
|
|
-IDebugGraphManager *createProxyDebugGraphManager(unsigned graphId, unsigned channel, memsize_t remoteGraphId);
|
|
|
-
|
|
|
//=======================================================================================
|
|
|
|
|
|
class CHThorDebugGraphManager : extends CBaseDebugGraphManager
|