|
@@ -77,6 +77,7 @@ protected:
|
|
|
bool rowLimitSkips = false;
|
|
|
rowcount_t keyedProcessed = 0;
|
|
|
rowcount_t rowLimit = RCMAX;
|
|
|
+ bool useRemoteStreaming = false;
|
|
|
|
|
|
|
|
|
class TransformCallback : implements IThorIndexCallback , public CSimpleInterface
|
|
@@ -163,117 +164,118 @@ public:
|
|
|
while (p<partDescs.ordinality()) // will process all parts if localMerge
|
|
|
{
|
|
|
IPartDescriptor &part = partDescs.item(p++);
|
|
|
-
|
|
|
- Owned<ITranslator> translator = getTranslators(part);
|
|
|
- IOutputMetaData *actualFormat = translator ? &translator->queryActualFormat() : expectedFormat;
|
|
|
- bool tryRemoteStream = actualFormat->queryTypeInfo()->canInterpret() && actualFormat->queryTypeInfo()->canSerialize() &&
|
|
|
- projectedFormat->queryTypeInfo()->canInterpret() && projectedFormat->queryTypeInfo()->canSerialize();
|
|
|
- bool usesBlobs = 0 != (helper->getFlags() & TIRusesblob);
|
|
|
-
|
|
|
unsigned crc=0;
|
|
|
part.getCrc(crc);
|
|
|
|
|
|
- /* If part can potentially be remotely streamed, 1st check if any part is local,
|
|
|
- * then try to remote stream, and otherwise failover to legacy remote access
|
|
|
- */
|
|
|
- if (tryRemoteStream && !usesBlobs && !localMerge)
|
|
|
+ if (useRemoteStreaming)
|
|
|
{
|
|
|
- std::vector<unsigned> remoteCandidates;
|
|
|
- for (unsigned copy=0; copy<part.numCopies(); copy++)
|
|
|
+ Owned<ITranslator> translator = getTranslators(part);
|
|
|
+ IOutputMetaData *actualFormat = translator ? &translator->queryActualFormat() : expectedFormat;
|
|
|
+ bool tryRemoteStream = actualFormat->queryTypeInfo()->canInterpret() && actualFormat->queryTypeInfo()->canSerialize() &&
|
|
|
+ projectedFormat->queryTypeInfo()->canInterpret() && projectedFormat->queryTypeInfo()->canSerialize();
|
|
|
+
|
|
|
+ /* If part can potentially be remotely streamed, 1st check if any part is local,
|
|
|
+ * then try to remote stream, and otherwise failover to legacy remote access
|
|
|
+ */
|
|
|
+ if (tryRemoteStream)
|
|
|
{
|
|
|
- RemoteFilename rfn;
|
|
|
- part.getFilename(copy, rfn);
|
|
|
- if (!isRemoteReadCandidate(*this, rfn))
|
|
|
+ std::vector<unsigned> remoteCandidates;
|
|
|
+ for (unsigned copy=0; copy<part.numCopies(); copy++)
|
|
|
{
|
|
|
- StringBuffer path;
|
|
|
- rfn.getPath(path);
|
|
|
- Owned<IFile> iFile = createIFile(path);
|
|
|
- try
|
|
|
+ RemoteFilename rfn;
|
|
|
+ part.getFilename(copy, rfn);
|
|
|
+ if (!isRemoteReadCandidate(*this, rfn))
|
|
|
{
|
|
|
- if (iFile->exists())
|
|
|
+ StringBuffer path;
|
|
|
+ rfn.getPath(path);
|
|
|
+ Owned<IFile> iFile = createIFile(path);
|
|
|
+ try
|
|
|
{
|
|
|
- remoteCandidates.clear();
|
|
|
- break;
|
|
|
+ if (iFile->exists())
|
|
|
+ {
|
|
|
+ remoteCandidates.clear();
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (IException *e)
|
|
|
+ {
|
|
|
+ ActPrintLog(e, "getNextInput()");
|
|
|
+ e->Release();
|
|
|
}
|
|
|
}
|
|
|
- catch (IException *e)
|
|
|
- {
|
|
|
- ActPrintLog(e, "getNextInput()");
|
|
|
- e->Release();
|
|
|
- }
|
|
|
+ else
|
|
|
+ remoteCandidates.push_back(copy);
|
|
|
}
|
|
|
- else
|
|
|
- remoteCandidates.push_back(copy);
|
|
|
- }
|
|
|
- Owned<IException> remoteReadException;
|
|
|
- StringBuffer remoteReadExceptionPath;
|
|
|
- for (unsigned &copy : remoteCandidates) // only if no local part found above
|
|
|
- {
|
|
|
- RemoteFilename rfn;
|
|
|
- part.getFilename(copy, rfn);
|
|
|
- StringBuffer path;
|
|
|
- rfn.getPath(path);
|
|
|
+ Owned<IException> remoteReadException;
|
|
|
+ StringBuffer remoteReadExceptionPath;
|
|
|
+ for (unsigned &copy : remoteCandidates) // only if no local part found above
|
|
|
+ {
|
|
|
+ RemoteFilename rfn;
|
|
|
+ part.getFilename(copy, rfn);
|
|
|
+ StringBuffer path;
|
|
|
+ rfn.getPath(path);
|
|
|
|
|
|
- // Open a stream from remote file, having passed actual, expected, projected, and filters to it
|
|
|
- SocketEndpoint ep(rfn.queryEndpoint());
|
|
|
- setDafsEndpointPort(ep);
|
|
|
+ // Open a stream from remote file, having passed actual, expected, projected, and filters to it
|
|
|
+ SocketEndpoint ep(rfn.queryEndpoint());
|
|
|
+ setDafsEndpointPort(ep);
|
|
|
|
|
|
- IConstArrayOf<IFieldFilter> fieldFilters; // These refer to the expected layout
|
|
|
- struct CIndexReadContext : implements IIndexReadContext
|
|
|
- {
|
|
|
- IConstArrayOf<IFieldFilter> &fieldFilters;
|
|
|
- CIndexReadContext(IConstArrayOf<IFieldFilter> &_fieldFilters) : fieldFilters(_fieldFilters)
|
|
|
- {
|
|
|
- }
|
|
|
- virtual void append(IKeySegmentMonitor *segment) override { throwUnexpected(); }
|
|
|
- virtual void append(FFoption option, const IFieldFilter * filter) override
|
|
|
- {
|
|
|
- fieldFilters.append(*filter);
|
|
|
- }
|
|
|
- } context(fieldFilters);
|
|
|
- helper->createSegmentMonitors(&context);
|
|
|
-
|
|
|
- RowFilter actualFilter;
|
|
|
- Owned<const IKeyTranslator> keyedTranslator = createKeyTranslator(actualFormat->queryRecordAccessor(true), expectedFormat->queryRecordAccessor(true));
|
|
|
- if (keyedTranslator && keyedTranslator->needsTranslate())
|
|
|
- keyedTranslator->translate(actualFilter, fieldFilters);
|
|
|
- else
|
|
|
- actualFilter.appendFilters(fieldFilters);
|
|
|
-
|
|
|
- StringBuffer lPath;
|
|
|
- rfn.getLocalPath(lPath);
|
|
|
- Owned<IIndexLookup> indexLookup = createRemoteFilteredKey(ep, lPath, crc, actualFormat, projectedFormat, actualFilter, remoteLimit);
|
|
|
- if (indexLookup)
|
|
|
- {
|
|
|
- try
|
|
|
+ IConstArrayOf<IFieldFilter> fieldFilters; // These refer to the expected layout
|
|
|
+ struct CIndexReadContext : implements IIndexReadContext
|
|
|
{
|
|
|
- indexLookup->ensureAvailable();
|
|
|
- }
|
|
|
- catch (IException *e)
|
|
|
+ IConstArrayOf<IFieldFilter> &fieldFilters;
|
|
|
+ CIndexReadContext(IConstArrayOf<IFieldFilter> &_fieldFilters) : fieldFilters(_fieldFilters)
|
|
|
+ {
|
|
|
+ }
|
|
|
+ virtual void append(IKeySegmentMonitor *segment) override { throwUnexpected(); }
|
|
|
+ virtual void append(FFoption option, const IFieldFilter * filter) override
|
|
|
+ {
|
|
|
+ fieldFilters.append(*filter);
|
|
|
+ }
|
|
|
+ } context(fieldFilters);
|
|
|
+ helper->createSegmentMonitors(&context);
|
|
|
+
|
|
|
+ RowFilter actualFilter;
|
|
|
+ Owned<const IKeyTranslator> keyedTranslator = createKeyTranslator(actualFormat->queryRecordAccessor(true), expectedFormat->queryRecordAccessor(true));
|
|
|
+ if (keyedTranslator && keyedTranslator->needsTranslate())
|
|
|
+ keyedTranslator->translate(actualFilter, fieldFilters);
|
|
|
+ else
|
|
|
+ actualFilter.appendFilters(fieldFilters);
|
|
|
+
|
|
|
+ StringBuffer lPath;
|
|
|
+ rfn.getLocalPath(lPath);
|
|
|
+ Owned<IIndexLookup> indexLookup = createRemoteFilteredKey(ep, lPath, crc, actualFormat, projectedFormat, actualFilter, remoteLimit);
|
|
|
+ if (indexLookup)
|
|
|
{
|
|
|
-#ifdef _DEBUG
|
|
|
- EXCLOG(e, nullptr);
|
|
|
-#endif
|
|
|
- if (remoteReadException)
|
|
|
- e->Release(); // only record 1st
|
|
|
- else
|
|
|
+ try
|
|
|
+ {
|
|
|
+ indexLookup->ensureAvailable();
|
|
|
+ }
|
|
|
+ catch (IException *e)
|
|
|
{
|
|
|
- remoteReadException.setown(e);
|
|
|
- remoteReadExceptionPath.set(path);
|
|
|
+ #ifdef _DEBUG
|
|
|
+ EXCLOG(e, nullptr);
|
|
|
+ #endif
|
|
|
+ if (remoteReadException)
|
|
|
+ e->Release(); // only record 1st
|
|
|
+ else
|
|
|
+ {
|
|
|
+ remoteReadException.setown(e);
|
|
|
+ remoteReadExceptionPath.set(path);
|
|
|
+ }
|
|
|
+ continue; // try next copy and ultimately failover to local when no more copies
|
|
|
}
|
|
|
- continue; // try next copy and ultimately failover to local when no more copies
|
|
|
+ ActPrintLog("[part=%d]: reading remote dafilesrv index '%s' (logical file = %s)", partNum, path.str(), logicalFilename.get());
|
|
|
+ partNum = p;
|
|
|
+ return indexLookup.getClear();
|
|
|
}
|
|
|
- ActPrintLog("[part=%d]: reading remote dafilesrv index '%s' (logical file = %s)", partNum, path.str(), logicalFilename.get());
|
|
|
- partNum = p;
|
|
|
- return indexLookup.getClear();
|
|
|
}
|
|
|
- }
|
|
|
- if (remoteReadException)
|
|
|
- {
|
|
|
- VStringBuffer msg("Remote streaming failure, failing over to direct read for: '%s'. ", remoteReadExceptionPath.str());
|
|
|
- remoteReadException->errorMessage(msg);
|
|
|
- Owned<IThorException> e2 = MakeActivityWarning(this, TE_RemoteReadFailure, "%s", msg.str());
|
|
|
- fireException(e2);
|
|
|
+ if (remoteReadException)
|
|
|
+ {
|
|
|
+ VStringBuffer msg("Remote streaming failure, failing over to direct read for: '%s'. ", remoteReadExceptionPath.str());
|
|
|
+ remoteReadException->errorMessage(msg);
|
|
|
+ Owned<IThorException> e2 = MakeActivityWarning(this, TE_RemoteReadFailure, "%s", msg.str());
|
|
|
+ fireException(e2);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -558,6 +560,88 @@ public:
|
|
|
statsArr = _statsArr.getArray();
|
|
|
lastSeeks = lastScans = 0;
|
|
|
localMerge = (localKey && partDescs.ordinality()>1) || seekGEOffset;
|
|
|
+
|
|
|
+ if (parts)
|
|
|
+ {
|
|
|
+ IPartDescriptor &part0 = partDescs.item(0);
|
|
|
+ IFileDescriptor &fileDesc = part0.queryOwner();
|
|
|
+
|
|
|
+ if ((0 == (helper->getFlags() & TIRusesblob)) && !localMerge)
|
|
|
+ {
|
|
|
+ if (!inChildQuery())
|
|
|
+ useRemoteStreaming = true;
|
|
|
+ else
|
|
|
+ {
|
|
|
+ /*
|
|
|
+ * If in a CQ, it is counterproductive to use an index read stream per CQ execution if the index
|
|
|
+ * involved is relatively small.
|
|
|
+ * Because, if the index is small and its key node pages are read directly (and cached), it is likely
|
|
|
+ * that repeated executions will not read any (or few) new key pages (i.e. cache hit).
|
|
|
+ *
|
|
|
+ * Example: small 1-way key being remotely read by the whole cluster.
|
|
|
+ * If it is small it will fit (or mostly fit) in node key cache, and thus mostly read from memory vs over the network etc.
|
|
|
+ *
|
|
|
+ */
|
|
|
+
|
|
|
+ // # data parts excluding TLK if present
|
|
|
+ unsigned totalNumDataParts = fileDesc.numParts();
|
|
|
+ if ((totalNumDataParts>1) && !fileDesc.queryProperties().getPropBool("@local"))
|
|
|
+ totalNumDataParts--; // TLK
|
|
|
+
|
|
|
+ offset_t logicalFileSize = fileDesc.queryProperties().getPropInt64("@size"); // NB: size is compressed size
|
|
|
+ if (!logicalFileSize) // not sure when/if this should ever be missing, but..
|
|
|
+ {
|
|
|
+ IWARNLOG("Missing @size in meta data for index file '%s'", logicalFilename.get());
|
|
|
+ // estimate size based on physical size of 1st part
|
|
|
+ RemoteFilename rfn;
|
|
|
+ part0.getFilename(0, rfn);
|
|
|
+ StringBuffer path;
|
|
|
+ rfn.getPath(path);
|
|
|
+ Owned<IFile> iFile = createIFile(path);
|
|
|
+ offset_t partSize = iFile->size();
|
|
|
+ logicalFileSize = partSize * totalNumDataParts;
|
|
|
+ }
|
|
|
+
|
|
|
+ memsize_t keyCacheSize = queryJob().getKeyNodeCacheSize() + queryJob().getKeyLeafCacheSize();
|
|
|
+ memsize_t minRemoteCQIndexSizeMb = getOptInt64(THOROPT_MIN_REMOTE_CQ_INDEX_SIZE_MB);
|
|
|
+ if (minRemoteCQIndexSizeMb)
|
|
|
+ {
|
|
|
+ // anything larger is streamed, anything smaller is read directly
|
|
|
+ if (logicalFileSize > (minRemoteCQIndexSizeMb * 0x100000))
|
|
|
+ useRemoteStreaming = true;
|
|
|
+ }
|
|
|
+ else // no min. size to stream set, so use a heuristic
|
|
|
+ {
|
|
|
+ /*
|
|
|
+ * Rough heuristic.
|
|
|
+ *
|
|
|
+ * If (([average compressed part size] * [# parts handling] * [compressionMultiple] * [cacheSizeFitPercentage%]) > [keyCacheSize])
|
|
|
+ * then useRemoteStreaming = true
|
|
|
+ *
|
|
|
+ * i.e. if the [cacheSizeFitPercentage] % of total size of the compressed index data this slave is handling multiplied
|
|
|
+ * by a rough compression multiplier [compressionMultiple] is larger than the cache size [keyCacheSize], then use streaming.
|
|
|
+ * If not (useRemoteStreaming=false), direct read (and use the cache).
|
|
|
+ *
|
|
|
+ * The cacheSizeFitPercentage (25%) is used, so that the index has to be significantly bigger than the cache to use streaming,
|
|
|
+ because it is still worth directly reading on relatively small indexes, even if 1:4 cache hits are achieved.
|
|
|
+ *
|
|
|
+ */
|
|
|
+
|
|
|
+ static const unsigned compressionMultiple = 10; // v. rough approx. of compression ratio (actual compression ratio/uncompressed size unknown)
|
|
|
+ static const unsigned cacheSizeFitPercentage = 25; // if this much (%) of amount I'm handling fits into cache
|
|
|
+
|
|
|
+ offset_t avgPartSize = logicalFileSize / totalNumDataParts;
|
|
|
+
|
|
|
+ // NB: The # parts this slave is dealing with (partDescs.ordinality()) is equal to all data parts (totalNumDataParts) when in a CQ.
|
|
|
+ offset_t myIndexPartSizeTotal = avgPartSize * partDescs.ordinality() * compressionMultiple;
|
|
|
+
|
|
|
+ offset_t myIndexPartSizeHitShare = myIndexPartSizeTotal * cacheSizeFitPercentage / 100;
|
|
|
+ if (myIndexPartSizeHitShare >= keyCacheSize) // e.g. if 25% of my handled index data is larger than cache
|
|
|
+ useRemoteStreaming = true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
// IThorDataLink
|
|
|
virtual void start() override
|