فهرست منبع

HPCC-11443 Allow Roxie to specify remote cluster priorities

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 11 سال پیش
والد
کامیت
60dee94188

+ 6 - 0
initfiles/componentfiles/configxml/RoxieTopology.xsl

@@ -146,6 +146,12 @@
                     <xsl:copy-of select="@regex"/>
                 </xsl:copy>
             </xsl:for-each>
+            <xsl:for-each select="PreferredCluster">
+                <xsl:copy>
+                    <xsl:copy-of select="@name"/>
+                    <xsl:copy-of select="@priority"/>
+                </xsl:copy>
+            </xsl:for-each>
             <xsl:for-each select="RoxieFarmProcess">
                 <xsl:element name="RoxieFarmProcess">
                     <xsl:copy-of select="@*[name()!='name' and name()!='level']"/>

+ 25 - 0
initfiles/componentfiles/configxml/roxie.xsd.in

@@ -212,6 +212,31 @@
             </xs:attribute>
           </xs:complexType>
         </xs:element>
+        <xs:element name="PreferredCluster" maxOccurs="unbounded">
+          <xs:annotation>
+            <xs:appinfo>
+              <title>Preferred Clusters</title>
+            </xs:appinfo>
+          </xs:annotation>
+          <xs:complexType>
+            <xs:attribute name="name" type="xs:string" use="required" >
+              <xs:annotation>
+                <xs:appinfo>
+                  <tooltip>Name of the cluster</tooltip>
+                                    <colIndex>1</colIndex>
+                </xs:appinfo>
+              </xs:annotation>
+            </xs:attribute>
+            <xs:attribute name="priority" type="xs:integer" use="required">
+              <xs:annotation>
+                <xs:appinfo>
+                  <tooltip>Priority (negative to disable)</tooltip>
+                                    <colIndex>2</colIndex>
+                </xs:appinfo>
+              </xs:annotation>
+            </xs:attribute>
+          </xs:complexType>
+        </xs:element>
         <xs:element name="UserMetric" maxOccurs="unbounded">
           <xs:annotation>
             <xs:appinfo>

+ 3 - 2
roxie/ccd/ccd.hpp

@@ -322,8 +322,9 @@ extern unsigned slaTimeout;
 extern unsigned headRegionSize;
 extern unsigned ccdMulticastPort;
 extern CriticalSection ccdChannelsCrit;
-extern IPropertyTree* ccdChannels;
-extern IPropertyTree* topology;
+extern IPropertyTree *ccdChannels;
+extern IPropertyTree *topology;
+extern MapStringTo<int> *preferredClusters;
 extern StringArray allQuerySetNames;
 
 extern bool allFilesDynamic;

+ 44 - 34
roxie/ccd/ccdfile.cpp

@@ -485,32 +485,32 @@ static IPartDescriptor *queryMatchingRemotePart(IPartDescriptor *pdesc, IFileDes
     return NULL;
 }
 
-static bool checkClusterCount(UnsignedArray &counts, unsigned clusterNo, unsigned max)
+static int getClusterPriority(const char *clusterName)
 {
-    while (!counts.isItem(clusterNo))
-        counts.append(0);
-    unsigned count = counts.item(clusterNo);
-    if (count>=max)
-        return false;
-    counts.replace(++count, clusterNo);
-    return true;
-}
-
-static bool isCopyFromCluster(IPartDescriptor *pdesc, unsigned clusterNo, const char *name)
-{
-    StringBuffer s;
-    return strieq(name, pdesc->queryOwner().getClusterGroupName(clusterNo, s));
+    assertex(preferredClusters);
+    int *priority = preferredClusters->getValue(clusterName);
+    return priority ? *priority : 100;
 }
 
 static void appendRemoteLocations(IPartDescriptor *pdesc, StringArray &locations, const char *localFileName, const char *fromCluster, bool includeFromCluster)
 {
-    UnsignedArray clusterCounts;
+    IFileDescriptor &fdesc = pdesc->queryOwner();
     unsigned numCopies = pdesc->numCopies();
+    unsigned lastClusterNo = (unsigned) -1;
+    unsigned numThisCluster = 0;
+    int priority = 0;
+    IntArray priorities;
     for (unsigned copy = 0; copy < numCopies; copy++)
     {
         unsigned clusterNo = pdesc->copyClusterNum(copy);
-        if (fromCluster && *fromCluster && isCopyFromCluster(pdesc, clusterNo, fromCluster)!=includeFromCluster)
-            continue;
+        StringBuffer clusterName;
+        fdesc.getClusterGroupName(clusterNo, clusterName);
+        if (fromCluster && *fromCluster)
+        {
+            bool matches = strieq(clusterName.str(), fromCluster);
+            if (matches!=includeFromCluster)
+                continue;
+        }
         RemoteFilename r;
         pdesc->getFilename(copy,r);
         StringBuffer path;
@@ -522,26 +522,36 @@ static void appendRemoteLocations(IPartDescriptor *pdesc, StringArray &locations
             if (streq(l, localFileName))
                 continue; // don't add ourself
         }
-        if (!checkClusterCount(clusterCounts, clusterNo, 2))  // Don't add more than 2 from one cluster
-            continue;
-        locations.append(path.str());
-    }
-}
-
-static void appendPeerLocations(IPartDescriptor *pdesc, StringArray &locations, const char *localFileName)
-{
-    const char *peerCluster = pdesc->queryOwner().queryProperties().queryProp("@cloneFromPeerCluster");
-    if (peerCluster)
-    {
-        if (*peerCluster=='-') //a remote cluster was specified explicitly
-            return;
-        if (streq(peerCluster, roxieName))
-            peerCluster=NULL;
+        if (clusterNo == lastClusterNo)
+        {
+            numThisCluster++;
+            if (numThisCluster > 2)  // Don't add more than 2 from one cluster
+                continue;
+        }
+        else
+        {
+            numThisCluster = 1;
+            lastClusterNo = clusterNo;
+            if (preferredClusters)
+            {
+                priority = getClusterPriority(clusterName);
+            }
+            else
+                priority = copy;
+        }
+        if (priority >= 0)
+        {
+            ForEachItemIn(idx, priorities)
+            {
+                if (priorities.item(idx) < priority)
+                    break;
+            }
+            priorities.add(priority, idx);
+            locations.add(path.str(), idx);
+        }
     }
-    appendRemoteLocations(pdesc, locations, localFileName, peerCluster, true);
 }
 
-
 //----------------------------------------------------------------------------------------------
 
 typedef StringArray *StringArrayPtr;

+ 15 - 2
roxie/ccd/ccdmain.cpp

@@ -86,7 +86,8 @@ bool runOnce = false;
 unsigned udpMulticastBufferSize = 262142;
 bool roxieMulticastEnabled = true;
 
-IPropertyTree* topology;
+IPropertyTree *topology;
+MapStringTo<int> *preferredClusters;
 StringBuffer topologyFile;
 CriticalSection ccdChannelsCrit;
 IPropertyTree* ccdChannels;
@@ -507,7 +508,19 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
             topology->setProp("@traceLevel", globals->queryProp("--traceLevel"));
             topology->setProp("@memTraceLevel", globals->queryProp("--memTraceLevel"));
         }
-
+        if (topology->hasProp("PreferredCluster"))
+        {
+            preferredClusters = new MapStringTo<int>(true);
+            Owned<IPropertyTreeIterator> clusters = topology->getElements("PreferredCluster");
+            ForEach(*clusters)
+            {
+                IPropertyTree &child = clusters->query();
+                const char *name = child.queryProp("@name");
+                int priority = child.getPropInt("@priority", 100);
+                if (name && *name)
+                    preferredClusters->setValue(name, priority);
+            }
+        }
         topology->getProp("@name", roxieName);
         Owned<const IQueryDll> standAloneDll;
         if (globals->hasProp("--loadWorkunit"))