瀏覽代碼

Merge pull request #15749 from jakesmith/HPCC-27047-k8s-foreign-access

HPCC-27047 K8s Foreign file access

Reviewed-By: Gavin Halliday <gavin.halliday@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 3 年之前
父節點
當前提交
fd14ea0322

+ 0 - 63
common/workunit/workunit.cpp

@@ -14307,25 +14307,6 @@ KeepK8sJobs translateKeepJobs(const char *keepJob)
     return KeepK8sJobs::none;
 }
 
-// NB: will fire an exception if command fails (returns non-zero exit code)
-static void runKubectlCommand(const char *title, const char *cmd, const char *input, StringBuffer *output)
-{
-    StringBuffer _output, error;
-    if (!output)
-        output = &_output;
-    unsigned ret = runExternalCommand(title, *output, error, cmd, input, ".", nullptr);
-    if (output->length())
-        MLOG(MCExtraneousInfo, unknownJob, "%s: ret=%u, stdout=%s", cmd, ret, output->trimRight().str());
-    if (error.length())
-        MLOG(MCinternalError, unknownJob, "%s: ret=%u, stderr=%s", cmd, ret, error.trimRight().str());
-    if (ret)
-    {
-        if (input)
-            MLOG(MCinternalError, unknownJob, "Using input %s", input);
-        throw makeStringExceptionV(0, "Failed to run %s: error %u: %s", cmd, ret, error.str());
-    }
-}
-
 bool isActiveK8sService(const char *serviceName)
 {
     VStringBuffer getEndpoints("kubectl get endpoints %s \"--output=jsonpath={range .subsets[*].addresses[*]}{.ip}{'\\n'}{end}\"", serviceName);
@@ -14479,50 +14460,6 @@ void runK8sJob(const char *componentName, const char *wuid, const char *job, con
         throw exception.getClear();
 }
 
-
-std::pair<std::string, unsigned> getExternalService(const char *serviceName)
-{
-    static CTimeLimitedCache<std::string, std::pair<std::string, unsigned>> externalServiceCache;
-    static CriticalSection externalServiceCacheCrit;
-
-    {
-        CriticalBlock b(externalServiceCacheCrit);
-        std::pair<std::string, unsigned> cachedExternalSevice;
-        if (externalServiceCache.get(serviceName, cachedExternalSevice))
-            return cachedExternalSevice;
-    }
-
-    StringBuffer output;
-    try
-    {
-        VStringBuffer getServiceCmd("kubectl get svc --selector=server=%s --output=jsonpath={.items[0].status.loadBalancer.ingress[0].hostname},{.items[0].status.loadBalancer.ingress[0].ip},{.items[0].spec.ports[0].port}", serviceName);
-        runKubectlCommand("get-external-service", getServiceCmd, nullptr, &output);
-    }
-    catch (IException *e)
-    {
-        EXCLOG(e);
-        VStringBuffer exceptionText("Failed to get external service for '%s'. Error: [%d, ", serviceName, e->errorCode());
-        e->errorMessage(exceptionText).append("]");
-        e->Release();
-        throw makeStringException(-1, exceptionText);
-    }
-    StringArray fields;
-    fields.appendList(output, ",");
-
-    // NB: add even if no result, want non-result to be cached too
-    std::string host, port;
-    if (fields.ordinality() == 3) // hostname,ip,port. NB: hostname may be missing, but still present as a blank field
-    {
-        host = fields.item(0); // hostname
-        if (0 == host.length())
-            host = fields.item(1); // ip
-        port = fields.item(2);
-    }
-    auto servicePair = std::make_pair(host, atoi(port.c_str()));
-    externalServiceCache.add(serviceName, servicePair);
-    return servicePair;
-}
-
 // returns a vector of {pod-name, node-name} vectors,
 // represented as a nested vector for extensibility, e.g. to add other meta fields
 std::vector<std::vector<std::string>> getPodNodes(const char *selector)

+ 0 - 3
common/workunit/workunit.hpp

@@ -1770,9 +1770,6 @@ extern WORKUNIT_API void waitK8sJob(const char *componentName, const char *job,
 extern WORKUNIT_API bool applyK8sYaml(const char *componentName, const char *wuid, const char *job, const char *suffix, const std::list<std::pair<std::string, std::string>> &extraParams, bool optional);
 extern WORKUNIT_API void runK8sJob(const char *componentName, const char *wuid, const char *job, const std::list<std::pair<std::string, std::string>> &extraParams={});
 
-// return the k8s external host and port for serviceName
-extern WORKUNIT_API std::pair<std::string, unsigned> getExternalService(const char *serviceName);
-
 // returns a vector of {pod-name, node-name} vectors,
 extern WORKUNIT_API std::vector<std::vector<std::string>> getPodNodes(const char *selector);
 #endif

+ 54 - 12
dali/base/dadfs.cpp

@@ -10435,6 +10435,8 @@ class CDaliDFSServer: public Thread, public CTransactionLogTracker, implements I
     bool stopped;
     unsigned defaultTimeout;
     unsigned numThreads;
+    Owned<INode> dafileSrvNode;
+    CriticalSection dafileSrvNodeCS;
 
 public:
 
@@ -10764,10 +10766,12 @@ public:
         unsigned ver;
         if (mb.length()<mb.getPos()+sizeof(unsigned))
             ver = 0;
-        else {
+        else
+        {
             mb.read(ver);
             // this is a bit of a mess - for backward compatibility where user descriptor specified
-            if (ver>MDFS_GET_FILE_TREE_V2) {
+            if (ver>MDFS_GET_FILE_TREE_V2)
+            {
                 mb.reset(mb.getPos()-sizeof(unsigned));
                 ver = 0;
             }
@@ -10776,7 +10780,8 @@ public:
         if (queryTransactionLogging())
             transactionLog.log("%s", trc.str());
         Owned<IUserDescriptor> udesc;
-        if (mb.getPos()<mb.length()) {
+        if (mb.getPos()<mb.length())
+        {
             udesc.setown(createUserDescriptor());
             udesc->deserialize(mb);
         }
@@ -10785,17 +10790,49 @@ public:
         dlfn.set(lname);
         CDfsLogicalFileName *logicalname=&dlfn;
         Owned<IDfsLogicalFileNameIterator> redmatch;
-        for (;;) {
+        for (;;)
+        {
             StringBuffer tail;
             checkLogicalName(*logicalname,udesc,true,false,true,"getFileTree on");
             CScopeConnectLock sconnlock("getFileTree", *logicalname, false, false, false, defaultTimeout);
             IPropertyTree* sroot = sconnlock.conn()?sconnlock.conn()->queryRoot():NULL;
             logicalname->getTail(tail);
             Owned<IPropertyTree> tree = getNamedPropTree(sroot,queryDfsXmlBranchName(DXB_File),"@name",tail.str(),false);
-            if (tree) {
-                if (ver>=MDFS_GET_FILE_TREE_V2) {
+            if (tree)
+            {
+                if (ver>=MDFS_GET_FILE_TREE_V2)
+                {
                     Owned<IFileDescriptor> fdesc = deserializeFileDescriptorTree(tree,&queryNamedGroupStore(),IFDSF_EXCLUDE_CLUSTERNAMES);
-                    if (fdesc) {
+                    if (fdesc)
+                    {
+#ifdef _CONTAINERIZED
+                        unsigned nc = fdesc->numClusters();
+                        for (unsigned c=0; c<nc; c++)
+                        {
+                            IClusterInfo *clusterInfo = fdesc->queryClusterNum(c);
+                            const char *planeName = clusterInfo->queryGroupName();
+                            Owned<IStoragePlane> plane = getDataStoragePlane(planeName, true);
+                            if (!plane->queryHosts() && isAbsolutePath(plane->queryPrefix())) // if host group, or url, don't touch
+                            {
+                                {
+                                    CriticalBlock b(dafileSrvNodeCS);
+                                    if (nullptr == dafileSrvNode)
+                                    {
+                                        auto externalService = getDafileServiceFromConfig("directio");
+                                        VStringBuffer dafilesrvEpStr("%s:%u", externalService.first.c_str(), externalService.second);
+                                        dafileSrvNode.setown(createINode(dafilesrvEpStr));
+                                    }
+                                }
+                                IGroup *oldGroup = clusterInfo->queryGroup();
+                                std::vector<INode *> nodes;
+                                for (unsigned n=0; n<oldGroup->ordinality(); n++)
+                                    nodes.push_back(dafileSrvNode);
+                                Owned<IGroup> newGroup = createIGroup((rank_t)oldGroup->ordinality(), &nodes[0]);
+                                clusterInfo->setGroup(newGroup); // NB: links
+                            }
+                        }
+#endif
+
                         ver = MDFS_GET_FILE_TREE_V2;
                         mb.append((int)-2).append(ver);
                         fdesc->serialize(mb);
@@ -10809,7 +10846,8 @@ public:
                     else
                         ver = 0;
                 }
-                if (ver==0) {
+                if (ver==0)
+                {
                     tree.setown(createPTreeFromIPT(tree));
                     StringBuffer cname;
                     logicalname->getCluster(cname);
@@ -10818,18 +10856,22 @@ public:
                 }
                 break;
             }
-            else {
+            else
+            {
                 tree.setown(getNamedPropTree(sroot,queryDfsXmlBranchName(DXB_SuperFile),"@name",tail.str(),false));
-                if (tree) {
+                if (tree)
+                {
                     tree->serialize(mb);
                     break;
                 }
             }
-            if (redmatch.get()) {
+            if (redmatch.get())
+            {
                 if (!redmatch->next())
                     break;
             }
-            else {
+            else
+            {
                 redmatch.setown(queryDistributedFileDirectory().queryRedirection().getMatch(logicalname->get()));
                 if (!redmatch.get())
                     break;

+ 1 - 1
dali/dfuplus/dfuplus.cpp

@@ -78,7 +78,7 @@ public:
     int run()
     {
         try {
-            server->run(securitySettings.queryDAFSConnectCfg(), listenep);
+            server->run(nullptr, securitySettings.queryDAFSConnectCfg(), listenep);
         }
         catch (IException *e) {
             EXCLOG(e,"dfuplus(dafilesrv)");

+ 1 - 1
esp/clients/wsdfuaccess/wsdfuaccess.cpp

@@ -666,7 +666,7 @@ protected:
             virtual void threadmain() override
             {
                 DAFSConnectCfg sslCfg = SSLNone;
-                server->run(sslCfg, socket, nullptr, nullptr);
+                server->run(nullptr, sslCfg, socket, nullptr, nullptr);
             }
         };
         Owned<IRemoteFileServer> server = createRemoteFileServer();

+ 3 - 27
esp/services/ws_dfu/ws_dfuService.cpp

@@ -6021,30 +6021,6 @@ SecAccessFlags translateToSecAccessFlags(CSecAccessType from)
     }
 }
 
-#ifdef _CONTAINERIZED
-std::pair<std::string, unsigned> getDafileServiceFromConfig(IFileDescriptor &fileDesc)
-{
-    /* NB: For now expect 1 dafilesrv in configuration only
-     * We could have multiple dafilesrv services with e.g. different specs./replicas etc. that
-     * serviced different planes. At the moment dafilesrv mounts all data planes.
-     */
-    Owned<IPropertyTreeIterator> dafilesrvServices = getGlobalConfigSP()->getElements("services[@type='dafilesrv']");
-    if (!dafilesrvServices->first())
-        throw makeStringException(-1, "dafilesrv service not defined");
-    const IPropertyTree &dafilesrv = dafilesrvServices->query();
-    if (!dafilesrv.getPropBool("@public"))
-        throw makeStringException(-1, "dafilesrv service has no public service defined");
-    StringBuffer dafilesrvName;
-    dafilesrv.getProp("@name", dafilesrvName);
-    auto externalService = getExternalService(dafilesrvName);
-    if (externalService.first.empty())
-        throw makeStringExceptionV(-1, "dafilesrv '%s': external service not found", dafilesrvName.str());
-    if (0 == externalService.second)
-        throw makeStringExceptionV(-1, "dafilesrv '%s': external service port not defined", dafilesrvName.str());
-    return externalService;
-}
-#endif
-
 void CWsDfuEx::dFUFileAccessCommon(IEspContext &context, const CDfsLogicalFileName &lfn, SessionId clientSessionId, const char *requestId, unsigned expirySecs, bool returnTextResponse, unsigned lockTimeoutMs, IEspDFUFileAccessResponse &resp)
 {
     double version = context.getClientVersion();
@@ -6083,7 +6059,7 @@ void CWsDfuEx::dFUFileAccessCommon(IEspContext &context, const CDfsLogicalFileNa
     if (!info)
         throw makeStringExceptionV(-1, "dFUFileAccessCommon: file signing certificate ('%s') not defined in configuration.", keyPairName.str());
 
-    auto externalService = getDafileServiceFromConfig(*fileDesc);
+    auto externalService = getDafileServiceFromConfig("stream");
     dafilesrvHost.set(externalService.first.c_str());
     port = externalService.second;
     secure = true;
@@ -6461,11 +6437,11 @@ bool CWsDfuEx::onDFUFileCreateV2(IEspContext &context, IEspDFUFileCreateV2Reques
             throw makeStringExceptionV(-1, "onDFUFileCreateV2: file signing certificate ('%s' ) not defined in configuration.", keyPairName.str());
 
         const char *planeName = clusterName;
-        unsigned numParts = 0; // in future perhaps client can specify, for now default is = to plane default (fefaultSprayParts)
+        unsigned numParts = 0; // in future perhaps client can specify, for now default is = to plane default (defaultSprayParts)
         fileDesc.setown(createFileDescriptor(tempFileName, planeName, numParts));
         numParts = fileDesc->numParts();
 
-        auto externalService = getDafileServiceFromConfig(*fileDesc);
+        auto externalService = getDafileServiceFromConfig("stream");
         dafilesrvHost.set(externalService.first.c_str());
         port = externalService.second;
         secure = true;

+ 10 - 8
fs/dafilesrv/dafilesrv.cpp

@@ -391,7 +391,8 @@ int main(int argc, const char* argv[])
     unsigned short  sslport;
     unsigned dedicatedRowServicePort = DEFAULT_ROWSERVICE_PORT;
 #ifdef _CONTAINERIZED
-    connectMethod = SSLOnly;
+    bool directIO = strsame(config->queryProp("@application"), "directio");
+    connectMethod = directIO ? SSLNone : SSLOnly;
     dedicatedRowServicePort = 0; // row service always runs on same secure ssl port in containerized mode
     port = 0;
     sslport = config->getPropInt("service/@port", SECURE_DAFILESRV_PORT);
@@ -648,6 +649,7 @@ int main(int argc, const char* argv[])
         {
             bool stopped;
             bool started;
+            Linked<IPropertyTree> config;
             DAFSConnectCfg connectMethod;
             SocketEndpoint listenep;
             unsigned maxThreads;
@@ -685,7 +687,7 @@ int main(int argc, const char* argv[])
 
         public:
 
-            cserv(DAFSConnectCfg _connectMethod, SocketEndpoint _listenep,
+            cserv(IPropertyTree *_config, DAFSConnectCfg _connectMethod, SocketEndpoint _listenep,
                         unsigned _maxThreads, unsigned _maxThreadsDelayMs, unsigned _maxAsyncCopy,
                         unsigned _parallelRequestLimit, unsigned _throttleDelayMs, unsigned _throttleCPULimit,
                         unsigned _parallelSlowRequestLimit, unsigned _throttleSlowDelayMs, unsigned _throttleSlowCPULimit,
@@ -693,7 +695,7 @@ int main(int argc, const char* argv[])
                         IPropertyTree *_keyPairInfo,
                         const char *_rowServiceConfiguration,
                         unsigned _dedicatedRowServicePort, bool _dedicatedRowServiceSSL, bool _rowServiceOnStdPort)
-            : connectMethod(_connectMethod), listenep(_listenep), pollthread(this),
+                : config(_config), connectMethod(_connectMethod), listenep(_listenep), pollthread(this),
                   maxThreads(_maxThreads), maxThreadsDelayMs(_maxThreadsDelayMs), maxAsyncCopy(_maxAsyncCopy),
                   parallelRequestLimit(_parallelRequestLimit), throttleDelayMs(_throttleDelayMs), throttleCPULimit(_throttleCPULimit),
                   parallelSlowRequestLimit(_parallelSlowRequestLimit), throttleSlowDelayMs(_throttleSlowDelayMs), throttleSlowCPULimit(_throttleSlowCPULimit),
@@ -773,10 +775,10 @@ int main(int argc, const char* argv[])
                     {
                         SocketEndpoint rowServiceEp(listenep); // copy listenep, incase bound by -addr
                         rowServiceEp.port = dedicatedRowServicePort;
-                        server->run(connectMethod, listenep, sslport, &rowServiceEp, dedicatedRowServiceSSL, rowServiceOnStdPort);
+                        server->run(config, connectMethod, listenep, sslport, &rowServiceEp, dedicatedRowServiceSSL, rowServiceOnStdPort);
                     }
                     else
-                        server->run(connectMethod, listenep, sslport);
+                        server->run(config, connectMethod, listenep, sslport);
                 }
                 catch (IException *e)
                 {
@@ -786,7 +788,7 @@ int main(int argc, const char* argv[])
                 PROGLOG(DAFS_SERVICE_DISPLAY_NAME " Stopped");
                 stopped = true;
             }
-        } service(connectMethod, listenep,
+        } service(config, connectMethod, listenep,
                 maxThreads, maxThreadsDelayMs, maxAsyncCopy,
                 parallelRequestLimit, throttleDelayMs, throttleCPULimit,
                 parallelSlowRequestLimit, throttleSlowDelayMs, throttleSlowCPULimit, sslport,
@@ -870,10 +872,10 @@ int main(int argc, const char* argv[])
         {
             SocketEndpoint rowServiceEp(listenep); // copy listenep, incase bound by -addr
             rowServiceEp.port = dedicatedRowServicePort;
-            server->run(connectMethod, listenep, sslport, &rowServiceEp, dedicatedRowServiceSSL, rowServiceOnStdPort);
+            server->run(config, connectMethod, listenep, sslport, &rowServiceEp, dedicatedRowServiceSSL, rowServiceOnStdPort);
         }
         else
-            server->run(connectMethod, listenep, sslport);
+            server->run(config, connectMethod, listenep, sslport);
     }
     catch (IException *e)
     {

+ 28 - 9
fs/dafsserver/dafsserver.cpp

@@ -3424,7 +3424,8 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
     Owned<ISocket>      acceptsock;
     Owned<ISocket>      securesock;
     Owned<ISocket>      rowServiceSock;
-
+    Linked<IPropertyTree> componentConfig;
+    bool directIO = true;
     bool rowServiceOnStdPort = true; // should row service commands be processed on std. service port
     bool rowServiceSSL = false;
 
@@ -4786,7 +4787,7 @@ public:
             }
 
 #ifdef _CONTAINERIZED
-            bool authorizedOnly = true;
+            bool authorizedOnly = !directIO; // unless in directIO application mode, only allow signed requests
 #else
             /* NB: In bare-metal, unless client call is on dedicated service, allow non-authorized requests through, e.g. from engines talking to unsecured port
              * In a locked down secure setup, this service will be configured on a dedicated port, and the std. insecure dafilesrv will be unreachable.
@@ -5017,9 +5018,23 @@ public:
         unsigned posOfErr = reply.length();
         try
         {
-            switch(cmd)
+            /* isRowServiceClient only set for bare-metal clients
+             * Check they aren't sending a non-rowservice command to the row service
+             */
+            switch (cmd)
+            {
+                case RFCStreamGeneral:
+                case RFCStreamRead:
+                case RFCStreamReadJSON:
+                    break;
+                default:
+                    if (!directIO || client->isRowServiceClient())
+                        throw createDafsException(DAFSERR_cmdstream_unauthorized, "Unauthorized command");
+                    break;
+            }
+
+            switch (cmd)
             {
-#ifndef _CONTAINERIZED // only bare-metal for now
                 MAPCOMMANDSTATS(RFCread, cmdRead, *stats);
                 MAPCOMMANDSTATS(RFCwrite, cmdWrite, *stats);
                 MAPCOMMANDCLIENTSTATS(RFCappend, cmdAppend, *client, *stats);
@@ -5062,7 +5077,6 @@ public:
                     cmdStreamReadTestSocket(msg, reply, *client, *stats);
                     break;
                 }
-#endif
                 // row service commands
                 case RFCStreamGeneral:
                 {
@@ -5109,7 +5123,7 @@ public:
             handleTracer.traceIfReady();
     }
 
-    virtual void run(DAFSConnectCfg _connectMethod, const SocketEndpoint &listenep, unsigned sslPort, const SocketEndpoint *rowServiceEp, bool _rowServiceSSL, bool _rowServiceOnStdPort) override
+    virtual void run(IPropertyTree *componentConfig, DAFSConnectCfg _connectMethod, const SocketEndpoint &listenep, unsigned sslPort, const SocketEndpoint *rowServiceEp, bool _rowServiceSSL, bool _rowServiceOnStdPort) override
     {
         SocketEndpoint sslep(listenep);
 #ifndef _CONTAINERIZED
@@ -5196,14 +5210,19 @@ public:
 #endif
         }
 
-        run(_connectMethod, acceptSock.getClear(), secureSock.getClear(), rowServiceSock.getClear());
+        run(componentConfig, _connectMethod, acceptSock.getClear(), secureSock.getClear(), rowServiceSock.getClear());
     }
 
-    virtual void run(DAFSConnectCfg _connectMethod, ISocket *_acceptSock, ISocket *_secureSock, ISocket *_rowServiceSock) override
+    virtual void run(IPropertyTree *_componentConfig, DAFSConnectCfg _connectMethod, ISocket *_acceptSock, ISocket *_secureSock, ISocket *_rowServiceSock) override
     {
         acceptsock.setown(_acceptSock);
         securesock.setown(_secureSock);
         rowServiceSock.setown(_rowServiceSock);
+        componentConfig.set(_componentConfig);
+#ifdef _CONTAINERIZED
+        if (componentConfig) // will be null in some scenarios (test cases)
+            directIO = strsame(componentConfig->queryProp("@application"), "directio");
+#endif
         if (_connectMethod != SSLOnly)
         {
             if (!acceptsock)
@@ -5727,7 +5746,7 @@ protected:
             virtual void threadmain() override
             {
                 DAFSConnectCfg sslCfg = SSLNone;
-                server->run(sslCfg, socket, nullptr, nullptr);
+                server->run(nullptr, sslCfg, socket, nullptr, nullptr);
             }
         };
         Owned<IRemoteFileServer> server = createRemoteFileServer();

+ 2 - 2
fs/dafsserver/dafsserver.hpp

@@ -58,8 +58,8 @@ enum RowServiceCfg
 
 interface IRemoteFileServer : extends IInterface
 {
-    virtual void run(DAFSConnectCfg connectMethod, const SocketEndpoint &listenep, unsigned sslPort=0, const SocketEndpoint *rowServiceEp=nullptr, bool rowServiceSSL=false, bool rowServiceOnStdPort=true) = 0;
-    virtual void run(DAFSConnectCfg _connectMethod, ISocket *listenSocket, ISocket *secureSocket, ISocket *rowServiceSocket) = 0;
+    virtual void run(IPropertyTree *componentConfig, DAFSConnectCfg connectMethod, const SocketEndpoint &listenep, unsigned sslPort=0, const SocketEndpoint *rowServiceEp=nullptr, bool rowServiceSSL=false, bool rowServiceOnStdPort=true) = 0;
+    virtual void run(IPropertyTree *componentConfig, DAFSConnectCfg _connectMethod, ISocket *listenSocket, ISocket *secureSocket, ISocket *rowServiceSocket) = 0;
     virtual void stop() = 0;
     virtual unsigned idleTime() = 0; // in ms
     virtual void setThrottle(ThrottleClass throttleClass, unsigned limit, unsigned delayMs=DEFAULT_STDCMD_THROTTLEDELAYMS, unsigned cpuThreshold=DEFAULT_STDCMD_THROTTLECPULIMIT, unsigned queueLimit=DEFAULT_STDCMD_THROTTLEQUEUELIMIT) = 0;

+ 6 - 3
helm/hpcc/templates/_helpers.tpl

@@ -577,7 +577,10 @@ Add config arg for a component
 Add dali arg for a component
 */}}
 {{- define "hpcc.daliArg" -}}
-"--daliServers={{ (index .Values.dali 0).name }}"
+{{- $dali := (index .Values.dali 0) -}}
+{{- $daliService := $dali.service | default dict -}}
+{{- $daliServicePort := $daliService.servicePort | default 7070 -}}
+"--daliServers={{ (index .Values.dali 0).name }}:{{ $daliServicePort }}"
 {{- end -}}
 
 {{/*
@@ -885,8 +888,8 @@ Generate list of available services
 {{- range $.Values.dafilesrv -}}
  {{- if not .disabled -}}
 - name: {{ .name }}
-  type: dafilesrv
-  port: {{ .servicePort | default 7600 }}
+  type: {{ .application | default "stream" }}
+  port: {{ .service.servicePort | default 7600 }}
   public: {{ (ne ( include "hpcc.isVisibilityPublic" (dict "root" $ "visibility" .service.visibility))  "") | ternary "true" "false" }}
  {{- end -}}
 {{- end -}}

+ 14 - 7
helm/hpcc/templates/dafilesrv.yaml

@@ -21,18 +21,17 @@ data:
 {{- if not .disabled -}}
 {{- $env := concat ($.Values.global.env | default list) (.env | default list) -}}
 {{- $commonCtx := dict "root" $ "me" . "env" $env "exposure" "local" "visibility" .service.visibility "includeCategories" (list "data") -}}
-{{- if ($.Values.certificates | default dict).enabled -}}
+{{- $_ := set $commonCtx "certificatesEnabled" (($.Values.certificates | default dict).enabled) -}}
+{{- if $commonCtx.certificatesEnabled -}}
  {{- $externalCert := ne (include "hpcc.isVisibilityPublic" $commonCtx) "" -}}
  {{- $issuerName := ternary "public" "local" $externalCert -}}
  {{- $issuer := get $.Values.certificates.issuers $issuerName -}}
- {{- if $issuer -}}
+ {{- if $issuer }}
   {{- $_ := set $commonCtx "exposure" (ternary "public" "local" $externalCert) -}}
- {{- else -}}
+ {{ else }}
   {{- $_ := fail (printf "dafilesrv - unable to locate issuer '%s'" $issuerName) -}}
- {{- end -}}
-{{- else -}}
- {{- $_ := fail (printf "dafilesrv - certificates must be enabled to use") -}}
-{{- end -}}
+ {{ end }}
+{{ end }}
 {{- $configSHA := include "hpcc.getConfigSHA" ($commonCtx | merge (dict "configMapHelper" "hpcc.dafilesrvConfigMap" "component" "dafilesrv" "excludeKeys" "global")) }}
 apiVersion: apps/v1
 kind: Deployment
@@ -72,11 +71,19 @@ spec:
         volumeMounts:
 {{ include "hpcc.addConfigMapVolumeMount" . | indent 8 }}
 {{ include "hpcc.addVolumeMounts" $commonCtx | indent 8 }}
+{{- if $commonCtx.certificatesEnabled }}
 {{ include "hpcc.addCertificateVolumeMount" (dict "root" $ "name" .name "component" "dafilesrv" "certificate" .certificate "visibility" .service.visibility) | indent 8 }}
+{{- else }}
+ {{- if eq .application "stream" }}
+  {{- $_ := fail (printf "dafilesrv[application=stream]- certificates must be enabled to use") -}}
+ {{- end }}
+{{- end }}
       volumes:
 {{ include "hpcc.addConfigMapVolume" . | indent 6 }}
 {{ include "hpcc.addVolumes" $commonCtx | indent 6 }}
+{{- if $commonCtx.certificatesEnabled }}
 {{ include "hpcc.addCertificateVolume" (dict "root" $ "name" .name "component" "dafilesrv" "certificate" .certificate "visibility" .service.visibility) | indent 6 }}
+{{- end }}
 ---
 kind: ConfigMap 
 {{ include "hpcc.generateConfig" ($commonCtx | merge (dict "configMapHelper" "hpcc.dafilesrvConfigMap")) }}

+ 25 - 16
helm/hpcc/templates/dali.yaml

@@ -81,7 +81,7 @@ spec:
 {{- end }}
     spec:
       {{- include "hpcc.placementsByPodTargetType" (dict "root" $ "pod" $dali.name "type" "dali") | indent 6 }}
-      serviceAccountName: "hpcc-default"
+      serviceAccountName: "hpcc-dali"
       initContainers: 
       {{- include "hpcc.changePlaneMountPerms" (dict "root" $ "includeCategories" $tmpDaliScope.aggregatePlaneCategories "includeNames" $tmpDaliScope.aggregateSashaNamedPlanes) | indent 6 }}
       {{- include "hpcc.addImagePullSecrets" $commonCtx | nindent 6 -}}
@@ -186,22 +186,31 @@ kind: ConfigMap
 {{- end }}
 {{- end }}
 {{- end }}
-apiVersion: v1
-kind: Service
-metadata:
-  name: {{ .name | quote }}
-  labels:
-    helmVersion: 8.6.3-closedown0
-spec:
-  ports:
-  - port: 7070
-    protocol: TCP
-    targetPort: 7070
-  selector:
-    server: {{ .name | quote }}
-  type: ClusterIP
+{{/*
+Expose dali as an external service, only if there is a service definition and a dafilesrv directio service is active.
+*/}}
+{{- $dafilesrvCtx := dict -}}
+{{- $service := deepCopy (.service | default dict) -}}
+{{- $_ := set $service "visibility" ($service.visibility | default "cluster") -}}
+{{- $_ := set $service "servicePort" ($service.servicePort | default 7070) -}}
+{{- if and (hasKey . "service") (not (eq $service.visibility "cluster")) -}}
+ {{ range $.Values.dafilesrv -}}
+  {{- if not .disabled -}}
+   {{- if (eq "directio" .application) -}}
+    {{- $_ := set $dafilesrvCtx "directAccess" true -}}
+   {{- end -}}
+  {{- end -}}
+ {{- end -}}
+{{- end }}
+{{- if $dafilesrvCtx.directAccess -}}
+ {{- if not (hasKey $service "labels") -}}
+  {{- $_ := set $service "labels" dict -}}
+ {{- end -}}
+ {{- $_ := set $service "labels" (merge $service.labels (dict "server" .name)) -}}
+{{- end }}
+{{- include "hpcc.addService" ( dict "root" $ "name" .name "service" $service "selector" .name "defaultPort" 7070 ) | nindent 0 }}
 ---
-{{ include "hpcc.addCertificate" (dict "root" $ "name" .name "service" .service "component" "dali" "external" false) }}
+{{ include "hpcc.addCertificate" (dict "root" $ "name" .name "service" $service "component" "dali" "external" false) }}
 
 {{- end }}
 {{- end }}

+ 2 - 2
helm/hpcc/templates/eclagent.yaml

@@ -67,7 +67,7 @@ data:
 {{- end }}
         spec:
           {{- include "hpcc.placementsByJobTargetType" (dict "root" .root "job" $appJobName "target" .me.name "type" "eclagent") | indent 10 }}
-          serviceAccountName: "hpcc-default"
+          serviceAccountName: "hpcc-agent"
 {{- if $misc.postJobCommandViaSidecar }}
           shareProcessNamespace: true
 {{- end }}            
@@ -142,7 +142,7 @@ spec:
 {{- end }}
     spec:
       {{- include "hpcc.placementsByPodTargetType" (dict "root" $ "pod" .name "target" .name "type" "eclagent") | indent 6 }}
-      serviceAccountName: {{ .useChildProcesses | default false | ternary "hpcc-default" "hpcc-agent" }}
+      serviceAccountName: "hpcc-agent"
       initContainers:
       {{- include "hpcc.changePlaneMountPerms" $commonCtx | indent 6 }}
       {{- include "hpcc.addImagePullSecrets" $commonCtx | nindent 6 -}}

+ 32 - 4
helm/hpcc/templates/service-account.yaml

@@ -39,10 +39,10 @@ metadata:
   name: hpcc-agent
 rules:
   - apiGroups: [ "" ] # core API group
-    resources: [ "pods" ]
+    resources: [ "pods", "services" ]
     verbs: [ "get", "list" ]
   - apiGroups: [ "batch" ]
-    resources: [ "jobs" ]
+    resources: [ "jobs", "services" ]
     verbs: [ "get", "create", "list", "delete", "watch" ]
 ---
 apiVersion: rbac.authorization.k8s.io/v1
@@ -71,13 +71,13 @@ metadata:
   name: hpcc-thoragent
 rules:
   - apiGroups: [ "" ] # core API group
-    resources: [ "pods" ]
+    resources: [ "pods", "services" ]
     verbs: [ "get", "list", "create" ]
   - apiGroups: [ "networking.k8s.io" ]
     resources: [ "networkpolicies" ]
     verbs: [ "get", "create", "delete" ]
   - apiGroups: [ "batch" ]
-    resources: [ "jobs" ]
+    resources: [ "jobs", "services" ]
     verbs: [ "get", "create", "list", "delete", "watch" ]
 ---
 apiVersion: rbac.authorization.k8s.io/v1
@@ -120,3 +120,31 @@ subjects:
   - kind: ServiceAccount
     name: hpcc-esp-service
     namespace: {{ .Release.Namespace }}
+---
+# The hpcc-dali service account is used by dali components that need to be able to find the external directio dafilesrv service
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: hpcc-dali
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: Role
+metadata:
+  name: hpcc-dali
+rules:
+  - apiGroups: [ "" ] # core API group
+    resources: [ "services" ]
+    verbs: [ "get", "list" ]
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: RoleBinding
+metadata:
+  name: hpcc-dali
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: Role
+  name: hpcc-dali
+subjects:
+  - kind: ServiceAccount
+    name: hpcc-dali
+    namespace: {{ .Release.Namespace }}

+ 7 - 0
helm/hpcc/values.schema.json

@@ -887,6 +887,10 @@
       "type": "object",
       "required": [ "servicePort", "visibility" ],
       "properties": {
+        "application": {
+          "type": "string",
+          "description": "Application name"
+        },
         "port": {
           "type": "integer",
           "description": "The local port used by the pod",
@@ -963,6 +967,9 @@
           "description": "sasha services",
           "type": "object",
           "$ref": "#/definitions/sashaservice"
+        },
+        "service": {
+          "$ref": "#/definitions/service"
         }
       }
     },

+ 11 - 1
helm/hpcc/values.yaml

@@ -338,11 +338,21 @@ bundles: []
 # - name: DataPatterns
 
 dafilesrv:
-- name: mydafilesrv
+- name: rowservice
   disabled: true # disabled by default because requires cert-manager etc. (see certificates section)
+  application: stream
   service:
     servicePort: 7600
     visibility: local
+
+ # Enabling this service exposes a dafilesrv and Dali for foreign file access for backward compatibility.
+ # If enabled, both services should be secured with strict ingress rules.
+- name: direct-access
+  disabled: true
+  application: directio
+  service:
+    servicePort: 7200
+    visibility: local
     
 
 dali:

+ 88 - 0
system/jlib/jmisc.cpp

@@ -1021,3 +1021,91 @@ jlib_decl char **getSystemEnv()
     return environ;
 #endif
 }
+
+
+#ifdef _CONTAINERIZED
+/* Runs the command line 'cmd' through runExternalCommand (passing 'input' along) and
+ * logs whatever it wrote to stdout and stderr.
+ * If 'output' is null, stdout is collected into a local scratch buffer instead.
+ * NB: will fire an exception if the command fails (returns non-zero exit code)
+ */
+void runKubectlCommand(const char *title, const char *cmd, const char *input, StringBuffer *output)
+{
+    StringBuffer scratchOut, errText;
+    StringBuffer &out = output ? *output : scratchOut;
+    unsigned exitCode = runExternalCommand(title, out, errText, cmd, input, ".", nullptr);
+    if (out.length() > 0)
+        MLOG(MCExtraneousInfo, unknownJob, "%s: ret=%u, stdout=%s", cmd, exitCode, out.trimRight().str());
+    if (errText.length() > 0)
+        MLOG(MCinternalError, unknownJob, "%s: ret=%u, stderr=%s", cmd, exitCode, errText.trimRight().str());
+    if (0 == exitCode)
+        return;
+
+    // Failure path: record the input that was supplied (if any) before raising
+    if (input)
+        MLOG(MCinternalError, unknownJob, "Using input %s", input);
+    throw makeStringExceptionV(0, "Failed to run %s: error %u: %s", cmd, exitCode, errText.str());
+}
+
+// Cache of resolved external services, keyed by service name.
+// All access (lookups and inserts) must be made while holding externalServiceCacheCrit.
+static CTimeLimitedCache<std::string, std::pair<std::string, unsigned>> externalServiceCache;
+static CriticalSection externalServiceCacheCrit;
+
+/* Returns the external host and port of the k8s service selected by "server=<serviceName>".
+ *
+ * The answer is obtained by running 'kubectl get svc' and parsing its "hostname,ip,port"
+ * output. The hostname is preferred; the ip is used if the hostname field is blank.
+ * If kubectl does not return exactly 3 fields, an empty host and port 0 are returned.
+ * Results (including the empty result) are stored in externalServiceCache.
+ *
+ * Throws an exception (code -1) that wraps the underlying error if the kubectl command fails.
+ */
+std::pair<std::string, unsigned> getExternalService(const char *serviceName)
+{
+    {
+        CriticalBlock b(externalServiceCacheCrit);
+        std::pair<std::string, unsigned> cachedExternalService;
+        if (externalServiceCache.get(serviceName, cachedExternalService))
+            return cachedExternalService;
+    }
+
+    // NB: kubectl is run without holding the lock, so concurrent callers that miss the cache
+    // may each query the same service; each then adds its result below.
+    StringBuffer output;
+    try
+    {
+        VStringBuffer getServiceCmd("kubectl get svc --selector=server=%s --output=jsonpath={.items[0].status.loadBalancer.ingress[0].hostname},{.items[0].status.loadBalancer.ingress[0].ip},{.items[0].spec.ports[0].port}", serviceName);
+        runKubectlCommand("get-external-service", getServiceCmd, nullptr, &output);
+    }
+    catch (IException *e)
+    {
+        EXCLOG(e);
+        VStringBuffer exceptionText("Failed to get external service for '%s'. Error: [%d, ", serviceName, e->errorCode());
+        e->errorMessage(exceptionText).append("]");
+        e->Release();
+        throw makeStringException(-1, exceptionText);
+    }
+    StringArray fields;
+    fields.appendList(output, ",");
+
+    // NB: add even if no result, want non-result to be cached too
+    std::string host, port;
+    if (fields.ordinality() == 3) // hostname,ip,port. NB: hostname may be missing, but still present as a blank field
+    {
+        host = fields.item(0); // hostname
+        if (0 == host.length())
+            host = fields.item(1); // ip
+        port = fields.item(2);
+    }
+    auto servicePair = std::make_pair(host, atoi(port.c_str()));
+    {
+        // The lookup above is made under this lock, so the insert must be too;
+        // otherwise one thread could modify the cache while another is reading it.
+        CriticalBlock b(externalServiceCacheCrit);
+        externalServiceCache.add(serviceName, servicePair);
+    }
+    return servicePair;
+}
+
+
+/* Resolves the external (host, port) of the dafilesrv service whose configured type matches 'application'.
+ *
+ * NB: only the first matching service entry in the global configuration is used.
+ * For now a single dafilesrv is expected in the configuration. There could be several
+ * dafilesrv services (e.g. with different specs./replicas) servicing different planes,
+ * but at the moment dafilesrv mounts all data planes.
+ *
+ * Throws an exception (code -1) if no matching service is configured, if it is not
+ * flagged @public, or if its external host or port cannot be determined.
+ */
+std::pair<std::string, unsigned> getDafileServiceFromConfig(const char *application)
+{
+    VStringBuffer xpath("services[@type='%s']", application);
+    Owned<IPropertyTreeIterator> matches = getGlobalConfigSP()->getElements(xpath);
+    if (!matches->first())
+        throw makeStringException(-1, "dafilesrv service not defined");
+
+    const IPropertyTree &svc = matches->query();
+    if (!svc.getPropBool("@public"))
+        throw makeStringException(-1, "dafilesrv service has no public service defined");
+
+    StringBuffer svcName;
+    svc.getProp("@name", svcName);
+
+    std::pair<std::string, unsigned> endpoint = getExternalService(svcName);
+    const std::string &host = endpoint.first;
+    const unsigned port = endpoint.second;
+    if (host.empty())
+        throw makeStringExceptionV(-1, "dafilesrv '%s': external service not found", svcName.str());
+    if (0 == port)
+        throw makeStringExceptionV(-1, "dafilesrv '%s': external service port not defined", svcName.str());
+    return endpoint;
+}
+#endif

+ 9 - 0
system/jlib/jmisc.hpp

@@ -333,4 +333,13 @@ extern jlib_decl char *mkdtemp(char *_template);
 #endif
 
 extern jlib_decl char **getSystemEnv();
+
+// Runs the kubectl command line 'cmd' and logs its stdout/stderr; 'output' may be null. Throws if the command exits non-zero. NB: the definition in jmisc.cpp is compiled only when _CONTAINERIZED is defined.
+extern jlib_decl void runKubectlCommand(const char *title, const char *cmd, const char *input, StringBuffer *output);
+
+// return the k8s external host and port for serviceName (hostname preferred, ip if the hostname is blank). Results are cached; an empty host and port 0 are returned if no service was found. Throws if the kubectl query fails.
+extern jlib_decl std::pair<std::string, unsigned> getExternalService(const char *serviceName);
+// return the external host and port of the first configured service whose type matches 'application'. Throws if none is configured, it is not flagged @public, or its external host/port cannot be resolved.
+extern jlib_decl std::pair<std::string, unsigned> getDafileServiceFromConfig(const char *application);
+
 #endif

+ 1 - 1
thorlcr/slave/thslavemain.cpp

@@ -558,7 +558,7 @@ int main( int argc, const char *argv[]  )
                     try
                     {
                         PROGLOG("Starting dafilesrv");
-                        dafsInstance->run(SSLNone, listenEp);
+                        dafsInstance->run(nullptr, SSLNone, listenEp);
                     }
                     catch (IException *e)
                     {