jbsocket.cpp 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jlog.hpp"
  14. #include "jsocket.hpp"
  15. #define BSOCKET_BUFSIZE 1024
  16. class BufferedSocket : implements IBufferedSocket, public CInterface
  17. {
  18. private:
  19. char m_buf[BSOCKET_BUFSIZE + 1];
  20. unsigned short m_endptr;
  21. unsigned short m_curptr;
  22. unsigned int m_timeout;
  23. ISocket* m_socket;
  24. public:
  25. IMPLEMENT_IINTERFACE;
  26. BufferedSocket(ISocket* socket);
  27. virtual int readline(char* buf, int maxlen, IMultiException *me)
  28. { return readline(buf, maxlen, false, me); }
  29. virtual int readline(char* buf, int maxlen, bool keepcrlf, IMultiException *me);
  30. //always make the size of buf at lease maxlen+1
  31. virtual int read(char* buf, int maxlen);
  32. virtual void setReadTimeout(unsigned int timeout)
  33. { m_timeout = timeout; }
  34. };
  35. BufferedSocket::BufferedSocket(ISocket* socket)
  36. {
  37. m_timeout = BSOCKET_READ_TIMEOUT;
  38. if(socket == NULL)
  39. {
  40. throw MakeStringException(-1, "can't create BufferedSocket from NULL socket");
  41. }
  42. m_socket = socket;
  43. m_endptr = 0;
  44. m_curptr = 0;
  45. };
  46. //always make the size of buf at lease maxlen+1
  47. int BufferedSocket::readline(char* buf, int maxlen, bool keepcrlf, IMultiException *me)
  48. {
  49. if(maxlen <= 0)
  50. return 0;
  51. int ptr = 0;
  52. try
  53. {
  54. while(ptr < maxlen)
  55. {
  56. bool foundCRLF = false;
  57. while(ptr < maxlen && m_curptr < m_endptr)
  58. {
  59. if(m_buf[m_curptr] == '\r') // standard case, \r\n marks a new header line.
  60. {
  61. m_curptr++;
  62. if(keepcrlf)
  63. buf[ptr++] = '\r';
  64. foundCRLF = true;
  65. //Skip \n
  66. if(m_curptr < m_endptr)
  67. {
  68. if(m_buf[m_curptr] == '\n')
  69. {
  70. m_curptr++;
  71. if(keepcrlf)
  72. buf[ptr++] = '\n';
  73. }
  74. }
  75. else
  76. {
  77. m_curptr = 0;
  78. m_endptr = 0;
  79. unsigned readlen;
  80. m_socket->read(m_buf, 0, BSOCKET_BUFSIZE, readlen, m_timeout);
  81. if(readlen > 0)
  82. {
  83. m_endptr = readlen;
  84. if(m_buf[m_curptr] == '\n')
  85. {
  86. m_curptr++;
  87. if(keepcrlf)
  88. buf[ptr++] = '\n';
  89. }
  90. }
  91. }
  92. break;
  93. }
  94. else if(m_buf[m_curptr] == '\n') // deal with non-standard case, when only a \n marks a new line.
  95. {
  96. m_curptr++;
  97. if(keepcrlf)
  98. buf[ptr++] = '\n';
  99. foundCRLF = true;
  100. break;
  101. }
  102. buf[ptr++] = m_buf[m_curptr++];
  103. }
  104. if(foundCRLF)
  105. break;
  106. // If no data left, read more
  107. if(m_curptr == m_endptr)
  108. {
  109. m_curptr = 0;
  110. m_endptr = 0;
  111. unsigned readlen;
  112. m_socket->read(m_buf, 0, BSOCKET_BUFSIZE, readlen, m_timeout);
  113. if(readlen <= 0)
  114. break;
  115. m_endptr = readlen;
  116. }
  117. }
  118. }
  119. catch (IException *e)
  120. {
  121. StringBuffer estr;
  122. int ret = -1;
  123. switch (e->errorCode())
  124. {
  125. //expected:
  126. case JSOCKERR_not_opened: //= -1, // accept,name,peer_name,read,write
  127. case JSOCKERR_broken_pipe: //= -4, // read,write
  128. case JSOCKERR_timeout_expired: //= -6, // read
  129. case JSOCKERR_graceful_close: //= -10 // read,send
  130. {
  131. DBGLOG("socket(%d) : Exception(%d, %s)", m_socket->OShandle(), e->errorCode(), e->errorMessage(estr).str());
  132. buf[ptr] = 0;
  133. ret = ptr;
  134. break; //these errors should not be trapped as errors, but need to be logged.
  135. }
  136. //unexpected:
  137. case JSOCKERR_ok:
  138. case JSOCKERR_bad_address: //= -2, // connect
  139. case JSOCKERR_connection_failed: //= -3, // connect
  140. case JSOCKERR_invalid_access_mode: //= -5, // accept
  141. case JSOCKERR_port_in_use: //= -7, // create
  142. case JSOCKERR_cancel_accept: //= -8, // accept
  143. case JSOCKERR_connectionless_socket: //= -9, // accept, cancel_accept
  144. default:
  145. {
  146. IERRLOG("In BufferedSocket::readline() -- Exception(%d, %s) reading from socket(%d).", e->errorCode(), e->errorMessage(estr).str(), m_socket->OShandle());
  147. break;
  148. }
  149. }
  150. if (me)
  151. me->append(*e);
  152. else
  153. e->Release();
  154. return ret;
  155. }
  156. catch(...)
  157. {
  158. IERRLOG("In BufferedSocket::readline() -- Unknown exception reading from socket(%d).", m_socket->OShandle());
  159. return -1;
  160. }
  161. buf[ptr] = 0;
  162. return ptr;
  163. }
  164. //always make the size of buf at lease maxlen+1
  165. int BufferedSocket::read(char* buf, int maxlen)
  166. {
  167. if(maxlen <= 0)
  168. return 0;
  169. int ptr = 0;
  170. while(ptr < maxlen)
  171. {
  172. while(ptr < maxlen && m_curptr < m_endptr)
  173. {
  174. buf[ptr++] = m_buf[m_curptr++];
  175. }
  176. if(ptr >= maxlen)
  177. break;
  178. // If no data left, read more
  179. if(m_curptr == m_endptr)
  180. {
  181. m_curptr = 0;
  182. m_endptr = 0;
  183. unsigned readlen;
  184. try
  185. {
  186. m_socket->read(m_buf, 0, BSOCKET_BUFSIZE, readlen, m_timeout);
  187. }
  188. catch (IException *e)
  189. {
  190. StringBuffer estr;
  191. int ret = -1;
  192. switch (e->errorCode())
  193. {
  194. case JSOCKERR_graceful_close: //= -10 // read,send
  195. {
  196. buf[ptr] = 0;
  197. ret = ptr;
  198. break;
  199. }
  200. //expected:
  201. case JSOCKERR_not_opened: //= -1, // accept,name,peer_name,read,write
  202. case JSOCKERR_broken_pipe: //= -4, // read,write
  203. case JSOCKERR_timeout_expired: //= -6, // read
  204. {
  205. DBGLOG("socket(%d) : Exception(%d, %s)", m_socket->OShandle(), e->errorCode(), e->errorMessage(estr).str());
  206. buf[ptr] = 0;
  207. ret = ptr;
  208. break;
  209. }
  210. //unexpected:
  211. case JSOCKERR_ok:
  212. case JSOCKERR_bad_address: //= -2, // connect
  213. case JSOCKERR_connection_failed: //= -3, // connect
  214. case JSOCKERR_invalid_access_mode: //= -5, // accept
  215. case JSOCKERR_port_in_use: //= -7, // create
  216. case JSOCKERR_cancel_accept: //= -8, // accept
  217. case JSOCKERR_connectionless_socket: //= -9, // accept, cancel_accept
  218. default:
  219. {
  220. IERRLOG("In BufferedSocket::readline() -- Exception(%d, %s) reading from socket(%d).", e->errorCode(), e->errorMessage(estr).str(), m_socket->OShandle());
  221. break;
  222. }
  223. }
  224. e->Release();
  225. return ret;
  226. }
  227. catch(...)
  228. {
  229. IERRLOG("In BufferedSocket::read() -- Unknown exception reading from socket(%d).", m_socket->OShandle());
  230. return -1;
  231. }
  232. if(readlen <= 0)
  233. break;
  234. m_endptr = readlen;
  235. }
  236. }
  237. buf[ptr] = 0;
  238. return ptr;
  239. }
  240. IBufferedSocket* createBufferedSocket(ISocket* socket)
  241. {
  242. return new BufferedSocket(socket);
  243. }