소스 검색

Merge pull request #14302 from richardkchapman/HPCC-24933

HPCC-24933 Serialize server IP as 4 bytes in Roxie header

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Merged-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 4 년 전
부모
커밋
799b2cfe9f

+ 1 - 1
roxie/ccd/ccdfile.cpp

@@ -3541,7 +3541,7 @@ private:
                 throw MakeStringException(0, "Cluster %s occupies node already specified while writing file %s",
                         cluster, dFile->queryLogicalName());
             SocketEndpointArray eps;
-            SocketEndpoint me(0, myNode.getNodeAddress());
+            SocketEndpoint me(0, myNode.getIpAddress());
             eps.append(me);
             localCluster.setown(createIGroup(eps));
             StringBuffer clusterName(cluster);

+ 3 - 3
roxie/ccd/ccdmain.cpp

@@ -450,7 +450,7 @@ void readStaticTopology()
         {
             myNodeSet = true;
             myNode.setIp(ip);
-            myAgentEP.set(ccdMulticastPort, myNode.getNodeAddress());
+            myAgentEP.set(ccdMulticastPort, myNode.getIpAddress());
         }
         ForEachItemIn(idx, nodeTable)
         {
@@ -1231,7 +1231,7 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
             else
             {
                 Owned<IHpccProtocolPlugin> protocolPlugin = loadHpccProtocolPlugin(protocolCtx, NULL);
-                Owned<IHpccProtocolListener> roxieServer = protocolPlugin->createListener("runOnce", createRoxieProtocolMsgSink(myNode.getNodeAddress(), 0, 1, false), 0, 0, NULL);
+                Owned<IHpccProtocolListener> roxieServer = protocolPlugin->createListener("runOnce", createRoxieProtocolMsgSink(myNode.getIpAddress(), 0, 1, false), 0, 0, NULL);
                 try
                 {
                     const char *format = topology->queryProp("@format");
@@ -1277,7 +1277,7 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
                     unsigned port = roxieFarm.getPropInt("@port", ROXIE_SERVER_PORT);
                     //unsigned requestArrayThreads = roxieFarm.getPropInt("@requestArrayThreads", 5);
                     // NOTE: farmer name [@name=] is not copied into topology
-                    const IpAddress &ip = myNode.getNodeAddress();
+                    const IpAddress ip = myNode.getIpAddress();
                     if (!roxiePort)
                     {
                         roxiePort = port;

+ 6 - 6
roxie/ccd/ccdqueue.cpp

@@ -116,7 +116,7 @@ void RoxiePacketHeader::init(const RemoteActivityId &_remoteId, ruid_t _uid, uns
 
 StringBuffer &RoxiePacketHeader::toString(StringBuffer &ret) const
 {
-    const IpAddress &serverIP = serverId.getNodeAddress();
+    const IpAddress serverIP = serverId.getIpAddress();
     ret.appendf("uid=" RUIDF " activityId=", uid);
     switch(activityId & ~ROXIE_PRIORITY_MASK)
     {
@@ -758,7 +758,7 @@ struct PingRecord
 void doPing(IRoxieQueryPacket *packet, const IRoxieContextLogger &logctx)
 {
     const RoxiePacketHeader &header = packet->queryHeader();
-    const IpAddress &serverIP = header.serverId.getNodeAddress();
+    const IpAddress serverIP = header.serverId.getIpAddress();
     unsigned contextLength = packet->getContextLength();
     if (contextLength != sizeof(PingRecord))
     {
@@ -2180,10 +2180,10 @@ public:
     RoxieAeronSocketQueueManager(unsigned _numWorkers) : RoxieSocketQueueManager(_numWorkers)
     {
         unsigned dataPort = topology->getPropInt("@dataPort", CCD_DATA_PORT);
-        SocketEndpoint ep(dataPort, myNode.getNodeAddress());
+        SocketEndpoint ep(dataPort, myNode.getIpAddress());
         receiveManager.setown(createAeronReceiveManager(ep));
-        assertex(!myNode.getNodeAddress().isNull());
-        sendManager.setown(createAeronSendManager(dataPort, fastLaneQueue ? 3 : 2, myNode.getNodeAddress()));
+        assertex(!myNode.getIpAddress().isNull());
+        sendManager.setown(createAeronSendManager(dataPort, fastLaneQueue ? 3 : 2, myNode.getIpAddress()));
     }
 
 };
@@ -2730,7 +2730,7 @@ class PingTimer : public Thread
                 DBGLOG("PING sent");
 
             PingRecord data;
-            data.senderIP.ipset(myNode.getNodeAddress());
+            data.senderIP.ipset(myNode.getIpAddress());
             data.tick = usTick();
             memcpy(finger, &data, sizeof(PingRecord));
             Owned<IRoxieQueryPacket> packet = createRoxiePacket(packetData, packetSize);

+ 1 - 1
roxie/udplib/udpaeron.cpp

@@ -425,7 +425,7 @@ public:
 
 IMessagePacker *CRoxieAeronSendManager::createMessagePacker(ruid_t ruid, unsigned sequence, const void *messageHeader, unsigned headerSize, const ServerIdentifier &destNode, int queue)
 {
-    const IpAddress &dest = destNode.getNodeAddress();
+    const IpAddress dest = destNode.getIpAddress();
     return ::createMessagePacker(ruid, sequence, messageHeader, headerSize, *this, receiversTable[dest], myIP, getNextMessageSequence(), queue);
 }
 

+ 12 - 9
roxie/udplib/udplib.hpp

@@ -41,31 +41,34 @@ typedef unsigned RecordLengthType;
 class UDPLIB_API ServerIdentifier
 {
 private:
-    IpAddress serverIp;  // MORE - should really be an endpoint?
+    unsigned netAddress = 0;
 public:
-    ServerIdentifier() : serverIp() { }
-    ServerIdentifier(const ServerIdentifier &from) : serverIp(from.serverIp) { }
-    ServerIdentifier(const IpAddress &from) : serverIp(from) { }
-    const IpAddress &getNodeAddress() const;
+    ServerIdentifier() { }
+    ServerIdentifier(const ServerIdentifier &from) : netAddress(from.netAddress) { }
+    ServerIdentifier(const IpAddress &from) { setIp(from); }
+    const IpAddress getIpAddress() const;
+    unsigned getIp4() const { return netAddress; };
     const ServerIdentifier & operator=(const ServerIdentifier &from)
     {
-        serverIp = from.serverIp;
+        netAddress = from.netAddress;
         return *this;
     }
     bool operator==(const ServerIdentifier &from) const
     {
-        return serverIp.ipequals(from.serverIp);
+        return netAddress == from.netAddress;
     }
     unsigned hash() const
     {
-        return serverIp.iphash(0);
+        return netAddress;
     }
     inline void setIp(const IpAddress &_ip)
     {
-        serverIp = _ip;
+        netAddress = _ip.getIP4();
     }
     StringBuffer &getTraceText(StringBuffer &s) const
     {
+        IpAddress serverIp;
+        serverIp.setIP4(netAddress);
         return serverIp.getIpText(s);
     }
 };

+ 1 - 3
roxie/udplib/udpmsgpk.cpp

@@ -435,9 +435,7 @@ public:
 PUID GETPUID(DataBuffer *dataBuff)
 {
     UdpPacketHeader *pktHdr = (UdpPacketHeader*) dataBuff->data;
-    unsigned ip4;
-    if (pktHdr->node.getNodeAddress().getNetAddress(sizeof(ip4), &ip4) != sizeof(ip4))
-        throw makeStringException(ROXIE_INTERNAL_ERROR, "IPv6 not supported in roxie"); // MORE - do we ever care about ipv6?
+    unsigned ip4 = pktHdr->node.getIp4();
     return (((PUID) ip4) << 32) | (PUID) pktHdr->msgSeq;
 }
 

+ 4 - 2
roxie/udplib/udpsha.cpp

@@ -54,9 +54,11 @@ MODULE_EXIT()
 }
 
 
-const IpAddress &ServerIdentifier::getNodeAddress() const
+const IpAddress ServerIdentifier::getIpAddress() const
 {
-    return serverIp;
+    IpAddress ret;
+    ret.setIP4(netAddress);
+    return ret;
 }
 
 ServerIdentifier myNode;

+ 4 - 4
roxie/udplib/udptrr.cpp

@@ -269,7 +269,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
                     unsigned int res;
                     sniff_msg msg;
                     sniffer_socket->read(&msg, 1, sizeof(msg), res, 5);
-                    update(msg.nodeIp.getNodeAddress(), msg.cmd == sniffType::busy);
+                    update(msg.nodeIp.getIpAddress(), msg.cmd == sniffType::busy);
                 }
                 catch (IException *e) 
                 {
@@ -336,7 +336,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
                     pendingRequests = requester;
                 lastPending = requester;
             }
-            requester->requestToSend(0, myNode.getNodeAddress());  // Acknowledge receipt of the request
+            requester->requestToSend(0, myNode.getIpAddress());  // Acknowledge receipt of the request
         }
 
         unsigned okToSend(UdpSenderEntry *requester)
@@ -347,7 +347,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
                 max_transfer = maxSlotsPerSender;
             unsigned timeout = ((max_transfer * DATA_PAYLOAD) / 100) + 10; // in ms assuming mtu package size with 100x margin on 100 Mbit network // MORE - hideous!
             currentRequester = requester;
-            requester->requestToSend(max_transfer, myNode.getNodeAddress());
+            requester->requestToSend(max_transfer, myNode.getIpAddress());
             return timeout;
         }
 
@@ -491,7 +491,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
                             StringBuffer ipStr;
                             DBGLOG("UdpReceiver: received %s msg from node=%s", flowType::name(msg.cmd), msg.sourceNode.getTraceText(ipStr).str());
                         }
-                        UdpSenderEntry *sender = &parent.sendersTable[msg.sourceNode.getNodeAddress()];
+                        UdpSenderEntry *sender = &parent.sendersTable[msg.sourceNode.getIpAddress()];
                         switch (msg.cmd)
                         {
                         case flowType::request_to_send:

+ 7 - 7
roxie/udplib/udptrs.cpp

@@ -522,7 +522,7 @@ class CSendManager : implements ISendManager, public CInterface
                                 StringBuffer s;
                                 DBGLOG("UdpSender: received request_received msg from node=%s", f.destNode.getTraceText(s).str());
                             }
-                            parent.receiversTable[f.destNode.getNodeAddress()].requestAcknowledged();
+                            parent.receiversTable[f.destNode.getIpAddress()].requestAcknowledged();
                             break;
 
                         default: 
@@ -643,7 +643,7 @@ class CSendManager : implements ISendManager, public CInterface
 
                 if (udpSnifferEnabled)
                     send_sniff(sniffType::busy);
-                UdpReceiverEntry &receiverInfo = parent.receiversTable[permit.destNode.getNodeAddress()];
+                UdpReceiverEntry &receiverInfo = parent.receiversTable[permit.destNode.getIpAddress()];
                 unsigned payload = receiverInfo.sendData(permit, bucket);
                 if (udpSnifferEnabled)
                     send_sniff(sniffType::idle);
@@ -721,13 +721,13 @@ public:
 
     virtual IMessagePacker *createMessagePacker(ruid_t ruid, unsigned sequence, const void *messageHeader, unsigned headerSize, const ServerIdentifier &destNode, int queue) override
     {
-        const IpAddress &dest = destNode.getNodeAddress();
+        const IpAddress dest = destNode.getIpAddress();
         return ::createMessagePacker(ruid, sequence, messageHeader, headerSize, *this, receiversTable[dest], myIP, getNextMessageSequence(), queue);
     }
 
     virtual bool dataQueued(ruid_t ruid, unsigned msgId, const ServerIdentifier &destNode) override
     {
-        const IpAddress &dest = destNode.getNodeAddress();
+        const IpAddress dest = destNode.getIpAddress();
         UdpPacketHeader pkHdr;
         pkHdr.ruid = ruid;
         pkHdr.msgId = msgId;
@@ -736,7 +736,7 @@ public:
 
     virtual bool abortData(ruid_t ruid, unsigned msgId, const ServerIdentifier &destNode)
     {
-        const IpAddress &dest = destNode.getNodeAddress();
+        const IpAddress dest = destNode.getIpAddress();
         UdpPacketHeader pkHdr;
         pkHdr.ruid = ruid;
         pkHdr.msgId = msgId;
@@ -758,8 +758,8 @@ public:
 
 ISendManager *createSendManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int queue_size_pr_server, int queues_pr_server, TokenBucket *rateLimiter)
 {
-    assertex(!myNode.getNodeAddress().isNull());
-    return new CSendManager(server_flow_port, data_port, client_flow_port, sniffer_port, sniffer_multicast_ip, queue_size_pr_server, queues_pr_server, myNode.getNodeAddress(), rateLimiter);
+    assertex(!myNode.getIpAddress().isNull());
+    return new CSendManager(server_flow_port, data_port, client_flow_port, sniffer_port, sniffer_multicast_ip, queue_size_pr_server, queues_pr_server, myNode.getIpAddress(), rateLimiter);
 }
 
 class CMessagePacker : implements IMessagePacker, public CInterface

+ 3 - 3
roxie/udplib/uttest.cpp

@@ -179,7 +179,7 @@ public:
         Owned<IReceiveManager> rcvMgr;
         if (useAeron)
         {
-            SocketEndpoint myEP(7000, myNode.getNodeAddress());
+            SocketEndpoint myEP(7000, myNode.getIpAddress());
             rcvMgr.setown(createAeronReceiveManager(myEP));
         }
         else
@@ -291,7 +291,7 @@ void testNxN()
         maxPacketsPerSender = udpQueueSize;
     Owned <ISendManager> sendMgr;
     if (useAeron)
-        sendMgr.setown(createAeronSendManager(7000, udpNumQs, myNode.getNodeAddress()));
+        sendMgr.setown(createAeronSendManager(7000, udpNumQs, myNode.getIpAddress()));
     else
         sendMgr.setown(createSendManager(7000, 7001, 7002, 7003, multicastIP, 100, udpNumQs, NULL));
     Receiver receiver;
@@ -778,7 +778,7 @@ int main(int argc, char * argv[] )
         myIndex = numNodes;
         ForEachItemIn(idx, allNodes)
         {
-            if (allNodes.item(idx).ipequals(myNode.getNodeAddress()))
+            if (allNodes.item(idx).ipequals(myNode.getIpAddress()))
             {
                 myIndex = idx;
                 break;

+ 17 - 0
system/jlib/jsocket.cpp

@@ -3156,6 +3156,23 @@ inline bool isIp4(const unsigned *netaddr)
     return false;
 }
 
+void IpAddress::setIP4(unsigned ip)
+{
+    netaddr[0] = 0;
+    netaddr[1] = 0;
+    if (ip)
+        netaddr[2] = 0xffff0000;
+    else
+        netaddr[2] = 0;
+    netaddr[3] = ip;
+}
+
+unsigned IpAddress::getIP4() const
+{
+    assertex(isIp4());
+    return netaddr[3];
+}
+
 bool IpAddress::isIp4() const
 {
     return ::isIp4(netaddr);

+ 2 - 0
system/jlib/jsocket.hpp

@@ -97,6 +97,8 @@ public:
     void ipserialize(MemoryBuffer & out) const;         
     void ipdeserialize(MemoryBuffer & in);          
     unsigned ipdistance(const IpAddress &ip,unsigned offset=0) const;       // network order distance (offset: 0-3 word (leat sig.), 0=Ipv4)
+    unsigned getIP4() const;
+    void setIP4(unsigned);
     bool ipincrement(unsigned count,byte minoctet=0,byte maxoctet=255,unsigned short minipv6piece=0,unsigned maxipv6piece=0xffff);
     unsigned ipsetrange( const char *text); // e.g. 10.173.72.1-65  ('-' may be omitted)
                                             // returns number in range (use ipincrement to iterate through)