rmtclient_impl.hpp 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef RMTCLIENT_IMPL_HPP
  14. #define RMTCLIENT_IMPL_HPP
  15. #include "rmtclient.hpp"
  // ---------------------------------------------------------------------------
  // Packet-loss simulation.
  //
  // In a debug build, uncomment the define below to route every SOCKWRITE /
  // SOCKREADTMS call through dummyReadWrite, which randomly drops traffic so the
  // retry/timeout paths of the callers can be exercised.
  // ---------------------------------------------------------------------------
  #ifdef _DEBUG
  //#define SIMULATE_PACKETLOSS 1
  #endif

  #if SIMULATE_PACKETLOSS

  #define TESTING_FAILURE_RATE_LOST_SEND 10 // per 1000
  #define TESTING_FAILURE_RATE_LOST_RECV 10 // per 1000
  #define DUMMY_TIMEOUT_MAX (1000*10)       // cap applied to the caller's timeout before a simulated timeout sleeps

  /*
   * Test shim that wraps an ISocket and randomly simulates lost packets.
   *
   * Instances are intended to be short-lived temporaries created by the
   * SOCKWRITE / SOCKREADTMS macros below; the shim does not own the socket.
   */
  struct DAFSCLIENT_API dummyReadWrite
  {
      /*
       * Socket whose next read must time out because its preceding write was
       * deliberately dropped (nullptr when no timeout is pending).
       *
       * Held in a function-local static: a non-const static data member cannot be
       * given an initializer inside the class, and a header has no single
       * translation unit in which to define it.
       */
      static ISocket *&queryTimeoutReadSock()
      {
          static ISocket *timeoutreadsock = nullptr;
          return timeoutreadsock;
      }

      // Exception thrown (by pointer) to report a simulated read timeout.
      class TimeoutSocketException: public CInterface, public IJSOCK_Exception
      {
      public:
          IMPLEMENT_IINTERFACE;

          TimeoutSocketException()
          {
          }
          virtual ~TimeoutSocketException()
          {
          }
          int errorCode() const
          {
              return JSOCKERR_timeout_expired;
          }
          StringBuffer & errorMessage(StringBuffer &str) const
          {
              return str.append("timeout expired");
          }
          MessageAudience errorAudience() const
          {
              return MSGAUD_user;
          }
      };

      ISocket *sock; // wrapped socket (not owned)

      explicit dummyReadWrite(ISocket *_sock) : sock(_sock)
      {
      }

      /*
       * Read from the wrapped socket, possibly simulating a lost reply.
       *
       * If the previous write on this socket was dropped, no real read is issued
       * and a timeout is simulated. Otherwise the real read is performed and,
       * when getRandom()%1000 falls below TESTING_FAILURE_RATE_LOST_RECV, its
       * result is discarded by simulating a timeout anyway.
       *
       * A simulated timeout sleeps for the remainder of 'timeout' (capped at
       * DUMMY_TIMEOUT_MAX) and then throws a TimeoutSocketException pointer.
       */
      void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, time_t timeout)
      {
          unsigned t = msTick();
          unsigned r = getRandom();
          ISocket *&pendingTimeout = queryTimeoutReadSock();
          bool timeoutread = (pendingTimeout==sock);
          pendingTimeout = nullptr; // the pending timeout is consumed by this read, whichever socket it was for
          if (!timeoutread)
              sock->readtms(buf, min_size, max_size, size_read, timeout);
          if (timeoutread||((TESTING_FAILURE_RATE_LOST_RECV>0)&&(r%1000<TESTING_FAILURE_RATE_LOST_RECV)))
          {
              PrintStackReport();
              if (timeoutread)
                  PROGLOG("** Simulate timeout");
              else
                  PROGLOG("** Simulate Packet loss (size %u,%u)",min_size,max_size);
              if (timeout>DUMMY_TIMEOUT_MAX)
                  timeout = DUMMY_TIMEOUT_MAX;
              t = msTick()-t; // time already spent in this call
              if (t<timeout)
                  Sleep(timeout-t);
              IJSOCK_Exception *e = new TimeoutSocketException;
              throw e;
          }
      }

      /*
       * Write to the wrapped socket, possibly simulating a lost request.
       *
       * When getRandom()%1000 falls below TESTING_FAILURE_RATE_LOST_SEND nothing
       * is sent, the full 'size' is reported as written, and the socket is
       * recorded so that the next readtms on it times out.
       */
      size32_t write(void const* buf, size32_t size)
      {
          queryTimeoutReadSock() = nullptr;
          unsigned r = getRandom();
          if ((TESTING_FAILURE_RATE_LOST_SEND>0)&&(r%1000<TESTING_FAILURE_RATE_LOST_SEND))
          {
              PrintStackReport();
              PROGLOG("** Simulate Packet loss (size %u)",size);
              queryTimeoutReadSock() = sock;
              return size;
          }
          return sock->write(buf,size);
      }
  };

  // The shim is a temporary that lives until the end of the full call expression,
  // so no heap allocation or manual cleanup is required.
  #define SOCKWRITE(sock) (dummyReadWrite(sock)).write
  #define SOCKREADTMS(sock) (dummyReadWrite(sock)).readtms

  #else

  // Argument is parenthesised so that any pointer-valued expression can be passed.
  #define SOCKWRITE(sock) (sock)->write
  #define SOCKREADTMS(sock) (sock)->readtms

  #endif
  // Retry budgets. NORMAL_RETRIES is the default attempt count of
  // receiveDaFsBufferSize; LENGTHY_RETRIES is presumably used for long-running
  // commands (its use is not visible in this header - confirm against callers).
  //
  // While packet loss is being simulated each operation gets a single attempt.
  // The test is '#if' rather than '#ifdef' so that it agrees with the check that
  // guards dummyReadWrite: defining SIMULATE_PACKETLOSS as 0 must leave both the
  // simulation and the reduced retry counts switched off.
  #if SIMULATE_PACKETLOSS
  #define NORMAL_RETRIES (1)
  #define LENGTHY_RETRIES (1)
  #else
  #define NORMAL_RETRIES (3)
  #define LENGTHY_RETRIES (12)
  #endif
  115. extern DAFSCLIENT_API void sendDaFsBuffer(ISocket * socket, MemoryBuffer & src, bool testSocketFlag=false);
  116. extern DAFSCLIENT_API size32_t receiveDaFsBufferSize(ISocket * socket, unsigned numtries=NORMAL_RETRIES,CTimeMon *timemon=NULL);
  117. extern DAFSCLIENT_API void receiveDaFsBuffer(ISocket * socket, MemoryBuffer & tgt, unsigned numtries=1, size32_t maxsz=0x7fffffff);
  118. extern DAFSCLIENT_API void cleanupDaFsSocket(ISocket *sock);
  119. extern DAFSCLIENT_API byte traceFlags;
  // Trace categories: each macro masks one bit of 'traceFlags' and yields a
  // non-zero value when that bit is set.
  #define TF_TRACE              (traceFlags & 0x01)
  #define TF_TRACE_PRE_IO       (traceFlags & 0x02)
  #define TF_TRACE_FULL         (traceFlags & 0x04)
  #define TF_TRACE_CLIENT_CONN  (traceFlags & 0x08)
  #define TF_TRACE_TREE_COPY    (traceFlags & 0x10)
  #define TF_TRACE_CLIENT_STATS (traceFlags & 0x20)
  126. class CRemoteBase : public CSimpleInterfaceOf<IDaFsConnection>
  127. {
  128. Owned<ISocket> socket;
  129. static SocketEndpoint lastfailep;
  130. static unsigned lastfailtime;
  131. static CriticalSection lastFailEpCrit;
  132. DAFSConnectCfg connectMethod;
  133. void connectSocket(SocketEndpoint &ep, unsigned connectTimeoutMs=0, unsigned connectRetries=INFINITE);
  134. void killSocket(SocketEndpoint &tep);
  135. protected: friend class CRemoteFileIO;
  136. StringAttr filename;
  137. CriticalSection crit;
  138. SocketEndpoint ep;
  139. void sendRemoteCommand(MemoryBuffer & src, MemoryBuffer & reply, bool retry=true, bool lengthy=false, bool handleErrCode=true);
  140. void sendRemoteCommand(MemoryBuffer & src, bool retry);
  141. public:
  142. CRemoteBase(const SocketEndpoint &_ep, const char * _filename);
  143. CRemoteBase(const SocketEndpoint &_ep, DAFSConnectCfg _connectMethod, const char * _filename);
  144. void disconnect();
  145. const char *queryLocalName()
  146. {
  147. return filename;
  148. }
  149. // IDaFsConnection impl.
  150. virtual void close(int handle) override;
  151. virtual void send(MemoryBuffer &sendMb, MemoryBuffer &reply) override;
  152. virtual unsigned getVersion(StringBuffer &ver) override;
  153. virtual const SocketEndpoint &queryEp() const override;
  154. };
  // Status of an asynchronous command: ACScontinue = 0, ACSdone = 1, ACSerror = 2.
  enum AsyncCommandStatus
  {
      ACScontinue,
      ACSdone,
      ACSerror
  };
  156. extern void clientSetDaliServixSocketCaching(bool set);
  157. extern void clientDisconnectRemoteFile(IFile *file);
  158. extern void clientDisconnectRemoteIoOnExit(IFileIO *fileio,bool set);
  159. extern bool clientResetFilename(IFile *file, const char *newname); // returns false if not remote
  160. extern bool clientAsyncCopyFileSection(const char *uuid, // from genUUID - must be same for subsequent calls
  161. IFile *from, // expected to be remote
  162. RemoteFilename &to,
  163. offset_t toofs, // (offset_t)-1 created file and copies to start
  164. offset_t fromofs,
  165. offset_t size, // (offset_t)-1 for all file
  166. ICopyFileProgress *progress,
  167. unsigned timeout // 0 to start, non-zero to wait
  168. ); // returns true when done
  169. extern void clientSetRemoteFileTimeouts(unsigned maxconnecttime, unsigned maxreadtime);
  170. extern void clientAddSocketToCache(SocketEndpoint &ep, ISocket *socket);
  171. #endif //