فهرست منبع

HPCC-8390 Change strategy to use inner struct

Signed-off-by: Renato Golin <rengolin@hpccsystems.com>
Renato Golin 12 سال پیش
والد
کامیت
1e6c7a7597
6فایلهای تغییر یافته به همراه57 افزوده شده و 61 حذف شده
  1. 1 0
      roxie/ccd/ccdactivities.hpp
  2. 0 3
      roxie/roxiemem/CMakeLists.txt
  3. 0 3
      roxie/udplib/udplib.cmake
  4. 29 28
      roxie/udplib/udpsha.hpp
  5. 8 8
      roxie/udplib/udptrr.cpp
  6. 19 19
      roxie/udplib/udptrs.cpp

+ 1 - 0
roxie/ccd/ccdactivities.hpp

@@ -81,6 +81,7 @@ struct KeyedJoinHeader
 
 
 static inline int KEYEDJOIN_RECORD_SIZE(int dataSize)
 static inline int KEYEDJOIN_RECORD_SIZE(int dataSize)
 {
 {
+    // As long as KeyedJoinHeader remains a POD, this is OK
     return offsetof(KeyedJoinHeader, rhsdata) + dataSize;
     return offsetof(KeyedJoinHeader, rhsdata) + dataSize;
 }
 }
 
 

+ 0 - 3
roxie/roxiemem/CMakeLists.txt

@@ -44,9 +44,6 @@ include_directories (
     )
     )
 
 
 ADD_DEFINITIONS( -D_USRDLL -DROXIEMEM_EXPORTS)
 ADD_DEFINITIONS( -D_USRDLL -DROXIEMEM_EXPORTS)
-if (${CMAKE_COMPILER_IS_GNUCXX})
-    ADD_DEFINITIONS( -Wno-invalid-offsetof )
-endif (${CMAKE_COMPILER_IS_GNUCXX})
 
 
 HPCC_ADD_LIBRARY( roxiemem SHARED ${SRCS} ${INCLUDES})
 HPCC_ADD_LIBRARY( roxiemem SHARED ${SRCS} ${INCLUDES})
 install ( TARGETS roxiemem RUNTIME DESTINATION ${EXEC_DIR} LIBRARY DESTINATION ${LIB_DIR} )
 install ( TARGETS roxiemem RUNTIME DESTINATION ${EXEC_DIR} LIBRARY DESTINATION ${LIB_DIR} )

+ 0 - 3
roxie/udplib/udplib.cmake

@@ -40,9 +40,6 @@ include_directories (
          ./../../roxie/roxie
          ./../../roxie/roxie
     )
     )
 
 
-if (${CMAKE_COMPILER_IS_GNUCXX})
-    ADD_DEFINITIONS( -Wno-invalid-offsetof )
-endif (${CMAKE_COMPILER_IS_GNUCXX})
 HPCC_ADD_LIBRARY( udplib SHARED ${SRCS} )
 HPCC_ADD_LIBRARY( udplib SHARED ${SRCS} )
 set_target_properties( udplib PROPERTIES 
 set_target_properties( udplib PROPERTIES 
     COMPILE_FLAGS -D_USRDLL
     COMPILE_FLAGS -D_USRDLL

+ 29 - 28
roxie/udplib/udpsha.hpp

@@ -179,29 +179,29 @@ public:
 #pragma pack(push,1)
 #pragma pack(push,1)
 struct UdpPermitToSendMsg
 struct UdpPermitToSendMsg
 {
 {
-    unsigned short  length;
-    unsigned short  cmd;
-    unsigned short  destNodeIndex;
-    unsigned short  max_data;
-    unsigned        lastSequenceSeen;
-    unsigned        missingCount;
+    // New static fields must be included inside this block, so that
+    // size calculations work correctly
+    struct StaticBlock {
+        unsigned short  length;
+        unsigned short  cmd;
+        unsigned short  destNodeIndex;
+        unsigned short  max_data;
+        unsigned        lastSequenceSeen;
+        unsigned        missingCount;
 #ifdef CRC_MESSAGES
 #ifdef CRC_MESSAGES
-    unsigned        crc;
-    // Change this value when new fields are added above (+crc)
-    static const size_t messageFixedSize = 4*sizeof(unsigned short) + 3*sizeof(unsigned);
-#else
-    // Change this value when new fields are added above
-    static const size_t messageFixedSize = 4*sizeof(unsigned short) + 2*sizeof(unsigned);
+        unsigned        crc;
 #endif
 #endif
+    } sb;
+    // WARNING: Do not add any field below this line, if it must be included in the message
     unsigned        missingSequences[MAX_RESEND_TABLE_SIZE]; // only [missingCount] actually sent
     unsigned        missingSequences[MAX_RESEND_TABLE_SIZE]; // only [missingCount] actually sent
 
 
     StringBuffer &toString(StringBuffer &str) const
     StringBuffer &toString(StringBuffer &str) const
     {
     {
-        str.appendf("lastSeen=%u missingCount=%u", lastSequenceSeen, missingCount);
-        if (missingCount)
+        str.appendf("lastSeen=%u missingCount=%u", sb.lastSequenceSeen, sb.missingCount);
+        if (sb.missingCount)
         {
         {
             str.append(" missing=");
             str.append(" missing=");
-            for (unsigned j = 0; j < missingCount; j++)
+            for (unsigned j = 0; j < sb.missingCount; j++)
                 str.appendf(j?",%u":"%u", missingSequences[j]);
                 str.appendf(j?",%u":"%u", missingSequences[j]);
         }
         }
         return str;
         return str;
@@ -210,7 +210,8 @@ struct UdpPermitToSendMsg
 #ifdef CRC_MESSAGES
 #ifdef CRC_MESSAGES
     unsigned calcCRC()
     unsigned calcCRC()
     {
     {
-        unsigned expectedCRC = crc32((const char *) this, offsetof(UdpPermitToSendMsg, crc), 0);
+        size_t len = sizeof(StaticBlock) - sizeof(sb.crc);
+        unsigned expectedCRC = crc32((const char *) this, len, 0);
         if (missingCount)
         if (missingCount)
             expectedCRC = crc32((const char *) &missingSequences, missingCount * sizeof(missingSequences[0]), expectedCRC); 
             expectedCRC = crc32((const char *) &missingSequences, missingCount * sizeof(missingSequences[0]), expectedCRC); 
         return expectedCRC;
         return expectedCRC;
@@ -219,26 +220,26 @@ struct UdpPermitToSendMsg
 
 
     UdpPermitToSendMsg()
     UdpPermitToSendMsg()
     {
     {
-        length = cmd = destNodeIndex = max_data = 0;
-        lastSequenceSeen = 0;
-        missingCount = 0;
+        sb.length = sb.cmd = sb.destNodeIndex = sb.max_data = 0;
+        sb.lastSequenceSeen = 0;
+        sb.missingCount = 0;
 #ifdef CRC_MESSAGES
 #ifdef CRC_MESSAGES
-        crc = calcCRC();
+        sb.crc = calcCRC();
 #endif
 #endif
     }
     }
 
 
     UdpPermitToSendMsg(const UdpPermitToSendMsg &from)
     UdpPermitToSendMsg(const UdpPermitToSendMsg &from)
     {
     {
-        length = from.length;
-        cmd = from.cmd;
-        destNodeIndex = from.destNodeIndex;
-        max_data = from.max_data;
-        lastSequenceSeen = from.lastSequenceSeen;
-        missingCount = from.missingCount;
+        sb.length = from.sb.length;
+        sb.cmd = from.sb.cmd;
+        sb.destNodeIndex = from.sb.destNodeIndex;
+        sb.max_data = from.sb.max_data;
+        sb.lastSequenceSeen = from.sb.lastSequenceSeen;
+        sb.missingCount = from.sb.missingCount;
 #ifdef CRC_MESSAGES
 #ifdef CRC_MESSAGES
-        crc = from.crc;
+        sb.crc = from.sb.crc;
 #endif
 #endif
-        memcpy(missingSequences, from.missingSequences, from.missingCount * sizeof(missingSequences[0]));
+        memcpy(missingSequences, from.missingSequences, from.sb.missingCount * sizeof(missingSequences[0]));
     }
     }
 };
 };
 
 

+ 8 - 8
roxie/udplib/udptrr.cpp

@@ -230,13 +230,13 @@ class CReceiveManager : public CInterface, implements IReceiveManager
                     UdpPermitToSendMsg msg;
                     UdpPermitToSendMsg msg;
                     {
                     {
                         SpinBlock block(resendInfoLock);
                         SpinBlock block(resendInfoLock);
-                        msg.length = UdpPermitToSendMsg::messageFixedSize + missingCount*sizeof(msg.missingSequences[0]);
-                        msg.cmd = flow_t::ok_to_send;
+                        msg.sb.length = sizeof(UdpPermitToSendMsg::StaticBlock) + missingCount*sizeof(msg.missingSequences[0]);
+                        msg.sb.cmd = flow_t::ok_to_send;
 
 
-                        msg.destNodeIndex = myNodeIndex;
-                        msg.max_data = maxTransfer;
-                        msg.lastSequenceSeen = lastSeen;
-                        msg.missingCount = missingCount;
+                        msg.sb.destNodeIndex = myNodeIndex;
+                        msg.sb.max_data = maxTransfer;
+                        msg.sb.lastSequenceSeen = lastSeen;
+                        msg.sb.missingCount = missingCount;
                         for (unsigned i = 0; i < missingCount; i++)
                         for (unsigned i = 0; i < missingCount; i++)
                         {
                         {
                             unsigned idx = (missingIndex + i) % missingTableSize;
                             unsigned idx = (missingIndex + i) % missingTableSize;
@@ -245,7 +245,7 @@ class CReceiveManager : public CInterface, implements IReceiveManager
                                 DBGLOG("Requesting resend of packet %d", missing[idx]);
                                 DBGLOG("Requesting resend of packet %d", missing[idx]);
                         }
                         }
 #ifdef CRC_MESSAGES
 #ifdef CRC_MESSAGES
-                        msg.crc = msg.calcCRC();
+                        msg.sb.crc = msg.calcCRC();
 #endif
 #endif
                     }
                     }
                     if (checkTraceLevel(TRACE_RETRY_DATA, 5))
                     if (checkTraceLevel(TRACE_RETRY_DATA, 5))
@@ -253,7 +253,7 @@ class CReceiveManager : public CInterface, implements IReceiveManager
                         StringBuffer s;
                         StringBuffer s;
                         DBGLOG("requestToSend %s", msg.toString(s).str());
                         DBGLOG("requestToSend %s", msg.toString(s).str());
                     }
                     }
-                    flowSocket->write(&msg, msg.length);
+                    flowSocket->write(&msg, msg.sb.length);
                 }
                 }
                 catch(IException *e) 
                 catch(IException *e) 
                 {
                 {

+ 19 - 19
roxie/udplib/udptrs.cpp

@@ -123,13 +123,13 @@ public:
             permit.toString(permitStr);
             permit.toString(permitStr);
             DBGLOG("UdpSender: cleanRetryData (%s), total %u available between %u and %u", permitStr.str(), retryDataCount, minUdpSequence, maxUdpSequence); 
             DBGLOG("UdpSender: cleanRetryData (%s), total %u available between %u and %u", permitStr.str(), retryDataCount, minUdpSequence, maxUdpSequence); 
         }
         }
-        unsigned lastReceived = permit.lastSequenceSeen;
+        unsigned lastReceived = permit.sb.lastSequenceSeen;
         unsigned missingIndex = 0;
         unsigned missingIndex = 0;
-        unsigned missingCount = permit.missingCount;
+        unsigned missingCount = permit.sb.missingCount;
         unsigned i = 0;
         unsigned i = 0;
         if (maxRetryData)
         if (maxRetryData)
         {
         {
-            while (i < retryDataCount && retries.length() < permit.max_data)
+            while (i < retryDataCount && retries.length() < permit.sb.max_data)
             {
             {
                 unsigned idx = (retryDataIdx + i) % maxRetryData;
                 unsigned idx = (retryDataIdx + i) % maxRetryData;
                 DataBuffer *buffer = retryData[idx];
                 DataBuffer *buffer = retryData[idx];
@@ -187,7 +187,7 @@ public:
     unsigned sendData(const UdpPermitToSendMsg &permit, bool isLocal, TokenBucket *bucket, bool &moreRequested, unsigned &maxPackets)
     unsigned sendData(const UdpPermitToSendMsg &permit, bool isLocal, TokenBucket *bucket, bool &moreRequested, unsigned &maxPackets)
     {
     {
         moreRequested = false;
         moreRequested = false;
-        maxPackets = permit.max_data;
+        maxPackets = permit.sb.max_data;
         PointerArray toSend;
         PointerArray toSend;
         unsigned totalSent = cleanRetryData(permit, toSend);
         unsigned totalSent = cleanRetryData(permit, toSend);
         while (toSend.length() < maxPackets && dataQueued())
         while (toSend.length() < maxPackets && dataQueued())
@@ -213,7 +213,7 @@ public:
             if (isRetry)
             if (isRetry)
             {
             {
                 if (checkTraceLevel(TRACE_RETRY_DATA, 1))
                 if (checkTraceLevel(TRACE_RETRY_DATA, 1))
-                    DBGLOG("UdpSender: Resending packet to destination node %u sequence %u", permit.destNodeIndex, header->udpSequence);
+                    DBGLOG("UdpSender: Resending packet to destination node %u sequence %u", permit.sb.destNodeIndex, header->udpSequence);
                 atomic_inc(&packetsRetried);
                 atomic_inc(&packetsRetried);
             }
             }
             else
             else
@@ -264,7 +264,7 @@ public:
                 else
                 else
                 {
                 {
                     if (udpTraceLevel > 0)
                     if (udpTraceLevel > 0)
-                        DBGLOG("Overflow in resend packet buffer for destination node %u - discarding packet sequence %u", permit.destNodeIndex, header->udpSequence);
+                        DBGLOG("Overflow in resend packet buffer for destination node %u - discarding packet sequence %u", permit.sb.destNodeIndex, header->udpSequence);
                     ::Release(retryData[slot]);
                     ::Release(retryData[slot]);
                 }
                 }
                 retryData[slot] = buffer;
                 retryData[slot] = buffer;
@@ -372,7 +372,7 @@ public:
         retryData = NULL;
         retryData = NULL;
     }
     }
 
 
-    void init(unsigned destNodeIndex, unsigned _numQueues, unsigned queueSize, unsigned _maxRetryData, unsigned sendFlowPort, unsigned dataPort, bool isLocal) 
+    void init(unsigned destNodeIndex, unsigned _numQueues, unsigned queueSize, unsigned _maxRetryData, unsigned sendFlowPort, unsigned dataPort, bool isLocal)
     {
     {
         assert(!initialized);
         assert(!initialized);
         maxRetryData = _maxRetryData;
         maxRetryData = _maxRetryData;
@@ -948,20 +948,20 @@ class CSendManager : public CInterface, implements ISendManager
                     {
                     {
                         unsigned int res ;
                         unsigned int res ;
                         flow_socket->read(&f, 1, sizeof(f), res, 5);
                         flow_socket->read(&f, 1, sizeof(f), res, 5);
-                        assertex(res == f.length);
+                        assertex(res == f.sb.length);
 #ifdef CRC_MESSAGES
 #ifdef CRC_MESSAGES
-                        assertex(f.crc == f.calcCRC());
+                        assertex(f.sb.crc == f.calcCRC());
 #endif
 #endif
-                        switch (f.cmd) 
+                        switch (f.sb.cmd)
                         {
                         {
                         case flow_t::ok_to_send:
                         case flow_t::ok_to_send:
                             if (udpTraceLevel > 1) 
                             if (udpTraceLevel > 1) 
-                                DBGLOG("UdpSender: received ok_to_send msg max %d packets from node=%u (length %u)", f.max_data, f.destNodeIndex, res);
+                                DBGLOG("UdpSender: received ok_to_send msg max %d packets from node=%u (length %u)", f.sb.max_data, f.sb.destNodeIndex, res);
                             parent.data->ok_to_send(f);
                             parent.data->ok_to_send(f);
                             break;
                             break;
 
 
                         default: 
                         default: 
-                            DBGLOG("UdpSender: received unknown flow message type=%d", f.cmd);
+                            DBGLOG("UdpSender: received unknown flow message type=%d", f.sb.cmd);
                         }
                         }
                     }
                     }
                     catch (IException *e) 
                     catch (IException *e) 
@@ -1060,7 +1060,7 @@ class CSendManager : public CInterface, implements ISendManager
                 return true;
                 return true;
             else 
             else 
             {
             {
-                DBGLOG("UdpSender: push() failed - ignored ok_to_send msg - index=%u, maxData=%u", msg.destNodeIndex, msg.max_data);
+                DBGLOG("UdpSender: push() failed - ignored ok_to_send msg - index=%u, maxData=%u", msg.sb.destNodeIndex, msg.sb.max_data);
                 return false;
                 return false;
             }
             }
         }
         }
@@ -1081,19 +1081,19 @@ class CSendManager : public CInterface, implements ISendManager
 
 
                 if (udpSnifferEnabled)
                 if (udpSnifferEnabled)
                     send_sniff(true);
                     send_sniff(true);
-                parent.send_flow->clear_to_send_received(permit.destNodeIndex);
-                UdpReceiverEntry &receiverInfo = parent.receiversTable[permit.destNodeIndex];
+                parent.send_flow->clear_to_send_received(permit.sb.destNodeIndex);
+                UdpReceiverEntry &receiverInfo = parent.receiversTable[permit.sb.destNodeIndex];
                 bool moreRequested;
                 bool moreRequested;
                 unsigned maxPackets;
                 unsigned maxPackets;
-                unsigned payload = receiverInfo.sendData(permit, (parent.myNodeIndex == permit.destNodeIndex), bucket, moreRequested, maxPackets);
+                unsigned payload = receiverInfo.sendData(permit, (parent.myNodeIndex == permit.sb.destNodeIndex), bucket, moreRequested, maxPackets);
                 if (udpSendCompletedInData && !maxPackets)
                 if (udpSendCompletedInData && !maxPackets)
-                    parent.sendRequest(permit.destNodeIndex, flow_t::send_completed);
-                parent.send_flow->send_done(permit.destNodeIndex, moreRequested);
+                    parent.sendRequest(permit.sb.destNodeIndex, flow_t::send_completed);
+                parent.send_flow->send_done(permit.sb.destNodeIndex, moreRequested);
                 if (udpSnifferEnabled)
                 if (udpSnifferEnabled)
                     send_sniff(false);
                     send_sniff(false);
                 
                 
                 if (udpTraceLevel > 1) 
                 if (udpTraceLevel > 1) 
-                    DBGLOG("UdpSender: sent %u bytes to node=%d", payload, permit.destNodeIndex);
+                    DBGLOG("UdpSender: sent %u bytes to node=%d", payload, permit.sb.destNodeIndex);
                 
                 
             }
             }
             if (udpTraceLevel > 0)
             if (udpTraceLevel > 0)