Explorar el Código

HPCC-19566 Add virtual field support to remote reads

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith hace 7 años
padre
commit
283f22063d

+ 61 - 21
common/remote/sockfile.cpp

@@ -41,6 +41,7 @@
 
 #include "remoteerr.hpp"
 #include <atomic>
+#include <unordered_map>
 
 #include "rtldynfield.hpp"
 #include "rtlds_imp.hpp"
@@ -1783,7 +1784,7 @@ void CEndpointCS::beforeDispose()
     table.removeExact(this);
 }
 
-class CRemoteFilteredFileIOBase : public CRemoteBase, implements IFileIO
+class CRemoteFilteredFileIOBase : public CRemoteBase, implements IRemoteFileIO
 {
 public:
     IMPLEMENT_IINTERFACE;
@@ -1794,7 +1795,7 @@ public:
     {
         // NB: inputGrouped == outputGrouped for now, but may want output to be ungrouped
 
-        openRequest(request);
+        openRequest();
         request.appendf("\"format\" : \"binary\",\n"
             "\"node\" : {\n"
             " \"fileName\" : \"%s\"", filename);
@@ -1883,17 +1884,49 @@ public:
          */
         return 0;
     }
+// IRemoteFileIO
+    virtual void addVirtualFieldMapping(const char *fieldName, const char *fieldValue) override
+    {
+        virtualFields[fieldName] = fieldValue;
+    }
+    virtual void ensureAvailable() override
+    {
+        if (firstRequest)
+            handleFirstRequest();
+    }
 protected:
-    StringBuffer &openRequest(StringBuffer &request)
+    StringBuffer &openRequest()
     {
         return request.append("{\n");
     }
-    StringBuffer &closeRequest(StringBuffer &request)
+    StringBuffer &closeRequest()
     {
         return request.append("\n }\n");
     }
+    void addVirtualFields()
+    {
+        request.append(", \n \"virtualFields\" : {\n");
+        bool first=true;
+        for (auto &e : virtualFields)
+        {
+            if (!first)
+                request.append(",\n");
+            request.appendf("  \"%s\" : \"%s\"", e.first.c_str(), e.second.c_str());
+            first = false;
+        }
+        request.append(" }");
+    }
+    void handleFirstRequest()
+    {
+        firstRequest = false;
+        addVirtualFields();
+        closeRequest();
+        sendRequest(0, nullptr);
+    }
     void refill()
     {
+        if (firstRequest)
+            handleFirstRequest();
         size32_t cursorLength;
         reply.read(cursorLength);
         if (!cursorLength)
@@ -1950,6 +1983,9 @@ protected:
     size32_t bufRemaining = 0;
     unsigned bufPos = 0;
     bool eof = false;
+
+    bool firstRequest = true;
+    std::unordered_map<std::string, std::string> virtualFields;
 };
 
 class CRemoteFilteredFileIO : public CRemoteFilteredFileIOBase
@@ -1965,8 +2001,6 @@ public:
             " \"compressed\" : \"%s\",\n"
             " \"inputGrouped\" : \"%s\",\n"
             " \"outputGrouped\" : \"%s\"", boolToStr(compressed), boolToStr(grouped), boolToStr(grouped));
-        closeRequest(request);
-        sendRequest(0, nullptr);
     }
 };
 
@@ -1997,7 +2031,7 @@ protected:
     const RtlRecord &recInfo;
 };
 
-extern IFileIO *createRemoteFilteredFile(SocketEndpoint &ep, const char * filename, IOutputMetaData *actual, IOutputMetaData *projected, const RowFilter &fieldFilters, bool compressed, bool grouped, unsigned __int64 chooseN)
+extern IRemoteFileIO *createRemoteFilteredFile(SocketEndpoint &ep, const char * filename, IOutputMetaData *actual, IOutputMetaData *projected, const RowFilter &fieldFilters, bool compressed, bool grouped, unsigned __int64 chooseN)
 {
     try
     {
@@ -2020,8 +2054,6 @@ public:
         : CRemoteFilteredFileIOBase(ep, filename, actual, projected, fieldFilters, chooseN)
     {
         request.appendf(",\n \"kind\" : \"indexread\"");
-        closeRequest(request);
-        sendRequest(0, nullptr);
     }
 };
 
@@ -2034,14 +2066,12 @@ public:
         : CRemoteFilteredFileIOBase(ep, filename, actual, actual, fieldFilters, rowLimit)
     {
         request.appendf(",\n \"kind\" : \"indexcount\"");
-        closeRequest(request);
-        sendRequest(0, nullptr);
     }
 };
 
 class CRemoteKey : public CSimpleInterfaceOf<IIndexLookup>
 {
-    Owned<IFileIO> fileIO;
+    Owned<IRemoteFileIO> iRemoteFileIO;
     offset_t pos = 0;
     Owned<ISourceRowPrefetcher> prefetcher;
     CThorContiguousRowBuffer prefetchBuffer;
@@ -2058,15 +2088,19 @@ public:
     {
         for (unsigned f=0; f<_fieldFilters.numFilterFields(); f++)
             fieldFilters.addFilter(OLINK(_fieldFilters.queryFilter(f)));
-        fileIO.setown(new CRemoteFilteredKeyIO(ep, filename, actual, projected, fieldFilters, rowLimit));
-        if (!fileIO)
+        iRemoteFileIO.setown(new CRemoteFilteredKeyIO(ep, filename, actual, projected, fieldFilters, rowLimit));
+        if (!iRemoteFileIO)
             throw MakeStringException(0, "Unable to open remote key part: '%s'", filename.get());
-        strm.setown(createFileSerialStream(fileIO));
+        strm.setown(createFileSerialStream(iRemoteFileIO));
         prefetcher.setown(projected->createDiskPrefetcher());
         assertex(prefetcher);
         prefetchBuffer.setStream(strm);
     }
 // IIndexLookup
+    virtual void ensureAvailable() override
+    {
+        iRemoteFileIO->ensureAvailable(); // will throw an exception if fails
+    }
     virtual unsigned __int64 getCount() override
     {
         return checkCount(0);
@@ -3925,6 +3959,8 @@ protected:
     bool eofSeen = false;
     const RtlRecord *record = nullptr;
     RowFilter filters;
+    // virtual field values
+    StringAttr logicalFilename;
 
     void initCommon(IPropertyTree &config)
     {
@@ -3935,6 +3971,7 @@ protected:
         Owned<IPropertyTreeIterator> filterIter = config.getElements("keyFilter");
         ForEach(*filterIter)
             filters.addFilter(*record, filterIter->query().queryProp(nullptr));
+        logicalFilename.set(config.queryProp("virtualFields/logicalFilename"));
     }
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterfaceOf<IRemoteActivity>)
@@ -3966,8 +4003,7 @@ public:
 //interface IVirtualFieldCallback
     virtual const char * queryLogicalFilename(const void * row) override
     {
-        const char * filename = nullptr; // MORE: This needs to be passed from the client to dafilesrv
-        return filename ? filename : "";
+        return logicalFilename.str();
     }
     virtual unsigned __int64 getFilePosition(const void * row) override
     {
@@ -3995,6 +4031,9 @@ class CRemoteDiskReadActivity : public CRemoteDiskBaseActivity
     mutable bool eogPending = false;
     mutable bool someInGroup = false;
     RtlDynRow *filterRow = nullptr;
+    // virtual field values
+    unsigned partNum = 0;
+    offset_t baseFpos = 0;
 
     void checkOpen()
     {
@@ -4063,6 +4102,9 @@ public:
         if (!outMeta)
             outMeta.set(inMeta);
 
+        partNum = config.getPropInt("virtualFields/partNum");
+        baseFpos = (offset_t)config.getPropInt64("virtualFields/baseFpos");
+
         initCommon(config);
         if (config.hasProp("keyFilter"))
             filterRow = new RtlDynRow(*record);
@@ -4149,13 +4191,11 @@ public:
 //interface IVirtualFieldCallback
     virtual unsigned __int64 getFilePosition(const void * row) override
     {
-        unsigned __int64 baseOffset = 0; // MORE: This needs to be passed from the client to dafilesrv
-        return prefetchBuffer.tell() + baseOffset;
+        return prefetchBuffer.tell() + baseFpos;
     }
     virtual unsigned __int64 getLocalFilePosition(const void * row) override
     {
-        unsigned part = 0; // MORE: This needs to be passed from the client to dafilesrv
-        return makeLocalFposOffset(part, prefetchBuffer.tell());
+        return makeLocalFposOffset(partNum, prefetchBuffer.tell());
     }
 };
 

+ 6 - 1
common/remote/sockfile.hpp

@@ -80,7 +80,12 @@ extern REMOTE_API ISocket *connectDafs(SocketEndpoint &ep, unsigned timeoutms);
 extern REMOTE_API ISocket *checkSocketSecure(ISocket *socket);
 interface IOutputMetaData;
 class RowFilter;
-extern REMOTE_API IFileIO *createRemoteFilteredFile(SocketEndpoint &ep, const char * filename, IOutputMetaData *actual, IOutputMetaData *projected, const RowFilter &fieldFilters, bool compressed, bool grouped, unsigned __int64 chooseNLimit);
+interface IRemoteFileIO : extends IFileIO
+{
+    virtual void addVirtualFieldMapping(const char *fieldName, const char *fieldValue) = 0;
+    virtual void ensureAvailable() = 0;
+};
+extern REMOTE_API IRemoteFileIO *createRemoteFilteredFile(SocketEndpoint &ep, const char * filename, IOutputMetaData *actual, IOutputMetaData *projected, const RowFilter &fieldFilters, bool compressed, bool grouped, unsigned __int64 chooseNLimit);
 
 interface IIndexLookup;
 extern REMOTE_API IIndexLookup *createRemoteFilteredKey(SocketEndpoint &ep, const char * filename, IOutputMetaData *actual, IOutputMetaData *projected, const RowFilter &fieldFilters, unsigned __int64 chooseNLimit);

+ 7 - 2
ecl/hthor/hthor.cpp

@@ -8282,12 +8282,17 @@ bool CHThorDiskReadBaseActivity::openNext()
                         StringBuffer path;
                         rfilename.getLocalPath(path);
 
-                        inputfileio.setown(createRemoteFilteredFile(ep, path, actualDiskMeta, projectedDiskMeta, actualFilter, compressed, grouped, remoteLimit));
-                        if (inputfileio)
+                        Owned<IRemoteFileIO> remoteFileIO = createRemoteFilteredFile(ep, path, actualDiskMeta, projectedDiskMeta, actualFilter, compressed, grouped, remoteLimit);
+                        if (remoteFileIO)
                         {
+                            StringBuffer tmp;
+                            remoteFileIO->addVirtualFieldMapping("logicalFilename", mangledHelperFileName.str());
+                            remoteFileIO->addVirtualFieldMapping("baseFpos", tmp.clear().append(offsetOfPart).str());
+                            remoteFileIO->addVirtualFieldMapping("partNum", tmp.clear().append(curPart->getPartIndex()).str());
                             actualDiskMeta.set(projectedDiskMeta);
                             expectedDiskMeta = projectedDiskMeta;
                             actualFilter.clear();
+                            inputfileio.setown(remoteFileIO.getClear());
                         }
                     }
                     if (inputfileio)

+ 1 - 0
system/jhtree/jhtree.cpp

@@ -2995,6 +2995,7 @@ extern jhtree_decl IIndexLookup *createIndexLookup(IKeyManager *keyManager)
         CIndexLookup(IKeyManager *_keyManager) : keyManager(_keyManager)
         {
         }
+        virtual void ensureAvailable() override { }
         virtual unsigned __int64 getCount() override
         {
             return keyManager->getCount();

+ 1 - 0
system/jhtree/jhtree.hpp

@@ -234,6 +234,7 @@ public:
 
 interface IIndexLookup : extends IInterface // similar to a small subset of IKeyManager
 {
+    virtual void ensureAvailable() = 0;
     virtual const void *nextKey() = 0;
     virtual unsigned __int64 getCount() = 0;
     virtual unsigned __int64 checkCount(unsigned __int64 limit) = 0;

+ 17 - 5
thorlcr/activities/diskread/thdiskreadslave.cpp

@@ -276,16 +276,28 @@ void CDiskRecordPartHandler::open()
                     else
                         actualFilter.appendFilters(activity.fieldFilters);
                 }
-                Owned<IFileIO> iFileIO = createRemoteFilteredFile(ep, path, actualFormat, projectedFormat, actualFilter, compressed, activity.grouped, activity.remoteLimit);
-                if (iFileIO)
+                Owned<IRemoteFileIO> iRemoteFileIO = createRemoteFilteredFile(ep, path, actualFormat, projectedFormat, actualFilter, compressed, activity.grouped, activity.remoteLimit);
+                if (iRemoteFileIO)
                 {
+                    StringBuffer tmp;
+                    iRemoteFileIO->addVirtualFieldMapping("logicalFilename", logicalFilename.get());
+                    iRemoteFileIO->addVirtualFieldMapping("baseFpos", tmp.clear().append(fileBaseOffset).str());
+                    iRemoteFileIO->addVirtualFieldMapping("partNum", tmp.clear().append(partDesc->queryPartIndex()).str());
                     rfn.getPath(path);
                     filename.set(path);
                     checkFileCrc = false;
 
-                    // JCSMORE - needTransform - see CDiskPartHandler::nextRow(), may need/want to differentiate if only transform is only remote
-
-                    partStream.setown(createRowStreamEx(iFileIO, activity.queryProjectedDiskRowInterfaces(), 0, (offset_t)-1, (unsigned __int64)-1, rwFlags, nullptr, this));
+                    try
+                    {
+                        iRemoteFileIO->ensureAvailable(); // force open now, because want to failover to other copies if fails (e.g. remote part is missing)
+                    }
+                    catch (IException *e)
+                    {
+                        EXCLOG(e, nullptr);
+                        e->Release();
+                        continue; // try next copy and ultimately failover to local when no more copies
+                    }
+                    partStream.setown(createRowStreamEx(iRemoteFileIO, activity.queryProjectedDiskRowInterfaces(), 0, (offset_t)-1, (unsigned __int64)-1, rwFlags, nullptr, this));
                     ActPrintLog(&activity, "%s[part=%d]: reading remote dafilesrv file '%s' (logical file = %s)", kindStr, which, path.str(), activity.logicalFilename.get());
                     break;
                 }

+ 10 - 0
thorlcr/activities/indexread/thindexreadslave.cpp

@@ -201,6 +201,16 @@ public:
                             Owned<IIndexLookup> indexLookup = createRemoteFilteredKey(ep, lPath, actualFormat, projectedFormat, actualFilter, remoteLimit);
                             if (indexLookup)
                             {
+                                try
+                                {
+                                    indexLookup->ensureAvailable();
+                                }
+                                catch (IException *e)
+                                {
+                                    EXCLOG(e, nullptr);
+                                    e->Release();
+                                    continue; // try next copy and ultimately failover to local when no more copies
+                                }
                                 ActPrintLog("[part=%d]: reading remote dafilesrv index '%s' (logical file = %s)", partNum, path.str(), logicalFilename.get());
                                 partNum = p;
                                 return indexLookup.getClear();