|
@@ -33,7 +33,9 @@
|
|
|
#include "jptree.hpp"
|
|
|
#include "jregexp.hpp"
|
|
|
#include "dadfs.hpp"
|
|
|
+#include "dasds.hpp"
|
|
|
|
|
|
+#include "wuerror.hpp"
|
|
|
#include "workunit.hpp"
|
|
|
#include "workunit.ipp"
|
|
|
|
|
@@ -52,6 +54,7 @@ namespace cassandraembed {
|
|
|
#define CASS_SEARCH_PREFIX_SIZE 2
|
|
|
#define NUM_PARTITIONS 2
|
|
|
|
|
|
+
|
|
|
static const CassValue *getSingleResult(const CassResult *result)
|
|
|
{
|
|
|
const CassRow *row = cass_result_first_row(result);
|
|
@@ -908,8 +911,8 @@ static const CassandraXmlMapping workunitsMappings [] =
|
|
|
{"action", "text", "Action", stringColumnMapper},
|
|
|
{"protected", "boolean", "@protected", boolColumnMapper},
|
|
|
{"scheduled", "text", "@timeScheduled", stringColumnMapper}, // Should store as a date?
|
|
|
- {"totalThorTime", "text", "@totalThorTime", stringColumnMapper}, // We store in the wu ptree as a collatable string. Need to force to one partition too
|
|
|
- {"appvalues", "map<text, text>", "@Application@", subTreeMapColumnMapper}, // MORE - change to a custom map to make searchable
|
|
|
+ {"totalThorTime", "text", "@totalThorTime", stringColumnMapper}, // We store in the wu ptree as a collatable string (with leading spaces to force to one partition)
|
|
|
+ {"appvalues", "map<text, text>", "@Application@", subTreeMapColumnMapper},
|
|
|
|
|
|
{"debug", "map<text, text>", "Debug", simpleMapColumnMapper},
|
|
|
{"attributes", "map<text, text>", "@wuid@clusterName@jobName@priorityClass@protected@scope@submitID@state@timeScheduled@totalThorTime@", attributeMapColumnMapper}, // name is the suppression list, note trailing @
|
|
@@ -1134,7 +1137,7 @@ static const CassandraXmlMapping wuFilesReadMappings [] =
|
|
|
{"wuid", "text", NULL, rootNameColumnMapper},
|
|
|
{"name", "text", "@name", stringColumnMapper},
|
|
|
{"cluster", "text", "@cluster", stringColumnMapper},
|
|
|
- {"useCount", "int", "@useCount", intColumnMapper}, // MORE - could think about using a counter column, but would mess up the commit paradigm
|
|
|
+ {"useCount", "int", "@useCount", intColumnMapper}, // NOTE - could think about using a counter column, but would mess up the commit paradigm
|
|
|
{"subfiles", "list<text>", NULL, subfileListColumnMapper},
|
|
|
{ NULL, "wuFilesRead", "((partition, wuid), name)", stringColumnMapper}
|
|
|
};
|
|
@@ -1157,6 +1160,7 @@ interface ICassandraSession : public IInterface // MORE - rename!
|
|
|
|
|
|
virtual const CassResult *fetchDataForWuid(const CassandraXmlMapping *mappings, const char *wuid, bool includeWuid) const = 0;
|
|
|
virtual void deleteChildByWuid(const CassandraXmlMapping *mappings, const char *wuid, CassBatch *batch) const = 0;
|
|
|
+ virtual IPTree *cassandraToWorkunitXML(const char *wuid) const = 0;
|
|
|
};
|
|
|
|
|
|
void getBoundFieldNames(const CassandraXmlMapping *mappings, StringBuffer &names, StringBuffer &bindings, IPTree *inXML, const char *userVal, StringBuffer &tableName)
|
|
@@ -2126,21 +2130,29 @@ private:
|
|
|
bool descending;
|
|
|
};
|
|
|
|
|
|
-class CCassandraWorkUnit : public CLocalWorkUnit
|
|
|
+static void lockWuid(Owned<IRemoteConnection> &connection, const char *wuid)
|
|
|
+{
|
|
|
+ VStringBuffer wuRoot("/WorkUnitLocks/%s", wuid);
|
|
|
+ if (connection)
|
|
|
+ connection->changeMode(RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT); // Would it ever be anything else?
|
|
|
+ else
|
|
|
+ connection.setown(querySDS().connect(wuRoot.str(), myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT));
|
|
|
+ if (!connection)
|
|
|
+ throw MakeStringException(WUERR_LockFailed, "Failed to get connection for xpath %s", wuRoot.str());
|
|
|
+}
|
|
|
+
|
|
|
+class CCassandraWorkUnit : public CPersistedWorkUnit
|
|
|
{
|
|
|
- // IMPLEMENT_IINTERFACE;
|
|
|
public:
|
|
|
IMPLEMENT_IINTERFACE;
|
|
|
- CCassandraWorkUnit(ICassandraSession *_sessionCache, IPTree *wuXML, ISecManager *secmgr, ISecUser *secuser, bool write)
|
|
|
- : sessionCache(_sessionCache), CLocalWorkUnit(secmgr, secuser)
|
|
|
+ CCassandraWorkUnit(ICassandraSession *_sessionCache, IPTree *wuXML, ISecManager *secmgr, ISecUser *secuser, IRemoteConnection *_daliLock)
|
|
|
+ : sessionCache(_sessionCache), CPersistedWorkUnit(secmgr, secuser), daliLock(_daliLock)
|
|
|
{
|
|
|
- CLocalWorkUnit::loadPTree(wuXML);
|
|
|
+ CPersistedWorkUnit::loadPTree(wuXML);
|
|
|
allDirty = false; // Debatable... depends where the XML came from! If we read it from Cassandra. it's not. Otherwise, it is...
|
|
|
memset(childLoaded, 0, sizeof(childLoaded));
|
|
|
- abortDirty = true;
|
|
|
- abortState = false;
|
|
|
- if (write)
|
|
|
- _lockRemote();
|
|
|
+ if (daliLock)
|
|
|
+ createBatch();
|
|
|
}
|
|
|
~CCassandraWorkUnit()
|
|
|
{
|
|
@@ -2148,15 +2160,17 @@ public:
|
|
|
|
|
|
virtual void forceReload()
|
|
|
{
|
|
|
- printStackReport();
|
|
|
- UNIMPLEMENTED;
|
|
|
+ synchronized sync(locked); // protect locked workunits (uncommited writes) from reload
|
|
|
+ loadPTree(sessionCache->cassandraToWorkunitXML(queryWuid()));
|
|
|
+ memset(childLoaded, 0, sizeof(childLoaded));
|
|
|
+ allDirty = false;
|
|
|
abortDirty = true;
|
|
|
}
|
|
|
|
|
|
virtual void cleanupAndDelete(bool deldll, bool deleteOwned, const StringArray *deleteExclusions)
|
|
|
{
|
|
|
const char *wuid = queryWuid();
|
|
|
- CLocalWorkUnit::cleanupAndDelete(deldll, deleteOwned, deleteExclusions);
|
|
|
+ CPersistedWorkUnit::cleanupAndDelete(deldll, deleteOwned, deleteExclusions);
|
|
|
if (!batch)
|
|
|
batch.setown(new CassandraBatch(cass_batch_new(CASS_BATCH_TYPE_UNLOGGED)));
|
|
|
deleteChildren(wuid);
|
|
@@ -2172,7 +2186,7 @@ public:
|
|
|
|
|
|
virtual void commit()
|
|
|
{
|
|
|
- CLocalWorkUnit::commit();
|
|
|
+ CPersistedWorkUnit::commit();
|
|
|
if (sessionCache->queryTraceLevel() >= 8)
|
|
|
{
|
|
|
StringBuffer s; toXML(p, s); DBGLOG("CCassandraWorkUnit::commit\n%s", s.str());
|
|
@@ -2255,82 +2269,66 @@ public:
|
|
|
virtual void setUser(const char *user)
|
|
|
{
|
|
|
if (trackSecondaryChange(user, "@submitID"))
|
|
|
- CLocalWorkUnit::setUser(user);
|
|
|
+ CPersistedWorkUnit::setUser(user);
|
|
|
}
|
|
|
virtual void setClusterName(const char *cluster)
|
|
|
{
|
|
|
if (trackSecondaryChange(cluster, "@clusterName"))
|
|
|
- CLocalWorkUnit::setClusterName(cluster);
|
|
|
+ CPersistedWorkUnit::setClusterName(cluster);
|
|
|
}
|
|
|
virtual void setJobName(const char *jobname)
|
|
|
{
|
|
|
if (trackSecondaryChange(jobname, "@jobName"))
|
|
|
- CLocalWorkUnit::setJobName(jobname);
|
|
|
+ CPersistedWorkUnit::setJobName(jobname);
|
|
|
}
|
|
|
virtual void setState(WUState state)
|
|
|
{
|
|
|
if (trackSecondaryChange(getWorkunitStateStr(state), "@state"))
|
|
|
- CLocalWorkUnit::setState(state);
|
|
|
+ CPersistedWorkUnit::setState(state);
|
|
|
}
|
|
|
virtual void setApplicationValue(const char *app, const char *propname, const char *value, bool overwrite)
|
|
|
{
|
|
|
VStringBuffer xpath("Application/%s/%s", app, propname);
|
|
|
if (trackSecondaryChange(value, xpath))
|
|
|
- CLocalWorkUnit::setApplicationValue(app, propname, value, overwrite);
|
|
|
+ CPersistedWorkUnit::setApplicationValue(app, propname, value, overwrite);
|
|
|
}
|
|
|
|
|
|
virtual void _lockRemote()
|
|
|
{
|
|
|
- batch.setown(new CassandraBatch(cass_batch_new(CASS_BATCH_TYPE_UNLOGGED)));
|
|
|
+ lockWuid(daliLock, queryWuid());
|
|
|
+ createBatch();
|
|
|
}
|
|
|
|
|
|
virtual void _unlockRemote()
|
|
|
{
|
|
|
-// printStackReport();
|
|
|
-// UNIMPLEMENTED;
|
|
|
commit();
|
|
|
batch.clear();
|
|
|
- }
|
|
|
-
|
|
|
- virtual void subscribe(WUSubscribeOptions options)
|
|
|
- {
|
|
|
-// printStackReport();
|
|
|
-// UNIMPLEMENTED;
|
|
|
- }
|
|
|
-
|
|
|
- virtual void unsubscribe()
|
|
|
- {
|
|
|
-// printStackReport();
|
|
|
-// UNIMPLEMENTED;
|
|
|
- }
|
|
|
-
|
|
|
- virtual bool aborting() const
|
|
|
- {
|
|
|
- return false;
|
|
|
- // MORE - work out what to do about aborts in Cassandra
|
|
|
-// printStackReport();
|
|
|
-// UNIMPLEMENTED;
|
|
|
+ if (daliLock)
|
|
|
+ {
|
|
|
+ daliLock->close(true);
|
|
|
+ daliLock.clear();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
virtual IWUResult * updateResultByName(const char * name)
|
|
|
{
|
|
|
- return noteDirty(CLocalWorkUnit::updateResultByName(name));
|
|
|
+ return noteDirty(CPersistedWorkUnit::updateResultByName(name));
|
|
|
}
|
|
|
virtual IWUResult * updateResultBySequence(unsigned seq)
|
|
|
{
|
|
|
- return noteDirty(CLocalWorkUnit::updateResultBySequence(seq));
|
|
|
+ return noteDirty(CPersistedWorkUnit::updateResultBySequence(seq));
|
|
|
}
|
|
|
virtual IWUResult * updateTemporaryByName(const char * name)
|
|
|
{
|
|
|
- return noteDirty(CLocalWorkUnit::updateTemporaryByName(name));
|
|
|
+ return noteDirty(CPersistedWorkUnit::updateTemporaryByName(name));
|
|
|
}
|
|
|
virtual IWUResult * updateVariableByName(const char * name)
|
|
|
{
|
|
|
- return noteDirty(CLocalWorkUnit::updateVariableByName(name));
|
|
|
+ return noteDirty(CPersistedWorkUnit::updateVariableByName(name));
|
|
|
}
|
|
|
virtual IWUException *createException()
|
|
|
{
|
|
|
- IWUException *result = CLocalWorkUnit::createException();
|
|
|
+ IWUException *result = CPersistedWorkUnit::createException();
|
|
|
VStringBuffer xpath("Exceptions/Exception[@sequence='%d']", result->getSequence());
|
|
|
noteDirty(xpath, wuExceptionsMappings);
|
|
|
return result;
|
|
@@ -2343,7 +2341,7 @@ public:
|
|
|
trackSecondaryChange(fromP->queryProp(*search), *search);
|
|
|
for (const ChildTableInfo * const * table = childTables; *table != NULL; table++)
|
|
|
checkChildLoaded(**table);
|
|
|
- CLocalWorkUnit::copyWorkUnit(cached, all);
|
|
|
+ CPersistedWorkUnit::copyWorkUnit(cached, all);
|
|
|
memset(childLoaded, 1, sizeof(childLoaded));
|
|
|
allDirty = true;
|
|
|
}
|
|
@@ -2351,7 +2349,7 @@ public:
|
|
|
{
|
|
|
if (file)
|
|
|
{
|
|
|
- CLocalWorkUnit::noteFileRead(file);
|
|
|
+ CPersistedWorkUnit::noteFileRead(file);
|
|
|
VStringBuffer xpath("FilesRead/File[@name='%s']", file->queryLogicalName());
|
|
|
noteDirty(xpath, wuFilesReadMappings);
|
|
|
}
|
|
@@ -2370,44 +2368,44 @@ public:
|
|
|
virtual void _loadFilesRead() const
|
|
|
{
|
|
|
checkChildLoaded(wuFilesReadTable); // Lazy populate the FilesRead branch of p from Cassandra
|
|
|
- CLocalWorkUnit::_loadFilesRead();
|
|
|
+ CPersistedWorkUnit::_loadFilesRead();
|
|
|
}
|
|
|
|
|
|
virtual void _loadResults() const
|
|
|
{
|
|
|
checkChildLoaded(wuResultsTable); // Lazy populate the Results branch of p from Cassandra
|
|
|
- CLocalWorkUnit::_loadResults();
|
|
|
+ CPersistedWorkUnit::_loadResults();
|
|
|
}
|
|
|
|
|
|
virtual void _loadVariables() const
|
|
|
{
|
|
|
checkChildLoaded(wuVariablesTable); // Lazy populate the Variables branch of p from Cassandra
|
|
|
- CLocalWorkUnit::_loadVariables();
|
|
|
+ CPersistedWorkUnit::_loadVariables();
|
|
|
}
|
|
|
|
|
|
virtual void _loadTemporaries() const
|
|
|
{
|
|
|
checkChildLoaded(wuTemporariesTable); // Lazy populate the Temporaries branch of p from Cassandra
|
|
|
- CLocalWorkUnit::_loadTemporaries();
|
|
|
+ CPersistedWorkUnit::_loadTemporaries();
|
|
|
}
|
|
|
|
|
|
virtual void _loadStatistics() const
|
|
|
{
|
|
|
checkChildLoaded(wuStatisticsTable); // Lazy populate the Statistics branch of p from Cassandra
|
|
|
- CLocalWorkUnit::_loadStatistics();
|
|
|
+ CPersistedWorkUnit::_loadStatistics();
|
|
|
}
|
|
|
|
|
|
virtual void _loadExceptions() const
|
|
|
{
|
|
|
checkChildLoaded(wuExceptionsTable); // Lazy populate the Exceptions branch of p from Cassandra
|
|
|
- CLocalWorkUnit::_loadExceptions();
|
|
|
+ CPersistedWorkUnit::_loadExceptions();
|
|
|
}
|
|
|
|
|
|
virtual void clearExceptions()
|
|
|
{
|
|
|
CriticalBlock b(crit);
|
|
|
noteDirty("*Exceptions/Exception", wuExceptionsMappings);
|
|
|
- CLocalWorkUnit::clearExceptions();
|
|
|
+ CPersistedWorkUnit::clearExceptions();
|
|
|
}
|
|
|
|
|
|
virtual IPropertyTree *queryPTree() const
|
|
@@ -2419,6 +2417,10 @@ public:
|
|
|
return p;
|
|
|
}
|
|
|
protected:
|
|
|
+ void createBatch()
|
|
|
+ {
|
|
|
+ batch.setown(new CassandraBatch(cass_batch_new(CASS_BATCH_TYPE_UNLOGGED)));
|
|
|
+ }
|
|
|
// Delete child table rows
|
|
|
void deleteChildren(const char *wuid)
|
|
|
{
|
|
@@ -2584,8 +2586,6 @@ protected:
|
|
|
dirtyPaths.setValue(xpath, table);
|
|
|
}
|
|
|
Linked<const ICassandraSession> sessionCache;
|
|
|
- mutable bool abortDirty;
|
|
|
- mutable bool abortState;
|
|
|
mutable bool childLoaded[ChildTablesSize];
|
|
|
bool allDirty;
|
|
|
Owned<IPTree> prev;
|
|
@@ -2593,6 +2593,7 @@ protected:
|
|
|
Owned<CassandraBatch> batch;
|
|
|
MapStringTo<const CassandraXmlMapping *> dirtyPaths;
|
|
|
IArrayOf<IWUResult> dirtyResults;
|
|
|
+ Owned<IRemoteConnection> daliLock; // We still use dali for locking
|
|
|
};
|
|
|
|
|
|
class CCasssandraWorkUnitFactory : public CWorkUnitFactory, implements ICassandraSession
|
|
@@ -2678,7 +2679,9 @@ public:
|
|
|
// If there are multiple columns it will be '[applied]' (value false) and the fields of the existing row
|
|
|
Owned<IPTree> wuXML = createPTree(useWuid);
|
|
|
wuXML->setProp("@xmlns:xsi", "http://www.w3.org/1999/XMLSchema-instance");
|
|
|
- Owned<CLocalWorkUnit> wu = new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser, true);
|
|
|
+ Owned<IRemoteConnection> daliLock;
|
|
|
+ lockWuid(daliLock, useWuid);
|
|
|
+ Owned<CLocalWorkUnit> wu = new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser, daliLock.getClear());
|
|
|
return wu.getClear();
|
|
|
}
|
|
|
suffix = rand_r(&randState);
|
|
@@ -2686,18 +2689,21 @@ public:
|
|
|
suffixLength++;
|
|
|
}
|
|
|
}
|
|
|
- virtual CLocalWorkUnit* _openWorkUnit(const char *wuid, bool lock, ISecManager *secmgr, ISecUser *secuser)
|
|
|
+ virtual CLocalWorkUnit* _openWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser)
|
|
|
{
|
|
|
Owned<IPTree> wuXML = cassandraToWorkunitXML(wuid);
|
|
|
if (wuXML)
|
|
|
- return new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser, lock);
|
|
|
+ return new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser, NULL);
|
|
|
else
|
|
|
return NULL;
|
|
|
}
|
|
|
virtual CLocalWorkUnit* _updateWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser)
|
|
|
{
|
|
|
+ // We still use dali for the locks
|
|
|
+ Owned<IRemoteConnection> daliLock;
|
|
|
+ lockWuid(daliLock, wuid);
|
|
|
Owned<IPTree> wuXML = cassandraToWorkunitXML(wuid);
|
|
|
- Owned<CLocalWorkUnit> wu = new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser, true);
|
|
|
+ Owned<CLocalWorkUnit> wu = new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser, daliLock.getClear());
|
|
|
return wu.getClear();
|
|
|
}
|
|
|
|
|
@@ -2772,7 +2778,7 @@ public:
|
|
|
bool sortDescending = (sortorder & WUSFreverse) || needsPostSort;
|
|
|
if (!result)
|
|
|
{
|
|
|
- Owned<CassMultiIterator> merger = new CassMultiIterator(needsPostSort ? NULL : cached, 0, 0, sortDescending); // We always merge by wuid // MORE - except when we merge by thor time....
|
|
|
+ Owned<CassMultiIterator> merger = new CassMultiIterator(needsPostSort ? NULL : cached, 0, 0, sortDescending); // We always merge by wuid (except when we merge by thor time... we turn the compare off then to make it an appender)
|
|
|
if (startOffset)
|
|
|
{
|
|
|
StringBuffer startWuid;
|
|
@@ -2934,7 +2940,7 @@ public:
|
|
|
{
|
|
|
merger->addPostFilters(remoteWildFilters, 1); // Any other filters have to be done locally
|
|
|
// Convert into a value IN [] which we do via a merge
|
|
|
- // MORE - If we want sorted by filter (or don't care about sort order), we could do directly as a range - but the wuid range filters then don't work, and the merger would be invalid
|
|
|
+ // NOTE - If we want sorted by filter (or don't care about sort order), we could do directly as a range - but the wuid range filters then don't work, and the merger would be invalid
|
|
|
StringArray fieldValues;
|
|
|
const IPostFilter &best= remoteWildFilters.item(0);
|
|
|
_getUniqueValues(best.queryXPath(), best.queryValue(), fieldValues);
|
|
@@ -2988,10 +2994,8 @@ public:
|
|
|
return getUnsignedResult(NULL, getSingleResult(result));
|
|
|
}
|
|
|
/*
|
|
|
- virtual void descheduleAllWorkUnits(ISecManager *secmgr, ISecUser *secuser) { UNIMPLEMENTED; }
|
|
|
- virtual IConstQuerySetQueryIterator * getQuerySetQueriesSorted(WUQuerySortField *sortorder, WUQuerySortField *filters, const void *filterbuf, unsigned startoffset, unsigned maxnum, __int64 *cachehint, unsigned *total, const MapStringTo<bool> *subset) { UNIMPLEMENTED; }
|
|
|
- virtual bool isAborting(const char *wuid) const { UNIMPLEMENTED; }
|
|
|
- virtual void clearAborting(const char *wuid) { UNIMPLEMENTED; }
|
|
|
+ virtual bool isAborting(const char *wuid) const - done in the base class using dali
|
|
|
+ virtual void clearAborting(const char *wuid) - done in the base class using dali
|
|
|
*/
|
|
|
virtual WUState waitForWorkUnit(const char * wuid, unsigned timeout, bool compiled, bool returnOnWaitState)
|
|
|
{
|