jbroadcast.ipp 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef JBROADCAST_IPP
  14. #define JBROADCAST_IPP
  15. #include "jarray.hpp"
  16. #include "jbuff.hpp"
  17. #include "jsuperhash.hpp"
  18. #include "jbroadcast.hpp"
  // Command codes for broadcast packets (presumably the values stored in
  // MCPacketHeader::cmd -- confirm against the sender/receiver code).
  enum MCPacket_Cmd
  {
      MCPacket_None,
      MCPacket_Poll,
      MCPacket_Stop
  };

  // Number of bytes reserved for each data packet buffer (header included);
  // used by CDataPacket when it allocates its buffer.
  #define MC_PACKET_SIZE 64000

  // Presumably the buffer size used for acknowledgement packets; it is not
  // referenced anywhere in this header.
  #define MC_ACK_PACKET_SIZE 64000
  22. struct MCPacketHeader
  23. {
  24. bctag_t tag; // filters to different client receivers
  25. unsigned jobId; // sequenced per broadcast to identify new streams from old resent packets in old streams
  26. unsigned id; // sequence id
  27. unsigned length; // length of data in packet
  28. unsigned offset; // offset of data in stream
  29. unsigned total; // total # of packets in stream
  30. byte cmd; // optional cmd (e.g. poll)
  31. };
  32. class CDataPacket : public CInterface
  33. {
  34. public:
  35. CDataPacket()
  36. {
  37. header = (MCPacketHeader *)mb.reserveTruncate(MC_PACKET_SIZE);
  38. pktData = ((byte*)header)+sizeof(MCPacketHeader);
  39. }
  40. MCPacketHeader *header;
  41. void *queryData() { return pktData; }
  42. MCPacketHeader *detachPacket() { return (MCPacketHeader *) mb.detach(); }
  43. private:
  44. void *pktData;
  45. MemoryBuffer mb;
  46. };
  47. struct MCAckPacketHeader
  48. {
  49. unsigned node;
  50. bctag_t tag;
  51. unsigned jobId;
  52. bool ackDone;
  53. };
  54. class CUIntValue : public CInterface
  55. {
  56. unsigned value;
  57. public:
  58. CUIntValue(unsigned _value) : value(_value) { }
  59. const unsigned &queryValue() { return value; }
  60. const void *queryFindParam() const { return &value; }
  61. };
  62. typedef OwningSimpleHashTableOf<CUIntValue, unsigned> CUIntTable;
  63. class CMCastReceiver : public CInterface, implements IBroadcastReceiver
  64. {
  65. public:
  66. IMPLEMENT_IINTERFACE;
  67. CMCastReceiver(bctag_t _tag);
  68. ~CMCastReceiver();
  69. bool packetReceived(CDataPacket &dataPacket, bool &complete);
  70. bool buildNack(MCAckPacketHeader *ackPacket, size32_t &sz, unsigned total=0);
  71. void reset();
  72. const void *queryFindParam() const { return &tag; }
  73. // IBroadcastReceiver impl.
  74. virtual bool eos();
  75. virtual bool read(MemoryBuffer &mb);
  76. virtual void stop();
  77. private:
  78. CIArrayOf<CDataPacket> dataPackets;
  79. UnsignedArray pktsReceived;
  80. Semaphore receivedSem;
  81. bctag_t tag;
  82. unsigned nextPacket;
  83. CriticalSection crit;
  84. bool aborted, eosHit;
  85. CTimeMon logTmRecv, logTmCons;
  86. };
  87. class CMCastRecvServer : public Thread
  88. {
  89. CUIntTable oldJobIds; // could expire these after a short time.
  90. StringAttr broadcastRoot;
  91. bool stopped;
  92. Owned<ISocket> sock, ackSock;
  93. SimpleHashTableOf<CMCastReceiver, bctag_t> receivers;
  94. CIArrayOf<CDataPacket> dataPackets;
  95. CriticalSection receiversCrit;
  96. unsigned groupMember;
  97. SocketEndpoint mcastEp;
  98. unsigned ackPort;
  99. public:
  100. IMPLEMENT_IINTERFACE;
  101. CMCastRecvServer(const char *_broadcastRoot, unsigned groupMember, SocketEndpoint &mcastEp, unsigned broadcastAckPort);
  102. CMCastReceiver *getReceiver(bctag_t tag);
  103. void registerReceiver(CMCastReceiver &receiver);
  104. void deregisterReceiver(CMCastReceiver &receiver);
  105. void stop();
  106. virtual int run();
  107. };
  108. class CUIntTableItem : public CUIntTable
  109. {
  110. unsigned pkt;
  111. public:
  112. CUIntTableItem(unsigned _pkt) : pkt(_pkt) { }
  113. const void *queryFindParam() const { return &pkt; }
  114. const unsigned &queryPacket() { return pkt; }
  115. };
  116. typedef OwningSimpleHashTableOf<CUIntTableItem, unsigned> CPktNodeTable;
  117. class CCountedItem : public CUIntValue
  118. {
  119. public:
  120. unsigned count;
  121. CCountedItem(unsigned value) : CUIntValue(value) { count = 0; }
  122. };
  123. class CCountTable : public OwningSimpleHashTableOf<CCountedItem, unsigned>
  124. {
  125. public:
  126. inline unsigned incItem(unsigned v)
  127. {
  128. CCountedItem *c = find(v);
  129. if (!c) c = new CCountedItem(v);
  130. add(* c);
  131. c->count++;
  132. return c->count;
  133. }
  134. };
  135. #endif