redis.cpp 75 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "jthread.hpp"
  15. #include "eclrtl.hpp"
  16. #include "jstring.hpp"
  17. #include "jmutex.hpp"
  18. #include "redis.hpp"
  19. extern "C"
  20. {
  21. #include "hiredis/hiredis.h"
  22. }
  23. #define REDIS_VERSION "redis plugin 1.0.0"
  24. ECL_REDIS_API bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb)
  25. {
  26. if (pb->size != sizeof(ECLPluginDefinitionBlock))
  27. return false;
  28. pb->magicVersion = PLUGIN_VERSION;
  29. pb->version = REDIS_VERSION;
  30. pb->moduleName = "lib_redis";
  31. pb->ECL = nullptr;
  32. pb->flags = PLUGIN_IMPLICIT_MODULE;
  33. pb->description = "ECL plugin library for the C API hiredis";
  34. return true;
  35. }
  36. namespace RedisPlugin {
  37. class Connection;
  38. static const char * REDIS_LOCK_PREFIX = "redis_ecl_lock";
  39. static __thread Connection * cachedConnection = nullptr;
  40. static __thread Connection * cachedPubConnection = nullptr;//database should always = 0
  41. static __thread Connection * cachedSubscriptionConnection = nullptr;
  42. #define NO_CONNECTION_CACHING 0
  43. #define ALLOW_CONNECTION_CACHING 1
  44. #define CACHE_ALL_CONNECTIONS 2
  45. #define INTERNAL_TIMEOUT -2
  46. #define DUMMY_IP 0
  47. #define DUMMY_PORT 0
  48. static CriticalSection critsec;
  49. static __thread bool threadHooked = false;
  50. static int connectionCachingLevel = ALLOW_CONNECTION_CACHING;
  51. static std::atomic<bool> connectionCachingLevelChecked(false);
  52. static bool cacheSubConnections = true;
  53. static std::atomic<bool> cacheSubConnectionsOptChecked(false);
  54. static int unsubscribeTimeout = 100;//ms
  55. static std::atomic<bool> unsubscribeTimeoutChecked(false);
  56. static unsigned unsubscribeReadAttempts = 2;//The max number of possible socket read attempts when wanting the desired unsubscribe confirmation, otherwise give up.
  57. static std::atomic<bool> unsubscribeReadAttemptsChecked(false);
  58. static void * allocateAndCopy(const void * src, size_t size)
  59. {
  60. return memcpy(rtlMalloc(size), src, size);
  61. }
  62. static StringBuffer & appendExpire(StringBuffer & buffer, unsigned expire)
  63. {
  64. if (expire > 0)
  65. buffer.append(" PX ").append(expire);
  66. return buffer;
  67. }
  68. class Reply : public CInterface
  69. {
  70. public :
  71. inline Reply() { };
  72. inline Reply(void * _reply) : reply((redisReply*)_reply) { }
  73. inline Reply(redisReply * _reply) : reply(_reply) { }
  74. inline ~Reply()
  75. {
  76. if (reply)
  77. freeReplyObject(reply);
  78. }
  79. static inline Reply * createReply(void * _reply) { return new Reply(_reply); }
  80. inline const redisReply * query() const { return reply; }
  81. inline void setClear(void * _reply) { setClear((redisReply*)_reply); }
  82. void setClear(redisReply * _reply);
  83. private :
  84. redisReply * reply = nullptr;
  85. };
  86. void Reply::setClear(redisReply * _reply)
  87. {
  88. if (reply == _reply)
  89. return;
  90. if (reply)
  91. freeReplyObject(reply);
  92. reply = _reply;
  93. }
  94. typedef Owned<RedisPlugin::Reply> OwnedReply;
  95. class TimeoutHandler
  96. {
  97. public :
  98. TimeoutHandler(unsigned _timeout) : timeout(_timeout), t0(msTick()) { }
  99. inline void reset(unsigned _timeout) { timeout = _timeout; t0 = msTick(); }
  100. unsigned timeLeft() const;
  101. inline unsigned getTimeout() const { return timeout; }
  102. private :
  103. unsigned timeout;
  104. unsigned t0;
  105. };
  106. unsigned TimeoutHandler::timeLeft() const
  107. {
  108. //This function is ambiguous and the caller must disambiguate timeout == 0 from timeLeft == 0
  109. if (timeout)
  110. {
  111. unsigned dt = msTick() - t0;
  112. if (dt < timeout)
  113. return timeout - dt;
  114. }
  115. return 0;
  116. }
  117. class Connection : public CInterface
  118. {
  119. friend class ConnectionContainer;
  120. public :
  121. Connection(ICodeContext * ctx, const char * _options, const char * _ip, int _port, bool parseOptions, int _database, const char * password, unsigned _timeout, bool selectDB);
  122. ~Connection() { freeContext(); }
  123. static Connection * createConnection(ICodeContext * ctx, Connection * & _cachedConnection, const char * options, const char * _ip, int _port, bool parseOptions, int _database, const char * password, unsigned _timeout, bool cachedConnectionRequested, bool isSubscription = false);
  124. //set
  125. template <class type> void setKey(ICodeContext * ctx, const char * key, type value, unsigned expire);
  126. template <class type> void setKey(ICodeContext * ctx, const char * key, size32_t valueSize, const type * value, unsigned expire);
  127. void setIntKey(ICodeContext * ctx, const char * key, signed __int64 value, unsigned expire, bool _unsigned);
  128. void setRealKey(ICodeContext * ctx, const char * key, double value, unsigned expire);
  129. //get
  130. template <class type> void getKey(ICodeContext * ctx, const char * key, type & value);
  131. template <class type> void getKey(ICodeContext * ctx, const char * key, size_t & valueSize, type * & value);
  132. template <class type> void getNumericKey(ICodeContext * ctx, const char * key, type & value);
  133. signed __int64 returnInt(const char * key, const char * cmd, const redisReply * reply);
  134. //-------------------------------LOCKING------------------------------------------------
  135. void lockSet(ICodeContext * ctx, const char * key, size32_t valueSize, const char * value, unsigned expire);
  136. void lockGet(ICodeContext * ctx, const char * key, size_t & valueSize, char * & value, const char * password, unsigned expire);
  137. void unlock(ICodeContext * ctx, const char * key);
  138. //--------------------------------------------------------------------------------------
  139. //-------------------------------PUB/SUB------------------------------------------------
  140. unsigned __int64 publish(ICodeContext * ctx, const char * keyOrChannel, size32_t messageSize, const char * message, int _database, bool lockedKey);
  141. void subAndWaitForSinglePub(ICodeContext * ctx, const char * keyOrChannel, size_t & messageSize, char * & message, int _database, bool lockedKey);
  142. //--------------------------------------------------------------------------------------
  143. void persist(ICodeContext * ctx, const char * key);
  144. void expire(ICodeContext * ctx, const char * key, unsigned _expire);
  145. void del(ICodeContext * ctx, const char * key);
  146. void clear(ICodeContext * ctx);
  147. unsigned __int64 dbSize(ICodeContext * ctx);
  148. bool exists(ICodeContext * ctx, const char * key);
  149. signed __int64 incrBy(ICodeContext * ctx, const char * key, signed __int64 value);
  150. protected : //Specific to subscribed connections
  151. void subscribe(ICodeContext * ctx, const char * channel);
  152. void unsubscribe();
  153. bool isCorrectChannel(const redisReply * reply, const char * op) const;
  154. int redisSetUnsubscribeTimeout();
  155. static int getUnsubscribeTimeout();
  156. static unsigned getUnsubscribeReadAttempts();
  157. protected :
  158. void freeContext();
  159. int redisSetTimeout();
  160. int setTimeout(unsigned _timeout);
  161. inline unsigned timeLeft() const { return timeout.timeLeft(); }
  162. void assertTimeout(int state);
  163. void redisConnect();
  164. void doParseOptions(ICodeContext * ctx, const char * _options);
  165. void connect(ICodeContext * ctx, int _database, const char * password, bool selectDB);
  166. inline bool isCachedConnection() const { return (this == cachedConnection) || (this == cachedPubConnection) || (this == cachedSubscriptionConnection); }
  167. void selectDB(ICodeContext * ctx, int _database);
  168. void readReply(Reply * reply);
  169. void readReplyAndAssert(Reply * reply, const char * msg);
  170. void readReplyAndAssertWithCmdMsg(Reply * reply, const char * msg, const char * key = nullptr);
  171. void assertKey(const redisReply * reply, const char * key);
  172. void assertAuthorization(const redisReply * reply);
  173. void assertOnError(const redisReply * reply, const char * _msg);
  174. void assertOnErrorWithCmdMsg(const redisReply * reply, const char * cmd, const char * key = nullptr);
  175. void assertConnection(const char * _msg);
  176. void assertConnectionWithCmdMsg(const char * cmd, const char * key = nullptr);
  177. __declspec(noreturn) void fail(const char * cmd, const char * errmsg, const char * key = nullptr) __attribute__((noreturn));
  178. void * redisCommand(const char * format, ...);
  179. void fromStr(const char * str, const char * key, double & ret);
  180. void fromStr(const char * str, const char * key, signed __int64 & ret);
  181. void fromStr(const char * str, const char * key, unsigned __int64 & ret);
  182. static unsigned hashServerIpPortPassword(ICodeContext * ctx, const char * _options, const char * password);
  183. static bool canCacheConnections(bool cachedConnectionRequested, bool isSubscription);
  184. static int getConnectionCachingLevel();
  185. static bool getCacheSubConnections();
  186. int writeBufferToSocket();
  187. bool isSameConnection(ICodeContext * ctx, const char * _options, const char * password) const;
  188. void reset(ICodeContext * ctx, unsigned _database, const char * password, unsigned _timeout, bool selectDB);
  189. //-------------------------------LOCKING------------------------------------------------
  190. void handleLockOnSet(ICodeContext * ctx, const char * key, const char * value, size_t size, unsigned expire);
  191. void handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAttr * retVal, const char * password, unsigned expire);
  192. const char * encodeChannel(StringBuffer & buffer, const char * keyOrChannel, int _database, bool lockedKey) const;
  193. bool noScript(const redisReply * reply) const;
  194. bool lock(ICodeContext * ctx, const char * key, const char * channel, unsigned expire);
  195. //--------------------------------------------------------------------------------------
  196. protected :
  197. redisContext * context = nullptr;
  198. StringAttr options;
  199. StringAttr ip; //The default is set in parseOptions as "localhost"
  200. unsigned serverIpPortPasswordHash = 0;
  201. int port = 6379; //Default redis-server port
  202. TimeoutHandler timeout;
  203. int database = 0; //NOTE: redis stores the maximum number of dbs as an 'int'.
  204. StringAttr channel;
  205. bool subscribed = false;
  206. };
  207. class ConnectionContainer : public CInterface
  208. {
  209. public :
  210. ConnectionContainer() { }
  211. ConnectionContainer(Connection * _connection)
  212. {
  213. connection.setown(_connection);
  214. }
  215. ~ConnectionContainer()
  216. {
  217. if (connection)
  218. connection->unsubscribe();
  219. }
  220. inline Connection * operator -> () const { return connection.get(); }
  221. inline void setown(Connection * _connection) { connection.setown(_connection); }
  222. __declspec(noreturn) void handleException(IException * error) __attribute__((noreturn))
  223. {
  224. if (connection)
  225. connection->freeContext();
  226. throw error;
  227. }
  228. Owned<Connection> connection;
  229. };
  230. static bool releaseAllCachedContexts(bool isPooled)
  231. {
  232. if (cachedConnection)
  233. {
  234. cachedConnection->Release();
  235. cachedConnection = nullptr;
  236. }
  237. if (cachedPubConnection)
  238. {
  239. cachedPubConnection->Release();
  240. cachedPubConnection = nullptr;
  241. }
  242. if (cachedSubscriptionConnection)
  243. {
  244. cachedSubscriptionConnection->Release();
  245. cachedSubscriptionConnection = nullptr;
  246. }
  247. threadHooked = false;
  248. return false;
  249. }
  250. //The following class is here to ensure destruction of the cachedConnection within the main thread
  251. //as this is not handled by the thread hook mechanism.
  252. static class MainThreadCachedConnection
  253. {
  254. public :
  255. MainThreadCachedConnection() { }
  256. ~MainThreadCachedConnection() { releaseAllCachedContexts(false); }
  257. } mainThread;
  258. Connection::Connection(ICodeContext * ctx, const char * _options, const char * _ip, int _port, bool parseOptions, int _database, const char * password, unsigned _timeout, bool selectDB)
  259. : timeout(_timeout)
  260. {
  261. serverIpPortPasswordHash = hashServerIpPortPassword(ctx, _options, password);
  262. options.set(_options, strlen(_options));
  263. if (parseOptions || !_ip || !_port)//parseOptions(true) is intended to be used when passing a valid ip & port but check just in case.
  264. doParseOptions(ctx, _options);
  265. else
  266. {
  267. port = _port;
  268. ip.set(_ip, strlen(_ip));
  269. }
  270. connect(ctx, _database, password, selectDB);
  271. }
  272. void Connection::redisConnect()
  273. {
  274. freeContext();
  275. if (timeout.getTimeout() == 0)
  276. context = ::redisConnect(ip.str(), port);
  277. else
  278. {
  279. unsigned timeStillLeft = timeLeft();
  280. if (timeStillLeft == 0)
  281. rtlFail(0, "Redis Plugin: ERROR - function timed out internally.");
  282. struct timeval to = { (time_t) (timeStillLeft/1000), (suseconds_t) ((timeStillLeft%1000)*1000) };
  283. context = ::redisConnectWithTimeout(ip.str(), port, to);
  284. }
  285. assertConnection("connection");
  286. }
  287. void Connection::connect(ICodeContext * ctx, int _database, const char * password, bool selectDB)
  288. {
  289. redisConnect();
  290. //The following is the dissemination of the two methods authenticate(ctx, password) & selectDB(ctx, _database)
  291. //such that they may be pipelined to save an extra round trip to the server and back.
  292. if (password && *password)
  293. redisAppendCommand(context, "AUTH %b", password, strlen(password));
  294. if (selectDB && (database != _database))
  295. {
  296. VStringBuffer cmd("SELECT %d", _database);
  297. redisAppendCommand(context, cmd.str());
  298. }
  299. //Now read replies.
  300. OwnedReply reply = new Reply();
  301. if (password && *password)
  302. readReplyAndAssert(reply, "server authentication");
  303. if (selectDB && (database != _database))
  304. {
  305. VStringBuffer cmd("SELECT %d", _database);
  306. readReplyAndAssertWithCmdMsg(reply, cmd.str());
  307. database = _database;
  308. }
  309. }
  310. void * Connection::redisCommand(const char * format, ...)
  311. {
  312. //Copied from https://github.com/redis/hiredis/blob/master/hiredis.c ~line:1008 void * redisCommand(redisContext * context, const char * format, ...)
  313. //with redisSetTimeout(); added.
  314. va_list parameters;
  315. void * reply = nullptr;
  316. va_start(parameters, format);
  317. assertTimeout(redisSetTimeout());
  318. reply = ::redisvCommand(context, format, parameters);
  319. va_end(parameters);
  320. return reply;
  321. }
  322. int Connection::setTimeout(unsigned _timeout)
  323. {
  324. struct timeval to = { (time_t) (_timeout/1000), (suseconds_t) ((_timeout%1000)*1000) };
  325. if (!context)
  326. return REDIS_ERR;
  327. return ::redisSetTimeout(context, to);//NOTE: ::redisSetTimeout sets the socket timeout and therefore 0 => forever
  328. }
  329. int Connection::redisSetTimeout()
  330. {
  331. unsigned timeStillLeft = timeLeft();
  332. if (timeStillLeft == 0 && timeout.getTimeout() != 0)
  333. return INTERNAL_TIMEOUT;
  334. return setTimeout(timeStillLeft);
  335. }
  336. int Connection::redisSetUnsubscribeTimeout()
  337. {
  338. unsigned timeout = getUnsubscribeTimeout();
  339. if (timeout == 0)//0 => no timeout => use normal timeout
  340. return redisSetTimeout();
  341. unsigned timeStillLeft = timeLeft();
  342. if (timeStillLeft == 0)
  343. return INTERNAL_TIMEOUT;
  344. unsigned tmp = unsubscribeTimeout < timeStillLeft ? unsubscribeTimeout : timeStillLeft;
  345. return setTimeout(tmp);
  346. }
  347. bool Connection::isSameConnection(ICodeContext * ctx, const char * _options, const char * password) const
  348. {
  349. return (hashServerIpPortPassword(ctx, _options, password) == serverIpPortPasswordHash);
  350. }
  351. unsigned Connection::hashServerIpPortPassword(ICodeContext * ctx, const char * _options, const char * password)
  352. {
  353. return hashc((const unsigned char*)_options, strlen(_options), hashc((const unsigned char*)password, strlen(password), 0));
  354. }
  355. void Connection::reset(ICodeContext * ctx, unsigned _database, const char * password, unsigned _timeout, bool selectDB)
  356. {
  357. timeout.reset(_timeout);
  358. if (!context || context->err != REDIS_OK)
  359. {
  360. database = 0;
  361. connect(ctx, _database, password, selectDB);
  362. }
  363. }
  364. void Connection::doParseOptions(ICodeContext * ctx, const char * _options)
  365. {
  366. StringArray optionStrings;
  367. optionStrings.appendList(_options, " ");
  368. ForEachItemIn(idx, optionStrings)
  369. {
  370. const char *opt = optionStrings.item(idx);
  371. if (strncmp(opt, "--SERVER=", 9) == 0)
  372. {
  373. opt += 9;
  374. StringArray splitPort;
  375. splitPort.appendList(opt, ":");
  376. if (splitPort.ordinality()==2)
  377. {
  378. ip.set(splitPort.item(0));
  379. port = atoi(splitPort.item(1));
  380. }
  381. }
  382. else
  383. {
  384. VStringBuffer err("Redis Plugin: ERROR - unsupported option string '%s'", opt);
  385. rtlFail(0, err.str());
  386. }
  387. }
  388. if (ip.isEmpty())
  389. {
  390. ip.set("localhost");
  391. if (ctx)
  392. {
  393. VStringBuffer msg("Redis Plugin: WARNING - using default cache (%s:%d)", ip.str(), port);
  394. ctx->logString(msg.str());
  395. }
  396. }
  397. }
  398. void Connection::freeContext()
  399. {
  400. subscribed = false;
  401. channel.clear();
  402. if(context)
  403. {
  404. redisFree(context);
  405. context = nullptr;
  406. database = 0;
  407. }
  408. }
  409. const char * Connection::encodeChannel(StringBuffer & buffer, const char * keyOrChannel, int _database, bool lockedKey) const
  410. {
  411. if (lockedKey)
  412. buffer.append(REDIS_LOCK_PREFIX).append("_").append(keyOrChannel).append("_").append(_database);
  413. else
  414. buffer.set(keyOrChannel);
  415. return buffer.str();
  416. }
  417. bool Connection::isCorrectChannel(const redisReply * reply, const char * op) const
  418. {
  419. if (!reply || channel.isEmpty())
  420. return false;
  421. return (reply->type == REDIS_REPLY_ARRAY)
  422. && (reply->elements > 1)
  423. && (strcmp(op, reply->element[0]->str) == 0)
  424. && (strcmp(channel.str(), reply->element[1]->str) == 0);
  425. }
  426. void Connection::subscribe(ICodeContext * ctx, const char * _channel)
  427. {
  428. assert(!subscribed || channel.isEmpty());//If this is a reused subscriptionConnection then this implies that it was not correctly unsubscribed/reset.
  429. channel.set(_channel);
  430. OwnedReply reply = Reply::createReply(redisCommand("SUBSCRIBE %b", channel.str(), (size_t)channel.length()));
  431. /* It is possible for the subscription to succeed in being registered by the server but fails in receiving the associated reply.
  432. * Set early to prevent not unsubscribing when failing.
  433. */
  434. subscribed = true;
  435. assertOnErrorWithCmdMsg(reply->query(), "SUBSCRIBE", channel.str());
  436. if (!isCorrectChannel(reply->query(), "subscribe"))
  437. fail("SUBSCRIBE", "failed to register SUB", channel.str());
  438. }
  439. void Connection::subAndWaitForSinglePub(ICodeContext * ctx, const char * keyOrChannel, size_t & messageSize, char * & message, int _database, bool lockedKey)
  440. {
  441. StringBuffer channelBuffer;
  442. encodeChannel(channelBuffer, keyOrChannel, _database, lockedKey);
  443. subscribe(ctx, channelBuffer.str());
  444. //Now wait for published message
  445. OwnedReply reply = new Reply();
  446. readReply(reply);
  447. assertOnErrorWithCmdMsg(reply->query(), "SUBSCRIBE", channel.str());
  448. if (isCorrectChannel(reply->query(), "message"))
  449. {
  450. if (reply->query()->element[2]->len > 0)
  451. {
  452. messageSize = (size_t)reply->query()->element[2]->len;
  453. message = reinterpret_cast<char*>(allocateAndCopy(reply->query()->element[2]->str, messageSize));
  454. }
  455. else
  456. {
  457. messageSize = 0;
  458. message = nullptr;
  459. }
  460. unsubscribe();
  461. return;
  462. }
  463. throwUnexpected();
  464. }
  465. int Connection::writeBufferToSocket()
  466. {
  467. int done = 0;
  468. if (context->flags & REDIS_BLOCK)
  469. {
  470. /* Write until done */
  471. while (!done)
  472. {
  473. if (redisBufferWrite(context, &done) == REDIS_ERR)
  474. return REDIS_ERR;
  475. }
  476. }
  477. return REDIS_OK;
  478. }
  479. unsigned Connection::getUnsubscribeReadAttempts()
  480. {
  481. if (!unsubscribeReadAttemptsChecked)//unsubscribeReadAttemptsChecked is std:atomic<bool>. Test to guard against unnecessary critical section
  482. {
  483. CriticalBlock block(critsec);
  484. if (!unsubscribeReadAttemptsChecked)
  485. {
  486. const char * tmp = getenv("HPCC_REDIS_PLUGIN_UNSUBSCRIBE_READ_ATTEMPTS");//ms
  487. if (tmp && *tmp)
  488. unsubscribeReadAttempts = atoi(tmp);
  489. unsubscribeReadAttemptsChecked = true;
  490. }
  491. }
  492. return unsubscribeReadAttempts;
  493. }
  494. void Connection::unsubscribe()
  495. {
  496. /* redisContext has both an output buffer (context->obuf) and an input buffer (context->reader).
  497. * redisCommand writes the command to the output buffer and then calls redisGetReply. This will first try to read unconsumed replies from the input buffer.
  498. * If there are none then it will flush the output buffer to the socket and wait for a (or multiples of) redisReply from the socket which writes this to the input buffer.
  499. * This is undesired behaviour here as it will not send the UNSUBSCRIBE command until all replies are consumed on the input buffer. Furthermore, there could be
  500. * spurious commands still in the output buffer preceding the UNSIBSCRIBE, though in the current plugin implementation there shouldn't be, unless in a failing state.
  501. * To solve these two (client) issues both buffers are cleared before unsubscribing. However, the same applies for the server side output buffer and more importantly the
  502. * client socket file descriptor as the redis server pushes published messages to clients. In order to reuse the connection for subsequent subscriptions this unsubscribe
  503. * must be confirmed and in order to do this all (now) spurious preceding replies must first be consumed. Because a single socket read will read more than a single
  504. * redis reply (if >1 exists waiting on the server), we can switch subsequent reads to just the input buffer. The point here is that we want to limit the number of
  505. * possible socket reads and thus possible waits. We then only make a maximum of HPCC_REDIS_PLUGIN_UNSUBSCRIBE_READ_ATTEMPTS socket reads.
  506. */
  507. if (!subscribed)
  508. return;
  509. //return early
  510. if (!context || (context->err != REDIS_OK) || channel.isEmpty())
  511. {
  512. freeContext();
  513. return;
  514. }
  515. /* hiredis v0.12.0 is broken (hiredis.h includes read.h but read.h is not installed), and <=v0.11.0 sds.h is neither installed nor included in hiredis.h.
  516. *
  517. * SubConnections are only used to sub and unsub. They only sub to a single channel at a time (currently) so only an un-flushed UNSUBSCRIBE may still be
  518. * still be present in the output buffer. If so it will either be for the same channel or another and thus is benign in nature as long as the channel is
  519. * is also confirmed in the reply. This implies that it should be ok to not clear the output buffer. Further more, connection caching is only used for
  520. * hiredis >= v0.13.0 which is where this is preferably needed.
  521. */
  522. #if HIREDIS_MAJOR >= 0 && 12 >= MIN_HIREDIS_MINOR && HIREDIS_PATCH >= 1
  523. //Empty output buffer as it may contain previous and now unwanted commands. Since the subscription connections only sub and unsub, this should already be empty.
  524. if (*context->obuf != '\0')//obuf is a redis sds string (chr*) containing a header with the actual pointer pointing directly to string buffer post header.
  525. {
  526. sdsfree(context->obuf);//free/clear current buffer
  527. context->obuf = sdsempty();//setup new one ready for writing
  528. }
  529. #endif
  530. //Empty input buffer as it may contain previous and now unwanted replies
  531. if (context->reader->len > 0)
  532. {
  533. redisReaderFree(context->reader);
  534. context->reader = redisReaderCreate();
  535. }
  536. //Write command to buffer, set timeout, and write to socket.
  537. bool cmdAppendOK = redisAppendCommand(context, "UNSUBSCRIBE %b", channel.str(), channel.length());
  538. if ((cmdAppendOK != REDIS_OK) || (redisSetUnsubscribeTimeout() != REDIS_OK) || (writeBufferToSocket() != REDIS_OK))
  539. {
  540. freeContext();
  541. return;
  542. }
  543. OwnedReply reply = new Reply();
  544. for (unsigned i = 0; i < getUnsubscribeReadAttempts(); i++)
  545. {
  546. if (redisSetUnsubscribeTimeout() != REDIS_OK)
  547. {
  548. freeContext();
  549. return;
  550. }
  551. redisReply * nakedReply = nullptr;
  552. redisGetReply(context, (void**)&nakedReply);
  553. reply->setClear(nakedReply);
  554. if (!reply->query())
  555. {
  556. freeContext();
  557. return;
  558. }
  559. if (isCorrectChannel(reply->query(), "unsubscribe"))
  560. {
  561. channel.clear();
  562. subscribed = false;
  563. return;
  564. }
  565. /* Whilst the input buffer was cleared the same may not be true for that server side.
  566. * We can read as many replies from the reader as we like but only 'unsubscribeReadAttempts'
  567. * from the socket.
  568. */
  569. bool replyErrorFound = (reply->query()->type == REDIS_REPLY_ERROR);
  570. for(;;)
  571. {
  572. redisReply * nakedReply = nullptr;
  573. if (redisReaderGetReply(context->reader, (void**)&nakedReply) != REDIS_OK)
  574. {
  575. freeContext();
  576. return;
  577. }
  578. if (!nakedReply)
  579. break;
  580. reply->setClear(nakedReply);
  581. if (nakedReply->type == REDIS_REPLY_ERROR)
  582. {
  583. replyErrorFound = true;
  584. continue;
  585. }
  586. if (isCorrectChannel(reply->query(), "unsubscribe"))
  587. {
  588. channel.clear();
  589. subscribed = false;
  590. return;
  591. }
  592. }
  593. /* If a reply error was encountered there is no way to know if it was
  594. * associated with the unsubscribe attempt of from a previous and unwanted
  595. * reply, at this point. Either it was, in which case we need to clear up or read
  596. * over from the socket again. The latter is not acceptable in case it *was* associated
  597. * and thus there are no more replies and therefore not wasting time for the read to time out.
  598. */
  599. if (replyErrorFound)
  600. {
  601. freeContext();
  602. return;
  603. }
  604. }
  605. freeContext();
  606. }
  607. void Connection::assertTimeout(int state)
  608. {
  609. switch(state)
  610. {
  611. case REDIS_OK :
  612. return;
  613. case REDIS_ERR :
  614. assertConnection("request to set timeout");
  615. break;
  616. case INTERNAL_TIMEOUT :
  617. rtlFail(0, "Redis Plugin: ERROR - function timed out internally.");
  618. }
  619. }
  620. void Connection::readReply(Reply * reply)
  621. {
  622. redisReply * nakedReply = nullptr;
  623. assertTimeout(redisSetTimeout());
  624. redisGetReply(context, (void**)&nakedReply);
  625. reply->setClear(nakedReply);
  626. }
  627. void Connection::readReplyAndAssert(Reply * reply, const char * msg)
  628. {
  629. readReply(reply);
  630. assertOnError(reply->query(), msg);
  631. }
  632. void Connection::readReplyAndAssertWithCmdMsg(Reply * reply, const char * msg, const char * key)
  633. {
  634. readReply(reply);
  635. assertOnErrorWithCmdMsg(reply->query(), msg, key);
  636. }
  637. int Connection::getConnectionCachingLevel()
  638. {
  639. //Fetch connection caching level
  640. if (!connectionCachingLevelChecked)//connectionCachingLevelChecked is std:atomic<bool>. Test to guard against unnecessary critical section
  641. {
  642. CriticalBlock block(critsec);
  643. if (!connectionCachingLevelChecked)
  644. {
  645. const char * tmp = getenv("HPCC_REDIS_PLUGIN_CONNECTION_CACHING_LEVEL"); // 0 = NO_CONNECTION_CACHING, 1 = ALLOW_CONNECTION_CACHING, 2 = CACHE_ALL_CONNECTIONS
  646. //connectionCachingLevel is already defaulted to ALLOW_CONNECTION_CACHING
  647. if (tmp && *tmp)
  648. connectionCachingLevel = atoi(tmp); //don't bother range checking
  649. connectionCachingLevelChecked = true;
  650. }
  651. }
  652. return connectionCachingLevel;
  653. }
  654. bool Connection::getCacheSubConnections()
  655. {
  656. if (!cacheSubConnectionsOptChecked)//cacheSubConnectionsOptChecked is std:atomic<bool>. Test to guard against unnecessary critical section
  657. {
  658. CriticalBlock block(critsec);
  659. if (!cacheSubConnectionsOptChecked)
  660. {
  661. const char * tmp = getenv("HPCC_REDIS_PLUGIN_CACHE_SUB_CONNECTIONS");
  662. //cacheSubConnections is already defaulted to true;
  663. if (tmp && *tmp)
  664. {
  665. //less ops to check on & true however, making cacheSubConnections(false) if not met would do also for misspelled versions.
  666. //Since the default is ON, misspelling on or true means/does nothing.
  667. if (*tmp == '0' || (stricmp("off", tmp) == 0) || (stricmp("false", tmp) == 0))
  668. cacheSubConnections = false;
  669. }
  670. cacheSubConnectionsOptChecked= true;
  671. }
  672. }
  673. return cacheSubConnections;
  674. }
  675. int Connection::getUnsubscribeTimeout()
  676. {
  677. if (!unsubscribeTimeoutChecked)//unsubscribeTimeoutChecked is std:atomic<bool>. Test to guard against unnecessary critical section
  678. {
  679. CriticalBlock block(critsec);
  680. if (!unsubscribeTimeoutChecked)
  681. {
  682. const char * tmp = getenv("HPCC_REDIS_PLUGIN_UNSUBSCRIBE_TIMEOUT");//ms
  683. if (tmp && *tmp)
  684. unsubscribeTimeout = atoi(tmp);
  685. unsubscribeTimeoutChecked = true;
  686. }
  687. }
  688. return unsubscribeTimeout;
  689. }
  690. bool Connection::canCacheConnections(bool cachedConnectionRequested, bool isSubscription)
  691. {
  692. #if HIREDIS_VERSION_OK_FOR_CACHING
  693. switch (getConnectionCachingLevel())
  694. {
  695. case CACHE_ALL_CONNECTIONS :
  696. return true;
  697. case NO_CONNECTION_CACHING :
  698. return false;
  699. }
  700. if (isSubscription)
  701. return cachedConnectionRequested && getCacheSubConnections();
  702. return cachedConnectionRequested;
  703. #endif
  704. return false;
  705. }
  706. static void addThreadHook()
  707. {
  708. if (!threadHooked)
  709. {
  710. addThreadTermFunc(releaseAllCachedContexts);
  711. threadHooked = true;
  712. }
  713. }
  714. Connection * Connection::createConnection(ICodeContext * ctx, Connection * & _cachedConnection, const char * _options, const char * _ip, int _port, bool parseOptions, int _database, const char * password, unsigned _timeout, bool cachedConnectionRequested, bool isSubscription)
  715. {
  716. if (canCacheConnections(cachedConnectionRequested, isSubscription))
  717. {
  718. if (!_cachedConnection)
  719. {
  720. _cachedConnection = new Connection(ctx, _options, _ip, _port, parseOptions, _database, password, _timeout, !isSubscription);
  721. addThreadHook();
  722. return LINK(_cachedConnection);
  723. }
  724. if (_cachedConnection->isSameConnection(ctx, _options, password))
  725. {
  726. //MORE: should perhaps check that the connection has not expired (think hiredis REDIS_KEEPALIVE_INTERVAL is defaulted to 15s).
  727. _cachedConnection->reset(ctx, _database, password, _timeout, !isSubscription);//If the context had been previously freed, this will reconnect and selectDB in connect().
  728. _cachedConnection->selectDB(ctx, _database);//If the context is still present selectDB here.
  729. return LINK(_cachedConnection);
  730. }
  731. _cachedConnection->Release();
  732. _cachedConnection = nullptr;
  733. _cachedConnection = new Connection(ctx, _options, _ip, _port, parseOptions, _database, password, _timeout, !isSubscription);
  734. return LINK(_cachedConnection);
  735. }
  736. else
  737. return new Connection(ctx, _options, _ip, _port, parseOptions, _database, password, _timeout, !isSubscription);
  738. }
  739. void Connection::selectDB(ICodeContext * ctx, int _database)
  740. {
  741. if (database == _database || subscribed)
  742. return;
  743. database = _database;
  744. VStringBuffer cmd("SELECT %d", database);
  745. OwnedReply reply = Reply::createReply(redisCommand(cmd.str()));
  746. assertOnErrorWithCmdMsg(reply->query(), cmd.str());
  747. }
  748. void Connection::fail(const char * cmd, const char * errmsg, const char * key)
  749. {
  750. if (key)
  751. {
  752. VStringBuffer msg("Redis Plugin: ERROR - %s '%s' on database %d for %s:%d failed : %s", cmd, key, database, ip.str(), port, errmsg);
  753. rtlFail(0, msg.str());
  754. }
  755. VStringBuffer msg("Redis Plugin: ERROR - %s on database %d for %s:%d failed : %s", cmd, database, ip.str(), port, errmsg);
  756. rtlFail(0, msg.str());
  757. }
  758. void Connection::assertOnError(const redisReply * reply, const char * _msg)
  759. {
  760. if (!reply)
  761. {
  762. assertConnection(_msg);
  763. throwUnexpected();
  764. }
  765. else if (reply->type == REDIS_REPLY_ERROR)
  766. {
  767. assertAuthorization(reply);
  768. VStringBuffer msg("Redis Plugin: %s - %s", _msg, reply->str);
  769. rtlFail(0, msg.str());
  770. }
  771. }
  772. void Connection::assertOnErrorWithCmdMsg(const redisReply * reply, const char * cmd, const char * key)
  773. {
  774. if (!reply)
  775. {
  776. assertConnectionWithCmdMsg(cmd, key);
  777. throwUnexpected();
  778. }
  779. else if (reply->type == REDIS_REPLY_ERROR)
  780. {
  781. assertAuthorization(reply);
  782. fail(cmd, reply->str, key);
  783. }
  784. }
  785. void Connection::assertAuthorization(const redisReply * reply)
  786. {
  787. if (reply && reply->str && ( strncmp(reply->str, "NOAUTH", 6) == 0 || strncmp(reply->str, "ERR operation not permitted", 27) == 0 ))
  788. {
  789. VStringBuffer msg("Redis Plugin: ERROR - authentication for %s:%d failed : %s", ip.str(), port, reply->str);
  790. rtlFail(0, msg.str());
  791. }
  792. }
  793. void Connection::assertKey(const redisReply * reply, const char * key)
  794. {
  795. if (reply && reply->type == REDIS_REPLY_NIL)
  796. {
  797. VStringBuffer msg("Redis Plugin: ERROR - the requested key '%s' does not exist on database %d on %s:%d", key, database, ip.str(), port);
  798. rtlFail(0, msg.str());
  799. }
  800. }
  801. void Connection::assertConnectionWithCmdMsg(const char * cmd, const char * key)
  802. {
  803. if (!context)
  804. fail(cmd, "neither 'reply' nor connection error available", key);
  805. else if (context->err)
  806. fail(cmd, context->errstr, key);
  807. }
  808. void Connection::assertConnection(const char * _msg)
  809. {
  810. if (!context)
  811. {
  812. VStringBuffer msg("Redis Plugin: ERROR - %s for %s:%d failed : neither 'reply' nor connection error available", _msg, ip.str(), port);
  813. rtlFail(0, msg.str());
  814. }
  815. else if (context->err)
  816. {
  817. VStringBuffer msg("Redis Plugin: ERROR - %s for %s:%d failed : %s", _msg, ip.str(), port, context->errstr);
  818. rtlFail(0, msg.str());
  819. }
  820. }
  821. void Connection::clear(ICodeContext * ctx)
  822. {
  823. //NOTE: flush is the actual cache flush/clear/delete and not an io buffer flush.
  824. OwnedReply reply = Reply::createReply(redisCommand("FLUSHDB"));//NOTE: FLUSHDB deletes current database where as FLUSHALL deletes all dbs.
  825. //NOTE: documented as never failing, but in case
  826. assertOnErrorWithCmdMsg(reply->query(), "FlushDB");
  827. }
  828. void Connection::del(ICodeContext * ctx, const char * key)
  829. {
  830. OwnedReply reply = Reply::createReply(redisCommand("DEL %b", key, strlen(key)));
  831. assertOnErrorWithCmdMsg(reply->query(), "Del", key);
  832. }
  833. void Connection::persist(ICodeContext * ctx, const char * key)
  834. {
  835. OwnedReply reply = Reply::createReply(redisCommand("PERSIST %b", key, strlen(key)));
  836. assertOnErrorWithCmdMsg(reply->query(), "Persist", key);
  837. }
  838. void Connection::expire(ICodeContext * ctx, const char * key, unsigned _expire)
  839. {
  840. OwnedReply reply = Reply::createReply(redisCommand("PEXPIRE %b %u", key, strlen(key), _expire));
  841. assertOnErrorWithCmdMsg(reply->query(), "Expire", key);
  842. }
  843. bool Connection::exists(ICodeContext * ctx, const char * key)
  844. {
  845. OwnedReply reply = Reply::createReply(redisCommand("EXISTS %b", key, strlen(key)));
  846. assertOnErrorWithCmdMsg(reply->query(), "Exists", key);
  847. return (reply->query()->integer != 0);
  848. }
  849. unsigned __int64 Connection::dbSize(ICodeContext * ctx)
  850. {
  851. OwnedReply reply = Reply::createReply(redisCommand("DBSIZE"));
  852. assertOnErrorWithCmdMsg(reply->query(), "DBSIZE");
  853. return reply->query()->integer;
  854. }
  855. signed __int64 Connection::incrBy(ICodeContext * ctx, const char * key, signed __int64 value)
  856. {
  857. OwnedReply reply = Reply::createReply(redisCommand("INCRBY %b %" I64F "d", key, strlen(key), value));
  858. return returnInt(key, "INCRBY", reply->query());
  859. }
  860. //-------------------------------------------SET-----------------------------------------
  861. void Connection::setIntKey(ICodeContext * ctx, const char * key, signed __int64 value, unsigned expire, bool _unsigned)
  862. {
  863. StringBuffer cmd("SET %b %" I64F);
  864. if (_unsigned)
  865. cmd.append("u");
  866. else
  867. cmd.append("d");
  868. appendExpire(cmd, expire);
  869. OwnedReply reply = Reply::createReply(redisCommand(cmd.str(), key, strlen(key), value));
  870. assertOnErrorWithCmdMsg(reply->query(), "SET", key);
  871. }
  872. void Connection::setRealKey(ICodeContext * ctx, const char * key, double value, unsigned expire)
  873. {
  874. StringBuffer cmd("SET %b %.16g");
  875. appendExpire(cmd, expire);
  876. OwnedReply reply = Reply::createReply(redisCommand(cmd.str(), key, strlen(key), value));
  877. assertOnErrorWithCmdMsg(reply->query(), "SET", key);
  878. }
  879. //--OUTER--
  880. template<class type> void SyncRSet(ICodeContext * ctx, const char * _options, const char * key, type value, int database, unsigned expire, const char * password, unsigned _timeout, bool cachedConnectionRequested)
  881. {
  882. ConnectionContainer master;
  883. try
  884. {
  885. master.setown(Connection::createConnection(ctx, cachedConnection, _options, DUMMY_IP, DUMMY_PORT, true, database, password, _timeout, cachedConnectionRequested));
  886. master->setKey(ctx, key, value, expire);
  887. }
  888. catch (IException * error)
  889. {
  890. master.handleException(error);
  891. }
  892. }
  893. //Set pointer types
  894. template<class type> void SyncRSet(ICodeContext * ctx, const char * _options, const char * key, size32_t valueSize, const type * value, int database, unsigned expire, const char * password, unsigned _timeout, bool cachedConnectionRequested)
  895. {
  896. ConnectionContainer master;
  897. try
  898. {
  899. master.setown(Connection::createConnection(ctx, cachedConnection, _options, DUMMY_IP, DUMMY_PORT, true, database, password, _timeout, cachedConnectionRequested));
  900. master->setKey(ctx, key, valueSize, value, expire);
  901. }
  902. catch (IException * error)
  903. {
  904. master.handleException(error);
  905. }
  906. }
  907. //--INNER--
  908. template<class type> void Connection::setKey(ICodeContext * ctx, const char * key, type value, unsigned expire)
  909. {
  910. const char * _value = reinterpret_cast<const char *>(&value);//Do this even for char * to prevent compiler complaining
  911. StringBuffer cmd("SET %b %b");
  912. appendExpire(cmd, expire);
  913. OwnedReply reply = Reply::createReply(redisCommand(cmd.str(), key, strlen(key), _value, sizeof(type)));
  914. assertOnErrorWithCmdMsg(reply->query(), "SET", key);
  915. }
  916. template<class type> void Connection::setKey(ICodeContext * ctx, const char * key, size32_t valueSize, const type * value, unsigned expire)
  917. {
  918. const char * _value = reinterpret_cast<const char *>(value);//Do this even for char * to prevent compiler complaining
  919. StringBuffer cmd("SET %b %b");
  920. appendExpire(cmd, expire);
  921. OwnedReply reply = Reply::createReply(redisCommand(cmd.str(), key, strlen(key), _value, (size_t)valueSize));
  922. assertOnErrorWithCmdMsg(reply->query(), "SET", key);
  923. }
  924. //-------------------------------------------GET-----------------------------------------
  925. signed __int64 Connection::returnInt(const char * key, const char * cmd, const redisReply * reply)
  926. {
  927. assertOnErrorWithCmdMsg(reply, cmd, key);
  928. assertKey(reply, key);
  929. if (reply->type == REDIS_REPLY_INTEGER)
  930. return reply->integer;
  931. fail(cmd, "expected RESP integer from redis", key);
  932. throwUnexpected(); //stop compiler complaining
  933. }
  934. //--OUTER--
  935. template<class type> void SyncRGetNumeric(ICodeContext * ctx, const char * options, const char * key, type & returnValue, int database, const char * password, unsigned _timeout, bool cachedConnectionRequested)
  936. {
  937. ConnectionContainer master;
  938. try
  939. {
  940. master.setown(Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, _timeout, cachedConnectionRequested));
  941. master->getNumericKey(ctx, key, returnValue);
  942. }
  943. catch (IException * error)
  944. {
  945. master.handleException(error);
  946. }
  947. }
  948. template<class type> void SyncRGet(ICodeContext * ctx, const char * options, const char * key, type & returnValue, int database, const char * password, unsigned _timeout, bool cachedConnectionRequested)
  949. {
  950. ConnectionContainer master;
  951. try
  952. {
  953. master.setown(Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, _timeout, cachedConnectionRequested));
  954. master->getKey(ctx, key, returnValue);
  955. }
  956. catch (IException * error)
  957. {
  958. master.handleException(error);
  959. }
  960. }
  961. template<class type> void SyncRGet(ICodeContext * ctx, const char * options, const char * key, size_t & returnSize, type * & returnValue, int database, const char * password, unsigned _timeout, bool cachedConnectionRequested)
  962. {
  963. ConnectionContainer master;
  964. try
  965. {
  966. master.setown(Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, _timeout, cachedConnectionRequested));
  967. master->getKey(ctx, key, returnSize, returnValue);
  968. }
  969. catch (IException * error)
  970. {
  971. master.handleException(error);
  972. }
  973. }
  974. void Connection::fromStr(const char * str, const char * key, double & ret)
  975. {
  976. char * end = nullptr;
  977. ret = strtod(str, &end);
  978. if (errno == ERANGE)
  979. fail("GetReal", "value returned out of range", key);
  980. }
  981. void Connection::fromStr(const char * str, const char * key, signed __int64 & ret)
  982. {
  983. char* end = nullptr;
  984. ret = strtoll(str, &end, 10);
  985. if (errno == ERANGE)
  986. fail("GetInteger", "value returned out of range", key);
  987. }
  988. void Connection::fromStr(const char * str, const char * key, unsigned __int64 & ret)
  989. {
  990. char* end = nullptr;
  991. ret = strtoull(str, &end, 10);
  992. if (errno == ERANGE)
  993. fail("GetUnsigned", "value returned out of range", key);
  994. }
  995. //--INNER--
  996. template<class type> void Connection::getNumericKey(ICodeContext * ctx, const char * key, type & returnValue)
  997. {
  998. OwnedReply reply = Reply::createReply(redisCommand("GET %b", key, strlen(key)));
  999. assertOnErrorWithCmdMsg(reply->query(), "GET", key);
  1000. assertKey(reply->query(), key);
  1001. fromStr(reply->query()->str, key, returnValue);
  1002. }
  1003. template<class type> void Connection::getKey(ICodeContext * ctx, const char * key, type & returnValue)
  1004. {
  1005. OwnedReply reply = Reply::createReply(redisCommand("GET %b", key, strlen(key)));
  1006. assertOnErrorWithCmdMsg(reply->query(), "GET", key);
  1007. assertKey(reply->query(), key);
  1008. size_t returnSize = reply->query()->len;
  1009. if (sizeof(type)!=returnSize)
  1010. {
  1011. VStringBuffer msg("requested type of different size (%uB) from that stored (%uB)", (unsigned)sizeof(type), (unsigned)returnSize);
  1012. fail("GET", msg.str(), key);
  1013. }
  1014. memcpy(&returnValue, reply->query()->str, returnSize);
  1015. }
  1016. template<class type> void Connection::getKey(ICodeContext * ctx, const char * key, size_t & returnSize, type * & returnValue)
  1017. {
  1018. OwnedReply reply = Reply::createReply(redisCommand("GET %b", key, strlen(key)));
  1019. assertOnErrorWithCmdMsg(reply->query(), "GET", key);
  1020. assertKey(reply->query(), key);
  1021. returnSize = reply->query()->len;
  1022. returnValue = reinterpret_cast<type*>(allocateAndCopy(reply->query()->str, returnSize));
  1023. }
  1024. unsigned __int64 Connection::publish(ICodeContext * ctx, const char * keyOrChannel, size32_t messageSize, const char * message, int _database, bool lockedKey)
  1025. {
  1026. StringBuffer channel;
  1027. encodeChannel(channel, keyOrChannel, _database, lockedKey);
  1028. OwnedReply reply = Reply::createReply(redisCommand("PUBLISH %b %b", channel.str(), (size_t)channel.length(), message, (size_t)messageSize));
  1029. assertOnErrorWithCmdMsg(reply->query(), "PUBLISH", channel.str());
  1030. if (reply->query()->type == REDIS_REPLY_INTEGER)
  1031. {
  1032. if (reply->query()->integer >= 0)
  1033. return (unsigned __int64)reply->query()->integer;
  1034. else
  1035. throwUnexpected();
  1036. }
  1037. throwUnexpected();
  1038. }
  1039. //--------------------------------------------------------------------------------
  1040. // ECL SERVICE ENTRYPOINTS
  1041. //--------------------------------------------------------------------------------
  1042. ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL SyncRPub(ICodeContext * ctx, const char * keyOrChannel, size32_t messageSize, const char * message, const char * options, int database, const char * password, unsigned timeout, bool lockedKey, bool cachedConnectionRequested)
  1043. {
  1044. ConnectionContainer master;
  1045. try
  1046. {
  1047. master.setown(Connection::createConnection(ctx, cachedPubConnection, options, DUMMY_IP, DUMMY_PORT, true, 0, password, timeout, cachedConnectionRequested));
  1048. return master->publish(ctx, keyOrChannel, messageSize, message, database, lockedKey);
  1049. }
  1050. catch (IException * error)
  1051. {
  1052. master.handleException(error);
  1053. }
  1054. }
  1055. ECL_REDIS_API void ECL_REDIS_CALL SyncRSub(ICodeContext * ctx, size32_t & messageSize, char * & message, const char * keyOrChannel, const char * options, int database, const char * password, unsigned timeout, bool lockedKey, bool cachedConnectionRequested)
  1056. {
  1057. size_t _messageSize = 0;
  1058. ConnectionContainer master;
  1059. try
  1060. {
  1061. master.setown(Connection::createConnection(ctx, cachedSubscriptionConnection, options, DUMMY_IP, DUMMY_PORT, true, 0, password, timeout, cachedConnectionRequested, true));
  1062. master->subAndWaitForSinglePub(ctx, keyOrChannel, _messageSize, message, database, lockedKey);
  1063. messageSize = static_cast<size32_t>(_messageSize);
  1064. }
  1065. catch (IException * error)
  1066. {
  1067. master.handleException(error);
  1068. }
  1069. }
  1070. ECL_REDIS_API void ECL_REDIS_CALL RClear(ICodeContext * ctx, const char * options, int database, const char * password, unsigned timeout, bool cachedConnectionRequested)
  1071. {
  1072. ConnectionContainer master;
  1073. try
  1074. {
  1075. master.setown(Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, timeout, cachedConnectionRequested));
  1076. master->clear(ctx);
  1077. }
  1078. catch (IException * error)
  1079. {
  1080. master.handleException(error);
  1081. }
  1082. }
  1083. ECL_REDIS_API bool ECL_REDIS_CALL RExist(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout, bool cachedConnectionRequested)
  1084. {
  1085. ConnectionContainer master;
  1086. try
  1087. {
  1088. master.setown(Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, timeout, cachedConnectionRequested));
  1089. return master->exists(ctx, key);
  1090. }
  1091. catch (IException * error)
  1092. {
  1093. master.handleException(error);
  1094. }
  1095. }
  1096. ECL_REDIS_API void ECL_REDIS_CALL RDel(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout, bool cachedConnectionRequested)
  1097. {
  1098. ConnectionContainer master;
  1099. try
  1100. {
  1101. master.setown(Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, timeout, cachedConnectionRequested));
  1102. master->del(ctx, key);
  1103. }
  1104. catch (IException * error)
  1105. {
  1106. master.handleException(error);
  1107. }
  1108. }
  1109. ECL_REDIS_API void ECL_REDIS_CALL RPersist(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout, bool cachedConnectionRequested)
  1110. {
  1111. ConnectionContainer master;
  1112. try
  1113. {
  1114. master.setown(Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, timeout, cachedConnectionRequested));
  1115. master->persist(ctx, key);
  1116. }
  1117. catch (IException * error)
  1118. {
  1119. master.handleException(error);
  1120. }
  1121. }
  1122. ECL_REDIS_API void ECL_REDIS_CALL RExpire(ICodeContext * ctx, const char * key, const char * options, int database, unsigned _expire, const char * password, unsigned timeout, bool cachedConnectionRequested)
  1123. {
  1124. ConnectionContainer master;
  1125. try
  1126. {
  1127. master.setown(Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, timeout, cachedConnectionRequested));
  1128. master->expire(ctx, key, _expire);
  1129. }
  1130. catch (IException * error)
  1131. {
  1132. master.handleException(error);
  1133. }
  1134. }
  1135. ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL RDBSize(ICodeContext * ctx, const char * options, int database, const char * password, unsigned timeout, bool cachedConnectionRequested)
  1136. {
  1137. ConnectionContainer master;
  1138. try
  1139. {
  1140. master.setown(Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, timeout, cachedConnectionRequested));
  1141. return master->dbSize(ctx);
  1142. }
  1143. catch (IException * error)
  1144. {
  1145. master.handleException(error);
  1146. }
  1147. }
  1148. ECL_REDIS_API signed __int64 ECL_REDIS_CALL SyncRINCRBY(ICodeContext * ctx, const char * key, signed __int64 value, const char * options, int database, const char * password, unsigned timeout, bool cachedConnectionRequested)
  1149. {
  1150. ConnectionContainer master;
  1151. try
  1152. {
  1153. master.setown(Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, timeout, cachedConnectionRequested));
  1154. return master->incrBy(ctx, key, value);
  1155. }
  1156. catch (IException * error)
  1157. {
  1158. master.handleException(error);
  1159. }
  1160. }
  1161. //-----------------------------------SET------------------------------------------
  1162. ECL_REDIS_API void ECL_REDIS_CALL SyncRSetStr(ICodeContext * ctx, const char * key, size32_t valueSize, const char * value, const char * options, int database, unsigned expire, const char * password, unsigned timeout, bool cachedConnectionRequested)
  1163. {
  1164. SyncRSet(ctx, options, key, valueSize, value, database, expire, password, timeout, cachedConnectionRequested);
  1165. }
  1166. ECL_REDIS_API void ECL_REDIS_CALL SyncRSetUChar(ICodeContext * ctx, const char * key, size32_t valueLength, const UChar * value, const char * options, int database, unsigned expire, const char * password, unsigned timeout, bool cachedConnectionRequested)
  1167. {
  1168. SyncRSet(ctx, options, key, (valueLength)*sizeof(UChar), value, database, expire, password, timeout, cachedConnectionRequested);
  1169. }
  1170. ECL_REDIS_API void ECL_REDIS_CALL SyncRSetInt(ICodeContext * ctx, const char * key, signed __int64 value, const char * options, int database, unsigned expire, const char * password, unsigned timeout, bool cachedConnectionRequested)
  1171. {
  1172. ConnectionContainer master;
  1173. try
  1174. {
  1175. master.setown(Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, timeout, cachedConnectionRequested));
  1176. master->setIntKey(ctx, key, value, expire, false);
  1177. }
  1178. catch (IException * error)
  1179. {
  1180. master.handleException(error);
  1181. }
  1182. }
  1183. ECL_REDIS_API void ECL_REDIS_CALL SyncRSetUInt(ICodeContext * ctx, const char * key, unsigned __int64 value, const char * options, int database, unsigned expire, const char * password, unsigned timeout, bool cachedConnectionRequested)
  1184. {
  1185. ConnectionContainer master;
  1186. try
  1187. {
  1188. master.setown(Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, timeout, cachedConnectionRequested));
  1189. master->setIntKey(ctx, key, value, expire, true);
  1190. }
  1191. catch (IException * error)
  1192. {
  1193. master.handleException(error);
  1194. }
  1195. }
  1196. ECL_REDIS_API void ECL_REDIS_CALL SyncRSetReal(ICodeContext * ctx, const char * key, double value, const char * options, int database, unsigned expire, const char * password, unsigned timeout, bool cachedConnectionRequested)
  1197. {
  1198. ConnectionContainer master;
  1199. try
  1200. {
  1201. master.setown(Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, timeout, cachedConnectionRequested));
  1202. master->setRealKey(ctx, key, value, expire);
  1203. }
  1204. catch (IException * error)
  1205. {
  1206. master.handleException(error);
  1207. }
  1208. }
  1209. ECL_REDIS_API void ECL_REDIS_CALL SyncRSetBool(ICodeContext * ctx, const char * key, bool value, const char * options, int database, unsigned expire, const char * password, unsigned timeout, bool cachedConnectionRequested)
  1210. {
  1211. SyncRSet(ctx, options, key, value, database, expire, password, timeout, cachedConnectionRequested);
  1212. }
  1213. ECL_REDIS_API void ECL_REDIS_CALL SyncRSetData(ICodeContext * ctx, const char * key, size32_t valueSize, const void * value, const char * options, int database, unsigned expire, const char * password, unsigned timeout, bool cachedConnectionRequested)
  1214. {
  1215. SyncRSet(ctx, options, key, valueSize, value, database, expire, password, timeout, cachedConnectionRequested);
  1216. }
  1217. ECL_REDIS_API void ECL_REDIS_CALL SyncRSetUtf8(ICodeContext * ctx, const char * key, size32_t valueLength, const char * value, const char * options, int database, unsigned expire, const char * password, unsigned timeout, bool cachedConnectionRequested)
  1218. {
  1219. SyncRSet(ctx, options, key, rtlUtf8Size(valueLength, value), value, database, expire, password, timeout, cachedConnectionRequested);
  1220. }
  1221. //-------------------------------------GET----------------------------------------
  1222. ECL_REDIS_API bool ECL_REDIS_CALL SyncRGetBool(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout, bool cachedConnectionRequested)
  1223. {
  1224. bool value;
  1225. SyncRGet(ctx, options, key, value, database, password, timeout, cachedConnectionRequested);
  1226. return value;
  1227. }
  1228. ECL_REDIS_API double ECL_REDIS_CALL SyncRGetDouble(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout, bool cachedConnectionRequested)
  1229. {
  1230. double value;
  1231. SyncRGetNumeric(ctx, options, key, value, database, password, timeout, cachedConnectionRequested);
  1232. return value;
  1233. }
  1234. ECL_REDIS_API signed __int64 ECL_REDIS_CALL SyncRGetInt8(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout, bool cachedConnectionRequested)
  1235. {
  1236. signed __int64 value;
  1237. SyncRGetNumeric(ctx, options, key, value, database, password, timeout, cachedConnectionRequested);
  1238. return value;
  1239. }
  1240. ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL SyncRGetUint8(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout, bool cachedConnectionRequested)
  1241. {
  1242. unsigned __int64 value;
  1243. SyncRGetNumeric(ctx, options, key, value, database, password, timeout, cachedConnectionRequested);
  1244. return value;
  1245. }
  1246. ECL_REDIS_API void ECL_REDIS_CALL SyncRGetStr(ICodeContext * ctx, size32_t & returnSize, char * & returnValue, const char * key, const char * options, int database, const char * password, unsigned timeout, bool cachedConnectionRequested)
  1247. {
  1248. size_t _returnSize;
  1249. SyncRGet(ctx, options, key, _returnSize, returnValue, database, password, timeout, cachedConnectionRequested);
  1250. returnSize = static_cast<size32_t>(_returnSize);
  1251. }
  1252. ECL_REDIS_API void ECL_REDIS_CALL SyncRGetUChar(ICodeContext * ctx, size32_t & returnLength, UChar * & returnValue, const char * key, const char * options, int database, const char * password, unsigned timeout, bool cachedConnectionRequested)
  1253. {
  1254. size_t returnSize;
  1255. SyncRGet(ctx, options, key, returnSize, returnValue, database, password, timeout, cachedConnectionRequested);
  1256. returnLength = static_cast<size32_t>(returnSize/sizeof(UChar));
  1257. }
  1258. ECL_REDIS_API void ECL_REDIS_CALL SyncRGetUtf8(ICodeContext * ctx, size32_t & returnLength, char * & returnValue, const char * key, const char * options, int database, const char * password, unsigned timeout, bool cachedConnectionRequested)
  1259. {
  1260. size_t returnSize;
  1261. SyncRGet(ctx, options, key, returnSize, returnValue, database, password, timeout, cachedConnectionRequested);
  1262. returnLength = static_cast<size32_t>(rtlUtf8Length(returnSize, returnValue));
  1263. }
  1264. ECL_REDIS_API void ECL_REDIS_CALL SyncRGetData(ICodeContext * ctx, size32_t & returnSize, void * & returnValue, const char * key, const char * options, int database, const char * password, unsigned timeout, bool cachedConnectionRequested)
  1265. {
  1266. size_t _returnSize;
  1267. SyncRGet(ctx, options, key, _returnSize, returnValue, database, password, timeout, cachedConnectionRequested);
  1268. returnSize = static_cast<size32_t>(_returnSize);
  1269. }
  1270. //----------------------------------LOCK------------------------------------------
  1271. //-----------------------------------SET-----------------------------------------
  1272. //Set pointer types
  1273. void SyncLockRSet(ICodeContext * ctx, const char * _options, const char * key, size32_t valueSize, const char * value, int database, unsigned expire, const char * password, unsigned _timeout, bool cachedConnectionRequested)
  1274. {
  1275. ConnectionContainer master;
  1276. try
  1277. {
  1278. master.setown(Connection::createConnection(ctx, cachedConnection, _options, DUMMY_IP, DUMMY_PORT, true, database, password, _timeout, cachedConnectionRequested));
  1279. master->lockSet(ctx, key, valueSize, value, expire);
  1280. }
  1281. catch (IException * error)
  1282. {
  1283. master.handleException(error);
  1284. }
  1285. }
  1286. //--INNER--
  1287. void Connection::lockSet(ICodeContext * ctx, const char * key, size32_t valueSize, const char * value, unsigned expire)
  1288. {
  1289. const char * _value = reinterpret_cast<const char *>(value);//Do this even for char * to prevent compiler complaining
  1290. handleLockOnSet(ctx, key, _value, (size_t)valueSize, expire);
  1291. }
  1292. //-------------------------------------------GET-----------------------------------------
  1293. //--OUTER--
  1294. void SyncLockRGet(ICodeContext * ctx, const char * options, const char * key, size_t & returnSize, char * & returnValue, int database, unsigned expire, const char * password, unsigned _timeout, bool cachedConnectionRequested)
  1295. {
  1296. ConnectionContainer master;
  1297. try
  1298. {
  1299. master.setown(Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, _timeout, cachedConnectionRequested));
  1300. master->lockGet(ctx, key, returnSize, returnValue, password, expire);
  1301. }
  1302. catch (IException * error)
  1303. {
  1304. master.handleException(error);
  1305. }
  1306. }
  1307. //--INNER--
  1308. void Connection::lockGet(ICodeContext * ctx, const char * key, size_t & returnSize, char * & returnValue, const char * password, unsigned expire)
  1309. {
  1310. MemoryAttr retVal;
  1311. handleLockOnGet(ctx, key, &retVal, password, expire);
  1312. returnSize = retVal.length();
  1313. returnValue = reinterpret_cast<char*>(retVal.detach());
  1314. }
  1315. //---------------------------------------------------------------------------------------
  1316. bool Connection::lock(ICodeContext * ctx, const char * key, const char * channel, unsigned expire)
  1317. {
  1318. if (expire == 0)
  1319. fail("GetOrLock<type>", "invalid value for 'expire', persistent locks not allowed.", key);
  1320. StringBuffer cmd("SET %b %b NX PX ");
  1321. cmd.append(expire);
  1322. OwnedReply reply = Reply::createReply(redisCommand(cmd.str(), key, strlen(key), channel, strlen(channel)));
  1323. assertOnErrorWithCmdMsg(reply->query(), cmd.str(), key);
  1324. return (reply->query()->type == REDIS_REPLY_STATUS && strcmp(reply->query()->str, "OK") == 0);
  1325. }
  1326. void Connection::unlock(ICodeContext * ctx, const char * key)
  1327. {
  1328. //WATCH key, if altered between WATCH and EXEC abort all commands inbetween
  1329. redisAppendCommand(context, "WATCH %b", key, strlen(key));
  1330. redisAppendCommand(context, "GET %b", key, strlen(key));
  1331. //Read replies
  1332. OwnedReply reply = new Reply();
  1333. readReplyAndAssertWithCmdMsg(reply.get(), "manual unlock", key);//WATCH reply
  1334. readReplyAndAssertWithCmdMsg(reply.get(), "manual unlock", key);//GET reply
  1335. //check if locked
  1336. if (strncmp(reply->query()->str, REDIS_LOCK_PREFIX, strlen(REDIS_LOCK_PREFIX)) == 0)
  1337. {
  1338. //MULTI - all commands between MULTI and EXEC are considered an atomic transaction on the server
  1339. redisAppendCommand(context, "MULTI");//MULTI
  1340. redisAppendCommand(context, "DEL %b", key, strlen(key));//DEL
  1341. redisAppendCommand(context, "EXEC");//EXEC
  1342. #if(0)//Quick draw! You have 10s to manually send (via redis-cli) "set testlock foobar". The second myRedis.Exists('testlock') in redislockingtest.ecl should now return TRUE.
  1343. sleep(10);
  1344. #endif
  1345. readReplyAndAssertWithCmdMsg(reply.get(), "manual unlock", key);//MULTI reply
  1346. readReplyAndAssertWithCmdMsg(reply.get(), "manual unlock", key);//DEL reply
  1347. readReplyAndAssertWithCmdMsg(reply.get(), "manual unlock", key);//EXEC reply
  1348. }
  1349. else
  1350. {
  1351. reply->setClear(redisCommand("UNWATCH"));
  1352. //Close connection upon failing to UNWATCH.
  1353. if (!reply->query() || (reply->query()->type != REDIS_REPLY_STATUS) || (strcmp(reply->query()->str, "OK") != 0))
  1354. freeContext();
  1355. }
  1356. }
  1357. void Connection::handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAttr * retVal, const char * password, unsigned expire)
  1358. {
  1359. //NOTE: This routine can only return an empty string under one condition, that which indicates to the caller that the key was successfully locked.
  1360. StringBuffer channel;
  1361. encodeChannel(channel, key, database, true);
  1362. //Query key and set lock if non existent
  1363. if (lock(ctx, key, channel.str(), expire))
  1364. return;
  1365. #if(0)//Test empty string handling by deleting the lock/value, and thus GET returns REDIS_REPLY_NIL as the reply type and an empty string.
  1366. {
  1367. OwnedReply pubReply = Reply::createReply(redisCommand("DEL %b", key, strlen(key)));
  1368. assertOnError(pubReply->query(), "del fail");
  1369. }
  1370. #endif
  1371. //SUB before GET
  1372. //Requires separate connection from GET so that the replies are not mangled. This could be averted but is not worth it.
  1373. int _timeLeft = (int) timeLeft();//createConnection requires a timeout value, so create it here.
  1374. if (_timeLeft == 0 && timeout.getTimeout() != 0)//Disambiguate between zero time left and timeout = 0 => infinity.
  1375. rtlFail(0, "Redis Plugin: ERROR - function timed out internally.");
  1376. ConnectionContainer subscriptionConnection;
  1377. try
  1378. {
  1379. subscriptionConnection.setown(createConnection(ctx, cachedSubscriptionConnection, options.str(), ip.str(), port, false, 0, password, _timeLeft, canCacheConnections(isCachedConnection(), true), true));
  1380. subscriptionConnection->subscribe(ctx, channel.str());
  1381. #if(0)//Test publish before GET.
  1382. {
  1383. OwnedReply pubReply = Reply::createReply(redisCommand("PUBLISH %b %b", channel.str(), (size_t)channel.length(), "foo", (size_t)3));
  1384. assertOnError(pubReply->query(), "pub fail");
  1385. }
  1386. #endif
  1387. //Now GET
  1388. OwnedReply getReply = Reply::createReply((redisReply*)redisCommand("GET %b", key, strlen(key)));
  1389. assertOnErrorWithCmdMsg(getReply->query(), "GetOrLock<type>", key);
  1390. #if(0)//Test publish after GET.
  1391. {
  1392. OwnedReply pubReply = Reply::createReply(redisCommand("PUBLISH %b %b", channel.str(), (size_t)channel.length(), "foo", (size_t)3));
  1393. assertOnError(pubReply->query(), "pub fail");
  1394. }
  1395. #endif
  1396. //Only return an actual value, i.e. neither the lock value nor an empty string. The latter is unlikely since we know that lock()
  1397. //failed, indicating that the key existed. If this is an actual value, it is however, possible for it to have been DELeted in the interim.
  1398. if (getReply->query()->type != REDIS_REPLY_NIL && getReply->query()->str && strncmp(getReply->query()->str, REDIS_LOCK_PREFIX, strlen(REDIS_LOCK_PREFIX)) != 0)
  1399. {
  1400. retVal->set(getReply->query()->len, getReply->query()->str);
  1401. return;
  1402. }
  1403. else
  1404. {
  1405. //Check that the lock was set by this plugin and thus that we subscribed to the expected channel.
  1406. if (getReply->query()->str && strcmp(getReply->query()->str, channel.str()) !=0 )
  1407. {
  1408. VStringBuffer msg("key locked with a channel ('%s') different to that subscribed to (%s).", getReply->query()->str, channel.str());
  1409. fail("GetOrLock<type>", msg.str(), key);
  1410. //MORE: In theory, it is possible to recover at this stage by subscribing to the channel that the key was actually locked with.
  1411. //However, we may have missed the massage publication already or by then, but could SUB again in case we haven't.
  1412. //More importantly and furthermore, the publication (in SetAndPublish<type>) will only publish to the channel encoded by
  1413. //this plugin, rather than the string retrieved as the lock value (the value of the locked key).
  1414. }
  1415. getReply.clear();
  1416. #if(0)//Added to allow for manual pub testing via redis-cli
  1417. struct timeval to = { 10, 0 };//10secs
  1418. ::redisSetTimeout(subscriptionConnection->context, to);
  1419. #endif
  1420. OwnedReply subReply = new Reply();
  1421. subscriptionConnection->readReply(subReply);
  1422. subscriptionConnection->assertOnErrorWithCmdMsg(subReply->query(), "GetOrLock<type>", key);
  1423. if (subscriptionConnection->isCorrectChannel(subReply->query(), "message"))
  1424. {
  1425. //We are about to return a value, to conform with other Get<type> functions, fail if the key did not exist.
  1426. //Since the value is sent via a published message, there is no direct reply struct so assume that an empty
  1427. //string is equivalent to a non-existent key.
  1428. //More importantly, it is paramount that this routine only return an empty string under one condition, that
  1429. //which indicates to the caller that the key was successfully locked.
  1430. //NOTE: it is possible for an empty message to have been PUBLISHed.
  1431. if (subReply->query()->element[2]->len > 0)
  1432. {
  1433. retVal->set(subReply->query()->element[2]->len, subReply->query()->element[2]->str);//return the published value rather than another (WATCHed) GET.
  1434. return;
  1435. }
  1436. //fail that key does not exist
  1437. redisReply fakeReply;
  1438. fakeReply.type = REDIS_REPLY_NIL;
  1439. assertKey(&fakeReply, key);
  1440. }
  1441. }
  1442. throwUnexpected();
  1443. }
  1444. catch (IException * error)
  1445. {
  1446. subscriptionConnection.handleException(error);
  1447. }
  1448. }
  1449. void Connection::handleLockOnSet(ICodeContext * ctx, const char * key, const char * value, size_t size, unsigned expire)
  1450. {
  1451. //Due to locking logic surfacing into ECL, any locking.set (such as this is) assumes that they own the lock and therefore go ahead and set regardless.
  1452. StringBuffer channel;
  1453. encodeChannel(channel, key, database, true);
  1454. if (size > 29)//c.f. 1st note below.
  1455. {
  1456. OwnedReply replyContainer = new Reply();
  1457. if (expire == 0)
  1458. {
  1459. const char * luaScriptSHA1 = "2a4a976d9bbd806756b2c7fc1e2bc2cb905e68c3"; //NOTE: update this if luaScript is updated!
  1460. replyContainer->setClear(redisCommand("EVALSHA %b %d %b %b %b", luaScriptSHA1, (size_t)40, 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size));
  1461. if (noScript(replyContainer->query()))
  1462. {
  1463. const char * luaScript = "redis.call('SET', KEYS[1], ARGV[2]) redis.call('PUBLISH', ARGV[1], ARGV[2]) return";//NOTE: MUST update luaScriptSHA1 if luaScript is updated!
  1464. replyContainer->setClear(redisCommand("EVAL %b %d %b %b %b", luaScript, strlen(luaScript), 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size));
  1465. }
  1466. }
  1467. else
  1468. {
  1469. const char * luaScriptWithExpireSHA1 = "6f6bc88ccea7c6853ccc395eaa7abd8cb91fb2d8"; //NOTE: update this if luaScriptWithExpire is updated!
  1470. replyContainer->setClear(redisCommand("EVALSHA %b %d %b %b %b %d", luaScriptWithExpireSHA1, (size_t)40, 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size, expire));
  1471. if (noScript(replyContainer->query()))
  1472. {
  1473. const char * luaScriptWithExpire = "redis.call('SET', KEYS[1], ARGV[2], 'PX', ARGV[3]) redis.call('PUBLISH', ARGV[1], ARGV[2]) return";//NOTE: MUST update luaScriptWithExpireSHA1 if luaScriptWithExpire is updated!
  1474. replyContainer->setClear(redisCommand("EVAL %b %d %b %b %b %d", luaScriptWithExpire, strlen(luaScriptWithExpire), 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size, expire));
  1475. }
  1476. }
  1477. assertOnErrorWithCmdMsg(replyContainer->query(), "SET", key);
  1478. }
  1479. else
  1480. {
  1481. StringBuffer cmd("SET %b %b");
  1482. RedisPlugin::appendExpire(cmd, expire);
  1483. redisAppendCommand(context, "MULTI");
  1484. redisAppendCommand(context, cmd.str(), key, strlen(key), value, size);//SET
  1485. redisAppendCommand(context, "PUBLISH %b %b", channel.str(), (size_t)channel.length(), value, size);//PUB
  1486. redisAppendCommand(context, "EXEC");
  1487. //Now read and assert replies
  1488. OwnedReply reply = new Reply();
  1489. readReplyAndAssertWithCmdMsg(reply, "SET", key);//MULTI reply
  1490. readReplyAndAssertWithCmdMsg(reply, "SET", key);//SET reply
  1491. readReplyAndAssertWithCmdMsg(reply, "PUB for the key", key);//PUB reply
  1492. readReplyAndAssertWithCmdMsg(reply, "SET", key);//EXEC reply
  1493. }
  1494. //NOTE: When setting and publishing the data with a pipelined MULTI-SET-PUB-EXEC, the data is sent twice, once with the SET and again with the PUBLISH.
  1495. //To prevent this, send the data to the server only once with a server-side lua script that then sets and publishes the data from the server.
  1496. //However, there is a transmission overhead for this method that may still be larger than sending the data twice if it is small enough.
  1497. //multi-set-pub-exec (via strings) has a transmission length of - "MULTI SET" + key + value + "PUBLISH" + channel + value = 5 + 3 + key + 7 + value + channel + value + 4
  1498. //The lua script (assuming the script already exists on the server) a length of - "EVALSHA" + digest + "1" + key + channel + value = 7 + 40 + 1 + key + channel + value
  1499. //Therefore, they have same length when: 19 + value = 48 => value = 29.
  1500. //NOTE: Pipelining the above commands may not be the expected behaviour, instead only PUBLISH upon a successful SET. Doing both regardless, does however ensure
  1501. //(assuming only the SET fails) that any subscribers do in fact get their requested key-value even if the SET fails. This may not be expected behaviour
  1502. //as it is now possible for the key-value to NOT actually exist in the cache though it was retrieved via a redis plugin get function. This is documented in the README.
  1503. //Further more, it is possible that the locked value and thus the channel stored within the key is not that expected, i.e. computed via encodeChannel() (e.g.
  1504. //if set by a non-conforming external client/process). It is however, possible to account for this via using a GETSET instead of just the SET. This returns the old
  1505. //value stored, this can then be checked if it is a lock (i.e. has at least the "redis_key_lock prefix"), if it doesn't, PUB on the channel from encodeChannel(),
  1506. //otherwise PUB on the value retrieved from GETSET or possibly only if it at least has the prefix "redis_key_lock".
  1507. //This would however, prevent the two commands from being pipelined, as the GETSET would need to return before publishing. It would also mean sending the data twice.
  1508. }
  1509. bool Connection::noScript(const redisReply * reply) const
  1510. {
  1511. return (reply && reply->type == REDIS_REPLY_ERROR && strncmp(reply->str, "NOSCRIPT", 8) == 0);
  1512. }
  1513. //--------------------------------------------------------------------------------
  1514. // ECL SERVICE ENTRYPOINTS
  1515. //--------------------------------------------------------------------------------
  1516. //-----------------------------------SET------------------------------------------
  1517. ECL_REDIS_API void ECL_REDIS_CALL SyncLockRSetStr(ICodeContext * ctx, size32_t & returnLength, char * & returnValue, const char * key, size32_t valueLength, const char * value, const char * options, int database, unsigned expire, const char * password, unsigned timeout, bool cachedConnectionRequested)
  1518. {
  1519. SyncLockRSet(ctx, options, key, valueLength, value, database, expire, password, timeout, cachedConnectionRequested);
  1520. returnLength = valueLength;
  1521. returnValue = (char*)allocateAndCopy(value, valueLength);
  1522. }
  1523. ECL_REDIS_API void ECL_REDIS_CALL SyncLockRSetUChar(ICodeContext * ctx, size32_t & returnLength, UChar * & returnValue, const char * key, size32_t valueLength, const UChar * value, const char * options, int database, unsigned expire, const char * password, unsigned timeout, bool cachedConnectionRequested)
  1524. {
  1525. unsigned valueSize = (valueLength)*sizeof(UChar);
  1526. SyncLockRSet(ctx, options, key, valueSize, (char*)value, database, expire, password, timeout, cachedConnectionRequested);
  1527. returnLength= valueLength;
  1528. returnValue = (UChar*)allocateAndCopy(value, valueSize);
  1529. }
  1530. ECL_REDIS_API void ECL_REDIS_CALL SyncLockRSetUtf8(ICodeContext * ctx, size32_t & returnLength, char * & returnValue, const char * key, size32_t valueLength, const char * value, const char * options, int database, unsigned expire, const char * password, unsigned timeout, bool cachedConnectionRequested)
  1531. {
  1532. unsigned valueSize = rtlUtf8Size(valueLength, value);
  1533. SyncLockRSet(ctx, options, key, valueSize, value, database, expire, password, timeout, cachedConnectionRequested);
  1534. returnLength = valueLength;
  1535. returnValue = (char*)allocateAndCopy(value, valueSize);
  1536. }
  1537. //-------------------------------------GET----------------------------------------
  1538. ECL_REDIS_API void ECL_REDIS_CALL SyncLockRGetStr(ICodeContext * ctx, size32_t & returnSize, char * & returnValue, const char * key, const char * options, int database, const char * password, unsigned timeout, unsigned expire, bool cachedConnectionRequested)
  1539. {
  1540. size_t _returnSize;
  1541. SyncLockRGet(ctx, options, key, _returnSize, returnValue, database, expire, password, timeout, cachedConnectionRequested);
  1542. returnSize = static_cast<size32_t>(_returnSize);
  1543. }
  1544. ECL_REDIS_API void ECL_REDIS_CALL SyncLockRGetUChar(ICodeContext * ctx, size32_t & returnLength, UChar * & returnValue, const char * key, const char * options, int database, const char * password, unsigned timeout, unsigned expire, bool cachedConnectionRequested)
  1545. {
  1546. size_t returnSize;
  1547. char * _returnValue;
  1548. SyncLockRGet(ctx, options, key, returnSize, _returnValue, database, expire, password, timeout, cachedConnectionRequested);
  1549. returnValue = (UChar*)_returnValue;
  1550. returnLength = static_cast<size32_t>(returnSize/sizeof(UChar));
  1551. }
  1552. ECL_REDIS_API void ECL_REDIS_CALL SyncLockRGetUtf8(ICodeContext * ctx, size32_t & returnLength, char * & returnValue, const char * key, const char * options, int database, const char * password, unsigned timeout, unsigned expire, bool cachedConnectionRequested)
  1553. {
  1554. size_t returnSize;
  1555. SyncLockRGet(ctx, options, key, returnSize, returnValue, database, expire, password, timeout, cachedConnectionRequested);
  1556. returnLength = static_cast<size32_t>(rtlUtf8Length(returnSize, returnValue));
  1557. }
  1558. ECL_REDIS_API void ECL_REDIS_CALL SyncLockRUnlock(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout, bool cachedConnectionRequested)
  1559. {
  1560. ConnectionContainer master;
  1561. try
  1562. {
  1563. master.setown(Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, timeout, cachedConnectionRequested));
  1564. master->unlock(ctx, key);
  1565. }
  1566. catch (IException * error)
  1567. {
  1568. master.handleException(error);
  1569. }
  1570. }
  1571. }//close namespace