Browse Source

Merge pull request #7890 from jamienoss/issue14286-redis-plugin-add-independent-pub-sub-functionality-with-db-no-cachedSubConnection-c

HPCC-14286 Redis Plugin - Add independent pub/sub functions

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 9 năm trước cách đây
mục cha
commit
3a4c35c164

+ 36 - 24
plugins/redis/README.md

@@ -53,36 +53,42 @@ Here is a list of the core plugin **functions**.
 
 ###Set
 ```
-SetUnicode( CONST VARSTRING key, CONST UNICODE value, CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-SetString(  CONST VARSTRING key, CONST STRING value,  CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-SetUtf8(    CONST VARSTRING key, CONST UTF8 value,    CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-SetBoolean( CONST VARSTRING key, BOOLEAN value,       CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-SetReal(    CONST VARSTRING key, REAL value,          CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-SetInteger( CONST VARSTRING key, INTEGER value,       CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-SetUnsigned(CONST VARSTRING key, UNSIGNED value,      CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-SetData(    CONST VARSTRING key, CONST DATA value,    CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
+SetUnicode( CONST VARSTRING key, CONST UNICODE value, CONST VARSTRING options, INTEGER4 database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+SetString(  CONST VARSTRING key, CONST STRING value,  CONST VARSTRING options, INTEGER4 database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+SetUtf8(    CONST VARSTRING key, CONST UTF8 value,    CONST VARSTRING options, INTEGER4 database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+SetBoolean( CONST VARSTRING key, BOOLEAN value,       CONST VARSTRING options, INTEGER4 database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+SetReal(    CONST VARSTRING key, REAL value,          CONST VARSTRING options, INTEGER4 database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+SetInteger( CONST VARSTRING key, INTEGER value,       CONST VARSTRING options, INTEGER4 database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+SetUnsigned(CONST VARSTRING key, UNSIGNED value,      CONST VARSTRING options, INTEGER4 database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+SetData(    CONST VARSTRING key, CONST DATA value,    CONST VARSTRING options, INTEGER4 database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
 ```
 
 ###Get
 ```
-INTEGER8   GetInteger(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-UNSIGNED8 GetUnsigned(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-STRING      GetString(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-UNICODE    GetUnicode(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-UTF8          GetUtf8(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-BOOLEAN    GetBoolean(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-REAL          GetReal(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-DATA          GetData(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
+INTEGER8   GetInteger(CONST VARSTRING key, CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+UNSIGNED8 GetUnsigned(CONST VARSTRING key, CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+STRING      GetString(CONST VARSTRING key, CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+UNICODE    GetUnicode(CONST VARSTRING key, CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+UTF8          GetUtf8(CONST VARSTRING key, CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+BOOLEAN    GetBoolean(CONST VARSTRING key, CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+REAL          GetReal(CONST VARSTRING key, CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+DATA          GetData(CONST VARSTRING key, CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
 ```
 
 ###Utility
 ```
-BOOLEAN Exists(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-FlushDB(CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-Delete(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-Persist(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-Expire(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
-INTEGER DBSize(CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
+BOOLEAN Exists(CONST VARSTRING key, CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+FlushDB(CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+Delete(CONST VARSTRING key, CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+Persist(CONST VARSTRING key, CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+Expire(CONST VARSTRING key, CONST VARSTRING options, INTEGER4 database = 0, UNSIGNED4 expire, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+INTEGER DBSize(CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+```
+
+###PUB-SUB
+```
+UNSIGNED Publish(CONST VARSTRING keyOrChannel, CONST STRING message, CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000, BOOLEAN lockedKey = FALSE)
+STRING Subscribe(CONST VARSTRING keyOrChannel, CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000, BOOLEAN lockedKey = FALSE)
 ```
 
 The core points to note here are:
@@ -91,8 +97,12 @@ The core points to note here are:
    STRING of length 8, set with SetString, being successfully retrieved from the cache via GetInteger without an **ECL** exception being thrown.
    * `CONST VARSTRING options` passes the server **IP** and **port** to the plugin in the *strict* format - `--SERVER=<ip>:<port>`. If `options` is empty, the default
    127.0.0.1:6379 is used. *Note:* 6379 is the default port for **redis-server**.
-   * `UNSIGNED timeout` has units *ms* and has a default value of 1 second (0 := infinity).  *c.f.* 'Timeout Values' below for advice on choosing appropriate values.
+   * `UNSIGNED4 timeout` has units *ms* and has a default value of 1 second (0 := infinity).  *c.f.* 'Timeout Values' below for advice on choosing appropriate values.
    * `UNSIGNED expire` has units *ms* and a default of **0**, i.e. *forever*.
+   * Both `Publish` and `Subscribe` have a flag `BOOLEAN lockedKey = FALSE` such, that when **TRUE**, will encode `CONST VARSTRING keyOrChannel` as if it were a key
+   allowing key-channel encoding compatibility with the `GetOrLockString` and `SetAndPublishString` functions. For this reason, they both also take a **Database** value as
+   this is used in the encoding of the lock and channel. Please note however that the redis pub-sub paradigm is actually irrespective of database.
+   * *c.f.* redis documentation for the following - [Exists](http://redis.io/commands/exists), [FlushDB](http://redis.io/commands/flushdb), [Delete](http://redis.io/commands/del), [Persist](http://redis.io/commands/persist), [Expire](http://redis.io/commands/expire), [DBSize](http://redis.io/commands/dbsize), [Publish](http://redis.io/commands/publish), & [Subscribe](http://redis.io/commands/subscribe).
 
 ###The redisServer MODULE
 To avoid the cumbersome and unnecessary need to constantly pass `options` and `password` with each function call, the module `redisServer` can be imported to effectively 
@@ -138,7 +148,7 @@ myRedis := redisServer('--SERVER=127.0.0.1:6379');
 
 STRING poppins := 'supercalifragilisticexpialidocious'; //Value to externally compute/retrieve from 3rd party vendor.
 
-myFunc(STRING key, UNSIGNED database) := FUNCTION  //Function for computing/retrieving a value.
+myFunc(STRING key, INTEGER4 database) := FUNCTION  //Function for computing/retrieving a value.
   return myRedis.GetString(key, database);
 END;
 
@@ -200,3 +210,5 @@ A few notes to point out here:
 | SetAndPublish (value length > 29) | 1       | 5       | new connection   |
 | SetAndPublish (value length < 29) | 4       | 8       | new connection   |
 | Unlock              | 5       | 9       | new connection   |
+| Publish             | 1       | 4       | new connection   |
+| Subscribe           | 5       | 5       | new connection always needed   |

+ 9 - 0
plugins/redis/lib_redis.ecllib

@@ -43,6 +43,9 @@ EXPORT redis := SERVICE : plugin('redis'), namespace('RedisPlugin')
   Expire(CONST VARSTRING key, CONST VARSTRING options, INTEGER4 database = 0, UNSIGNED4 expire, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000) : cpp,action,context,entrypoint='RExpire';
   UNSIGNED DBSize(CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000) : cpp,once,context,entrypoint='RDBSize';
 
+  UNSIGNED Publish(CONST VARSTRING keyOrChannel, CONST STRING message, CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000, BOOLEAN lockedKey = FALSE) : cpp,once,context,entrypoint='SyncRPub';
+  STRING Subscribe(CONST VARSTRING keyOrChannel, CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000, BOOLEAN lockedKey = FALSE) : cpp,once,context,entrypoint='SyncRSub';
+
   STRING   SetAndPublishString(  CONST VARSTRING key, CONST STRING value,  CONST VARSTRING options, INTEGER4 database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000) : cpp,once,context,entrypoint='SyncLockRSetStr';
   UNICODE  SetAndPublishUnicode( CONST VARSTRING key, CONST UNICODE value, CONST VARSTRING options, INTEGER4 database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000) : cpp,once,context,entrypoint='SyncLockRSetUChar';
   UTF8     SetAndPublishUtf8(    CONST VARSTRING key, CONST UTF8 value,    CONST VARSTRING options, INTEGER4 database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000) : cpp,once,context,entrypoint='SyncLockRSetUtf8';
@@ -81,6 +84,9 @@ EXPORT RedisServer(VARSTRING options, VARSTRING password = '', UNSIGNED4 timeout
   EXPORT Expire(VARSTRING key, INTEGER4 database = 0, UNSIGNED4 expire)  := redis.Expire(key, options, database, expire, password, timeout);
   EXPORT DBSize(INTEGER4 database = 0) := redis.DBSize(options, database, password, timeout);
 
+  EXPORT Publish(VARSTRING keyOrChannel, STRING message, INTEGER4 database = 0, BOOLEAN lockedKey = FALSE) := redis.Publish(keyOrChannel, message, options, database, password, timeout, lockedKey);
+  EXPORT Subscribe(VARSTRING keyOrChannel, INTEGER4 database = 0, BOOLEAN lockedKey = FALSE) := redis.Subscribe(keyOrChannel, options, database, password, timeout, lockedKey);
+
   EXPORT  SetAndPublishUnicode(VARSTRING key, UNICODE value,  INTEGER4 database = 0, UNSIGNED4 expire = 0) := redis.SetAndPublishUnicode (key, value, options, database, expire, password, timeout);
   EXPORT   SetAndPublishString(VARSTRING key, STRING value,   INTEGER4 database = 0, UNSIGNED4 expire = 0) := redis.SetAndPublishString  (key, value, options, database, expire, password, timeout);
   EXPORT     SetAndPublishUtf8(VARSTRING key, UTF8 value,     INTEGER4 database = 0, UNSIGNED4 expire = 0) := redis.SetAndPublishUtf8    (key, value, options, database, expire, password, timeout);
@@ -119,6 +125,9 @@ EXPORT RedisServerWithoutTimeout(VARSTRING options, VARSTRING password = '') :=
   EXPORT Expire(VARSTRING key, INTEGER4 database = 0, UNSIGNED4 expire, UNSIGNED4 timeout = 1000)  := redis.Expire(key, options, database, expire, password, timeout);
   EXPORT DBSize(INTEGER4 database = 0, UNSIGNED4 timeout = 1000) := redis.DBSize(options, database, password, timeout);
 
+  EXPORT Publish(VARSTRING keyOrChannel, STRING message, INTEGER4 database = 0, UNSIGNED4 timeout = 1000, BOOLEAN lockedKey = FALSE) := redis.Publish(keyOrChannel, message, options, database, password, timeout, lockedKey);
+  EXPORT Subscribe(VARSTRING keyOrChannel, INTEGER4 database = 0, UNSIGNED4 timeout = 1000, BOOLEAN lockedKey = FALSE) := redis.Subscribe(keyOrChannel, options, database, password, timeout, lockedKey);
+
   EXPORT  SetAndPublishUnicode(VARSTRING key, UNICODE value,  INTEGER4 database = 0, UNSIGNED4 expire = 0, UNSIGNED4 timeout = 1000) := redis.SetAndPublishUnicode (key, value, options, database, expire, password, timeout);
   EXPORT   SetAndPublishString(VARSTRING key, STRING value,   INTEGER4 database = 0, UNSIGNED4 expire = 0, UNSIGNED4 timeout = 1000) := redis.SetAndPublishString  (key, value, options, database, expire, password, timeout);
   EXPORT     SetAndPublishUtf8(VARSTRING key, UTF8 value,     INTEGER4 database = 0, UNSIGNED4 expire = 0, UNSIGNED4 timeout = 1000) := redis.SetAndPublishUtf8    (key, value, options, database, expire, password, timeout);

+ 140 - 60
plugins/redis/redis.cpp

@@ -41,9 +41,10 @@ namespace RedisPlugin {
 
 class Connection;
 static const char * REDIS_LOCK_PREFIX = "redis_ecl_lock";
-static __thread Connection * cachedConnection;
-static __thread ThreadTermFunc threadHookChain;
-static __thread bool threadHooked;
+static __thread Connection * cachedConnection = NULL;
+static __thread Connection * cachedPubConnection = NULL;//database should always = 0
+static __thread ThreadTermFunc threadHookChain = NULL;
+static __thread bool threadHooked = false;
 
 static void * allocateAndCopy(const void * src, size_t size)
 {
@@ -69,6 +70,7 @@ public :
 
     static Reply * createReply(void * _reply) { return new Reply(_reply); }
     inline const redisReply * query() const { return reply; }
+    void setClear(void * _reply) { setClear((redisReply*)_reply); }
     void setClear(redisReply * _reply)
     {
         if (reply)
@@ -103,14 +105,14 @@ private :
 class Connection : public CInterface
 {
 public :
-    Connection(ICodeContext * ctx, const char * options, int database, const char * password, unsigned _timeout);
+    Connection(ICodeContext * ctx, const char * _options, int database, const char * password, unsigned _timeout);
     Connection(ICodeContext * ctx, const char * _options, const char * _ip, int _port, unsigned _serverIpPortPasswordHash, int _database, const char * password, unsigned _timeout);
     ~Connection()
     {
         if (context)
             redisFree(context);
     }
-    static Connection * createConnection(ICodeContext * ctx, const char * options, int database, const char * password, unsigned _timeout);
+    static Connection * createConnection(ICodeContext * ctx, Connection * & _cachedConnection, const char * options, int database, const char * password, unsigned _timeout);
 
     //set
     template <class type> void set(ICodeContext * ctx, const char * key, type value, unsigned expire);
@@ -125,6 +127,11 @@ public :
     void unlock(ICodeContext * ctx, const char * key);
     //--------------------------------------------------------------------------------------
 
+    //-------------------------------PUB/SUB------------------------------------------------
+    unsigned __int64 publish(ICodeContext * ctx, const char * keyOrChannel, size32_t messageSize, const char * message, int _database, bool lockedKey);
+    void subscribe(ICodeContext * ctx, const char * keyOrChannel, size_t & messageSize, char * & message, int _database, bool lockedKey);
+    //--------------------------------------------------------------------------------------
+
     void persist(ICodeContext * ctx, const char * key);
     void expire(ICodeContext * ctx, const char * key, unsigned _expire);
     void del(ICodeContext * ctx, const char * key);
@@ -139,7 +146,7 @@ protected :
     void parseOptions(ICodeContext * ctx, const char * _options);
     void connect(ICodeContext * ctx, int _database, const char * password);
     void selectDB(ICodeContext * ctx, int _database);
-    void resetContextErr();
+    void reset(ICodeContext * ctx, const char * password, unsigned _timeout);
     void readReply(Reply * reply);
     void readReplyAndAssert(Reply * reply, const char * msg);
     void readReplyAndAssertWithCmdMsg(Reply * reply, const char * msg, const char * key = NULL);
@@ -157,7 +164,7 @@ protected :
     //-------------------------------LOCKING------------------------------------------------
     void handleLockOnSet(ICodeContext * ctx, const char * key, const char * value, size_t size, unsigned expire);
     void handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAttr * retVal, const char * password, unsigned expire);
-    void encodeChannel(StringBuffer & channel, const char * key) const;
+    void encodeChannel(StringBuffer & channel, const char * key, int _database) const;
     bool noScript(const redisReply * reply) const;
     bool lock(ICodeContext * ctx, const char * key, const char * channel, unsigned expire);
     //--------------------------------------------------------------------------------------
@@ -172,22 +179,6 @@ protected :
     int database; //NOTE: redis stores the maximum number of dbs as an 'int'.
 };
 
-//The following class is here to ensure destruction of the cachedConnection within the main thread
-//as this is not handled by the thread hook mechanism.
-static class MainThreadCachedConnection
-{
-public :
-    MainThreadCachedConnection() { }
-    ~MainThreadCachedConnection()
-    {
-        if (cachedConnection)
-        {
-            cachedConnection->Release();
-            cachedConnection = NULL;
-        }
-    }
-} mainThread;
-
 static void releaseContext()
 {
     if (cachedConnection)
@@ -195,6 +186,11 @@ static void releaseContext()
         cachedConnection->Release();
         cachedConnection = NULL;
     }
+    if (cachedPubConnection)
+    {
+        cachedPubConnection->Release();
+        cachedPubConnection = NULL;
+    }
     if (threadHookChain)
     {
         (*threadHookChain)();
@@ -202,6 +198,14 @@ static void releaseContext()
     }
     threadHooked = false;
 }
+//The following class is here to ensure destruction of the cachedConnection within the main thread
+//as this is not handled by the thread hook mechanism.
+static class MainThreadCachedConnection
+{
+public :
+    MainThreadCachedConnection() { }
+    ~MainThreadCachedConnection() { releaseContext(); }
+} mainThread;
 Connection::Connection(ICodeContext * ctx, const char * _options, int _database, const char * password, unsigned _timeout)
   : database(0), timeout(_timeout), port(0), serverIpPortPasswordHash(hashServerIpPortPassword(ctx, _options, password))
 {
@@ -330,10 +334,16 @@ void Connection::parseOptions(ICodeContext * ctx, const char * _options)
         }
     }
 }
-void Connection::resetContextErr()
+void Connection::reset(ICodeContext * ctx, const char * password, unsigned _timeout)
 {
-    if (context)
-        context->err = REDIS_OK;
+    timeout.reset(_timeout);
+    if (context && context->err != REDIS_OK)
+    {
+        redisFree(context);
+        context = NULL;
+        database = 0;
+        connect(ctx, 0, password);
+    }
 }
 void Connection::readReply(Reply * reply)
 {
@@ -352,32 +362,32 @@ void Connection::readReplyAndAssertWithCmdMsg(Reply * reply, const char * msg, c
     readReply(reply);
     assertOnErrorWithCmdMsg(reply->query(), msg, key);
 }
-Connection * Connection::createConnection(ICodeContext * ctx, const char * options, int _database, const char * password, unsigned _timeout)
+Connection * Connection::createConnection(ICodeContext * ctx,  Connection * & _cachedConnection, const char * options, int _database, const char * password, unsigned _timeout)
 {
-    if (!cachedConnection)
+    if (!_cachedConnection)
     {
-        cachedConnection = new Connection(ctx, options, _database, password, _timeout);
+        _cachedConnection = new Connection(ctx, options, _database, password, _timeout);
+
         if (!threadHooked)
         {
             threadHookChain = addThreadTermFunc(releaseContext);
             threadHooked = true;
         }
-        return LINK(cachedConnection);
+        return LINK(_cachedConnection);
     }
 
-    if (cachedConnection->isSameConnection(ctx, options, password))
+    if (_cachedConnection->isSameConnection(ctx, options, password))
     {
         //MORE: should perhaps check that the connection has not expired (think hiredis REDIS_KEEPALIVE_INTERVAL is defaulted to 15s).
-        cachedConnection->resetContextErr();//reset the context err to allow reuse when an error previously occurred.
-        cachedConnection->timeout.reset(_timeout);
-        cachedConnection->selectDB(ctx, _database);
-        return LINK(cachedConnection);
+        _cachedConnection->reset(ctx, password, _timeout);
+        _cachedConnection->selectDB(ctx, _database);
+        return LINK(_cachedConnection);
     }
 
-    cachedConnection->Release();
-    cachedConnection = NULL;
-    cachedConnection = new Connection(ctx, options, _database, password, _timeout);
-    return LINK(cachedConnection);
+    _cachedConnection->Release();
+    _cachedConnection = NULL;
+    _cachedConnection = new Connection(ctx, options, _database, password, _timeout);
+    return LINK(_cachedConnection);
 }
 void Connection::selectDB(ICodeContext * ctx, int _database)
 {
@@ -499,13 +509,13 @@ unsigned __int64 Connection::dbSize(ICodeContext * ctx)
 //--OUTER--
 template<class type> void SyncRSet(ICodeContext * ctx, const char * _options, const char * key, type value, int database, unsigned expire, const char * password, unsigned _timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, _options, database, password, _timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, _options, database, password, _timeout);
     master->set(ctx, key, value, expire);
 }
 //Set pointer types
 template<class type> void SyncRSet(ICodeContext * ctx, const char * _options, const char * key, size32_t valueSize, const type * value, int database, unsigned expire, const char * password, unsigned _timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, _options, database, password, _timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, _options, database, password, _timeout);
     master->set(ctx, key, valueSize, value, expire);
 }
 //--INNER--
@@ -532,12 +542,12 @@ template<class type> void Connection::set(ICodeContext * ctx, const char * key,
 //--OUTER--
 template<class type> void SyncRGet(ICodeContext * ctx, const char * options, const char * key, type & returnValue, int database, const char * password, unsigned _timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, _timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, _timeout);
     master->get(ctx, key, returnValue);
 }
 template<class type> void SyncRGet(ICodeContext * ctx, const char * options, const char * key, size_t & returnSize, type * & returnValue, int database, const char * password, unsigned _timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, _timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, _timeout);
     master->get(ctx, key, returnSize, returnValue);
 }
 //--INNER--
@@ -566,37 +576,107 @@ template<class type> void Connection::get(ICodeContext * ctx, const char * key,
     returnSize = reply->query()->len;
     returnValue = reinterpret_cast<type*>(allocateAndCopy(reply->query()->str, returnSize));
 }
+unsigned __int64 Connection::publish(ICodeContext * ctx, const char * keyOrChannel, size32_t messageSize, const char * message, int _database, bool lockedKey)
+{
+    StringBuffer channel;
+    if (lockedKey)
+        encodeChannel(channel, keyOrChannel, _database);
+    else
+        channel.set(keyOrChannel);
+
+    OwnedReply reply = Reply::createReply(redisCommand(context, "PUBLISH %b %b", channel.str(), (size_t)channel.length(), message, (size_t)messageSize));
+    assertOnErrorWithCmdMsg(reply->query(), "PUBLISH", channel.str());
+    if (reply->query()->type == REDIS_REPLY_INTEGER)
+    {
+        if (reply->query()->integer >= 0)
+            return (unsigned __int64)reply->query()->integer;
+        else
+            throwUnexpected();
+    }
+    throwUnexpected();
+}
+void Connection::subscribe(ICodeContext * ctx, const char * keyOrChannel, size_t & messageSize, char * & message, int _database, bool lockedKey)
+{
+    StringBuffer channel;
+    if (lockedKey)
+        encodeChannel(channel, keyOrChannel, _database);
+    else
+        channel.set(keyOrChannel);
+
+#if(0)//Replicate a lingering SUBSCRIBE to test channel comparison test when reading message.
+    {
+    OwnedReply reply = Reply::createReply(redisCommand(context, "SUBSCRIBE oldChannel"));
+    assertOnErrorWithCmdMsg(reply->query(), "Test lingering SUBSCRIBE", "oldChannel");
+    }
+#endif
+
+    OwnedReply reply = Reply::createReply(redisCommand(context, "SUBSCRIBE %b", channel.str(), (size_t)channel.length()));
+    assertOnErrorWithCmdMsg(reply->query(), "SUBSCRIBE", channel.str());
+    if (reply->query()->type != REDIS_REPLY_ARRAY || strcmp("subscribe", reply->query()->element[0]->str) != 0 )
+        fail("SUBSCRIBE", "failed to register SUB", channel.str());
+
+    readReply(reply);
+    assertOnErrorWithCmdMsg(reply->query(), "SUBSCRIBE", channel.str());
+
+    if (reply->query()->type == REDIS_REPLY_ARRAY && strcmp("message", reply->query()->element[0]->str) == 0 && reply->query()->elements == 3)
+    {
+        if (reply->query()->element[2]->len > 0)
+        {
+            messageSize = (size_t)reply->query()->element[2]->len;
+            message = reinterpret_cast<char*>(allocateAndCopy(reply->query()->element[2]->str, messageSize));
+        }
+        else
+        {
+            messageSize = 0;
+            message = NULL;
+        }
+        return;
+    }
+    throwUnexpected();
+}
 //--------------------------------------------------------------------------------
 //                           ECL SERVICE ENTRYPOINTS
 //--------------------------------------------------------------------------------
+ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL SyncRPub(ICodeContext * ctx, const char * keyOrChannel, size32_t messageSize, const char * message, const char * options, int database, const char * password, unsigned timeout, bool lockedKey)
+{
+    Owned<Connection> master = Connection::createConnection(ctx, cachedPubConnection, options, 0, password, timeout);
+    return master->publish(ctx, keyOrChannel, messageSize, message, database, lockedKey);
+}
+ECL_REDIS_API void ECL_REDIS_CALL SyncRSub(ICodeContext * ctx, size32_t & messageSize, char * & message, const char * keyOrChannel, const char * options, int database, const char * password, unsigned timeout, bool lockedKey)
+{
+    size_t _messageSize = 0;
+    Owned<Connection> master = new Connection(ctx,  options, 0, password, timeout);
+    master->subscribe(ctx, keyOrChannel, _messageSize, message, database, lockedKey);
+    messageSize = static_cast<size32_t>(_messageSize);
+}
 ECL_REDIS_API void ECL_REDIS_CALL RClear(ICodeContext * ctx, const char * options, int database, const char * password, unsigned timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, timeout);
     master->clear(ctx);
 }
 ECL_REDIS_API bool ECL_REDIS_CALL RExist(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, timeout);
     return master->exists(ctx, key);
 }
 ECL_REDIS_API void ECL_REDIS_CALL RDel(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, timeout);
     master->del(ctx, key);
 }
 ECL_REDIS_API void ECL_REDIS_CALL RPersist(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, timeout);
     master->persist(ctx, key);
 }
 ECL_REDIS_API void ECL_REDIS_CALL RExpire(ICodeContext * ctx, const char * key, const char * options, int database, unsigned _expire, const char * password, unsigned timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, timeout);
     master->expire(ctx, key, _expire);
 }
 ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL RDBSize(ICodeContext * ctx, const char * options, int database, const char * password, unsigned timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, timeout);
     return master->dbSize(ctx);
 }
 //-----------------------------------SET------------------------------------------
@@ -686,7 +766,7 @@ ECL_REDIS_API void ECL_REDIS_CALL SyncRGetData(ICodeContext * ctx, size32_t & re
 //Set pointer types
 void SyncLockRSet(ICodeContext * ctx, const char * _options, const char * key, size32_t valueSize, const char * value, int database, unsigned expire, const char * password, unsigned _timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, _options, database, password, _timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, _options, database, password, _timeout);
     master->lockSet(ctx, key, valueSize, value, expire);
 }
 //--INNER--
@@ -699,7 +779,7 @@ void Connection::lockSet(ICodeContext * ctx, const char * key, size32_t valueSiz
 //--OUTER--
 void SyncLockRGet(ICodeContext * ctx, const char * options, const char * key, size_t & returnSize, char * & returnValue, int database, unsigned expire, const char * password, unsigned _timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, _timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, _timeout);
     master->lockGet(ctx, key, returnSize, returnValue, password, expire);
 }
 //--INNER--
@@ -711,9 +791,9 @@ void Connection::lockGet(ICodeContext * ctx, const char * key, size_t & returnSi
     returnValue = reinterpret_cast<char*>(retVal.detach());
 }
 //---------------------------------------------------------------------------------------
-void Connection::encodeChannel(StringBuffer & channel, const char * key) const
+void Connection::encodeChannel(StringBuffer & channel, const char * key, int _database) const
 {
-    channel.append(REDIS_LOCK_PREFIX).append("_").append(key).append("_").append(database);
+    channel.append(REDIS_LOCK_PREFIX).append("_").append(key).append("_").append(_database);
 }
 bool Connection::lock(ICodeContext * ctx, const char * key, const char * channel, unsigned expire)
 {
@@ -759,7 +839,7 @@ void Connection::handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAtt
     //NOTE: This routine can only return an empty string under one condition, that which indicates to the caller that the key was successfully locked.
 
     StringBuffer channel;
-    encodeChannel(channel, key);
+    encodeChannel(channel, key, database);
 
     //Query key and set lock if non existent
     if (lock(ctx, key, channel.str(), expire))
@@ -854,7 +934,7 @@ void Connection::handleLockOnSet(ICodeContext * ctx, const char * key, const cha
 {
     //Due to locking logic surfacing into ECL, any locking.set (such as this is) assumes that they own the lock and therefore go ahead and set regardless.
     StringBuffer channel;
-    encodeChannel(channel, key);
+    encodeChannel(channel, key, database);
 
     if (size > 29)//c.f. 1st note below.
     {
@@ -862,21 +942,21 @@ void Connection::handleLockOnSet(ICodeContext * ctx, const char * key, const cha
         if (expire == 0)
         {
             const char * luaScriptSHA1 = "2a4a976d9bbd806756b2c7fc1e2bc2cb905e68c3"; //NOTE: update this if luaScript is updated!
-            replyContainer->setClear((redisReply*)redisCommand(context, "EVALSHA %b %d %b %b %b", luaScriptSHA1, (size_t)40, 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size));
+            replyContainer->setClear(redisCommand(context, "EVALSHA %b %d %b %b %b", luaScriptSHA1, (size_t)40, 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size));
             if (noScript(replyContainer->query()))
             {
                 const char * luaScript = "redis.call('SET', KEYS[1], ARGV[2]) redis.call('PUBLISH', ARGV[1], ARGV[2]) return";//NOTE: MUST update luaScriptSHA1 if luaScript is updated!
-                replyContainer->setClear((redisReply*)redisCommand(context, "EVAL %b %d %b %b %b", luaScript, strlen(luaScript), 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size));
+                replyContainer->setClear(redisCommand(context, "EVAL %b %d %b %b %b", luaScript, strlen(luaScript), 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size));
             }
         }
         else
         {
             const char * luaScriptWithExpireSHA1 = "6f6bc88ccea7c6853ccc395eaa7abd8cb91fb2d8"; //NOTE: update this if luaScriptWithExpire is updated!
-            replyContainer->setClear((redisReply*)redisCommand(context, "EVALSHA %b %d %b %b %b %d", luaScriptWithExpireSHA1, (size_t)40, 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size, expire));
+            replyContainer->setClear(redisCommand(context, "EVALSHA %b %d %b %b %b %d", luaScriptWithExpireSHA1, (size_t)40, 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size, expire));
             if (noScript(replyContainer->query()))
             {
                 const char * luaScriptWithExpire = "redis.call('SET', KEYS[1], ARGV[2], 'PX', ARGV[3]) redis.call('PUBLISH', ARGV[1], ARGV[2]) return";//NOTE: MUST update luaScriptWithExpireSHA1 if luaScriptWithExpire is updated!
-                replyContainer->setClear((redisReply*)redisCommand(context, "EVAL %b %d %b %b %b %d", luaScriptWithExpire, strlen(luaScriptWithExpire), 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size, expire));
+                replyContainer->setClear(redisCommand(context, "EVAL %b %d %b %b %b %d", luaScriptWithExpire, strlen(luaScriptWithExpire), 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size, expire));
             }
         }
         assertOnErrorWithCmdMsg(replyContainer->query(), "SET", key);
@@ -965,7 +1045,7 @@ ECL_REDIS_API void ECL_REDIS_CALL SyncLockRGetUtf8(ICodeContext * ctx, size32_t
 }
 ECL_REDIS_API void ECL_REDIS_CALL SyncLockRUnlock(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, timeout);
     master->unlock(ctx, key);
 }
 }//close namespace

+ 3 - 0
plugins/redis/redis.hpp

@@ -61,6 +61,9 @@ namespace RedisPlugin {
     ECL_REDIS_API void             ECL_REDIS_CALL SyncRGetUChar (ICodeContext * _ctx, size32_t & returnLength, UChar * & returnValue, const char * key, const char * options, int database, const char * pswd, unsigned timeout);
     ECL_REDIS_API void             ECL_REDIS_CALL SyncRGetData  (ICodeContext * _ctx,size32_t & returnLength, void * & returnValue, const char * key, const char * options, int database, const char * pswd, unsigned timeout);
 
+    ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL SyncRPub(ICodeContext * _ctx, const char * keyOrChannel, size32_t messageLength, const char * message, const char * options, int database, const char * pswd, unsigned timeout, bool lockedKey);
+    ECL_REDIS_API void ECL_REDIS_CALL SyncRSub(ICodeContext * _ctx, size32_t & messageLength, char * & message, const char * keyOrChannel, const char * options, int database, const char * pswd, unsigned timeout, bool lockedKey);
+
     //--------------------------------AUXILLARIES---------------------------
     ECL_REDIS_API bool             ECL_REDIS_CALL RExist  (ICodeContext * _ctx, const char * key, const char * options, int database, const char * pswd, unsigned timeout);
     ECL_REDIS_API void             ECL_REDIS_CALL RClear  (ICodeContext * _ctx, const char * options, int database, const char * pswd, unsigned timeout);

+ 33 - 0
testing/regress/ecl/key/rediserrortests.xml

@@ -0,0 +1,33 @@
+<Dataset name='Result 1'>
+ <Row><value>Auth Failed</value></Row>
+</Dataset>
+<Dataset name='Result 2'>
+ <Row><value>Redis Plugin: ERROR - the requested key &apos;authTest1&apos; does not exist on database 0 on 127.0.0.1:6379</value></Row>
+</Dataset>
+<Dataset name='Result 3'>
+ <Row><value>Redis Plugin: ERROR - connection for 127.0.0.1:9999 failed : Connection refused</value></Row>
+</Dataset>
+<Dataset name='Result 4'>
+ <Row><value>Redis Plugin: ERROR - unsupported option string &apos;blahblahblah&apos;</value></Row>
+</Dataset>
+<Dataset name='Result 5'>
+ <Row><value>Redis Plugin: ERROR - SELECT 16 on database 0 for 127.0.0.1:6379 failed : ERR invalid DB index</value></Row>
+</Dataset>
+<Dataset name='Result 6'>
+ <Row><value>Redis Plugin: ERROR - GetOrLock&lt;type&gt; &apos;channelTest1&apos; on database 0 for 127.0.0.1:6379 failed : key locked with a channel (&apos;redis_ecl_lock_blah_blah_blah&apos;) different to that subscribed to (redis_ecl_lock_channelTest1_0).</value></Row>
+</Dataset>
+<Dataset name='Result 7'>
+ <Row><value>Redis Plugin: ERROR - GetOrLock&lt;type&gt; &apos;channelTest2&apos; on database 0 for 127.0.0.1:6379 failed : Resource temporarily unavailable</value></Row>
+</Dataset>
+<Dataset name='Result 8'>
+ <Row><Result_8></Result_8></Row>
+</Dataset>
+<Dataset name='Result 9'>
+ <Row><value>Redis Plugin: ERROR - GetOrLock&lt;type&gt; &apos;timeoutTest1&apos; on database 0 for 127.0.0.1:6379 failed : Resource temporarily unavailable</value></Row>
+</Dataset>
+<Dataset name='Result 10'>
+ <Row><Result_10></Result_10></Row>
+</Dataset>
+<Dataset name='Result 11'>
+ <Row><value>Timed Out</value></Row>
+</Dataset>

+ 6 - 24
testing/regress/ecl/key/redislockingtest.xml

@@ -71,10 +71,10 @@
  <Row><Result_24>false</Result_24></Row>
 </Dataset>
 <Dataset name='Result 25'>
- <Row><value>Redis Plugin: ERROR - GetOrLock&lt;type&gt; &apos;channelTest1&apos; on database 0 for 127.0.0.1:6379 failed : key locked with a channel (&apos;redis_ecl_lock_blah_blah_blah&apos;) different to that subscribed to (redis_ecl_lock_channelTest1_0).</value></Row>
+ <Row><Result_25>databaseThenExpire</Result_25></Row>
 </Dataset>
 <Dataset name='Result 26'>
- <Row><value>Redis Plugin: ERROR - GetOrLock&lt;type&gt; &apos;channelTest2&apos; on database 0 for 127.0.0.1:6379 failed : Resource temporarily unavailable</value></Row>
+ <Row><Result_26>databaseThenExpire</Result_26></Row>
 </Dataset>
 <Dataset name='Result 27'>
  <Row><Result_27>databaseThenExpire</Result_27></Row>
@@ -98,10 +98,10 @@
  <Row><Result_33>databaseThenExpire</Result_33></Row>
 </Dataset>
 <Dataset name='Result 34'>
- <Row><Result_34>databaseThenExpire</Result_34></Row>
+ <Row><Result_34>Good boy Einnie!</Result_34></Row>
 </Dataset>
 <Dataset name='Result 35'>
- <Row><Result_35>databaseThenExpire</Result_35></Row>
+ <Row><Result_35>Good boy Einnie!</Result_35></Row>
 </Dataset>
 <Dataset name='Result 36'>
  <Row><Result_36>Good boy Einnie!</Result_36></Row>
@@ -110,10 +110,10 @@
  <Row><Result_37>Good boy Einnie!</Result_37></Row>
 </Dataset>
 <Dataset name='Result 38'>
- <Row><Result_38>Good boy Einnie!</Result_38></Row>
+ <Row><Result_38>supercalifragilisticexpialidocious</Result_38></Row>
 </Dataset>
 <Dataset name='Result 39'>
- <Row><Result_39>Good boy Einnie!</Result_39></Row>
+ <Row><Result_39>supercalifragilisticexpialidocious</Result_39></Row>
 </Dataset>
 <Dataset name='Result 40'>
  <Row><Result_40>supercalifragilisticexpialidocious</Result_40></Row>
@@ -121,21 +121,3 @@
 <Dataset name='Result 41'>
  <Row><Result_41>supercalifragilisticexpialidocious</Result_41></Row>
 </Dataset>
-<Dataset name='Result 42'>
- <Row><Result_42>supercalifragilisticexpialidocious</Result_42></Row>
-</Dataset>
-<Dataset name='Result 43'>
- <Row><Result_43>supercalifragilisticexpialidocious</Result_43></Row>
-</Dataset>
-<Dataset name='Result 44'>
- <Row><Result_44></Result_44></Row>
-</Dataset>
-<Dataset name='Result 45'>
- <Row><value>Redis Plugin: ERROR - GetOrLock&lt;type&gt; &apos;timeoutTest1&apos; on database 0 for 127.0.0.1:6379 failed : Resource temporarily unavailable</value></Row>
-</Dataset>
-<Dataset name='Result 46'>
- <Row><Result_46></Result_46></Row>
-</Dataset>
-<Dataset name='Result 47'>
- <Row><value>Timed Out</value></Row>
-</Dataset>

+ 2 - 11
testing/regress/ecl/key/redissynctest.xml

@@ -89,17 +89,8 @@
  <Row><Result_30>300</Result_30></Row>
 </Dataset>
 <Dataset name='Result 31'>
- <Row><value>Auth Failed</value></Row>
+ <Row><Result_31>2000</Result_31></Row>
 </Dataset>
 <Dataset name='Result 32'>
- <Row><value>Redis Plugin: ERROR - the requested key &apos;authTest1&apos; does not exist on database 0 on 127.0.0.1:6379</value></Row>
-</Dataset>
-<Dataset name='Result 33'>
- <Row><value>Redis Plugin: ERROR - connection for 127.0.0.1:9999 failed : Connection refused</value></Row>
-</Dataset>
-<Dataset name='Result 34'>
- <Row><value>Redis Plugin: ERROR - unsupported option string &apos;blahblahblah&apos;</value></Row>
-</Dataset>
-<Dataset name='Result 35'>
- <Row><value>Redis Plugin: ERROR - SELECT 16 on database 0 for 127.0.0.1:6379 failed : ERR invalid DB index</value></Row>
+ <Row><Result_32>OK</Result_32></Row>
 </Dataset>

+ 123 - 0
testing/regress/ecl/rediserrortests.ecl

@@ -0,0 +1,123 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+//class=embedded
+//class=3rdparty
+
+//nothor
+
+IMPORT * FROM lib_redis;
+IMPORT Std;
+
+STRING server := '--SERVER=127.0.0.1:6379';
+STRING password := 'foobared';
+redis.FlushDB(server, /*database*/, password);
+myRedis := redisServer(server, password);
+
+myFuncStr(STRING key) := FUNCTION
+ value := myRedis.GetString(key);
+ return value;
+END;
+myFuncUtf8(STRING key) := FUNCTION
+ value := myRedis.GetUtf8(key);
+ return value;
+END;
+myFuncUni(STRING key) := FUNCTION
+ value := myRedis.GetUnicode(key);
+ return value;
+END;
+
+getString(STRING key, STRING key2, myFuncStr func) := FUNCTION
+    value := myRedis.GetOrLockString(key);
+    RETURN IF (LENGTH(value) = 0, myRedis.SetAndPublishString(key, func(key2)), value);
+END;
+getUtf8(STRING key, STRING key2, myFuncUtf8 func) := FUNCTION
+    value := myRedis.GetOrLockUtf8(key);
+    RETURN IF (LENGTH(value) = 0, myRedis.SetAndPublishUtf8(key, func(key2)), value);
+END;
+getUnicode(STRING key, STRING key2, myFuncUni func) := FUNCTION
+    value := myRedis.GetOrLockUnicode(key);
+    RETURN IF (LENGTH(value) = 0, myRedis.SetAndPublishUnicode(key, func(key2)), value);
+END;
+
+//Test some exceptions
+myRedis4 := RedisServer(server);
+STRING noauth := 'Redis Plugin: ERROR - authentication for 127.0.0.1:6379 failed : NOAUTH Authentication required.';
+STRING opNotPerm :=  'Redis Plugin: ERROR - authentication for 127.0.0.1:6379 failed : ERR operation not permitted';
+ds1 := DATASET(NOFOLD(1), TRANSFORM({STRING value}, SELF.value := myRedis4.GetString('authTest' + (STRING)COUNTER)));
+SEQUENTIAL(
+    myRedis.FlushDB();
+    OUTPUT(CATCH(ds1, ONFAIL(TRANSFORM({ STRING value }, SELF.value := IF(FAILMESSAGE = noauth OR FAILMESSAGE = opNotPerm, 'Auth Failed', 'Unexpected Error - ' + FAILMESSAGE)))));
+    );
+
+ds2 := DATASET(NOFOLD(1), TRANSFORM({STRING value}, SELF.value := myRedis.GetString('authTest' + (STRING)COUNTER)));
+SEQUENTIAL(
+    myRedis.FlushDB();
+    OUTPUT(CATCH(ds2, ONFAIL(TRANSFORM({ STRING value }, SELF.value := FAILMESSAGE))));
+    );
+
+myRedis5 := RedisServer('--SERVER=127.0.0.1:9999');
+ds3 := DATASET(NOFOLD(1), TRANSFORM({STRING value}, SELF.value := myRedis5.GetString('connectTest' + (STRING)COUNTER)));
+SEQUENTIAL(
+    myRedis.FlushDB();
+    OUTPUT(CATCH(ds3, ONFAIL(TRANSFORM({ STRING value }, SELF.value := FAILMESSAGE))));
+    );
+
+ds4 := DATASET(NOFOLD(1), TRANSFORM({STRING value}, SELF.value := redis.GetString('option' + (STRING)COUNTER, 'blahblahblah')));
+SEQUENTIAL(
+    OUTPUT(CATCH(ds4, ONFAIL(TRANSFORM({ STRING value }, SELF.value := FAILMESSAGE))));
+    );
+
+ds5 := DATASET(NOFOLD(1), TRANSFORM({STRING value}, SELF.value := myRedis.GetString('maxDB' + (STRING)COUNTER, 16)));
+SEQUENTIAL(
+    OUTPUT(CATCH(ds5, ONFAIL(TRANSFORM({ STRING value }, SELF.value := FAILMESSAGE))));
+    );
+
+//Test exception for checking expected channels
+ds6 := DATASET(NOFOLD(1), TRANSFORM({string value}, SELF.value := myRedis.GetOrLockString('channelTest' + (string)COUNTER)));
+SEQUENTIAL(
+    myRedis.FlushDB();
+    myRedis.SetString('channelTest1', 'redis_ecl_lock_blah_blah_blah');
+    OUTPUT(CATCH(ds6, ONFAIL(TRANSFORM({ STRING value }, SELF.value := FAILMESSAGE))));
+    );
+
+ds7 := DATASET(NOFOLD(1), TRANSFORM({string value}, SELF.value := myRedis.GetOrLockString('channelTest' + (string)(1+COUNTER))));
+SEQUENTIAL(
+    myRedis.FlushDB();
+    myRedis.SetString('channelTest2', 'redis_ecl_lock_channelTest2_0');
+    OUTPUT(CATCH(ds7, ONFAIL(TRANSFORM({ STRING value }, SELF.value := FAILMESSAGE))));
+    );
+
+//Test timeout
+myRedisNoTO := redisServerWithoutTimeout(server, password);
+dsTO := DATASET(NOFOLD(1), TRANSFORM({string value}, SELF.value := myRedisNoTO.GetOrLockString('timeoutTest' + (string)COUNTER,,,1000)));
+SEQUENTIAL(
+    myRedis.FlushDB();
+    myRedis.GetOrLockString('timeoutTest1');
+    OUTPUT(CATCH(dsTO, ONFAIL(TRANSFORM({ STRING value }, SELF.value := FAILMESSAGE))));
+    );
+
+STRING pluginTO := 'Redis Plugin: ERROR - function timed out internally.';
+STRING redisTO := 'Redis Plugin: ERROR - GetOrLock<type> \'timeoutTest2\' on database 0 for 127.0.0.1:6379 failed : Resource temporarily unavailable';
+dsTO2 := DATASET(NOFOLD(1), TRANSFORM({string value}, SELF.value := redis.GetOrLockString('timeoutTest' + (string)(1+COUNTER), server, /*database*/, password, 1/*ms*/)));
+SEQUENTIAL(
+    myRedis.FlushDB();
+    myRedis.GetOrLockString('timeoutTest2');
+    OUTPUT(CATCH(dsTO2, ONFAIL(TRANSFORM({ STRING value }, SELF.value := IF(FAILMESSAGE = pluginTO OR FAILMESSAGE = redisTO, 'Timed Out', 'Unexpected Error - ' + FAILMESSAGE)))));
+    );
+
+myRedis.FlushDB();

+ 1 - 34
testing/regress/ecl/redislockingtest.ecl

@@ -18,7 +18,7 @@
 //class=embedded
 //class=3rdparty
 
-//nothor
+//nohthor
 
 IMPORT * FROM lib_redis;
 IMPORT Std;
@@ -126,21 +126,6 @@ SEQUENTIAL(
     myRedis.FlushDB(),
     );
 
-//Test exception for checking expected channels
-ds1 := DATASET(NOFOLD(1), TRANSFORM({string value}, SELF.value := myRedis.GetOrLockString('channelTest' + (string)COUNTER)));
-SEQUENTIAL(
-    myRedis.FlushDB();
-    myRedis.SetString('channelTest1', 'redis_ecl_lock_blah_blah_blah');
-    OUTPUT(CATCH(ds1, ONFAIL(TRANSFORM({ STRING value }, SELF.value := FAILMESSAGE))));
-    );
-
-ds2 := DATASET(NOFOLD(1), TRANSFORM({string value}, SELF.value := myRedis.GetOrLockString('channelTest' + (string)(1+COUNTER))));
-SEQUENTIAL(
-    myRedis.FlushDB();
-    myRedis.SetString('channelTest2', 'redis_ecl_lock_channelTest2_0');
-    OUTPUT(CATCH(ds2, ONFAIL(TRANSFORM({ STRING value }, SELF.value := FAILMESSAGE))));
-    );
-
 SEQUENTIAL(
     myRedis.FlushDB(1);
     myRedis.SetAndPublishString('testDatabaseExpire1', 'databaseThenExpire', 1, 10000);
@@ -183,22 +168,4 @@ SEQUENTIAL(
     myRedis.FlushDB(1);
     );
 
-//Test timeout
-myRedisNoTO := redisServerWithoutTimeout(server, password);
-dsTO := DATASET(NOFOLD(1), TRANSFORM({string value}, SELF.value := myRedisNoTO.GetOrLockString('timeoutTest' + (string)COUNTER,,,1000)));
-SEQUENTIAL(
-    myRedis.FlushDB();
-    myRedis.GetOrLockString('timeoutTest1');
-    OUTPUT(CATCH(dsTO, ONFAIL(TRANSFORM({ STRING value }, SELF.value := FAILMESSAGE))));
-    );
-
-STRING pluginTO := 'Redis Plugin: ERROR - function timed out internally.';
-STRING redisTO := 'Redis Plugin: ERROR - GetOrLock<type> \'timeoutTest2\' on database 0 for 127.0.0.1:6379 failed : Resource temporarily unavailable';
-dsTO2 := DATASET(NOFOLD(1), TRANSFORM({string value}, SELF.value := redis.GetOrLockString('timeoutTest' + (string)(1+COUNTER), server, /*database*/, password, 1/*ms*/)));
-SEQUENTIAL(
-    myRedis.FlushDB();
-    myRedis.GetOrLockString('timeoutTest2');
-    OUTPUT(CATCH(dsTO2, ONFAIL(TRANSFORM({ STRING value }, SELF.value := IF(FAILMESSAGE = pluginTO OR FAILMESSAGE = redisTO, 'Timed Out', 'Unexpected Error - ' + FAILMESSAGE)))));
-    );
-
 myRedis.FlushDB();

+ 24 - 28
testing/regress/ecl/redissynctest.ecl

@@ -18,7 +18,7 @@
 //class=embedded
 //class=3rdparty
 
-//nothor
+//nohthor
 
 IMPORT redis FROM lib_redis;
 IMPORT Std;
@@ -160,38 +160,34 @@ SEQUENTIAL(
     OUTPUT(SUM(NOFOLD(s1 + s2), a))//answer = (x+x/2)*N, in this case 300.
     );
 
-//Test some exceptions
-myRedis4 := RedisServer(server);
-STRING noauth := 'Redis Plugin: ERROR - authentication for 127.0.0.1:6379 failed : NOAUTH Authentication required.';
-STRING opNotPerm :=  'Redis Plugin: ERROR - authentication for 127.0.0.1:6379 failed : ERR operation not permitted';
-ds1 := DATASET(NOFOLD(1), TRANSFORM({string value}, SELF.value := myRedis4.GetString('authTest' + (string)COUNTER)));
-SEQUENTIAL(
-    myRedis.FlushDB();
-    OUTPUT(CATCH(ds1, ONFAIL(TRANSFORM({ STRING value }, SELF.value := IF(FAILMESSAGE = noauth OR FAILMESSAGE = opNotPerm, 'Auth Failed', 'Unexpected Error - ' + FAILMESSAGE)))));
-    );
-
-ds2 := DATASET(NOFOLD(1), TRANSFORM({string value}, SELF.value := myRedis.GetString('authTest' + (string)COUNTER)));
-SEQUENTIAL(
-    myRedis.FlushDB();
-    OUTPUT(CATCH(ds2, ONFAIL(TRANSFORM({ STRING value }, SELF.value := FAILMESSAGE))));
-    );
+//Test Publish and Subscribe
+//SUM(NOFOLD(s1 + s2), a) uses two threads - this test relies on this fact to work!
+INTEGER N2 := 1000;
+subDS := DATASET(N2, TRANSFORM({ integer a }, SELF.a := (INTEGER)myRedis.Subscribe('PubSubTest' + (STRING)COUNTER)));
 
-myRedis5 := RedisServer('--SERVER=127.0.0.1:9999');
-ds3 := DATASET(NOFOLD(1), TRANSFORM({string value}, SELF.value := myRedis5.GetString('connectTest' + (string)COUNTER)));
-SEQUENTIAL(
-    myRedis.FlushDB();
-    OUTPUT(CATCH(ds3, ONFAIL(TRANSFORM({ STRING value }, SELF.value := FAILMESSAGE))));
-    );
+INTEGER pub(STRING channel) := FUNCTION
+        sl := Std.System.Debug.Sleep(2);
+        value :=  myRedis.Publish(channel, '1');
+     RETURN WHEN(value, sl, BEFORE);
+END;
+pubDS := DATASET(N2, TRANSFORM({ integer a }, SELF.a := pub('PubSubTest' + (STRING)COUNTER)));
 
-ds4 := DATASET(NOFOLD(1), TRANSFORM({string value}, SELF.value := redis.GetString('option' + (string)COUNTER, 'blahblahblah')));
-SEQUENTIAL(
-    OUTPUT(CATCH(ds4, ONFAIL(TRANSFORM({ STRING value }, SELF.value := FAILMESSAGE))));
-    );
+INTEGER pub2(STRING channel) := FUNCTION
+        sl := SEQUENTIAL(
+            Std.System.Debug.Sleep(2),
+            myRedis.Publish(channel, '3')//This pub is the one read by the sub.
+            );
+        value :=  myRedis.Publish(channel, '10000');//This pub isn't read by the sub, however the returned subscription count is present in the sum
+     RETURN WHEN(value, sl, BEFORE);
+END;
+pubDS2 := DATASET(N2, TRANSFORM({ integer a }, SELF.a := pub2('PubSubTest' + (STRING)COUNTER)));
 
-ds5 := DATASET(NOFOLD(1), TRANSFORM({string value}, SELF.value := myRedis.GetString('maxDB' + (string)COUNTER, 16)));
+value := SUM(NOFOLD(subDS + pubDS2), a);
 SEQUENTIAL(
-    OUTPUT(CATCH(ds5, ONFAIL(TRANSFORM({ STRING value }, SELF.value := FAILMESSAGE))));
+    OUTPUT(SUM(NOFOLD(subDS + pubDS), a));//answer = N*2 = 2000
+    OUTPUT( IF (value > N2*4, (STRING)value, 'OK'));//ideally result = N*3, less than this => not all subs had pubs (but this would cause a timeout).
     );
+    // N*3 > result < N*4 => all subs received a pub, however, there were result-N*3 subs still open for the second pub. result > N*4 => gremlins.
 
 myRedis.FlushDB();
 myRedis2.FlushDB();