@@ -3115,7 +3115,7 @@ static const CassandraXmlMapping workunitsMappings [] =
{"action", "text", "Action", stringColumnMapper},
{"protected", "boolean", "@protected", boolColumnMapper},
{"scheduled", "text", "@timeScheduled", stringColumnMapper}, // Should store as a date?
- {"totalThorTime", "int", "@totalThorTime", intColumnMapper},
+ {"totalThorTime", "text", "@totalThorTime", stringColumnMapper}, // We store in the wu ptree as a collatable string. Need to force to one partition too
{"appvalues", "map<text, text>", "@Application@", subTreeMapColumnMapper}, // MORE - change to a custom map to make searchable
{"debug", "map<text, text>", "Debug", simpleMapColumnMapper},
@@ -3149,7 +3149,7 @@ static const CassandraXmlMapping workunitInfoMappings [] = // A cut down versio
{"action", "text", "Action", stringColumnMapper},
{"protected", "boolean", "@protected", boolColumnMapper},
{"scheduled", "text", "@timeScheduled", stringColumnMapper}, // Should store as a date?
- {"totalThorTime", "int", "@totalThorTime", intColumnMapper},
+ {"totalThorTime", "text", "@totalThorTime", stringColumnMapper}, // We store in the wu ptree as a collatable string. Need to force to one partition too
{"appvalues", "map<text, text>", "@Application@", subTreeMapColumnMapper}, // MORE - change to a custom map to make searchable
{ NULL, "workunits", "((partition), wuid)|CLUSTERING ORDER BY (wuid DESC)", stringColumnMapper}
};
@@ -3172,14 +3172,14 @@ static const CassandraXmlMapping searchMappings [] =
{"action", "text", "Action", stringColumnMapper},
{"protected", "boolean", "@protected", boolColumnMapper},
{"scheduled", "text", "@timeScheduled", stringColumnMapper}, // Should store as a date?
- {"totalThorTime", "int", "@totalThorTime", intColumnMapper},
+ {"totalThorTime", "text", "@totalThorTime", stringColumnMapper}, // We store in the wu ptree as a collatable string. Need to force to one partition too
{"appvalues", "map<text, text>", "@Application@", subTreeMapColumnMapper}, // MORE - change to a custom map to make searchable
{ NULL, "workunitsSearch", "((xpath, fieldPrefix), fieldValue, wuid)|CLUSTERING ORDER BY (fieldValue ASC, wuid DESC)", stringColumnMapper}
};

-// The fields we can search by. These are a subset of the fields in the basic workunit info that is returned from a search
+// The fields we can search by. These are a subset of the fields in the basic workunit info that is returned from a search. A row is created in the search table for each of these, for each workunit.

-const char * searchPaths[] = { "@submitID", "@clusterName", "@jobName", "@priorityClass", "@protected", "@scope", "@state", NULL};
+const char * searchPaths[] = { "@submitID", "@clusterName", "@jobName", "@priorityClass", "@protected", "@scope", "@state", "@totalThorTime", NULL};

static const CassandraXmlMapping uniqueSearchMappings [] =
{
@@ -3190,6 +3190,11 @@ static const CassandraXmlMapping uniqueSearchMappings [] =
{ NULL, "uniqueSearchValues", "((xpath, fieldPrefix), fieldValue, origFieldValue)|CLUSTERING ORDER BY (fieldValue ASC)", stringColumnMapper}
};

+// The fields we can wildcard-search by. We store these in the uniqueSearchMappings table so that we can translate wildcards into sets
+
+const char * wildSearchPaths[] = { "@submitID", "@clusterName", "@jobName", NULL};
+
+
/*
* Some thoughts on the secondary tables:
* 1. To support (trailing) wildcards we will need to split the key into two - the leading N chars and the rest. Exactly what N is will depend on the installation size.
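
To make point 1 concrete: because `uniqueSearchValues` partitions on `(xpath, fieldPrefix)` and clusters the distinct values in order, a trailing-wildcard pattern can first be expanded into the set of matching stored values, and each value can then drive an exact lookup against `workunitsSearch`. Below is a hypothetical sketch of the expansion query; the variable names are illustrative, and it assumes the pattern is at least `fieldPrefix` characters long (shorter patterns are exactly the complication point 1 describes).

```cpp
// Expand a trailing-wildcard jobname pattern such as "NIGHTLY*" (already
// upper-cased, '*' stripped) into the distinct stored values with that prefix.
const char *pattern = "NIGHTLY"; // hypothetical example pattern
StringBuffer upperBound(pattern);
// Smallest string that sorts above every value starting with the pattern
upperBound.setCharAt(upperBound.length()-1, upperBound.charAt(upperBound.length()-1)+1);
VStringBuffer expandQuery("select origFieldValue from uniqueSearchValues"
                          " where xpath='@jobName' and fieldPrefix='%.*s'"
                          " and fieldValue >= '%s' and fieldValue < '%s';",
                          CASS_SEARCH_PREFIX_SIZE, pattern, pattern, upperBound.str());
// Each origFieldValue returned can then be looked up exactly in
// workunitsSearch, and the per-value result streams merged by wuid.
```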
@@ -3794,6 +3799,7 @@ private:
interface IConstWorkUnitIteratorEx : public IConstWorkUnitIterator
{
virtual bool hasPostFilters() const = 0;
+ virtual bool isMerging() const = 0;
};

/*
@@ -3816,7 +3822,7 @@ public:
{
return hint;
}
- void noteWuid(const char *wuid, unsigned row)
+ void noteWuid(const char *wuid, const char *fieldValue, unsigned row)
{
CriticalBlock b(crit);
ForEachItemInRev(idx, rows)
@@ -3828,6 +3834,7 @@ public:
}
rows.add(row, idx+1);
wuids.add(wuid, idx+1);
+ fieldValues.add(fieldValue, idx+1);
}
IConstWorkUnitIteratorEx *getResult() const
{
@@ -3839,7 +3846,7 @@ public:
CriticalBlock b(crit);
result.set(_result);
}
- unsigned lookupStartRow(StringBuffer &wuid, unsigned startOffset)
+ unsigned lookupStartRow(StringBuffer &wuid, StringBuffer &fieldValue, unsigned startOffset)
{
// See if we can provide a base wuid to search above/below
CriticalBlock b(crit);
@@ -3849,6 +3856,7 @@ public:
if (foundRow <= startOffset)
{
wuid.set(wuids.item(idx));
+ fieldValue.set(fieldValues.item(idx));
return foundRow;
}
}
@@ -3866,6 +3874,7 @@ private:
mutable CriticalSection crit; // It's POSSIBLE that we could get two queries in hitting the cache at the same time, I think...
UnsignedArray rows;
StringArray wuids;
+ StringArray fieldValues;
Owned<IConstWorkUnitIteratorEx> result;
__uint64 hint;
unsigned lastAccess;
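
The three parallel arrays above are the continuation cache: `noteWuid` records, for a given absolute row number, the wuid and (new in this patch) the sort-field value last returned at that row, and `lookupStartRow` finds the nearest recorded position at or below a requested page offset so the next page can restart from `(fieldValue, wuid)` rather than rescanning from the beginning. A self-contained sketch of the idea, with hypothetical names and none of the locking or ordering detail:

```cpp
#include <string>
#include <vector>

// Hypothetical reduction of CCassandraWuUQueryCacheEntry's bookkeeping.
struct PageMark { unsigned row; std::string wuid; std::string fieldValue; };

struct ContinuationCache
{
    std::vector<PageMark> marks;   // the real code keeps these sorted and lock-protected

    void noteRow(unsigned row, const char *wuid, const char *fieldValue)
    {
        marks.push_back({row, wuid, fieldValue ? fieldValue : ""});
    }

    // Best restart point at or below startOffset; row 0 means "start over".
    PageMark lookupStart(unsigned startOffset) const
    {
        PageMark best{0, "", ""};
        for (const PageMark &m : marks)
            if (m.row <= startOffset && m.row >= best.row)
                best = m;
        return best;
    }
};
```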
@@ -3904,6 +3913,10 @@ public:
{
return postFilters.length() != 0;
}
+ virtual bool isMerging() const
+ {
+ return results.length() > 1;
+ }
virtual bool first()
{
inputs.kill();
@@ -3924,10 +3937,12 @@ public:
if (!nextSource)
{
if (cache && last)
- cache->noteWuid(last->queryWuid(), rowNum);
+ {
+ cache->noteWuid(last->queryWuid(), lastThorTime, rowNum);
+ }
return false;
}
- Owned<IPTree> wuXML = rowToPTree(NULL, NULL, workunitInfoMappings+1, cass_iterator_get_row(*nextSource));
+ Owned<IPTree> wuXML = rowToPTree(NULL, NULL, workunitInfoMappings+1, cass_iterator_get_row(*nextSource)); // NOTE - this is relying on search mappings and wuInfoMappings being the same
bool postFiltered = false;
ForEachItemIn(pfIdx, postFilters)
{
@@ -3940,6 +3955,7 @@ public:
if (!postFiltered)
{
current.setown(createConstWorkUnitInfo(*wuXML));
+ lastThorTime.set(wuXML->queryProp("@totalThorTime"));
rowNum++;
return true;
}
@@ -3984,6 +4000,7 @@ private:
CIArrayOf<PostFilter> postFilters;
Owned<IConstWorkUnitInfo> current;
Linked<CCassandraWuUQueryCacheEntry> cache;
+ StringAttr lastThorTime;
unsigned compareColumn;
unsigned startRowNum;
unsigned rowNum;
@@ -4025,6 +4042,10 @@ public:
{
return false; // they are done by my input. But we may want to rename this function to indicate "may return more than asked", in which case it would be true
}
+ virtual bool isMerging() const
+ {
+ return false;
+ }
private:
void readFirst()
{
@@ -4125,6 +4146,10 @@ public:
{
return false;
}
+ virtual bool isMerging() const
+ {
+ return false;
+ }
private:
Owned<IConstWorkUnitIteratorEx> input;
unsigned startOffset;
@@ -4527,10 +4552,7 @@ protected:
{
deleteSecondaryByKey(xpath, prevKey, wuid, sessionCache, *batch);
if (p->hasProp(xpath))
- {
simpleXMLtoCassandra(sessionCache, *batch, searchMappings, p, xpath);
- simpleXMLtoCassandra(sessionCache, *batch, uniqueSearchMappings, p, xpath);
- }
}

void deleteSecondaries(const char *wuid)
@@ -4541,8 +4563,14 @@ protected:

void updateSecondaries(const char *wuid)
{
- for (const char * const *search = searchPaths; *search; search++)
+ const char * const *search;
+ for (search = searchPaths; *search; search++)
updateSecondaryTable(*search, prev->queryProp(*search), wuid);
+ for (search = wildSearchPaths; *search; search++)
+ {
+ if (p->hasProp(*search))
+ simpleXMLtoCassandra(sessionCache, *batch, uniqueSearchMappings, p, *search);
+ }
}

// Keep track of previously committed values for fields that we have a secondary table for, so that we can update them appropriately when we commit
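
The net effect of the split above: every `searchPaths` field still maintains one row per workunit in `workunitsSearch` (delete the row keyed by the previous value, insert one keyed by the new value), while only the `wildSearchPaths` fields additionally record their value in `uniqueSearchValues`, which has no wuid column and is only ever appended to. Below is illustrative CQL for a job-name change; the column lists are abbreviated and a 4-character `fieldPrefix` is assumed purely for the example.

```cpp
// Effective statements batched when W20150601-123456 changes @jobName
// from "oldname" to "newname" (illustrative, not captured output):
//
//   DELETE FROM workunitsSearch
//     WHERE xpath='@jobName' AND fieldPrefix='OLDN'
//       AND fieldValue='OLDNAME' AND wuid='W20150601-123456';
//   INSERT INTO workunitsSearch (xpath, fieldPrefix, fieldValue, wuid, ...)
//     VALUES ('@jobName', 'NEWN', 'NEWNAME', 'W20150601-123456', ...);
//
//   -- @jobName is also in wildSearchPaths, so record the distinct value too;
//   -- a field like @state would get only the two statements above.
//   INSERT INTO uniqueSearchValues (xpath, fieldPrefix, fieldValue, origFieldValue)
//     VALUES ('@jobName', 'NEWN', 'NEWNAME', 'newname');
```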
@@ -4741,29 +4769,35 @@ public:
CIArrayOf<PostFilter> poorFilters;
CIArrayOf<PostFilter> remoteWildFilters;
Owned<IConstWorkUnitIteratorEx> result;
- bool needsPostSort = ((sortorder & 0xff) != WUSFwuid);
- result.setown(cached->getResult());
+ WUSortField baseSort = (WUSortField) (sortorder & 0xff);
+ StringBuffer thorTimeThreshold;
+ bool sortByThorTime = (baseSort == WUSFtotalthortime);
+ bool needsPostSort = (baseSort != WUSFwuid && baseSort != WUSFtotalthortime);
+ bool sortDescending = (sortorder & WUSFreverse) || needsPostSort;
if (!result)
{
- Owned<CassMultiIterator> merger = new CassMultiIterator(needsPostSort ? NULL : cached, 0, 0, sortorder!=WUSFwuid); // Merge by wuid. We always fetch ordered by wuid desc, except if the order requested is wuid asc.
- if (startOffset > 4)
+ Owned<CassMultiIterator> merger = new CassMultiIterator(needsPostSort ? NULL : cached, 0, 0, sortDescending); // We always merge by wuid // MORE - except when we merge by thor time....
+ if (startOffset)
{
StringBuffer startWuid;
- unsigned found = cached->lookupStartRow(startWuid, startOffset);
+ unsigned found = cached->lookupStartRow(startWuid, thorTimeThreshold, startOffset);
if (found)
{
- if (sortorder==WUSFwuid)
- startWuid.append('\x21'); // we want to find the first wuid AFTER. This is printable but not going to be in any wuid
- else
- startWuid.setCharAt(startWuid.length()-1, startWuid.charAt(startWuid.length()-1)-1); // we want to find the last wuid BEFORE
-
+ if (!sortByThorTime)
+ {
+ if (sortDescending)
+ startWuid.setCharAt(startWuid.length()-1, startWuid.charAt(startWuid.length()-1)-1); // we want to find the last wuid BEFORE
+ else
+ startWuid.append('\x21'); // we want to find the first wuid AFTER. This is printable but not going to be in any wuid
+ thorTimeThreshold.clear();
+ }
wuidFilters.append(*new PostFilter(sortorder==WUSFwuid ? WUSFwuid : WUSFwuidhigh, startWuid, true));
startOffset -= found;
merger->setStartOffset(found);
}
}
const char *fv = (const char *) filterbuf;
- while (*thisFilter)
+ while (thisFilter && *thisFilter)
{
WUSortField field = (WUSortField) (*thisFilter & 0xff);
bool isWild = (*thisFilter & WUSFwild) != 0;
@@ -4793,12 +4827,17 @@ public:
case WUSFwuid: // Acts as wuidLo when specified as a filter
case WUSFwuidhigh:
// Wuid filters can be added to good and poor filters, and to remoteWild if they are done via merged sets rather than ranges...
- mergeFilter(wuidFilters, field, fv);
+ if (sortByThorTime)
+ remoteWildFilters.append(*new PostFilter(field, fv, true));
+ else
+ mergeFilter(wuidFilters, field, fv);
break;
case WUSFfileread:
UNIMPLEMENTED;
case WUSFtotalthortime:
- UNIMPLEMENTED; // Numeric sort makes it hard to do the filter remotely. And ESP does not presently use. Though it would be pretty handy...
+ // This should be treated as a low value - i.e. return only WUs that took longer than the supplied value
+ if (thorTimeThreshold.isEmpty()) // If not a continuation
+ formatTimeCollatable(thorTimeThreshold, milliToNano(atoi(fv)), false);
break;
case WUSFwildwuid:
// Translate into a range - note that we only support trailing * wildcard.
@@ -4827,7 +4866,25 @@ public:
fv = fv + strlen(fv)+1;
}

- if (goodFilters.length())
+ if (sortByThorTime)
+ {
+ merger->addPostFilters(goodFilters, 0);
+ merger->addPostFilters(poorFilters, 0);
+ merger->addPostFilters(remoteWildFilters, 0);
+ if (wuidFilters.length())
+ {
+ // We are doing a continuation of a prior search that is sorted by a searchField, which may not be unique
+ // We need two queries - one where searchField==startSearchField and wuid > startWuid,
+ // and one where searchField > startSearchField. We know that there are no other filters in play (as Cassandra would not support them)
+ // though there may be postfilters
+ assertex(wuidFilters.length()==1);
+ merger->addResult(*new CassandraResult(fetchMoreDataByThorTime(thorTimeThreshold, wuidFilters.item(0).queryValue(), sortDescending, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
+ merger->addResult(*new CassandraResult(fetchMoreDataByThorTime(thorTimeThreshold, NULL, sortDescending, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
+ }
+ else
+ merger->addResult(*new CassandraResult(fetchDataByThorTime(thorTimeThreshold, sortDescending, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
+ }
+ else if (goodFilters.length())
{
merger->addPostFilters(goodFilters, 1);
merger->addPostFilters(poorFilters, 0);
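
Since thor time is not unique, the continuation token is effectively the pair (last fieldValue, last wuid), and the two `fetchMoreDataByThorTime` calls above turn it into two range reads that the merger stitches back together. Illustratively, for a descending continuation whose last returned row was `(fieldValue='T', wuid='W20150601-123456')`:

```cpp
// Illustrative CQL for the two merged continuation reads (descending order;
// the clustering order (fieldValue ASC, wuid DESC) read in reverse yields
// fieldValue DESC, wuid ASC, which is why ties resume at wuid > last):
//
//   -- rows that tie on the last thor time, resuming after the last wuid
//   SELECT ... FROM workunitsSearch
//     WHERE xpath='@totalThorTime' AND fieldPrefix='  '
//       AND fieldValue = 'T' AND wuid > 'W20150601-123456'
//     ORDER BY fieldValue DESC, wuid ASC;
//
//   -- rows strictly beyond the last thor time
//   SELECT ... FROM workunitsSearch
//     WHERE xpath='@totalThorTime' AND fieldPrefix='  '
//       AND fieldValue < 'T'
//     ORDER BY fieldValue DESC, wuid ASC;
```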
@@ -4874,7 +4931,7 @@ public:
else
result.setown(merger.getClear());
}
- if (startOffset || needsPostSort || result->hasPostFilters()) // we need a subpage if we have fetched anything other than exactly the rows requested
+ if (startOffset || needsPostSort || result->hasPostFilters() || result->isMerging()) // we need a subpage if we have fetched anything other than exactly the rows requested
result.setown(new SubPageIterator(result.getClear(), startOffset, pageSize));
if (cachehint)
{
@@ -4882,13 +4939,6 @@ public:
CriticalBlock b(cacheCrit);
cacheIdMap.setValue(*cachehint, cached.getClear());
}
- // Caching/continuation...
- // For continuation purposes, it's useful to pass the client something that allows us to give the next page, accurately and efficiently
- // This will typically be the wuid of the last row (if not sorting) or the sortfield (if sorting)
- // We would also prefer NOT to recalculate the postsort/postfilter cases if we don't have to - the cachehint allows us to reuse a previous case
- // We can use a cachehint to retrieve a piece of user state info, that will contain this info.
- // For caching purposes, we would prefer NOT to recalculate the searches for other users, but I suspect that we really should (since there is no way to know what is a reasonable expiry).
- // For commonly used pages in eclwatch, esp should itself cache the page (e.g. activity page).
if (total)
*total = 0; // We don't know
return result.getClear();
@@ -5372,6 +5422,65 @@ private:
return executeQuery(session, statement);
}

+ // Fetch rows from the search table, by thorTime, above a threshold
+
+ const CassResult *fetchDataByThorTime(const char *threshold, bool descending, unsigned limit) const
+ {
+ StringBuffer names;
+ StringBuffer tableName;
+ getFieldNames(searchMappings+3, names, tableName); // mappings+3 means we don't return the key columns (xpath, upper(keyPrefix), upper(key))
+ VStringBuffer selectQuery("select %s from %s where xpath='@totalThorTime' and fieldPrefix='%*s'", names.str()+1, tableName.str(), CASS_SEARCH_PREFIX_SIZE, "");
+ if (threshold && *threshold)
+ selectQuery.appendf(" and fieldValue >= '%s'", threshold);
+ if (descending)
+ selectQuery.append(" ORDER BY fieldValue DESC, wuid ASC");
+ else
+ selectQuery.append(" ORDER BY fieldValue ASC, wuid DESC");
+ if (limit)
+ selectQuery.appendf(" LIMIT %u", limit);
+ selectQuery.append(';');
+ if (traceLevel >= 2)
+ DBGLOG("%s", selectQuery.str());
+ CassandraStatement statement(cass_statement_new(selectQuery.str(), 0));
+ return executeQuery(session, statement);
+ }
+
+ // Fetch rows from the search table, continuing a previous query that was sorted by thor time. Called twice per continuation: once for rows that tie on the threshold time (wuid supplied) and once for rows strictly beyond it (wuid==NULL).
+ // This technique only works for thor time because we have forced it to a single partition. Otherwise it gets even more complicated, and not worth it.
+
+ const CassResult *fetchMoreDataByThorTime(const char *threshold, const char *wuid, bool descending, unsigned limit) const
+ {
+ StringBuffer names;
+ StringBuffer tableName;
+ getFieldNames(searchMappings+3, names, tableName); // mappings+3 means we don't return the key columns (xpath, upper(keyPrefix), upper(key))
+ const char *wuidTest;
+ const char *fieldTest;
+ if (descending)
+ {
+ wuidTest = ">";
+ fieldTest = wuid ? "=" : "<";
+ }
+ else
+ {
+ wuidTest = "<";
+ fieldTest = wuid ? "=" : ">";
+ }
+ VStringBuffer selectQuery("select %s from %s where xpath='@totalThorTime' and fieldPrefix='%*s' and fieldValue %s '%s'", names.str()+1, tableName.str(), CASS_SEARCH_PREFIX_SIZE, "", fieldTest, threshold);
+ if (wuid)
+ selectQuery.appendf(" and wuid %s '%s'", wuidTest, wuid);
+ if (descending)
+ selectQuery.append(" ORDER BY fieldValue DESC, wuid ASC");
+ else
+ selectQuery.append(" ORDER BY fieldValue ASC, wuid DESC");
+ if (limit)
+ selectQuery.appendf(" LIMIT %u", limit);
+ selectQuery.append(';');
+ if (traceLevel >= 2)
+ DBGLOG("%s", selectQuery.str());
+ CassandraStatement statement(cass_statement_new(selectQuery.str(), 0));
+ return executeQuery(session, statement);
+ }
+
// Fetch matching rows from the search table, for a single wuid

const CassResult *fetchDataForKeyAndWuid(const char *xpath, const char *key, const char *wuid) const
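
One subtlety worth calling out in both functions above: the `fieldPrefix='%*s'` format writes an empty string padded to `CASS_SEARCH_PREFIX_SIZE` blanks, so every `@totalThorTime` row lands in the same `(xpath, fieldPrefix)` partition and the global `ORDER BY` can be served directly from the clustering order. A minimal demonstration of that `printf` behavior (the constant's value here is assumed for illustration):

```cpp
#include <cstdio>

int main()
{
    const int CASS_SEARCH_PREFIX_SIZE = 2; // assumed value, for illustration only
    // "%*s" pads the (empty) string argument to the given field width,
    // producing a fixed run of spaces - a single, constant partition key part.
    printf("fieldPrefix='%*s'\n", CASS_SEARCH_PREFIX_SIZE, "");
    // prints: fieldPrefix='  '
    return 0;
}
```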
|