redissync.cpp 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "jthread.hpp"
  15. #include "jhash.hpp"
  16. #include "eclrtl.hpp"
  17. #include "jstring.hpp"
  18. #include "redissync.hpp"
  19. namespace RedisPlugin {
  20. static __thread SyncConnection * cachedConnection;
  21. static __thread ThreadTermFunc threadHookChain;
  22. //The following class is here to ensure destruction of the cachedConnection within the main thread
  23. //as this is not handled by the thread hook mechanism.
  24. static class mainThreadCachedConnection
  25. {
  26. public :
  27. mainThreadCachedConnection() { }
  28. ~mainThreadCachedConnection()
  29. {
  30. if (cachedConnection)
  31. cachedConnection->Release();
  32. }
  33. } mainThread;
  34. static void releaseContext()
  35. {
  36. if (cachedConnection)
  37. cachedConnection->Release();
  38. if (threadHookChain)
  39. {
  40. (*threadHookChain)();
  41. threadHookChain = NULL;
  42. }
  43. }
  44. SyncConnection::SyncConnection(ICodeContext * ctx, const char * _options, unsigned __int64 _database, const char * pswd, unsigned __int64 _timeout)
  45. : Connection(ctx, _options, pswd, _timeout)
  46. {
  47. connect(ctx, _database, pswd);
  48. }
  49. SyncConnection::SyncConnection(ICodeContext * ctx, RedisServer * _server, unsigned __int64 _database, const char * pswd)
  50. : Connection(ctx, _server)
  51. {
  52. connect(ctx, _database, pswd);
  53. }
  54. void SyncConnection::connect(ICodeContext * ctx, unsigned __int64 _database, const char * pswd)
  55. {
  56. struct timeval to = { timeout/1000000, timeout%1000000 };
  57. context = redisConnectWithTimeout(server->getIp(), server->getPort(), to);
  58. assertConnection();
  59. authenticate(ctx, pswd);
  60. selectDB(ctx, _database);
  61. init(ctx);
  62. }
  63. void SyncConnection::authenticate(ICodeContext * ctx, const char * pswd)
  64. {
  65. if (strlen(pswd) > 0)
  66. {
  67. OwnedReply reply = Reply::createReply(redisCommand(context, "AUTH %b", pswd, strlen(pswd)));
  68. assertOnError(reply->query(), "server authentication failed");
  69. }
  70. }
  71. SyncConnection * SyncConnection::createConnection(ICodeContext * ctx, const char * options, unsigned __int64 _database, const char * pswd, unsigned __int64 _timeout)
  72. {
  73. if (!cachedConnection)
  74. {
  75. cachedConnection = new SyncConnection(ctx, options, _database, pswd, _timeout);
  76. threadHookChain = addThreadTermFunc(releaseContext);
  77. return LINK(cachedConnection);
  78. }
  79. unsigned optionsPswdHash = hashc((const unsigned char*)options, strlen(options), hashc((const unsigned char*)pswd, strlen(pswd), 0));
  80. if (cachedConnection->isSameConnection(ctx, optionsPswdHash))
  81. {
  82. //MORE: need to check that the connection has not expired (think hiredis REDIS_KEEPALIVE_INTERVAL is defaulted to 15s).
  83. //At present updateTimeout calls assertConnection.
  84. cachedConnection->updateTimeout(_timeout);
  85. cachedConnection->selectDB(ctx, _database);
  86. return LINK(cachedConnection);
  87. }
  88. cachedConnection->Release();
  89. cachedConnection = new SyncConnection(ctx, options, _database, pswd, _timeout);
  90. return LINK(cachedConnection);
  91. }
  92. void SyncConnection::selectDB(ICodeContext * ctx, unsigned __int64 _database)
  93. {
  94. if (database == _database)
  95. return;
  96. database = _database;
  97. VStringBuffer cmd("SELECT %" I64F "u", database);
  98. OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str()));
  99. assertOnError(reply->query(), "'SELECT' request failed");
  100. }
  101. void SyncConnection::updateTimeout(unsigned __int64 _timeout)
  102. {
  103. if (timeout == _timeout)
  104. return;
  105. assertConnection();
  106. timeout = _timeout;
  107. struct timeval to = { timeout/1000000, timeout%1000000 };
  108. if (redisSetTimeout(context, to) != REDIS_OK)
  109. {
  110. if (context->err)
  111. {
  112. VStringBuffer msg("RedisPlugin: failed to set timeout - %s", context->errstr);
  113. rtlFail(0, msg.str());
  114. }
  115. else
  116. rtlFail(0, "RedisPlugin: failed to set timeout - no message available");
  117. }
  118. }
  119. void SyncConnection::logServerStats(ICodeContext * ctx)
  120. {
  121. OwnedReply reply = Reply::createReply(redisCommand(context, "INFO"));
  122. assertOnError(reply->query(), "'INFO' request failed");
  123. StringBuffer stats("Redis Plugin : Server stats - ");
  124. stats.newline().append(reply->query()->str).newline();
  125. ctx->logString(stats.str());
  126. }
  127. void SyncConnection::assertOnError(const redisReply * reply, const char * _msg)
  128. {
  129. if (!reply)
  130. {
  131. //There should always be a context error if no reply error
  132. assertConnection();
  133. VStringBuffer msg("Redis Plugin: %s - %s", _msg, "neither 'reply' nor connection error available");
  134. rtlFail(0, msg.str());
  135. }
  136. else if (reply->type == REDIS_REPLY_ERROR)
  137. {
  138. if (strncmp(reply->str, "NOAUTH", 6) == 0)
  139. {
  140. VStringBuffer msg("Redis Plugin: server authentication failed - %s", reply->str);
  141. rtlFail(0, msg.str());
  142. }
  143. else
  144. {
  145. VStringBuffer msg("Redis Plugin: %s - %s", _msg, reply->str);
  146. rtlFail(0, msg.str());
  147. }
  148. }
  149. }
  150. void SyncConnection::assertConnection()
  151. {
  152. if (!context)
  153. rtlFail(0, "Redis Plugin: 'redisConnect' failed - no error available.");
  154. else if (context->err)
  155. {
  156. VStringBuffer msg("Redis Plugin: Connection failed - %s for %s:%u", context->errstr, ip(), port());
  157. rtlFail(0, msg.str());
  158. }
  159. }
  160. void SyncConnection::clear(ICodeContext * ctx)
  161. {
  162. //NOTE: flush is the actual cache flush/clear/delete and not an io buffer flush.
  163. OwnedReply reply = Reply::createReply(redisCommand(context, "FLUSHDB"));//NOTE: FLUSHDB deletes current database where as FLUSHALL deletes all dbs.
  164. //NOTE: documented as never failing, but in case
  165. assertOnError(reply->query(), "'FlushDB' request failed");
  166. }
  167. void SyncConnection::del(ICodeContext * ctx, const char * key)
  168. {
  169. OwnedReply reply = Reply::createReply(redisCommand(context, "DEL %b", key, strlen(key)));
  170. assertOnError(reply->query(), "'Del' request failed");
  171. }
  172. void SyncConnection::persist(ICodeContext * ctx, const char * key)
  173. {
  174. OwnedReply reply = Reply::createReply(redisCommand(context, "PERSIST %b", key, strlen(key)));
  175. assertOnError(reply->query(), "'Persist' request failed");
  176. }
  177. void SyncConnection::expire(ICodeContext * ctx, const char * key, unsigned _expire)
  178. {
  179. OwnedReply reply = Reply::createReply(redisCommand(context, "DEL %b %u", key, strlen(key), _expire));
  180. assertOnError(reply->query(), "'Expire' request failed");
  181. }
  182. bool SyncConnection::exists(ICodeContext * ctx, const char * key)
  183. {
  184. OwnedReply reply = Reply::createReply(redisCommand(context, "EXISTS %b", key, strlen(key)));
  185. assertOnError(reply->query(), "'Exists' request failed");
  186. return (reply->query()->integer != 0);
  187. }
  188. unsigned __int64 SyncConnection::dbSize(ICodeContext * ctx)
  189. {
  190. OwnedReply reply = Reply::createReply(redisCommand(context, "DBSIZE"));
  191. assertOnError(reply->query(), "'DBSIZE' request failed");
  192. return reply->query()->integer;
  193. }
  194. //-------------------------------------------SET-----------------------------------------
  195. //--OUTER--
  196. template<class type> void SyncRSet(ICodeContext * ctx, const char * _options, const char * key, type value, unsigned __int64 database, unsigned expire, const char * pswd, unsigned __int64 _timeout)
  197. {
  198. Owned<SyncConnection> master = SyncConnection::createConnection(ctx, _options, database, pswd, _timeout);
  199. master->set(ctx, key, value, expire);
  200. }
  201. //Set pointer types
  202. template<class type> void SyncRSet(ICodeContext * ctx, const char * _options, const char * key, size32_t valueLength, const type * value, unsigned __int64 database, unsigned expire, const char * pswd, unsigned __int64 _timeout)
  203. {
  204. Owned<SyncConnection> master = SyncConnection::createConnection(ctx, _options, database, pswd, _timeout);
  205. master->set(ctx, key, valueLength, value, expire);
  206. }
  207. //--INNER--
  208. template<class type> void SyncConnection::set(ICodeContext * ctx, const char * key, type value, unsigned expire)
  209. {
  210. const char * _value = reinterpret_cast<const char *>(&value);//Do this even for char * to prevent compiler complaining
  211. const char * msg = setFailMsg;
  212. StringBuffer cmd("SET %b %b");
  213. appendExpire(cmd, expire);
  214. OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str(), key, strlen(key), _value, sizeof(type)));
  215. assertOnError(reply->query(), msg);
  216. }
  217. template<class type> void SyncConnection::set(ICodeContext * ctx, const char * key, size32_t valueLength, const type * value, unsigned expire)
  218. {
  219. const char * _value = reinterpret_cast<const char *>(value);//Do this even for char * to prevent compiler complaining
  220. const char * msg = setFailMsg;
  221. StringBuffer cmd("SET %b %b");
  222. appendExpire(cmd, expire);
  223. OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str(), key, strlen(key), _value, (size_t)valueLength));
  224. assertOnError(reply->query(), msg);
  225. }
  226. //-------------------------------------------GET-----------------------------------------
  227. //--OUTER--
  228. template<class type> void SyncRGet(ICodeContext * ctx, const char * options, const char * key, type & returnValue, unsigned __int64 database, const char * pswd, unsigned __int64 _timeout)
  229. {
  230. Owned<SyncConnection> master = SyncConnection::createConnection(ctx, options, database, pswd, _timeout);
  231. master->get(ctx, key, returnValue);
  232. }
  233. template<class type> void SyncRGet(ICodeContext * ctx, const char * options, const char * key, size_t & returnLength, type * & returnValue, unsigned __int64 database, const char * pswd, unsigned __int64 _timeout)
  234. {
  235. Owned<SyncConnection> master = SyncConnection::createConnection(ctx, options, database, pswd, _timeout);
  236. master->get(ctx, key, returnLength, returnValue);
  237. }
  238. void SyncRGetVoidPtrLenPair(ICodeContext * ctx, const char * options, const char * key, size_t & returnLength, void * & returnValue, unsigned __int64 database, const char * pswd, unsigned __int64 _timeout)
  239. {
  240. Owned<SyncConnection> master = SyncConnection::createConnection(ctx, options, database, pswd, _timeout);
  241. master->getVoidPtrLenPair(ctx, key, returnLength, returnValue);
  242. }
  243. //--INNER--
  244. template<class type> void SyncConnection::get(ICodeContext * ctx, const char * key, type & returnValue)
  245. {
  246. OwnedReply reply = Reply::createReply(redisCommand(context, "GET %b", key, strlen(key)));
  247. StringBuffer keyMsg = getFailMsg;
  248. assertOnError(reply->query(), appendIfKeyNotFoundMsg(reply->query(), key, keyMsg));
  249. size_t returnSize = reply->query()->len;
  250. if (sizeof(type)!=returnSize)
  251. {
  252. VStringBuffer msg("RedisPlugin: ERROR - Requested type of different size (%uB) from that stored (%uB).", (unsigned)sizeof(type), (unsigned)returnSize);
  253. rtlFail(0, msg.str());
  254. }
  255. memcpy(&returnValue, reply->query()->str, returnSize);
  256. }
  257. template<class type> void SyncConnection::get(ICodeContext * ctx, const char * key, size_t & returnLength, type * & returnValue)
  258. {
  259. OwnedReply reply = Reply::createReply(redisCommand(context, "GET %b", key, strlen(key)));
  260. StringBuffer keyMsg = getFailMsg;
  261. assertOnError(reply->query(), appendIfKeyNotFoundMsg(reply->query(), key, keyMsg));
  262. returnLength = reply->query()->len;
  263. size_t returnSize = returnLength;
  264. returnValue = reinterpret_cast<type*>(allocateAndCopy(reply->query()->str, returnSize));
  265. }
  266. void SyncConnection::getVoidPtrLenPair(ICodeContext * ctx, const char * key, size_t & returnLength, void * & returnValue)
  267. {
  268. OwnedReply reply = Reply::createReply(redisCommand(context, "GET %b", key, strlen(key)));
  269. StringBuffer keyMsg = getFailMsg;
  270. assertOnError(reply->query(), appendIfKeyNotFoundMsg(reply->query(), key, keyMsg));
  271. returnLength = reply->query()->len;
  272. returnValue = reinterpret_cast<void*>(allocateAndCopy(reply->query()->str, reply->query()->len));
  273. }
  274. //--------------------------------------------------------------------------------
  275. // ECL SERVICE ENTRYPOINTS
  276. //--------------------------------------------------------------------------------
  277. ECL_REDIS_API void ECL_REDIS_CALL RClear(ICodeContext * ctx, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
  278. {
  279. Owned<SyncConnection> master = SyncConnection::createConnection(ctx, options, database, pswd, timeout);
  280. master->clear(ctx);
  281. }
  282. ECL_REDIS_API bool ECL_REDIS_CALL RExist(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
  283. {
  284. Owned<SyncConnection> master = SyncConnection::createConnection(ctx, options, database, pswd, timeout);
  285. return master->exists(ctx, key);
  286. }
  287. ECL_REDIS_API void ECL_REDIS_CALL RDel(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
  288. {
  289. Owned<SyncConnection> master = SyncConnection::createConnection(ctx, options, database, pswd, timeout);
  290. master->del(ctx, key);
  291. }
  292. ECL_REDIS_API void ECL_REDIS_CALL RPersist(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
  293. {
  294. Owned<SyncConnection> master = SyncConnection::createConnection(ctx, options, database, pswd, timeout);
  295. master->persist(ctx, key);
  296. }
  297. ECL_REDIS_API void ECL_REDIS_CALL RExpire(ICodeContext * ctx, const char * key, const char * options, unsigned _expire, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
  298. {
  299. Owned<SyncConnection> master = SyncConnection::createConnection(ctx, options, database, pswd, timeout);
  300. master->expire(ctx, key, _expire*unitExpire);
  301. }
  302. ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL RDBSize(ICodeContext * ctx, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
  303. {
  304. Owned<SyncConnection> master = SyncConnection::createConnection(ctx, options, database, pswd, timeout);
  305. return master->dbSize(ctx);
  306. }
  307. //-----------------------------------SET------------------------------------------
  308. ECL_REDIS_API void ECL_REDIS_CALL SyncRSetStr(ICodeContext * ctx, const char * key, size32_t valueLength, const char * value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned __int64 timeout)
  309. {
  310. SyncRSet(ctx, options, key, valueLength, value, database, expire, pswd, timeout);
  311. }
  312. ECL_REDIS_API void ECL_REDIS_CALL SyncRSetUChar(ICodeContext * ctx, const char * key, size32_t valueLength, const UChar * value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned __int64 timeout)
  313. {
  314. SyncRSet(ctx, options, key, (valueLength)*sizeof(UChar), value, database, expire, pswd, timeout);
  315. }
  316. ECL_REDIS_API void ECL_REDIS_CALL SyncRSetInt(ICodeContext * ctx, const char * key, signed __int64 value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned __int64 timeout)
  317. {
  318. SyncRSet(ctx, options, key, value, database, expire, pswd, timeout);
  319. }
  320. ECL_REDIS_API void ECL_REDIS_CALL SyncRSetUInt(ICodeContext * ctx, const char * key, unsigned __int64 value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned __int64 timeout)
  321. {
  322. SyncRSet(ctx, options, key, value, database, expire, pswd, timeout);
  323. }
  324. ECL_REDIS_API void ECL_REDIS_CALL SyncRSetReal(ICodeContext * ctx, const char * key, double value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned __int64 timeout)
  325. {
  326. SyncRSet(ctx, options, key, value, database, expire, pswd, timeout);
  327. }
  328. ECL_REDIS_API void ECL_REDIS_CALL SyncRSetBool(ICodeContext * ctx, const char * key, bool value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned __int64 timeout)
  329. {
  330. SyncRSet(ctx, options, key, value, database, expire, pswd, timeout);
  331. }
  332. ECL_REDIS_API void ECL_REDIS_CALL SyncRSetData(ICodeContext * ctx, const char * key, size32_t valueLength, const void * value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned __int64 timeout)
  333. {
  334. SyncRSet(ctx, options, key, valueLength, value, database, expire, pswd, timeout);
  335. }
  336. ECL_REDIS_API void ECL_REDIS_CALL SyncRSetUtf8(ICodeContext * ctx, const char * key, size32_t valueLength, const char * value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned __int64 timeout)
  337. {
  338. SyncRSet(ctx, options, key, rtlUtf8Size(valueLength, value), value, database, expire, pswd, timeout);
  339. }
  340. //-------------------------------------GET----------------------------------------
  341. ECL_REDIS_API bool ECL_REDIS_CALL SyncRGetBool(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
  342. {
  343. bool value;
  344. SyncRGet(ctx, options, key, value, database, pswd, timeout);
  345. return value;
  346. }
  347. ECL_REDIS_API double ECL_REDIS_CALL SyncRGetDouble(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
  348. {
  349. double value;
  350. SyncRGet(ctx, options, key, value, database, pswd, timeout);
  351. return value;
  352. }
  353. ECL_REDIS_API signed __int64 ECL_REDIS_CALL SyncRGetInt8(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
  354. {
  355. signed __int64 value;
  356. SyncRGet(ctx, options, key, value, database, pswd, timeout);
  357. return value;
  358. }
  359. ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL SyncRGetUint8(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
  360. {
  361. unsigned __int64 value;
  362. SyncRGet(ctx, options, key, value, database, pswd, timeout);
  363. return value;
  364. }
  365. ECL_REDIS_API void ECL_REDIS_CALL SyncRGetStr(ICodeContext * ctx, size32_t & returnLength, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
  366. {
  367. size_t _returnLength;
  368. SyncRGet(ctx, options, key, _returnLength, returnValue, database, pswd, timeout);
  369. returnLength = static_cast<size32_t>(_returnLength);
  370. }
  371. ECL_REDIS_API void ECL_REDIS_CALL SyncRGetUChar(ICodeContext * ctx, size32_t & returnLength, UChar * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
  372. {
  373. size_t _returnLength;
  374. SyncRGet(ctx, options, key, _returnLength, returnValue, database, pswd, timeout);
  375. returnLength = static_cast<size32_t>(_returnLength/sizeof(UChar));
  376. }
  377. ECL_REDIS_API void ECL_REDIS_CALL SyncRGetUtf8(ICodeContext * ctx, size32_t & returnLength, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
  378. {
  379. size_t _returnLength;
  380. SyncRGet(ctx, options, key, _returnLength, returnValue, database, pswd, timeout);
  381. returnLength = static_cast<size32_t>(rtlUtf8Length(_returnLength, returnValue));
  382. }
  383. ECL_REDIS_API void ECL_REDIS_CALL SyncRGetData(ICodeContext * ctx, size32_t & returnLength, void * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
  384. {
  385. size_t _returnLength;
  386. SyncRGetVoidPtrLenPair(ctx, options, key, _returnLength, returnValue, database, pswd, timeout);
  387. returnLength = static_cast<size32_t>(_returnLength);
  388. }
  389. }//close namespace