|
@@ -40,6 +40,7 @@
|
|
#define DROPZONE_BY_MACHINE_SUFFIX "-dropzoneByMachine-"
|
|
#define DROPZONE_BY_MACHINE_SUFFIX "-dropzoneByMachine-"
|
|
#define DROPZONE_SUFFIX "dropzone-"
|
|
#define DROPZONE_SUFFIX "dropzone-"
|
|
#define MACHINE_PREFIX "machine-"
|
|
#define MACHINE_PREFIX "machine-"
|
|
|
|
+#define SPARKTHOR_SUFFIX "sparkthor-"
|
|
|
|
|
|
static int environmentTraceLevel = 1;
|
|
static int environmentTraceLevel = 1;
|
|
static Owned <IConstEnvironment> cache;
|
|
static Owned <IConstEnvironment> cache;
|
|
@@ -100,6 +101,44 @@ protected:
|
|
unsigned maxIndex = 0;
|
|
unsigned maxIndex = 0;
|
|
};
|
|
};
|
|
|
|
|
|
|
|
+class CConstSparkThorInfoIterator : public CSimpleInterfaceOf<IConstSparkThorInfoIterator>
|
|
|
|
+{
|
|
|
|
+public:
|
|
|
|
+ CConstSparkThorInfoIterator();
|
|
|
|
+
|
|
|
|
+ virtual bool first() override;
|
|
|
|
+ virtual bool next() override;
|
|
|
|
+ virtual bool isValid() override;
|
|
|
|
+ virtual IConstSparkThorInfo & query() override;
|
|
|
|
+ virtual unsigned count() const override;
|
|
|
|
+
|
|
|
|
+protected:
|
|
|
|
+ Owned<IConstSparkThorInfo> curr;
|
|
|
|
+ Owned<CLocalEnvironment> constEnv;
|
|
|
|
+ unsigned index = 1;
|
|
|
|
+ unsigned maxIndex = 0;
|
|
|
|
+};
|
|
|
|
+
|
|
|
|
+class CConstInstanceInfoIterator : public CSimpleInterfaceOf<IConstInstanceInfoIterator>
|
|
|
|
+{
|
|
|
|
+public:
|
|
|
|
+ CConstInstanceInfoIterator(const CLocalEnvironment * env, IPropertyTreeIterator * itr);
|
|
|
|
+
|
|
|
|
+ virtual bool first() override;
|
|
|
|
+ virtual bool next() override;
|
|
|
|
+ virtual bool isValid() override;
|
|
|
|
+ virtual IConstInstanceInfo & query() override;
|
|
|
|
+ virtual unsigned count() const override;
|
|
|
|
+
|
|
|
|
+protected:
|
|
|
|
+ Owned<IPropertyTreeIterator> instanceItr;
|
|
|
|
+ Owned<IConstInstanceInfo> curr;
|
|
|
|
+ const CLocalEnvironment* constEnv;
|
|
|
|
+ unsigned index = 1;
|
|
|
|
+ unsigned maxIndex = 0;
|
|
|
|
+};
|
|
|
|
+
|
|
|
|
+
|
|
//==========================================================================================
|
|
//==========================================================================================
|
|
|
|
|
|
class CConstInstanceInfo;
|
|
class CConstInstanceInfo;
|
|
@@ -114,6 +153,7 @@ private:
|
|
mutable Mutex safeCache;
|
|
mutable Mutex safeCache;
|
|
mutable bool dropZoneCacheBuilt;
|
|
mutable bool dropZoneCacheBuilt;
|
|
mutable bool machineCacheBuilt;
|
|
mutable bool machineCacheBuilt;
|
|
|
|
+ mutable bool sparkThorCacheBuilt;
|
|
mutable bool clusterKeyNameCache;
|
|
mutable bool clusterKeyNameCache;
|
|
StringBuffer fileAccessUrl;
|
|
StringBuffer fileAccessUrl;
|
|
|
|
|
|
@@ -126,12 +166,14 @@ private:
|
|
StringBuffer xPath;
|
|
StringBuffer xPath;
|
|
mutable unsigned numOfMachines;
|
|
mutable unsigned numOfMachines;
|
|
mutable unsigned numOfDropZones;
|
|
mutable unsigned numOfDropZones;
|
|
|
|
+ mutable unsigned numOfSparkThors;
|
|
|
|
|
|
|
|
|
|
IConstEnvBase * getCache(const char *path) const;
|
|
IConstEnvBase * getCache(const char *path) const;
|
|
void setCache(const char *path, IConstEnvBase *value) const;
|
|
void setCache(const char *path, IConstEnvBase *value) const;
|
|
void buildMachineCache() const;
|
|
void buildMachineCache() const;
|
|
void buildDropZoneCache() const;
|
|
void buildDropZoneCache() const;
|
|
|
|
+ void buildSparkThorCache() const;
|
|
void init();
|
|
void init();
|
|
mutable bool isDropZoneRestrictionLoaded = false;
|
|
mutable bool isDropZoneRestrictionLoaded = false;
|
|
mutable bool dropZoneRestrictionEnabled = true;
|
|
mutable bool dropZoneRestrictionEnabled = true;
|
|
@@ -298,6 +340,11 @@ public:
|
|
return fileAccessUrl.length() ? fileAccessUrl.str() : nullptr;
|
|
return fileAccessUrl.length() ? fileAccessUrl.str() : nullptr;
|
|
}
|
|
}
|
|
virtual IConstDaFileSrvInfo *getDaFileSrvGroupInfo(const char *name) const override;
|
|
virtual IConstDaFileSrvInfo *getDaFileSrvGroupInfo(const char *name) const override;
|
|
|
|
+
|
|
|
|
+ virtual IConstSparkThorInfo *getSparkThor(const char *name) const;
|
|
|
|
+ virtual IConstSparkThorInfoIterator *getSparkThorIterator() const;
|
|
|
|
+ unsigned getNumberOfSparkThors() const { buildSparkThorCache(); return numOfSparkThors; }
|
|
|
|
+ IConstSparkThorInfo *getSparkThorByIndex(unsigned index) const;
|
|
};
|
|
};
|
|
|
|
|
|
class CLockedEnvironment : implements IEnvironment, public CInterface
|
|
class CLockedEnvironment : implements IEnvironment, public CInterface
|
|
@@ -420,6 +467,10 @@ public:
|
|
{ return c->getFileAccessUrl(); }
|
|
{ return c->getFileAccessUrl(); }
|
|
virtual IConstDaFileSrvInfo *getDaFileSrvGroupInfo(const char *name) const override
|
|
virtual IConstDaFileSrvInfo *getDaFileSrvGroupInfo(const char *name) const override
|
|
{ return c->getDaFileSrvGroupInfo(name); }
|
|
{ return c->getDaFileSrvGroupInfo(name); }
|
|
|
|
+ virtual IConstSparkThorInfo *getSparkThor(const char *name) const
|
|
|
|
+ { return c->getSparkThor(name); }
|
|
|
|
+ virtual IConstSparkThorInfoIterator *getSparkThorIterator() const
|
|
|
|
+ { return c->getSparkThorIterator(); }
|
|
};
|
|
};
|
|
|
|
|
|
void CLockedEnvironment::commit()
|
|
void CLockedEnvironment::commit()
|
|
@@ -969,6 +1020,11 @@ public:
|
|
str.set(ep.str());
|
|
str.set(ep.str());
|
|
return str;
|
|
return str;
|
|
}
|
|
}
|
|
|
|
+ virtual IStringVal & getDirectory(IStringVal & str) const
|
|
|
|
+ {
|
|
|
|
+ str.set(root->queryProp("@directory"));
|
|
|
|
+ return str;
|
|
|
|
+ }
|
|
|
|
|
|
virtual bool doGetRunInfo(IStringVal & progpath, IStringVal & workdir, const char *defprogname, bool useprog) const
|
|
virtual bool doGetRunInfo(IStringVal & progpath, IStringVal & workdir, const char *defprogname, bool useprog) const
|
|
{
|
|
{
|
|
@@ -1103,6 +1159,57 @@ private:
|
|
StringBuffer posixPath;
|
|
StringBuffer posixPath;
|
|
};
|
|
};
|
|
|
|
|
|
|
|
+class CConstSparkThorInfo : public CConstEnvBase, implements IConstSparkThorInfo
|
|
|
|
+{
|
|
|
|
+public:
|
|
|
|
+ IMPLEMENT_IINTERFACE;
|
|
|
|
+ IMPLEMENT_ICONSTENVBASE;
|
|
|
|
+ CConstSparkThorInfo(CLocalEnvironment *env, IPropertyTree *root) : CConstEnvBase(env, root) {}
|
|
|
|
+
|
|
|
|
+ virtual IStringVal &getBuild(IStringVal &str) const
|
|
|
|
+ {
|
|
|
|
+ str.set(root->queryProp("@build"));
|
|
|
|
+ return str;
|
|
|
|
+ }
|
|
|
|
+ virtual IStringVal &getThorClusterName(IStringVal &str) const
|
|
|
|
+ {
|
|
|
|
+ str.set(root->queryProp("@ThorClusterName"));
|
|
|
|
+ return str;
|
|
|
|
+ }
|
|
|
|
+ virtual unsigned getSparkExecutorCores() const
|
|
|
|
+ {
|
|
|
|
+ return root->getPropInt("@SPARK_EXECUTOR_CORES", 0);
|
|
|
|
+ }
|
|
|
|
+ virtual unsigned long getSparkExecutorMemory() const
|
|
|
|
+ {
|
|
|
|
+ return readSizeSetting(root->queryProp("@SPARK_EXECUTOR_MEMORY"), 0);
|
|
|
|
+ }
|
|
|
|
+ virtual unsigned getSparkMasterPort() const
|
|
|
|
+ {
|
|
|
|
+ return root->getPropInt("@SPARK_MASTER_PORT", 0);
|
|
|
|
+ }
|
|
|
|
+ virtual unsigned getSparkMasterWebUIPort() const
|
|
|
|
+ {
|
|
|
|
+ return root->getPropInt("@SPARK_MASTER_WEBUI_PORT", 0);
|
|
|
|
+ }
|
|
|
|
+ virtual unsigned getSparkWorkerCores() const
|
|
|
|
+ {
|
|
|
|
+ return root->getPropInt("@SPARK_WORKER_CORES", 0);
|
|
|
|
+ }
|
|
|
|
+ virtual unsigned long getSparkWorkerMemory() const
|
|
|
|
+ {
|
|
|
|
+ return readSizeSetting(root->queryProp("@SPARK_WORKER_MEMORY"), 0);
|
|
|
|
+ }
|
|
|
|
+ virtual unsigned getSparkWorkerPort() const
|
|
|
|
+ {
|
|
|
|
+ return root->getPropInt("@SPARK_WORKER_PORT", 0);
|
|
|
|
+ }
|
|
|
|
+ virtual IConstInstanceInfoIterator *getInstanceIterator() const
|
|
|
|
+ {
|
|
|
|
+ return new CConstInstanceInfoIterator(env, root->getElements("Instance"));
|
|
|
|
+ }
|
|
|
|
+};
|
|
|
|
+
|
|
#if 0
|
|
#if 0
|
|
//==========================================================================================
|
|
//==========================================================================================
|
|
|
|
|
|
@@ -1210,8 +1317,10 @@ void CLocalEnvironment::init()
|
|
{
|
|
{
|
|
machineCacheBuilt = false;
|
|
machineCacheBuilt = false;
|
|
dropZoneCacheBuilt = false;
|
|
dropZoneCacheBuilt = false;
|
|
|
|
+ sparkThorCacheBuilt = false;
|
|
numOfMachines = 0;
|
|
numOfMachines = 0;
|
|
numOfDropZones = 0;
|
|
numOfDropZones = 0;
|
|
|
|
+ numOfSparkThors = 0;
|
|
isDropZoneRestrictionLoaded = false;
|
|
isDropZoneRestrictionLoaded = false;
|
|
clusterKeyNameCache = false;
|
|
clusterKeyNameCache = false;
|
|
::getFileAccessUrl(fileAccessUrl);
|
|
::getFileAccessUrl(fileAccessUrl);
|
|
@@ -1845,6 +1954,64 @@ IConstDaFileSrvInfo *CLocalEnvironment::getDaFileSrvGroupInfo(const char *name)
|
|
return (IConstDaFileSrvInfo *) cached;
|
|
return (IConstDaFileSrvInfo *) cached;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+IConstSparkThorInfo *CLocalEnvironment::getSparkThor(const char *name) const
|
|
|
|
+{
|
|
|
|
+ if (isEmptyString(name))
|
|
|
|
+ return nullptr;
|
|
|
|
+ buildSparkThorCache();
|
|
|
|
+ VStringBuffer xpath("Software/SparkThor[@name=\"%s\"]", name);
|
|
|
|
+ synchronized procedure(safeCache);
|
|
|
|
+ return (CConstSparkThorInfo *) getCache(xpath);
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+IConstSparkThorInfo *CLocalEnvironment::getSparkThorByIndex(unsigned index) const
|
|
|
|
+{
|
|
|
|
+ if (index == 0)
|
|
|
|
+ return nullptr;
|
|
|
|
+
|
|
|
|
+ buildSparkThorCache();
|
|
|
|
+ if (index > numOfSparkThors)
|
|
|
|
+ return nullptr;
|
|
|
|
+
|
|
|
|
+ StringBuffer xpath("Software/SparkThor[@id=\"");
|
|
|
|
+ xpath.append(SPARKTHOR_SUFFIX).append(index).append("\"]");
|
|
|
|
+ synchronized procedure(safeCache);
|
|
|
|
+ return (CConstSparkThorInfo *) getCache(xpath);
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+IConstSparkThorInfoIterator *CLocalEnvironment::getSparkThorIterator() const
|
|
|
|
+{
|
|
|
|
+ return new CConstSparkThorInfoIterator();
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+void CLocalEnvironment::buildSparkThorCache() const
|
|
|
|
+{
|
|
|
|
+ synchronized procedure(safeCache);
|
|
|
|
+ if (sparkThorCacheBuilt)
|
|
|
|
+ return;
|
|
|
|
+
|
|
|
|
+ Owned<IPropertyTreeIterator> it = p->getElements("Software/SparkThorProcess");
|
|
|
|
+ ForEach(*it)
|
|
|
|
+ {
|
|
|
|
+ const char *name = it->query().queryProp("@name");
|
|
|
|
+ if (!isEmptyString(name))
|
|
|
|
+ {
|
|
|
|
+ StringBuffer x("Software/SparkThor[@name=\"");
|
|
|
|
+ x.append(name).append("\"]");
|
|
|
|
+ Owned<IConstEnvBase> cached = new CConstSparkThorInfo((CLocalEnvironment *) this, &it->query());
|
|
|
|
+ cache.setValue(x, cached);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ numOfSparkThors++;
|
|
|
|
+ StringBuffer x("Software/SparkThor[@id=\"");
|
|
|
|
+ x.append(SPARKTHOR_SUFFIX).append(numOfSparkThors).append("\"]");
|
|
|
|
+ Owned<IConstEnvBase> cached = new CConstSparkThorInfo((CLocalEnvironment *) this, &it->query());
|
|
|
|
+ cache.setValue(x, cached);
|
|
|
|
+ }
|
|
|
|
+ sparkThorCacheBuilt = true;
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+
|
|
//==========================================================================================
|
|
//==========================================================================================
|
|
// Iterators implementation
|
|
// Iterators implementation
|
|
|
|
|
|
@@ -2012,6 +2179,98 @@ unsigned CConstDropZoneInfoIterator::count() const
|
|
return maxIndex;
|
|
return maxIndex;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+//--------------------------------------------------
|
|
|
|
+
|
|
|
|
+CConstSparkThorInfoIterator::CConstSparkThorInfoIterator()
|
|
|
|
+{
|
|
|
|
+ Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
|
|
|
|
+ constEnv.setown((CLocalEnvironment *)factory->openEnvironment());
|
|
|
|
+ maxIndex = constEnv->getNumberOfSparkThors();
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+bool CConstSparkThorInfoIterator::first()
|
|
|
|
+{
|
|
|
|
+ index = 1;
|
|
|
|
+ curr.setown(constEnv->getSparkThorByIndex(index));
|
|
|
|
+ return curr != nullptr;
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+bool CConstSparkThorInfoIterator::next()
|
|
|
|
+{
|
|
|
|
+ if (index < maxIndex)
|
|
|
|
+ {
|
|
|
|
+ index++;
|
|
|
|
+ curr.setown(constEnv->getSparkThorByIndex(index));
|
|
|
|
+ }
|
|
|
|
+ else
|
|
|
|
+ curr.clear();
|
|
|
|
+
|
|
|
|
+ return curr != nullptr;
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+bool CConstSparkThorInfoIterator::isValid()
|
|
|
|
+{
|
|
|
|
+ return curr != nullptr;
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+IConstSparkThorInfo &CConstSparkThorInfoIterator::query()
|
|
|
|
+{
|
|
|
|
+ return *curr;
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+unsigned CConstSparkThorInfoIterator::count() const
|
|
|
|
+{
|
|
|
|
+ return maxIndex;
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+//--------------------------------------------------
|
|
|
|
+
|
|
|
|
+CConstInstanceInfoIterator::CConstInstanceInfoIterator(const CLocalEnvironment *env, IPropertyTreeIterator *itr)
|
|
|
|
+ : constEnv(env)
|
|
|
|
+{
|
|
|
|
+ instanceItr.setown(itr);
|
|
|
|
+ maxIndex = 0;
|
|
|
|
+ ForEach(*instanceItr)
|
|
|
|
+ maxIndex++;
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+bool CConstInstanceInfoIterator::first()
|
|
|
|
+{
|
|
|
|
+ index = 1;
|
|
|
|
+ instanceItr->first();
|
|
|
|
+ curr.setown(new CConstInstanceInfo(constEnv, &instanceItr->query()));
|
|
|
|
+ return curr != nullptr;
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+bool CConstInstanceInfoIterator::next()
|
|
|
|
+{
|
|
|
|
+ if (index < maxIndex)
|
|
|
|
+ {
|
|
|
|
+ index++;
|
|
|
|
+ instanceItr->next();
|
|
|
|
+ curr.setown(new CConstInstanceInfo(constEnv, &instanceItr->query()));
|
|
|
|
+ }
|
|
|
|
+ else
|
|
|
|
+ curr.clear();
|
|
|
|
+
|
|
|
|
+ return curr != nullptr;
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+bool CConstInstanceInfoIterator::isValid()
|
|
|
|
+{
|
|
|
|
+ return curr != nullptr;
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+IConstInstanceInfo &CConstInstanceInfoIterator::query()
|
|
|
|
+{
|
|
|
|
+ return *curr;
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+unsigned CConstInstanceInfoIterator::count() const
|
|
|
|
+{
|
|
|
|
+ return maxIndex;
|
|
|
|
+}
|
|
|
|
+
|
|
//==========================================================================================
|
|
//==========================================================================================
|
|
|
|
|
|
|
|
|
|
@@ -2056,3 +2315,50 @@ extern ENVIRONMENT_API void closeEnvironment()
|
|
EXCLOG(e);
|
|
EXCLOG(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+extern ENVIRONMENT_API unsigned long readSizeSetting(const char * sizeStr, const unsigned long defaultSize)
|
|
|
|
+{
|
|
|
|
+ StringBuffer buf = sizeStr;
|
|
|
|
+ buf.trim();
|
|
|
|
+
|
|
|
|
+ if (buf.isEmpty())
|
|
|
|
+ return defaultSize;
|
|
|
|
+
|
|
|
|
+ const char* ptrStart = buf;
|
|
|
|
+ const char* ptrAfterDigit = ptrStart;
|
|
|
|
+ while (*ptrAfterDigit && isdigit(*ptrAfterDigit))
|
|
|
|
+ ptrAfterDigit++;
|
|
|
|
+
|
|
|
|
+ if (!*ptrAfterDigit)
|
|
|
|
+ return atol(buf);
|
|
|
|
+
|
|
|
|
+ const char* ptr = ptrAfterDigit;
|
|
|
|
+ while (*ptr && (ptr[0] == ' '))
|
|
|
|
+ ptr++;
|
|
|
|
+
|
|
|
|
+ char c = ptr[0];
|
|
|
|
+ buf.setLength(ptrAfterDigit - ptrStart);
|
|
|
|
+ unsigned long size = atol(buf);
|
|
|
|
+ switch (c)
|
|
|
|
+ {
|
|
|
|
+ case 'k':
|
|
|
|
+ case 'K':
|
|
|
|
+ size *= 1000;
|
|
|
|
+ break;
|
|
|
|
+ case 'm':
|
|
|
|
+ case 'M':
|
|
|
|
+ size *= 1000000;
|
|
|
|
+ break;
|
|
|
|
+ case 'g':
|
|
|
|
+ case 'G':
|
|
|
|
+ size *= 1000000000;
|
|
|
|
+ break;
|
|
|
|
+ case 't':
|
|
|
|
+ case 'T':
|
|
|
|
+ size *= 1000000000000;
|
|
|
|
+ break;
|
|
|
|
+ default:
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+ return size;
|
|
|
|
+}
|