|
@@ -31,6 +31,7 @@
|
|
|
#include "nbcd.hpp"
|
|
|
#include "jsort.hpp"
|
|
|
#include "jptree.hpp"
|
|
|
+#include "jregexp.hpp"
|
|
|
|
|
|
#include "workunit.hpp"
|
|
|
#include "workunit.ipp"
|
|
@@ -2257,6 +2258,7 @@ extern void cassandraToGenericXML()
|
|
|
|
|
|
//--------------------------------------------
|
|
|
|
|
|
+#define CASS_WORKUNIT_POSTSORT_LIMIT 10000
|
|
|
#define CASS_SEARCH_PREFIX_SIZE 2
|
|
|
#define NUM_PARTITIONS 2
|
|
|
|
|
@@ -2520,10 +2522,10 @@ public:
|
|
|
}
|
|
|
// Bind the value found at userVal, upper-cased; when it is longer than
// CASS_SEARCH_PREFIX_SIZE only that many leading characters are bound.
virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *, const char *userVal)
{
    const unsigned prefixChars = CASS_SEARCH_PREFIX_SIZE;
    const bool upperCase = true;
    return _fromXML(statement, idx, row, userVal, prefixChars, upperCase);
}
|
|
|
protected:
|
|
|
- static bool _fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *xpath, unsigned prefixLength)
|
|
|
+ static bool _fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *xpath, unsigned prefixLength, bool uc)
|
|
|
{
|
|
|
const char *columnVal = row->queryProp(xpath);
|
|
|
if (columnVal)
|
|
@@ -2531,7 +2533,8 @@ protected:
|
|
|
if (statement)
|
|
|
{
|
|
|
StringBuffer buf(columnVal);
|
|
|
- buf.toUpperCase();
|
|
|
+ if (uc)
|
|
|
+ buf.toUpperCase();
|
|
|
if (prefixLength && prefixLength < buf.length())
|
|
|
check(cass_statement_bind_string_n(statement, idx, buf, prefixLength));
|
|
|
else
|
|
@@ -2549,10 +2552,19 @@ static class SearchColumnMapper : public PrefixSearchColumnMapper
|
|
|
public:
|
|
|
// Bind the value found at userVal, upper-cased, with no prefix truncation requested.
virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *, const char *userVal)
{
    const unsigned noPrefix = 0;    // a prefixLength of 0 never selects the prefix-truncating bind
    const bool upperCase = true;
    return _fromXML(statement, idx, row, userVal, noPrefix, upperCase);
}
|
|
|
} searchColumnMapper;
|
|
|
|
|
|
+static class LCSearchColumnMapper : public PrefixSearchColumnMapper
|
|
|
+{
|
|
|
+public:
|
|
|
+ virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *, const char *userVal)
|
|
|
+ {
|
|
|
+ return _fromXML(statement, idx, row, userVal, 0, false);
|
|
|
+ }
|
|
|
+} lcSearchColumnMapper;
|
|
|
+
|
|
|
static class IntColumnMapper : implements CassandraColumnMapper
|
|
|
{
|
|
|
public:
|
|
@@ -3161,13 +3173,22 @@ static const CassandraXmlMapping searchMappings [] =
|
|
|
{"scheduled", "text", "@timeScheduled", stringColumnMapper}, // Should store as a date?
|
|
|
{"totalThorTime", "int", "@totalThorTime", intColumnMapper},
|
|
|
{"appvalues", "map<text, text>", "@Application@", subTreeMapColumnMapper}, // MORE - change to a custom map to make searchable
|
|
|
-{ NULL, "workunitsSearch", "((xpath, fieldPrefix), fieldValue, wuid)", stringColumnMapper}
|
|
|
+ { NULL, "workunitsSearch", "((xpath, fieldPrefix), fieldValue, wuid)|CLUSTERING ORDER BY (fieldValue ASC, wuid DESC)", stringColumnMapper}
|
|
|
};
|
|
|
|
|
|
// The fields we can search by. These are a subset of the fields in the basic workunit info that is returned from a search
|
|
|
|
|
|
const char * searchPaths[] = { "@submitID", "@clusterName", "@jobName", "@priorityClass", "@protected", "@scope", "@state", NULL};
|
|
|
|
|
|
+static const CassandraXmlMapping uniqueSearchMappings [] =
|
|
|
+{
|
|
|
+ {"xpath", "text", NULL, suppliedStringColumnMapper},
|
|
|
+ {"fieldPrefix", "text", NULL, prefixSearchColumnMapper}, // Leading N chars, upper-cased
|
|
|
+ {"fieldValue", "text", NULL, searchColumnMapper}, // upper-cased
|
|
|
+ {"origFieldValue", "text", NULL, lcSearchColumnMapper}, // original case
|
|
|
+ { NULL, "uniqueSearchValues", "((xpath, fieldPrefix), fieldValue, origFieldValue)|CLUSTERING ORDER BY (fieldValue ASC)", stringColumnMapper}
|
|
|
+};
|
|
|
+
|
|
|
/*
|
|
|
* Some thoughts on the secondary tables:
|
|
|
* 1. To support (trailing) wildcards we will need to split the key into two - the leading N chars and the rest. Exactly what N is will depend on the installation size.
|
|
@@ -3660,6 +3681,80 @@ static IPTree *rowToPTree(const char *xpath, const char *key, const CassandraXml
|
|
|
return xml.getClear();
|
|
|
}
|
|
|
|
|
|
+/*
|
|
|
+ * PostFilter (the class defined below, after queryFilterField) represents a filter to be applied to a ConstWorkUnitInfo tree representation prior to returning it from an iterator
|
|
|
+ */
|
|
|
+const char *queryFilterField(WUSortField field)
|
|
|
+{
|
|
|
+ switch (field)
|
|
|
+ {
|
|
|
+ case WUSFuser: return "submitID";
|
|
|
+ case WUSFcluster: return "clustername";
|
|
|
+ case WUSFjob: return "jobname";
|
|
|
+ case WUSFstate: return "state";
|
|
|
+ case WUSFpriority: return "priorityClass";
|
|
|
+ case WUSFwuid: return "wuid";
|
|
|
+ case WUSFwuidhigh: return "wuid";
|
|
|
+ case WUSFfileread: UNIMPLEMENTED;
|
|
|
+ case WUSFprotected: return "protected";
|
|
|
+ case WUSFtotalthortime: return "totalThorTime";
|
|
|
+ case WUSFwildwuid: return "wuid";
|
|
|
+ case WUSFecl: UNIMPLEMENTED;
|
|
|
+ default:
|
|
|
+ throwUnexpected();
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+class PostFilter : public CInterface
|
|
|
+{
|
|
|
+public:
|
|
|
+ PostFilter(WUSortField _field, const char *_value, bool _wild)
|
|
|
+ : field(_field), cqlField(queryFilterField(_field)), xpath(queryFilterXPath(_field)), wild(_wild)
|
|
|
+ {
|
|
|
+ setValue(_value);
|
|
|
+ }
|
|
|
+ bool matches(IPTree &p) const
|
|
|
+ {
|
|
|
+ const char *val = p.queryProp(xpath);
|
|
|
+ if (val)
|
|
|
+ return wild ? WildMatch(val, pattern) : strieq(val, pattern);
|
|
|
+ else
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ const char *queryValue() const
|
|
|
+ {
|
|
|
+ return value.str();
|
|
|
+ }
|
|
|
+ void setValue(const char *_value)
|
|
|
+ {
|
|
|
+ if (wild)
|
|
|
+ {
|
|
|
+ VStringBuffer filter("*%s*", _value);
|
|
|
+ pattern.set(filter);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ pattern.set(_value);
|
|
|
+ value.set(_value);
|
|
|
+ }
|
|
|
+ const char *queryXPath() const
|
|
|
+ {
|
|
|
+ return xpath;
|
|
|
+ }
|
|
|
+ WUSortField queryField() const
|
|
|
+ {
|
|
|
+ return field;
|
|
|
+ }
|
|
|
+private:
|
|
|
+ const char *cqlField;
|
|
|
+ const char *xpath;
|
|
|
+ StringAttr pattern;
|
|
|
+ StringAttr value;
|
|
|
+ WUSortField field;
|
|
|
+ bool wild;
|
|
|
+};
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
class CassSortableIterator : public CassandraIterator
|
|
|
{
|
|
|
public:
|
|
@@ -3709,7 +3804,20 @@ public:
|
|
|
{
|
|
|
results.append(result);
|
|
|
}
|
|
|
-
|
|
|
+ void addPostFilters(CIArrayOf<PostFilter> &filters, unsigned start)
|
|
|
+ {
|
|
|
+ unsigned len = filters.length();
|
|
|
+ while (start<len)
|
|
|
+ postFilters.append(OLINK(filters.item(start++)));
|
|
|
+ }
|
|
|
+ void addPostFilter(PostFilter &filter)
|
|
|
+ {
|
|
|
+ postFilters.append(filter);
|
|
|
+ }
|
|
|
+ bool hasPostFilters() const
|
|
|
+ {
|
|
|
+ return postFilters.length() != 0;
|
|
|
+ }
|
|
|
virtual bool first()
|
|
|
{
|
|
|
inputs.kill();
|
|
@@ -3723,12 +3831,27 @@ public:
|
|
|
virtual bool next()
|
|
|
{
|
|
|
current.clear();
|
|
|
- const CassandraIterator *nextSource = get_row();
|
|
|
- if (!nextSource)
|
|
|
- return false;
|
|
|
- Owned<IPTree> wuXML = rowToPTree(NULL, NULL, workunitInfoMappings+1, cass_iterator_get_row(*nextSource));
|
|
|
- current.setown(createConstWorkUnitInfo(*wuXML));
|
|
|
- return true;
|
|
|
+ loop
|
|
|
+ {
|
|
|
+ const CassandraIterator *nextSource = get_row();
|
|
|
+ if (!nextSource)
|
|
|
+ return false;
|
|
|
+ Owned<IPTree> wuXML = rowToPTree(NULL, NULL, workunitInfoMappings+1, cass_iterator_get_row(*nextSource));
|
|
|
+ bool postFiltered = false;
|
|
|
+ ForEachItemIn(pfIdx, postFilters)
|
|
|
+ {
|
|
|
+ if (!postFilters.item(pfIdx).matches(*wuXML))
|
|
|
+ {
|
|
|
+ postFiltered = true;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (!postFiltered)
|
|
|
+ {
|
|
|
+ current.setown(createConstWorkUnitInfo(*wuXML));
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
virtual bool isValid()
|
|
|
{
|
|
@@ -3767,6 +3890,210 @@ private:
|
|
|
Owned<IRowStream> merger;
|
|
|
IArrayOf<CassandraResult> results;
|
|
|
IArrayOf<CassSortableIterator> inputs;
|
|
|
+ CIArrayOf<PostFilter> postFilters;
|
|
|
+ Owned<IConstWorkUnitInfo> current;
|
|
|
+ unsigned compareColumn;
|
|
|
+ bool descending;
|
|
|
+};
|
|
|
+
|
|
|
+class CassPostSortIterator : public CInterfaceOf<IConstWorkUnitIterator>, implements ICompare
|
|
|
+{
|
|
|
+public:
|
|
|
+ CassPostSortIterator(IConstWorkUnitIterator * _input, unsigned _sortorder, unsigned _limit)
|
|
|
+ : input(_input), sortorder(_sortorder), limit(_limit)
|
|
|
+ {
|
|
|
+ idx = 0;
|
|
|
+ }
|
|
|
+ virtual bool first()
|
|
|
+ {
|
|
|
+ if (input)
|
|
|
+ {
|
|
|
+ readFirst();
|
|
|
+ input.clear();
|
|
|
+ }
|
|
|
+ idx = 0;
|
|
|
+ return sorted.isItem(idx);
|
|
|
+ }
|
|
|
+ virtual bool next()
|
|
|
+ {
|
|
|
+ idx++;
|
|
|
+ return sorted.isItem(idx);
|
|
|
+ }
|
|
|
+ virtual bool isValid()
|
|
|
+ {
|
|
|
+ return sorted.isItem(idx);
|
|
|
+ }
|
|
|
+ virtual IConstWorkUnitInfo & query()
|
|
|
+ {
|
|
|
+ return sorted.item(idx);
|
|
|
+ }
|
|
|
+private:
|
|
|
+ void readFirst()
|
|
|
+ {
|
|
|
+ ForEach(*input)
|
|
|
+ {
|
|
|
+ sorted.append(OLINK(input->query()));
|
|
|
+ if (sorted.length()>=limit)
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ qsortvec((void **)sorted.getArray(0), sorted.length(), *this);
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual int docompare(const void *a, const void *b) const
|
|
|
+ {
|
|
|
+ // a and b point to to IConstWorkUnitInfo objects
|
|
|
+ const IConstWorkUnitInfo *aa = (const IConstWorkUnitInfo *) a;
|
|
|
+ const IConstWorkUnitInfo *bb = (const IConstWorkUnitInfo *) b;
|
|
|
+ int diff;
|
|
|
+ switch (sortorder & 0xff)
|
|
|
+ {
|
|
|
+ case WUSFuser:
|
|
|
+ diff = stricmp(aa->queryUser(), bb->queryUser());
|
|
|
+ break;
|
|
|
+ case WUSFcluster:
|
|
|
+ diff = stricmp(aa->queryClusterName(), bb->queryClusterName());
|
|
|
+ break;
|
|
|
+ case WUSFjob:
|
|
|
+ diff = stricmp(aa->queryJobName(), bb->queryJobName());
|
|
|
+ break;
|
|
|
+ case WUSFstate:
|
|
|
+ diff = stricmp(aa->queryStateDesc(), bb->queryStateDesc());
|
|
|
+ break;
|
|
|
+ case WUSFprotected:
|
|
|
+ diff = (int) bb->isProtected() - (int) aa->isProtected();
|
|
|
+ break;
|
|
|
+ case WUSFtotalthortime:
|
|
|
+ diff = (int) (bb->getTotalThorTime() - bb->getTotalThorTime());
|
|
|
+ break;
|
|
|
+ case WUSFwuid:
|
|
|
+ diff = stricmp(aa->queryWuid(), bb->queryWuid()); // Should never happen, since we always fetch with a wuid sort
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ throwUnexpected();
|
|
|
+ }
|
|
|
+ if (sortorder & WUSFreverse)
|
|
|
+ return -diff;
|
|
|
+ else
|
|
|
+ return diff;
|
|
|
+ }
|
|
|
+
|
|
|
+ Owned<IConstWorkUnitIterator> input;
|
|
|
+ IArrayOf<IConstWorkUnitInfo> sorted;
|
|
|
+ unsigned sortorder;
|
|
|
+ unsigned idx;
|
|
|
+ unsigned limit;
|
|
|
+};
|
|
|
+
|
|
|
+class CassJoinIterator : public CInterface, implements IConstWorkUnitIterator
|
|
|
+{
|
|
|
+public:
|
|
|
+ IMPLEMENT_IINTERFACE;
|
|
|
+ CassJoinIterator(unsigned _compareColumn, bool _descending)
|
|
|
+ {
|
|
|
+ compareColumn = _compareColumn;
|
|
|
+ descending = _descending;
|
|
|
+ }
|
|
|
+
|
|
|
+ void addResult(CassandraResult &result)
|
|
|
+ {
|
|
|
+ results.append(result);
|
|
|
+ }
|
|
|
+
|
|
|
+ void addPostFilter(PostFilter &post)
|
|
|
+ {
|
|
|
+ postFilters.append(post);
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual bool first()
|
|
|
+ {
|
|
|
+ if (!results.length())
|
|
|
+ return false;
|
|
|
+ inputs.kill();
|
|
|
+ ForEachItemIn(idx, results)
|
|
|
+ {
|
|
|
+ Owned <CassSortableIterator> input = new CassSortableIterator(cass_iterator_from_result(results.item(idx)), compareColumn, descending);
|
|
|
+ if (!input->nextRow())
|
|
|
+ return false;
|
|
|
+ inputs.append(*input.getClear());
|
|
|
+
|
|
|
+ }
|
|
|
+ return next();
|
|
|
+ }
|
|
|
+ virtual bool next()
|
|
|
+ {
|
|
|
+ current.clear();
|
|
|
+ loop
|
|
|
+ {
|
|
|
+ unsigned idx = 0;
|
|
|
+ unsigned target = 0;
|
|
|
+ unsigned matches = 1; // I always match myself!
|
|
|
+ unsigned sources = inputs.length();
|
|
|
+ if (!sources)
|
|
|
+ return false;
|
|
|
+ while (matches < sources)
|
|
|
+ {
|
|
|
+ idx++;
|
|
|
+ if (idx==sources)
|
|
|
+ idx = 0;
|
|
|
+ int diff;
|
|
|
+ loop
|
|
|
+ {
|
|
|
+ assert(idx != target);
|
|
|
+ diff = inputs.item(idx).compare(&inputs.item(target));
|
|
|
+ if (diff >= 0)
|
|
|
+ break;
|
|
|
+ if (!inputs.item(idx).nextRow())
|
|
|
+ {
|
|
|
+ inputs.kill(); // Once any reaches EOF, we are done
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (diff > 0)
|
|
|
+ {
|
|
|
+ target = idx;
|
|
|
+ matches = 1;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ matches++;
|
|
|
+ }
|
|
|
+ Owned<IPTree> wuXML = rowToPTree(NULL, NULL, workunitInfoMappings+1, cass_iterator_get_row(inputs.item(0)));
|
|
|
+ bool postFiltered = false;
|
|
|
+ ForEachItemIn(pfIdx, postFilters)
|
|
|
+ {
|
|
|
+ if (!postFilters.item(pfIdx).matches(*wuXML))
|
|
|
+ {
|
|
|
+ postFiltered = true;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (!postFiltered)
|
|
|
+ {
|
|
|
+ current.setown(createConstWorkUnitInfo(*wuXML));
|
|
|
+ ForEachItemIn(idx2, inputs)
|
|
|
+ {
|
|
|
+ if (!inputs.item(idx2).nextRow())
|
|
|
+ {
|
|
|
+ inputs.clear(); // Make sure next() fails next time it is called
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ virtual bool isValid()
|
|
|
+ {
|
|
|
+ return current != NULL;
|
|
|
+ }
|
|
|
+ virtual IConstWorkUnitInfo & query()
|
|
|
+ {
|
|
|
+ assertex(current);
|
|
|
+ return *current.get();
|
|
|
+ }
|
|
|
+private:
|
|
|
+ IArrayOf<CassandraResult> results;
|
|
|
+ IArrayOf<CassSortableIterator> inputs;
|
|
|
+ CIArrayOf<PostFilter> postFilters;
|
|
|
Owned<IConstWorkUnitInfo> current;
|
|
|
unsigned compareColumn;
|
|
|
bool descending;
|
|
@@ -4052,7 +4379,10 @@ protected:
|
|
|
{
|
|
|
deleteSecondaryByKey(xpath, prevKey, wuid, sessionCache, *batch);
|
|
|
if (p->hasProp(xpath))
|
|
|
+ {
|
|
|
simpleXMLtoCassandra(sessionCache, *batch, searchMappings, p, xpath);
|
|
|
+ simpleXMLtoCassandra(sessionCache, *batch, uniqueSearchMappings, p, xpath);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
void deleteSecondaries(const char *wuid)
|
|
@@ -4112,6 +4442,16 @@ protected:
|
|
|
MapStringTo<const CassandraXmlMapping *> dirtyPaths;
|
|
|
};
|
|
|
|
|
|
+/*
|
|
|
+Streams from N sources...
|
|
|
+Loop through round and round seeing if all the same
|
|
|
+if we get N matches in a row, we have success
|
|
|
+if we are lower than the current seek value, keep on me until >=
|
|
|
+if we are higher, reset match count and use me as new target
|
|
|
+if equal, increase match count and go on to next
|
|
|
+if any reach eof, we are done.
|
|
|
+*/
|
|
|
+
|
|
|
class CCasssandraWorkUnitFactory : public CWorkUnitFactory, implements ICassandraSession
|
|
|
{
|
|
|
IMPLEMENT_IINTERFACE;
|
|
@@ -4225,14 +4565,149 @@ public:
|
|
|
}
|
|
|
virtual IConstWorkUnitIterator * getScheduledWorkUnits(ISecManager *secmgr, ISecUser *secuser)
|
|
|
{
|
|
|
- return getWorkUnitsByXXX("@state", getWorkunitStateStr(WUStateScheduled), secmgr, secuser);
|
|
|
+ return getWorkUnitsByXXX("@state", getWorkunitStateStr(WUStateScheduled), secmgr, secuser); // MORE - there may be more efficient ways to do this?
|
|
|
}
|
|
|
// Return workunits matching the supplied filters, in the requested order, positioned at startoffset.
//
//   sortorder   - field to sort by (low byte), optionally combined with WUSFreverse
//   filters     - zero-terminated list of filter fields (may be NULL for no filters)
//   filterbuf   - the filter values, as consecutive NUL-terminated strings, one per entry in filters
//   startoffset - number of leading results to skip
//   pageSize    - number of results wanted after startoffset
//   cachehint   - if supplied, set to 0
//   total       - if supplied, set to 0 (the total is not known)
//
// NOTE(review): secmgr and secuser are not used here, whereas getWorkUnitsByXXX wraps its result
// with createSecureConstWUIterator - confirm that the caller applies the security checks.
virtual IConstWorkUnitIterator * getWorkUnitsSorted(WUSortField sortorder, WUSortField * filters, const void * filterbuf,
                                                    unsigned startoffset, unsigned pageSize, __int64 * cachehint, unsigned *total,
                                                    ISecManager *secmgr, ISecUser *secuser)
{
    // Note that we only support a single sort order.
    // Any sort order other than WUID ASC or WUID DESC will require post-sorting - at most CASS_WORKUNIT_POSTSORT_LIMIT rows (or the requested page, if larger) are post-sorted
    // We need a single 'hard' filter, and do others by post-filtering. If the best we can do for a 'hard' filter is itself wild, we
    // have to do two hits - one to find all the matching values, and the second to do a merge of all the results for each value.
    // We should encourage the UI to present drop-lists of users for filtering.
    // Any post-filter that Cassandra CAN do (via ALLOW FILTERING) it should? This seems to be an empty set unless we add secondary indexes AND give up on sorted results...
    // Any that it can't (e.g. wild) we post-filter client-side.
    // Wild can be translated into ranges but we then end up losing the sorting (well, we end up sorted by the filter field first, then wuid). This may actually be desirable in some situations.
    // Alternatively we can transform into a set and merge multiple queries.
    // Searching by files probably needs to be done differently - a separate table mapping filenames to wuids - this can be join-merged if other filters are present.
    // Searching by application values are done as if each was a new (user-defined) attribute.

    // At present we assume that the order in which filters are provided indicates the best order to apply them - this may not be smart
    Owned<IConstWorkUnitIterator> result;
    CIArrayOf<PostFilter> goodFilters;
    CIArrayOf<PostFilter> wuidFilters;
    CIArrayOf<PostFilter> poorFilters;
    CIArrayOf<PostFilter> remoteWildFilters;
    Owned<CassMultiIterator> merger = new CassMultiIterator(0, sortorder!=WUSFwuid); // Merge by wuid. We always fetch ordered by wuid desc, except if the order requested is wuid asc.
    pageSize += startoffset; // Nasty!
    const char *fv = (const char *) filterbuf;
    // A NULL filter list is treated the same as an empty one
    for (const WUSortField *thisFilter = filters; thisFilter && *thisFilter; thisFilter++)
    {
        WUSortField field = (WUSortField) (*thisFilter & 0xff);
        bool isWild = (*thisFilter & WUSFwild) != 0;
        switch (field)
        {
        case WUSFuser:
        case WUSFcluster:
        case WUSFjob:
            if (isWild)
                remoteWildFilters.append(*new PostFilter(field, fv, true)); // Trailing-only wildcards can be done remotely
            else
                goodFilters.append(*new PostFilter(field, fv, false));
            break;
        case WUSFstate:
        case WUSFpriority:
        case WUSFprotected:
            // These can't be wild, but are not very good filters
            poorFilters.append(*new PostFilter(field, fv, false));
            break;
        case WUSFwuid: // Acts as wuidLo when specified as a filter
        case WUSFwuidhigh:
            // Wuid filters can be added to good and poor filters, and to remoteWild if they are done via merged sets rather than ranges...
            mergeFilter(wuidFilters, field, fv);
            break;
        case WUSFfileread:
            UNIMPLEMENTED;
        case WUSFtotalthortime:
            UNIMPLEMENTED; // Numeric sort makes it hard to do the filter remotely. And ESP does not presently use. Though it would be pretty handy...
            break;
        case WUSFwildwuid:
            // Translate into a range - note that we only support trailing * wildcard.
            if (fv && *fv)
            {
                StringBuffer s(fv);
                if (s.charAt(s.length()-1)== '*')
                    s.remove(s.length()-1, 1);
                if (s.length())
                {
                    mergeFilter(wuidFilters, WUSFwuid, s);
                    s.append('\x7e'); // '~' - higher than anything that should occur in a wuid (but still printable)
                    mergeFilter(wuidFilters, WUSFwuidhigh, s);
                }
            }
            break;
        case WUSFcustom:
            UNIMPLEMENTED;
        case WUSFecl: // This is different...
            if (isWild)
                merger->addPostFilter(*new PostFilter(field, fv, true)); // Wildcards on ECL are trailing and leading - no way to do remotely
            else
                goodFilters.append(*new PostFilter(field, fv, false)); // A hard filter on exact ecl match is possible but very unlikely
            break;
        }
        fv = fv + strlen(fv)+1; // Step to the value belonging to the next filter
    }

    // The first filter of the best available kind is applied by Cassandra; all others are post-filters.
    // A row limit can only be pushed down to Cassandra when there is nothing left to post-filter.
    if (goodFilters.length())
    {
        merger->addPostFilters(goodFilters, 1);
        merger->addPostFilters(poorFilters, 0);
        merger->addPostFilters(remoteWildFilters, 0);
        merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(goodFilters.item(0), wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize)));
    }
    else if (poorFilters.length())
    {
        merger->addPostFilters(poorFilters, 1);
        merger->addPostFilters(remoteWildFilters, 0);
        merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(poorFilters.item(0), wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize)));
    }
    else if (remoteWildFilters.length())
    {
        merger->addPostFilters(remoteWildFilters, 1); // Any other filters have to be done locally
        // Convert into a value IN [] which we do via a merge
        // MORE - If we want sorted by filter (or don't care about sort order), we could do directly as a range - but the wuid range filters then don't work, and the merger would be invalid
        StringArray fieldValues;
        PostFilter &best= remoteWildFilters.item(0);
        _getUniqueValues(best.queryXPath(), best.queryValue(), fieldValues);
        ForEachItemIn(idx, fieldValues)
        {
            // One exact-match query per value found. (This previously passed best.queryValue(), i.e. the
            // wildcard prefix itself, so the values just looked up were never used.)
            // NOTE(review): the values come back in original case but are upper-cased for the query, so
            // values differing only in case would be fetched twice - confirm whether that can occur.
            PostFilter p(best.queryField(), fieldValues.item(idx), false);
            merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(p, wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize)));
        }
    }
    else
    {
        // If all we have is a wuid range (or nothing), search the wuid table and/or return everything
        for (int i = 0; i < NUM_PARTITIONS; i++)
        {
            merger->addResult(*new CassandraResult(fetchDataByPartition(workunitInfoMappings, i, wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize)));
        }
    }
    result.setown(merger.getClear());

    // The result we have will be sorted by wuid (ascending or descending)
    if ((sortorder & 0xff) != WUSFwuid)
    {
        // A post-sort will be required.
        // Result should be limited in (to CASS_WORKUNIT_POSTSORT_LIMIT * number of results being merged)
        result.setown(new CassPostSortIterator(result.getClear(), sortorder, pageSize > CASS_WORKUNIT_POSTSORT_LIMIT ? pageSize : CASS_WORKUNIT_POSTSORT_LIMIT));
    }
    // Leave the iterator positioned on the first row of the requested page
    if (startoffset && result->first())
    {
        while (startoffset && result->next())
            startoffset--;
    }
    if (cachehint)
        *cachehint = 0;
    if (total)
        *total = 0; // we don't know... (unless we postsort)
    return result.getClear();
}
|
|
|
+ virtual StringArray &getUniqueValues(WUSortField field, const char *prefix, StringArray &result) const
|
|
|
+ {
|
|
|
+ return _getUniqueValues(queryFilterXPath(field), prefix, result);
|
|
|
}
|
|
|
virtual unsigned numWorkUnits()
|
|
|
{
|
|
@@ -4309,6 +4784,12 @@ public:
|
|
|
// 1. Check that every entry in main wu table has matching entries in secondary tables
|
|
|
CassandraResult result(fetchData(workunitInfoMappings+1));
|
|
|
CassandraIterator rows(cass_iterator_from_result(result));
|
|
|
+ if (batch)
|
|
|
+ {
|
|
|
+ // Delete the unique values table - the validate process recreates it afresh
|
|
|
+ CassandraStatement truncate(cass_statement_new("TRUNCATE uniqueSearchValues", 0));
|
|
|
+ check(cass_batch_add_statement(batch, truncate));
|
|
|
+ }
|
|
|
while (cass_iterator_next(rows))
|
|
|
{
|
|
|
Owned<IPTree> wuXML = rowToPTree(NULL, NULL, workunitInfoMappings+1, cass_iterator_get_row(rows));
|
|
@@ -4354,6 +4835,7 @@ public:
|
|
|
connect();
|
|
|
ensureTable(session, workunitsMappings);
|
|
|
ensureTable(session, searchMappings);
|
|
|
+ ensureTable(session, uniqueSearchMappings);
|
|
|
for (const ChildTableInfo * const * table = childTables; *table != NULL; table++)
|
|
|
ensureTable(session, table[0]->mappings);
|
|
|
}
|
|
@@ -4408,37 +4890,55 @@ private:
|
|
|
CassandraResult result(cass_future_get_result(future));
|
|
|
return getUnsignedResult(NULL, getSingleResult(result)) != 0; // Shouldn't be more than 1, either
|
|
|
}
|
|
|
-
|
|
|
- IConstWorkUnitIterator * getSortedWorkUnits(ISecManager *secmgr, ISecUser *secuser)
|
|
|
+ void mergeFilter(CIArrayOf<PostFilter> &filters, WUSortField field, const char *value)
|
|
|
{
|
|
|
- // Hack in some test code to test stream merging
|
|
|
- Owned<IPTree> parent = createPTree("WorkUnits");
|
|
|
- Owned<CassMultiIterator> merger = new CassMultiIterator(0, true); // Merge by wuid (note that we didn't fetch partition...)
|
|
|
- for (int i = 0; i < NUM_PARTITIONS; i++)
|
|
|
+ // Combine multiple filters on wuid - Cassandra doesn't like seeing more than one.
|
|
|
+ ForEachItemIn(idx, filters)
|
|
|
{
|
|
|
- merger->addResult(*new CassandraResult(fetchDataByPartition(workunitInfoMappings, i)));
|
|
|
+ PostFilter &filter = filters.item(idx);
|
|
|
+ if (filter.queryField()==field)
|
|
|
+ {
|
|
|
+ const char *prevLimit = filter.queryValue();
|
|
|
+ int diff = strcmp(prevLimit, value);
|
|
|
+ if (diff && ((diff < 0) == (field==WUSFwuid)))
|
|
|
+ filter.setValue(value);
|
|
|
+ return;
|
|
|
+ }
|
|
|
}
|
|
|
- return createSecureConstWUIterator(merger.getClear(), secmgr, secuser);
|
|
|
+ // Not found - add new filter
|
|
|
+ filters.append(*new PostFilter(field, value, true));
|
|
|
}
|
|
|
-
|
|
|
// Return the workunits whose value at 'xpath' equals 'key', merged by wuid and wrapped by
// createSecureConstWUIterator. With no key, every partition of the workunit table is fetched
// (with no wuid filters) and merged.
IConstWorkUnitIterator * getWorkUnitsByXXX(const char *xpath, const char *key, ISecManager *secmgr, ISecUser *secuser)
{
    Owned<CassMultiIterator> merger = new CassMultiIterator(0, true); // Merge by wuid
    if (key && *key)
        merger->addResult(*new CassandraResult(fetchDataForKey(xpath, key)));
    else
    {
        CIArrayOf<PostFilter> noWuidFilters;
        for (int partition = 0; partition < NUM_PARTITIONS; partition++)
            merger->addResult(*new CassandraResult(fetchDataByPartition(workunitInfoMappings, partition, noWuidFilters)));
    }
    return createSecureConstWUIterator(merger.getClear(), secmgr, secuser);
}
|
|
|
+ StringArray &_getUniqueValues(const char *xpath, const char *prefix, StringArray &result) const
|
|
|
+ {
|
|
|
+ if (prefix && strlen(prefix) >= CASS_SEARCH_PREFIX_SIZE)
|
|
|
+ {
|
|
|
+ CassandraResult r(fetchDataForWildSearch(xpath, prefix, uniqueSearchMappings));
|
|
|
+ CassandraIterator rows(cass_iterator_from_result(r));
|
|
|
+ StringBuffer value;
|
|
|
+ while (cass_iterator_next(rows))
|
|
|
+ {
|
|
|
+ const CassRow *row = cass_iterator_get_row(rows);
|
|
|
+ getCassString(value.clear(), cass_row_get_column(row, 0));
|
|
|
+ result.append(value);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return result;
|
|
|
}
|
|
|
-
|
|
|
unsigned validateSearch(const char *xpath, const char *wuid, IPTree *wuXML, CassBatch *batch)
|
|
|
{
|
|
|
unsigned errCount = 0;
|
|
@@ -4446,6 +4946,8 @@ private:
|
|
|
if (childKey && *childKey)
|
|
|
{
|
|
|
CassandraResult result(fetchDataForKeyAndWuid(xpath, childKey, wuid));
|
|
|
+ if (batch)
|
|
|
+ simpleXMLtoCassandra(this, batch, uniqueSearchMappings, wuXML, xpath);
|
|
|
switch (cass_result_row_count(result))
|
|
|
{
|
|
|
case 0:
|
|
@@ -4552,12 +5054,35 @@ private:
|
|
|
|
|
|
// Fetch all rows from a single partition of a table
|
|
|
|
|
|
- const CassResult *fetchDataByPartition(const CassandraXmlMapping *mappings, int partition) const
|
|
|
+ const CassResult *fetchDataByPartition(const CassandraXmlMapping *mappings, int partition, const CIArrayOf<PostFilter> &wuidFilters, unsigned sortOrder=WUSFwuid|WUSFreverse, unsigned limit=0) const
|
|
|
{
|
|
|
StringBuffer names;
|
|
|
StringBuffer tableName;
|
|
|
getFieldNames(mappings+1, names, tableName); // Don't fetch partition column
|
|
|
- VStringBuffer selectQuery("select %s from %s where partition=%d;", names.str()+1, tableName.str(), partition);
|
|
|
+ VStringBuffer selectQuery("select %s from %s where partition=%d", names.str()+1, tableName.str(), partition);
|
|
|
+ ForEachItemIn(idx, wuidFilters)
|
|
|
+ {
|
|
|
+ const PostFilter &wuidFilter = wuidFilters.item(idx);
|
|
|
+ selectQuery.appendf(" and wuid %s '%s'", wuidFilter.queryField()==WUSFwuidhigh ? "<=" : ">=", wuidFilter.queryValue());
|
|
|
+ }
|
|
|
+ switch (sortOrder)
|
|
|
+ {
|
|
|
+ case WUSFwuid:
|
|
|
+ selectQuery.append(" ORDER BY WUID ASC");
|
|
|
+ break;
|
|
|
+ case WUSFwuid|WUSFreverse:
|
|
|
+ // Wuid descending - the query returns this order directly
|
|
|
+ selectQuery.append(" ORDER BY WUID DESC");
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ // If not wuid, descending, we will have to post-sort. We still need in wuid desc order for the merge though.
|
|
|
+ selectQuery.append(" ORDER BY WUID DESC");
|
|
|
+ if (!limit)
|
|
|
+ limit = CASS_WORKUNIT_POSTSORT_LIMIT;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ if (limit)
|
|
|
+ selectQuery.appendf(" LIMIT %u", limit);
|
|
|
selectQuery.append(';');
|
|
|
if (traceLevel >= 2)
|
|
|
DBGLOG("%s", selectQuery.str());
|
|
@@ -4580,7 +5105,7 @@ private:
|
|
|
return executeQuery(session, statement);
|
|
|
}
|
|
|
|
|
|
- // Fetch matching rows from the search table, for all wuids
|
|
|
+ // Fetch matching rows from the search table, for all wuids, sorted by wuid
|
|
|
|
|
|
const CassResult *fetchDataForKey(const char *xpath, const char *key) const
|
|
|
{
|
|
@@ -4590,7 +5115,70 @@ private:
|
|
|
StringBuffer ucKey(key);
|
|
|
ucKey.toUpperCase();
|
|
|
getFieldNames(searchMappings+3, names, tableName); // mappings+3 means we don't return the key columns (xpath, upper(keyPrefix), upper(key))
|
|
|
- VStringBuffer selectQuery("select %s from %s where xpath='%s' and fieldPrefix='%.*s' and fieldValue ='%s';", names.str()+1, tableName.str(), xpath, CASS_SEARCH_PREFIX_SIZE, ucKey.str(), ucKey.str()); // MORE - should consider using prepared/bind for this - is it faster?
|
|
|
+ VStringBuffer selectQuery("select %s from %s where xpath='%s' and fieldPrefix='%.*s' and fieldValue ='%s'", names.str()+1, tableName.str(), xpath, CASS_SEARCH_PREFIX_SIZE, ucKey.str(), ucKey.str()); // MORE - should consider using prepared/bind for this - is it faster?
|
|
|
+ selectQuery.append(" ORDER BY fieldValue ASC, WUID desc;");
|
|
|
+ if (traceLevel >= 2)
|
|
|
+ DBGLOG("%s", selectQuery.str());
|
|
|
+ CassandraStatement statement(cass_statement_new(selectQuery.str(), 0));
|
|
|
+ return executeQuery(session, statement);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Fetch matching rows from the search table, for all wuids, sorted by wuid
|
|
|
+
|
|
|
+ const CassResult *fetchDataForKeyWithFilter(const PostFilter &filter, const CIArrayOf<PostFilter> &wuidFilters, unsigned sortOrder, unsigned limit) const
|
|
|
+ {
|
|
|
+ const char *xpath = filter.queryXPath();
|
|
|
+ const char *key = filter.queryValue();
|
|
|
+ assertex(key);
|
|
|
+ StringBuffer names;
|
|
|
+ StringBuffer tableName;
|
|
|
+ StringBuffer ucKey(key);
|
|
|
+ ucKey.toUpperCase();
|
|
|
+ getFieldNames(searchMappings+3, names, tableName); // mappings+3 means we don't return the key columns (xpath, upper(keyPrefix), upper(key))
|
|
|
+ VStringBuffer selectQuery("select %s from %s where xpath='%s' and fieldPrefix='%.*s' and fieldValue ='%s'", names.str()+1, tableName.str(), xpath, CASS_SEARCH_PREFIX_SIZE, ucKey.str(), ucKey.str());
|
|
|
+ ForEachItemIn(idx, wuidFilters)
|
|
|
+ {
|
|
|
+ const PostFilter &wuidFilter = wuidFilters.item(idx);
|
|
|
+ selectQuery.appendf(" and wuid %s '%s'", wuidFilter.queryField()==WUSFwuidhigh ? "<=" : ">=", wuidFilter.queryValue());
|
|
|
+ }
|
|
|
+ switch (sortOrder)
|
|
|
+ {
|
|
|
+ case WUSFwuid:
|
|
|
+ selectQuery.append(" ORDER BY fieldValue DESC, WUID ASC");
|
|
|
+ break;
|
|
|
+ case WUSFwuid|WUSFreverse:
|
|
|
+ selectQuery.append(" ORDER BY fieldValue ASC, WUID DESC");
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ // If not wuid, descending, we will have to post-sort. We still need in wuid desc order for the merge though.
|
|
|
+ selectQuery.appendf(" ORDER BY fieldvalue ASC, WUID DESC");
|
|
|
+ if (!limit)
|
|
|
+ limit = CASS_WORKUNIT_POSTSORT_LIMIT;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ if (limit)
|
|
|
+ selectQuery.appendf(" LIMIT %u", limit);
|
|
|
+ if (traceLevel >= 2)
|
|
|
+ DBGLOG("%s", selectQuery.str());
|
|
|
+ CassandraStatement statement(cass_statement_new(selectQuery.str(), 0));
|
|
|
+ return executeQuery(session, statement);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Fetch matching rows from the search or uniqueSearch table, for a given prefix
|
|
|
+
|
|
|
+ const CassResult *fetchDataForWildSearch(const char *xpath, const char *prefix, const CassandraXmlMapping *mappings) const
|
|
|
+ {
|
|
|
+ assertex(prefix && *prefix);
|
|
|
+ StringBuffer names;
|
|
|
+ StringBuffer tableName;
|
|
|
+ StringBuffer ucKey(prefix);
|
|
|
+ ucKey.toUpperCase();
|
|
|
+ StringBuffer ucKeyEnd(ucKey);
|
|
|
+ size32_t len = ucKeyEnd.length();
|
|
|
+ assertex(len);
|
|
|
+ ucKeyEnd.setCharAt(len-1, ucKeyEnd.charAt(len-1)+1);
|
|
|
+ getFieldNames(mappings+3, names, tableName); // mappings+3 means we don't return the key columns (xpath, upper(keyPrefix), upper(key))
|
|
|
+ VStringBuffer selectQuery("select %s from %s where xpath='%s' and fieldPrefix='%.*s' and fieldValue >='%s' and fieldValue < '%s';", names.str()+1, tableName.str(), xpath, CASS_SEARCH_PREFIX_SIZE, ucKey.str(), ucKey.str(), ucKeyEnd.str()); // MORE - should consider using prepared/bind for this - is it faster?
|
|
|
if (traceLevel >= 2)
|
|
|
DBGLOG("%s", selectQuery.str());
|
|
|
CassandraStatement statement(cass_statement_new(selectQuery.str(), 0));
|