mpcomm.hpp 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef MPCOMM_HPP
  14. #define MPCOMM_HPP
  15. #ifndef mp_decl
  16. #define mp_decl DECL_IMPORT
  17. #endif
  18. #include "mpbase.hpp"
  19. #include "mpbuff.hpp"
  20. #include "mptag.hpp"
  21. // timeout values
  22. #define MP_WAIT_FOREVER ((unsigned)-1)
  23. #define MP_ASYNC_SEND ((unsigned)-2)
  24. interface ICommunicator: extends IInterface
  25. {
  26. virtual bool send (CMessageBuffer &mbuf, rank_t dstrank, mptag_t tag, unsigned timeout=MP_WAIT_FOREVER) = 0;
  27. // blocking send (unless MP_ASYNC_SEND used for timeout), NB, mbuf clear on exit
  28. // returns false if timedout
  29. virtual unsigned probe(rank_t srcrank, mptag_t tag, rank_t *sender=NULL, unsigned timeout=0) = 0;
  30. // default non-blocking, check message returns sender if message waiting
  31. // returns 0 if no message available in time given (0 for poll) otherwise number waiting
  32. virtual bool recv(CMessageBuffer &mbuf, rank_t srcrank, mptag_t tag, rank_t *sender=NULL, unsigned timeout=MP_WAIT_FOREVER) = 0;
  33. // receive, returns senders rank or false if no message available in time given or cancel called
  34. virtual IGroup &queryGroup() = 0; // query group for communicator
  35. virtual IGroup *getGroup() = 0; // link and return group for communicator
  36. virtual bool sendRecv(CMessageBuffer &mbuff, rank_t sendrank, mptag_t sendtag, unsigned timeout=MP_WAIT_FOREVER) = 0;
  37. virtual bool reply (CMessageBuffer &mbuff, unsigned timeout=MP_WAIT_FOREVER) = 0;
  38. virtual void cancel (rank_t srcrank, mptag_t tag) = 0; // cancels in-progress recvs
  39. virtual void flush (mptag_t tag) = 0; // flushes pending buffers
  40. virtual bool verifyConnection(rank_t rank, unsigned timeout=1000*60*5) = 0; // verifies connected to rank
  41. virtual bool verifyAll(bool duplex=false, unsigned timeout=1000*60*30, unsigned perConnectionTimeout=0) = 0;
  42. virtual void disconnect(INode *node) = 0;
  43. virtual void barrier() = 0;
  44. virtual const SocketEndpoint &queryChannelPeerEndpoint(const SocketEndpoint &sender) const = 0;
  45. };
  46. interface IInterCommunicator: extends IInterface
  47. // Non-grouped communication
  48. {
  49. virtual bool send (CMessageBuffer &mbuf, INode *dst, mptag_t tag, unsigned timeout=MP_WAIT_FOREVER) = 0;
  50. // blocking send (unless MP_ASYNC_SEND used for timeout), NB, mbuf clear on exit
  51. // returns false if timedout
  52. virtual unsigned probe(INode *src, mptag_t tag, INode **sender=NULL, unsigned timeout=0) = 0;
  53. // default non-blocking, check message returns sender if message waiting
  54. // returns 0 if no message available or recv cancelled, or number waiting
  55. virtual bool recv(CMessageBuffer &mbuf, INode *src, mptag_t tag, INode **sender=NULL, unsigned timeout=MP_WAIT_FOREVER) = 0;
  56. // receive, returns false if no message available in time given
  57. virtual bool sendRecv(CMessageBuffer &mbuff, INode *dst, mptag_t dsttag, unsigned timeout=MP_WAIT_FOREVER) = 0;
  58. virtual bool reply (CMessageBuffer &mbuff, unsigned timeout=MP_WAIT_FOREVER) = 0;
  59. virtual void cancel (INode *src, mptag_t tag) = 0; // cancels in-progress recvs
  60. virtual void flush (mptag_t tag) = 0; // flushes pending buffers
  61. virtual bool verifyConnection (INode *node, unsigned timeout=1000*60*5) = 0; // verifies connected to node
  62. virtual bool verifyAll(IGroup *group,bool duplex=false, unsigned timeout=1000*60*30) = 0;
  63. virtual void verifyAll(StringBuffer &log) = 0;
  64. virtual void disconnect(INode *node) = 0;
  65. };
  66. extern mp_decl mptag_t createReplyTag(); // creates (short-lived) reply-tag;
  67. extern mp_decl ICommunicator *createCommunicator(IGroup *group,bool outer=false); // outer allows nodes outside group to send
  68. extern mp_decl IInterCommunicator &queryWorldCommunicator();
  69. extern mp_decl bool hasMPServerStarted();
  70. enum MPServerOpts { mpsopt_null, mpsopt_channelreopen };
  71. interface IMPServer : extends IInterface
  72. {
  73. virtual mptag_t createReplyTag() = 0;
  74. virtual ICommunicator *createCommunicator(IGroup *group, bool outer=false) = 0;
  75. virtual void stop() = 0;
  76. virtual INode *queryMyNode() = 0;
  77. virtual void setOpt(MPServerOpts opt, const char *value) = 0;
  78. virtual void installAllowListCallback(IAllowListHandler *allowListCallback) = 0;
  79. virtual IAllowListHandler *queryAllowListCallback() const = 0;
  80. };
  81. extern mp_decl void startMPServer(unsigned port, bool paused=false, bool listen=false);
  82. extern mp_decl void startMPServer(unsigned __int64 role, unsigned port, bool paused=false, bool listen=false);
  83. extern mp_decl void stopMPServer();
  84. extern mp_decl IMPServer *getMPServer();
  85. extern mp_decl IMPServer *startNewMPServer(unsigned port, bool listen=false);
  86. interface IConnectionMonitor: extends IInterface
  87. {
  88. virtual void onClose(SocketEndpoint &ep)=0;
  89. };
  90. extern mp_decl void addMPConnectionMonitor(IConnectionMonitor *monitor);
  91. extern mp_decl void removeMPConnectionMonitor(IConnectionMonitor *monitor);
  92. extern mp_decl StringBuffer &getReceiveQueueDetails(StringBuffer &buf);
  93. interface IMPProtocol: extends IInterface
  94. {
  95. virtual void send (CMessageBuffer &mbuf, rank_t dstrank)=0;
  96. virtual void sendStop (rank_t dstrank)=0;
  97. virtual bool recv (CMessageBuffer &mbuf, rank_t &sender)=0; // returns false if stopped
  98. virtual void confirm (CMessageBuffer &mbuf)=0; // confirms recv
  99. virtual unsigned remaining()=0; // number not stopped
  100. };
  101. extern mp_decl void registerSelfDestructChildProcess(HANDLE handle);
  102. extern mp_decl void unregisterSelfDestructChildProcess(HANDLE handle);
  103. #endif