udplib.hpp 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef UDPMSGPK_INCL
  14. #define UDPMSGPK_INCL
  15. #include "jiface.hpp"
  16. #include "jstring.hpp"
  17. #include "jisem.hpp"
  18. #include "jsocket.hpp"
  19. #include "roxiemem.hpp"
  20. #ifdef UDPLIB_EXPORTS
  21. #define UDPLIB_API DECL_EXPORT
  22. #else
  23. #define UDPLIB_API DECL_IMPORT
  24. #endif
  25. typedef unsigned ruid_t; // at 1000/sec recycle every 49 days
  26. #define RUIDF "0x%.8x"
  27. #define RUID_PING 0
  28. #define RUID_DISCARD 1
  29. #define RUID_FIRST 2
  30. typedef unsigned RecordLengthType;
  31. #define MAX_RECORD_LENGTH 0xffffffff
  32. class UDPLIB_API ServerIdentifier
  33. {
  34. private:
  35. unsigned netAddress = 0;
  36. public:
  37. ServerIdentifier() { }
  38. ServerIdentifier(const ServerIdentifier &from) : netAddress(from.netAddress) { }
  39. ServerIdentifier(const IpAddress &from) { setIp(from); }
  40. const IpAddress getIpAddress() const;
  41. unsigned getIp4() const { return netAddress; };
  42. inline bool isNull() const { return netAddress==0; }
  43. inline void clear() { netAddress=0; }
  44. const ServerIdentifier & operator=(const ServerIdentifier &from)
  45. {
  46. netAddress = from.netAddress;
  47. return *this;
  48. }
  49. bool operator==(const ServerIdentifier &from) const
  50. {
  51. return netAddress == from.netAddress;
  52. }
  53. unsigned hash() const
  54. {
  55. return hashc((const byte *)&netAddress,sizeof(netAddress),0);
  56. }
  57. unsigned fasthash() const
  58. {
  59. return netAddress >> 24;
  60. }
  61. inline void setIp(const IpAddress &_ip)
  62. {
  63. netAddress = _ip.getIP4();
  64. }
  65. StringBuffer &getTraceText(StringBuffer &s) const
  66. {
  67. IpAddress serverIp;
  68. serverIp.setIP4(netAddress);
  69. return serverIp.getIpText(s);
  70. }
  71. bool isMe() const;
  72. };
  73. extern UDPLIB_API ServerIdentifier myNode;
  74. interface IMessagePacker : extends IInterface
  75. {
  76. virtual void *getBuffer(unsigned len, bool variable) = 0;
  77. virtual void putBuffer(const void *buf, unsigned len, bool variable) = 0;
  78. virtual void flush() = 0;
  79. virtual void sendMetaInfo(const void *buf, unsigned len) = 0;
  80. virtual unsigned size() const = 0; // Total amount written via putBuffer plus any overhead from record length prefixes
  81. // Notes:
  82. // 1. Parameters to putBuffer must have been returned from getBuffer.
  83. // 2 putBuffer must me called before any call to flush
  84. // 3. There is an implicit abort if is released without final flush
  85. // 4 This interface is single threaded
  86. // 5. call to getbuffer, without call to makes the previous call to getBuffer and NULL operation
  87. // 6. if flush has been called with last_message true, then any calls to getBuffer or putBuffer is undefined.
  88. };
  89. interface IException;
  90. interface IMessageUnpackCursor : extends IInterface
  91. {
  92. virtual const void *getNext(int length) = 0;
  93. virtual bool atEOF() const = 0;
  94. virtual bool isSerialized() const = 0;
  95. // if one tries to read past the last record then NULL will be returned,
  96. // if one asks for more data than available then throws exception.
  97. };
  98. interface IMessageResult : extends IInterface
  99. {
  100. virtual IMessageUnpackCursor *getCursor(roxiemem::IRowManager *rowMgr) const = 0;
  101. virtual const void *getMessageHeader(unsigned &length) const = 0;
  102. virtual const void *getMessageMetadata(unsigned &length) const = 0;
  103. virtual void discard() const = 0;
  104. };
  105. interface IMessageCollator : extends IInterface
  106. {
  107. virtual IMessageResult *getNextResult(unsigned time_out, bool &anyActivity) = 0;
  108. virtual void interrupt(IException *E = NULL) = 0;
  109. virtual ruid_t queryRUID() const = 0;
  110. virtual unsigned queryBytesReceived() const = 0;
  111. virtual unsigned queryDuplicates() const = 0;
  112. virtual unsigned queryResends() const = 0;
  113. };
  114. interface IReceiveManager : extends IInterface
  115. {
  116. virtual IMessageCollator *createMessageCollator(roxiemem::IRowManager *rowManager, ruid_t ruid) = 0;
  117. virtual void detachCollator(const IMessageCollator *collator) = 0;
  118. };
  119. // Opaque data structure that SendManager gives to message packer describing how to talk to a particular target node
  120. interface IUdpReceiverEntry
  121. {
  122. };
  123. interface ISendManager : extends IInterface
  124. {
  125. virtual IMessagePacker *createMessagePacker(ruid_t id, unsigned sequence, const void *messageHeader, unsigned headerSize, const ServerIdentifier &destNode, int queue) = 0;
  126. virtual void writeOwn(IUdpReceiverEntry &receiver, roxiemem::DataBuffer *buffer, unsigned len, unsigned queue) = 0;
  127. virtual bool dataQueued(ruid_t ruid, unsigned sequence, const ServerIdentifier &destNode) = 0;
  128. virtual bool abortData(ruid_t ruid, unsigned sequence, const ServerIdentifier &destNode) = 0;
  129. virtual void abortAll(const ServerIdentifier &destNode) = 0;
  130. virtual bool allDone() = 0;
  131. };
  132. extern UDPLIB_API IReceiveManager *createReceiveManager(int server_flow_port, int data_port, int client_flow_port, int queue_size, bool encrypted);
  133. extern UDPLIB_API ISendManager *createSendManager(int server_flow_port, int data_port, int client_flow_port, int queue_size_pr_server, int queues_pr_server, const IpAddress &myIP, TokenBucket *rateLimiter, bool encryptionInTransit);
  134. extern UDPLIB_API void setAeronProperties(const IPropertyTree *config);
  135. extern UDPLIB_API IReceiveManager *createAeronReceiveManager(const SocketEndpoint &ep, bool encrypted);
  136. extern UDPLIB_API ISendManager *createAeronSendManager(unsigned dataPort, unsigned numQueues, const IpAddress &myIP, bool encrypted);
  137. #if defined( __linux__) || defined(__APPLE__)
  138. extern UDPLIB_API void setLinuxThreadPriority(int level);
  139. #endif
  140. extern UDPLIB_API void queryMemoryPoolStats(StringBuffer &memStats);
  141. extern UDPLIB_API void stopAeronDriver();
  142. interface IRoxieQueryPacket;
  143. class RoxiePacketHeader;
  144. interface IPendingCallback;
  145. interface IRoxieContextLogger;
  146. interface IRoxieOutputQueueManager : public IInterface
  147. {
  148. virtual void sendPacket(IRoxieQueryPacket *x, const IRoxieContextLogger &logctx) = 0;
  149. virtual void sendIbyti(RoxiePacketHeader &header, const IRoxieContextLogger &logctx, unsigned subChannel) = 0;
  150. virtual void sendAbort(RoxiePacketHeader &header, const IRoxieContextLogger &logctx) = 0;
  151. virtual void sendAbortCallback(const RoxiePacketHeader &header, const char *lfn, const IRoxieContextLogger &logctx) = 0;
  152. virtual IMessagePacker *createOutputStream(RoxiePacketHeader &x, bool outOfBand, const IRoxieContextLogger &logctx) = 0;
  153. virtual bool replyPending(RoxiePacketHeader &x) = 0;
  154. virtual bool abortCompleted(RoxiePacketHeader &x) = 0;
  155. virtual unsigned getHeadRegionSize() const = 0;
  156. virtual void setHeadRegionSize(unsigned newsize) = 0;
  157. virtual void start() = 0;
  158. virtual void stop() = 0;
  159. virtual void join() = 0;
  160. virtual IReceiveManager *queryReceiveManager() = 0;
  161. virtual IPendingCallback *notePendingCallback(const RoxiePacketHeader &header, const char *lfn) = 0;
  162. virtual void removePendingCallback(IPendingCallback *x) = 0;
  163. virtual void abortPendingData(const SocketEndpoint &ep) = 0;
  164. };
  165. extern UDPLIB_API IRoxieOutputQueueManager *ROQ;
  166. //This following might be clearer if they were grouped into an options structure and a metrics structure
  167. // -- Options for controlling the UDP layer --------------------------------------------------------------------------
  168. // These are documented in detail at the head of udptrr.cpp
  169. extern UDPLIB_API bool udpTraceFlow;
  170. extern UDPLIB_API bool udpTraceTimeouts;
  171. extern UDPLIB_API unsigned udpTraceLevel;
  172. extern UDPLIB_API unsigned udpOutQsPriority;
  173. extern UDPLIB_API unsigned udpFlowSocketsSize;
  174. extern UDPLIB_API unsigned udpLocalWriteSocketSize;
  175. extern UDPLIB_API unsigned udpMaxPermitDeadTimeouts; // How many permit grants are allowed to expire (with no flow message) until sender is assumed down
  176. extern UDPLIB_API unsigned udpRequestDeadTimeout; // Timeout for sender getting no response to request to send before assuming that the receiver is dead
  177. // All in milliseconds
  178. extern UDPLIB_API unsigned udpFlowAckTimeout;
  179. extern UDPLIB_API unsigned updDataSendTimeout;
  180. extern UDPLIB_API unsigned udpRequestTimeout;
  181. extern UDPLIB_API unsigned udpPermitTimeout;
  182. extern UDPLIB_API unsigned udpResendDelay;
  183. extern UDPLIB_API bool udpResendLostPackets;
  184. extern UDPLIB_API unsigned udpMaxPendingPermits;
  185. extern UDPLIB_API unsigned udpMaxClientPercent;
  186. extern UDPLIB_API unsigned udpMinSlotsPerSender;
  187. extern UDPLIB_API bool udpAssumeSequential;
  188. extern UDPLIB_API bool udpResendAllMissingPackets;
  189. extern UDPLIB_API unsigned udpStatsReportInterval;
  190. extern UDPLIB_API bool udpAdjustThreadPriorities;
  191. extern UDPLIB_API bool udpAllowAsyncPermits;
  192. //Should be in ccd
  193. extern UDPLIB_API unsigned multicastTTL;
  194. // -- Reported metrics --------------------------------------------------------------------------
  195. extern UDPLIB_API RelaxedAtomic<unsigned> unwantedDiscarded;
  196. extern UDPLIB_API RelaxedAtomic<unsigned> packetsResent;
  197. extern UDPLIB_API RelaxedAtomic<unsigned> packetsOOO;
  198. extern UDPLIB_API RelaxedAtomic<unsigned> flowPermitsSent;
  199. extern UDPLIB_API RelaxedAtomic<unsigned> flowRequestsReceived;
  200. extern UDPLIB_API RelaxedAtomic<unsigned> dataPacketsReceived;
  201. extern UDPLIB_API RelaxedAtomic<unsigned> flowRequestsSent;
  202. extern UDPLIB_API RelaxedAtomic<unsigned> flowPermitsReceived;
  203. extern UDPLIB_API RelaxedAtomic<unsigned> dataPacketsSent;
  204. #endif