Browse Source

HPCC-22944 Roxie incorrectly copying files to hpcc-data2 in "simple" mode

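Roxie was incorrectly calculating the replication level in "simple" replication mode: getReplicationLevel() returned the channel's subchannel number instead of its replication level. The replication level is now recorded explicitly for each slave channel and reported via ChannelInfo::replicationLevel().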

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 5 years ago
parent
commit
ad87ea6ff2
4 changed files with 41 additions and 16 deletions
  1. 17 9
      roxie/ccd/ccdmain.cpp
  2. 1 1
      roxie/ccd/ccdqueue.cpp
  3. 19 5
      roxie/udplib/udptopo.cpp
  4. 4 1
      roxie/udplib/udptopo.hpp

+ 17 - 9
roxie/ccd/ccdmain.cpp

@@ -410,7 +410,7 @@ public:
 static SocketEndpointArray topologyServers;
 static std::vector<RoxieEndpointInfo> myRoles;
 static std::vector<unsigned> farmerPorts;
-static std::vector<unsigned> slaveChannels;
+static std::vector<std::pair<unsigned, unsigned>> slaveChannels;
 
 static bool splitarg(const char *arg, std::string &name, std::string &value)
 {
@@ -461,7 +461,7 @@ void readStaticTopology()
         {
             IPropertyTree &roxieFarm = roxieFarms->query();
             unsigned port = roxieFarm.getPropInt("@port", ROXIE_SERVER_PORT);
-            RoxieEndpointInfo server = {RoxieEndpointInfo::RoxieServer, 0, { (unsigned short) port, ip }};
+            RoxieEndpointInfo server = {RoxieEndpointInfo::RoxieServer, 0, { (unsigned short) port, ip }, 0};
             allRoles.push_back(server);
         }
     }
@@ -491,7 +491,7 @@ void readStaticTopology()
                 int channel = (int)i+1 - (copy * cyclicOffset);
                 while (channel < 1)
                     channel = channel + numNodes;
-                RoxieEndpointInfo slave = {RoxieEndpointInfo::RoxieSlave, (unsigned) channel, { (unsigned short) ccdMulticastPort, nodeTable.item(i) }};
+                RoxieEndpointInfo slave = {RoxieEndpointInfo::RoxieSlave, (unsigned) channel, { (unsigned short) ccdMulticastPort, nodeTable.item(i) }, copy};
                 allRoles.push_back(slave);
             }
         }
@@ -506,7 +506,7 @@ void readStaticTopology()
             for (unsigned i=0; i<numNodes; i++)
             {
                 unsigned channel = i+1;
-                RoxieEndpointInfo slave = {RoxieEndpointInfo::RoxieSlave, channel, { (unsigned short) ccdMulticastPort, nodeTable.item(i) }};
+                RoxieEndpointInfo slave = {RoxieEndpointInfo::RoxieSlave, channel, { (unsigned short) ccdMulticastPort, nodeTable.item(i) }, copy};
                 allRoles.push_back(slave);
                 channel += numNodes;
             }
@@ -520,7 +520,7 @@ void readStaticTopology()
         unsigned channel = 1;
         for (unsigned i=0; i<numNodes; i++)
         {
-            RoxieEndpointInfo slave = {RoxieEndpointInfo::RoxieSlave, channel, { (unsigned short) ccdMulticastPort, nodeTable.item(i) }};
+            RoxieEndpointInfo slave = {RoxieEndpointInfo::RoxieSlave, channel, { (unsigned short) ccdMulticastPort, nodeTable.item(i) }, 0 };
             allRoles.push_back(slave);
             channel++;
             if (channel > calcNumChannels)
@@ -647,7 +647,15 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
                 }
                 else if (name=="--channel")
                 {
-                    slaveChannels.push_back(atoi(value.c_str()));
+                    char *tail = nullptr;
+                    unsigned channel = strtoul(value.c_str(), &tail, 10);
+                    unsigned repl = 0;
+                    if (*tail==':')
+                    {
+                        tail++;
+                        repl = atoi(tail);
+                    }
+                    slaveChannels.push_back(std::pair<unsigned, unsigned>(channel, repl));
                     continue;
                 }
             }
@@ -1128,13 +1136,13 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
                 VStringBuffer xpath("./RoxieFarmProcess[@port='%u']", port);
                 if (!topology->hasProp(xpath))
                     topology->addPropTree("./RoxieFarmProcess")->setPropInt("@port", port);
-                RoxieEndpointInfo me = {RoxieEndpointInfo::RoxieServer, 0, { (unsigned short) port, myIP }};
+                RoxieEndpointInfo me = {RoxieEndpointInfo::RoxieServer, 0, { (unsigned short) port, myIP }, 0};
                 myRoles.push_back(me);
             }
-            for (unsigned channel: slaveChannels)
+            for (std::pair<unsigned, unsigned> channel: slaveChannels)
             {
                 mySlaveEP.set(ccdMulticastPort, myIP);
-                RoxieEndpointInfo me = { RoxieEndpointInfo::RoxieSlave, channel, mySlaveEP };
+                RoxieEndpointInfo me = { RoxieEndpointInfo::RoxieSlave, channel.first, mySlaveEP, channel.second };
                 myRoles.push_back(me);
             }
         }

+ 1 - 1
roxie/ccd/ccdqueue.cpp

@@ -206,7 +206,7 @@ unsigned getReplicationLevel(unsigned channel)
     if (!channel)
         return 0;
     Owned<const ITopologyServer> topology = getTopology();
-    return topology->queryChannelInfo(channel).subChannel();
+    return topology->queryChannelInfo(channel).replicationLevel();
 }
 
 //============================================================================================

+ 19 - 5
roxie/udplib/udptopo.cpp

@@ -62,8 +62,8 @@ void ChannelInfo::noteChannelHealthy(unsigned subChannel) const
     currentDelay[subChannel] = initIbytiDelay;
 }
 
-ChannelInfo::ChannelInfo(unsigned _mySubChannel, unsigned _numSubChannels)
-: mySubChannel(_mySubChannel), numSubChannels(_numSubChannels)
+ChannelInfo::ChannelInfo(unsigned _mySubChannel, unsigned _numSubChannels, unsigned _replicationLevel)
+: mySubChannel(_mySubChannel), numSubChannels(_numSubChannels), myReplicationLevel(_replicationLevel)
 {
     for (unsigned i = 0; i < numSubChannels; i++)
         currentDelay.emplace_back(initIbytiDelay);
@@ -103,6 +103,7 @@ private:
     std::map<unsigned, ChannelInfo> channelInfo;
     std::map<unsigned, unsigned> mySubChannels;
     std::vector<unsigned> channels;
+    std::vector<unsigned> replicationLevels;
 };
 
 SocketEndpoint mySlaveEP;
@@ -119,11 +120,12 @@ CTopologyServer::CTopologyServer(const char *topologyInfo)
     {
         StringArray fields;
         fields.appendList(line.c_str(), "|", true);
-        if (fields.length()==3)
+        if (fields.length()==4)
         {
             const char *role = fields.item(0);
             const char *channelStr = fields.item(1);
             const char *epStr = fields.item(2);
+            const char *replStr = fields.item(3);
             char *tail = nullptr;
             unsigned channel = strtoul(channelStr, &tail, 10);
             if (*tail)
@@ -131,6 +133,13 @@ CTopologyServer::CTopologyServer(const char *topologyInfo)
                 DBGLOG("Unexpected characters parsing channel in topology entry %s", line.c_str());
                 continue;
             }
+            tail = nullptr;
+            unsigned repl = strtoul(replStr, &tail, 10);
+            if (*tail)
+            {
+                DBGLOG("Unexpected characters parsing replication level in topology entry %s", line.c_str());
+                continue;
+            }
             SocketEndpoint ep;
             if (!ep.set(epStr))
             {
@@ -144,6 +153,7 @@ CTopologyServer::CTopologyServer(const char *topologyInfo)
                 {
                     mySubChannels[channel] = slaves[channel].ordinality()-1;
                     channels.push_back(channel);
+                    replicationLevels.push_back(repl);
                 }
                 slaves[0].append(ep);
             }
@@ -151,9 +161,12 @@ CTopologyServer::CTopologyServer(const char *topologyInfo)
                 servers[ep.port].append(ep);
         }
     }
-    for (auto& c : mySubChannels)
+    for (unsigned i = 0; i < channels.size(); i++)
     {
-        channelInfo.emplace(std::make_pair(c.first, ChannelInfo(c.second, slaves[c.first].ordinality())));
+        unsigned channel = channels[i];
+        unsigned repl = replicationLevels[i];
+        unsigned subChannel = mySubChannels[channel];
+        channelInfo.emplace(std::make_pair(channel, ChannelInfo(subChannel, slaves[channel].ordinality(), repl)));
     }
 }
 
@@ -303,6 +316,7 @@ void TopologyManager::setRoles(const std::vector<RoxieEndpointInfo> &myRoles)
         }
         topoBuf.append(role.channel).append('|');
         role.ep.getUrlStr(topoBuf);
+        topoBuf.append('|').append(role.replicationLevel);
         topoBuf.append('\n');
     }
     Owned<const ITopologyServer> newServer = new CTopologyServer(topoBuf);   // We set the initial topology to just the local information we know about

+ 4 - 1
roxie/udplib/udptopo.hpp

@@ -73,13 +73,14 @@ extern UDPLIB_API SocketEndpoint mySlaveEP;
 class UDPLIB_API ChannelInfo
 {
 public:
-    ChannelInfo(unsigned _subChannel, unsigned _numSubChannels);
+    ChannelInfo(unsigned _subChannel, unsigned _numSubChannels, unsigned _replicationLevel);
     ChannelInfo(ChannelInfo && ) = default;
 
     unsigned getIbytiDelay(unsigned primarySubChannel) const;
     void noteChannelsSick(unsigned primarySubChannel) const;
     void noteChannelHealthy(unsigned subChannel) const;
     inline unsigned subChannel() const { return mySubChannel; }
+    inline unsigned replicationLevel() const { return myReplicationLevel; }
 
     /*
      * Determine whether to abort on receipt of an IBYTI for a packet which I have already started processing
@@ -90,6 +91,7 @@ public:
 
 private:
     unsigned mySubChannel = 0;     // Which subChannel does this node implement for this channel - zero-based
+    unsigned myReplicationLevel = 0; // Which data location is this channel pulling its data from - zero-based
     unsigned numSubChannels = 0;   // How many subchannels are there for this channel, across all slaves. Equivalently, the number of slaves that implement this channel
     mutable std::vector<unsigned> currentDelay;  // NOTE - technically should be atomic, but in the event of a race we don't really care who wins
 };
@@ -110,6 +112,7 @@ struct RoxieEndpointInfo
     enum Role { RoxieServer, RoxieSlave } role;
     unsigned channel;
     SocketEndpoint ep;
+    unsigned replicationLevel;
 };
 
 extern UDPLIB_API void startTopoThread(const SocketEndpointArray &topoServers, const std::vector<RoxieEndpointInfo> &myRoles, unsigned traceLevel);