rmtclient.cpp 44 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. // todo look at IRemoteFileServer stop
  #include "platform.h"
  #include "limits.h"
  #include "jlib.hpp"
  #include "jio.hpp"
  #include "jmutex.hpp"
  #include "jfile.hpp"
  #include "jmisc.hpp"
  #include "jthread.hpp"
  #include "jqueue.tpp"
  #include "jsecrets.hpp"
  #include "securesocket.hpp"
  #include "portlist.h"
  #include "jsocket.hpp"
  #include "jencrypt.hpp"
  #include "jlzw.hpp"
  #include "jset.hpp"
  #include "jhtree.hpp"
  #include "remoteerr.hpp"
  #include <atomic>
  #include <cstring>
  #include <string>
  #include "dafscommon.hpp"
  #include "rmtclient_impl.hpp"
  // Maximum number of sockets kept in the client connection cache; once this
  // many are cached, CConnectionTable::addLink tries to evict an idle socket
  // before adding another.
  #define SOCKET_CACHE_MAX 500

  // Guarded with #ifdef to match the other SIMULATE_PACKETLOSS check in this file.
  #ifdef SIMULATE_PACKETLOSS
  // Out-of-class definition of the static member. The original line used the
  // 'static' storage-class specifier (not permitted on an out-of-class member
  // definition) and the non-C++ identifier 'null', so it could not compile.
  // NOTE(review): assumes dummyReadWrite declares 'static ISocket *timeoutreadsock'
  // and that no other translation unit defines it - confirm before enabling.
  ISocket *dummyReadWrite::timeoutreadsock = nullptr; // used to trigger
  #endif
  40. void cleanupDaFsSocket(ISocket *sock)
  41. {
  42. shutdownAndCloseNoThrow(sock);
  43. }
  44. ///////////////////////////
  // Built-in defaults. The *Seconds values are multiplied by 1000 below to
  // produce the millisecond settings actually used by the client.
  static constexpr unsigned defaultDafsConnectTimeoutSeconds = 100;
  static constexpr unsigned defaultDafsConnectRetries = 2;
  static constexpr unsigned defaultDafsMaxRecieveTimeSeconds = 0;
  static constexpr unsigned defaultDafsConnectFailRetrySeconds = 10;

  // Live settings; MODULE_INIT and clientSetRemoteFileTimeouts() may overwrite them.
  static unsigned dafsConnectTimeoutMs = 1000 * defaultDafsConnectTimeoutSeconds;
  static unsigned dafsConnectRetries = defaultDafsConnectRetries;
  static unsigned dafsMaxReceiveTimeMs = 1000 * defaultDafsMaxRecieveTimeSeconds;
  static unsigned dafsConnectFailRetryTimeMs = 1000 * defaultDafsConnectFailRetrySeconds;
  53. MODULE_INIT(INIT_PRIORITY_DAFSCLIENT)
  54. {
  55. #ifdef _CONTAINERIZED
  56. //MORE: This function is called too soon to read them from the configuration file.
  57. #else
  58. const IProperties &confProps = queryEnvironmentConf();
  59. dafsConnectTimeoutMs = confProps.getPropInt("dafsConnectTimeoutSeconds", defaultDafsConnectTimeoutSeconds) * 1000;
  60. dafsConnectRetries = confProps.getPropInt("dafsConnectRetries", defaultDafsConnectRetries);
  61. dafsMaxReceiveTimeMs = confProps.getPropInt("dafsMaxReceiveTimeSeconds", defaultDafsMaxRecieveTimeSeconds);
  62. dafsConnectFailRetryTimeMs = confProps.getPropInt("daFsConnectFailRetrySeconds", defaultDafsConnectFailRetrySeconds) * 1000;
  63. #endif
  64. return true;
  65. }
  66. //Security and default port attributes
  67. static class _securitySettings
  68. {
  69. public:
  70. DAFSConnectCfg queryConnectMethod() { ensureReady(); return connectMethod; }
  71. unsigned short queryDaFileSrvPort() { ensureReady(); return daFileSrvPort; }
  72. unsigned short queryDaFileSrvSSLPort() { ensureReady(); return daFileSrvSSLPort; }
  73. const char * queryCertificate() { ensureReady(); return certificate; }
  74. const char * queryPrivateKey() { ensureReady(); return privateKey; }
  75. const char * queryPassPhrase() { ensureReady(); return passPhrase; }
  76. void ensureReady()
  77. {
  78. if (!init)
  79. {
  80. CriticalBlock block(cs);
  81. if (!init)
  82. {
  83. queryDafsSecSettings(&connectMethod, &daFileSrvPort, &daFileSrvSSLPort, &certificate, &privateKey, &passPhrase);
  84. init = true;
  85. }
  86. }
  87. }
  88. protected:
  89. DAFSConnectCfg connectMethod;
  90. unsigned short daFileSrvPort;
  91. unsigned short daFileSrvSSLPort;
  92. const char * certificate;
  93. const char * privateKey;
  94. const char * passPhrase;
  95. std::atomic<bool> init{false};
  96. CriticalSection cs;
  97. } securitySettings;
  98. static CriticalSection secureContextCrit;
  99. static Owned<ISecureSocketContext> secureContextClient;
  100. #ifdef _USE_OPENSSL
  101. static ISecureSocket *createSecureSocket(ISocket *sock)
  102. {
  103. {
  104. CriticalBlock b(secureContextCrit);
  105. if (!secureContextClient)
  106. {
  107. #ifdef _CONTAINERIZED
  108. IPropertyTree *info = queryTlsSecretInfo("local");
  109. if (!info)
  110. throw makeStringException(-1, "createSecureSocket() : missing MTLS configuration");
  111. secureContextClient.setown(createSecureSocketContextEx2(info, ClientSocket));
  112. #else
  113. secureContextClient.setown(createSecureSocketContextEx(securitySettings.queryCertificate(), securitySettings.queryPrivateKey(), securitySettings.queryPassPhrase(), ClientSocket));
  114. #endif
  115. }
  116. }
  117. int loglevel = SSLogNormal;
  118. #ifdef _DEBUG
  119. loglevel = SSLogMax;
  120. #endif
  121. return secureContextClient->createSecureSocket(sock, loglevel);
  122. }
  123. #else
  124. static ISecureSocket *createSecureSocket(ISocket *sock)
  125. {
  126. throwUnexpected();
  127. }
  128. #endif
  129. void clientSetRemoteFileTimeouts(unsigned maxconnecttime,unsigned maxreadtime)
  130. {
  131. dafsConnectTimeoutMs = maxconnecttime;
  132. dafsMaxReceiveTimeMs = maxreadtime;
  133. }
  134. struct sRFTM
  135. {
  136. CTimeMon *timemon;
  137. sRFTM(unsigned limit) { timemon = limit ? new CTimeMon(limit) : NULL; }
  138. ~sRFTM() { delete timemon; }
  139. };
  140. // used by testsocket only
  141. RemoteFileCommandType queryRemoteStreamCmd()
  142. {
  143. return RFCStreamReadTestSocket;
  144. }
  // All timeouts below are in milliseconds.
  // long timeout in case zombies
  #define CLIENT_TIMEOUT (1000*60*60*12)
  // time between logging inactive clients
  #define CLIENT_INACTIVEWARNING_TIMEOUT (1000*60*60*12)
  // timeout when waiting for dafilesrv to reply after command
  // (increased when waiting for large block)
  #define SERVER_TIMEOUT (1000*60*5)

  // Number of receive attempts for normal and lengthy commands; both drop to a
  // single attempt when packet loss is being simulated.
  #ifndef SIMULATE_PACKETLOSS
  #define NORMAL_RETRIES (3)
  #define LENGTHY_RETRIES (12)
  #else
  #define NORMAL_RETRIES (1)
  #define LENGTHY_RETRIES (1)
  #endif
  156. #ifdef _DEBUG
  157. byte traceFlags=0x30;
  158. #else
  159. byte traceFlags=0x20;
  160. #endif
  161. #define TF_TRACE (traceFlags&1)
  162. #define TF_TRACE_PRE_IO (traceFlags&2)
  163. #define TF_TRACE_FULL (traceFlags&4)
  164. #define TF_TRACE_CLIENT_CONN (traceFlags&8)
  165. #define TF_TRACE_TREE_COPY (traceFlags&0x10)
  166. #define TF_TRACE_CLIENT_STATS (traceFlags&0x20)
  167. unsigned mapDafilesrvixCodes(unsigned err)
  168. {
  169. // old Solaris dali/remote/daliservix.cpp uses
  170. // different values for these error codes.
  171. switch (err)
  172. {
  173. case 8200:
  174. return RFSERR_InvalidCommand;
  175. case 8201:
  176. return RFSERR_NullFileIOHandle;
  177. case 8202:
  178. return RFSERR_InvalidFileIOHandle;
  179. case 8203:
  180. return RFSERR_TimeoutFileIOHandle;
  181. case 8204:
  182. return RFSERR_OpenFailed;
  183. case 8205:
  184. return RFSERR_ReadFailed;
  185. case 8206:
  186. return RFSERR_WriteFailed;
  187. case 8207:
  188. return RFSERR_RenameFailed;
  189. case 8208:
  190. return RFSERR_SetReadOnlyFailed;
  191. case 8209:
  192. return RFSERR_GetDirFailed;
  193. case 8210:
  194. return RFSERR_MoveFailed;
  195. }
  196. return err;
  197. }
  198. typedef byte OnceKey[16];
  199. static void genOnce(OnceKey &key)
  200. {
  201. static __int64 inc=0;
  202. *(unsigned *)&key[0] = getRandom();
  203. *(__int64 *)&key[4] = ++inc;
  204. *(unsigned *)&key[12] = getRandom();
  205. }
  206. static void mergeOnce(OnceKey &key,size32_t sz,const void *data)
  207. {
  208. assertex(sz<=sizeof(OnceKey));
  209. const byte *p = (const byte *)data;
  210. while (sz)
  211. key[--sz] ^= *(p++);
  212. }
  213. //---------------------------------------------------------------------------
  214. class DECL_EXCEPTION CDafsException: public IDAFS_Exception, public CInterface
  215. {
  216. int errcode;
  217. StringAttr msg;
  218. public:
  219. IMPLEMENT_IINTERFACE;
  220. CDafsException(int code,const char *_msg)
  221. : errcode(code), msg(_msg)
  222. {
  223. };
  224. int errorCode() const
  225. {
  226. return errcode;
  227. }
  228. StringBuffer & errorMessage(StringBuffer &str) const
  229. {
  230. return str.append(msg);
  231. }
  232. MessageAudience errorAudience() const
  233. {
  234. return MSGAUD_user;
  235. }
  236. };
  237. IDAFS_Exception *createDafsException(int code, const char *msg)
  238. {
  239. return new CDafsException(code, msg);
  240. }
  241. IDAFS_Exception *createDafsExceptionVA(int code, const char *format, va_list args) __attribute__((format(printf,2,0)));
  242. IDAFS_Exception *createDafsExceptionVA(int code, const char *format, va_list args)
  243. {
  244. StringBuffer eStr;
  245. eStr.limited_valist_appendf(1024, format, args);
  246. return new CDafsException(code, eStr);
  247. }
  248. IDAFS_Exception *createDafsExceptionV(int code, const char *format, ...) __attribute__((format(printf,2,3)));
  249. IDAFS_Exception *createDafsExceptionV(int code, const char *format, ...)
  250. {
  251. va_list args;
  252. va_start(args, format);
  253. IDAFS_Exception *ret = createDafsExceptionVA(code, format, args);
  254. va_end(args);
  255. return ret;
  256. }
  257. void setDafsEndpointPort(SocketEndpoint &ep)
  258. {
  259. // odd kludge (don't do this at home)
  260. byte ipb[4];
  261. if (ep.getNetAddress(sizeof(ipb),&ipb)==sizeof(ipb)) {
  262. if ((ipb[0]==255)&&(ipb[1]==255)) {
  263. ep.port = (((unsigned)ipb[2])<<8)+ipb[3];
  264. ep.ipset(queryLocalIP());
  265. }
  266. }
  267. if (ep.port==0)
  268. {
  269. if ( (securitySettings.queryConnectMethod() == SSLNone) || (securitySettings.queryConnectMethod() == UnsecureFirst) )
  270. ep.port = securitySettings.queryDaFileSrvPort();
  271. else
  272. ep.port = securitySettings.queryDaFileSrvSSLPort();
  273. }
  274. }
  275. void sendDaFsBuffer(ISocket * socket, MemoryBuffer & src, bool testSocketFlag)
  276. {
  277. unsigned length = src.length() - sizeof(unsigned);
  278. byte * buffer = (byte *)src.toByteArray();
  279. if (TF_TRACE_FULL)
  280. PROGLOG("sendDaFsBuffer size %d, data = %d %d %d %d",length, (int)buffer[4],(int)buffer[5],(int)buffer[6],(int)buffer[7]);
  281. if (testSocketFlag)
  282. length |= 0x80000000;
  283. _WINCPYREV(buffer, &length, sizeof(unsigned));
  284. SOCKWRITE(socket)(buffer, src.length());
  285. }
  286. size32_t receiveDaFsBufferSize(ISocket * socket, unsigned numtries,CTimeMon *timemon)
  287. {
  288. unsigned timeout = SERVER_TIMEOUT;
  289. if (numtries==0) {
  290. numtries = 1;
  291. timeout = 10*1000; // 10s
  292. }
  293. while (numtries--) {
  294. try {
  295. if (timemon) {
  296. unsigned remaining;
  297. if (timemon->timedout(&remaining)||(remaining<10))
  298. remaining = 10;
  299. if (remaining<timeout)
  300. timeout = remaining;
  301. }
  302. size32_t szread;
  303. size32_t gotLength;
  304. SOCKREADTMS(socket)(&gotLength, sizeof(gotLength), sizeof(gotLength), szread, timeout);
  305. _WINREV(gotLength);
  306. if (TF_TRACE_FULL)
  307. PROGLOG("receiveDaFsBufferSize %d",gotLength);
  308. return gotLength;
  309. }
  310. catch (IJSOCK_Exception *e) {
  311. if ((numtries==0)||(e->errorCode()!=JSOCKERR_timeout_expired)||(timemon&&timemon->timedout())) {
  312. throw;
  313. }
  314. StringBuffer err;
  315. char peername[256];
  316. socket->peer_name(peername,sizeof(peername)-1);
  317. WARNLOG("Remote connection %s: %s",peername,e->errorMessage(err).str()); // why no peername
  318. e->Release();
  319. Sleep(500+getRandom()%1000); // ~1s
  320. }
  321. }
  322. return 0;
  323. }
  324. void flushDaFsSocket(ISocket *socket)
  325. {
  326. MemoryBuffer sendbuf;
  327. initSendBuffer(sendbuf);
  328. sendbuf.append((RemoteFileCommandType)RFCgetver);
  329. sendbuf.append((unsigned)RFCgetver);
  330. MemoryBuffer reply;
  331. size32_t totread=0;
  332. try
  333. {
  334. sendDaFsBuffer(socket, sendbuf);
  335. char buf[16*1024];
  336. for (;;)
  337. {
  338. size32_t szread;
  339. SOCKREADTMS(socket)(buf, 1, sizeof(buf), szread, 1000*30);
  340. totread += szread;
  341. }
  342. }
  343. catch (IJSOCK_Exception *e)
  344. {
  345. if (totread)
  346. PROGLOG("%d bytes discarded",totread);
  347. if (e->errorCode()!=JSOCKERR_timeout_expired)
  348. EXCLOG(e,"flush");
  349. e->Release();
  350. }
  351. }
  352. void receiveDaFsBuffer(ISocket * socket, MemoryBuffer & tgt, unsigned numtries, size32_t maxsz)
  353. // maxsz is a guess at a resonable upper max to catch where protocol error
  354. {
  355. sRFTM tm(dafsMaxReceiveTimeMs);
  356. size32_t gotLength = receiveDaFsBufferSize(socket, numtries,tm.timemon);
  357. Owned<IException> exc;
  358. if (gotLength)
  359. {
  360. size32_t origlen = tgt.length();
  361. try
  362. {
  363. if (gotLength>maxsz)
  364. {
  365. StringBuffer msg;
  366. msg.appendf("receiveBuffer maximum block size exceeded %d/%d",gotLength,maxsz);
  367. throw createDafsException(DAFSERR_protocol_failure,msg.str());
  368. }
  369. unsigned timeout = SERVER_TIMEOUT*(numtries?numtries:1);
  370. if (tm.timemon)
  371. {
  372. unsigned remaining;
  373. if (tm.timemon->timedout(&remaining)||(remaining<10))
  374. remaining = 10;
  375. if (remaining<timeout)
  376. timeout = remaining;
  377. }
  378. size32_t szread;
  379. SOCKREADTMS(socket)((gotLength<4000)?tgt.reserve(gotLength):tgt.reserveTruncate(gotLength), gotLength, gotLength, szread, timeout);
  380. }
  381. catch (IException *e)
  382. {
  383. exc.setown(e);
  384. }
  385. if (exc.get())
  386. {
  387. tgt.setLength(origlen);
  388. EXCLOG(exc, "receiveDaFsBuffer");
  389. PrintStackReport();
  390. if (JSOCKERR_timeout_expired != exc->errorCode())
  391. {
  392. if (!tm.timemon||!tm.timemon->timedout())
  393. flushDaFsSocket(socket);
  394. }
  395. IJSOCK_Exception *JSexc = dynamic_cast<IJSOCK_Exception *>(exc.get());
  396. if (JSexc != nullptr)
  397. throw LINK(JSexc);
  398. else
  399. throw exc.getClear();
  400. }
  401. }
  402. tgt.setEndian(__BIG_ENDIAN);
  403. }
  404. //---------------------------------------------------------------------------
  405. struct CConnectionRec
  406. {
  407. SocketEndpoint ep;
  408. unsigned tick;
  409. IArrayOf<ISocket> socks; // relies on isShared
  410. };
  411. static class CConnectionTable: public SuperHashTableOf<CConnectionRec,SocketEndpoint>
  412. {
  413. void onAdd(void *) {}
  414. void onRemove(void *e)
  415. {
  416. CConnectionRec *r=(CConnectionRec *)e;
  417. delete r;
  418. }
  419. unsigned getHashFromElement(const void *e) const
  420. {
  421. const CConnectionRec &elem=*(const CConnectionRec *)e;
  422. return elem.ep.hash(0);
  423. }
  424. unsigned getHashFromFindParam(const void *fp) const
  425. {
  426. return ((const SocketEndpoint *)fp)->hash(0);
  427. }
  428. const void * getFindParam(const void *p) const
  429. {
  430. const CConnectionRec &elem=*(const CConnectionRec *)p;
  431. return (void *)&elem.ep;
  432. }
  433. bool matchesFindParam(const void * et, const void *fp, unsigned) const
  434. {
  435. return ((CConnectionRec *)et)->ep.equals(*(SocketEndpoint *)fp);
  436. }
  437. IMPLEMENT_SUPERHASHTABLEOF_REF_FIND(CConnectionRec,SocketEndpoint);
  438. unsigned numsockets;
  439. public:
  440. static CriticalSection crit;
  441. CConnectionTable()
  442. {
  443. numsockets = 0;
  444. }
  445. ~CConnectionTable() {
  446. _releaseAll();
  447. }
  448. ISocket *lookup(const SocketEndpoint &ep)
  449. {
  450. // always called from crit block
  451. CConnectionRec *r = SuperHashTableOf<CConnectionRec,SocketEndpoint>::find(&ep);
  452. if (r) {
  453. ForEachItemIn(i,r->socks) {
  454. ISocket *s = &r->socks.item(i);
  455. if (!QUERYINTERFACE(s, CInterface)->IsShared()) {
  456. r->tick = msTick();
  457. s->Link();
  458. return s;
  459. }
  460. }
  461. }
  462. return NULL;
  463. }
  464. void addLink(SocketEndpoint &ep,ISocket *sock)
  465. {
  466. // always called from crit block
  467. while (numsockets>=SOCKET_CACHE_MAX) {
  468. // find oldest
  469. CConnectionRec *c = NULL;
  470. unsigned oldest = 0;
  471. CConnectionRec *old = NULL;
  472. unsigned oldi;
  473. unsigned now = msTick();
  474. for (;;) {
  475. c = (CConnectionRec *)SuperHashTableOf<CConnectionRec,SocketEndpoint>::next(c);
  476. if (!c)
  477. break;
  478. ForEachItemIn(i,c->socks) {
  479. ISocket *s = &c->socks.item(i);
  480. if (!QUERYINTERFACE(s, CInterface)->IsShared()) { // candidate to remove
  481. unsigned t = now-c->tick;
  482. if (t>oldest) {
  483. oldest = t;
  484. old = c;
  485. oldi = i;
  486. }
  487. }
  488. }
  489. }
  490. if (!old)
  491. return;
  492. old->socks.remove(oldi);
  493. numsockets--;
  494. }
  495. CConnectionRec *r = SuperHashTableOf<CConnectionRec,SocketEndpoint>::find(&ep);
  496. if (!r) {
  497. r = new CConnectionRec;
  498. r->ep = ep;
  499. SuperHashTableOf<CConnectionRec,SocketEndpoint>::add(*r);
  500. }
  501. sock->Link();
  502. r->socks.append(*sock);
  503. numsockets++;
  504. r->tick = msTick();
  505. }
  506. void remove(const SocketEndpoint &ep, ISocket *sock)
  507. {
  508. // always called from crit block
  509. CConnectionRec *r = SuperHashTableOf<CConnectionRec,SocketEndpoint>::find(&ep);
  510. if (r)
  511. if (r->socks.zap(*sock)&&numsockets)
  512. numsockets--;
  513. }
  514. } *ConnectionTable = NULL;
  515. CriticalSection CConnectionTable::crit;
  516. void clientSetDaliServixSocketCaching(bool on)
  517. {
  518. CriticalBlock block(CConnectionTable::crit);
  519. if (on) {
  520. if (!ConnectionTable)
  521. ConnectionTable = new CConnectionTable;
  522. }
  523. else {
  524. delete ConnectionTable;
  525. ConnectionTable = NULL;
  526. }
  527. }
  528. ISocket *getConnectionTableSocket(const SocketEndpoint &ep)
  529. {
  530. CriticalBlock block(CConnectionTable::crit);
  531. if (!ConnectionTable)
  532. return nullptr;
  533. return ConnectionTable->lookup(ep);
  534. }
  535. void removeConnectionTableSocket(const SocketEndpoint &ep, ISocket *socket)
  536. {
  537. CriticalBlock block(CConnectionTable::crit);
  538. if (ConnectionTable)
  539. ConnectionTable->remove(ep, socket);
  540. }
  541. void clientAddSocketToCache(SocketEndpoint &ep,ISocket *socket)
  542. {
  543. CriticalBlock block(CConnectionTable::crit);
  544. if (ConnectionTable)
  545. ConnectionTable->addLink(ep,socket);
  546. }
  547. //---------------------------------------------------------------------------
  548. void CRemoteBase::connectSocket(SocketEndpoint &ep, unsigned connectTimeoutMs, unsigned connectRetries)
  549. {
  550. if (!connectTimeoutMs)
  551. connectTimeoutMs = dafsConnectTimeoutMs;
  552. unsigned connectAttempts = ((INFINITE == connectRetries) ? dafsConnectRetries : connectRetries) + 1;
  553. sRFTM tm(connectTimeoutMs);
  554. {
  555. CriticalBlock block(lastFailEpCrit);
  556. if (ep.equals(lastfailep))
  557. {
  558. if (msTick()-lastfailtime<dafsConnectFailRetryTimeMs)
  559. {
  560. StringBuffer msg("Failed to connect (host marked down) to dafilesrv/daliservix on ");
  561. ep.getUrlStr(msg);
  562. throw createDafsException(DAFSERR_connection_failed,msg.str());
  563. }
  564. lastfailep.set(NULL);
  565. connectAttempts = 1; // on probation
  566. }
  567. }
  568. while (connectAttempts--)
  569. {
  570. StringBuffer eps;
  571. if (TF_TRACE_CLIENT_CONN)
  572. {
  573. ep.getUrlStr(eps);
  574. if (ep.port == securitySettings.queryDaFileSrvSSLPort())
  575. PROGLOG("Connecting SECURE to %s", eps.str());
  576. else
  577. PROGLOG("Connecting to %s", eps.str());
  578. //PrintStackReport();
  579. }
  580. bool ok = true;
  581. try
  582. {
  583. if (tm.timemon)
  584. {
  585. unsigned remaining;
  586. if (tm.timemon->timedout(&remaining))
  587. throwJSocketException(JSOCKERR_connection_failed);
  588. socket.setown(ISocket::connect_timeout(ep,remaining));
  589. }
  590. else
  591. socket.setown(ISocket::connect(ep));
  592. if (ep.port == securitySettings.queryDaFileSrvSSLPort())
  593. {
  594. #ifdef _USE_OPENSSL
  595. Owned<ISecureSocket> ssock;
  596. try
  597. {
  598. ssock.setown(createSecureSocket(socket.getClear()));
  599. int status = ssock->secure_connect();
  600. if (status < 0)
  601. throw createDafsException(DAFSERR_connection_failed, "Failure to establish secure connection");
  602. socket.setown(ssock.getLink());
  603. }
  604. catch (IException *e)
  605. {
  606. cleanupDaFsSocket(ssock);
  607. ssock.clear();
  608. cleanupDaFsSocket(socket);
  609. socket.clear();
  610. StringBuffer eMsg;
  611. e->errorMessage(eMsg);
  612. e->Release();
  613. throw createDafsException(DAFSERR_connection_failed, eMsg.str());
  614. }
  615. #else
  616. throw createDafsException(DAFSERR_connection_failed,"Failure to establish secure connection: OpenSSL disabled in build");
  617. #endif
  618. }
  619. }
  620. catch (IJSOCK_Exception *e)
  621. {
  622. ok = false;
  623. if (!connectAttempts||(tm.timemon&&tm.timemon->timedout()))
  624. {
  625. if (e->errorCode()==JSOCKERR_connection_failed)
  626. {
  627. {
  628. CriticalBlock block(lastFailEpCrit);
  629. lastfailep.set(ep);
  630. lastfailtime = msTick();
  631. }
  632. e->Release();
  633. StringBuffer msg("Failed to connect (setting host down) to dafilesrv/daliservix on ");
  634. ep.getUrlStr(msg);
  635. throw createDafsException(DAFSERR_connection_failed,msg.str());
  636. }
  637. throw;
  638. }
  639. StringBuffer err;
  640. WARNLOG("Remote file connect %s",e->errorMessage(err).str());
  641. e->Release();
  642. }
  643. if (ok)
  644. {
  645. if (TF_TRACE_CLIENT_CONN)
  646. PROGLOG("Connected to %s",eps.str());
  647. break;
  648. }
  649. bool timeExpired = false;
  650. unsigned sleeptime = getRandom()%3000+1000;
  651. if (tm.timemon)
  652. {
  653. unsigned remaining;
  654. if (tm.timemon->timedout(&remaining))
  655. timeExpired = true;
  656. else
  657. {
  658. if (remaining/2<sleeptime)
  659. sleeptime = remaining/2;
  660. }
  661. }
  662. if (!timeExpired)
  663. {
  664. Sleep(sleeptime); // prevent multiple retries beating
  665. if (ep.port == securitySettings.queryDaFileSrvSSLPort())
  666. PROGLOG("Retrying SECURE connect");
  667. else
  668. PROGLOG("Retrying connect");
  669. }
  670. }
  671. clientAddSocketToCache(ep, socket);
  672. }
  673. void CRemoteBase::killSocket(SocketEndpoint &tep)
  674. {
  675. // NB: always called with CRemoteBase::crit locked
  676. try
  677. {
  678. Owned<ISocket> s = socket.getClear();
  679. if (!s)
  680. return;
  681. removeConnectionTableSocket(tep, s);
  682. }
  683. catch (IJSOCK_Exception *e)
  684. {
  685. e->Release(); // ignore errors closing
  686. }
  687. Sleep(getRandom()%1000*5+500); // prevent multiple beating
  688. }
  689. void CRemoteBase::sendRemoteCommand(MemoryBuffer & src, MemoryBuffer & reply, bool retry, bool lengthy, bool handleErrCode)
  690. {
  691. CriticalBlock block(crit); // serialize commands on same file
  692. SocketEndpoint tep(ep);
  693. setDafsEndpointPort(tep);
  694. unsigned nretries = retry?3:0;
  695. Owned<IJSOCK_Exception> firstexc; // when retrying return first error if fails
  696. for (;;)
  697. {
  698. try
  699. {
  700. if (socket)
  701. {
  702. sendDaFsBuffer(socket, src);
  703. receiveDaFsBuffer(socket, reply, lengthy?LENGTHY_RETRIES:NORMAL_RETRIES);
  704. break;
  705. }
  706. }
  707. catch (IJSOCK_Exception *e)
  708. {
  709. if (!nretries--)
  710. {
  711. if (firstexc)
  712. {
  713. e->Release();
  714. e = firstexc.getClear();
  715. }
  716. killSocket(tep);
  717. throw e;
  718. }
  719. StringBuffer str;
  720. e->errorMessage(str);
  721. WARNLOG("Remote File: %s, retrying (%d)",str.str(),nretries);
  722. if (firstexc)
  723. e->Release();
  724. else
  725. firstexc.setown(e);
  726. killSocket(tep);
  727. }
  728. socket.setown(getConnectionTableSocket(tep));
  729. if (socket)
  730. {
  731. // validate existing socket by sending an 'exists' command with short time out
  732. // (use exists for backward compatibility)
  733. bool ok = false;
  734. try
  735. {
  736. MemoryBuffer sendbuf;
  737. initSendBuffer(sendbuf);
  738. MemoryBuffer replybuf;
  739. sendbuf.append((RemoteFileCommandType)RFCexists).append(filename);
  740. sendDaFsBuffer(socket, sendbuf);
  741. receiveDaFsBuffer(socket, replybuf, 0, 1024);
  742. ok = true;
  743. }
  744. catch (IException *e)
  745. {
  746. e->Release();
  747. }
  748. if (!ok)
  749. killSocket(tep);
  750. }
  751. if (!socket)
  752. {
  753. bool doConnect = true;
  754. if (connectMethod == SSLFirst || connectMethod == UnsecureFirst)
  755. {
  756. // MCK - could maintain a list of 100 or so previous endpoints and if connection failed
  757. // then mark port down for a delay (like 15 min above) to avoid having to try every time ...
  758. try
  759. {
  760. connectSocket(tep, 5000, 0);
  761. doConnect = false;
  762. }
  763. catch (IDAFS_Exception *e)
  764. {
  765. if (e->errorCode() == DAFSERR_connection_failed)
  766. {
  767. unsigned prevPort = tep.port;
  768. if (prevPort == securitySettings.queryDaFileSrvSSLPort())
  769. tep.port = securitySettings.queryDaFileSrvPort();
  770. else
  771. tep.port = securitySettings.queryDaFileSrvSSLPort();
  772. WARNLOG("Connect failed on port %d, retrying on port %d", prevPort, tep.port);
  773. doConnect = true;
  774. e->Release();
  775. }
  776. else
  777. throw e;
  778. }
  779. }
  780. if (doConnect)
  781. connectSocket(tep);
  782. }
  783. }
  784. if (!handleErrCode)
  785. return;
  786. unsigned errCode;
  787. reply.read(errCode);
  788. if (errCode)
  789. {
  790. // old Solaris daliservix.cpp error code conversion
  791. if ( (errCode >= 8200) && (errCode <= 8210) )
  792. errCode = mapDafilesrvixCodes(errCode);
  793. StringBuffer msg;
  794. if (filename.get())
  795. msg.append(filename);
  796. ep.getUrlStr(msg.append('[')).append("] ");
  797. size32_t pos = reply.getPos();
  798. if (pos<reply.length())
  799. {
  800. size32_t len = reply.length()-pos;
  801. const byte *rest = reply.readDirect(len);
  802. if (errCode==RFSERR_InvalidCommand)
  803. {
  804. const char *s = (const char *)rest;
  805. const char *e = (const char *)rest+len;
  806. while (*s&&(s!=e))
  807. s++;
  808. msg.append(s-(const char *)rest,(const char *)rest);
  809. }
  810. else if (len&&(rest[len-1]==0))
  811. msg.append((const char *)rest);
  812. else
  813. {
  814. msg.appendf("extra data[%d]",len);
  815. for (unsigned i=0;(i<16)&&(i<len);i++)
  816. msg.appendf(" %2x",(int)rest[i]);
  817. }
  818. }
  819. // NB: could append getRFSERRText for all error codes
  820. else if (errCode == RFSERR_GetDirFailed)
  821. msg.append(RFSERR_GetDirFailed_Text);
  822. else
  823. msg.append("ERROR #").append(errCode);
  824. #ifdef _DEBUG
  825. ERRLOG("%s",msg.str());
  826. PrintStackReport();
  827. #endif
  828. throw createDafsException(errCode,msg.str());
  829. }
  830. }
  831. void CRemoteBase::sendRemoteCommand(MemoryBuffer & src, bool retry)
  832. {
  833. MemoryBuffer reply;
  834. sendRemoteCommand(src, reply, retry);
  835. }
  836. CRemoteBase::CRemoteBase(const SocketEndpoint &_ep, const char * _filename)
  837. : filename(_filename)
  838. {
  839. ep = _ep;
  840. connectMethod = securitySettings.queryConnectMethod();
  841. }
  842. CRemoteBase::CRemoteBase(const SocketEndpoint &_ep, DAFSConnectCfg _connectMethod, const char * _filename)
  843. : filename(_filename)
  844. {
  845. ep = _ep;
  846. connectMethod = _connectMethod;
  847. }
  848. void CRemoteBase::disconnect()
  849. {
  850. CriticalBlock block(crit);
  851. Owned<ISocket> s = socket.getClear();
  852. if (s)
  853. {
  854. SocketEndpoint tep(ep);
  855. setDafsEndpointPort(tep);
  856. removeConnectionTableSocket(tep, s);
  857. }
  858. }
  859. // IDaFsConnection impl.
  860. void CRemoteBase::close(int handle)
  861. {
  862. if (handle)
  863. {
  864. try
  865. {
  866. MemoryBuffer sendBuffer;
  867. initSendBuffer(sendBuffer);
  868. sendBuffer.append((RemoteFileCommandType)RFCcloseIO).append(handle);
  869. sendRemoteCommand(sendBuffer,false);
  870. }
  871. catch (IDAFS_Exception *e)
  872. {
  873. if ((e->errorCode()!=RFSERR_InvalidFileIOHandle)&&(e->errorCode()!=RFSERR_NullFileIOHandle))
  874. throw;
  875. e->Release();
  876. }
  877. }
  878. }
  879. void CRemoteBase::send(MemoryBuffer &sendMb, MemoryBuffer &reply)
  880. {
  881. sendRemoteCommand(sendMb, reply);
  882. }
  883. unsigned CRemoteBase::getVersion(StringBuffer &ver)
  884. {
  885. unsigned ret;
  886. MemoryBuffer sendBuffer;
  887. initSendBuffer(sendBuffer);
  888. sendBuffer.append((RemoteFileCommandType)RFCgetver);
  889. sendBuffer.append((unsigned)RFCgetver);
  890. MemoryBuffer replyBuffer;
  891. try
  892. {
  893. sendRemoteCommand(sendBuffer, replyBuffer, true, false, false);
  894. }
  895. catch (IException *e)
  896. {
  897. EXCLOG(e);
  898. ::Release(e);
  899. return 0;
  900. }
  901. unsigned errCode;
  902. replyBuffer.read(errCode);
  903. if (errCode==RFSERR_InvalidCommand)
  904. {
  905. ver.append("DS V1.0");
  906. return 10;
  907. }
  908. else if (errCode==0)
  909. ret = 11;
  910. else if (errCode<0x10000)
  911. return 0;
  912. else
  913. ret = errCode-0x10000;
  914. StringAttr vers;
  915. replyBuffer.read(vers);
  916. ver.append(vers);
  917. return ret;
  918. }
  919. const SocketEndpoint &CRemoteBase::queryEp() const
  920. {
  921. return ep;
  922. }
// Out-of-class definitions of CRemoteBase's static data members.
// NOTE(review): the names suggest these record the endpoint and time of the most recent
// failed connection, with lastFailEpCrit guarding access - confirm against the connect logic.
SocketEndpoint CRemoteBase::lastfailep;
unsigned CRemoteBase::lastfailtime;
CriticalSection CRemoteBase::lastFailEpCrit;
  926. IDaFsConnection *createDaFsConnection(const SocketEndpoint &ep, DAFSConnectCfg connectMethod, const char *tracing)
  927. {
  928. return new CRemoteBase(ep, connectMethod, tracing);
  929. }
  930. /////////////////////////
  931. ISocket *checkSocketSecure(ISocket *socket)
  932. {
  933. if (securitySettings.queryConnectMethod() == SSLNone)
  934. return LINK(socket);
  935. char pname[256];
  936. pname[0] = 0;
  937. int pport = socket->peer_name(pname, sizeof(pname)-1);
  938. if ( (pport == securitySettings.queryDaFileSrvSSLPort()) && (!socket->isSecure()) )
  939. {
  940. #ifdef _USE_OPENSSL
  941. Owned<ISecureSocket> ssock;
  942. try
  943. {
  944. ssock.setown(createSecureSocket(LINK(socket)));
  945. int status = ssock->secure_connect();
  946. if (status < 0)
  947. throw createDafsException(DAFSERR_connection_failed, "Failure to establish secure connection");
  948. return ssock.getClear();
  949. }
  950. catch (IException *e)
  951. {
  952. cleanupDaFsSocket(ssock);
  953. ssock.clear();
  954. cleanupDaFsSocket(socket);
  955. StringBuffer eMsg;
  956. e->errorMessage(eMsg);
  957. e->Release();
  958. throw createDafsException(DAFSERR_connection_failed, eMsg.str());
  959. }
  960. #else
  961. throw createDafsException(DAFSERR_connection_failed,"Failure to establish secure connection: OpenSSL disabled in build");
  962. #endif
  963. }
  964. return LINK(socket);
  965. }
  966. ISocket *connectDafs(SocketEndpoint &ep, unsigned timeoutms)
  967. {
  968. Owned<ISocket> socket;
  969. if ( (securitySettings.queryConnectMethod() == SSLNone) || (securitySettings.queryConnectMethod() == SSLOnly) )
  970. {
  971. socket.setown(ISocket::connect_timeout(ep, timeoutms));
  972. return checkSocketSecure(socket);
  973. }
  974. // SSLFirst or UnsecureFirst ...
  975. unsigned newtimeout = timeoutms;
  976. if (newtimeout > 5000)
  977. newtimeout = 5000;
  978. int conAttempts = 2;
  979. while (conAttempts > 0)
  980. {
  981. conAttempts--;
  982. bool connected = false;
  983. try
  984. {
  985. socket.setown(ISocket::connect_timeout(ep, newtimeout));
  986. connected = true;
  987. newtimeout = timeoutms;
  988. }
  989. catch (IJSOCK_Exception *e)
  990. {
  991. if (e->errorCode() == JSOCKERR_connection_failed)
  992. {
  993. e->Release();
  994. if (ep.port == securitySettings.queryDaFileSrvSSLPort())
  995. ep.port = securitySettings.queryDaFileSrvPort();
  996. else
  997. ep.port = securitySettings.queryDaFileSrvSSLPort();
  998. if (!conAttempts)
  999. throw;
  1000. }
  1001. else
  1002. throw;
  1003. }
  1004. if (connected)
  1005. {
  1006. if (ep.port == securitySettings.queryDaFileSrvSSLPort())
  1007. {
  1008. try
  1009. {
  1010. return checkSocketSecure(socket);
  1011. }
  1012. catch (IDAFS_Exception *e)
  1013. {
  1014. connected = false;
  1015. if (e->errorCode() == DAFSERR_connection_failed)
  1016. {
  1017. // worth logging to help identify any ssl config issues ...
  1018. StringBuffer errmsg;
  1019. e->errorMessage(errmsg);
  1020. WARNLOG("%s", errmsg.str());
  1021. e->Release();
  1022. ep.port = securitySettings.queryDaFileSrvPort();
  1023. if (!conAttempts)
  1024. throw;
  1025. }
  1026. else
  1027. throw;
  1028. }
  1029. }
  1030. else
  1031. return socket.getClear();
  1032. }
  1033. }
  1034. throw createDafsException(DAFSERR_connection_failed, "Failed to establish connection with DaFileSrv");
  1035. }
  1036. unsigned short getActiveDaliServixPort(const IpAddress &ip)
  1037. {
  1038. if (ip.isNull())
  1039. return 0;
  1040. SocketEndpoint ep(0, ip);
  1041. setDafsEndpointPort(ep);
  1042. try {
  1043. Owned<ISocket> socket = connectDafs(ep, 10000);
  1044. return ep.port;
  1045. }
  1046. catch (IException *e)
  1047. {
  1048. e->Release();
  1049. }
  1050. return 0;
  1051. }
  1052. bool testDaliServixPresent(const IpAddress &ip)
  1053. {
  1054. return getActiveDaliServixPort(ip) != 0;
  1055. }
  1056. unsigned getDaliServixVersion(const IpAddress &ip,StringBuffer &ver)
  1057. {
  1058. SocketEndpoint ep(0,ip);
  1059. return getDaliServixVersion(ep,ver);
  1060. }
  1061. unsigned getDaliServixVersion(const SocketEndpoint &_ep,StringBuffer &ver)
  1062. {
  1063. SocketEndpoint ep(_ep);
  1064. setDafsEndpointPort(ep);
  1065. if (ep.isNull())
  1066. return 0;
  1067. try
  1068. {
  1069. Owned<ISocket> socket = connectDafs(ep, 10000);
  1070. return getRemoteVersion(socket,ver);
  1071. }
  1072. catch (IException *e)
  1073. {
  1074. EXCLOG(e,"getDaliServixVersion");
  1075. e->Release();
  1076. }
  1077. return 0;
  1078. }
  1079. unsigned getRemoteVersion(IDaFsConnection &daFsConnection, StringBuffer &ver)
  1080. {
  1081. return daFsConnection.getVersion(ver);
  1082. }
  1083. unsigned getRemoteVersion(ISocket *origSock, StringBuffer &ver)
  1084. {
  1085. // used to have a global critical section here
  1086. if (!origSock)
  1087. return 0;
  1088. Owned<ISocket> socket = checkSocketSecure(origSock);
  1089. unsigned ret;
  1090. MemoryBuffer sendbuf;
  1091. initSendBuffer(sendbuf);
  1092. sendbuf.append((RemoteFileCommandType)RFCgetver);
  1093. sendbuf.append((unsigned)RFCgetver);
  1094. MemoryBuffer reply;
  1095. try
  1096. {
  1097. sendDaFsBuffer(socket, sendbuf);
  1098. receiveDaFsBuffer(socket, reply, 1 ,4096);
  1099. unsigned errCode;
  1100. reply.read(errCode);
  1101. if (errCode==RFSERR_InvalidCommand)
  1102. {
  1103. ver.append("DS V1.0");
  1104. return 10;
  1105. }
  1106. else if (errCode==0)
  1107. ret = 11;
  1108. else if (errCode<0x10000)
  1109. return 0;
  1110. else
  1111. ret = errCode-0x10000;
  1112. }
  1113. catch (IException *e)
  1114. {
  1115. EXCLOG(e);
  1116. ::Release(e);
  1117. return 0;
  1118. }
  1119. StringAttr vers;
  1120. reply.read(vers);
  1121. ver.append(vers);
  1122. return ret;
  1123. }
  1124. unsigned getCachedRemoteVersion(IDaFsConnection &daFsConnection)
  1125. {
  1126. /* JCSMORE - add a SocketEndpoint->version cache
  1127. * Idea being, that clients will want to determine version and differentiate what they send
  1128. * But do not want the cost of asking each time!
  1129. * So have a 'getRemoteVersion' call ask once and store version, so next time it returns cached answer.
  1130. *
  1131. * May want to have timeout on cache entries, but can be long. Don't expect remote side to change often within lifetime of client.
  1132. */
  1133. // JCSMORE TBD (properly!)
  1134. // 1st check ep in cache using:
  1135. // daFsConnect.queryEp()
  1136. // else
  1137. StringBuffer ver;
  1138. return daFsConnection.getVersion(ver);
  1139. }
  1140. unsigned getCachedRemoteVersion(const SocketEndpoint &ep, bool secure)
  1141. {
  1142. // 1st check ep in cache
  1143. // else
  1144. DAFSConnectCfg connMethod = secure ? SSLOnly : SSLNone;
  1145. Owned<IDaFsConnection> daFsConnection = createDaFsConnection(ep, connMethod, "getversion");
  1146. return getCachedRemoteVersion(*daFsConnection);
  1147. }
  1148. /////////////////////////
  1149. //////////////
  1150. extern unsigned stopRemoteServer(ISocket * socket)
  1151. {
  1152. // used to have a global critical section here
  1153. if (!socket)
  1154. return 0;
  1155. MemoryBuffer sendbuf;
  1156. initSendBuffer(sendbuf);
  1157. sendbuf.append((RemoteFileCommandType)RFCstop);
  1158. sendbuf.append((unsigned)RFCstop);
  1159. MemoryBuffer replybuf;
  1160. unsigned errCode = RFSERR_InvalidCommand;
  1161. try
  1162. {
  1163. sendDaFsBuffer(socket, sendbuf);
  1164. receiveDaFsBuffer(socket, replybuf, NORMAL_RETRIES, 1024);
  1165. replybuf.read(errCode);
  1166. }
  1167. catch (IJSOCK_Exception *e)
  1168. {
  1169. if ((e->errorCode()!=JSOCKERR_broken_pipe)&&(e->errorCode()!=JSOCKERR_graceful_close))
  1170. EXCLOG(e);
  1171. else
  1172. errCode = 0;
  1173. }
  1174. catch (IException *e)
  1175. {
  1176. EXCLOG(e);
  1177. ::Release(e);
  1178. }
  1179. return errCode;
  1180. }
  1181. int setDafsTrace(ISocket * socket,byte flags)
  1182. {
  1183. if (!socket)
  1184. {
  1185. byte ret = traceFlags;
  1186. traceFlags = flags;
  1187. return ret;
  1188. }
  1189. MemoryBuffer sendbuf;
  1190. initSendBuffer(sendbuf);
  1191. sendbuf.append((RemoteFileCommandType)RFCsettrace).append(flags);
  1192. MemoryBuffer replybuf;
  1193. try
  1194. {
  1195. sendDaFsBuffer(socket, sendbuf);
  1196. receiveDaFsBuffer(socket, replybuf, NORMAL_RETRIES, 1024);
  1197. int retcode;
  1198. replybuf.read(retcode);
  1199. return retcode;
  1200. }
  1201. catch (IException *e)
  1202. {
  1203. EXCLOG(e);
  1204. ::Release(e);
  1205. }
  1206. return -1;
  1207. }
  1208. int setDafsThrottleLimit(ISocket * socket, ThrottleClass throttleClass, unsigned throttleLimit, unsigned throttleDelayMs, unsigned throttleCPULimit, unsigned queueLimit, StringBuffer *errMsg)
  1209. {
  1210. assertex(socket);
  1211. MemoryBuffer sendbuf;
  1212. initSendBuffer(sendbuf);
  1213. sendbuf.append((RemoteFileCommandType)RFCsetthrottle2).append((unsigned)throttleClass).append(throttleLimit);
  1214. sendbuf.append(throttleDelayMs).append(throttleCPULimit).append(queueLimit);
  1215. MemoryBuffer replybuf;
  1216. try
  1217. {
  1218. sendDaFsBuffer(socket, sendbuf);
  1219. receiveDaFsBuffer(socket, replybuf, NORMAL_RETRIES, 1024);
  1220. int retcode;
  1221. replybuf.read(retcode);
  1222. if (retcode && errMsg && replybuf.remaining())
  1223. replybuf.read(*errMsg);
  1224. return retcode;
  1225. }
  1226. catch (IException *e)
  1227. {
  1228. EXCLOG(e);
  1229. ::Release(e);
  1230. }
  1231. return -1;
  1232. }
  1233. int getDafsInfo(ISocket * socket, unsigned level, StringBuffer &retstr)
  1234. {
  1235. assertex(socket);
  1236. MemoryBuffer sendbuf;
  1237. initSendBuffer(sendbuf);
  1238. sendbuf.append((RemoteFileCommandType)RFCgetinfo).append(level);
  1239. MemoryBuffer replybuf;
  1240. try
  1241. {
  1242. sendDaFsBuffer(socket, sendbuf);
  1243. receiveDaFsBuffer(socket, replybuf, 1);
  1244. int retcode;
  1245. replybuf.read(retcode);
  1246. if (retcode==0)
  1247. {
  1248. StringAttr s;
  1249. replybuf.read(s);
  1250. retstr.append(s);
  1251. }
  1252. return retcode;
  1253. }
  1254. catch (IException *e)
  1255. {
  1256. EXCLOG(e);
  1257. ::Release(e);
  1258. }
  1259. return -1;
  1260. }
// One record held by CDafsOsCache. Members are not initialised on construction;
// CDafsOsCache::lookup fills them in when it creates or refreshes an entry.
struct CDafsOsCacheEntry
{
    SocketEndpoint ep;  // endpoint the entry describes; also the key the cache hashes and matches on
    DAFS_OS os;         // OS last determined for the endpoint (DAFS_OSunknown if it could not be determined)
    time_t at;          // time() value at which 'os' was last refreshed
};
  1267. class CDafsOsCache: public SuperHashTableOf<CDafsOsCacheEntry,SocketEndpoint>
  1268. {
  1269. void onAdd(void *) {}
  1270. void onRemove(void *et)
  1271. {
  1272. CDafsOsCacheEntry *e = (CDafsOsCacheEntry *)et;
  1273. delete e;
  1274. }
  1275. unsigned getHashFromElement(const void *e) const
  1276. {
  1277. const CDafsOsCacheEntry &elem=*(const CDafsOsCacheEntry *)e;
  1278. return elem.ep.hash(0);
  1279. }
  1280. unsigned getHashFromFindParam(const void *fp) const
  1281. {
  1282. return ((const SocketEndpoint *)fp)->hash(0);
  1283. }
  1284. const void * getFindParam(const void *p) const
  1285. {
  1286. const CDafsOsCacheEntry &elem=*(const CDafsOsCacheEntry *)p;
  1287. return (void *)&elem.ep;
  1288. }
  1289. bool matchesFindParam(const void * et, const void *fp, unsigned) const
  1290. {
  1291. return ((CDafsOsCacheEntry *)et)->ep.equals(*(SocketEndpoint *)fp);
  1292. }
  1293. IMPLEMENT_SUPERHASHTABLEOF_REF_FIND(CDafsOsCacheEntry,SocketEndpoint);
  1294. public:
  1295. static CriticalSection crit;
  1296. CDafsOsCache()
  1297. {
  1298. }
  1299. ~CDafsOsCache()
  1300. {
  1301. SuperHashTableOf<CDafsOsCacheEntry,SocketEndpoint>::_releaseAll();
  1302. }
  1303. DAFS_OS lookup(const SocketEndpoint &ep,ISocket *sock)
  1304. {
  1305. CriticalBlock block(crit);
  1306. CDafsOsCacheEntry *r = SuperHashTableOf<CDafsOsCacheEntry,SocketEndpoint>::find(&ep);
  1307. bool needupdate=false;
  1308. unsigned t = (unsigned)time(NULL);
  1309. if (!r) {
  1310. r = new CDafsOsCacheEntry;
  1311. r->ep = ep;
  1312. needupdate = true;
  1313. SuperHashTableOf<CDafsOsCacheEntry,SocketEndpoint>::add(*r);
  1314. }
  1315. else
  1316. needupdate = (t-r->at>60*5); // update every 5 mins
  1317. if (needupdate) {
  1318. r->os = DAFS_OSunknown;
  1319. StringBuffer ver;
  1320. unsigned ret;
  1321. if (sock)
  1322. ret = getRemoteVersion(sock,ver);
  1323. else
  1324. ret = getDaliServixVersion(ep,ver);
  1325. if (ret!=0) { // if cross-os needs dafilesrv
  1326. if (strstr(ver.str(),"Linux")!=NULL)
  1327. r->os = DAFS_OSlinux;
  1328. else if (strstr(ver.str(),"Windows")!=NULL)
  1329. r->os = DAFS_OSwindows;
  1330. else if (strstr(ver.str(),"Solaris")!=NULL)
  1331. r->os = DAFS_OSsolaris;
  1332. }
  1333. r->at = t;
  1334. }
  1335. return r->os;
  1336. }
  1337. };
  1338. CriticalSection CDafsOsCache::crit;
  1339. DAFS_OS getDaliServixOs(const SocketEndpoint &ep,ISocket *socket)
  1340. {
  1341. if (ep.isLocal())
  1342. #ifdef _WIN32
  1343. return DAFS_OSwindows;
  1344. #else
  1345. return DAFS_OSlinux;
  1346. #endif
  1347. static CDafsOsCache cache;
  1348. return cache.lookup(ep,socket);
  1349. }
  1350. DAFS_OS getDaliServixOs(const SocketEndpoint &ep)
  1351. {
  1352. return getDaliServixOs(ep,NULL);
  1353. }
  1354. extern DAFSCLIENT_API int setDafileSvrTraceFlags(const SocketEndpoint &_ep,byte flags)
  1355. {
  1356. SocketEndpoint ep(_ep);
  1357. setDafsEndpointPort(ep);
  1358. if (ep.isNull())
  1359. return -3;
  1360. try {
  1361. Owned<ISocket> socket = connectDafs(ep, 5000);
  1362. return setDafsTrace(socket, flags);
  1363. }
  1364. catch (IException *e)
  1365. {
  1366. EXCLOG(e,"setDafileSvrTraceFlags");
  1367. e->Release();
  1368. }
  1369. return -2;
  1370. }
  1371. extern DAFSCLIENT_API int setDafileSvrThrottleLimit(const SocketEndpoint &_ep, ThrottleClass throttleClass, unsigned throttleLimit, unsigned throttleDelayMs, unsigned throttleCPULimit, unsigned queueLimit, StringBuffer *errMsg)
  1372. {
  1373. SocketEndpoint ep(_ep);
  1374. setDafsEndpointPort(ep);
  1375. if (ep.isNull())
  1376. return -3;
  1377. try {
  1378. Owned<ISocket> socket = connectDafs(ep, 5000);
  1379. return setDafsThrottleLimit(socket, throttleClass, throttleLimit, throttleDelayMs, throttleCPULimit, queueLimit, errMsg);
  1380. }
  1381. catch (IException *e)
  1382. {
  1383. EXCLOG(e,"setDafileSvrThrottleLimit");
  1384. e->Release();
  1385. }
  1386. return -2;
  1387. }
  1388. extern DAFSCLIENT_API int getDafileSvrInfo(const SocketEndpoint &_ep, unsigned level, StringBuffer &retstr)
  1389. {
  1390. SocketEndpoint ep(_ep);
  1391. setDafsEndpointPort(ep);
  1392. if (ep.isNull())
  1393. return false;
  1394. try {
  1395. Owned<ISocket> socket = connectDafs(ep, 5000);
  1396. return getDafsInfo(socket, level, retstr);
  1397. }
  1398. catch (IException *e)
  1399. {
  1400. EXCLOG(e,"getDafileSvrInfo");
  1401. e->Release();
  1402. }
  1403. return -2;
  1404. }