浏览代码

HPCC-24963 Refactor IpMap code in udplib to use ServerIdentifier

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 4 年之前
父节点
当前提交
c7be39e54a

+ 4 - 4
roxie/ccd/ccdqueue.cpp

@@ -1421,7 +1421,7 @@ public:
                 if (!abortJob)
                 {
                     for (unsigned subChannel = 0; subChannel < mySubChannel; subChannel++)
-                        noteNodeSick(header.subChannels[subChannel].getIpAddress());
+                        noteNodeSick(header.subChannels[subChannel]);
                 }
             }
 #else
@@ -2070,7 +2070,7 @@ public:
                 {
                     if (header.subChannels[subChannel].isMe() || header.subChannels[subChannel].isNull())
                         break;
-                    noteNodeSick(header.subChannels[subChannel].getIpAddress());
+                    noteNodeSick(header.subChannels[subChannel]);
                 }
 
                 DelayedPacketEntry *goer = finger;
@@ -2437,7 +2437,7 @@ public:
 #ifndef SUBCHANNELS_IN_HEADER
                 channelInfo.noteChannelHealthy(subChannel);
 #else
-                noteNodeHealthy(header.subChannels[subChannel].getIpAddress());
+                noteNodeHealthy(header.subChannels[subChannel]);
 #endif
                 bool foundInQ = false;
 #ifdef NEW_IBYTI
@@ -2606,7 +2606,7 @@ public:
                     {
                         unsigned delay = 0;
                         for (unsigned subChannel = 0; subChannel < mySubchannel; subChannel++)
-                            delay += getIbytiDelay(header.subChannels[subChannel].getIpAddress());
+                            delay += getIbytiDelay(header.subChannels[subChannel]);
                         delayed.queryQueue(header.channel, mySubchannel).append(packet.getClear(), msTick()+delay);
                     }
                     else

+ 1 - 1
roxie/udplib/udpaeron.cpp

@@ -393,7 +393,7 @@ public:
     CRoxieAeronSendManager(unsigned _dataPort, unsigned _numQueues, const IpAddress &_myIP)
     : dataPort(_dataPort),
       numQueues(_numQueues),
-      receiversTable([this](const IpAddress &ip) { return new UdpAeronReceiverEntry(ip, dataPort, aeron, numQueues);}),
+      receiversTable([this](const ServerIdentifier &ip) { return new UdpAeronReceiverEntry(ip.getIpAddress(), dataPort, aeron, numQueues);}),
       myIP(_myIP)
     {
         if (useEmbeddedAeronDriver && !is_running())

+ 8 - 5
roxie/udplib/udpipmap.cpp

@@ -30,11 +30,11 @@ class IpMapTest : public CppUnit::TestFixture
 {
     CPPUNIT_TEST_SUITE(IpMapTest);
         CPPUNIT_TEST(testIpMap);
-        CPPUNIT_TEST(testIpV6);
+        // CPPUNIT_TEST(testIpV6);
         CPPUNIT_TEST(testThread);
     CPPUNIT_TEST_SUITE_END();
 
-    static unsigned *createMapEntry(const IpAddress &)
+    static unsigned *createMapEntry(const ServerIdentifier &)
     {
         return new unsigned(3);
     }
@@ -42,10 +42,10 @@ class IpMapTest : public CppUnit::TestFixture
     void testIpMap()
     {
         unsigned five = 5;
-        auto createMapEntry = [five](const IpAddress &ip)
+        auto createMapEntry = [five](const ServerIdentifier &ip)
         {
             StringBuffer s;
-            printf("adding ip %s\n", ip.getIpText(s).str());
+            printf("adding ip %s\n", ip.getTraceText(s).str());
             return new unsigned(five);
         };
         IpMapOf<unsigned> map(createMapEntry);
@@ -65,6 +65,8 @@ class IpMapTest : public CppUnit::TestFixture
         printf("entries = %d\n", entries);
         ASSERT(entries == 3);
     }
+#if 0
+    // Current implementation of IpMapOf assumes ipv4
     void testIpV6()
     {
         IpAddress ip1("fe80::1c7e:ebe8:4ee8:6154");
@@ -74,6 +76,7 @@ class IpMapTest : public CppUnit::TestFixture
         ASSERT(&map.lookup(ip1)!=&map.lookup(ip2));
         ASSERT(&map.lookup(ip1)==&map.lookup(ip1));
     }
+#endif
 
     class IpEntry
     {
@@ -87,7 +90,7 @@ class IpMapTest : public CppUnit::TestFixture
 
     void testThread()
     {
-        IpMapOf<IpEntry> map([](const IpAddress &){return new IpEntry; });
+        IpMapOf<IpEntry> map([](const ServerIdentifier &){return new IpEntry; });
         std::thread threads[100];
         Semaphore ready;
         for (int i = 0; i < 100; i++)

+ 9 - 9
roxie/udplib/udpipmap.hpp

@@ -31,7 +31,7 @@ private:
     class list
     {
     public:
-        list(const IpAddress &_ip, const list *_next, std::function<T *(const IpAddress &)> tfunc) : ip(_ip), next(_next)
+        list(const ServerIdentifier &_ip, const list *_next, std::function<T *(const ServerIdentifier &)> tfunc) : ip(_ip), next(_next)
         {
             entry = tfunc(ip);
         }
@@ -39,7 +39,7 @@ private:
         {
             delete entry;
         }
-        const IpAddress ip;
+        const ServerIdentifier ip;
         const list *next;
         T *entry;
     };
@@ -86,11 +86,11 @@ private:
     };
 
 public:
-    IpMapOf<T>(std::function<T *(const IpAddress &)> _tfunc) : tfunc(_tfunc)
+    IpMapOf<T>(std::function<T *(const ServerIdentifier &)> _tfunc) : tfunc(_tfunc)
     {
     }
-    T &lookup(const IpAddress &) const;
-    inline T &operator[](const IpAddress &ip) const { return lookup(ip); }
+    T &lookup(const ServerIdentifier &) const;
+    inline T &operator[](const ServerIdentifier &ip) const { return lookup(ip); }
     myIterator begin()
     {
         // Take care as it's possible for firstHash to be updated on another thread as we are running
@@ -103,22 +103,22 @@ public:
     myIterator end()   { return myIterator(nullptr, 256, nullptr); }
 
 private:
-    const std::function<T *(const IpAddress &)> tfunc;
+    const std::function<T *(const ServerIdentifier &)> tfunc;
     mutable std::atomic<const list *> table[256] = {};
     mutable CriticalSection lock;
     mutable std::atomic<unsigned> firstHash = { 256 };
 };
 
-template<class T> T &IpMapOf<T>::lookup(const IpAddress &ip) const
+template<class T> T &IpMapOf<T>::lookup(const ServerIdentifier &ip) const
 {
-   unsigned hash = ip.fasthash() & 0xff;
+   unsigned hash = ip.hash() & 0xff;
    for (;;)
    {
        const list *head = table[hash].load(std::memory_order_acquire);
        const list *finger = head;
        while (finger)
        {
-           if (finger->ip.ipequals(ip))
+           if (finger->ip == ip)
                return *finger->entry;
            finger = finger->next;
        }

+ 4 - 4
roxie/udplib/udptopo.cpp

@@ -85,14 +85,14 @@ bool ChannelInfo::otherAgentHasPriority(unsigned priorityHash, unsigned otherAge
     return false;
 }
 
-static unsigned *createNewNodeHealthScore(const IpAddress &)
+static unsigned *createNewNodeHealthScore(const ServerIdentifier &)
 {
     return new unsigned(initIbytiDelay);
 }
 
 static IpMapOf<unsigned> buddyHealth(createNewNodeHealthScore);   // For each buddy IP ever seen, maintains a score of how long I should wait for it to respond when it is the 'first responder'
 
-void noteNodeSick(const IpAddress &node)
+void noteNodeSick(const ServerIdentifier &node)
 {
     // NOTE - IpMapOf is thread safe (we never remove entries). Two threads hitting at the same time may result in the change from one being lost, but that's not a disaster
     unsigned current = buddyHealth[node];
@@ -102,13 +102,13 @@ void noteNodeSick(const IpAddress &node)
     buddyHealth[node] = newDelay;
 }
 
-void noteNodeHealthy(const IpAddress &node)
+void noteNodeHealthy(const ServerIdentifier &node)
 {
     // NOTE - IpMapOf is thread safe (we never remove entries). Two threads hitting at the same time may result in the change from one being lost, but that's not a disaster
     buddyHealth[node] = initIbytiDelay;
 }
 
-unsigned getIbytiDelay(const IpAddress &node)
+unsigned getIbytiDelay(const ServerIdentifier &node)
 {
     return buddyHealth[node];
 }

+ 3 - 3
roxie/udplib/udptopo.hpp

@@ -98,9 +98,9 @@ private:
 
 // In containerized mode with dynamic topology , we prefer a different mechanism for tracking node health
 
-extern UDPLIB_API void noteNodeSick(const IpAddress &node);
-extern UDPLIB_API void noteNodeHealthy(const IpAddress &node);
-extern UDPLIB_API unsigned getIbytiDelay(const IpAddress &node);
+extern UDPLIB_API void noteNodeSick(const ServerIdentifier &node);
+extern UDPLIB_API void noteNodeHealthy(const ServerIdentifier &node);
+extern UDPLIB_API unsigned getIbytiDelay(const ServerIdentifier &node);
 
 interface ITopologyServer : public IInterface
 {

+ 2 - 2
roxie/udplib/udptrr.cpp

@@ -491,7 +491,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
                             StringBuffer ipStr;
                             DBGLOG("UdpReceiver: received %s msg from node=%s", flowType::name(msg.cmd), msg.sourceNode.getTraceText(ipStr).str());
                         }
-                        UdpSenderEntry *sender = &parent.sendersTable[msg.sourceNode.getIpAddress()];
+                        UdpSenderEntry *sender = &parent.sendersTable[msg.sourceNode];
                         switch (msg.cmd)
                         {
                         case flowType::request_to_send:
@@ -718,7 +718,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
     public:
     IMPLEMENT_IINTERFACE;
     CReceiveManager(int server_flow_port, int d_port, int client_flow_port, int snif_port, const IpAddress &multicast_ip, int queue_size, int m_slot_pr_client)
-        : collatorThread(*this), sendersTable([client_flow_port](const IpAddress &ip) { return new UdpSenderEntry(ip, client_flow_port);})
+        : collatorThread(*this), sendersTable([client_flow_port](const ServerIdentifier &ip) { return new UdpSenderEntry(ip.getIpAddress(), client_flow_port);})
     {
 #ifndef _WIN32
         setpriority(PRIO_PROCESS, 0, -15);

+ 6 - 9
roxie/udplib/udptrs.cpp

@@ -522,7 +522,7 @@ class CSendManager : implements ISendManager, public CInterface
                                 StringBuffer s;
                                 DBGLOG("UdpSender: received request_received msg from node=%s", f.destNode.getTraceText(s).str());
                             }
-                            parent.receiversTable[f.destNode.getIpAddress()].requestAcknowledged();
+                            parent.receiversTable[f.destNode].requestAcknowledged();
                             break;
 
                         default: 
@@ -643,7 +643,7 @@ class CSendManager : implements ISendManager, public CInterface
 
                 if (udpSnifferEnabled)
                     send_sniff(sniffType::busy);
-                UdpReceiverEntry &receiverInfo = parent.receiversTable[permit.destNode.getIpAddress()];
+                UdpReceiverEntry &receiverInfo = parent.receiversTable[permit.destNode];
                 unsigned payload = receiverInfo.sendData(permit, bucket);
                 if (udpSnifferEnabled)
                     send_sniff(sniffType::idle);
@@ -691,7 +691,7 @@ public:
     CSendManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int q_size, int _numQueues, const IpAddress &_myIP, TokenBucket *_bucket)
         : bucket(_bucket),
           myIP(_myIP),
-          receiversTable([_myIP, _numQueues, q_size, server_flow_port, data_port](const IpAddress &ip) { return new UdpReceiverEntry(ip, _myIP, _numQueues, q_size, server_flow_port, data_port);})
+          receiversTable([_myIP, _numQueues, q_size, server_flow_port, data_port](const ServerIdentifier &ip) { return new UdpReceiverEntry(ip.getIpAddress(), _myIP, _numQueues, q_size, server_flow_port, data_port);})
     {
 #ifndef _WIN32
         setpriority(PRIO_PROCESS, 0, -3);
@@ -721,26 +721,23 @@ public:
 
     virtual IMessagePacker *createMessagePacker(ruid_t ruid, unsigned sequence, const void *messageHeader, unsigned headerSize, const ServerIdentifier &destNode, int queue) override
     {
-        const IpAddress dest = destNode.getIpAddress();
-        return ::createMessagePacker(ruid, sequence, messageHeader, headerSize, *this, receiversTable[dest], myIP, getNextMessageSequence(), queue);
+        return ::createMessagePacker(ruid, sequence, messageHeader, headerSize, *this, receiversTable[destNode], myIP, getNextMessageSequence(), queue);
     }
 
     virtual bool dataQueued(ruid_t ruid, unsigned msgId, const ServerIdentifier &destNode) override
     {
-        const IpAddress dest = destNode.getIpAddress();
         UdpPacketHeader pkHdr;
         pkHdr.ruid = ruid;
         pkHdr.msgId = msgId;
-        return receiversTable[dest].dataQueued(pkHdr);
+        return receiversTable[destNode].dataQueued(pkHdr);
     }
 
     virtual bool abortData(ruid_t ruid, unsigned msgId, const ServerIdentifier &destNode)
     {
-        const IpAddress dest = destNode.getIpAddress();
         UdpPacketHeader pkHdr;
         pkHdr.ruid = ruid;
         pkHdr.msgId = msgId;
-        return receiversTable[dest].removeData((void*) &pkHdr, &UdpReceiverEntry::comparePacket);
+        return receiversTable[destNode].removeData((void*) &pkHdr, &UdpReceiverEntry::comparePacket);
     }
 
     virtual bool allDone()