瀏覽代碼

Merge pull request #11688 from jakesmith/hpcc-20504

HPCC-20504 Add dafilesrv reply errors codes and normalize json response/specialize testsocket response

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 6 年之前
父節點
當前提交
a9ddb1fd77
共有 5 個文件被更改,包括 110 次插入39 次删除
  1. 7 1
      common/remote/remoteerr.hpp
  2. 76 34
      common/remote/sockfile.cpp
  3. 4 1
      common/remote/sockfile.hpp
  4. 2 0
      tools/testsocket/CMakeLists.txt
  5. 21 3
      tools/testsocket/testsocket.cpp

+ 7 - 1
common/remote/remoteerr.hpp

@@ -99,7 +99,13 @@ enum DAFS_ERROR_CODES {
     DAFSERR_authenticate_failed             = -2,
     DAFSERR_protocol_failure                = -3,
     DAFSERR_serveraccept_failed             = -4,
-    DAFSERR_serverinit_failed               = -5
+    DAFSERR_serverinit_failed               = -5,
+    DAFSERR_cmdstream_invalidexpiry         = -6,
+    DAFSERR_cmdstream_authexpired           = -7,
+    DAFSERR_cmdstream_unsupported_recfmt    = -8,
+    DAFSERR_cmdstream_openfailure           = -9,
+    DAFSERR_cmdstream_protocol_failure      = -10,
+    DAFSERR_cmdstream_unauthorized          = -11
 };
 
 

+ 76 - 34
common/remote/sockfile.cpp

@@ -186,14 +186,13 @@ struct dummyReadWrite
 // backward compatible modes
 typedef enum { compatIFSHnone, compatIFSHread, compatIFSHwrite, compatIFSHexec, compatIFSHall} compatIFSHmode;
 
-static const char *VERSTRING= "DS V2.3"       // dont forget FILESRV_VERSION in header
+static const char *VERSTRING= "DS V2.4"       // dont forget FILESRV_VERSION in header
 #ifdef _WIN32
 "Windows ";
 #else
 "Linux ";
 #endif
 
-typedef unsigned char RemoteFileCommandType;
 typedef int RemoteFileIOHandle;
 
 static unsigned maxConnectTime = 0;
@@ -349,17 +348,26 @@ enum
     RFCsetthrottle2,
     RFCsetfileperms,
 // 2.0
-    RFCreadfilteredindex,    // No longer used
+    RFCreadfilteredindex,    // No longer used     // 40
     RFCreadfilteredindexcount,
     RFCreadfilteredindexblob,
 // 2.2
-    RFCStreamRead,
-    RFCStreamReadTestSocket = '{',
+    RFCStreamRead,                                 // 43
+// 2.4
+    RFCStreamReadTestSocket,                       // 44
+    RFCStreamReadJSON = '{',
     RFCmaxnormal,
     RFCmax,
     RFCunknown = 255 // 0 would have been more sensible, but can't break backward compatibility
 };
 
+// used by testsocket only
+RemoteFileCommandType queryRemoteStreamCmd()
+{
+    return RFCStreamReadTestSocket;
+}
+
+
 #define RFCText(cmd) #cmd
 
 const char *RFCStrings[] =
@@ -408,11 +416,12 @@ const char *RFCStrings[] =
     RFCText(RFCreadfilteredcount),
     RFCText(RFCreadfilteredblob),
     RFCText(RFCStreamRead),
+    RFCText(RFCStreamReadTestSocket),
 };
 static const char *getRFCText(RemoteFileCommandType cmd)
 {
-    if (cmd==RFCStreamReadTestSocket)
-        return "RFCStreamReadTestSocket";
+    if (cmd==RFCStreamReadJSON)
+        return "RFCStreamReadJSON";
     else
     {
         unsigned elems = sizeof(RFCStrings) / sizeof(RFCStrings[0]);
@@ -608,9 +617,25 @@ public:
     }
 };
 
-static IDAFS_Exception *createDafsException(int code,const char *msg)
+static IDAFS_Exception *createDafsException(int code, const char *msg)
 {
-    return new CDafsException(code,msg);
+    return new CDafsException(code, msg);
+}
+
+static IDAFS_Exception *createDafsExceptionVA(int code, const char *format, va_list args)
+{
+    StringBuffer eStr;
+    eStr.limited_valist_appendf(1024, format, args);
+    return new CDafsException(code, eStr);
+}
+
+static IDAFS_Exception *createDafsExceptionV(int code, const char *format, ...)
+{
+    va_list args;
+    va_start(args, format);
+    IDAFS_Exception *ret = createDafsExceptionVA(code, format, args);
+    va_end(args);
+    return ret;
 }
 
 void setDafsEndpointPort(SocketEndpoint &ep)
@@ -1836,7 +1861,7 @@ public:
         }
         MemoryBuffer actualTypeInfo;
         if (!dumpTypeInfo(actualTypeInfo, actual->querySerializedDiskMeta()->queryTypeInfo()))
-            throw MakeStringException(0, "Format not supported by remote read");
+            throw createDafsException(DAFSERR_cmdstream_unsupported_recfmt, "Format not supported by remote read");
         request.append(",\n \"inputBin\" : \"");
         JBASE64_Encode(actualTypeInfo.toByteArray(), actualTypeInfo.length(), request, false);
         request.append("\"");
@@ -2137,7 +2162,7 @@ public:
             fieldFilters.addFilter(OLINK(_fieldFilters.queryFilter(f)));
         iRemoteFileIO.setown(new CRemoteFilteredKeyIO(ep, filename, crc, actual, projected, fieldFilters, rowLimit));
         if (!iRemoteFileIO)
-            throw MakeStringException(0, "Unable to open remote key part: '%s'", filename.get());
+            throwStringExceptionV(DAFSERR_cmdstream_openfailure, "Unable to open remote key part: '%s'", filename.get());
         strm.setown(createFileSerialStream(iRemoteFileIO));
         prefetcher.setown(projected->createDiskPrefetcher());
         assertex(prefetcher);
@@ -4029,7 +4054,7 @@ protected:
     {
         fileName.set(config.queryProp("fileName"));
         if (isEmptyString(fileName))
-            throw MakeStringException(0, "CRemoteDiskBaseActivity: fileName missing");
+            throw createDafsException(DAFSERR_cmdstream_protocol_failure, "CRemoteDiskBaseActivity: fileName missing");
 
         record = &inMeta->queryRecordAccessor(true);
         translator.setown(createRecordTranslator(outMeta->queryRecordAccessor(true), *record));
@@ -4136,7 +4161,7 @@ class CRemoteDiskReadActivity : public CRemoteDiskBaseActivity
         {
             iFileIO.setown(iFile->open(IFOread));
             if (!iFileIO)
-                throw MakeStringException(1, "Failed to open: '%s'", fileName.get());
+                throw createDafsExceptionV(DAFSERR_cmdstream_protocol_failure, "Failed to open: '%s'", fileName.get());
             if (compressed)
             {
                 WARNLOG("meta info marked file '%s' as compressed, but detected file as uncompressed", fileName.get());
@@ -4468,13 +4493,21 @@ void checkExpiryTime(IPropertyTree &metaInfo)
 {
     const char *expiryTime = metaInfo.queryProp("expiryTime");
     if (isEmptyString(expiryTime))
-        throwStringExceptionV(0, "createRemoteActivity: invalid expiry specification");
+        throw createDafsException(DAFSERR_cmdstream_invalidexpiry, "createRemoteActivity: invalid expiry specification");
+
     CDateTime expiryTimeDt;
-    expiryTimeDt.setString(expiryTime);
+    try
+    {
+        expiryTimeDt.setString(expiryTime);
+    }
+    catch (IException *e)
+    {
+        throw createDafsException(DAFSERR_cmdstream_invalidexpiry, "createRemoteActivity: invalid expiry specification");
+    }
     CDateTime nowDt;
     nowDt.setNow();
     if (nowDt >= expiryTimeDt)
-        throwStringExceptionV(0, "createRemoteActivity: authorization expired");
+        throw createDafsException(DAFSERR_cmdstream_authexpired, "createRemoteActivity: authorization expired");
 }
 
 void verifyMetaInfo(IPropertyTree &actNode, bool authorizedOnly, const IPropertyTree *keyPairInfo)
@@ -4487,7 +4520,7 @@ void verifyMetaInfo(IPropertyTree &actNode, bool authorizedOnly, const IProperty
     StringBuffer metaInfoB64;
     actNode.getProp("metaInfo", metaInfoB64);
     if (0 == metaInfoB64.length())
-        throwStringExceptionV(0, "createRemoteActivity: missing metaInfo");
+        throw createDafsException(DAFSERR_cmdstream_protocol_failure, "createRemoteActivity: missing metaInfo");
 
     MemoryBuffer compressedMetaInfoMb;
     JBASE64_Decode(metaInfoB64.str(), compressedMetaInfoMb);
@@ -4502,7 +4535,7 @@ void verifyMetaInfo(IPropertyTree &actNode, bool authorizedOnly, const IProperty
 
     bool isSigned = metaInfoBlob.length() != 0;
     if (authorizedOnly && !isSigned)
-        throwStringExceptionV(0, "createRemoteActivity: unathorized");
+        throw createDafsException(DAFSERR_cmdstream_unauthorized, "createRemoteActivity: unathorized");
 
     if (isSigned)
     {
@@ -4515,18 +4548,18 @@ void verifyMetaInfo(IPropertyTree &actNode, bool authorizedOnly, const IProperty
 
         StringBuffer metaInfoSignature;
         if (!metaInfoEnvelope->getProp("signature", metaInfoSignature))
-            throwStringExceptionV(0, "createRemoteActivity: missing signature");
+            throw createDafsException(DAFSERR_cmdstream_unauthorized, "createRemoteActivity: missing signature");
 
         VStringBuffer keyPairPath("KeyPair[@name=\"%s\"]", keyPairName);
         IPropertyTree *keyPair = keyPairInfo->queryPropTree(keyPairPath);
         if (!keyPair)
-            throwStringExceptionV(0, "createRemoteActivity: missing key pair definition");
+            throw createDafsException(DAFSERR_cmdstream_unauthorized, "createRemoteActivity: missing key pair definition");
         const char *publicKeyFName = keyPair->queryProp("@publicKey");
         if (isEmptyString(publicKeyFName))
-            throwStringExceptionV(0, "createRemoteActivity: missing public key definition");
+            throw createDafsException(DAFSERR_cmdstream_unauthorized, "createRemoteActivity: missing public key definition");
         Owned<CLoadedKey> publicKey = loadPublicKeyFromFile(publicKeyFName, nullptr); // NB: if cared could cache loaded keys
         if (!digiVerify(metaInfoSignature, metaInfoBlob.length(), metaInfoBlob.bytes(), *publicKey))
-            throwStringExceptionV(0, "createRemoteActivity: signature verification failed");
+            throw createDafsException(DAFSERR_cmdstream_unauthorized, "createRemoteActivity: signature verification failed");
 
         checkExpiryTime(*metaInfo);
     }
@@ -4547,7 +4580,7 @@ void verifyMetaInfo(IPropertyTree &actNode, bool authorizedOnly, const IProperty
     StringBuffer partFileName;
     fileInfo->getProp(xpath, partFileName);
     if (!partFileName.length())
-        throwStringExceptionV(0, "createRemoteActivity: invalid file info");
+        throw createDafsException(DAFSERR_cmdstream_protocol_failure, "createRemoteActivity: invalid file info");
 
     actNode.setProp("fileName", partFileName.str());
     verifyex(actNode.removeProp("metaInfo")); // no longer needed
@@ -4599,7 +4632,7 @@ IRemoteActivity *createRemoteActivity(IPropertyTree &actNode, bool authorizedOnl
                     if (streq("count", action))
                         activity.setown(createRemoteIndexCount(actNode));
                     else
-                        throwStringExceptionV(0, "Unknown action '%s' on index '%s'", action, partFileName);
+                        throw createDafsExceptionV(DAFSERR_cmdstream_protocol_failure, "Unknown action '%s' on index '%s'", action, partFileName);
                 }
                 else
                     activity.setown(createRemoteIndexRead(actNode));
@@ -4609,9 +4642,9 @@ IRemoteActivity *createRemoteActivity(IPropertyTree &actNode, bool authorizedOnl
                 if (!isEmptyString(action))
                 {
                     if (streq("count", action))
-                        throwStringExceptionV(0, "Remote Disk Counts currently unsupported");
+                        throw createDafsException(DAFSERR_cmdstream_protocol_failure, "Remote Disk Counts currently unsupported");
                     else
-                        throwStringExceptionV(0, "Unknown action '%s' on flat file '%s'", action, partFileName);
+                        throw createDafsExceptionV(DAFSERR_cmdstream_protocol_failure, "Unknown action '%s' on flat file '%s'", action, partFileName);
                 }
                 else
                     activity.setown(createRemoteDiskRead(actNode));
@@ -6689,22 +6722,29 @@ public:
         }
     }
 
-    void cmdStreamReadTestSocket(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
+    void cmdStreamReadStd(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
     {
-        reply.append('J');
-        /* testsocket is not actually passing in a command, and is interpreting '{' as the cmd to get here.
-         * so rewind so it can be read/parsed as JSON by cmdStreamRead
-         */
-        msg.reset(msg.getPos()-sizeof(RemoteFileCommandType));
+        reply.append(RFEnoerror);
         cmdStreamReadCommon(msg, reply, client);
     }
 
-    void cmdStreamReadStd(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
+    void cmdStreamReadJSON(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
     {
+        /* NB: exactly the same handling as cmdStreamReadStd(RFCStreamRead) for now,
+         * may want to differentiate later
+         * i.e. return format is { len[unsigned4-bigendian], errorcode[unsigned4-bigendian], result } - where result format depends on request output type.
+         * errorcode = 0 means no error
+         */
         reply.append(RFEnoerror);
         cmdStreamReadCommon(msg, reply, client);
     }
 
+    void cmdStreamReadTestSocket(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
+    {
+        reply.append('J');
+        cmdStreamReadCommon(msg, reply, client);
+    }
+
     // legacy version
     void cmdSetThrottle(MemoryBuffer & msg, MemoryBuffer & reply)
     {
@@ -6811,6 +6851,7 @@ public:
             case RFCunlock:
             case RFCStreamRead:
             case RFCStreamReadTestSocket:
+            case RFCStreamReadJSON:
                 stdCmdThrottler.addCommand(cmd, msg, client);
                 return;
             // NB: The following commands are still bound by the the thread pool
@@ -6865,8 +6906,9 @@ public:
                 MAPCOMMAND(RFCgetinfo, cmdGetInfo);
                 MAPCOMMAND(RFCfirewall, cmdFirewall);
                 MAPCOMMANDCLIENT(RFCunlock, cmdUnlock, *client);
-                MAPCOMMANDCLIENTTESTSOCKET(RFCStreamReadTestSocket, cmdStreamReadTestSocket, *client);
                 MAPCOMMANDCLIENT(RFCStreamRead, cmdStreamReadStd, *client);
+                MAPCOMMANDCLIENT(RFCStreamReadJSON, cmdStreamReadJSON, *client);
+                MAPCOMMANDCLIENTTESTSOCKET(RFCStreamReadTestSocket, cmdStreamReadTestSocket, *client);
                 MAPCOMMANDCLIENT(RFCcopysection, cmdCopySection, *client);
                 MAPCOMMANDCLIENTTHROTTLE(RFCtreecopy, cmdTreeCopy, *client, &slowCmdThrottler);
                 MAPCOMMANDCLIENTTHROTTLE(RFCtreecopytmp, cmdTreeCopyTmp, *client, &slowCmdThrottler);

+ 4 - 1
common/remote/sockfile.hpp

@@ -61,7 +61,7 @@ interface IRemoteFileServer : extends IInterface
     virtual StringBuffer &getStats(StringBuffer &stats, bool reset) = 0;
 };
 
-#define FILESRV_VERSION 23 // don't forget VERSTRING in sockfile.cpp
+#define FILESRV_VERSION 24 // don't forget VERSTRING in sockfile.cpp
 
 interface IKeyManager;
 interface IDelayedFile;
@@ -118,4 +118,7 @@ extern bool clientAsyncCopyFileSection(const char *uuid,    // from genUUID - mu
 extern void clientSetRemoteFileTimeouts(unsigned maxconnecttime,unsigned maxreadtime);
 extern void clientAddSocketToCache(SocketEndpoint &ep,ISocket *socket);
 
+typedef unsigned char RemoteFileCommandType;
+extern RemoteFileCommandType queryRemoteStreamCmd(); // used by testsocket only
+
 #endif

+ 2 - 0
tools/testsocket/CMakeLists.txt

@@ -27,6 +27,7 @@ include_directories (
          ./../../system/jlib
          ./../../system/include 
          ./../../system/security/securesocket
+         ./../../common/remote
     )
 
 ADD_DEFINITIONS( -D_CONSOLE )
@@ -35,6 +36,7 @@ set ( SRCS testsocket.cpp )
 HPCC_ADD_EXECUTABLE ( testsocket ${SRCS} )
 target_link_libraries ( testsocket
          jlib
+         remote
          )
 
 IF (USE_OPENSSL)

+ 21 - 3
tools/testsocket/testsocket.cpp

@@ -26,6 +26,7 @@
 #include "jthread.hpp"
 #include "jfile.hpp"
 #include "securesocket.hpp"
+#include "sockfile.hpp"
 
 bool abortEarly = false;
 bool forceHTTP = false;
@@ -45,6 +46,7 @@ bool doLock = false;
 bool roxieLogMode = false;
 bool rawOnly = false;
 bool rawSend = false;
+bool remoteStreamQuery = false;
 bool remoteStreamForceResend = false;
 bool remoteStreamSendCursor = false;
 int verboseDbgLevel = 0;
@@ -335,7 +337,11 @@ int readResults(ISocket * socket, bool readBlocked, bool useHTTP, StringBuffer &
         }
         else if (remoteReadRequest)
         {
-            Owned<IPropertyTree> requestTree = createPTreeFromJSONString(queryLen, query);
+            auto cmd = queryRemoteStreamCmd();
+            size32_t remoteStreamCmdSz = sizeof(cmd);
+            size32_t jsonQueryLen = queryLen - remoteStreamCmdSz;
+            const char *jsonQuery = query + remoteStreamCmdSz;
+            Owned<IPropertyTree> requestTree = createPTreeFromJSONString(jsonQueryLen, jsonQuery);
             Owned<IPropertyTree> responseTree; // used if response is xml or json
             const char *outputFmtStr = requestTree->queryProp("format");
             const char *response = nullptr;
@@ -406,12 +412,13 @@ int readResults(ISocket * socket, bool readBlocked, bool useHTTP, StringBuffer &
 
                 requestTree->setProp("format", outputFmtStr);
                 StringBuffer requestStr;
+                requestStr.append(queryRemoteStreamCmd());
                 toJSON(requestTree, requestStr);
 
                 if (verboseDbgLevel > 0)
                 {
                     fputs("\nNext request:", stdout);
-                    fputs(requestStr, stdout);
+                    fputs(requestStr.str()+remoteStreamCmdSz, stdout);
                     fputs("\n", stdout);
                     fflush(stdout);
                 }
@@ -439,9 +446,10 @@ int readResults(ISocket * socket, bool readBlocked, bool useHTTP, StringBuffer &
                 PROGLOG("Retry send for handle: %u", cursorHandle);
                 requestTree->setProp("cursorBin", remoteReadCursor);
                 StringBuffer requestStr;
+                requestStr.append(queryRemoteStreamCmd());
                 toJSON(requestTree, requestStr);
 
-                PROGLOG("requestStr = %s", requestStr.str());
+                PROGLOG("requestStr = %s", requestStr.str()+remoteStreamCmdSz);
                 sendlen = requestStr.length();
                 _WINREV(sendlen);
                 if (!rawSend && !useHTTP)
@@ -637,7 +645,11 @@ int doSendQuery(const char * ip, unsigned port, const char * base)
             }
         }
         else
+        {
+            if (remoteStreamQuery)
+                fullQuery.append(queryRemoteStreamCmd());
             fullQuery.append(base);
+        }
     }
 
     const char * query = fullQuery.str();
@@ -784,6 +796,7 @@ void usage(int exitCode)
     printf("  -qname xx Use xx as queryname in place of the xml root element name\n");
     printf("  -r <n>    repeat the query several times\n");
     printf("  -rl       roxie logfile mode\n");
+    printf("  -rs       remote stream request\n");
     printf("  -rsr      force remote stream resend per continuation request\n");
     printf("  -rssc     send cursor per continuation request\n");
     printf("  -s        add stars to indicate transfer packets\n");
@@ -988,6 +1001,11 @@ int main(int argc, char **argv)
             }
             arg+=2;
         }
+        else if (strieq(argv[arg], "-rs"))
+        {
+            remoteStreamQuery = true;
+            ++arg;
+        }
         else if (strieq(argv[arg], "-rsr"))
         {
             remoteStreamForceResend = true;