redis.cpp 42 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "jthread.hpp"
  15. #include "eclrtl.hpp"
  16. #include "jstring.hpp"
  17. #include "redis.hpp"
  18. #include "hiredis/hiredis.h"
  19. #define REDIS_VERSION "redis plugin 1.0.0"
  20. ECL_REDIS_API bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb)
  21. {
  22. if (pb->size != sizeof(ECLPluginDefinitionBlock))
  23. return false;
  24. pb->magicVersion = PLUGIN_VERSION;
  25. pb->version = REDIS_VERSION;
  26. pb->moduleName = "lib_redis";
  27. pb->ECL = NULL;
  28. pb->flags = PLUGIN_IMPLICIT_MODULE;
  29. pb->description = "ECL plugin library for the C API hiredis\n";
  30. return true;
  31. }
  32. namespace RedisPlugin {
  33. class Connection;
  34. static const char * REDIS_LOCK_PREFIX = "redis_ecl_lock";
  35. static __thread Connection * cachedConnection;
  36. static __thread ThreadTermFunc threadHookChain;
  37. StringBuffer & appendExpire(StringBuffer & buffer, unsigned expire)
  38. {
  39. if (expire > 0)
  40. buffer.append(" EX ").append(expire/1000);
  41. return buffer;
  42. }
  43. class Reply : public CInterface
  44. {
  45. public :
  46. inline Reply() : reply(NULL) { };
  47. inline Reply(void * _reply) : reply((redisReply*)_reply) { }
  48. inline Reply(redisReply * _reply) : reply(_reply) { }
  49. inline ~Reply()
  50. {
  51. if (reply)
  52. freeReplyObject(reply);
  53. }
  54. static Reply * createReply(void * _reply) { return new Reply(_reply); }
  55. inline const redisReply * query() const { return reply; }
  56. void setClear(redisReply * _reply)
  57. {
  58. if (reply)
  59. freeReplyObject(reply);
  60. reply = _reply;
  61. }
  62. private :
  63. redisReply * reply;
  64. };
  65. typedef Owned<RedisPlugin::Reply> OwnedReply;
  66. class Connection : public CInterface
  67. {
  68. public :
  69. Connection(ICodeContext * ctx, const char * options, unsigned __int64 database, const char * password, unsigned __int64 _timeout);
  70. Connection(ICodeContext * ctx, const char * _options, const char * _ip, int _port, unsigned _serverIpPortPasswordHash, unsigned __int64 _database, const char * password, unsigned __int64 _timeout);
  71. ~Connection()
  72. {
  73. if (context)
  74. redisFree(context);
  75. }
  76. static Connection * createConnection(ICodeContext * ctx, const char * options, unsigned __int64 database, const char * password, unsigned __int64 _timeout);
  77. //set
  78. template <class type> void set(ICodeContext * ctx, const char * key, type value, unsigned expire);
  79. template <class type> void set(ICodeContext * ctx, const char * key, size32_t valueSize, const type * value, unsigned expire);
  80. //get
  81. template <class type> void get(ICodeContext * ctx, const char * key, type & value);
  82. template <class type> void get(ICodeContext * ctx, const char * key, size_t & valueSize, type * & value);
  83. //-------------------------------LOCKING------------------------------------------------
  84. void lockSet(ICodeContext * ctx, const char * key, size32_t valueSize, const char * value, unsigned expire);
  85. void lockGet(ICodeContext * ctx, const char * key, size_t & valueSize, char * & value, const char * password);
  86. void unlock(ICodeContext * ctx, const char * key);
  87. //--------------------------------------------------------------------------------------
  88. void persist(ICodeContext * ctx, const char * key);
  89. void expire(ICodeContext * ctx, const char * key, unsigned _expire);
  90. void del(ICodeContext * ctx, const char * key);
  91. void clear(ICodeContext * ctx);
  92. unsigned __int64 dbSize(ICodeContext * ctx);
  93. bool exists(ICodeContext * ctx, const char * key);
  94. protected :
  95. void parseOptions(ICodeContext * ctx, const char * _options);
  96. void connect(ICodeContext * ctx, unsigned __int64 _database, const char * password);
  97. void selectDB(ICodeContext * ctx, unsigned __int64 _database);
  98. void authenticate(ICodeContext * ctx, const char * password);
  99. void resetContextErr();
  100. void readReply(Reply * reply);
  101. void readReplyAndAssert(Reply * reply, const char * msg);
  102. void readReplyAndAssertWithKey(Reply * reply, const char * msg, const char * key);
  103. void assertKey(const redisReply * reply, const char * key);
  104. void assertOnError(const redisReply * reply, const char * _msg);
  105. void assertOnCommandError(const redisReply * reply, const char * cmd);
  106. void assertOnCommandErrorWithDatabase(const redisReply * reply, const char * cmd);
  107. void assertOnCommandErrorWithKey(const redisReply * reply, const char * cmd, const char * key);
  108. void assertConnection();
  109. void updateTimeout(unsigned __int64 _timeout);
  110. void * allocateAndCopy(const char * src, size_t size);
  111. bool isSameConnection(ICodeContext * ctx, const char * password) const;
  112. //-------------------------------LOCKING------------------------------------------------
  113. void handleLockOnSet(ICodeContext * ctx, const char * key, const char * value, size_t size, unsigned expire);
  114. void handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAttr * retVal, const char * password);
  115. void encodeChannel(StringBuffer & channel, const char * key) const;
  116. bool lock(ICodeContext * ctx, const char * key, const char * channel);
  117. //--------------------------------------------------------------------------------------
  118. protected :
  119. redisContext * context;
  120. StringAttr options;
  121. StringAttr ip;
  122. unsigned serverIpPortPasswordHash;
  123. int port;
  124. unsigned __int64 timeout;
  125. unsigned __int64 database;
  126. };
  127. //The following class is here to ensure destruction of the cachedConnection within the main thread
  128. //as this is not handled by the thread hook mechanism.
  129. static class mainThreadCachedConnection
  130. {
  131. public :
  132. mainThreadCachedConnection() { }
  133. ~mainThreadCachedConnection()
  134. {
  135. if (cachedConnection)
  136. cachedConnection->Release();
  137. }
  138. } mainThread;
  139. static void releaseContext()
  140. {
  141. if (cachedConnection)
  142. {
  143. cachedConnection->Release();
  144. cachedConnection = NULL;
  145. }
  146. if (threadHookChain)
  147. {
  148. (*threadHookChain)();
  149. threadHookChain = NULL;
  150. }
  151. }
  152. Connection::Connection(ICodeContext * ctx, const char * _options, unsigned __int64 _database, const char * password, unsigned __int64 _timeout)
  153. : database(0), timeout(_timeout), port(0)
  154. {
  155. serverIpPortPasswordHash = hashc((const unsigned char*)password, strlen(password), 0);
  156. serverIpPortPasswordHash = hashc((const unsigned char*)_options, strlen(_options), serverIpPortPasswordHash);
  157. options.set(_options, strlen(_options));
  158. parseOptions(ctx, _options);
  159. connect(ctx, _database, password);
  160. }
  161. Connection::Connection(ICodeContext * ctx, const char * _options, const char * _ip, int _port, unsigned _serverIpPortPasswordHash, unsigned __int64 _database, const char * password, unsigned __int64 _timeout)
  162. : database(0), timeout(_timeout), serverIpPortPasswordHash(_serverIpPortPasswordHash), port(_port)
  163. {
  164. options.set(_options, strlen(_options));
  165. ip.set(_ip, strlen(_ip));
  166. connect(ctx, _database, password);
  167. }
  168. void Connection::connect(ICodeContext * ctx, unsigned __int64 _database, const char * password)
  169. {
  170. struct timeval to = { timeout/1000, timeout%1000 };
  171. context = redisConnectWithTimeout(ip.str(), port, to);
  172. redisSetTimeout(context, to);
  173. assertConnection();
  174. //The following is the dissemination of the two methods authenticate(ctx, password) & selectDB(ctx, _database)
  175. //such that they may be pipelined to save an extra round trip to the server and back.
  176. if (password && *password)
  177. redisAppendCommand(context, "AUTH %b", password, strlen(password));
  178. if (database != _database)
  179. {
  180. VStringBuffer cmd("SELECT %" I64F "u", database);
  181. redisAppendCommand(context, cmd.str());
  182. }
  183. //Now read replies.
  184. OwnedReply reply = new Reply();
  185. if (password && *password)
  186. readReplyAndAssert(reply, "server authentication failed");
  187. if (database != _database)
  188. {
  189. readReplyAndAssert(reply, "request to SELECT database failed");
  190. database = _database;
  191. }
  192. }
  193. bool Connection::isSameConnection(ICodeContext * ctx, const char * password) const
  194. {
  195. unsigned hash = hashc((const unsigned char*)options.str(), options.length(), hashc((const unsigned char*)password, strlen(password), 0));
  196. return (serverIpPortPasswordHash == hash);
  197. }
  198. void * Connection::allocateAndCopy(const char * src, size_t size)
  199. {
  200. void * value = rtlMalloc(size);
  201. return memcpy(value, src, size);
  202. }
  203. void Connection::parseOptions(ICodeContext * ctx, const char * _options)
  204. {
  205. StringArray optionStrings;
  206. optionStrings.appendList(_options, " ");
  207. ForEachItemIn(idx, optionStrings)
  208. {
  209. const char *opt = optionStrings.item(idx);
  210. if (strncmp(opt, "--SERVER=", 9) == 0)
  211. {
  212. opt += 9;
  213. StringArray splitPort;
  214. splitPort.appendList(opt, ":");
  215. if (splitPort.ordinality()==2)
  216. {
  217. ip.set(splitPort.item(0));
  218. port = atoi(splitPort.item(1));
  219. }
  220. }
  221. else
  222. {
  223. VStringBuffer err("RedisPlugin: unsupported option string %s", opt);
  224. rtlFail(0, err.str());
  225. }
  226. }
  227. if (ip.isEmpty())
  228. {
  229. ip.set("localhost");
  230. port = 6379;
  231. if (ctx)
  232. {
  233. VStringBuffer msg("Redis Plugin: WARNING - using default server (%s:%d)", ip.str(), port);
  234. ctx->logString(msg.str());
  235. }
  236. }
  237. return;
  238. }
  239. void Connection::authenticate(ICodeContext * ctx, const char * password)
  240. {
  241. if (password && *password)
  242. {
  243. OwnedReply reply = Reply::createReply(redisCommand(context, "AUTH %b", password, strlen(password)));
  244. assertOnError(reply->query(), "server authentication failed");
  245. }
  246. }
  247. void Connection::resetContextErr()
  248. {
  249. if (context)
  250. context->err = REDIS_OK;
  251. }
  252. void Connection::readReply(Reply * reply)
  253. {
  254. redisReply * nakedReply = NULL;
  255. redisGetReply(context, (void**)&nakedReply);
  256. assertex(reply);
  257. reply->setClear(nakedReply);
  258. }
  259. void Connection::readReplyAndAssert(Reply * reply, const char * msg)
  260. {
  261. readReply(reply);
  262. assertex(reply);
  263. assertOnError(reply->query(), msg);
  264. }
  265. void Connection::readReplyAndAssertWithKey(Reply * reply, const char * msg, const char * key)
  266. {
  267. readReply(reply);
  268. assertex(reply);
  269. assertOnCommandErrorWithKey(reply->query(), msg, key);
  270. }
  271. Connection * Connection::createConnection(ICodeContext * ctx, const char * options, unsigned __int64 _database, const char * password, unsigned __int64 _timeout)
  272. {
  273. if (!cachedConnection)
  274. {
  275. cachedConnection = new Connection(ctx, options, _database, password, _timeout);
  276. threadHookChain = addThreadTermFunc(releaseContext);
  277. return LINK(cachedConnection);
  278. }
  279. if (cachedConnection->isSameConnection(ctx, password))
  280. {
  281. //MORE: should perhaps check that the connection has not expired (think hiredis REDIS_KEEPALIVE_INTERVAL is defaulted to 15s).
  282. //At present updateTimeout calls assertConnection.
  283. cachedConnection->resetContextErr();//reset the context err to allow reuse when an error previously occurred.
  284. cachedConnection->updateTimeout(_timeout);
  285. cachedConnection->selectDB(ctx, _database);
  286. return LINK(cachedConnection);
  287. }
  288. cachedConnection->Release();
  289. cachedConnection = new Connection(ctx, options, _database, password, _timeout);
  290. return LINK(cachedConnection);
  291. }
  292. void Connection::selectDB(ICodeContext * ctx, unsigned __int64 _database)
  293. {
  294. if (database == _database)
  295. return;
  296. database = _database;
  297. VStringBuffer cmd("SELECT %" I64F "u", database);
  298. OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str()));
  299. assertOnCommandError(reply->query(), "SELECT");
  300. }
  301. void Connection::updateTimeout(unsigned __int64 _timeout)
  302. {
  303. if (timeout == _timeout)
  304. return;
  305. assertConnection();
  306. timeout = _timeout;
  307. struct timeval to = { timeout/1000, timeout%1000 };
  308. assertex(context);
  309. if (redisSetTimeout(context, to) != REDIS_OK)
  310. {
  311. if (context->err)
  312. {
  313. VStringBuffer msg("RedisPlugin: failed to set timeout - %s", context->errstr);
  314. rtlFail(0, msg.str());
  315. }
  316. else
  317. rtlFail(0, "RedisPlugin: failed to set timeout - no message available");
  318. }
  319. }
  320. void Connection::assertOnError(const redisReply * reply, const char * _msg)
  321. {
  322. if (!reply)//assertex(reply)?
  323. {
  324. //There should always be a context error if no reply error
  325. assertConnection();
  326. VStringBuffer msg("Redis Plugin: %s - %s", _msg, "neither 'reply' nor connection error available");
  327. rtlFail(0, msg.str());
  328. }
  329. else if (reply->type == REDIS_REPLY_ERROR)
  330. {
  331. if (strncmp(reply->str, "NOAUTH", 6) == 0)
  332. {
  333. VStringBuffer msg("Redis Plugin: server authentication failed - %s", reply->str);
  334. rtlFail(0, msg.str());
  335. }
  336. else
  337. {
  338. VStringBuffer msg("Redis Plugin: %s - %s", _msg, reply->str);
  339. rtlFail(0, msg.str());
  340. }
  341. }
  342. }
  343. void Connection::assertOnCommandErrorWithKey(const redisReply * reply, const char * cmd, const char * key)
  344. {
  345. if (!reply)//assertex(reply)?
  346. {
  347. //There should always be a context error if no reply error
  348. assertConnection();
  349. VStringBuffer msg("Redis Plugin: ERROR - %s '%s' on database %" I64F "u failed with neither 'reply' nor connection error available", cmd, key, database);
  350. rtlFail(0, msg.str());
  351. }
  352. else if (reply->type == REDIS_REPLY_ERROR)
  353. {
  354. if (strncmp(reply->str, "NOAUTH", 6) == 0)
  355. {
  356. VStringBuffer msg("Redis Plugin: server authentication failed - %s", reply->str);
  357. rtlFail(0, msg.str());
  358. }
  359. else
  360. {
  361. VStringBuffer msg("Redis Plugin: ERROR - %s '%s' on database %" I64F "u failed : %s", cmd, key, database, reply->str);
  362. rtlFail(0, msg.str());
  363. }
  364. }
  365. }
  366. void Connection::assertOnCommandErrorWithDatabase(const redisReply * reply, const char * cmd)
  367. {
  368. if (!reply)//assertex(reply)?
  369. {
  370. //There should always be a context error if no reply error
  371. assertConnection();
  372. VStringBuffer msg("Redis Plugin: ERROR - %s on database %" I64F "u failed with neither 'reply' nor connection error available", cmd, database);
  373. rtlFail(0, msg.str());
  374. }
  375. else if (reply->type == REDIS_REPLY_ERROR)
  376. {
  377. if (strncmp(reply->str, "NOAUTH", 6) == 0)
  378. {
  379. VStringBuffer msg("Redis Plugin: server authentication failed - %s", reply->str);
  380. rtlFail(0, msg.str());
  381. }
  382. else
  383. {
  384. VStringBuffer msg("Redis Plugin: ERROR - %s on database %" I64F "u failed : %s", cmd, database, reply->str);
  385. rtlFail(0, msg.str());
  386. }
  387. }
  388. }
  389. void Connection::assertOnCommandError(const redisReply * reply, const char * cmd)
  390. {
  391. if (!reply)//assertex(reply)?
  392. {
  393. //There should always be a context error if no reply error
  394. assertConnection();
  395. VStringBuffer msg("Redis Plugin: ERROR - %s failed with neither 'reply' nor connection error available", cmd);
  396. rtlFail(0, msg.str());
  397. }
  398. else if (reply->type == REDIS_REPLY_ERROR)
  399. {
  400. if (strncmp(reply->str, "NOAUTH", 6) == 0)
  401. {
  402. VStringBuffer msg("Redis Plugin: server authentication failed - %s", reply->str);
  403. rtlFail(0, msg.str());
  404. }
  405. else
  406. {
  407. VStringBuffer msg("Redis Plugin: ERROR - %s failed : %s", cmd, reply->str);
  408. rtlFail(0, msg.str());
  409. }
  410. }
  411. }
  412. void Connection::assertKey(const redisReply * reply, const char * key)
  413. {
  414. if (reply && reply->type == REDIS_REPLY_NIL)
  415. {
  416. VStringBuffer msg("Redis Plugin: ERROR - the requested key '%s' does not exist on database %" I64F "u", key, database);
  417. rtlFail(0, msg.str());
  418. }
  419. }
  420. void Connection::assertConnection()
  421. {
  422. if (!context)
  423. rtlFail(0, "Redis Plugin: 'redisConnect' failed - no error available.");
  424. else if (context->err)
  425. {
  426. VStringBuffer msg("Redis Plugin: Connection failed - %s for %s:%u", context->errstr, ip.str(), port);
  427. rtlFail(0, msg.str());
  428. }
  429. }
  430. void Connection::clear(ICodeContext * ctx)
  431. {
  432. //NOTE: flush is the actual cache flush/clear/delete and not an io buffer flush.
  433. OwnedReply reply = Reply::createReply(redisCommand(context, "FLUSHDB"));//NOTE: FLUSHDB deletes current database where as FLUSHALL deletes all dbs.
  434. //NOTE: documented as never failing, but in case
  435. assertOnCommandErrorWithDatabase(reply->query(), "FlushDB");
  436. }
  437. void Connection::del(ICodeContext * ctx, const char * key)
  438. {
  439. OwnedReply reply = Reply::createReply(redisCommand(context, "DEL %b", key, strlen(key)));
  440. assertOnCommandErrorWithKey(reply->query(), "Del", key);
  441. }
  442. void Connection::persist(ICodeContext * ctx, const char * key)
  443. {
  444. OwnedReply reply = Reply::createReply(redisCommand(context, "PERSIST %b", key, strlen(key)));
  445. assertOnCommandErrorWithKey(reply->query(), "Persist", key);
  446. }
  447. void Connection::expire(ICodeContext * ctx, const char * key, unsigned _expire)
  448. {
  449. OwnedReply reply = Reply::createReply(redisCommand(context, "EXPIRE %b %u", key, strlen(key), _expire/1000));
  450. assertOnCommandErrorWithKey(reply->query(), "Expire", key);
  451. }
  452. bool Connection::exists(ICodeContext * ctx, const char * key)
  453. {
  454. OwnedReply reply = Reply::createReply(redisCommand(context, "EXISTS %b", key, strlen(key)));
  455. assertOnCommandErrorWithKey(reply->query(), "Exists", key);
  456. return (reply->query()->integer != 0);
  457. }
  458. unsigned __int64 Connection::dbSize(ICodeContext * ctx)
  459. {
  460. OwnedReply reply = Reply::createReply(redisCommand(context, "DBSIZE"));
  461. assertOnCommandErrorWithDatabase(reply->query(), "DBSIZE");
  462. return reply->query()->integer;
  463. }
  464. //-------------------------------------------SET-----------------------------------------
  465. //--OUTER--
  466. template<class type> void SyncRSet(ICodeContext * ctx, const char * _options, const char * key, type value, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 _timeout)
  467. {
  468. Owned<Connection> master = Connection::createConnection(ctx, _options, database, password, _timeout);
  469. master->set(ctx, key, value, expire);
  470. }
  471. //Set pointer types
  472. template<class type> void SyncRSet(ICodeContext * ctx, const char * _options, const char * key, size32_t valueSize, const type * value, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 _timeout)
  473. {
  474. Owned<Connection> master = Connection::createConnection(ctx, _options, database, password, _timeout);
  475. master->set(ctx, key, valueSize, value, expire);
  476. }
  477. //--INNER--
  478. template<class type> void Connection::set(ICodeContext * ctx, const char * key, type value, unsigned expire)
  479. {
  480. const char * _value = reinterpret_cast<const char *>(&value);//Do this even for char * to prevent compiler complaining
  481. StringBuffer cmd("SET %b %b");
  482. appendExpire(cmd, expire);
  483. OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str(), key, strlen(key), _value, sizeof(type)));
  484. assertOnCommandErrorWithKey(reply->query(), "SET", key);
  485. }
  486. template<class type> void Connection::set(ICodeContext * ctx, const char * key, size32_t valueSize, const type * value, unsigned expire)
  487. {
  488. const char * _value = reinterpret_cast<const char *>(value);//Do this even for char * to prevent compiler complaining
  489. StringBuffer cmd("SET %b %b");
  490. appendExpire(cmd, expire);
  491. OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str(), key, strlen(key), _value, (size_t)valueSize));
  492. assertOnCommandErrorWithKey(reply->query(), "SET", key);
  493. }
  494. //-------------------------------------------GET-----------------------------------------
  495. //--OUTER--
  496. template<class type> void SyncRGet(ICodeContext * ctx, const char * options, const char * key, type & returnValue, unsigned __int64 database, const char * password, unsigned __int64 _timeout)
  497. {
  498. Owned<Connection> master = Connection::createConnection(ctx, options, database, password, _timeout);
  499. master->get(ctx, key, returnValue);
  500. }
  501. template<class type> void SyncRGet(ICodeContext * ctx, const char * options, const char * key, size_t & returnSize, type * & returnValue, unsigned __int64 database, const char * password, unsigned __int64 _timeout)
  502. {
  503. Owned<Connection> master = Connection::createConnection(ctx, options, database, password, _timeout);
  504. master->get(ctx, key, returnSize, returnValue);
  505. }
  506. //--INNER--
  507. template<class type> void Connection::get(ICodeContext * ctx, const char * key, type & returnValue)
  508. {
  509. OwnedReply reply = Reply::createReply(redisCommand(context, "GET %b", key, strlen(key)));
  510. assertOnError(reply->query(), "GET");
  511. assertKey(reply->query(), key);
  512. size_t returnSize = reply->query()->len;
  513. if (sizeof(type)!=returnSize)
  514. {
  515. VStringBuffer msg("RedisPlugin: ERROR - Requested type of different size (%uB) from that stored (%uB).", (unsigned)sizeof(type), (unsigned)returnSize);
  516. rtlFail(0, msg.str());
  517. }
  518. memcpy(&returnValue, reply->query()->str, returnSize);
  519. }
  520. template<class type> void Connection::get(ICodeContext * ctx, const char * key, size_t & returnSize, type * & returnValue)
  521. {
  522. OwnedReply reply = Reply::createReply(redisCommand(context, "GET %b", key, strlen(key)));
  523. assertOnError(reply->query(), "GET");
  524. assertKey(reply->query(), key);
  525. returnSize = reply->query()->len;
  526. returnValue = reinterpret_cast<type*>(allocateAndCopy(reply->query()->str, returnSize));
  527. }
  528. //--------------------------------------------------------------------------------
  529. // ECL SERVICE ENTRYPOINTS
  530. //--------------------------------------------------------------------------------
  531. ECL_REDIS_API void ECL_REDIS_CALL RClear(ICodeContext * ctx, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
  532. {
  533. Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
  534. master->clear(ctx);
  535. }
  536. ECL_REDIS_API bool ECL_REDIS_CALL RExist(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
  537. {
  538. Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
  539. return master->exists(ctx, key);
  540. }
  541. ECL_REDIS_API void ECL_REDIS_CALL RDel(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
  542. {
  543. Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
  544. master->del(ctx, key);
  545. }
  546. ECL_REDIS_API void ECL_REDIS_CALL RPersist(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
  547. {
  548. Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
  549. master->persist(ctx, key);
  550. }
  551. ECL_REDIS_API void ECL_REDIS_CALL RExpire(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, unsigned _expire, const char * password, unsigned __int64 timeout)
  552. {
  553. Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
  554. master->expire(ctx, key, _expire);
  555. }
  556. ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL RDBSize(ICodeContext * ctx, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
  557. {
  558. Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
  559. return master->dbSize(ctx);
  560. }
  561. //-----------------------------------SET------------------------------------------
  562. ECL_REDIS_API void ECL_REDIS_CALL SyncRSetStr(ICodeContext * ctx, const char * key, size32_t valueSize, const char * value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
  563. {
  564. SyncRSet(ctx, options, key, valueSize, value, database, expire, password, timeout);
  565. }
  566. ECL_REDIS_API void ECL_REDIS_CALL SyncRSetUChar(ICodeContext * ctx, const char * key, size32_t valueLength, const UChar * value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
  567. {
  568. SyncRSet(ctx, options, key, (valueLength)*sizeof(UChar), value, database, expire, password, timeout);
  569. }
  570. ECL_REDIS_API void ECL_REDIS_CALL SyncRSetInt(ICodeContext * ctx, const char * key, signed __int64 value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
  571. {
  572. SyncRSet(ctx, options, key, value, database, expire, password, timeout);
  573. }
  574. ECL_REDIS_API void ECL_REDIS_CALL SyncRSetUInt(ICodeContext * ctx, const char * key, unsigned __int64 value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
  575. {
  576. SyncRSet(ctx, options, key, value, database, expire, password, timeout);
  577. }
  578. ECL_REDIS_API void ECL_REDIS_CALL SyncRSetReal(ICodeContext * ctx, const char * key, double value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
  579. {
  580. SyncRSet(ctx, options, key, value, database, expire, password, timeout);
  581. }
  582. ECL_REDIS_API void ECL_REDIS_CALL SyncRSetBool(ICodeContext * ctx, const char * key, bool value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
  583. {
  584. SyncRSet(ctx, options, key, value, database, expire, password, timeout);
  585. }
  586. ECL_REDIS_API void ECL_REDIS_CALL SyncRSetData(ICodeContext * ctx, const char * key, size32_t valueSize, const void * value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
  587. {
  588. SyncRSet(ctx, options, key, valueSize, value, database, expire, password, timeout);
  589. }
  590. ECL_REDIS_API void ECL_REDIS_CALL SyncRSetUtf8(ICodeContext * ctx, const char * key, size32_t valueLength, const char * value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
  591. {
  592. SyncRSet(ctx, options, key, rtlUtf8Size(valueLength, value), value, database, expire, password, timeout);
  593. }
  594. //-------------------------------------GET----------------------------------------
  595. ECL_REDIS_API bool ECL_REDIS_CALL SyncRGetBool(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
  596. {
  597. bool value;
  598. SyncRGet(ctx, options, key, value, database, password, timeout);
  599. return value;
  600. }
  601. ECL_REDIS_API double ECL_REDIS_CALL SyncRGetDouble(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
  602. {
  603. double value;
  604. SyncRGet(ctx, options, key, value, database, password, timeout);
  605. return value;
  606. }
  607. ECL_REDIS_API signed __int64 ECL_REDIS_CALL SyncRGetInt8(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
  608. {
  609. signed __int64 value;
  610. SyncRGet(ctx, options, key, value, database, password, timeout);
  611. return value;
  612. }
  613. ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL SyncRGetUint8(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
  614. {
  615. unsigned __int64 value;
  616. SyncRGet(ctx, options, key, value, database, password, timeout);
  617. return value;
  618. }
  619. ECL_REDIS_API void ECL_REDIS_CALL SyncRGetStr(ICodeContext * ctx, size32_t & returnSize, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
  620. {
  621. size_t _returnSize;
  622. SyncRGet(ctx, options, key, _returnSize, returnValue, database, password, timeout);
  623. returnSize = static_cast<size32_t>(_returnSize);
  624. }
  625. ECL_REDIS_API void ECL_REDIS_CALL SyncRGetUChar(ICodeContext * ctx, size32_t & returnLength, UChar * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
  626. {
  627. size_t returnSize;
  628. SyncRGet(ctx, options, key, returnSize, returnValue, database, password, timeout);
  629. returnLength = static_cast<size32_t>(returnSize/sizeof(UChar));
  630. }
  631. ECL_REDIS_API void ECL_REDIS_CALL SyncRGetUtf8(ICodeContext * ctx, size32_t & returnLength, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
  632. {
  633. size_t returnSize;
  634. SyncRGet(ctx, options, key, returnSize, returnValue, database, password, timeout);
  635. returnLength = static_cast<size32_t>(rtlUtf8Length(returnSize, returnValue));
  636. }
  637. ECL_REDIS_API void ECL_REDIS_CALL SyncRGetData(ICodeContext * ctx, size32_t & returnSize, void * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
  638. {
  639. size_t _returnSize;
  640. SyncRGet(ctx, options, key, _returnSize, returnValue, database, password, timeout);
  641. returnSize = static_cast<size32_t>(_returnSize);
  642. }
  643. //----------------------------------LOCK------------------------------------------
  644. //-----------------------------------SET-----------------------------------------
  645. //Set pointer types
  646. void SyncLockRSet(ICodeContext * ctx, const char * _options, const char * key, size32_t valueSize, const char * value, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 _timeout)
  647. {
  648. Owned<Connection> master = Connection::createConnection(ctx, _options, database, password, _timeout);
  649. master->lockSet(ctx, key, valueSize, value, expire);
  650. }
  651. //--INNER--
  652. void Connection::lockSet(ICodeContext * ctx, const char * key, size32_t valueSize, const char * value, unsigned expire)
  653. {
  654. const char * _value = reinterpret_cast<const char *>(value);//Do this even for char * to prevent compiler complaining
  655. handleLockOnSet(ctx, key, _value, (size_t)valueSize, expire);
  656. }
  657. //-------------------------------------------GET-----------------------------------------
  658. //--OUTER--
  659. void SyncLockRGet(ICodeContext * ctx, const char * options, const char * key, size_t & returnSize, char * & returnValue, unsigned __int64 database, const char * password, unsigned __int64 _timeout)
  660. {
  661. Owned<Connection> master = Connection::createConnection(ctx, options, database, password, _timeout);
  662. master->lockGet(ctx, key, returnSize, returnValue, password);
  663. }
  664. //--INNER--
  665. void Connection::lockGet(ICodeContext * ctx, const char * key, size_t & returnSize, char * & returnValue, const char * password)
  666. {
  667. MemoryAttr retVal;
  668. handleLockOnGet(ctx, key, &retVal, password);
  669. returnSize = retVal.length();
  670. returnValue = reinterpret_cast<char*>(retVal.detach());
  671. }
  672. //---------------------------------------------------------------------------------------
  673. void Connection::encodeChannel(StringBuffer & channel, const char * key) const
  674. {
  675. channel.append(REDIS_LOCK_PREFIX).append("_").append(key).append("_").append(database).append("_").append(ip.str()).append("_").append(port);
  676. }
  677. bool Connection::lock(ICodeContext * ctx, const char * key, const char * channel)
  678. {
  679. StringBuffer cmd("SET %b %b NX EX ");
  680. cmd.append(timeout/1000);
  681. OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str(), key, strlen(key), channel, strlen(channel)));
  682. assertOnError(reply->query(), cmd.append(" of the key '").append(key).append("' failed"));
  683. if (reply->query()->type == REDIS_REPLY_STATUS && strcmp(reply->query()->str, "OK") == 0)
  684. return true;
  685. return false;
  686. }
  687. void Connection::unlock(ICodeContext * ctx, const char * key)
  688. {
  689. //WATCH key, if altered between WATCH and EXEC abort all commands inbetween
  690. redisAppendCommand(context, "WATCH %b", key, strlen(key));
  691. redisAppendCommand(context, "GET %b", key, strlen(key));
  692. //Read replies
  693. OwnedReply reply = new Reply();
  694. readReplyAndAssertWithKey(reply.get(), "manual unlock", key);//WATCH reply
  695. readReplyAndAssertWithKey(reply.get(), "manual unlock", key);//GET reply
  696. //check if locked
  697. if (strncmp(reply->query()->str, REDIS_LOCK_PREFIX, strlen(REDIS_LOCK_PREFIX)) == 0)
  698. {
  699. //MULTI - all commands between MULTI and EXEC are considered an atomic transaction on the server
  700. redisAppendCommand(context, "MULTI");//MULTI
  701. redisAppendCommand(context, "DEL %b", key, strlen(key));//DEL
  702. redisAppendCommand(context, "EXEC");//EXEC
  703. #if(0)//Quick draw! You have 10s to manually send (via redis-cli) "set testlock foobar". The second myRedis.Exists('testlock') in redislockingtest.ecl should now return TRUE.
  704. sleep(10);
  705. #endif
  706. readReplyAndAssertWithKey(reply.get(), "manual unlock", key);//MULTI reply
  707. readReplyAndAssertWithKey(reply.get(), "manual unlock", key);//DEL reply
  708. readReplyAndAssertWithKey(reply.get(), "manual unlock", key);//EXEC reply
  709. }
  710. //If the above is aborted, let the lock expire.
  711. }
  712. void Connection::handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAttr * retVal, const char * password)
  713. {
  714. StringBuffer channel;
  715. encodeChannel(channel, key);
  716. //Query key and set lock if non existent
  717. if (lock(ctx, key, channel.str()))
  718. return;
  719. //SUB before GET
  720. //Requires separate connection from GET so that the replies are not mangled. This could be averted
  721. Owned<Connection> subConnection = new Connection(ctx, options.str(), ip.str(), port, serverIpPortPasswordHash, database, password, timeout);
  722. OwnedReply reply = Reply::createReply(redisCommand(subConnection->context, "SUBSCRIBE %b", channel.str(), channel.length()));
  723. assertOnCommandErrorWithKey(reply->query(), "GET", key);
  724. if (reply->query()->type == REDIS_REPLY_ARRAY && strcmp("subscribe", reply->query()->element[0]->str) != 0 )
  725. {
  726. VStringBuffer msg("Redis Plugin: ERROR - GET '%s' on database %" I64F "u failed : failed to register SUB", key, database);
  727. rtlFail(0, msg.str());
  728. }
  729. #if(0)
  730. {
  731. OwnedReply pubReply = Reply::createReply(redisCommand(context, "PUBLISH %b %b", channel.str(), channel.length(), "foo", 3));
  732. assertOnError(pubReply->query(), "pub fail");
  733. }
  734. #endif
  735. //Now GET
  736. reply->setClear((redisReply*)redisCommand(context, "GET %b", key, strlen(key)));
  737. assertOnCommandErrorWithKey(reply->query(), "GET", key);
  738. assertKey(reply->query(), key);
  739. #if(0)
  740. {
  741. OwnedReply pubReply = Reply::createReply(redisCommand(context, "PUBLISH %b %b", channel.str(), channel.length(), "foo", 3));
  742. assertOnError(pubReply->query(), "pub fail");
  743. }
  744. #endif
  745. //Check if returned value is locked
  746. if (strncmp(reply->query()->str, REDIS_LOCK_PREFIX, strlen(REDIS_LOCK_PREFIX)) != 0)
  747. {
  748. //Not locked so return value
  749. retVal->set(reply->query()->len, reply->query()->str);
  750. return;
  751. }
  752. else
  753. {
  754. //Check that we SUBSCRIBEd to the correct channel (which could have been manually SET).
  755. if (strcmp(reply->query()->str, channel) !=0 )
  756. {
  757. VStringBuffer msg("Redis Plugin: ERROR - the key '%s', on database %" I64F "u, is locked with a channel ('%s') different to that subscribed to.", key, database, reply->query()->str);
  758. rtlFail(0, msg.str());
  759. //MORE: We could attempt to recover at this stage by subscribing to the channel that the key was actually locked with.
  760. //However, we may have missed the massage publication already or by then.
  761. //If we ever changed the semantics of the 'timeout' to be that of these plugin functions rather than each redis call, we might as well
  762. //subscribe again if there was time left on the clock.
  763. //Since they are not, we could, though is this desirable behaviour?
  764. }
  765. #if(0)//Added to allow for manual pub testing via redis-cli
  766. struct timeval to = { 10, 0 };//10secs
  767. redisSetTimeout(subConnection->context, to);
  768. #endif
  769. //Locked so SUBSCRIBE
  770. redisReply * nakedReply = NULL;
  771. bool err = redisGetReply(subConnection->context, (void**)&nakedReply);
  772. reply->setClear(nakedReply);
  773. if (err != REDIS_OK)
  774. rtlFail(0, "RedisPlugin: ERROR - GET timed out.");
  775. assertOnCommandErrorWithKey(nakedReply, "GET", key);
  776. if (nakedReply->type == REDIS_REPLY_ARRAY && strcmp("message", nakedReply->element[0]->str) == 0)
  777. {
  778. retVal->set(nakedReply->element[2]->len, nakedReply->element[2]->str);//return the published value rather than another (WATCHed) GET.
  779. return;
  780. }
  781. }
  782. throwUnexpected();
  783. }
  784. void Connection::handleLockOnSet(ICodeContext * ctx, const char * key, const char * value, size_t size, unsigned expire)
  785. {
  786. StringBuffer cmd("SET %b %b");
  787. RedisPlugin::appendExpire(cmd, expire);
  788. //Due to locking logic surfacing into ECL, any locking.set (such as this is) assumes that they own the lock and therefore go ahead and set regardless.
  789. //It is possible for a process/call to 'own' a lock and store this info in the LockObject, however, this prevents sharing between clients.
  790. redisAppendCommand(context, cmd.str(), key, strlen(key), value, size);//SET
  791. StringBuffer channel;
  792. encodeChannel(channel, key);
  793. redisAppendCommand(context, "PUBLISH %b %b", channel.str(), channel.length(), value, size);//PUB
  794. //Now read and assert replies
  795. OwnedReply replyContainer = new Reply();
  796. readReplyAndAssertWithKey(replyContainer, "SET", key);//SET reply
  797. readReplyAndAssertWithKey(replyContainer, "PUB for the key", key);//PUB reply
  798. //NOTE: Pipelining the above commands may not be the desired behaviour but instead only PUBLISH upon a successful SET. Doing both regardless, does however ensure
  799. //(assuming only the SET fails) that any subscribers do in fact get their requested key-value even if the SET fails. However, this may not be expected behaviour
  800. //as it's now possible for the key-value to actually exists in the cache when it was retrieved via redis plugin get function. This is documented in the README.
  801. //Further more, it is possible that the locked value and thus the channel stored within the key is not that expected, i.e. computed via encodeChannel() (e.g.
  802. //if set by a non-conforming external client/process). It is however, possible to account for this via using a GETSET instead of just the SET. This returns the old
  803. //value stored, this can then be checked if it is a lock (i.e. has at least the "redis_key_lock prefix"), if it doesn't, PUB on the channel from encodeChannel(),
  804. //otherwise PUB on the value retrieved from GETSET or possibly only if it at least has the prefix "redis_key_lock".
  805. //This would however, prevent the two commands from being pipelined, as the GETSET would need to return before publishing.
  806. }
  807. //--------------------------------------------------------------------------------
  808. // ECL SERVICE ENTRYPOINTS
  809. //--------------------------------------------------------------------------------
  810. //-----------------------------------SET------------------------------------------
  811. ECL_REDIS_API void ECL_REDIS_CALL SyncLockRSetStr(ICodeContext * ctx, size32_t & returnSize, char * & returnValue, const char * key, size32_t valueSize, const char * value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
  812. {
  813. SyncLockRSet(ctx, options, key, valueSize, value, expire, database, password, timeout);
  814. returnSize = valueSize;
  815. returnValue = (char*)memcpy(rtlMalloc(valueSize), value, valueSize);
  816. }
  817. ECL_REDIS_API void ECL_REDIS_CALL SyncLockRSetUChar(ICodeContext * ctx, size32_t & returnLength, UChar * & returnValue, const char * key, size32_t valueLength, const UChar * value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
  818. {
  819. unsigned valueSize = (valueLength)*sizeof(UChar);
  820. SyncLockRSet(ctx, options, key, valueSize, (char*)value, expire, database, password, timeout);
  821. returnLength= valueLength;
  822. returnValue = (UChar*)memcpy(rtlMalloc(valueSize), (void*)value, valueSize);
  823. }
  824. ECL_REDIS_API void ECL_REDIS_CALL SyncLockRSetUtf8(ICodeContext * ctx, size32_t & returnLength, char * & returnValue, const char * key, size32_t valueLength, const char * value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
  825. {
  826. unsigned valueSize = rtlUtf8Size(valueLength, value);
  827. SyncLockRSet(ctx, options, key, valueSize, value, expire, database, password, timeout);
  828. returnLength = valueLength;
  829. returnValue = (char*)memcpy(rtlMalloc(valueSize), value, valueSize);
  830. }
  831. //-------------------------------------GET----------------------------------------
  832. ECL_REDIS_API void ECL_REDIS_CALL SyncLockRGetStr(ICodeContext * ctx, size32_t & returnSize, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
  833. {
  834. size_t _returnSize;
  835. SyncLockRGet(ctx, options, key, _returnSize, returnValue, database, password, timeout);
  836. returnSize = static_cast<size32_t>(_returnSize);
  837. }
  838. ECL_REDIS_API void ECL_REDIS_CALL SyncLockRGetUChar(ICodeContext * ctx, size32_t & returnLength, UChar * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
  839. {
  840. size_t returnSize;
  841. char * _returnValue;
  842. SyncLockRGet(ctx, options, key, returnSize, _returnValue, database, password, timeout);
  843. returnValue = (UChar*)_returnValue;
  844. returnLength = static_cast<size32_t>(returnSize/sizeof(UChar));
  845. }
  846. ECL_REDIS_API void ECL_REDIS_CALL SyncLockRGetUtf8(ICodeContext * ctx, size32_t & returnLength, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
  847. {
  848. size_t returnSize;
  849. SyncLockRGet(ctx, options, key, returnSize, returnValue, database, password, timeout);
  850. returnLength = static_cast<size32_t>(rtlUtf8Length(returnSize, returnValue));
  851. }
  852. ECL_REDIS_API void ECL_REDIS_CALL SyncLockRUnlock(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
  853. {
  854. Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
  855. master->unlock(ctx, key);
  856. }
  857. }//close namespace