|
@@ -2162,6 +2162,7 @@ interface ICassandraSession
|
|
|
{
|
|
|
virtual CassSession *querySession() const = 0;
|
|
|
virtual CassandraPrepared *prepareStatement(const char *query) const = 0;
|
|
|
+ virtual unsigned queryTraceLevel() const = 0;
|
|
|
};
|
|
|
|
|
|
|
|
@@ -2716,8 +2717,8 @@ const CassandraXmlMapping workunitsMappings [] =
|
|
|
|
|
|
const CassandraXmlMapping ownerMappings [] =
|
|
|
{
|
|
|
- {"submitID", "text", "@submitID", stringColumnMapper},
|
|
|
{"wuid", "text", NULL, rootNameColumnMapper},
|
|
|
+ {"submitID", "text", "@submitID", stringColumnMapper},
|
|
|
{"clustername", "text", "@clusterName", stringColumnMapper},
|
|
|
{"jobname", "text", "@jobName", stringColumnMapper},
|
|
|
{"priorityclass", "int", "@priorityClass", intColumnMapper},
|
|
@@ -2830,7 +2831,8 @@ const CassResult *fetchDataForKey(const char *key, CassSession *session, const C
|
|
|
if (key)
|
|
|
selectQuery.appendf(" where %s='%s'", mappings->columnName, key); // MORE - should consider using prepared for this - is it faster?
|
|
|
selectQuery.append(';');
|
|
|
- DBGLOG("%s", selectQuery.str());
|
|
|
+ //if (traceLevel >= 2)
|
|
|
+ // DBGLOG("%s", selectQuery.str());
|
|
|
CassandraStatement statement(cass_statement_new(cass_string_init(selectQuery.str()), 0));
|
|
|
CassandraFuture future(cass_session_execute(session, statement));
|
|
|
future.wait("execute");
|
|
@@ -2883,24 +2885,33 @@ extern void simpleXMLtoCassandra(const ICassandraSession *session, CassBatch *ba
|
|
|
VStringBuffer insertQuery("INSERT into HPCC.%s (%s) values (%s);", tableName.str(), names.str()+1, bindings.str()+1);
|
|
|
Owned<CassandraPrepared> prepared = session->prepareStatement(insertQuery);
|
|
|
CassandraStatement update(cass_prepared_bind(*prepared));
|
|
|
- check(cass_statement_bind_string(update, 0, cass_string_init(wuid)));
|
|
|
+ check(cass_statement_bind_string(update, 0, cass_string_init(wuid))); // wuid is first column of all tables
|
|
|
bindFromXML(mappings, update, inXML, 0);
|
|
|
check(cass_batch_add_statement(batch, update));
|
|
|
}
|
|
|
|
|
|
-void updateSecondaryTable(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, const char *wuid, IPTree *inXML,const char *prevKey)
|
|
|
+void deleteSecondaryByKey(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, const char *wuid, const char *key)
|
|
|
{
|
|
|
- if (prevKey)
|
|
|
+ if (key && *key)
|
|
|
{
|
|
|
StringBuffer names;
|
|
|
StringBuffer tableName;
|
|
|
getFieldNames(mappings, names, tableName);
|
|
|
- VStringBuffer insertQuery("DELETE from HPCC.%s where %s=?;", tableName.str(), mappings[0].columnName);
|
|
|
+ VStringBuffer insertQuery("DELETE from HPCC.%s where wuid=? and %s=?;", tableName.str(), mappings[1].columnName);
|
|
|
Owned<CassandraPrepared> prepared = session->prepareStatement(insertQuery);
|
|
|
CassandraStatement update(cass_prepared_bind(*prepared));
|
|
|
- check(cass_statement_bind_string(update, 0, cass_string_init(prevKey)));
|
|
|
+ check(cass_statement_bind_string(update, 0, cass_string_init(wuid)));
|
|
|
+ check(cass_statement_bind_string(update, 1, cass_string_init(key)));
|
|
|
+ check(cass_batch_add_statement(batch, update));
|
|
|
}
|
|
|
- simpleXMLtoCassandra(session, batch, mappings, wuid, inXML);
|
|
|
+}
|
|
|
+
|
|
|
+void updateSecondaryTable(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, const char *wuid, IPTree *inXML,const char *prevKey)
|
|
|
+{
|
|
|
+ if (prevKey && *prevKey)
|
|
|
+ deleteSecondaryByKey(session, batch, mappings, wuid, prevKey);
|
|
|
+ if (inXML->hasProp(mappings[1].xpath))
|
|
|
+ simpleXMLtoCassandra(session, batch, mappings, wuid, inXML);
|
|
|
}
|
|
|
|
|
|
extern void childXMLtoCassandra(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, const char *wuid, IPTreeIterator *elements, int defaultValue)
|
|
@@ -3215,20 +3226,34 @@ public:
|
|
|
|
|
|
virtual void cleanupAndDelete(bool deldll, bool deleteOwned, const StringArray *deleteExclusions)
|
|
|
{
|
|
|
+ const char *wuid = queryWuid();
|
|
|
CLocalWorkUnit::cleanupAndDelete(deldll, deleteOwned, deleteExclusions);
|
|
|
- printStackReport();
|
|
|
- UNIMPLEMENTED;
|
|
|
+ if (!batch)
|
|
|
+ batch.setown(new CassandraBatch(cass_batch_new(CASS_BATCH_TYPE_UNLOGGED)));
|
|
|
+ Owned<CassandraPrepared> prepared = sessionCache->prepareStatement("DELETE from HPCC.workunits where wuid=?;");
|
|
|
+ CassandraStatement update(cass_prepared_bind(*prepared));
|
|
|
+ check(cass_statement_bind_string(update, 0, cass_string_init(wuid)));
|
|
|
+ check(cass_batch_add_statement(*batch, update));
|
|
|
+ deleteSecondaryByKey(sessionCache, *batch, ownerMappings, wuid, queryUser());
|
|
|
+ CassandraFuture futureBatch(cass_session_execute_batch(sessionCache->querySession(), *batch));
|
|
|
+ futureBatch.wait("execute");
|
|
|
+ batch.clear();
|
|
|
}
|
|
|
|
|
|
virtual void commit()
|
|
|
{
|
|
|
CLocalWorkUnit::commit();
|
|
|
- StringBuffer s; toXML(p, s); DBGLOG("%s", s.str());
|
|
|
+ if (sessionCache->queryTraceLevel() >= 8)
|
|
|
+ {
|
|
|
+ StringBuffer s; toXML(p, s); DBGLOG("CCassandraWorkUnit::commit\n%s", s.str());
|
|
|
+ }
|
|
|
if (batch)
|
|
|
{
|
|
|
const char *wuid = queryWuid();
|
|
|
if (basicDirty)
|
|
|
+ {
|
|
|
updateSecondaryTable(sessionCache, *batch, ownerMappings, wuid, p, prevOwner);
|
|
|
+ }
|
|
|
simpleXMLtoCassandra(sessionCache, *batch, workunitsMappings, wuid, p); // This just does the parent row
|
|
|
if (allDirty)
|
|
|
{
|
|
@@ -3278,6 +3303,7 @@ public:
|
|
|
{
|
|
|
// printStackReport();
|
|
|
// UNIMPLEMENTED;
|
|
|
+ commit();
|
|
|
batch.clear();
|
|
|
}
|
|
|
|
|
@@ -3379,7 +3405,7 @@ protected:
|
|
|
class CCasssandraWorkUnitFactory : public CWorkUnitFactory, implements ICassandraSession
|
|
|
{
|
|
|
public:
|
|
|
- CCasssandraWorkUnitFactory(const IPropertyTree *props) : cluster(cass_cluster_new())
|
|
|
+ CCasssandraWorkUnitFactory(const IPropertyTree *props) : cluster(cass_cluster_new()), randomizeSuffix(0)
|
|
|
{
|
|
|
StringArray options;
|
|
|
Owned<IPTreeIterator> it = props->getElements("Option");
|
|
@@ -3390,8 +3416,15 @@ public:
|
|
|
const char *val = item.queryProp("@value");
|
|
|
if (opt && val)
|
|
|
{
|
|
|
- VStringBuffer optstr("%s=%s", opt, val);
|
|
|
- options.append(optstr);
|
|
|
+ if (strieq(opt, "randomWuidSuffix"))
|
|
|
+ randomizeSuffix = atoi(val);
|
|
|
+ else if (strieq(opt, "traceLevel"))
|
|
|
+ traceLevel = atoi(val);
|
|
|
+ else
|
|
|
+ {
|
|
|
+ VStringBuffer optstr("%s=%s", opt, val);
|
|
|
+ options.append(optstr);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
cluster.setOptions(options);
|
|
@@ -3429,16 +3462,36 @@ public:
|
|
|
*/
|
|
|
virtual CLocalWorkUnit* _createWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser)
|
|
|
{
|
|
|
- unsigned suffix = 0;
|
|
|
+ unsigned suffix;
|
|
|
+ unsigned suffixLength;
|
|
|
+ if (randomizeSuffix) // May need to enable this option if you are expecting to create hundreds of workunits / second
|
|
|
+ {
|
|
|
+ suffix = rand();
|
|
|
+ suffixLength = randomizeSuffix;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ suffix = 0;
|
|
|
+ suffixLength = 0;
|
|
|
+ }
|
|
|
+ Owned<CassandraPrepared> prepared = prepareStatement("INSERT INTO HPCC.workunits (wuid) VALUES (?) IF NOT EXISTS;");
|
|
|
loop
|
|
|
{
|
|
|
// Create a unique WUID by adding suffixes until we managed to add a new value
|
|
|
StringBuffer useWuid(wuid);
|
|
|
if (suffix)
|
|
|
- useWuid.appendf("-%u", suffix);
|
|
|
- VStringBuffer insert("INSERT INTO HPCC.workunits (wuid) VALUES ('%s') IF NOT EXISTS;", useWuid.str());
|
|
|
- DBGLOG("%s", insert.str());
|
|
|
- CassandraStatement statement(cass_statement_new(cass_string_init(insert.str()), 0));
|
|
|
+ {
|
|
|
+ useWuid.append("-");
|
|
|
+ for (unsigned i = 0; i < suffixLength; i++)
|
|
|
+ {
|
|
|
+ useWuid.appendf("%c", '0'+suffix%10);
|
|
|
+ suffix /= 10;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ CassandraStatement statement(cass_prepared_bind(*prepared));
|
|
|
+ check(cass_statement_bind_string(statement, 0, cass_string_init(useWuid.str())));
|
|
|
+ if (traceLevel >= 2)
|
|
|
+ DBGLOG("Try creating %s", useWuid.str());
|
|
|
CassandraFuture future(cass_session_execute(session, statement));
|
|
|
future.wait("execute");
|
|
|
CassandraResult result(cass_future_get_result(future));
|
|
@@ -3451,7 +3504,9 @@ public:
|
|
|
wu->lockRemote(true);
|
|
|
return wu.getClear();
|
|
|
}
|
|
|
- suffix++;
|
|
|
+ suffix = rand();
|
|
|
+ if (suffixLength<9)
|
|
|
+ suffixLength++;
|
|
|
}
|
|
|
}
|
|
|
virtual CLocalWorkUnit* _openWorkUnit(const char *wuid, bool lock, ISecManager *secmgr, ISecUser *secuser)
|
|
@@ -3485,7 +3540,15 @@ public:
|
|
|
virtual IConstWorkUnitIterator * getWorkUnitsSorted(WUSortField * sortorder, WUSortField * filters, const void * filterbuf,
|
|
|
unsigned startoffset, unsigned maxnum, const char * queryowner, __int64 * cachehint, unsigned *total,
|
|
|
ISecManager *secmgr, ISecUser *secuser) { UNIMPLEMENTED; }
|
|
|
- virtual unsigned numWorkUnits() { UNIMPLEMENTED; }
|
|
|
+ virtual unsigned numWorkUnits()
|
|
|
+ {
|
|
|
+ Owned<CassandraPrepared> prepared = prepareStatement("SELECT COUNT(*) FROM HPCC.workunits;");
|
|
|
+ CassandraStatement statement(cass_prepared_bind(*prepared));
|
|
|
+ CassandraFuture future(cass_session_execute(session, statement));
|
|
|
+ future.wait("select count(*)");
|
|
|
+ CassandraResult result(cass_future_get_result(future));
|
|
|
+ return getUnsignedResult(NULL, getSingleResult(result));
|
|
|
+ }
|
|
|
/*
|
|
|
virtual unsigned numWorkUnitsFiltered(WUSortField * filters, const void * filterbuf, ISecManager *secmgr, ISecUser *secuser) { UNIMPLEMENTED; }
|
|
|
virtual void descheduleAllWorkUnits(ISecManager *secmgr, ISecUser *secuser) { UNIMPLEMENTED; }
|
|
@@ -3494,17 +3557,20 @@ public:
|
|
|
virtual void clearAborting(const char *wuid) { UNIMPLEMENTED; }
|
|
|
*/
|
|
|
virtual CassSession *querySession() const { return session; };
|
|
|
+ virtual unsigned queryTraceLevel() const { return traceLevel; };
|
|
|
virtual CassandraPrepared *prepareStatement(const char *query) const
|
|
|
{
|
|
|
CriticalBlock b(cacheCrit);
|
|
|
Linked<CassandraPrepared> cached = preparedCache.getValue(query);
|
|
|
if (cached)
|
|
|
{
|
|
|
- DBGLOG("prepareStatement: Reusing %s", query);
|
|
|
+ if (traceLevel >= 2)
|
|
|
+ DBGLOG("prepareStatement: Reusing %s", query);
|
|
|
return cached.getClear();
|
|
|
}
|
|
|
{
|
|
|
- DBGLOG("prepareStatement: Binding %s", query);
|
|
|
+ if (traceLevel >= 2)
|
|
|
+ DBGLOG("prepareStatement: Binding %s", query);
|
|
|
// We don't want to block cache lookups while we prepare a new bound statement
|
|
|
// Note - if multiple threads try to prepare the same (new) statement at the same time, it's not catastrophic
|
|
|
CriticalUnblock b(cacheCrit);
|
|
@@ -3589,14 +3655,12 @@ private:
|
|
|
const char *childName = child->queryName();
|
|
|
parent->addPropTree(childName, child.getClear());
|
|
|
}
|
|
|
- StringBuffer buf;
|
|
|
- toXML(parent, buf);
|
|
|
- DBGLOG("%s", buf.str());
|
|
|
Owned<IPropertyTreeIterator> iter = parent->getElements("*");
|
|
|
return createConstWUIterator(iter, secmgr, secuser);
|
|
|
}
|
|
|
|
|
|
-
|
|
|
+ unsigned randomizeSuffix;
|
|
|
+ unsigned traceLevel;
|
|
|
CassandraCluster cluster;
|
|
|
CassandraSession session;
|
|
|
mutable CriticalSection cacheCrit;
|