Преглед изворни кода

Merge pull request #7888 from jamienoss/revert-7853-issue14286-redis-plugin-add-independent-pub-sub-functionality-with-db

Revert "HPCC-14286 Redis Plugin - add publish and subscribe functions"

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday пре 9 година
родитељ
комит
ba78b8dfec
4 измењених фајлова са 81 додато и 213 уклоњено
  1. 24 36
      plugins/redis/README.md
  2. 0 9
      plugins/redis/lib_redis.ecllib
  3. 57 165
      plugins/redis/redis.cpp
  4. 0 3
      plugins/redis/redis.hpp

+ 24 - 36
plugins/redis/README.md

@@ -53,42 +53,36 @@ Here is a list of the core plugin **functions**.
 
 ###Set
 ```
-SetUnicode( CONST VARSTRING key, CONST UNICODE value, CONST VARSTRING options, INTEGER database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
-SetString(  CONST VARSTRING key, CONST STRING value,  CONST VARSTRING options, INTEGER database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
-SetUtf8(    CONST VARSTRING key, CONST UTF8 value,    CONST VARSTRING options, INTEGER database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
-SetBoolean( CONST VARSTRING key, BOOLEAN value,       CONST VARSTRING options, INTEGER database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
-SetReal(    CONST VARSTRING key, REAL value,          CONST VARSTRING options, INTEGER database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
-SetInteger( CONST VARSTRING key, INTEGER value,       CONST VARSTRING options, INTEGER database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
-SetUnsigned(CONST VARSTRING key, UNSIGNED value,      CONST VARSTRING options, INTEGER database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
-SetData(    CONST VARSTRING key, CONST DATA value,    CONST VARSTRING options, INTEGER database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+SetUnicode( CONST VARSTRING key, CONST UNICODE value, CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
+SetString(  CONST VARSTRING key, CONST STRING value,  CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
+SetUtf8(    CONST VARSTRING key, CONST UTF8 value,    CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
+SetBoolean( CONST VARSTRING key, BOOLEAN value,       CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
+SetReal(    CONST VARSTRING key, REAL value,          CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
+SetInteger( CONST VARSTRING key, INTEGER value,       CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
+SetUnsigned(CONST VARSTRING key, UNSIGNED value,      CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
+SetData(    CONST VARSTRING key, CONST DATA value,    CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
 ```
 
 ###Get
 ```
-INTEGER8   GetInteger(CONST VARSTRING key, CONST VARSTRING options, INTEGER database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
-UNSIGNED8 GetUnsigned(CONST VARSTRING key, CONST VARSTRING options, INTEGER database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
-STRING      GetString(CONST VARSTRING key, CONST VARSTRING options, INTEGER database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
-UNICODE    GetUnicode(CONST VARSTRING key, CONST VARSTRING options, INTEGER database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
-UTF8          GetUtf8(CONST VARSTRING key, CONST VARSTRING options, INTEGER database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
-BOOLEAN    GetBoolean(CONST VARSTRING key, CONST VARSTRING options, INTEGER database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
-REAL          GetReal(CONST VARSTRING key, CONST VARSTRING options, INTEGER database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
-DATA          GetData(CONST VARSTRING key, CONST VARSTRING options, INTEGER database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
+INTEGER8   GetInteger(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
+UNSIGNED8 GetUnsigned(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
+STRING      GetString(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
+UNICODE    GetUnicode(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
+UTF8          GetUtf8(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
+BOOLEAN    GetBoolean(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
+REAL          GetReal(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
+DATA          GetData(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
 ```
 
 ###Utility
 ```
-BOOLEAN Exists(CONST VARSTRING key, CONST VARSTRING options, INTEGER database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
-FlushDB(CONST VARSTRING options, INTEGER database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
-Delete(CONST VARSTRING key, CONST VARSTRING options, INTEGER database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
-Persist(CONST VARSTRING key, CONST VARSTRING options, INTEGER database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
-Expire(CONST VARSTRING key, CONST VARSTRING options, INTEGER database = 0, UNSIGNED4 expire, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
-INTEGER DBSize(CONST VARSTRING options, INTEGER database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)
-```
-
-###PUB-SUB
-```
-UNSIGNED Publish(CONST VARSTRING keyOrChannel, CONST STRING message, CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000, BOOLEAN lockedKey = FALSE)
-STRING Subscribe(CONST VARSTRING keyOrChannel, CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000, BOOLEAN lockedKey = FALSE)
+BOOLEAN Exists(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
+FlushDB(CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
+Delete(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
+Persist(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
+Expire(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
+INTEGER DBSize(CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000)
 ```
 
 The core points to note here are:
@@ -97,12 +91,8 @@ The core points to note here are:
    STRING of length 8, set with SetString, being successfully retrieved from the cache via GetInteger without an **ECL** exception being thrown.
    * `CONST VARSTRING options` passes the server **IP** and **port** to the plugin in the *strict* format - `--SERVER=<ip>:<port>`. If `options` is empty, the default
    127.0.0.1:6379 is used. *Note:* 6379 is the default port for **redis-server**.
-   * `UNSIGNED4 timeout` has units *ms* and has a default value of 1 second (0 := infinity).  *c.f.* 'Timeout Values' below for advice on choosing appropriate values.
+   * `UNSIGNED timeout` has units *ms* and has a default value of 1 second (0 := infinity).  *c.f.* 'Timeout Values' below for advice on choosing appropriate values.
    * `UNSIGNED expire` has units *ms* and a default of **0**, i.e. *forever*.
-   * Both `Publish` and `Subscribe` have a flag `BOOLEAN lockedKey = FALSE` such, that when **TRUE**, will encode `CONST VARSTRING keyOrChannel` as if it were a key
-   allowing key-channel encoding compatibility with the `GetOrLockString` and `SetAndPublishString` functions. For this reason, they both also take a **Database** value as
-   this is used in the encoding of the lock and channel. Please note however that the redis pub-sub paradigm is actually irrespective of database.
-   * *c.f.* redis documentation for the following - [Exists](http://redis.io/commands/exists), [FlushDB](http://redis.io/commands/flushdb), [Delete](http://redis.io/commands/del), [Persist](http://redis.io/commands/persist), [Expire](http://redis.io/commands/expire), [DBSize](http://redis.io/commands/dbsize), [Publish](http://redis.io/commands/publish), & [Subscribe](http://redis.io/commands/subscribe).
 
 ###The redisServer MODULE
 To avoid the cumbersome and unnecessary need to constantly pass `options` and `password` with each function call, the module `redisServer` can be imported to effectively 
@@ -148,7 +138,7 @@ myRedis := redisServer('--SERVER=127.0.0.1:6379');
 
 STRING poppins := 'supercalifragilisticexpialidocious'; //Value to externally compute/retrieve from 3rd party vendor.
 
-myFunc(STRING key, INTEGER database) := FUNCTION  //Function for computing/retrieving a value.
+myFunc(STRING key, UNSIGNED database) := FUNCTION  //Function for computing/retrieving a value.
   return myRedis.GetString(key, database);
 END;
 
@@ -210,5 +200,3 @@ A few notes to point out here:
 | SetAndPublish (value length > 29) | 1       | 5       | new connection   |
 | SetAndPublish (value length < 29) | 4       | 8       | new connection   |
 | Unlock              | 5       | 9       | new connection   |
-| Publish             | 1       | 4       | new connection   |
-| Subscribe           | 2       | 5       | new connection   |

+ 0 - 9
plugins/redis/lib_redis.ecllib

@@ -43,9 +43,6 @@ EXPORT redis := SERVICE : plugin('redis'), namespace('RedisPlugin')
   Expire(CONST VARSTRING key, CONST VARSTRING options, INTEGER4 database = 0, UNSIGNED4 expire, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000) : cpp,action,context,entrypoint='RExpire';
   UNSIGNED DBSize(CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000) : cpp,once,context,entrypoint='RDBSize';
 
-  UNSIGNED Publish(CONST VARSTRING keyOrChannel, CONST STRING message, CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000, BOOLEAN lockedKey = FALSE) : cpp,once,context,entrypoint='SyncRPub';
-  STRING Subscribe(CONST VARSTRING keyOrChannel, CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000, BOOLEAN lockedKey = FALSE) : cpp,once,context,entrypoint='SyncRSub';
-
   STRING   SetAndPublishString(  CONST VARSTRING key, CONST STRING value,  CONST VARSTRING options, INTEGER4 database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000) : cpp,once,context,entrypoint='SyncLockRSetStr';
   UNICODE  SetAndPublishUnicode( CONST VARSTRING key, CONST UNICODE value, CONST VARSTRING options, INTEGER4 database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000) : cpp,once,context,entrypoint='SyncLockRSetUChar';
   UTF8     SetAndPublishUtf8(    CONST VARSTRING key, CONST UTF8 value,    CONST VARSTRING options, INTEGER4 database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000) : cpp,once,context,entrypoint='SyncLockRSetUtf8';
@@ -84,9 +81,6 @@ EXPORT RedisServer(VARSTRING options, VARSTRING password = '', UNSIGNED4 timeout
   EXPORT Expire(VARSTRING key, INTEGER4 database = 0, UNSIGNED4 expire)  := redis.Expire(key, options, database, expire, password, timeout);
   EXPORT DBSize(INTEGER4 database = 0) := redis.DBSize(options, database, password, timeout);
 
-  EXPORT Publish(VARSTRING keyOrChannel, STRING message, INTEGER4 database = 0, BOOLEAN lockedKey = FALSE) := redis.Publish(keyOrChannel, message, options, database, password, timeout, lockedKey);
-  EXPORT Subscribe(VARSTRING keyOrChannel, INTEGER4 database = 0, BOOLEAN lockedKey = FALSE) := redis.Subscribe(keyOrChannel, options, database, password, timeout, lockedKey);
-
   EXPORT  SetAndPublishUnicode(VARSTRING key, UNICODE value,  INTEGER4 database = 0, UNSIGNED4 expire = 0) := redis.SetAndPublishUnicode (key, value, options, database, expire, password, timeout);
   EXPORT   SetAndPublishString(VARSTRING key, STRING value,   INTEGER4 database = 0, UNSIGNED4 expire = 0) := redis.SetAndPublishString  (key, value, options, database, expire, password, timeout);
   EXPORT     SetAndPublishUtf8(VARSTRING key, UTF8 value,     INTEGER4 database = 0, UNSIGNED4 expire = 0) := redis.SetAndPublishUtf8    (key, value, options, database, expire, password, timeout);
@@ -125,9 +119,6 @@ EXPORT RedisServerWithoutTimeout(VARSTRING options, VARSTRING password = '') :=
   EXPORT Expire(VARSTRING key, INTEGER4 database = 0, UNSIGNED4 expire, UNSIGNED4 timeout = 1000)  := redis.Expire(key, options, database, expire, password, timeout);
   EXPORT DBSize(INTEGER4 database = 0, UNSIGNED4 timeout = 1000) := redis.DBSize(options, database, password, timeout);
 
-  EXPORT Publish(VARSTRING keyOrChannel, STRING message, INTEGER4 database = 0, UNSIGNED4 timeout = 1000, BOOLEAN lockedKey = FALSE) := redis.Publish(keyOrChannel, message, options, database, password, timeout, lockedKey);
-  EXPORT Subscribe(VARSTRING keyOrChannel, INTEGER4 database = 0, UNSIGNED4 timeout = 1000, BOOLEAN lockedKey = FALSE) := redis.Subscribe(keyOrChannel, options, database, password, timeout, lockedKey);
-
   EXPORT  SetAndPublishUnicode(VARSTRING key, UNICODE value,  INTEGER4 database = 0, UNSIGNED4 expire = 0, UNSIGNED4 timeout = 1000) := redis.SetAndPublishUnicode (key, value, options, database, expire, password, timeout);
   EXPORT   SetAndPublishString(VARSTRING key, STRING value,   INTEGER4 database = 0, UNSIGNED4 expire = 0, UNSIGNED4 timeout = 1000) := redis.SetAndPublishString  (key, value, options, database, expire, password, timeout);
   EXPORT     SetAndPublishUtf8(VARSTRING key, UTF8 value,     INTEGER4 database = 0, UNSIGNED4 expire = 0, UNSIGNED4 timeout = 1000) := redis.SetAndPublishUtf8    (key, value, options, database, expire, password, timeout);

+ 57 - 165
plugins/redis/redis.cpp

@@ -40,13 +40,10 @@ ECL_REDIS_API bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb)
 namespace RedisPlugin {
 
 class Connection;
-class SubConnection;
 static const char * REDIS_LOCK_PREFIX = "redis_ecl_lock";
-static __thread Connection * cachedConnection = NULL;
-static __thread Connection * cachedPubConnection = NULL;//database should always = 0
-static __thread Connection * cachedSubConnection = NULL;//intended to always point to SubConnection
-static __thread ThreadTermFunc threadHookChain = NULL;
-static __thread bool threadHooked = false;
+static __thread Connection * cachedConnection;
+static __thread ThreadTermFunc threadHookChain;
+static __thread bool threadHooked;
 
 static void * allocateAndCopy(const void * src, size_t size)
 {
@@ -72,7 +69,6 @@ public :
 
     static Reply * createReply(void * _reply) { return new Reply(_reply); }
     inline const redisReply * query() const { return reply; }
-    void setClear(void * _reply) { setClear((redisReply*)_reply); }
     void setClear(redisReply * _reply)
     {
         if (reply)
@@ -107,14 +103,14 @@ private :
 class Connection : public CInterface
 {
 public :
-    Connection(ICodeContext * ctx, const char * _options, int database, const char * password, unsigned _timeout);
+    Connection(ICodeContext * ctx, const char * options, int database, const char * password, unsigned _timeout);
     Connection(ICodeContext * ctx, const char * _options, const char * _ip, int _port, unsigned _serverIpPortPasswordHash, int _database, const char * password, unsigned _timeout);
     ~Connection()
     {
         if (context)
             redisFree(context);
     }
-    static Connection * createConnection(ICodeContext * ctx, Connection * & _cachedConnection, const char * options, int database, const char * password, unsigned _timeout, bool sub = false);
+    static Connection * createConnection(ICodeContext * ctx, const char * options, int database, const char * password, unsigned _timeout);
 
     //set
     template <class type> void set(ICodeContext * ctx, const char * key, type value, unsigned expire);
@@ -129,11 +125,6 @@ public :
     void unlock(ICodeContext * ctx, const char * key);
     //--------------------------------------------------------------------------------------
 
-    //-------------------------------PUB/SUB------------------------------------------------
-    unsigned __int64 publish(ICodeContext * ctx, const char * keyOrChannel, size32_t messageSize, const char * message, int _database, bool lockedKey);
-    void subscribe(ICodeContext * ctx, const char * keyOrChannel, size_t & messageSize, char * & message, int _database, bool lockedKey);
-    //--------------------------------------------------------------------------------------
-
     void persist(ICodeContext * ctx, const char * key);
     void expire(ICodeContext * ctx, const char * key, unsigned _expire);
     void del(ICodeContext * ctx, const char * key);
@@ -142,14 +133,13 @@ public :
     bool exists(ICodeContext * ctx, const char * key);
 
 protected :
-    virtual void selectDB(ICodeContext * ctx, int _database);
-    virtual void reset();
-
     void redisSetTimeout();
     void redisConnect();
     unsigned timeLeft();
     void parseOptions(ICodeContext * ctx, const char * _options);
     void connect(ICodeContext * ctx, int _database, const char * password);
+    void selectDB(ICodeContext * ctx, int _database);
+    void resetContextErr();
     void readReply(Reply * reply);
     void readReplyAndAssert(Reply * reply, const char * msg);
     void readReplyAndAssertWithCmdMsg(Reply * reply, const char * msg, const char * key = NULL);
@@ -167,7 +157,7 @@ protected :
     //-------------------------------LOCKING------------------------------------------------
     void handleLockOnSet(ICodeContext * ctx, const char * key, const char * value, size_t size, unsigned expire);
     void handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAttr * retVal, const char * password, unsigned expire);
-    void encodeChannel(StringBuffer & channel, const char * key, int _database) const;
+    void encodeChannel(StringBuffer & channel, const char * key) const;
     bool noScript(const redisReply * reply) const;
     bool lock(ICodeContext * ctx, const char * key, const char * channel, unsigned expire);
     //--------------------------------------------------------------------------------------
@@ -181,16 +171,22 @@ protected :
     TimeoutHandler timeout;
     int database; //NOTE: redis stores the maximum number of dbs as an 'int'.
 };
-class SubConnection : public Connection
+
+//The following class is here to ensure destruction of the cachedConnection within the main thread
+//as this is not handled by the thread hook mechanism.
+static class MainThreadCachedConnection
 {
 public :
-    SubConnection(ICodeContext * ctx, const char * _options, int database, const char * password, unsigned _timeout) :
-        Connection(ctx, _options, 0, password, _timeout) { };
-
-protected :
-    virtual void selectDB(ICodeContext * ctx, int _database) { };
-    virtual void reset();
-};
+    MainThreadCachedConnection() { }
+    ~MainThreadCachedConnection()
+    {
+        if (cachedConnection)
+        {
+            cachedConnection->Release();
+            cachedConnection = NULL;
+        }
+    }
+} mainThread;
 
 static void releaseContext()
 {
@@ -199,16 +195,6 @@ static void releaseContext()
         cachedConnection->Release();
         cachedConnection = NULL;
     }
-    if (cachedPubConnection)
-    {
-        cachedPubConnection->Release();
-        cachedPubConnection = NULL;
-    }
-    if (cachedSubConnection)
-    {
-        cachedSubConnection->Release();
-        cachedSubConnection = NULL;
-    }
     if (threadHookChain)
     {
         (*threadHookChain)();
@@ -216,14 +202,6 @@ static void releaseContext()
     }
     threadHooked = false;
 }
-//The following class is here to ensure destruction of the cachedConnection within the main thread
-//as this is not handled by the thread hook mechanism.
-static class MainThreadCachedConnection
-{
-public :
-    MainThreadCachedConnection() { }
-    ~MainThreadCachedConnection() { releaseContext(); }
-} mainThread;
 Connection::Connection(ICodeContext * ctx, const char * _options, int _database, const char * password, unsigned _timeout)
   : database(0), timeout(_timeout), port(0), serverIpPortPasswordHash(hashServerIpPortPassword(ctx, _options, password))
 {
@@ -352,17 +330,11 @@ void Connection::parseOptions(ICodeContext * ctx, const char * _options)
         }
     }
 }
-void Connection::reset()
+void Connection::resetContextErr()
 {
     if (context)
         context->err = REDIS_OK;
 }
-void SubConnection::reset()
-{
-    OwnedReply reply = Reply::createReply(redisCommand(context, "UNSUBSCRIBE"));
-    assertOnErrorWithCmdMsg(reply->query(), "UNSUBSCRIBE all channels");
-    Connection::reset();
-}
 void Connection::readReply(Reply * reply)
 {
     redisReply * nakedReply = NULL;
@@ -380,39 +352,32 @@ void Connection::readReplyAndAssertWithCmdMsg(Reply * reply, const char * msg, c
     readReply(reply);
     assertOnErrorWithCmdMsg(reply->query(), msg, key);
 }
-Connection * Connection::createConnection(ICodeContext * ctx,  Connection * & _cachedConnection, const char * options, int _database, const char * password, unsigned _timeout, bool sub)
+Connection * Connection::createConnection(ICodeContext * ctx, const char * options, int _database, const char * password, unsigned _timeout)
 {
-    if (!_cachedConnection)
+    if (!cachedConnection)
     {
-        if (sub)
-            _cachedConnection = new SubConnection(ctx, options, _database, password, _timeout);
-        else
-            _cachedConnection = new Connection(ctx, options, _database, password, _timeout);
-
+        cachedConnection = new Connection(ctx, options, _database, password, _timeout);
         if (!threadHooked)
         {
             threadHookChain = addThreadTermFunc(releaseContext);
             threadHooked = true;
         }
-        return LINK(_cachedConnection);
+        return LINK(cachedConnection);
     }
 
-    if (_cachedConnection->isSameConnection(ctx, options, password))
+    if (cachedConnection->isSameConnection(ctx, options, password))
     {
         //MORE: should perhaps check that the connection has not expired (think hiredis REDIS_KEEPALIVE_INTERVAL is defaulted to 15s).
-        _cachedConnection->reset();//reset the context err to allow reuse when an error previously occurred.
-        _cachedConnection->timeout.reset(_timeout);
-        _cachedConnection->selectDB(ctx, _database);
-        return LINK(_cachedConnection);
+        cachedConnection->resetContextErr();//reset the context err to allow reuse when an error previously occurred.
+        cachedConnection->timeout.reset(_timeout);
+        cachedConnection->selectDB(ctx, _database);
+        return LINK(cachedConnection);
     }
 
-    _cachedConnection->Release();
-    _cachedConnection = NULL;
-    if (sub)
-        _cachedConnection = new SubConnection(ctx, options, _database, password, _timeout);
-    else
-        _cachedConnection = new Connection(ctx, options, _database, password, _timeout);
-    return LINK(_cachedConnection);
+    cachedConnection->Release();
+    cachedConnection = NULL;
+    cachedConnection = new Connection(ctx, options, _database, password, _timeout);
+    return LINK(cachedConnection);
 }
 void Connection::selectDB(ICodeContext * ctx, int _database)
 {
@@ -534,13 +499,13 @@ unsigned __int64 Connection::dbSize(ICodeContext * ctx)
 //--OUTER--
 template<class type> void SyncRSet(ICodeContext * ctx, const char * _options, const char * key, type value, int database, unsigned expire, const char * password, unsigned _timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, _options, database, password, _timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, _options, database, password, _timeout);
     master->set(ctx, key, value, expire);
 }
 //Set pointer types
 template<class type> void SyncRSet(ICodeContext * ctx, const char * _options, const char * key, size32_t valueSize, const type * value, int database, unsigned expire, const char * password, unsigned _timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, _options, database, password, _timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, _options, database, password, _timeout);
     master->set(ctx, key, valueSize, value, expire);
 }
 //--INNER--
@@ -567,12 +532,12 @@ template<class type> void Connection::set(ICodeContext * ctx, const char * key,
 //--OUTER--
 template<class type> void SyncRGet(ICodeContext * ctx, const char * options, const char * key, type & returnValue, int database, const char * password, unsigned _timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, _timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, _timeout);
     master->get(ctx, key, returnValue);
 }
 template<class type> void SyncRGet(ICodeContext * ctx, const char * options, const char * key, size_t & returnSize, type * & returnValue, int database, const char * password, unsigned _timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, _timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, _timeout);
     master->get(ctx, key, returnSize, returnValue);
 }
 //--INNER--
@@ -601,110 +566,37 @@ template<class type> void Connection::get(ICodeContext * ctx, const char * key,
     returnSize = reply->query()->len;
     returnValue = reinterpret_cast<type*>(allocateAndCopy(reply->query()->str, returnSize));
 }
-unsigned __int64 Connection::publish(ICodeContext * ctx, const char * keyOrChannel, size32_t messageSize, const char * message, int _database, bool lockedKey)
-{
-    StringBuffer channel;
-    if (lockedKey)
-        encodeChannel(channel, keyOrChannel, _database);
-    else
-        channel.set(keyOrChannel);
-
-    OwnedReply reply = Reply::createReply(redisCommand(context, "PUBLISH %b %b", channel.str(), (size_t)channel.length(), message, (size_t)messageSize));
-    assertOnErrorWithCmdMsg(reply->query(), "PUBLISH", channel.str());
-    if (reply->query()->type == REDIS_REPLY_INTEGER)
-    {
-        if (reply->query()->integer >= 0)
-            return (unsigned __int64)reply->query()->integer;
-        else
-            throwUnexpected();
-    }
-    throwUnexpected();
-}
-void Connection::subscribe(ICodeContext * ctx, const char * keyOrChannel, size_t & messageSize, char * & message, int _database, bool lockedKey)
-{
-    StringBuffer channel;
-    if (lockedKey)
-        encodeChannel(channel, keyOrChannel, _database);
-    else
-        channel.set(keyOrChannel);
-
-#if(0)//Replicate a lingering SUBSCRIBE to test channel comparison test when reading message.
-    {
-    OwnedReply reply = Reply::createReply(redisCommand(context, "SUBSCRIBE oldChannel"));
-    assertOnErrorWithCmdMsg(reply->query(), "Test lingering SUBSCRIBE", "oldChannel");
-    }
-#endif
-
-    OwnedReply reply = Reply::createReply(redisCommand(context, "SUBSCRIBE %b", channel.str(), (size_t)channel.length()));
-    assertOnErrorWithCmdMsg(reply->query(), "SUBSCRIBE", channel.str());
-    if (reply->query()->type != REDIS_REPLY_ARRAY || strcmp("subscribe", reply->query()->element[0]->str) != 0 )
-        fail("SUBSCRIBE", "failed to register SUB", channel.str());
-
-    readReply(reply);
-    assertOnErrorWithCmdMsg(reply->query(), "SUBSCRIBE", channel.str());
-
-    if (reply->query()->type == REDIS_REPLY_ARRAY && strcmp("message", reply->query()->element[0]->str) == 0 && reply->query()->elements > 2)
-    {
-        if (strcmp( channel.str(), reply->query()->element[1]->str) != 0)//NOTE: In case the cached subscription connection encounters a failed unsubscribe resulting in mangled subscriptions.
-            throwUnexpected();
-
-        if (reply->query()->element[2]->len > 0)
-        {
-            messageSize = (size_t)reply->query()->element[2]->len;
-            message = reinterpret_cast<char*>(allocateAndCopy(reply->query()->element[2]->str, messageSize));
-        }
-        else
-        {
-            messageSize = 0;
-            message = NULL;
-        }
-        return;
-    }
-    throwUnexpected();
-}
 //--------------------------------------------------------------------------------
 //                           ECL SERVICE ENTRYPOINTS
 //--------------------------------------------------------------------------------
-ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL SyncRPub(ICodeContext * ctx, const char * keyOrChannel, size32_t messageSize, const char * message, const char * options, int database, const char * password, unsigned timeout, bool lockedKey)
-{
-    Owned<Connection> master = Connection::createConnection(ctx, cachedPubConnection, options, 0, password, timeout);
-    return master->publish(ctx, keyOrChannel, messageSize, message, database, lockedKey);
-}
-ECL_REDIS_API void ECL_REDIS_CALL SyncRSub(ICodeContext * ctx, size32_t & messageSize, char * & message, const char * keyOrChannel, const char * options, int database, const char * password, unsigned timeout, bool lockedKey)
-{
-    size_t _messageSize = 0;
-    Owned<Connection> master = Connection::createConnection(ctx, cachedSubConnection, options, 0, password, timeout, true);
-    master->subscribe(ctx, keyOrChannel, _messageSize, message, database, lockedKey);
-    messageSize = static_cast<size32_t>(_messageSize);
-}
 ECL_REDIS_API void ECL_REDIS_CALL RClear(ICodeContext * ctx, const char * options, int database, const char * password, unsigned timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
     master->clear(ctx);
 }
 ECL_REDIS_API bool ECL_REDIS_CALL RExist(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
     return master->exists(ctx, key);
 }
 ECL_REDIS_API void ECL_REDIS_CALL RDel(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
     master->del(ctx, key);
 }
 ECL_REDIS_API void ECL_REDIS_CALL RPersist(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
     master->persist(ctx, key);
 }
 ECL_REDIS_API void ECL_REDIS_CALL RExpire(ICodeContext * ctx, const char * key, const char * options, int database, unsigned _expire, const char * password, unsigned timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
     master->expire(ctx, key, _expire);
 }
 ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL RDBSize(ICodeContext * ctx, const char * options, int database, const char * password, unsigned timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
     return master->dbSize(ctx);
 }
 //-----------------------------------SET------------------------------------------
@@ -794,7 +686,7 @@ ECL_REDIS_API void ECL_REDIS_CALL SyncRGetData(ICodeContext * ctx, size32_t & re
 //Set pointer types
 void SyncLockRSet(ICodeContext * ctx, const char * _options, const char * key, size32_t valueSize, const char * value, int database, unsigned expire, const char * password, unsigned _timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, _options, database, password, _timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, _options, database, password, _timeout);
     master->lockSet(ctx, key, valueSize, value, expire);
 }
 //--INNER--
@@ -807,7 +699,7 @@ void Connection::lockSet(ICodeContext * ctx, const char * key, size32_t valueSiz
 //--OUTER--
 void SyncLockRGet(ICodeContext * ctx, const char * options, const char * key, size_t & returnSize, char * & returnValue, int database, unsigned expire, const char * password, unsigned _timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, _timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, _timeout);
     master->lockGet(ctx, key, returnSize, returnValue, password, expire);
 }
 //--INNER--
@@ -819,9 +711,9 @@ void Connection::lockGet(ICodeContext * ctx, const char * key, size_t & returnSi
     returnValue = reinterpret_cast<char*>(retVal.detach());
 }
 //---------------------------------------------------------------------------------------
-void Connection::encodeChannel(StringBuffer & channel, const char * key, int _database) const
+void Connection::encodeChannel(StringBuffer & channel, const char * key) const
 {
-    channel.append(REDIS_LOCK_PREFIX).append("_").append(key).append("_").append(_database);
+    channel.append(REDIS_LOCK_PREFIX).append("_").append(key).append("_").append(database);
 }
 bool Connection::lock(ICodeContext * ctx, const char * key, const char * channel, unsigned expire)
 {
@@ -867,7 +759,7 @@ void Connection::handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAtt
     //NOTE: This routine can only return an empty string under one condition, that which indicates to the caller that the key was successfully locked.
 
     StringBuffer channel;
-    encodeChannel(channel, key, database);
+    encodeChannel(channel, key);
 
     //Query key and set lock if non existent
     if (lock(ctx, key, channel.str(), expire))
@@ -962,7 +854,7 @@ void Connection::handleLockOnSet(ICodeContext * ctx, const char * key, const cha
 {
     //Due to locking logic surfacing into ECL, any locking.set (such as this is) assumes that they own the lock and therefore go ahead and set regardless.
     StringBuffer channel;
-    encodeChannel(channel, key, database);
+    encodeChannel(channel, key);
 
     if (size > 29)//c.f. 1st note below.
     {
@@ -970,21 +862,21 @@ void Connection::handleLockOnSet(ICodeContext * ctx, const char * key, const cha
         if (expire == 0)
         {
             const char * luaScriptSHA1 = "2a4a976d9bbd806756b2c7fc1e2bc2cb905e68c3"; //NOTE: update this if luaScript is updated!
-            replyContainer->setClear(redisCommand(context, "EVALSHA %b %d %b %b %b", luaScriptSHA1, (size_t)40, 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size));
+            replyContainer->setClear((redisReply*)redisCommand(context, "EVALSHA %b %d %b %b %b", luaScriptSHA1, (size_t)40, 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size));
             if (noScript(replyContainer->query()))
             {
                 const char * luaScript = "redis.call('SET', KEYS[1], ARGV[2]) redis.call('PUBLISH', ARGV[1], ARGV[2]) return";//NOTE: MUST update luaScriptSHA1 if luaScript is updated!
-                replyContainer->setClear(redisCommand(context, "EVAL %b %d %b %b %b", luaScript, strlen(luaScript), 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size));
+                replyContainer->setClear((redisReply*)redisCommand(context, "EVAL %b %d %b %b %b", luaScript, strlen(luaScript), 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size));
             }
         }
         else
         {
             const char * luaScriptWithExpireSHA1 = "6f6bc88ccea7c6853ccc395eaa7abd8cb91fb2d8"; //NOTE: update this if luaScriptWithExpire is updated!
-            replyContainer->setClear(redisCommand(context, "EVALSHA %b %d %b %b %b %d", luaScriptWithExpireSHA1, (size_t)40, 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size, expire));
+            replyContainer->setClear((redisReply*)redisCommand(context, "EVALSHA %b %d %b %b %b %d", luaScriptWithExpireSHA1, (size_t)40, 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size, expire));
             if (noScript(replyContainer->query()))
             {
                 const char * luaScriptWithExpire = "redis.call('SET', KEYS[1], ARGV[2], 'PX', ARGV[3]) redis.call('PUBLISH', ARGV[1], ARGV[2]) return";//NOTE: MUST update luaScriptWithExpireSHA1 if luaScriptWithExpire is updated!
-                replyContainer->setClear(redisCommand(context, "EVAL %b %d %b %b %b %d", luaScriptWithExpire, strlen(luaScriptWithExpire), 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size, expire));
+                replyContainer->setClear((redisReply*)redisCommand(context, "EVAL %b %d %b %b %b %d", luaScriptWithExpire, strlen(luaScriptWithExpire), 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size, expire));
             }
         }
         assertOnErrorWithCmdMsg(replyContainer->query(), "SET", key);
@@ -1073,7 +965,7 @@ ECL_REDIS_API void ECL_REDIS_CALL SyncLockRGetUtf8(ICodeContext * ctx, size32_t
 }
 ECL_REDIS_API void ECL_REDIS_CALL SyncLockRUnlock(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout)
 {
-    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, timeout);
+    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
     master->unlock(ctx, key);
 }
 }//close namespace

+ 0 - 3
plugins/redis/redis.hpp

@@ -61,9 +61,6 @@ namespace RedisPlugin {
     ECL_REDIS_API void             ECL_REDIS_CALL SyncRGetUChar (ICodeContext * _ctx, size32_t & returnLength, UChar * & returnValue, const char * key, const char * options, int database, const char * pswd, unsigned timeout);
     ECL_REDIS_API void             ECL_REDIS_CALL SyncRGetData  (ICodeContext * _ctx,size32_t & returnLength, void * & returnValue, const char * key, const char * options, int database, const char * pswd, unsigned timeout);
 
-    ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL SyncRPub(ICodeContext * _ctx, const char * keyOrChannel, size32_t messageLength, const char * message, const char * options, int database, const char * pswd, unsigned timeout, bool lockedKey);
-    ECL_REDIS_API void ECL_REDIS_CALL SyncRSub(ICodeContext * _ctx, size32_t & messageLength, char * & message, const char * keyOrChannel, const char * options, int database, const char * pswd, unsigned timeout, bool lockedKey);
-
     //--------------------------------AUXILLARIES---------------------------
     ECL_REDIS_API bool             ECL_REDIS_CALL RExist  (ICodeContext * _ctx, const char * key, const char * options, int database, const char * pswd, unsigned timeout);
     ECL_REDIS_API void             ECL_REDIS_CALL RClear  (ICodeContext * _ctx, const char * options, int database, const char * pswd, unsigned timeout);