Browse Source

HPCC-14286 Redis Plugin - add publish and subscribe functions

Signed-off-by: James Noss <james.noss@lexisnexis.com>
James Noss 9 years ago
parent
commit
6766341a52
4 changed files with 213 additions and 81 deletions
  1. 36 24
      plugins/redis/README.md
  2. 9 0
      plugins/redis/lib_redis.ecllib
  3. 165 57
      plugins/redis/redis.cpp
  4. 3 0
      plugins/redis/redis.hpp

+ 36 - 24
plugins/redis/README.md

@@ -53,36 +53,42 @@ Here is a list of the core plugin **functions**.
 
 ###Set
 ```
-SetUnicode( CONST VARSTRING key, CONST UNICODE value, CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-SetString(  CONST VARSTRING key, CONST STRING value,  CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-SetUtf8(    CONST VARSTRING key, CONST UTF8 value,    CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-SetBoolean( CONST VARSTRING key, BOOLEAN value,       CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-SetReal(    CONST VARSTRING key, REAL value,          CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-SetInteger( CONST VARSTRING key, INTEGER value,       CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-SetUnsigned(CONST VARSTRING key, UNSIGNED value,      CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-SetData(    CONST VARSTRING key, CONST DATA value,    CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
+SetUnicode( CONST VARSTRING key, CONST UNICODE value, CONST VARSTRING options, INTEGER database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+SetString(  CONST VARSTRING key, CONST STRING value,  CONST VARSTRING options, INTEGER database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+SetUtf8(    CONST VARSTRING key, CONST UTF8 value,    CONST VARSTRING options, INTEGER database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+SetBoolean( CONST VARSTRING key, BOOLEAN value,       CONST VARSTRING options, INTEGER database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+SetReal(    CONST VARSTRING key, REAL value,          CONST VARSTRING options, INTEGER database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+SetInteger( CONST VARSTRING key, INTEGER value,       CONST VARSTRING options, INTEGER database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+SetUnsigned(CONST VARSTRING key, UNSIGNED value,      CONST VARSTRING options, INTEGER database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+SetData(    CONST VARSTRING key, CONST DATA value,    CONST VARSTRING options, INTEGER database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
 ```
 
 ###Get
 ```
-INTEGER8   GetInteger(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-UNSIGNED8 GetUnsigned(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-STRING      GetString(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-UNICODE    GetUnicode(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-UTF8          GetUtf8(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-BOOLEAN    GetBoolean(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-REAL          GetReal(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-DATA          GetData(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
+INTEGER8   GetInteger(CONST VARSTRING key, CONST VARSTRING options, INTEGER database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+UNSIGNED8 GetUnsigned(CONST VARSTRING key, CONST VARSTRING options, INTEGER database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+STRING      GetString(CONST VARSTRING key, CONST VARSTRING options, INTEGER database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+UNICODE    GetUnicode(CONST VARSTRING key, CONST VARSTRING options, INTEGER database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+UTF8          GetUtf8(CONST VARSTRING key, CONST VARSTRING options, INTEGER database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+BOOLEAN    GetBoolean(CONST VARSTRING key, CONST VARSTRING options, INTEGER database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+REAL          GetReal(CONST VARSTRING key, CONST VARSTRING options, INTEGER database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+DATA          GetData(CONST VARSTRING key, CONST VARSTRING options, INTEGER database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
 ```
 
 ###Utility
 ```
-BOOLEAN Exists(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-FlushDB(CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-Delete(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-Persist(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-Expire(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-INTEGER DBSize(CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
+BOOLEAN Exists(CONST VARSTRING key, CONST VARSTRING options, INTEGER database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+FlushDB(CONST VARSTRING options, INTEGER database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+Delete(CONST VARSTRING key, CONST VARSTRING options, INTEGER database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+Persist(CONST VARSTRING key, CONST VARSTRING options, INTEGER database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+Expire(CONST VARSTRING key, CONST VARSTRING options, INTEGER database = 0, UNSIGNED4 expire, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+INTEGER DBSize(CONST VARSTRING options, INTEGER database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+```
+
+###PUB-SUB
+```
+UNSIGNED Publish(CONST VARSTRING keyOrChannel, CONST STRING message, CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000, BOOLEAN lockedKey = FALSE)
+STRING Subscribe(CONST VARSTRING keyOrChannel, CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000, BOOLEAN lockedKey = FALSE)
 ```
 
 The core points to note here are:
@@ -91,8 +97,12 @@ The core points to note here are:
    STRING of length 8, set with SetString, being successfully retrieved from the cache via GetInteger without an **ECL** exception being thrown.
    * `CONST VARSTRING options` passes the server **IP** and **port** to the plugin in the *strict* format - `--SERVER=<ip>:<port>`. If `options` is empty, the default
    127.0.0.1:6379 is used. *Note:* 6379 is the default port for **redis-server**.
-   * `UNSIGNED timeout` has units *ms* and has a default value of 1 second (0 := infinity).  *c.f.* 'Timeout Values' below for advice on choosing appropriate values.
+   * `UNSIGNED4 timeout` has units *ms* and has a default value of 1 second (0 := infinity).  *c.f.* 'Timeout Values' below for advice on choosing appropriate values.
    * `UNSIGNED expire` has units *ms* and a default of **0**, i.e. *forever*.
+   * Both `Publish` and `Subscribe` have a flag `BOOLEAN lockedKey = FALSE` such, that when **TRUE**, will encode `CONST VARSTRING keyOrChannel` as if it were a key
+   allowing key-channel encoding compatibility with the `GetOrLockString` and `SetAndPublishString` functions. For this reason, they both also take a **Database** value as
+   this is used in the encoding of the lock and channel. Please note however that the redis pub-sub paradigm is actually irrespective of database.
+   * *c.f.* redis documentation for the following - [Exists](http://redis.io/commands/exists), [FlushDB](http://redis.io/commands/flushdb), [Delete](http://redis.io/commands/del), [Persist](http://redis.io/commands/persist), [Expire](http://redis.io/commands/expire), [DBSize](http://redis.io/commands/dbsize), [Publish](http://redis.io/commands/publish), & [Subscribe](http://redis.io/commands/subscribe).
 
 ###The redisServer MODULE
 To avoid the cumbersome and unnecessary need to constantly pass `options` and `password` with each function call, the module `redisServer` can be imported to effectively 
@@ -138,7 +148,7 @@ myRedis := redisServer('--SERVER=127.0.0.1:6379');
 
 STRING poppins := 'supercalifragilisticexpialidocious'; //Value to externally compute/retrieve from 3rd party vendor.
 
-myFunc(STRING key, UNSIGNED database) := FUNCTION  //Function for computing/retrieving a value.
+myFunc(STRING key, INTEGER database) := FUNCTION  //Function for computing/retrieving a value.
   return myRedis.GetString(key, database);
 END;
 
@@ -200,3 +210,5 @@ A few notes to point out here:
 | SetAndPublish (value length > 29) | 1       | 5       | new connection   |
 | SetAndPublish (value length < 29) | 4       | 8       | new connection   |
 | Unlock              | 5       | 9       | new connection   |
+| Publish             | 1       | 4       | new connection   |
+| Subscribe           | 2       | 5       | new connection   |

+ 9 - 0
plugins/redis/lib_redis.ecllib

@@ -43,6 +43,9 @@ EXPORT redis := SERVICE : plugin('redis'), namespace('RedisPlugin')
   Expire(CONST VARSTRING key, CONST VARSTRING options, INTEGER4 database = 0, UNSIGNED4 expire, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000) : cpp,action,context,entrypoint='RExpire';
   UNSIGNED DBSize(CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000) : cpp,once,context,entrypoint='RDBSize';
 
+  UNSIGNED Publish(CONST VARSTRING keyOrChannel, CONST STRING message, CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000, BOOLEAN lockedKey = FALSE) : cpp,once,context,entrypoint='SyncRPub';
+  STRING Subscribe(CONST VARSTRING keyOrChannel, CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000, BOOLEAN lockedKey = FALSE) : cpp,once,context,entrypoint='SyncRSub';
+
   STRING   SetAndPublishString(  CONST VARSTRING key, CONST STRING value,  CONST VARSTRING options, INTEGER4 database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000) : cpp,once,context,entrypoint='SyncLockRSetStr';
   UNICODE  SetAndPublishUnicode( CONST VARSTRING key, CONST UNICODE value, CONST VARSTRING options, INTEGER4 database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000) : cpp,once,context,entrypoint='SyncLockRSetUChar';
   UTF8     SetAndPublishUtf8(    CONST VARSTRING key, CONST UTF8 value,    CONST VARSTRING options, INTEGER4 database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000) : cpp,once,context,entrypoint='SyncLockRSetUtf8';
@@ -81,6 +84,9 @@ EXPORT RedisServer(VARSTRING options, VARSTRING password = '', UNSIGNED4 timeout
   EXPORT Expire(VARSTRING key, INTEGER4 database = 0, UNSIGNED4 expire)  := redis.Expire(key, options, database, expire, password, timeout);
   EXPORT DBSize(INTEGER4 database = 0) := redis.DBSize(options, database, password, timeout);
 
+  EXPORT Publish(VARSTRING keyOrChannel, STRING message, INTEGER4 database = 0, BOOLEAN lockedKey = FALSE) := redis.Publish(keyOrChannel, message, options, database, password, timeout, lockedKey);
+  EXPORT Subscribe(VARSTRING keyOrChannel, INTEGER4 database = 0, BOOLEAN lockedKey = FALSE) := redis.Subscribe(keyOrChannel, options, database, password, timeout, lockedKey);
+
   EXPORT  SetAndPublishUnicode(VARSTRING key, UNICODE value,  INTEGER4 database = 0, UNSIGNED4 expire = 0) := redis.SetAndPublishUnicode (key, value, options, database, expire, password, timeout);
   EXPORT   SetAndPublishString(VARSTRING key, STRING value,   INTEGER4 database = 0, UNSIGNED4 expire = 0) := redis.SetAndPublishString  (key, value, options, database, expire, password, timeout);
   EXPORT     SetAndPublishUtf8(VARSTRING key, UTF8 value,     INTEGER4 database = 0, UNSIGNED4 expire = 0) := redis.SetAndPublishUtf8    (key, value, options, database, expire, password, timeout);
@@ -119,6 +125,9 @@ EXPORT RedisServerWithoutTimeout(VARSTRING options, VARSTRING password = '') :=
   EXPORT Expire(VARSTRING key, INTEGER4 database = 0, UNSIGNED4 expire, UNSIGNED4 timeout = 1000)  := redis.Expire(key, options, database, expire, password, timeout);
   EXPORT DBSize(INTEGER4 database = 0, UNSIGNED4 timeout = 1000) := redis.DBSize(options, database, password, timeout);
 
+  EXPORT Publish(VARSTRING keyOrChannel, STRING message, INTEGER4 database = 0, UNSIGNED4 timeout = 1000, BOOLEAN lockedKey = FALSE) := redis.Publish(keyOrChannel, message, options, database, password, timeout, lockedKey);
+  EXPORT Subscribe(VARSTRING keyOrChannel, INTEGER4 database = 0, UNSIGNED4 timeout = 1000, BOOLEAN lockedKey = FALSE) := redis.Subscribe(keyOrChannel, options, database, password, timeout, lockedKey);
+
   EXPORT  SetAndPublishUnicode(VARSTRING key, UNICODE value,  INTEGER4 database = 0, UNSIGNED4 expire = 0, UNSIGNED4 timeout = 1000) := redis.SetAndPublishUnicode (key, value, options, database, expire, password, timeout);
   EXPORT   SetAndPublishString(VARSTRING key, STRING value,   INTEGER4 database = 0, UNSIGNED4 expire = 0, UNSIGNED4 timeout = 1000) := redis.SetAndPublishString  (key, value, options, database, expire, password, timeout);
   EXPORT     SetAndPublishUtf8(VARSTRING key, UTF8 value,     INTEGER4 database = 0, UNSIGNED4 expire = 0, UNSIGNED4 timeout = 1000) := redis.SetAndPublishUtf8    (key, value, options, database, expire, password, timeout);

+ 165 - 57
plugins/redis/redis.cpp

@@ -40,10 +40,13 @@ ECL_REDIS_API bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb)
 namespace RedisPlugin {
 
 class Connection;
+class SubConnection;
 static const char * REDIS_LOCK_PREFIX = "redis_ecl_lock";
-static __thread Connection * cachedConnection;
-static __thread ThreadTermFunc threadHookChain;
-static __thread bool threadHooked;
+static __thread Connection * cachedConnection = NULL;
+static __thread Connection * cachedPubConnection = NULL;//database should always = 0
+static __thread Connection * cachedSubConnection = NULL;//intended to always point to SubConnection
+static __thread ThreadTermFunc threadHookChain = NULL;
+static __thread bool threadHooked = false;
 
 static void * allocateAndCopy(const void * src, size_t size)
 {
@@ -69,6 +72,7 @@ public :
 
     static Reply * createReply(void * _reply) { return new Reply(_reply); }
     inline const redisReply * query() const { return reply; }
+    void setClear(void * _reply) { setClear((redisReply*)_reply); }
     void setClear(redisReply * _reply)
     {
         if (reply)
@@ -103,14 +107,14 @@ private :
 class Connection : public CInterface
 {
 public :
-    Connection(ICodeContext * ctx, const char * options, int database, const char * password, unsigned _timeout);
+    Connection(ICodeContext * ctx, const char * _options, int database, const char * password, unsigned _timeout);
     Connection(ICodeContext * ctx, const char * _options, const char * _ip, int _port, unsigned _serverIpPortPasswordHash, int _database, const char * password, unsigned _timeout);
     ~Connection()
     {
         if (context)
             redisFree(context);
     }
-    static Connection * createConnection(ICodeContext * ctx, const char * options, int database, const char * password, unsigned _timeout);
+    static Connection * createConnection(ICodeContext * ctx, Connection * & _cachedConnection, const char * options, int database, const char * password, unsigned _timeout, bool sub = false);
 
     //set
     template <class type> void set(ICodeContext * ctx, const char * key, type value, unsigned expire);
@@ -125,6 +129,11 @@ public :
     void unlock(ICodeContext * ctx, const char * key);
     //--------------------------------------------------------------------------------------
 
+    //-------------------------------PUB/SUB------------------------------------------------
+    unsigned __int64 publish(ICodeContext * ctx, const char * keyOrChannel, size32_t messageSize, const char * message, int _database, bool lockedKey);
+    void subscribe(ICodeContext * ctx, const char * keyOrChannel, size_t & messageSize, char * & message, int _database, bool lockedKey);
+    //--------------------------------------------------------------------------------------
+
     void persist(ICodeContext * ctx, const char * key);
     void expire(ICodeContext * ctx, const char * key, unsigned _expire);
     void del(ICodeContext * ctx, const char * key);
@@ -133,13 +142,14 @@ public :
     bool exists(ICodeContext * ctx, const char * key);
 
 protected :
+    virtual void selectDB(ICodeContext * ctx, int _database);
+    virtual void reset();
+
     void redisSetTimeout();
     void redisConnect();
     unsigned timeLeft();
     void parseOptions(ICodeContext * ctx, const char * _options);
     void connect(ICodeContext * ctx, int _database, const char * password);
-    void selectDB(ICodeContext * ctx, int _database);
-    void resetContextErr();
     void readReply(Reply * reply);
     void readReplyAndAssert(Reply * reply, const char * msg);
     void readReplyAndAssertWithCmdMsg(Reply * reply, const char * msg, const char * key = NULL);
@@ -157,7 +167,7 @@ protected :
     //-------------------------------LOCKING------------------------------------------------
     void handleLockOnSet(ICodeContext * ctx, const char * key, const char * value, size_t size, unsigned expire);
     void handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAttr * retVal, const char * password, unsigned expire);
-    void encodeChannel(StringBuffer & channel, const char * key) const;
+    void encodeChannel(StringBuffer & channel, const char * key, int _database) const;
     bool noScript(const redisReply * reply) const;
     bool lock(ICodeContext * ctx, const char * key, const char * channel, unsigned expire);
     //--------------------------------------------------------------------------------------
@@ -171,22 +181,16 @@ protected :
     TimeoutHandler timeout;
     int database; //NOTE: redis stores the maximum number of dbs as an 'int'.
 };
-
-//The following class is here to ensure destruction of the cachedConnection within the main thread
-//as this is not handled by the thread hook mechanism.
-static class MainThreadCachedConnection
+class SubConnection : public Connection
 {
 public :
-    MainThreadCachedConnection() { }
-    ~MainThreadCachedConnection()
-    {
-        if (cachedConnection)
-        {
-            cachedConnection->Release();
-            cachedConnection = NULL;
-        }
-    }
-} mainThread;
+    SubConnection(ICodeContext * ctx, const char * _options, int database, const char * password, unsigned _timeout) :
+        Connection(ctx, _options, 0, password, _timeout) { };
+
+protected :
+    virtual void selectDB(ICodeContext * ctx, int _database) { };
+    virtual void reset();
+};
 
 static void releaseContext()
 {
@@ -195,6 +199,16 @@ static void releaseContext()
         cachedConnection->Release();
         cachedConnection = NULL;
     }
+    if (cachedPubConnection)
+    {
+        cachedPubConnection->Release();
+        cachedPubConnection = NULL;
+    }
+    if (cachedSubConnection)
+    {
+        cachedSubConnection->Release();
+        cachedSubConnection = NULL;
+    }
     if (threadHookChain)
     {
         (*threadHookChain)();
@@ -202,6 +216,14 @@ static void releaseContext()
     }
     threadHooked = false;
 }
+//The following class is here to ensure destruction of the cachedConnection within the main thread
+//as this is not handled by the thread hook mechanism.
+static class MainThreadCachedConnection
+{
+public :
+    MainThreadCachedConnection() { }
+    ~MainThreadCachedConnection() { releaseContext(); }
+} mainThread;
 Connection::Connection(ICodeContext * ctx, const char * _options, int _database, const char * password, unsigned _timeout)
   : database(0), timeout(_timeout), port(0), serverIpPortPasswordHash(hashServerIpPortPassword(ctx, _options, password))
 {
@@ -330,11 +352,17 @@ void Connection::parseOptions(ICodeContext * ctx, const char * _options)
         }
     }
 }
-void Connection::resetContextErr()
+void Connection::reset()
 {
     if (context)
         context->err = REDIS_OK;
 }
+void SubConnection::reset()
+{
+    OwnedReply reply = Reply::createReply(redisCommand(context, "UNSUBSCRIBE"));
+    assertOnErrorWithCmdMsg(reply->query(), "UNSUBSCRIBE all channels");
+    Connection::reset();
+}
 void Connection::readReply(Reply * reply)
 {
     redisReply * nakedReply = NULL;
@@ -352,32 +380,39 @@ void Connection::readReplyAndAssertWithCmdMsg(Reply * reply, const char * msg, c
     readReply(reply);
     assertOnErrorWithCmdMsg(reply->query(), msg, key);
 }
-Connection * Connection::createConnection(ICodeContext * ctx, const char * options, int _database, const char * password, unsigned _timeout)
+Connection * Connection::createConnection(ICodeContext * ctx,  Connection * & _cachedConnection, const char * options, int _database, const char * password, unsigned _timeout, bool sub)
 {
-    if (!cachedConnection)
+    if (!_cachedConnection)
     {
-        cachedConnection = new Connection(ctx, options, _database, password, _timeout);
+        if (sub)
+            _cachedConnection = new SubConnection(ctx, options, _database, password, _timeout);
+        else
+            _cachedConnection = new Connection(ctx, options, _database, password, _timeout);
+
         if (!threadHooked)
         {
             threadHookChain = addThreadTermFunc(releaseContext);
             threadHooked = true;
         }
-        return LINK(cachedConnection);
+        return LINK(_cachedConnection);
     }
 
-    if (cachedConnection->isSameConnection(ctx, options, password))
+    if (_cachedConnection->isSameConnection(ctx, options, password))
     {
         //MORE: should perhaps check that the connection has not expired (think hiredis REDIS_KEEPALIVE_INTERVAL is defaulted to 15s).
-        cachedConnection->resetContextErr();//reset the context err to allow reuse when an error previously occurred.
-        cachedConnection->timeout.reset(_timeout);
-        cachedConnection->selectDB(ctx, _database);
-        return LINK(cachedConnection);
+        _cachedConnection->reset();//reset the context err to allow reuse when an error previously occurred.
+        _cachedConnection->timeout.reset(_timeout);
+        _cachedConnection->selectDB(ctx, _database);
+        return LINK(_cachedConnection);
     }
 
-    cachedConnection->Release();
-    cachedConnection = NULL;
-    cachedConnection = new Connection(ctx, options, _database, password, _timeout);
-    return LINK(cachedConnection);
+    _cachedConnection->Release();
+    _cachedConnection = NULL;
+    if (sub)
+        _cachedConnection = new SubConnection(ctx, options, _database, password, _timeout);
+    else
+        _cachedConnection = new Connection(ctx, options, _database, password, _timeout);
+    return LINK(_cachedConnection);
 }
 void Connection::selectDB(ICodeContext * ctx, int _database)
 {
@@ -499,13 +534,13 @@ unsigned __int64 Connection::dbSize(ICodeContext * ctx)
 //--OUTER--
 template<class type> void SyncRSet(ICodeContext * ctx, const char * _options, const char * key, type value, int database, unsigned expire, const char * password, unsigned _timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, _options, database, password, _timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, _options, database, password, _timeout);
     master->set(ctx, key, value, expire);
 }
 //Set pointer types
 template<class type> void SyncRSet(ICodeContext * ctx, const char * _options, const char * key, size32_t valueSize, const type * value, int database, unsigned expire, const char * password, unsigned _timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, _options, database, password, _timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, _options, database, password, _timeout);
     master->set(ctx, key, valueSize, value, expire);
 }
 //--INNER--
@@ -532,12 +567,12 @@ template<class type> void Connection::set(ICodeContext * ctx, const char * key,
 //--OUTER--
 template<class type> void SyncRGet(ICodeContext * ctx, const char * options, const char * key, type & returnValue, int database, const char * password, unsigned _timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, _timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, _timeout);
     master->get(ctx, key, returnValue);
 }
 template<class type> void SyncRGet(ICodeContext * ctx, const char * options, const char * key, size_t & returnSize, type * & returnValue, int database, const char * password, unsigned _timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, _timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, _timeout);
     master->get(ctx, key, returnSize, returnValue);
 }
 //--INNER--
@@ -566,37 +601,110 @@ template<class type> void Connection::get(ICodeContext * ctx, const char * key,
     returnSize = reply->query()->len;
     returnValue = reinterpret_cast<type*>(allocateAndCopy(reply->query()->str, returnSize));
 }
+unsigned __int64 Connection::publish(ICodeContext * ctx, const char * keyOrChannel, size32_t messageSize, const char * message, int _database, bool lockedKey)
+{
+    StringBuffer channel;
+    if (lockedKey)
+        encodeChannel(channel, keyOrChannel, _database);
+    else
+        channel.set(keyOrChannel);
+
+    OwnedReply reply = Reply::createReply(redisCommand(context, "PUBLISH %b %b", channel.str(), (size_t)channel.length(), message, (size_t)messageSize));
+    assertOnErrorWithCmdMsg(reply->query(), "PUBLISH", channel.str());
+    if (reply->query()->type == REDIS_REPLY_INTEGER)
+    {
+        if (reply->query()->integer >= 0)
+            return (unsigned __int64)reply->query()->integer;
+        else
+            throwUnexpected();
+    }
+    throwUnexpected();
+}
+void Connection::subscribe(ICodeContext * ctx, const char * keyOrChannel, size_t & messageSize, char * & message, int _database, bool lockedKey)
+{
+    StringBuffer channel;
+    if (lockedKey)
+        encodeChannel(channel, keyOrChannel, _database);
+    else
+        channel.set(keyOrChannel);
+
+#if(0)//Replicate a lingering SUBSCRIBE to test channel comparison test when reading message.
+    {
+    OwnedReply reply = Reply::createReply(redisCommand(context, "SUBSCRIBE oldChannel"));
+    assertOnErrorWithCmdMsg(reply->query(), "Test lingering SUBSCRIBE", "oldChannel");
+    }
+#endif
+
+    OwnedReply reply = Reply::createReply(redisCommand(context, "SUBSCRIBE %b", channel.str(), (size_t)channel.length()));
+    assertOnErrorWithCmdMsg(reply->query(), "SUBSCRIBE", channel.str());
+    if (reply->query()->type != REDIS_REPLY_ARRAY || strcmp("subscribe", reply->query()->element[0]->str) != 0 )
+        fail("SUBSCRIBE", "failed to register SUB", channel.str());
+
+    readReply(reply);
+    assertOnErrorWithCmdMsg(reply->query(), "SUBSCRIBE", channel.str());
+
+    if (reply->query()->type == REDIS_REPLY_ARRAY && strcmp("message", reply->query()->element[0]->str) == 0 && reply->query()->elements > 2)
+    {
+        if (strcmp( channel.str(), reply->query()->element[1]->str) != 0)//NOTE: In case the cached subscription connection encounters a failed unsubscribe resulting in mangled subscriptions.
+            throwUnexpected();
+
+        if (reply->query()->element[2]->len > 0)
+        {
+            messageSize = (size_t)reply->query()->element[2]->len;
+            message = reinterpret_cast<char*>(allocateAndCopy(reply->query()->element[2]->str, messageSize));
+        }
+        else
+        {
+            messageSize = 0;
+            message = NULL;
+        }
+        return;
+    }
+    throwUnexpected();
+}
 //--------------------------------------------------------------------------------
 //                           ECL SERVICE ENTRYPOINTS
 //--------------------------------------------------------------------------------
+ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL SyncRPub(ICodeContext * ctx, const char * keyOrChannel, size32_t messageSize, const char * message, const char * options, int database, const char * password, unsigned timeout, bool lockedKey)
+{
+    Owned<Connection> master = Connection::createConnection(ctx, cachedPubConnection, options, 0, password, timeout);
+    return master->publish(ctx, keyOrChannel, messageSize, message, database, lockedKey);
+}
+ECL_REDIS_API void ECL_REDIS_CALL SyncRSub(ICodeContext * ctx, size32_t & messageSize, char * & message, const char * keyOrChannel, const char * options, int database, const char * password, unsigned timeout, bool lockedKey)
+{
+    size_t _messageSize = 0;
+    Owned<Connection> master = Connection::createConnection(ctx, cachedSubConnection, options, 0, password, timeout, true);
+    master->subscribe(ctx, keyOrChannel, _messageSize, message, database, lockedKey);
+    messageSize = static_cast<size32_t>(_messageSize);
+}
 ECL_REDIS_API void ECL_REDIS_CALL RClear(ICodeContext * ctx, const char * options, int database, const char * password, unsigned timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, timeout);
     master->clear(ctx);
 }
 ECL_REDIS_API bool ECL_REDIS_CALL RExist(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, timeout);
     return master->exists(ctx, key);
 }
 ECL_REDIS_API void ECL_REDIS_CALL RDel(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, timeout);
     master->del(ctx, key);
 }
 ECL_REDIS_API void ECL_REDIS_CALL RPersist(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, timeout);
     master->persist(ctx, key);
 }
 ECL_REDIS_API void ECL_REDIS_CALL RExpire(ICodeContext * ctx, const char * key, const char * options, int database, unsigned _expire, const char * password, unsigned timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, timeout);
     master->expire(ctx, key, _expire);
 }
 ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL RDBSize(ICodeContext * ctx, const char * options, int database, const char * password, unsigned timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, timeout);
     return master->dbSize(ctx);
 }
 //-----------------------------------SET------------------------------------------
@@ -686,7 +794,7 @@ ECL_REDIS_API void ECL_REDIS_CALL SyncRGetData(ICodeContext * ctx, size32_t & re
 //Set pointer types
 void SyncLockRSet(ICodeContext * ctx, const char * _options, const char * key, size32_t valueSize, const char * value, int database, unsigned expire, const char * password, unsigned _timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, _options, database, password, _timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, _options, database, password, _timeout);
     master->lockSet(ctx, key, valueSize, value, expire);
 }
 //--INNER--
@@ -699,7 +807,7 @@ void Connection::lockSet(ICodeContext * ctx, const char * key, size32_t valueSiz
 //--OUTER--
 void SyncLockRGet(ICodeContext * ctx, const char * options, const char * key, size_t & returnSize, char * & returnValue, int database, unsigned expire, const char * password, unsigned _timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, _timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, _timeout);
     master->lockGet(ctx, key, returnSize, returnValue, password, expire);
 }
 //--INNER--
@@ -711,9 +819,9 @@ void Connection::lockGet(ICodeContext * ctx, const char * key, size_t & returnSi
     returnValue = reinterpret_cast<char*>(retVal.detach());
 }
 //---------------------------------------------------------------------------------------
-void Connection::encodeChannel(StringBuffer & channel, const char * key) const
+void Connection::encodeChannel(StringBuffer & channel, const char * key, int _database) const
 {
-    channel.append(REDIS_LOCK_PREFIX).append("_").append(key).append("_").append(database);
+    channel.append(REDIS_LOCK_PREFIX).append("_").append(key).append("_").append(_database);
 }
 bool Connection::lock(ICodeContext * ctx, const char * key, const char * channel, unsigned expire)
 {
@@ -759,7 +867,7 @@ void Connection::handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAtt
     //NOTE: This routine can only return an empty string under one condition, that which indicates to the caller that the key was successfully locked.
 
     StringBuffer channel;
-    encodeChannel(channel, key);
+    encodeChannel(channel, key, database);
 
     //Query key and set lock if non existent
     if (lock(ctx, key, channel.str(), expire))
@@ -854,7 +962,7 @@ void Connection::handleLockOnSet(ICodeContext * ctx, const char * key, const cha
 {
     //Due to locking logic surfacing into ECL, any locking.set (such as this is) assumes that they own the lock and therefore go ahead and set regardless.
     StringBuffer channel;
-    encodeChannel(channel, key);
+    encodeChannel(channel, key, database);
 
     if (size > 29)//c.f. 1st note below.
     {
@@ -862,21 +970,21 @@ void Connection::handleLockOnSet(ICodeContext * ctx, const char * key, const cha
         if (expire == 0)
         {
             const char * luaScriptSHA1 = "2a4a976d9bbd806756b2c7fc1e2bc2cb905e68c3"; //NOTE: update this if luaScript is updated!
-            replyContainer->setClear((redisReply*)redisCommand(context, "EVALSHA %b %d %b %b %b", luaScriptSHA1, (size_t)40, 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size));
+            replyContainer->setClear(redisCommand(context, "EVALSHA %b %d %b %b %b", luaScriptSHA1, (size_t)40, 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size));
             if (noScript(replyContainer->query()))
             {
                 const char * luaScript = "redis.call('SET', KEYS[1], ARGV[2]) redis.call('PUBLISH', ARGV[1], ARGV[2]) return";//NOTE: MUST update luaScriptSHA1 if luaScript is updated!
-                replyContainer->setClear((redisReply*)redisCommand(context, "EVAL %b %d %b %b %b", luaScript, strlen(luaScript), 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size));
+                replyContainer->setClear(redisCommand(context, "EVAL %b %d %b %b %b", luaScript, strlen(luaScript), 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size));
             }
         }
         else
         {
             const char * luaScriptWithExpireSHA1 = "6f6bc88ccea7c6853ccc395eaa7abd8cb91fb2d8"; //NOTE: update this if luaScriptWithExpire is updated!
-            replyContainer->setClear((redisReply*)redisCommand(context, "EVALSHA %b %d %b %b %b %d", luaScriptWithExpireSHA1, (size_t)40, 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size, expire));
+            replyContainer->setClear(redisCommand(context, "EVALSHA %b %d %b %b %b %d", luaScriptWithExpireSHA1, (size_t)40, 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size, expire));
             if (noScript(replyContainer->query()))
             {
                 const char * luaScriptWithExpire = "redis.call('SET', KEYS[1], ARGV[2], 'PX', ARGV[3]) redis.call('PUBLISH', ARGV[1], ARGV[2]) return";//NOTE: MUST update luaScriptWithExpireSHA1 if luaScriptWithExpire is updated!
-                replyContainer->setClear((redisReply*)redisCommand(context, "EVAL %b %d %b %b %b %d", luaScriptWithExpire, strlen(luaScriptWithExpire), 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size, expire));
+                replyContainer->setClear(redisCommand(context, "EVAL %b %d %b %b %b %d", luaScriptWithExpire, strlen(luaScriptWithExpire), 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size, expire));
             }
         }
         assertOnErrorWithCmdMsg(replyContainer->query(), "SET", key);
@@ -965,7 +1073,7 @@ ECL_REDIS_API void ECL_REDIS_CALL SyncLockRGetUtf8(ICodeContext * ctx, size32_t
 }
 ECL_REDIS_API void ECL_REDIS_CALL SyncLockRUnlock(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, timeout);
     master->unlock(ctx, key);
 }
 }//close namespace

+ 3 - 0
plugins/redis/redis.hpp

@@ -61,6 +61,9 @@ namespace RedisPlugin {
     ECL_REDIS_API void             ECL_REDIS_CALL SyncRGetUChar (ICodeContext * _ctx, size32_t & returnLength, UChar * & returnValue, const char * key, const char * options, int database, const char * pswd, unsigned timeout);
     ECL_REDIS_API void             ECL_REDIS_CALL SyncRGetData  (ICodeContext * _ctx,size32_t & returnLength, void * & returnValue, const char * key, const char * options, int database, const char * pswd, unsigned timeout);
 
+    ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL SyncRPub(ICodeContext * _ctx, const char * keyOrChannel, size32_t messageLength, const char * message, const char * options, int database, const char * pswd, unsigned timeout, bool lockedKey);
+    ECL_REDIS_API void ECL_REDIS_CALL SyncRSub(ICodeContext * _ctx, size32_t & messageLength, char * & message, const char * keyOrChannel, const char * options, int database, const char * pswd, unsigned timeout, bool lockedKey);
+
     //--------------------------------AUXILLARIES---------------------------
     ECL_REDIS_API bool             ECL_REDIS_CALL RExist  (ICodeContext * _ctx, const char * key, const char * options, int database, const char * pswd, unsigned timeout);
     ECL_REDIS_API void             ECL_REDIS_CALL RClear  (ICodeContext * _ctx, const char * options, int database, const char * pswd, unsigned timeout);