|
@@ -595,11 +595,6 @@ public:
|
|
|
return graph.getClear();
|
|
|
}
|
|
|
|
|
|
- virtual IRoxieServerSideCache *queryServerSideCache() const
|
|
|
- {
|
|
|
- return NULL; // Activities that wish to support server-side caching will need to do better....
|
|
|
- }
|
|
|
-
|
|
|
virtual void getXrefInfo(IPropertyTree &reply, const IRoxieContextLogger &logctx) const
|
|
|
{
|
|
|
// Most activities have nothing to say...
|
|
@@ -1626,11 +1621,6 @@ public:
|
|
|
{
|
|
|
}
|
|
|
|
|
|
- virtual IRoxieServerSideCache *queryServerSideCache() const
|
|
|
- {
|
|
|
- return factory->queryServerSideCache();
|
|
|
- }
|
|
|
-
|
|
|
virtual const IRoxieServerActivityFactory *queryFactory() const
|
|
|
{
|
|
|
return factory;
|
|
@@ -3100,189 +3090,6 @@ public:
|
|
|
}
|
|
|
};
|
|
|
|
|
|
-class CRoxieServerSideCache : implements IRoxieServerSideCache, implements ILRUChain
|
|
|
-{
|
|
|
-protected:
|
|
|
- unsigned cacheTableSize;
|
|
|
- unsigned cacheTableSpace;
|
|
|
- IRoxieServerQueryPacket **cacheTable;
|
|
|
- mutable ILRUChain *prev;
|
|
|
- mutable ILRUChain *next;
|
|
|
- mutable CriticalSection crit;
|
|
|
-
|
|
|
- virtual ILRUChain *queryPrev() const { return prev; }
|
|
|
- virtual ILRUChain *queryNext() const { return next; }
|
|
|
- virtual void setPrev(ILRUChain *p) { prev = p; }
|
|
|
- virtual void setNext(ILRUChain *n) { next = n; }
|
|
|
- virtual void unchain()
|
|
|
- {
|
|
|
- prev->setNext(next);
|
|
|
- next->setPrev(prev);
|
|
|
- next = NULL;
|
|
|
- prev = NULL;
|
|
|
- }
|
|
|
-
|
|
|
- void moveToHead(IRoxieServerQueryPacket *mru)
|
|
|
- {
|
|
|
- mru->unchain();
|
|
|
-
|
|
|
- mru->setNext(next);
|
|
|
- next->setPrev(mru);
|
|
|
- mru->setPrev(this);
|
|
|
- next = mru;
|
|
|
- }
|
|
|
-
|
|
|
- IRoxieServerQueryPacket *removeLRU()
|
|
|
- {
|
|
|
- if (next==this)
|
|
|
- assertex(next != this);
|
|
|
- IRoxieServerQueryPacket *goer = (IRoxieServerQueryPacket *) next;
|
|
|
- goer->unchain(); // NOTE - this will modify the value of next
|
|
|
- return goer;
|
|
|
- }
|
|
|
-
|
|
|
- void removeEntry(IRoxieServerQueryPacket *goer)
|
|
|
- {
|
|
|
- unsigned v = goer->queryHash() % cacheTableSize;
|
|
|
- for (;;)
|
|
|
- {
|
|
|
- IRoxieServerQueryPacket *found = cacheTable[v];
|
|
|
- assertex(found);
|
|
|
- if (found == goer)
|
|
|
- {
|
|
|
- cacheTable[v] = NULL;
|
|
|
- unsigned vn = v;
|
|
|
- for (;;)
|
|
|
- {
|
|
|
- vn++;
|
|
|
- if (vn==cacheTableSize) vn = 0;
|
|
|
- IRoxieServerQueryPacket *found2 = cacheTable[vn];
|
|
|
- if (!found2)
|
|
|
- break;
|
|
|
- unsigned vm = found2->queryHash() % cacheTableSize;
|
|
|
- if (((vn+cacheTableSize-vm) % cacheTableSize)>=((vn+cacheTableSize-v) % cacheTableSize)) // diff(vn,vm)>=diff(vn,v)
|
|
|
- {
|
|
|
- cacheTable[v] = found2;
|
|
|
- v = vn;
|
|
|
- cacheTable[v] = NULL;
|
|
|
- }
|
|
|
- }
|
|
|
- cacheTableSpace++;
|
|
|
- break;
|
|
|
- }
|
|
|
- v++;
|
|
|
- if (v==cacheTableSize)
|
|
|
- v = 0;
|
|
|
- }
|
|
|
- goer->Release();
|
|
|
- }
|
|
|
-
|
|
|
-public:
|
|
|
- CRoxieServerSideCache(unsigned _cacheSize)
|
|
|
- {
|
|
|
- cacheTableSize = (_cacheSize*4)/3;
|
|
|
- cacheTable = new IRoxieServerQueryPacket *[cacheTableSize];
|
|
|
- memset(cacheTable, 0, cacheTableSize * sizeof(IRoxieServerQueryPacket *));
|
|
|
- cacheTableSpace = _cacheSize;
|
|
|
- prev = this;
|
|
|
- next = this;
|
|
|
- }
|
|
|
- ~CRoxieServerSideCache()
|
|
|
- {
|
|
|
- for (unsigned i = 0; i < cacheTableSize; i++)
|
|
|
- {
|
|
|
- ::Release(cacheTable[i]);
|
|
|
- }
|
|
|
- delete [] cacheTable;
|
|
|
- }
|
|
|
-
|
|
|
- virtual IRoxieServerQueryPacket *findCachedResult(const IRoxieContextLogger &logctx, IRoxieQueryPacket *p) const
|
|
|
- {
|
|
|
- unsigned hash = p->hash();
|
|
|
- unsigned et = hash % cacheTableSize;
|
|
|
- if (traceServerSideCache)
|
|
|
- {
|
|
|
- StringBuffer s;
|
|
|
- logctx.CTXLOG("CRoxieServerSideCache::findCachedResult hash %x slot %d %s", hash, et, p->queryHeader().toString(s).str());
|
|
|
- }
|
|
|
- CriticalBlock b(crit);
|
|
|
- for (;;)
|
|
|
- {
|
|
|
- IRoxieServerQueryPacket *found = cacheTable[et];
|
|
|
- if (!found)
|
|
|
- return NULL;
|
|
|
- if (found->queryHash() == hash && found->queryPacket()->cacheMatch(p))
|
|
|
- {
|
|
|
- const_cast<CRoxieServerSideCache *>(this)->moveToHead(found);
|
|
|
- if (traceServerSideCache)
|
|
|
- logctx.CTXLOG("CRoxieServerSideCache::findCachedResult cache hit");
|
|
|
- logctx.noteStatistic(StNumServerCacheHits, 1);
|
|
|
- return NULL;
|
|
|
- // Because IMessageResult cannot be replayed, this scheme is flawed. I'm leaving the code here just as a stats gatherer to see how useful it would have been....
|
|
|
- //IRoxieServerQueryPacket *ret = new CRoxieServerQueryPacket(p);
|
|
|
- //ret->setResult(found->getResult());
|
|
|
- //return ret;
|
|
|
- }
|
|
|
- et++;
|
|
|
- if (et == cacheTableSize)
|
|
|
- et = 0;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- virtual void noteCachedResult(IRoxieServerQueryPacket *out, IMessageResult *in)
|
|
|
- {
|
|
|
- if (true) //!in->getLength()) // MORE - separate caches for hits and nohits
|
|
|
- {
|
|
|
- unsigned hash = out->queryPacket()->hash();
|
|
|
- out->setHash(hash);
|
|
|
- unsigned et = hash % cacheTableSize;
|
|
|
- if (traceServerSideCache)
|
|
|
- {
|
|
|
- StringBuffer s;
|
|
|
- DBGLOG("CRoxieServerSideCache::noteCachedResult hash %x slot %d %s", hash, et, out->queryPacket()->queryHeader().toString(s).str());
|
|
|
- }
|
|
|
- CriticalBlock b(crit);
|
|
|
- for (;;)
|
|
|
- {
|
|
|
- IRoxieServerQueryPacket *found = cacheTable[et];
|
|
|
- if (!found)
|
|
|
- {
|
|
|
- if (cacheTableSpace)
|
|
|
- {
|
|
|
- out->setResult(LINK(in));
|
|
|
- cacheTable[et] = LINK(out);
|
|
|
- cacheTableSpace--;
|
|
|
- moveToHead(out);
|
|
|
- break;
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- IRoxieServerQueryPacket *goer = removeLRU();
|
|
|
- removeEntry(goer);
|
|
|
- et = hash % cacheTableSize;
|
|
|
- continue;
|
|
|
- }
|
|
|
- }
|
|
|
- else if (found->queryHash()==hash && found->queryPacket()->cacheMatch(out->queryPacket()))
|
|
|
- {
|
|
|
- moveToHead(found);
|
|
|
- return; // already in the cache. Because we don't cache until we have result, this can happen where
|
|
|
- // multiple copies of a agent query are in-flight at once.
|
|
|
- }
|
|
|
- et++;
|
|
|
- if (et == cacheTableSize)
|
|
|
- et = 0;
|
|
|
- }
|
|
|
- }
|
|
|
- // MORE - do we need to worry about the attachment between the MessageUnpacker and the current row manager. May all fall out ok...
|
|
|
- // Can I easily spot a null result? Do I want to cache null results separately? only?
|
|
|
- }
|
|
|
-
|
|
|
- // Note that this caching mechanism (unlike the old keyed-join specific one) does not common up cases where multiple
|
|
|
- // identical queries are in-flight at the same time. But if we can make it persistent between queries that will
|
|
|
- // more than make up for it
|
|
|
-};
|
|
|
-
|
|
|
class CRowArrayMessageUnpackCursor : implements IMessageUnpackCursor, public CInterface
|
|
|
{
|
|
|
ConstPointerArray &data;
|
|
@@ -3812,19 +3619,8 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie
|
|
|
|
|
|
};
|
|
|
|
|
|
- IRoxieServerQueryPacket *createRoxieServerQueryPacket(IRoxieQueryPacket *p, bool &cached)
|
|
|
+ IRoxieServerQueryPacket *createRoxieServerQueryPacket(IRoxieQueryPacket *p)
|
|
|
{
|
|
|
- if (serverSideCache && !debugContext)
|
|
|
- {
|
|
|
- IRoxieServerQueryPacket *ret = serverSideCache->findCachedResult(activity.queryLogCtx(), p);
|
|
|
- if (ret)
|
|
|
- {
|
|
|
- p->Release();
|
|
|
- cached = true;
|
|
|
- return ret;
|
|
|
- }
|
|
|
- }
|
|
|
- cached = false;
|
|
|
return new CRoxieServerQueryPacket(p);
|
|
|
}
|
|
|
|
|
@@ -4082,7 +3878,6 @@ private:
|
|
|
IHThorArg *colocalArg;
|
|
|
IArrayOf<IRoxieServerQueryPacket> pending;
|
|
|
CriticalSection pendingCrit;
|
|
|
- IRoxieServerSideCache *serverSideCache;
|
|
|
unsigned sentSequence;
|
|
|
Owned<IOutputRowDeserializer> deserializer;
|
|
|
Owned<IEngineRowAllocator> rowAllocator;
|
|
@@ -4245,7 +4040,6 @@ public:
|
|
|
sentSequence = 0;
|
|
|
resendSequence = 0;
|
|
|
totalCycles = 0;
|
|
|
- serverSideCache = activity.queryServerSideCache();
|
|
|
bufferStream.setown(createMemoryBufferSerialStream(tempRowBuffer));
|
|
|
rowSource.setStream(bufferStream);
|
|
|
timeActivities = defaultTimeActivities;
|
|
@@ -4312,8 +4106,7 @@ public:
|
|
|
|
|
|
if (p->queryHeader().channel)
|
|
|
{
|
|
|
- bool cached = false;
|
|
|
- IRoxieServerQueryPacket *rsqp = createRoxieServerQueryPacket(p, cached);
|
|
|
+ IRoxieServerQueryPacket *rsqp = createRoxieServerQueryPacket(p);
|
|
|
if (deferredStart)
|
|
|
rsqp->setDelayed(true);
|
|
|
rsqp->setSequence(sentSequence++);
|
|
@@ -4323,8 +4116,7 @@ public:
|
|
|
}
|
|
|
if (!deferredStart)
|
|
|
{
|
|
|
- if (!cached)
|
|
|
- ROQ->sendPacket(p, activity.queryLogCtx());
|
|
|
+ ROQ->sendPacket(p, activity.queryLogCtx());
|
|
|
sentsome.signal();
|
|
|
}
|
|
|
}
|
|
@@ -4333,14 +4125,10 @@ public:
|
|
|
// Care is needed here. If I send the packet before I add to the pending there is a danger that I'll get results that I discard
|
|
|
// Need to add first, then send
|
|
|
unsigned i;
|
|
|
- bool allCached = true;
|
|
|
for (i = 1; i <= numChannels; i++)
|
|
|
{
|
|
|
IRoxieQueryPacket *q = p->clonePacket(i);
|
|
|
- bool thisChannelCached;
|
|
|
- IRoxieServerQueryPacket *rsqp = createRoxieServerQueryPacket(q, thisChannelCached);
|
|
|
- if (!thisChannelCached)
|
|
|
- allCached = false;
|
|
|
+ IRoxieServerQueryPacket *rsqp = createRoxieServerQueryPacket(q);
|
|
|
rsqp->setSequence(sentSequence++);
|
|
|
if (deferredStart)
|
|
|
{
|
|
@@ -4353,7 +4141,7 @@ public:
|
|
|
if (!deferredStart)
|
|
|
sentsome.signal();
|
|
|
}
|
|
|
- if (!allCached && !deferredStart)
|
|
|
+ if (!deferredStart)
|
|
|
ROQ->sendPacket(p, activity.queryLogCtx());
|
|
|
buffers[0]->signal(); // since replies won't come back on that channel...
|
|
|
p->Release();
|
|
@@ -5045,12 +4833,6 @@ public:
|
|
|
ROQ->sendPacket(resend, activity.queryLogCtx());
|
|
|
sentsome.signal();
|
|
|
}
|
|
|
- // Note that we don't attempt to cache results that have continuation records - too tricky !
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- if (serverSideCache)
|
|
|
- serverSideCache->noteCachedResult(original, mr);
|
|
|
}
|
|
|
unsigned channel = header.channel;
|
|
|
{
|
|
@@ -23100,8 +22882,6 @@ public:
|
|
|
unsigned maxSeekLookahead;
|
|
|
Owned<const IResolvedFile> indexfile;
|
|
|
|
|
|
- CRoxieServerSideCache *cache;
|
|
|
-
|
|
|
CRoxieServerBaseIndexActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, const RemoteActivityId &_remoteId)
|
|
|
: CRoxieServerActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind, _graphNode), remoteId(_remoteId)
|
|
|
{
|
|
@@ -23134,16 +22914,9 @@ public:
|
|
|
if (thisBase->numParts()==1 && !thisBase->queryPart(0)->isTopLevelKey() && !_queryFactory.queryOptions().disableLocalOptimizations)
|
|
|
isSimple = true;
|
|
|
}
|
|
|
- int cacheSize = _graphNode.getPropInt("hint[@name='cachehits']/@value", serverSideCacheSize);
|
|
|
- cache = cacheSize ? new CRoxieServerSideCache(cacheSize) : NULL;
|
|
|
maxSeekLookahead = _graphNode.getPropInt("hint[@name='maxseeklookahead']/@value", 0);
|
|
|
}
|
|
|
|
|
|
- ~CRoxieServerBaseIndexActivityFactory()
|
|
|
- {
|
|
|
- delete cache;
|
|
|
- }
|
|
|
-
|
|
|
virtual void getXrefInfo(IPropertyTree &reply, const IRoxieContextLogger &logctx) const
|
|
|
{
|
|
|
if (indexfile)
|
|
@@ -23166,11 +22939,6 @@ public:
|
|
|
throw MakeStringException(ROXIE_SET_INPUT, "Internal error: setInput() should not be called for indexread activity");
|
|
|
}
|
|
|
|
|
|
- virtual IRoxieServerSideCache *queryServerSideCache() const
|
|
|
- {
|
|
|
- return cache;
|
|
|
- }
|
|
|
-
|
|
|
virtual const StatisticsMapping &queryStatsMapping() const
|
|
|
{
|
|
|
return indexStatistics;
|