redis.cpp 51 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "jthread.hpp"
  15. #include "eclrtl.hpp"
  16. #include "jstring.hpp"
  17. #include "redis.hpp"
  18. #include "hiredis/hiredis.h"
  19. #define REDIS_VERSION "redis plugin 1.0.0"
  20. ECL_REDIS_API bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb)
  21. {
  22. if (pb->size != sizeof(ECLPluginDefinitionBlock))
  23. return false;
  24. pb->magicVersion = PLUGIN_VERSION;
  25. pb->version = REDIS_VERSION;
  26. pb->moduleName = "lib_redis";
  27. pb->ECL = NULL;
  28. pb->flags = PLUGIN_IMPLICIT_MODULE;
  29. pb->description = "ECL plugin library for the C API hiredis\n";
  30. return true;
  31. }
  32. namespace RedisPlugin {
  33. class Connection;
  34. static const char * REDIS_LOCK_PREFIX = "redis_ecl_lock";
  35. static __thread Connection * cachedConnection = NULL;
  36. static __thread Connection * cachedPubConnection = NULL;//database should always = 0
  37. #if HIREDIS_VERSION_OK
  38. static __thread ThreadTermFunc threadHookChain = NULL;
  39. static __thread bool threadHooked = false;
  40. #endif
  41. static void * allocateAndCopy(const void * src, size_t size)
  42. {
  43. return memcpy(rtlMalloc(size), src, size);
  44. }
  45. static StringBuffer & appendExpire(StringBuffer & buffer, unsigned expire)
  46. {
  47. if (expire > 0)
  48. buffer.append(" PX ").append(expire);
  49. return buffer;
  50. }
  51. class Reply : public CInterface
  52. {
  53. public :
  54. inline Reply() : reply(NULL) { };
  55. inline Reply(void * _reply) : reply((redisReply*)_reply) { }
  56. inline Reply(redisReply * _reply) : reply(_reply) { }
  57. inline ~Reply()
  58. {
  59. if (reply)
  60. freeReplyObject(reply);
  61. }
  62. static Reply * createReply(void * _reply) { return new Reply(_reply); }
  63. inline const redisReply * query() const { return reply; }
  64. void setClear(void * _reply) { setClear((redisReply*)_reply); }
  65. void setClear(redisReply * _reply)
  66. {
  67. if (reply)
  68. freeReplyObject(reply);
  69. reply = _reply;
  70. }
  71. private :
  72. redisReply * reply;
  73. };
  74. typedef Owned<RedisPlugin::Reply> OwnedReply;
  75. class TimeoutHandler
  76. {
  77. public :
  78. TimeoutHandler(unsigned _timeout) : timeout(_timeout), t0(msTick()) { }
  79. inline void reset(unsigned _timeout) { timeout = _timeout; t0 = msTick(); }
  80. unsigned timeLeft() const
  81. {
  82. unsigned dt = msTick() - t0;
  83. if (dt < timeout)
  84. return timeout - dt;
  85. return 0;
  86. }
  87. inline unsigned getTimeout() { return timeout; }
  88. private :
  89. unsigned timeout;
  90. unsigned t0;
  91. };
  92. class Connection : public CInterface
  93. {
  94. public :
  95. Connection(ICodeContext * ctx, const char * _options, int database, const char * password, unsigned _timeout);
  96. Connection(ICodeContext * ctx, const char * _options, const char * _ip, int _port, unsigned _serverIpPortPasswordHash, int _database, const char * password, unsigned _timeout);
  97. ~Connection()
  98. {
  99. if (context)
  100. redisFree(context);
  101. }
  102. static Connection * createConnection(ICodeContext * ctx, Connection * & _cachedConnection, const char * options, int database, const char * password, unsigned _timeout);
  103. //set
  104. template <class type> void set(ICodeContext * ctx, const char * key, type value, unsigned expire);
  105. template <class type> void set(ICodeContext * ctx, const char * key, size32_t valueSize, const type * value, unsigned expire);
  106. //get
  107. template <class type> void get(ICodeContext * ctx, const char * key, type & value);
  108. template <class type> void get(ICodeContext * ctx, const char * key, size_t & valueSize, type * & value);
  109. //-------------------------------LOCKING------------------------------------------------
  110. void lockSet(ICodeContext * ctx, const char * key, size32_t valueSize, const char * value, unsigned expire);
  111. void lockGet(ICodeContext * ctx, const char * key, size_t & valueSize, char * & value, const char * password, unsigned expire);
  112. void unlock(ICodeContext * ctx, const char * key);
  113. //--------------------------------------------------------------------------------------
  114. //-------------------------------PUB/SUB------------------------------------------------
  115. unsigned __int64 publish(ICodeContext * ctx, const char * keyOrChannel, size32_t messageSize, const char * message, int _database, bool lockedKey);
  116. void subscribe(ICodeContext * ctx, const char * keyOrChannel, size_t & messageSize, char * & message, int _database, bool lockedKey);
  117. //--------------------------------------------------------------------------------------
  118. void persist(ICodeContext * ctx, const char * key);
  119. void expire(ICodeContext * ctx, const char * key, unsigned _expire);
  120. void del(ICodeContext * ctx, const char * key);
  121. void clear(ICodeContext * ctx);
  122. unsigned __int64 dbSize(ICodeContext * ctx);
  123. bool exists(ICodeContext * ctx, const char * key);
  124. protected :
  125. void redisSetTimeout();
  126. void redisConnect();
  127. unsigned timeLeft();
  128. void parseOptions(ICodeContext * ctx, const char * _options);
  129. void connect(ICodeContext * ctx, int _database, const char * password);
  130. void selectDB(ICodeContext * ctx, int _database);
  131. void readReply(Reply * reply);
  132. void readReplyAndAssert(Reply * reply, const char * msg);
  133. void readReplyAndAssertWithCmdMsg(Reply * reply, const char * msg, const char * key = NULL);
  134. void assertKey(const redisReply * reply, const char * key);
  135. void assertAuthorization(const redisReply * reply);
  136. void assertOnError(const redisReply * reply, const char * _msg);
  137. void assertOnErrorWithCmdMsg(const redisReply * reply, const char * cmd, const char * key = NULL);
  138. void assertConnection(const char * _msg);
  139. void assertConnectionWithCmdMsg(const char * cmd, const char * key = NULL);
  140. void fail(const char * cmd, const char * errmsg, const char * key = NULL);
  141. void * redisCommand(redisContext * context, const char * format, ...);
  142. #if HIREDIS_VERSION_OK
  143. static unsigned hashServerIpPortPassword(ICodeContext * ctx, const char * _options, const char * password);
  144. bool isSameConnection(ICodeContext * ctx, const char * _options, const char * password) const;
  145. void reset(ICodeContext * ctx, const char * password, unsigned _timeout);
  146. #endif
  147. //-------------------------------LOCKING------------------------------------------------
  148. void handleLockOnSet(ICodeContext * ctx, const char * key, const char * value, size_t size, unsigned expire);
  149. void handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAttr * retVal, const char * password, unsigned expire);
  150. void encodeChannel(StringBuffer & channel, const char * key, int _database) const;
  151. bool noScript(const redisReply * reply) const;
  152. bool lock(ICodeContext * ctx, const char * key, const char * channel, unsigned expire);
  153. //--------------------------------------------------------------------------------------
  154. protected :
  155. redisContext * context;
  156. StringAttr options;
  157. StringAttr ip;
  158. unsigned serverIpPortPasswordHash;
  159. int port;
  160. TimeoutHandler timeout;
  161. int database; //NOTE: redis stores the maximum number of dbs as an 'int'.
  162. };
  163. #if HIREDIS_VERSION_OK
  164. static void releaseContext()
  165. {
  166. if (cachedConnection)
  167. {
  168. cachedConnection->Release();
  169. cachedConnection = NULL;
  170. }
  171. if (cachedPubConnection)
  172. {
  173. cachedPubConnection->Release();
  174. cachedPubConnection = NULL;
  175. }
  176. if (threadHookChain)
  177. {
  178. (*threadHookChain)();
  179. threadHookChain = NULL;
  180. }
  181. threadHooked = false;
  182. }
  183. //The following class is here to ensure destruction of the cachedConnection within the main thread
  184. //as this is not handled by the thread hook mechanism.
  185. static class MainThreadCachedConnection
  186. {
  187. public :
  188. MainThreadCachedConnection() { }
  189. ~MainThreadCachedConnection() { releaseContext(); }
  190. } mainThread;
  191. #endif
  192. Connection::Connection(ICodeContext * ctx, const char * _options, int _database, const char * password, unsigned _timeout)
  193. : context(nullptr), database(0), timeout(_timeout), port(0), serverIpPortPasswordHash(0)
  194. {
  195. #if HIREDIS_VERSION_OK
  196. serverIpPortPasswordHash = hashServerIpPortPassword(ctx, _options, pass);
  197. #endif
  198. options.set(_options, strlen(_options));
  199. parseOptions(ctx, _options);
  200. connect(ctx, _database, password);
  201. }
  202. Connection::Connection(ICodeContext * ctx, const char * _options, const char * _ip, int _port, unsigned _serverIpPortPasswordHash, int _database, const char * password, unsigned _timeout)
  203. : context(nullptr), database(0), timeout(_timeout), serverIpPortPasswordHash(_serverIpPortPasswordHash), port(_port)
  204. {
  205. options.set(_options, strlen(_options));
  206. ip.set(_ip, strlen(_ip));
  207. connect(ctx, _database, password);
  208. }
  209. void Connection::redisConnect()
  210. {
  211. if (timeout.getTimeout() == 0)
  212. context = ::redisConnect(ip.str(), port);
  213. else
  214. {
  215. int _timeLeft = (int) timeLeft();
  216. struct timeval to = { _timeLeft/1000, (_timeLeft%1000)*1000 };
  217. context = ::redisConnectWithTimeout(ip.str(), port, to);
  218. }
  219. assertConnection("connection");
  220. }
  221. void Connection::connect(ICodeContext * ctx, int _database, const char * password)
  222. {
  223. redisConnect();
  224. //The following is the dissemination of the two methods authenticate(ctx, password) & selectDB(ctx, _database)
  225. //such that they may be pipelined to save an extra round trip to the server and back.
  226. if (password && *password)
  227. redisAppendCommand(context, "AUTH %b", password, strlen(password));
  228. if (database != _database)
  229. {
  230. VStringBuffer cmd("SELECT %d", _database);
  231. redisAppendCommand(context, cmd.str());
  232. }
  233. //Now read replies.
  234. OwnedReply reply = new Reply();
  235. if (password && *password)
  236. readReplyAndAssert(reply, "server authentication");
  237. if (database != _database)
  238. {
  239. VStringBuffer cmd("SELECT %d", _database);
  240. readReplyAndAssertWithCmdMsg(reply, cmd.str());
  241. database = _database;
  242. }
  243. }
  244. void * Connection::redisCommand(redisContext * context, const char * format, ...)
  245. {
  246. //Copied from https://github.com/redis/hiredis/blob/master/hiredis.c ~line:1008 void * redisCommand(redisContext * context, const char * format, ...)
  247. //with redisSetTimeout(); added.
  248. va_list parameters;
  249. void * reply = NULL;
  250. va_start(parameters, format);
  251. redisSetTimeout();
  252. reply = ::redisvCommand(context, format, parameters);
  253. va_end(parameters);
  254. return reply;
  255. }
  256. unsigned Connection::timeLeft()
  257. {
  258. unsigned _timeLeft = timeout.timeLeft();
  259. if (_timeLeft == 0 && timeout.getTimeout() != 0)
  260. ::rtlFail(0, "Redis Plugin: ERROR - function timed out internally.");
  261. return _timeLeft;
  262. }
  263. void Connection::redisSetTimeout()
  264. {
  265. int _timeLeft = (int) timeLeft();
  266. if (_timeLeft == 0)
  267. return;
  268. struct timeval to = { _timeLeft/1000, (_timeLeft%1000)*1000 };
  269. assertex(context);
  270. if (::redisSetTimeout(context, to) != REDIS_OK)
  271. {
  272. assertConnection("request to set timeout");
  273. throwUnexpected();//In case there is a bug in hiredis such that the above err is not reflected in the 'context' (checked in assertConnection) as expected.
  274. }
  275. }
  276. #if HIREDIS_VERSION_OK
  277. bool Connection::isSameConnection(ICodeContext * ctx, const char * _options, const char * password) const
  278. {
  279. return (hashServerIpPortPassword(ctx, _options, password) == serverIpPortPasswordHash);
  280. }
  281. unsigned Connection::hashServerIpPortPassword(ICodeContext * ctx, const char * _options, const char * password)
  282. {
  283. return hashc((const unsigned char*)_options, strlen(_options), hashc((const unsigned char*)password, strlen(password), 0));
  284. }
  285. void Connection::reset(ICodeContext * ctx, const char * password, unsigned _timeout)
  286. {
  287. timeout.reset(_timeout);
  288. if (context && context->err != REDIS_OK)
  289. {
  290. redisFree(context);
  291. context = NULL;
  292. database = 0;
  293. connect(ctx, 0, password);
  294. }
  295. }
  296. #endif
  297. void Connection::parseOptions(ICodeContext * ctx, const char * _options)
  298. {
  299. StringArray optionStrings;
  300. optionStrings.appendList(_options, " ");
  301. ForEachItemIn(idx, optionStrings)
  302. {
  303. const char *opt = optionStrings.item(idx);
  304. if (strncmp(opt, "--SERVER=", 9) == 0)
  305. {
  306. opt += 9;
  307. StringArray splitPort;
  308. splitPort.appendList(opt, ":");
  309. if (splitPort.ordinality()==2)
  310. {
  311. ip.set(splitPort.item(0));
  312. port = atoi(splitPort.item(1));
  313. }
  314. }
  315. else
  316. {
  317. VStringBuffer err("Redis Plugin: ERROR - unsupported option string '%s'", opt);
  318. ::rtlFail(0, err.str());
  319. }
  320. }
  321. if (ip.isEmpty())
  322. {
  323. ip.set("localhost");
  324. port = 6379;
  325. if (ctx)
  326. {
  327. VStringBuffer msg("Redis Plugin: WARNING - using default cache (%s:%d)", ip.str(), port);
  328. ctx->logString(msg.str());
  329. }
  330. }
  331. }
  332. void Connection::readReply(Reply * reply)
  333. {
  334. redisReply * nakedReply = NULL;
  335. redisSetTimeout();
  336. redisGetReply(context, (void**)&nakedReply);
  337. reply->setClear(nakedReply);
  338. }
  339. void Connection::readReplyAndAssert(Reply * reply, const char * msg)
  340. {
  341. readReply(reply);
  342. assertOnError(reply->query(), msg);
  343. }
  344. void Connection::readReplyAndAssertWithCmdMsg(Reply * reply, const char * msg, const char * key)
  345. {
  346. readReply(reply);
  347. assertOnErrorWithCmdMsg(reply->query(), msg, key);
  348. }
  349. Connection * Connection::createConnection(ICodeContext * ctx, Connection * & _cachedConnection, const char * options, int _database, const char * password, unsigned _timeout)
  350. {
  351. #if HIREDIS_VERSION_OK
  352. if (!_cachedConnection)
  353. {
  354. _cachedConnection = new Connection(ctx, options, _database, password, _timeout);
  355. if (!threadHooked)
  356. {
  357. threadHookChain = addThreadTermFunc(releaseContext);
  358. threadHooked = true;
  359. }
  360. return LINK(_cachedConnection);
  361. }
  362. if (_cachedConnection->isSameConnection(ctx, options, password))
  363. {
  364. //MORE: should perhaps check that the connection has not expired (think hiredis REDIS_KEEPALIVE_INTERVAL is defaulted to 15s).
  365. _cachedConnection->reset(ctx, password, _timeout);
  366. _cachedConnection->selectDB(ctx, _database);
  367. return LINK(_cachedConnection);
  368. }
  369. _cachedConnection->Release();
  370. _cachedConnection = NULL;
  371. _cachedConnection = new Connection(ctx, options, _database, password, _timeout);
  372. return LINK(_cachedConnection);
  373. #endif
  374. return new Connection(ctx, options, _database, password, _timeout);
  375. }
  376. void Connection::selectDB(ICodeContext * ctx, int _database)
  377. {
  378. if (database == _database)
  379. return;
  380. database = _database;
  381. VStringBuffer cmd("SELECT %d", database);
  382. OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str()));
  383. assertOnErrorWithCmdMsg(reply->query(), cmd.str());
  384. }
  385. void Connection::fail(const char * cmd, const char * errmsg, const char * key)
  386. {
  387. if (key)
  388. {
  389. VStringBuffer msg("Redis Plugin: ERROR - %s '%s' on database %d for %s:%d failed : %s", cmd, key, database, ip.str(), port, errmsg);
  390. ::rtlFail(0, msg.str());
  391. }
  392. VStringBuffer msg("Redis Plugin: ERROR - %s on database %d for %s:%d failed : %s", cmd, database, ip.str(), port, errmsg);
  393. ::rtlFail(0, msg.str());
  394. }
  395. void Connection::assertOnError(const redisReply * reply, const char * _msg)
  396. {
  397. if (!reply)
  398. {
  399. assertConnection(_msg);
  400. throwUnexpected();
  401. }
  402. else if (reply->type == REDIS_REPLY_ERROR)
  403. {
  404. assertAuthorization(reply);
  405. VStringBuffer msg("Redis Plugin: %s - %s", _msg, reply->str);
  406. ::rtlFail(0, msg.str());
  407. }
  408. }
  409. void Connection::assertOnErrorWithCmdMsg(const redisReply * reply, const char * cmd, const char * key)
  410. {
  411. if (!reply)
  412. {
  413. assertConnectionWithCmdMsg(cmd, key);
  414. throwUnexpected();
  415. }
  416. else if (reply->type == REDIS_REPLY_ERROR)
  417. {
  418. assertAuthorization(reply);
  419. fail(cmd, reply->str, key);
  420. }
  421. }
  422. void Connection::assertAuthorization(const redisReply * reply)
  423. {
  424. if (reply && reply->str && ( strncmp(reply->str, "NOAUTH", 6) == 0 || strncmp(reply->str, "ERR operation not permitted", 27) == 0 ))
  425. {
  426. VStringBuffer msg("Redis Plugin: ERROR - authentication for %s:%d failed : %s", ip.str(), port, reply->str);
  427. ::rtlFail(0, msg.str());
  428. }
  429. }
  430. void Connection::assertKey(const redisReply * reply, const char * key)
  431. {
  432. if (reply && reply->type == REDIS_REPLY_NIL)
  433. {
  434. VStringBuffer msg("Redis Plugin: ERROR - the requested key '%s' does not exist on database %d on %s:%d", key, database, ip.str(), port);
  435. ::rtlFail(0, msg.str());
  436. }
  437. }
  438. void Connection::assertConnectionWithCmdMsg(const char * cmd, const char * key)
  439. {
  440. if (!context)
  441. fail(cmd, "neither 'reply' nor connection error available", key);
  442. else if (context->err)
  443. fail(cmd, context->errstr, key);
  444. }
  445. void Connection::assertConnection(const char * _msg)
  446. {
  447. if (!context)
  448. {
  449. VStringBuffer msg("Redis Plugin: ERROR - %s for %s:%d failed : neither 'reply' nor connection error available", _msg, ip.str(), port);
  450. ::rtlFail(0, msg.str());
  451. }
  452. else if (context->err)
  453. {
  454. VStringBuffer msg("Redis Plugin: ERROR - %s for %s:%d failed : %s", _msg, ip.str(), port, context->errstr);
  455. ::rtlFail(0, msg.str());
  456. }
  457. }
  458. void Connection::clear(ICodeContext * ctx)
  459. {
  460. //NOTE: flush is the actual cache flush/clear/delete and not an io buffer flush.
  461. OwnedReply reply = Reply::createReply(redisCommand(context, "FLUSHDB"));//NOTE: FLUSHDB deletes current database where as FLUSHALL deletes all dbs.
  462. //NOTE: documented as never failing, but in case
  463. assertOnErrorWithCmdMsg(reply->query(), "FlushDB");
  464. }
  465. void Connection::del(ICodeContext * ctx, const char * key)
  466. {
  467. OwnedReply reply = Reply::createReply(redisCommand(context, "DEL %b", key, strlen(key)));
  468. assertOnErrorWithCmdMsg(reply->query(), "Del", key);
  469. }
  470. void Connection::persist(ICodeContext * ctx, const char * key)
  471. {
  472. OwnedReply reply = Reply::createReply(redisCommand(context, "PERSIST %b", key, strlen(key)));
  473. assertOnErrorWithCmdMsg(reply->query(), "Persist", key);
  474. }
  475. void Connection::expire(ICodeContext * ctx, const char * key, unsigned _expire)
  476. {
  477. OwnedReply reply = Reply::createReply(redisCommand(context, "PEXPIRE %b %u", key, strlen(key), _expire));
  478. assertOnErrorWithCmdMsg(reply->query(), "Expire", key);
  479. }
  480. bool Connection::exists(ICodeContext * ctx, const char * key)
  481. {
  482. OwnedReply reply = Reply::createReply(redisCommand(context, "EXISTS %b", key, strlen(key)));
  483. assertOnErrorWithCmdMsg(reply->query(), "Exists", key);
  484. return (reply->query()->integer != 0);
  485. }
  486. unsigned __int64 Connection::dbSize(ICodeContext * ctx)
  487. {
  488. OwnedReply reply = Reply::createReply(redisCommand(context, "DBSIZE"));
  489. assertOnErrorWithCmdMsg(reply->query(), "DBSIZE");
  490. return reply->query()->integer;
  491. }
  492. //-------------------------------------------SET-----------------------------------------
  493. //--OUTER--
  494. template<class type> void SyncRSet(ICodeContext * ctx, const char * _options, const char * key, type value, int database, unsigned expire, const char * password, unsigned _timeout)
  495. {
  496. Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, _options, database, password, _timeout);
  497. master->set(ctx, key, value, expire);
  498. }
  499. //Set pointer types
  500. template<class type> void SyncRSet(ICodeContext * ctx, const char * _options, const char * key, size32_t valueSize, const type * value, int database, unsigned expire, const char * password, unsigned _timeout)
  501. {
  502. Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, _options, database, password, _timeout);
  503. master->set(ctx, key, valueSize, value, expire);
  504. }
  505. //--INNER--
  506. template<class type> void Connection::set(ICodeContext * ctx, const char * key, type value, unsigned expire)
  507. {
  508. const char * _value = reinterpret_cast<const char *>(&value);//Do this even for char * to prevent compiler complaining
  509. StringBuffer cmd("SET %b %b");
  510. appendExpire(cmd, expire);
  511. OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str(), key, strlen(key), _value, sizeof(type)));
  512. assertOnErrorWithCmdMsg(reply->query(), "SET", key);
  513. }
  514. template<class type> void Connection::set(ICodeContext * ctx, const char * key, size32_t valueSize, const type * value, unsigned expire)
  515. {
  516. const char * _value = reinterpret_cast<const char *>(value);//Do this even for char * to prevent compiler complaining
  517. StringBuffer cmd("SET %b %b");
  518. appendExpire(cmd, expire);
  519. OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str(), key, strlen(key), _value, (size_t)valueSize));
  520. assertOnErrorWithCmdMsg(reply->query(), "SET", key);
  521. }
  522. //-------------------------------------------GET-----------------------------------------
  523. //--OUTER--
  524. template<class type> void SyncRGet(ICodeContext * ctx, const char * options, const char * key, type & returnValue, int database, const char * password, unsigned _timeout)
  525. {
  526. Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, _timeout);
  527. master->get(ctx, key, returnValue);
  528. }
  529. template<class type> void SyncRGet(ICodeContext * ctx, const char * options, const char * key, size_t & returnSize, type * & returnValue, int database, const char * password, unsigned _timeout)
  530. {
  531. Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, _timeout);
  532. master->get(ctx, key, returnSize, returnValue);
  533. }
  534. //--INNER--
  535. template<class type> void Connection::get(ICodeContext * ctx, const char * key, type & returnValue)
  536. {
  537. OwnedReply reply = Reply::createReply(redisCommand(context, "GET %b", key, strlen(key)));
  538. assertOnErrorWithCmdMsg(reply->query(), "GET", key);
  539. assertKey(reply->query(), key);
  540. size_t returnSize = reply->query()->len;
  541. if (sizeof(type)!=returnSize)
  542. {
  543. VStringBuffer msg("requested type of different size (%uB) from that stored (%uB)", (unsigned)sizeof(type), (unsigned)returnSize);
  544. fail("GET", msg.str(), key);
  545. }
  546. memcpy(&returnValue, reply->query()->str, returnSize);
  547. }
  548. template<class type> void Connection::get(ICodeContext * ctx, const char * key, size_t & returnSize, type * & returnValue)
  549. {
  550. OwnedReply reply = Reply::createReply(redisCommand(context, "GET %b", key, strlen(key)));
  551. assertOnErrorWithCmdMsg(reply->query(), "GET", key);
  552. assertKey(reply->query(), key);
  553. returnSize = reply->query()->len;
  554. returnValue = reinterpret_cast<type*>(allocateAndCopy(reply->query()->str, returnSize));
  555. }
  556. unsigned __int64 Connection::publish(ICodeContext * ctx, const char * keyOrChannel, size32_t messageSize, const char * message, int _database, bool lockedKey)
  557. {
  558. StringBuffer channel;
  559. if (lockedKey)
  560. encodeChannel(channel, keyOrChannel, _database);
  561. else
  562. channel.set(keyOrChannel);
  563. OwnedReply reply = Reply::createReply(redisCommand(context, "PUBLISH %b %b", channel.str(), (size_t)channel.length(), message, (size_t)messageSize));
  564. assertOnErrorWithCmdMsg(reply->query(), "PUBLISH", channel.str());
  565. if (reply->query()->type == REDIS_REPLY_INTEGER)
  566. {
  567. if (reply->query()->integer >= 0)
  568. return (unsigned __int64)reply->query()->integer;
  569. else
  570. throwUnexpected();
  571. }
  572. throwUnexpected();
  573. }
  574. void Connection::subscribe(ICodeContext * ctx, const char * keyOrChannel, size_t & messageSize, char * & message, int _database, bool lockedKey)
  575. {
  576. StringBuffer channel;
  577. if (lockedKey)
  578. encodeChannel(channel, keyOrChannel, _database);
  579. else
  580. channel.set(keyOrChannel);
  581. #if(0)//Replicate a lingering SUBSCRIBE to test channel comparison test when reading message.
  582. {
  583. OwnedReply reply = Reply::createReply(redisCommand(context, "SUBSCRIBE oldChannel"));
  584. assertOnErrorWithCmdMsg(reply->query(), "Test lingering SUBSCRIBE", "oldChannel");
  585. }
  586. #endif
  587. OwnedReply reply = Reply::createReply(redisCommand(context, "SUBSCRIBE %b", channel.str(), (size_t)channel.length()));
  588. assertOnErrorWithCmdMsg(reply->query(), "SUBSCRIBE", channel.str());
  589. if (reply->query()->type != REDIS_REPLY_ARRAY || strcmp("subscribe", reply->query()->element[0]->str) != 0 )
  590. fail("SUBSCRIBE", "failed to register SUB", channel.str());
  591. readReply(reply);
  592. assertOnErrorWithCmdMsg(reply->query(), "SUBSCRIBE", channel.str());
  593. if (reply->query()->type == REDIS_REPLY_ARRAY && strcmp("message", reply->query()->element[0]->str) == 0 && reply->query()->elements == 3)
  594. {
  595. if (reply->query()->element[2]->len > 0)
  596. {
  597. messageSize = (size_t)reply->query()->element[2]->len;
  598. message = reinterpret_cast<char*>(allocateAndCopy(reply->query()->element[2]->str, messageSize));
  599. }
  600. else
  601. {
  602. messageSize = 0;
  603. message = NULL;
  604. }
  605. return;
  606. }
  607. throwUnexpected();
  608. }
  609. //--------------------------------------------------------------------------------
  610. // ECL SERVICE ENTRYPOINTS
  611. //--------------------------------------------------------------------------------
  612. ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL SyncRPub(ICodeContext * ctx, const char * keyOrChannel, size32_t messageSize, const char * message, const char * options, int database, const char * password, unsigned timeout, bool lockedKey)
  613. {
  614. Owned<Connection> master = Connection::createConnection(ctx, cachedPubConnection, options, 0, password, timeout);
  615. return master->publish(ctx, keyOrChannel, messageSize, message, database, lockedKey);
  616. }
  617. ECL_REDIS_API void ECL_REDIS_CALL SyncRSub(ICodeContext * ctx, size32_t & messageSize, char * & message, const char * keyOrChannel, const char * options, int database, const char * password, unsigned timeout, bool lockedKey)
  618. {
  619. size_t _messageSize = 0;
  620. Owned<Connection> master = new Connection(ctx, options, 0, password, timeout);
  621. master->subscribe(ctx, keyOrChannel, _messageSize, message, database, lockedKey);
  622. messageSize = static_cast<size32_t>(_messageSize);
  623. }
  624. ECL_REDIS_API void ECL_REDIS_CALL RClear(ICodeContext * ctx, const char * options, int database, const char * password, unsigned timeout)
  625. {
  626. Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, timeout);
  627. master->clear(ctx);
  628. }
  629. ECL_REDIS_API bool ECL_REDIS_CALL RExist(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout)
  630. {
  631. Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, timeout);
  632. return master->exists(ctx, key);
  633. }
  634. ECL_REDIS_API void ECL_REDIS_CALL RDel(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout)
  635. {
  636. Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, timeout);
  637. master->del(ctx, key);
  638. }
  639. ECL_REDIS_API void ECL_REDIS_CALL RPersist(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout)
  640. {
  641. Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, timeout);
  642. master->persist(ctx, key);
  643. }
  644. ECL_REDIS_API void ECL_REDIS_CALL RExpire(ICodeContext * ctx, const char * key, const char * options, int database, unsigned _expire, const char * password, unsigned timeout)
  645. {
  646. Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, timeout);
  647. master->expire(ctx, key, _expire);
  648. }
  649. ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL RDBSize(ICodeContext * ctx, const char * options, int database, const char * password, unsigned timeout)
  650. {
  651. Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, timeout);
  652. return master->dbSize(ctx);
  653. }
  654. //-----------------------------------SET------------------------------------------
  655. ECL_REDIS_API void ECL_REDIS_CALL SyncRSetStr(ICodeContext * ctx, const char * key, size32_t valueSize, const char * value, const char * options, int database, unsigned expire, const char * password, unsigned timeout)
  656. {
  657. SyncRSet(ctx, options, key, valueSize, value, database, expire, password, timeout);
  658. }
  659. ECL_REDIS_API void ECL_REDIS_CALL SyncRSetUChar(ICodeContext * ctx, const char * key, size32_t valueLength, const UChar * value, const char * options, int database, unsigned expire, const char * password, unsigned timeout)
  660. {
  661. SyncRSet(ctx, options, key, (valueLength)*sizeof(UChar), value, database, expire, password, timeout);
  662. }
  663. ECL_REDIS_API void ECL_REDIS_CALL SyncRSetInt(ICodeContext * ctx, const char * key, signed __int64 value, const char * options, int database, unsigned expire, const char * password, unsigned timeout)
  664. {
  665. SyncRSet(ctx, options, key, value, database, expire, password, timeout);
  666. }
  667. ECL_REDIS_API void ECL_REDIS_CALL SyncRSetUInt(ICodeContext * ctx, const char * key, unsigned __int64 value, const char * options, int database, unsigned expire, const char * password, unsigned timeout)
  668. {
  669. SyncRSet(ctx, options, key, value, database, expire, password, timeout);
  670. }
  671. ECL_REDIS_API void ECL_REDIS_CALL SyncRSetReal(ICodeContext * ctx, const char * key, double value, const char * options, int database, unsigned expire, const char * password, unsigned timeout)
  672. {
  673. SyncRSet(ctx, options, key, value, database, expire, password, timeout);
  674. }
  675. ECL_REDIS_API void ECL_REDIS_CALL SyncRSetBool(ICodeContext * ctx, const char * key, bool value, const char * options, int database, unsigned expire, const char * password, unsigned timeout)
  676. {
  677. SyncRSet(ctx, options, key, value, database, expire, password, timeout);
  678. }
  679. ECL_REDIS_API void ECL_REDIS_CALL SyncRSetData(ICodeContext * ctx, const char * key, size32_t valueSize, const void * value, const char * options, int database, unsigned expire, const char * password, unsigned timeout)
  680. {
  681. SyncRSet(ctx, options, key, valueSize, value, database, expire, password, timeout);
  682. }
  683. ECL_REDIS_API void ECL_REDIS_CALL SyncRSetUtf8(ICodeContext * ctx, const char * key, size32_t valueLength, const char * value, const char * options, int database, unsigned expire, const char * password, unsigned timeout)
  684. {
  685. SyncRSet(ctx, options, key, rtlUtf8Size(valueLength, value), value, database, expire, password, timeout);
  686. }
  687. //-------------------------------------GET----------------------------------------
  688. ECL_REDIS_API bool ECL_REDIS_CALL SyncRGetBool(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout)
  689. {
  690. bool value;
  691. SyncRGet(ctx, options, key, value, database, password, timeout);
  692. return value;
  693. }
  694. ECL_REDIS_API double ECL_REDIS_CALL SyncRGetDouble(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout)
  695. {
  696. double value;
  697. SyncRGet(ctx, options, key, value, database, password, timeout);
  698. return value;
  699. }
  700. ECL_REDIS_API signed __int64 ECL_REDIS_CALL SyncRGetInt8(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout)
  701. {
  702. signed __int64 value;
  703. SyncRGet(ctx, options, key, value, database, password, timeout);
  704. return value;
  705. }
  706. ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL SyncRGetUint8(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout)
  707. {
  708. unsigned __int64 value;
  709. SyncRGet(ctx, options, key, value, database, password, timeout);
  710. return value;
  711. }
  712. ECL_REDIS_API void ECL_REDIS_CALL SyncRGetStr(ICodeContext * ctx, size32_t & returnSize, char * & returnValue, const char * key, const char * options, int database, const char * password, unsigned timeout)
  713. {
  714. size_t _returnSize;
  715. SyncRGet(ctx, options, key, _returnSize, returnValue, database, password, timeout);
  716. returnSize = static_cast<size32_t>(_returnSize);
  717. }
  718. ECL_REDIS_API void ECL_REDIS_CALL SyncRGetUChar(ICodeContext * ctx, size32_t & returnLength, UChar * & returnValue, const char * key, const char * options, int database, const char * password, unsigned timeout)
  719. {
  720. size_t returnSize;
  721. SyncRGet(ctx, options, key, returnSize, returnValue, database, password, timeout);
  722. returnLength = static_cast<size32_t>(returnSize/sizeof(UChar));
  723. }
  724. ECL_REDIS_API void ECL_REDIS_CALL SyncRGetUtf8(ICodeContext * ctx, size32_t & returnLength, char * & returnValue, const char * key, const char * options, int database, const char * password, unsigned timeout)
  725. {
  726. size_t returnSize;
  727. SyncRGet(ctx, options, key, returnSize, returnValue, database, password, timeout);
  728. returnLength = static_cast<size32_t>(rtlUtf8Length(returnSize, returnValue));
  729. }
  730. ECL_REDIS_API void ECL_REDIS_CALL SyncRGetData(ICodeContext * ctx, size32_t & returnSize, void * & returnValue, const char * key, const char * options, int database, const char * password, unsigned timeout)
  731. {
  732. size_t _returnSize;
  733. SyncRGet(ctx, options, key, _returnSize, returnValue, database, password, timeout);
  734. returnSize = static_cast<size32_t>(_returnSize);
  735. }
  736. //----------------------------------LOCK------------------------------------------
  737. //-----------------------------------SET-----------------------------------------
  738. //Set pointer types
  739. void SyncLockRSet(ICodeContext * ctx, const char * _options, const char * key, size32_t valueSize, const char * value, int database, unsigned expire, const char * password, unsigned _timeout)
  740. {
  741. Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, _options, database, password, _timeout);
  742. master->lockSet(ctx, key, valueSize, value, expire);
  743. }
  744. //--INNER--
  745. void Connection::lockSet(ICodeContext * ctx, const char * key, size32_t valueSize, const char * value, unsigned expire)
  746. {
  747. const char * _value = reinterpret_cast<const char *>(value);//Do this even for char * to prevent compiler complaining
  748. handleLockOnSet(ctx, key, _value, (size_t)valueSize, expire);
  749. }
  750. //-------------------------------------------GET-----------------------------------------
  751. //--OUTER--
  752. void SyncLockRGet(ICodeContext * ctx, const char * options, const char * key, size_t & returnSize, char * & returnValue, int database, unsigned expire, const char * password, unsigned _timeout)
  753. {
  754. Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, _timeout);
  755. master->lockGet(ctx, key, returnSize, returnValue, password, expire);
  756. }
  757. //--INNER--
  758. void Connection::lockGet(ICodeContext * ctx, const char * key, size_t & returnSize, char * & returnValue, const char * password, unsigned expire)
  759. {
  760. MemoryAttr retVal;
  761. handleLockOnGet(ctx, key, &retVal, password, expire);
  762. returnSize = retVal.length();
  763. returnValue = reinterpret_cast<char*>(retVal.detach());
  764. }
  765. //---------------------------------------------------------------------------------------
  766. void Connection::encodeChannel(StringBuffer & channel, const char * key, int _database) const
  767. {
  768. channel.append(REDIS_LOCK_PREFIX).append("_").append(key).append("_").append(_database);
  769. }
  770. bool Connection::lock(ICodeContext * ctx, const char * key, const char * channel, unsigned expire)
  771. {
  772. if (expire == 0)
  773. fail("GetOrLock<type>", "invalid value for 'expire', persistent locks not allowed.", key);
  774. StringBuffer cmd("SET %b %b NX PX ");
  775. cmd.append(expire);
  776. OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str(), key, strlen(key), channel, strlen(channel)));
  777. assertOnErrorWithCmdMsg(reply->query(), cmd.str(), key);
  778. return (reply->query()->type == REDIS_REPLY_STATUS && strcmp(reply->query()->str, "OK") == 0);
  779. }
  780. void Connection::unlock(ICodeContext * ctx, const char * key)
  781. {
  782. //WATCH key, if altered between WATCH and EXEC abort all commands inbetween
  783. redisAppendCommand(context, "WATCH %b", key, strlen(key));
  784. redisAppendCommand(context, "GET %b", key, strlen(key));
  785. //Read replies
  786. OwnedReply reply = new Reply();
  787. readReplyAndAssertWithCmdMsg(reply.get(), "manual unlock", key);//WATCH reply
  788. readReplyAndAssertWithCmdMsg(reply.get(), "manual unlock", key);//GET reply
  789. //check if locked
  790. if (strncmp(reply->query()->str, REDIS_LOCK_PREFIX, strlen(REDIS_LOCK_PREFIX)) == 0)
  791. {
  792. //MULTI - all commands between MULTI and EXEC are considered an atomic transaction on the server
  793. redisAppendCommand(context, "MULTI");//MULTI
  794. redisAppendCommand(context, "DEL %b", key, strlen(key));//DEL
  795. redisAppendCommand(context, "EXEC");//EXEC
  796. #if(0)//Quick draw! You have 10s to manually send (via redis-cli) "set testlock foobar". The second myRedis.Exists('testlock') in redislockingtest.ecl should now return TRUE.
  797. sleep(10);
  798. #endif
  799. readReplyAndAssertWithCmdMsg(reply.get(), "manual unlock", key);//MULTI reply
  800. readReplyAndAssertWithCmdMsg(reply.get(), "manual unlock", key);//DEL reply
  801. readReplyAndAssertWithCmdMsg(reply.get(), "manual unlock", key);//EXEC reply
  802. }
  803. //If the above is aborted, let the lock expire.
  804. }
  805. void Connection::handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAttr * retVal, const char * password, unsigned expire)
  806. {
  807. //NOTE: This routine can only return an empty string under one condition, that which indicates to the caller that the key was successfully locked.
  808. StringBuffer channel;
  809. encodeChannel(channel, key, database);
  810. //Query key and set lock if non existent
  811. if (lock(ctx, key, channel.str(), expire))
  812. return;
  813. #if(0)//Test empty string handling by deleting the lock/value, and thus GET returns REDIS_REPLY_NIL as the reply type and an empty string.
  814. {
  815. OwnedReply pubReply = Reply::createReply(redisCommand(context, "DEL %b", key, strlen(key)));
  816. assertOnError(pubReply->query(), "del fail");
  817. }
  818. #endif
  819. //SUB before GET
  820. //Requires separate connection from GET so that the replies are not mangled. This could be averted
  821. Owned<Connection> subConnection = new Connection(ctx, options.str(), ip.str(), port, serverIpPortPasswordHash, database, password, timeLeft());
  822. OwnedReply subReply = Reply::createReply(redisCommand(subConnection->context, "SUBSCRIBE %b", channel.str(), (size_t)channel.length()));
  823. //Defer checking of reply/connection errors until actually needed.
  824. #if(0)//Test publish before GET.
  825. {
  826. OwnedReply pubReply = Reply::createReply(redisCommand(context, "PUBLISH %b %b", channel.str(), (size_t)channel.length(), "foo", (size_t)3));
  827. assertOnError(pubReply->query(), "pub fail");
  828. }
  829. #endif
  830. //Now GET
  831. OwnedReply getReply = Reply::createReply((redisReply*)redisCommand(context, "GET %b", key, strlen(key)));
  832. assertOnErrorWithCmdMsg(getReply->query(), "GetOrLock<type>", key);
  833. #if(0)//Test publish after GET.
  834. {
  835. OwnedReply pubReply = Reply::createReply(redisCommand(context, "PUBLISH %b %b", channel.str(), (size_t)channel.length(), "foo", (size_t)3));
  836. assertOnError(pubReply->query(), "pub fail");
  837. }
  838. #endif
  839. //Only return an actual value, i.e. neither the lock value nor an empty string. The latter is unlikely since we know that lock()
  840. //failed, indicating that the key existed. If this is an actual value, it is however, possible for it to have been DELeted in the interim.
  841. if (getReply->query()->type != REDIS_REPLY_NIL && getReply->query()->str && strncmp(getReply->query()->str, REDIS_LOCK_PREFIX, strlen(REDIS_LOCK_PREFIX)) != 0)
  842. {
  843. retVal->set(getReply->query()->len, getReply->query()->str);
  844. return;
  845. }
  846. else
  847. {
  848. //Check that the lock was set by this plugin and thus that we subscribed to the expected channel.
  849. if (getReply->query()->str && strcmp(getReply->query()->str, channel.str()) !=0 )
  850. {
  851. VStringBuffer msg("key locked with a channel ('%s') different to that subscribed to (%s).", getReply->query()->str, channel.str());
  852. fail("GetOrLock<type>", msg.str(), key);
  853. //MORE: In theory, it is possible to recover at this stage by subscribing to the channel that the key was actually locked with.
  854. //However, we may have missed the massage publication already or by then, but could SUB again in case we haven't.
  855. //More importantly and furthermore, the publication (in SetAndPublish<type>) will only publish to the channel encoded by
  856. //this plugin, rather than the string retrieved as the lock value (the value of the locked key).
  857. }
  858. getReply.clear();
  859. #if(0)//Added to allow for manual pub testing via redis-cli
  860. struct timeval to = { 10, 0 };//10secs
  861. ::redisSetTimeout(subConnection->context, to);
  862. #endif
  863. //Locked so SUBSCRIBE
  864. subConnection->assertOnErrorWithCmdMsg(subReply->query(), "GetOrLock<type>", key);
  865. if (subReply->query()->type != REDIS_REPLY_ARRAY || strcmp("subscribe", subReply->query()->element[0]->str) != 0 )
  866. fail("GetOrLock<type>", "failed to register SUB", key);//NOTE: In this instance better to be this->fail rather than subConnection->fail - due to database reported in msg.
  867. subConnection->readReply(subReply);
  868. subConnection->assertOnErrorWithCmdMsg(subReply->query(), "GetOrLock<type>", key);
  869. if (subReply->query()->type == REDIS_REPLY_ARRAY && strcmp("message", subReply->query()->element[0]->str) == 0)
  870. {
  871. //We are about to return a value, to conform with other Get<type> functions, fail if the key did not exist.
  872. //Since the value is sent via a published message, there is no direct reply struct so assume that an empty
  873. //string is equivalent to a non-existent key.
  874. //More importantly, it is paramount that this routine only return an empty string under one condition, that
  875. //which indicates to the caller that the key was successfully locked.
  876. //NOTE: it is possible for an empty message to have been PUBLISHed.
  877. if (subReply->query()->element[2]->len > 0)
  878. {
  879. retVal->set(subReply->query()->element[2]->len, subReply->query()->element[2]->str);//return the published value rather than another (WATCHed) GET.
  880. return;
  881. }
  882. //fail that key does not exist
  883. redisReply fakeReply;
  884. fakeReply.type = REDIS_REPLY_NIL;
  885. assertKey(&fakeReply, key);
  886. }
  887. }
  888. throwUnexpected();
  889. }
  890. void Connection::handleLockOnSet(ICodeContext * ctx, const char * key, const char * value, size_t size, unsigned expire)
  891. {
  892. //Due to locking logic surfacing into ECL, any locking.set (such as this is) assumes that they own the lock and therefore go ahead and set regardless.
  893. StringBuffer channel;
  894. encodeChannel(channel, key, database);
  895. if (size > 29)//c.f. 1st note below.
  896. {
  897. OwnedReply replyContainer = new Reply();
  898. if (expire == 0)
  899. {
  900. const char * luaScriptSHA1 = "2a4a976d9bbd806756b2c7fc1e2bc2cb905e68c3"; //NOTE: update this if luaScript is updated!
  901. replyContainer->setClear(redisCommand(context, "EVALSHA %b %d %b %b %b", luaScriptSHA1, (size_t)40, 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size));
  902. if (noScript(replyContainer->query()))
  903. {
  904. const char * luaScript = "redis.call('SET', KEYS[1], ARGV[2]) redis.call('PUBLISH', ARGV[1], ARGV[2]) return";//NOTE: MUST update luaScriptSHA1 if luaScript is updated!
  905. replyContainer->setClear(redisCommand(context, "EVAL %b %d %b %b %b", luaScript, strlen(luaScript), 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size));
  906. }
  907. }
  908. else
  909. {
  910. const char * luaScriptWithExpireSHA1 = "6f6bc88ccea7c6853ccc395eaa7abd8cb91fb2d8"; //NOTE: update this if luaScriptWithExpire is updated!
  911. replyContainer->setClear(redisCommand(context, "EVALSHA %b %d %b %b %b %d", luaScriptWithExpireSHA1, (size_t)40, 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size, expire));
  912. if (noScript(replyContainer->query()))
  913. {
  914. const char * luaScriptWithExpire = "redis.call('SET', KEYS[1], ARGV[2], 'PX', ARGV[3]) redis.call('PUBLISH', ARGV[1], ARGV[2]) return";//NOTE: MUST update luaScriptWithExpireSHA1 if luaScriptWithExpire is updated!
  915. replyContainer->setClear(redisCommand(context, "EVAL %b %d %b %b %b %d", luaScriptWithExpire, strlen(luaScriptWithExpire), 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size, expire));
  916. }
  917. }
  918. assertOnErrorWithCmdMsg(replyContainer->query(), "SET", key);
  919. }
  920. else
  921. {
  922. StringBuffer cmd("SET %b %b");
  923. RedisPlugin::appendExpire(cmd, expire);
  924. redisAppendCommand(context, "MULTI");
  925. redisAppendCommand(context, cmd.str(), key, strlen(key), value, size);//SET
  926. redisAppendCommand(context, "PUBLISH %b %b", channel.str(), (size_t)channel.length(), value, size);//PUB
  927. redisAppendCommand(context, "EXEC");
  928. //Now read and assert replies
  929. OwnedReply reply = new Reply();
  930. readReplyAndAssertWithCmdMsg(reply, "SET", key);//MULTI reply
  931. readReplyAndAssertWithCmdMsg(reply, "SET", key);//SET reply
  932. readReplyAndAssertWithCmdMsg(reply, "PUB for the key", key);//PUB reply
  933. readReplyAndAssertWithCmdMsg(reply, "SET", key);//EXEC reply
  934. }
  935. //NOTE: When setting and publishing the data with a pipelined MULTI-SET-PUB-EXEC, the data is sent twice, once with the SET and again with the PUBLISH.
  936. //To prevent this, send the data to the server only once with a server-side lua script that then sets and publishes the data from the server.
  937. //However, there is a transmission overhead for this method that may still be larger than sending the data twice if it is small enough.
  938. //multi-set-pub-exec (via strings) has a transmission length of - "MULTI SET" + key + value + "PUBLISH" + channel + value = 5 + 3 + key + 7 + value + channel + value + 4
  939. //The lua script (assuming the script already exists on the server) a length of - "EVALSHA" + digest + "1" + key + channel + value = 7 + 40 + 1 + key + channel + value
  940. //Therefore, they have same length when: 19 + value = 48 => value = 29.
  941. //NOTE: Pipelining the above commands may not be the expected behaviour, instead only PUBLISH upon a successful SET. Doing both regardless, does however ensure
  942. //(assuming only the SET fails) that any subscribers do in fact get their requested key-value even if the SET fails. This may not be expected behaviour
  943. //as it is now possible for the key-value to NOT actually exist in the cache though it was retrieved via a redis plugin get function. This is documented in the README.
  944. //Further more, it is possible that the locked value and thus the channel stored within the key is not that expected, i.e. computed via encodeChannel() (e.g.
  945. //if set by a non-conforming external client/process). It is however, possible to account for this via using a GETSET instead of just the SET. This returns the old
  946. //value stored, this can then be checked if it is a lock (i.e. has at least the "redis_key_lock prefix"), if it doesn't, PUB on the channel from encodeChannel(),
  947. //otherwise PUB on the value retrieved from GETSET or possibly only if it at least has the prefix "redis_key_lock".
  948. //This would however, prevent the two commands from being pipelined, as the GETSET would need to return before publishing. It would also mean sending the data twice.
  949. }
  950. bool Connection::noScript(const redisReply * reply) const
  951. {
  952. return (reply && reply->type == REDIS_REPLY_ERROR && strncmp(reply->str, "NOSCRIPT", 8) == 0);
  953. }
  954. //--------------------------------------------------------------------------------
  955. // ECL SERVICE ENTRYPOINTS
  956. //--------------------------------------------------------------------------------
  957. //-----------------------------------SET------------------------------------------
  958. ECL_REDIS_API void ECL_REDIS_CALL SyncLockRSetStr(ICodeContext * ctx, size32_t & returnLength, char * & returnValue, const char * key, size32_t valueLength, const char * value, const char * options, int database, unsigned expire, const char * password, unsigned timeout)
  959. {
  960. SyncLockRSet(ctx, options, key, valueLength, value, database, expire, password, timeout);
  961. returnLength = valueLength;
  962. returnValue = (char*)allocateAndCopy(value, valueLength);
  963. }
  964. ECL_REDIS_API void ECL_REDIS_CALL SyncLockRSetUChar(ICodeContext * ctx, size32_t & returnLength, UChar * & returnValue, const char * key, size32_t valueLength, const UChar * value, const char * options, int database, unsigned expire, const char * password, unsigned timeout)
  965. {
  966. unsigned valueSize = (valueLength)*sizeof(UChar);
  967. SyncLockRSet(ctx, options, key, valueSize, (char*)value, database, expire, password, timeout);
  968. returnLength= valueLength;
  969. returnValue = (UChar*)allocateAndCopy(value, valueSize);
  970. }
  971. ECL_REDIS_API void ECL_REDIS_CALL SyncLockRSetUtf8(ICodeContext * ctx, size32_t & returnLength, char * & returnValue, const char * key, size32_t valueLength, const char * value, const char * options, int database, unsigned expire, const char * password, unsigned timeout)
  972. {
  973. unsigned valueSize = rtlUtf8Size(valueLength, value);
  974. SyncLockRSet(ctx, options, key, valueSize, value, database, expire, password, timeout);
  975. returnLength = valueLength;
  976. returnValue = (char*)allocateAndCopy(value, valueSize);
  977. }
  978. //-------------------------------------GET----------------------------------------
  979. ECL_REDIS_API void ECL_REDIS_CALL SyncLockRGetStr(ICodeContext * ctx, size32_t & returnSize, char * & returnValue, const char * key, const char * options, int database, const char * password, unsigned timeout, unsigned expire)
  980. {
  981. size_t _returnSize;
  982. SyncLockRGet(ctx, options, key, _returnSize, returnValue, database, expire, password, timeout);
  983. returnSize = static_cast<size32_t>(_returnSize);
  984. }
  985. ECL_REDIS_API void ECL_REDIS_CALL SyncLockRGetUChar(ICodeContext * ctx, size32_t & returnLength, UChar * & returnValue, const char * key, const char * options, int database, const char * password, unsigned timeout, unsigned expire)
  986. {
  987. size_t returnSize;
  988. char * _returnValue;
  989. SyncLockRGet(ctx, options, key, returnSize, _returnValue, database, expire, password, timeout);
  990. returnValue = (UChar*)_returnValue;
  991. returnLength = static_cast<size32_t>(returnSize/sizeof(UChar));
  992. }
  993. ECL_REDIS_API void ECL_REDIS_CALL SyncLockRGetUtf8(ICodeContext * ctx, size32_t & returnLength, char * & returnValue, const char * key, const char * options, int database, const char * password, unsigned timeout, unsigned expire)
  994. {
  995. size_t returnSize;
  996. SyncLockRGet(ctx, options, key, returnSize, returnValue, database, expire, password, timeout);
  997. returnLength = static_cast<size32_t>(rtlUtf8Length(returnSize, returnValue));
  998. }
  999. ECL_REDIS_API void ECL_REDIS_CALL SyncLockRUnlock(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout)
  1000. {
  1001. Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, timeout);
  1002. master->unlock(ctx, key);
  1003. }
  1004. }//close namespace