Explorar el Código

HPCC-12937 Look up replication scheme when cloning roxie subfiles

Signed-off-by: Anthony Fishbeck <anthony.fishbeck@lexisnexis.com>
Anthony Fishbeck hace 10 años
padre
commit
1af1ef6293

+ 9 - 10
common/workunit/referencedfilelist.cpp

@@ -167,7 +167,7 @@ public:
             ep.set(NULL);
         return ep;
     }
-    virtual void cloneInfo(IDFUhelper *helper, IUserDescriptor *user, const char *dstCluster, const char *srcCluster, bool overwrite=false, bool cloneForeign=false);
+    virtual void cloneInfo(IDFUhelper *helper, IUserDescriptor *user, const char *dstCluster, const char *srcCluster, bool overwrite, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defReplicateFolder);
     void cloneSuperInfo(ReferencedFileList *list, IUserDescriptor *user, INode *remote, bool overwrite);
     virtual const char *queryPackageId() const {return pkgid.get();}
     virtual __int64 getFileSize()
@@ -227,11 +227,12 @@ public:
     void addFilesFromPackage(IPropertyTree &package, const char *_daliip, const char *srcCluster, const char *_remotePrefix);
 
     virtual IReferencedFileIterator *getFiles();
-    virtual void cloneFileInfo(IDFUhelper *helper, bool overwrite, bool cloneSuperInfo, bool cloneForeign=false);
+    virtual void cloneFileInfo(IDFUhelper *helper, bool overwrite, bool cloneSuperInfo, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defReplicateFolder);
+
     virtual void cloneRelationships();
-    virtual void cloneAllInfo(IDFUhelper *helper, bool overwrite, bool cloneSuperInfo, bool cloneForeign=false)
+    virtual void cloneAllInfo(IDFUhelper *helper, bool overwrite, bool cloneSuperInfo, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defReplicateFolder)
     {
-        cloneFileInfo(helper, overwrite, cloneSuperInfo, cloneForeign);
+        cloneFileInfo(helper, overwrite, cloneSuperInfo, cloneForeign, redundancy, channelsPerNode, replicateOffset, defReplicateFolder);
         cloneRelationships();
     }
     virtual void resolveFiles(const char *process, const char *remoteIP, const char *_remotePrefix, const char *srcCluster, bool checkLocalFirst, bool addSubFiles, bool resolveForeign=false);
@@ -407,7 +408,7 @@ void ReferencedFile::resolve(const char *dstCluster, const char *srcCluster, IUs
         resolveLocal(dstCluster, srcCluster, user, subfiles);
 }
 
-void ReferencedFile::cloneInfo(IDFUhelper *helper, IUserDescriptor *user, const char *dstCluster, const char *srcCluster, bool overwrite, bool cloneForeign)
+void ReferencedFile::cloneInfo(IDFUhelper *helper, IUserDescriptor *user, const char *dstCluster, const char *srcCluster, bool overwrite, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defReplicateFolder)
 {
     if ((flags & RefFileCloned) || (flags & RefFileSuper) || (flags & RefFileInPackage))
         return;
@@ -424,9 +425,7 @@ void ReferencedFile::cloneInfo(IDFUhelper *helper, IUserDescriptor *user, const
         if (filePrefix.length())
             srcLFN.append(filePrefix.str()).append("::");
         srcLFN.append(logicalName.str());
-
-        helper->createSingleFileClone(srcLFN, srcCluster, logicalName, dstCluster, filePrefix,
-            DFUcpdm_c_replicated_by_d, true, NULL, user, daliip, NULL, overwrite, false);
+        helper->cloneRoxieSubFile(srcLFN, srcCluster, logicalName, dstCluster, filePrefix, redundancy, channelsPerNode, replicateOffset, defReplicateFolder, user, daliip, overwrite);
         flags |= RefFileCloned;
     }
     catch (IException *e)
@@ -707,11 +706,11 @@ void ReferencedFileList::resolveFiles(const char *_process, const char *remoteIP
         resolveSubFiles(subfiles, checkLocalFirst, resolveForeign);
 }
 
-void ReferencedFileList::cloneFileInfo(IDFUhelper *helper, bool overwrite, bool cloneSuperInfo, bool cloneForeign)
+void ReferencedFileList::cloneFileInfo(IDFUhelper *helper, bool overwrite, bool cloneSuperInfo, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defReplicateFolder)
 {
     ReferencedFileIterator files(this);
     ForEach(files)
-        files.queryObject().cloneInfo(helper, user, process, srcCluster, overwrite, cloneForeign);
+        files.queryObject().cloneInfo(helper, user, process, srcCluster, overwrite, cloneForeign, redundancy, channelsPerNode, replicateOffset, defReplicateFolder);
     if (cloneSuperInfo)
         ForEach(files)
             files.queryObject().cloneSuperInfo(this, user, remote, overwrite);

+ 2 - 2
common/workunit/referencedfilelist.hpp

@@ -60,8 +60,8 @@ interface IReferencedFileList : extends IInterface
 
     virtual IReferencedFileIterator *getFiles()=0;
     virtual void resolveFiles(const char *process, const char *remoteIP, const char * remotePrefix, const char *srcCluster, bool checkLocalFirst, bool addSubFiles, bool resolveForeign=false)=0;
-    virtual void cloneAllInfo(IDFUhelper *helper, bool overwrite, bool cloneSuperInfo, bool cloneForeign=false)=0;
-    virtual void cloneFileInfo(IDFUhelper *helper, bool overwrite, bool cloneSuperInfo, bool cloneForeign=false)=0;
+    virtual void cloneAllInfo(IDFUhelper *helper, bool overwrite, bool cloneSuperInfo, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defRepFolder)=0;
+    virtual void cloneFileInfo(IDFUhelper *helper, bool overwrite, bool cloneSuperInfo, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defRepFolder)=0;
     virtual void cloneRelationships()=0;
 };
 

+ 35 - 1
common/workunit/workunit.cpp

@@ -4809,11 +4809,14 @@ class CEnvironmentClusterInfo: public CInterface, implements IConstWUClusterInfo
     StringBuffer ldapPassword;
     ClusterType platform;
     unsigned clusterWidth;
+    unsigned roxieRedundancy;
+    unsigned channelsPerNode;
+    int roxieReplicateOffset;
 
 public:
     IMPLEMENT_IINTERFACE;
     CEnvironmentClusterInfo(const char *_name, const char *_prefix, IPropertyTree *agent, IArrayOf<IPropertyTree> &thors, IPropertyTree *roxie)
-        : name(_name), prefix(_prefix)
+        : name(_name), prefix(_prefix), roxieRedundancy(0), channelsPerNode(0), roxieReplicateOffset(1)
     {
         StringBuffer queue;
         if (thors.ordinality())
@@ -4857,6 +4860,24 @@ public:
             StringBuffer encPassword = roxie->queryProp("@ldapPassword");
             if (encPassword.length())
                 decrypt(ldapPassword, encPassword);
+            const char *redundancyMode = roxie->queryProp("@slaveConfig");
+            if (redundancyMode && *redundancyMode)
+            {
+                unsigned dataCopies = roxie->getPropInt("@numDataCopies", 1);
+                if (strieq(redundancyMode, "overloaded"))
+                    channelsPerNode = roxie->getPropInt("@channelsPernode", 1);
+                else if (strieq(redundancyMode, "full redundancy"))
+                {
+                    roxieRedundancy = dataCopies-1;
+                    roxieReplicateOffset = 0;
+                }
+                else if (strieq(redundancyMode, "cyclic redundancy"))
+                {
+                    roxieRedundancy = dataCopies-1;
+                    channelsPerNode = dataCopies;
+                    roxieReplicateOffset = roxie->getPropInt("@cyclicOffset", 1);
+                }
+            }
         }
         else 
         {
@@ -4874,6 +4895,7 @@ public:
         // MORE - does this need to be conditional?
         serverQueue.set(getClusterEclCCServerQueueName(queue.clear(), name));
     }
+
     IStringVal & getName(IStringVal & str) const
     {
         str.set(name.get());
@@ -4925,6 +4947,18 @@ public:
     {
         return roxieServers;
     }
+    unsigned getRoxieRedundancy() const
+    {
+        return roxieRedundancy;
+    }
+    unsigned getChannelsPerNode() const
+    {
+        return channelsPerNode;
+    }
+    int getRoxieReplicateOffset() const
+    {
+        return roxieReplicateOffset;
+    }
     const char *getLdapUser() const
     {
         return ldapUser.get();

+ 3 - 0
common/workunit/workunit.hpp

@@ -546,6 +546,9 @@ interface IConstWUClusterInfo : extends IInterface
     virtual const SocketEndpointArray & getRoxieServers() const = 0;
     virtual const char *getLdapUser() const = 0;
     virtual const char *getLdapPassword() const = 0;
+    virtual unsigned getRoxieRedundancy() const = 0;
+    virtual unsigned getChannelsPerNode() const = 0;
+    virtual int getRoxieReplicateOffset() const = 0;
 };
 
 //! IWorkflowItem

+ 26 - 0
dali/dfu/dfuutil.cpp

@@ -1019,6 +1019,32 @@ public:
         cloner.cloneFile(srcname,dstname);
     }
 
+    virtual void cloneRoxieSubFile(const char *srcLFN,             // src LFN (can't be super)
+                         const char *srcCluster,
+                         const char *dstLFN,                       // dst LFN
+                         const char *dstCluster,                   // group name of roxie cluster
+                         const char *prefix,
+                         unsigned redundancy,                      // Number of "spare" copies of the data
+                         unsigned channelsPerNode,                 // Overloaded and cyclic modes
+                         int replicateOffset,                      // Used In cyclic mode only
+                         const char *defReplicateFolder,
+                         IUserDescriptor *userdesc,                // user desc for local dali
+                         const char *foreigndali,                  // can be omitted if srcname foreign or local
+                         bool overwrite                            // overwrite destination if exists
+                         )
+    {
+        DBGLOG("cloneRoxieSubFile src=%s@%s, dst=%s@%s, prefix=%s, ow=%d, docopy=false", srcLFN, srcCluster, dstLFN, dstCluster, prefix, overwrite);
+        CFileCloner cloner;
+        cloner.init(dstCluster, DFUcpdm_c_replicated_by_d, true, NULL, userdesc, foreigndali, NULL, NULL, overwrite, false);
+        cloner.spec1.setRoxie(redundancy, channelsPerNode, replicateOffset);
+        if (defReplicateFolder)
+            cloner.spec1.setDefaultReplicateDir(defReplicateFolder);
+        cloner.srcCluster.set(srcCluster);
+        cloner.prefix.set(prefix);
+        cloner.cloneFile(srcLFN, dstLFN);
+    }
+
+
     void cloneFileRelationships(
         const char *foreigndali,     // where src relationships are retrieved from (can be NULL for local)
         StringArray &srcfns,             // file names on source

+ 15 - 0
dali/dfu/dfuutil.hpp

@@ -70,6 +70,21 @@ interface IDFUhelper: extends IInterface
                          bool dophysicalcopy=false          // NB *not* using DFU server (so use with care)
                          ) = 0;
 
+
+    virtual void cloneRoxieSubFile(const char *srcLFN,             // src LFN (can't be super)
+                         const char *srcCluster,
+                         const char *dstLFN,                       // dst LFN
+                         const char *dstCluster,                   // group name of roxie cluster
+                         const char *prefix,
+                         unsigned redundancy,                      // Number of "spare" copies of the data
+                         unsigned channelsPerNode,                 // Overloaded and cyclic modes
+                         int replicateOffset,                      // Used In cyclic mode only
+                         const char *defReplicateFolder,
+                         IUserDescriptor *userdesc,                // user desc for local dali
+                         const char *foreigndali,                  // can be omitted if srcname foreign or local
+                         bool overwrite                            // overwrite destination if exists
+                         ) = 0;
+
     virtual void cloneFileRelationships(
         const char *foreigndali,        // where src relationships are retrieved from (can be NULL for local)
         StringArray &srcfns,            // file names on source

+ 5 - 1
esp/services/ws_packageprocess/ws_packageprocessService.cpp

@@ -140,8 +140,12 @@ void cloneFileInfoToDali(StringArray &notFound, IPropertyTree *packageMap, const
     SCMStringBuffer processName;
     dstInfo->getRoxieProcess(processName);
     wufiles->resolveFiles(processName.str(), lookupDaliIp, remotePrefix, srcCluster, !overWrite, false);
+
+    StringBuffer defReplicateFolder;
+    getConfigurationDirectory(NULL, "data2", "roxie", processName.str(), defReplicateFolder);
+
     Owned<IDFUhelper> helper = createIDFUhelper();
-    wufiles->cloneAllInfo(helper, overWrite, true);
+    wufiles->cloneAllInfo(helper, overWrite, true, false, dstInfo->getRoxieRedundancy(), dstInfo->getChannelsPerNode(), dstInfo->getRoxieReplicateOffset(), defReplicateFolder);
 
     Owned<IReferencedFileIterator> iter = wufiles->getFiles();
     ForEach(*iter)

+ 11 - 2
esp/services/ws_workunits/ws_workunitsQuerySets.cpp

@@ -690,8 +690,10 @@ void copyQueryFilesToCluster(IEspContext &context, IConstWorkUnit *cw, const cha
             queryname = queryid.append(queryname).append(".0").str(); //prepublish dummy version number to support fuzzy match like queries="myquery.*" in package
         wufiles->addFilesFromQuery(cw, (ps) ? ps->queryActiveMap(target) : NULL, queryname);
         wufiles->resolveFiles(process.str(), remoteIP, remotePrefix, srcCluster, !overwrite, true, true);
+        StringBuffer defReplicateFolder;
+        getConfigurationDirectory(NULL, "data2", "roxie", process.str(), defReplicateFolder);
         Owned<IDFUhelper> helper = createIDFUhelper();
-        wufiles->cloneAllInfo(helper, overwrite, true, true);
+        wufiles->cloneAllInfo(helper, overwrite, true, true, clusterInfo->getRoxieRedundancy(), clusterInfo->getChannelsPerNode(), clusterInfo->getRoxieReplicateOffset(), defReplicateFolder);
     }
 }
 
@@ -2246,7 +2248,14 @@ public:
         {
             wufiles->resolveFiles(process, dfsIP, srcPrefix, srcCluster, !overwriteDfs, true, true);
             Owned<IDFUhelper> helper = createIDFUhelper();
-            wufiles->cloneAllInfo(helper, overwriteDfs, true, true);
+            Owned <IConstWUClusterInfo> cl = getTargetClusterInfo(target);
+            if (cl)
+            {
+                SCMStringBuffer process;
+                StringBuffer defReplicateFolder;
+                getConfigurationDirectory(NULL, "data2", "roxie", cl->getRoxieProcess(process).str(), defReplicateFolder);
+                wufiles->cloneAllInfo(helper, overwriteDfs, true, true, cl->getRoxieRedundancy(), cl->getChannelsPerNode(), cl->getRoxieReplicateOffset(), defReplicateFolder);
+            }
         }
     }
 private: