udplib.hpp 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef UDPMSGPK_INCL
  14. #define UDPMSGPK_INCL
  15. #include "jiface.hpp"
  16. #include "jstring.hpp"
  17. #include "jisem.hpp"
  18. #include "jsocket.hpp"
  19. #include "roxiemem.hpp"
  20. #ifdef UDPLIB_EXPORTS
  21. #define UDPLIB_API DECL_EXPORT
  22. #else
  23. #define UDPLIB_API DECL_IMPORT
  24. #endif
  25. typedef unsigned ruid_t; // at 1000/sec recycle every 49 days
  26. #define RUIDF "0x%.8x"
  27. #define RUID_PING 0
  28. #define RUID_DISCARD 1
  29. #define RUID_FIRST 2
  30. typedef unsigned RecordLengthType;
  31. #define MAX_RECORD_LENGTH 0xffffffff
  32. class UDPLIB_API ServerIdentifier
  33. {
  34. private:
  35. IpAddress serverIp; // MORE - should really be an endpoint?
  36. public:
  37. ServerIdentifier() : serverIp() { }
  38. ServerIdentifier(const ServerIdentifier &from) : serverIp(from.serverIp) { }
  39. ServerIdentifier(const IpAddress &from) : serverIp(from) { }
  40. const IpAddress &getNodeAddress() const;
  41. const ServerIdentifier & operator=(const ServerIdentifier &from)
  42. {
  43. serverIp = from.serverIp;
  44. return *this;
  45. }
  46. bool operator==(const ServerIdentifier &from) const
  47. {
  48. return serverIp.ipequals(from.serverIp);
  49. }
  50. unsigned hash() const
  51. {
  52. return serverIp.iphash(0);
  53. }
  54. inline void setIp(const IpAddress &_ip)
  55. {
  56. serverIp = _ip;
  57. }
  58. StringBuffer &getTraceText(StringBuffer &s) const
  59. {
  60. return serverIp.getIpText(s);
  61. }
  62. };
  63. extern UDPLIB_API ServerIdentifier myNode;
  64. interface IMessagePacker : extends IInterface
  65. {
  66. virtual void *getBuffer(unsigned len, bool variable) = 0;
  67. virtual void putBuffer(const void *buf, unsigned len, bool variable) = 0;
  68. virtual void flush() = 0;
  69. virtual void sendMetaInfo(const void *buf, unsigned len) = 0;
  70. virtual unsigned size() const = 0; // Total amount written via putBuffer plus any overhead from record length prefixes
  71. // Notes:
  72. // 1. Parameters to putBuffer must have been returned from getBuffer.
  73. // 2 putBuffer must me called before any call to flush
  74. // 3. There is an implicit abort if is released without final flush
  75. // 4 This interface is single threaded
  76. // 5. call to getbuffer, without call to makes the previous call to getBuffer and NULL operation
  77. // 6. if flush has been called with last_message true, then any calls to getBuffer or putBuffer is undefined.
  78. };
  79. interface IException;
  80. interface IMessageUnpackCursor : extends IInterface
  81. {
  82. virtual const void *getNext(int length) = 0;
  83. virtual bool atEOF() const = 0;
  84. virtual bool isSerialized() const = 0;
  85. // if one tries to read past the last record then NULL will be returned,
  86. // if one asks for more data than available then throws exception.
  87. };
  88. interface IMessageResult : extends IInterface
  89. {
  90. virtual IMessageUnpackCursor *getCursor(roxiemem::IRowManager *rowMgr) const = 0;
  91. virtual const void *getMessageHeader(unsigned &length) const = 0;
  92. virtual const void *getMessageMetadata(unsigned &length) const = 0;
  93. virtual void discard() const = 0;
  94. };
  95. interface IMessageCollator : extends IInterface
  96. {
  97. virtual IMessageResult *getNextResult(unsigned time_out, bool &anyActivity) = 0;
  98. virtual void interrupt(IException *E = NULL) = 0;
  99. virtual ruid_t queryRUID() const = 0;
  100. virtual unsigned queryBytesReceived() const = 0;
  101. };
  102. interface IReceiveManager : extends IInterface
  103. {
  104. virtual IMessageCollator *createMessageCollator(roxiemem::IRowManager *rowManager, ruid_t ruid) = 0;
  105. virtual void detachCollator(const IMessageCollator *collator) = 0;
  106. };
  107. // Opaque data structure that SendManager gives to message packer describing how to talk to a particular target node
  108. interface IUdpReceiverEntry
  109. {
  110. };
  111. interface ISendManager : extends IInterface
  112. {
  113. virtual IMessagePacker *createMessagePacker(ruid_t id, unsigned sequence, const void *messageHeader, unsigned headerSize, const ServerIdentifier &destNode, int queue) = 0;
  114. virtual void writeOwn(IUdpReceiverEntry &receiver, roxiemem::DataBuffer *buffer, unsigned len, unsigned queue) = 0;
  115. virtual bool dataQueued(ruid_t ruid, unsigned sequence, const ServerIdentifier &destNode) = 0;
  116. virtual bool abortData(ruid_t ruid, unsigned sequence, const ServerIdentifier &destNode) = 0;
  117. virtual bool allDone() = 0;
  118. };
  119. extern UDPLIB_API IReceiveManager *createReceiveManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int queue_size, unsigned maxSlotsPerSender);
  120. extern UDPLIB_API ISendManager *createSendManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int queue_size_pr_server, int queues_pr_server, TokenBucket *rateLimiter);
  121. extern UDPLIB_API void setAeronProperties(const IPropertyTree *config);
  122. extern UDPLIB_API IReceiveManager *createAeronReceiveManager(const SocketEndpoint &ep);
  123. extern UDPLIB_API ISendManager *createAeronSendManager(unsigned dataPort, unsigned numQueues, const IpAddress &myIP);
  124. extern UDPLIB_API RelaxedAtomic<unsigned> unwantedDiscarded;
  125. extern UDPLIB_API unsigned udpTraceLevel;
  126. extern UDPLIB_API unsigned udpTraceCategories;
  127. extern UDPLIB_API unsigned udpOutQsPriority;
  128. extern UDPLIB_API void queryMemoryPoolStats(StringBuffer &memStats);
  129. extern UDPLIB_API unsigned multicastTTL;
  130. #if defined( __linux__) || defined(__APPLE__)
  131. extern UDPLIB_API void setLinuxThreadPriority(int level);
  132. #endif
  133. extern UDPLIB_API unsigned udpFlowSocketsSize;
  134. extern UDPLIB_API unsigned udpLocalWriteSocketSize;
  135. extern UDPLIB_API unsigned udpMaxRetryTimedoutReqs;
  136. extern UDPLIB_API unsigned udpRequestToSendTimeout;
  137. extern UDPLIB_API unsigned udpRequestToSendAckTimeout;
  138. extern UDPLIB_API unsigned udpRetryBusySenders;
  139. extern UDPLIB_API bool udpSnifferEnabled;
  140. extern UDPLIB_API unsigned udpSnifferReadThreadPriority;
  141. extern UDPLIB_API unsigned udpSnifferSendThreadPriority;
  142. extern UDPLIB_API void stopAeronDriver();
  143. #endif