Browse Source

HPCC-13051 Redis locking module

Signed-off-by: James Noss <james.noss@lexisnexis.com>
James Noss 10 years ago
parent
commit
8bec3d0c8c

+ 2 - 5
plugins/redis/CMakeLists.txt

@@ -28,11 +28,8 @@ if (USE_REDIS)
   ADD_PLUGIN(redis PACKAGES REDIS OPTION MAKE_REDIS)
   if ( MAKE_REDIS )
     set (    SRCS
-             redisplugin.hpp
-             redissync.hpp
-
-             redisplugin.cpp
-             redissync.cpp
+             redis.hpp
+             redis.cpp
         )
 
     include_directories (

+ 160 - 0
plugins/redis/README.md

@@ -0,0 +1,160 @@
+ECL Redis Plugin
+================
+
+This is the ECL plugin to utilize the persistent key-value cache [Redis](http://redis.io).
+It utilises the C API [hiredis](http://github.com/redis/hiredis).
+
+Installation and Dependencies
+----------------------------
+
+To build the redis plugin with the HPCC-Platform, libhiredis-dev is required.
+```
+sudo apt-get install libhiredis-dev
+```
+
+The redis server and client software can be obtained via either - [binaries](http://redis.io/download), [source](https://github.com/antirez/redis) or the preferred method:
+```
+sudo apt-get redis-server
+```
+
+Getting started
+---------------
+
+The server can be started by typing `redis-server` within a terminal. To run with with a non-default configuration run as `redis-server redis.conf`, where
+redis.conf is the configuration file supplied with the redis-server package.
+
+For example, to require the server to **password authenticate**, locate and copy redis.conf to a desired dir. Then locate and alter the 'requirepass' variable within the file.
+Similarly the server **port** can also be altered here. *Note:* that the default is 6379 and that if multiple and individual caches are required then they are by definition redis-servers
+on different ports.
+
+The **redis-server** package comes with the redis client **redis-cli**. This can be used to send and receive commands to and from the server, invoked by `redis-cli` or, for example,
+`redis-cli -p 6380` to connect to the redis-cache on port 6380 (assuming one has been started).
+
+Perhaps on of the most handy uses of **redis-cli** is the ability to monitor all commands issued to the server via the redis command `MONITOR`. `INFO ALL` is also a useful command
+for listing the server and cache settings and statistics. *Note:* that if **requirepass** is activated **redis-cli** with require you to authenticate via `AUTH <passcode>`.
+
+Further [documentation](http://redis.io/documentation) is available with a full list of redis [commands](http://redis.io/commands).
+
+The Actual Plugin
+-----------------
+
+The bulk of this redis plugin for **ECL** is made up of the various `SET` and `GET` commands e.g. `GetString` or `SetReal`. They are accessible via the module `sync`
+from the redis plugin **ECL** library `lib-redis`. i.e.
+```
+IMPORT sync FROM lib_redis;
+```
+Here is a list of the core plugin **functions**.
+
+###Set
+```
+SetUnicode( CONST VARSTRING key, CONST UNICODE value, CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000)
+SetString(  CONST VARSTRING key, CONST STRING value,  CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000)
+SetUtf8(    CONST VARSTRING key, CONST UTF8 value,    CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000)
+SetBoolean( CONST VARSTRING key, BOOLEAN value,       CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000)
+SetReal(    CONST VARSTRING key, REAL value,          CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000)
+SetInteger( CONST VARSTRING key, INTEGER value,       CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000)
+SetUnsigned(CONST VARSTRING key, UNSIGNED value,      CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000)
+SetData(    CONST VARSTRING key, CONST DATA value,    CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000)
+```
+
+###Get
+```
+INTEGER8   GetInteger(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000)
+UNSIGNED8 GetUnsigned(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000)
+STRING      GetString(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000)
+UNICODE    GetUnicode(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000)
+UTF8          GetUtf8(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000)
+BOOLEAN    GetBoolean(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000)
+REAL          GetReal(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000)
+DATA          GetData(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000)
+```
+
+###Utility
+```
+BOOLEAN Exists(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000)
+FlushDB(CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000)
+Del(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000)
+Persist(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000)
+Expire(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire, CONST VARSTRING password = '', UNSIGNED timeout = 1000000)
+INTEGER DBSize(CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000)
+```
+
+The core points to note here are:
+   * There is a **SET** and **GET** function associated with each fundamental **ECL** type. These must be used for and with their correct *value* types! Miss-use *should* result
+   in an runtime exception, however, this is only conditional on having the value retrieved from the server fitting into memory of the requested type. E.g. it is possible for a
+   STRING of length 8, set with SetString, being successfully retrieved from the cache via GetInteger without an **ECL** exception being thrown.
+   * `CONST VARSTRING options` passes the server **IP** and **port** to the plugin in the *strict* format - `--SERVER=<ip>:<port>`. If `options` is empty, the default
+   127.0.0.1:6379 is used. *Note:* 6379 is the default port for **redis-server**.
+   * `UNSIGNED timeout` has units micro seconds and has a default value of 1 second. This is not a timeout duration for an entire plugin call but rather that set for each
+   communication transaction with the redis server. *c.f.* 'Behaviour and Implementation Details' below.
+
+###The redisServer MODULE
+To avoid the combersom and unnecessary need to constantly pass `options` and `password` with each function call, the module `redisServer` can be imported to effectively 
+*wrap* the above functions.
+```
+IMPORT redisServer FROM lib_redis;
+myRedis := redisServer('--SERVER=127.0.0.1:6379', 'foobared');
+myRedis.SetString('myKey', 'supercalifragilisticexpialidocious');
+myRedis.GetString('myKey');
+```
+
+###A Redis 'Database'
+The notion of a *database* within a redis cache is a that of an UNSIGNED INTEGER, effectively partitioning the cache such that it may contain an identical key per database e.g.
+```
+myRedis.SetString('myKey', 'foo', 0);
+myRedis.SetString('myKey', 'bar', 1);
+
+myRedis.GetString('myKey', 'foo', 0);//returns 'foo'
+myRedis.GetString('myKey', 'bar', 1);//returns 'bar'
+```
+*Note:* that the default database is 0.
+
+
+Race Retrieval and Locking Keys
+-------------------------------
+A common use of external caching systems such as **redis** is for temporarily storing data that may be expensive, computationally or otherwise, to obtain and thus doing so
+*only once* is paramount. In such a scenario it is possible (in cases usual) for multiple clients/requests to *hit* the cache simultaneously and upon finding that the data
+requested has not yet been stored, it is desired that only one of such requests obtain the new value and then store it for the others to then also obtain (from the cache).
+This plugin offers a solution to such a problem via the `locking` MODULE within the `redisServer` MODULE. This module contains only three function categories - the
+`SET` and `GET` functions for **STRING**, **UTF8**, and **UNICODE** (i.e. only those that return empty strings) and lastley, an auxiliary function `Unlock` used to manually
+unlock locked keys as it be discussed.
+
+The principle here is based around a *cache miss* in which a requested key does not exist, the first requester (*race winner*) 'locks' the key in an atomic fashion.
+Any other simultaneous requester (*race loser*) finds that the key exists but has been locked and thus **SUBSCRIBES** to the key awaiting a **PUBLICATION** message
+from the *race-winner* that the value has been set. Such a paradigm is well suited by redis due to its efficiently implemented **PUB-SUB** infrastructure.
+
+###An ECL Example
+```c
+IMPORT redisServer FROM lib_redis
+
+myRedis := redisServer('--SERVER=127.0.0.1:6379');
+
+STRING poppins := 'supercalifragilisticexpialidocious'; //Value to externally compute/retrieve from 3rd party vendor.
+
+myFunc(STRING key, UNSIGNED database) := FUNCTION  //Function for computing/retrieving a value.
+  return myRedis.GetString(key, database);
+END;
+
+SEQUENTIAL(
+    myRedis.SetString('poppins', poppins, 3),
+
+    //If the key does not exist it will 'lock' the key and retrun an empty STRING.
+    STRING value := myRedis.locking.GetString('supercali- what?');
+    //All locking.Set<type>() return the value passed in as the 2nd parameter.
+    IF (LENGTH(value) == 0, myRedis.locking.SetString('supercali- what?', myFunc('poppins', 3)), value);
+    );
+```
+
+Behaviour and Implementation Details
+------------------------------------
+A few notes to point out here:
+   * PUB-SUB channels are not disconnected from the keyspace as they are in their native redis usage. The key itself is used as the lock with its value being set as the channel to later
+   PUBLISH on or SUBSCRIBE to. This channel is unique to the *server-IP*, *cache-port*, *key*, and *database*. It is in fact the underscore concatenation of all four, prefixed with the string **redis_ecl_lock**.
+   * The lock itself is set to expire with a duration equal to the `timeout` value passed to the `locking.Exists(<key>` function (default 1s).
+   * It is possible to manually 'unlock' this lock (`DEL` the key) via the `locking.Unlock(<key>)` function. *Note:* this function will fail on any communication or reply error however, 
+   it will **silently fail**, leaving the lock to expire, if the server observes any change to the key during the function call duration.
+   * When the *race-winner* publishes, it actually publishes the value itself and that any subscriber will then obtain the key-value in this fashion. Therefore, not requiring an
+    additional `GET` and possible further race conditions in doing so. *Note:* This does however, mean that it is possible for the actual redis `SET` to fail on one client/process,
+    have the key-value received on another, and yet, the key-value still does not exist on the cache.
+   * At present the 'lock' is not as such an actual lock, as only the `locking.Get<type>` functions acknowledge it. By current implementation it is better thought as a flag for
+   `GET` to wait and subscribe. I.e. the locked key can be deleted and re-set just as any other key can be.

+ 74 - 49
plugins/redis/lib_redis.ecllib

@@ -17,57 +17,82 @@
 
 
 EXPORT sync := SERVICE : plugin('redis'), namespace('RedisPlugin')
-  SetUnicode( CONST VARSTRING key, CONST UNICODE value, CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,action,context,entrypoint='SyncRSetUChar';
-  SetString(  CONST VARSTRING key, CONST STRING value,  CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,action,context,entrypoint='SyncRSetStr';
-  SetUtf8(    CONST VARSTRING key, CONST UTF8 value,    CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,action,context,entrypoint='SyncRSetUtf8';
-  SetBoolean( CONST VARSTRING key, BOOLEAN value,       CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,action,context,entrypoint='SyncRSetBool';
-  SetReal(    CONST VARSTRING key, REAL value,          CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,action,context,entrypoint='SyncRSetReal';
-  SetInteger( CONST VARSTRING key, INTEGER value,       CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,action,context,entrypoint='SyncRSetInt';
-  SetUnsigned(CONST VARSTRING key, UNSIGNED value,      CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,action,context,entrypoint='SyncRSetUInt';
-  SetData(    CONST VARSTRING key, CONST DATA value,    CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,action,context,entrypoint='SyncRSetData';
-
-  INTEGER8   GetInteger(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,once,context,entrypoint='SyncRGetInt8';
-  UNSIGNED8 GetUnsigned(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,once,context,entrypoint='SyncRGetUint8';
-  STRING      GetString(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,once,context,entrypoint='SyncRGetStr';
-  UNICODE    GetUnicode(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,once,context,entrypoint='SyncRGetUChar';
-  UTF8          GetUtf8(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,once,context,entrypoint='SyncRGetUtf8';
-  BOOLEAN    GetBoolean(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,once,context,entrypoint='SyncRGetBool';
-  REAL          GetReal(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,once,context,entrypoint='SyncRGetDouble';
-  DATA          GetData(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,once,context,entrypoint='SyncRGetData';
-
-  BOOLEAN Exists(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,once,context,entrypoint='RExist';
-  FlushDB(CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,action,context,entrypoint='RClear';
-  Del(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,action,context,entrypoint='RDel';
-  Delete(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,action,context,entrypoint='RDel';
-  Persist(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,action,context,entrypoint='RPersist';
-  Expire(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED4 expire, UNSIGNED database = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,action,context,entrypoint='RExpire';
-  INTEGER DBSize(CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,once,context,entrypoint='RDBSize';
+  SetUnicode( CONST VARSTRING key, CONST UNICODE value, CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000) : cpp,action,context,entrypoint='SyncRSetUChar';
+  SetString(  CONST VARSTRING key, CONST STRING value,  CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000) : cpp,action,context,entrypoint='SyncRSetStr';
+  SetUtf8(    CONST VARSTRING key, CONST UTF8 value,    CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000) : cpp,action,context,entrypoint='SyncRSetUtf8';
+  SetBoolean( CONST VARSTRING key, BOOLEAN value,       CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000) : cpp,action,context,entrypoint='SyncRSetBool';
+  SetReal(    CONST VARSTRING key, REAL value,          CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000) : cpp,action,context,entrypoint='SyncRSetReal';
+  SetInteger( CONST VARSTRING key, INTEGER value,       CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000) : cpp,action,context,entrypoint='SyncRSetInt';
+  SetUnsigned(CONST VARSTRING key, UNSIGNED value,      CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000) : cpp,action,context,entrypoint='SyncRSetUInt';
+  SetData(    CONST VARSTRING key, CONST DATA value,    CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000) : cpp,action,context,entrypoint='SyncRSetData';
+
+  INTEGER8   GetInteger(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000) : cpp,action,context,entrypoint='SyncRGetInt8';
+  UNSIGNED8 GetUnsigned(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000) : cpp,action,context,entrypoint='SyncRGetUint8';
+  STRING      GetString(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000) : cpp,action,context,entrypoint='SyncRGetStr';
+  UNICODE    GetUnicode(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000) : cpp,action,context,entrypoint='SyncRGetUChar';
+  UTF8          GetUtf8(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000) : cpp,action,context,entrypoint='SyncRGetUtf8';
+  BOOLEAN    GetBoolean(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000) : cpp,action,context,entrypoint='SyncRGetBool';
+  REAL          GetReal(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000) : cpp,action,context,entrypoint='SyncRGetDouble';
+  DATA          GetData(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000) : cpp,action,context,entrypoint='SyncRGetData';
+
+  BOOLEAN Exists(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000) : cpp,action,context,entrypoint='RExist';
+  FlushDB(CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000) : cpp,action,context,entrypoint='RClear';
+  Del(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000) : cpp,action,context,entrypoint='RDel';
+  Delete(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000) : cpp,action,context,entrypoint='RDel';
+  Persist(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000) : cpp,action,context,entrypoint='RPersist';
+  Expire(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire, CONST VARSTRING password = '', UNSIGNED timeout = 1000000) : cpp,action,context,entrypoint='RExpire';
+  INTEGER DBSize(CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000) : cpp,action,context,entrypoint='RDBSize';
+END;
+
+EXPORT locking := SERVICE : plugin('redis'), namespace('RedisPlugin')
+  STRING   SetString(  CONST VARSTRING key, CONST STRING value,  CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000) : cpp,action,context,entrypoint='SyncLockRSetStr';
+  UNICODE  SetUnicode( CONST VARSTRING key, CONST UNICODE value, CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000) : cpp,action,context,entrypoint='SyncLockRSetUChar';
+  UTF8     SetUtf8(    CONST VARSTRING key, CONST UTF8 value,    CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000) : cpp,action,context,entrypoint='SyncLockRSetUtf8';
+
+  STRING      GetString(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000) : cpp,once,context,entrypoint='SyncLockRGetStr';
+  UNICODE    GetUnicode(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000) : cpp,once,context,entrypoint='SyncLockRGetUChar';
+  UTF8          GetUtf8(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000) : cpp,once,context,entrypoint='SyncLockRGetUtf8';
+
+  Unlock(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000000) : cpp,action,context,entrypoint='SyncLockRUnlock';
 END;
 
-EXPORT RedisSync    (CONST VARSTRING options, CONST VARSTRING password = '', unsigned timeout = 1000000) := MODULE
-  EXPORT  SetUnicode(CONST VARSTRING key, CONST UNICODE value,  UNSIGNED database = 0, UNSIGNED4 expire = 0) := sync.SetUnicode (key, value, options, database, expire, password, timeout);
-  EXPORT   SetString(CONST VARSTRING key, CONST STRING value,   UNSIGNED database = 0, UNSIGNED4 expire = 0) := sync.SetString  (key, value, options, database, expire, password, timeout);
-  EXPORT     SetUtf8(CONST VARSTRING key, CONST UTF8 value,     UNSIGNED database = 0, UNSIGNED4 expire = 0) := sync.SetUtf8    (key, value, options, database, expire, password, timeout);
-  EXPORT  SetBoolean(CONST VARSTRING key, CONST BOOLEAN value,  UNSIGNED database = 0, UNSIGNED4 expire = 0) := sync.SetBoolean (key, value, options, database, expire, password, timeout);
-  EXPORT     SetReal(CONST VARSTRING key, CONST REAL value,     UNSIGNED database = 0, UNSIGNED4 expire = 0) := sync.SetReal    (key, value, options, database, expire, password, timeout);
-  EXPORT  SetInteger(CONST VARSTRING key, CONST INTEGER value,  UNSIGNED database = 0, UNSIGNED4 expire = 0) := sync.SetInteger (key, value, options, database, expire, password, timeout);
-  EXPORT SetUnsigned(CONST VARSTRING key, CONST UNSIGNED value, UNSIGNED database = 0, UNSIGNED4 expire = 0) := sync.SetUnsigned(key, value, options, database, expire, password, timeout);
-  EXPORT     SetData(CONST VARSTRING key, CONST DATA value,     UNSIGNED database = 0, UNSIGNED4 expire = 0) := sync.SetData    (key, value, options, database, expire, password, timeout);
-
-  EXPORT  GetUnicode(CONST VARSTRING key, UNSIGNED database = 0) :=  sync.GetUnicode(key, options, database, password, timeout);
-  EXPORT   GetString(CONST VARSTRING key, UNSIGNED database = 0) :=   sync.GetString(key, options, database, password, timeout);
-  EXPORT     GetUtf8(CONST VARSTRING key, UNSIGNED database = 0) :=     sync.GetUtf8(key, options, database, password, timeout);
-  EXPORT  GetBoolean(CONST VARSTRING key, UNSIGNED database = 0) :=  sync.GetBoolean(key, options, database, password, timeout);
-  EXPORT     GetReal(CONST VARSTRING key, UNSIGNED database = 0) :=     sync.GetReal(key, options, database, password, timeout);
-  EXPORT  GetInteger(CONST VARSTRING key, UNSIGNED database = 0) :=  sync.GetInteger(key, options, database, password, timeout);
-  EXPORT GetUnsigned(CONST VARSTRING key, UNSIGNED database = 0) := sync.GetUnsigned(key, options, database, password, timeout);
-  EXPORT     GetData(CONST VARSTRING key, UNSIGNED database = 0) :=     sync.GetData(key, options, database, password, timeout);
-
-  EXPORT Exists(CONST VARSTRING key, UNSIGNED database = 0) := sync.Exists(key, options, database, password, timeout);
+EXPORT RedisServer(VARSTRING options, VARSTRING password = '', UNSIGNED timeout = 1000000) := MODULE
+  EXPORT  SetUnicode(VARSTRING key, UNICODE value,  UNSIGNED database = 0, UNSIGNED4 expire = 0) := sync.SetUnicode (key, value, options, database, expire, password, timeout);
+  EXPORT   SetString(VARSTRING key, STRING value,   UNSIGNED database = 0, UNSIGNED4 expire = 0) := sync.SetString  (key, value, options, database, expire, password, timeout);
+  EXPORT     SetUtf8(VARSTRING key, UTF8 value,     UNSIGNED database = 0, UNSIGNED4 expire = 0) := sync.SetUtf8    (key, value, options, database, expire, password, timeout);
+  EXPORT  SetBoolean(VARSTRING key, BOOLEAN value,  UNSIGNED database = 0, UNSIGNED4 expire = 0) := sync.SetBoolean (key, value, options, database, expire, password, timeout);
+  EXPORT     SetReal(VARSTRING key, REAL value,     UNSIGNED database = 0, UNSIGNED4 expire = 0) := sync.SetReal    (key, value, options, database, expire, password, timeout);
+  EXPORT  SetInteger(VARSTRING key, INTEGER value,  UNSIGNED database = 0, UNSIGNED4 expire = 0) := sync.SetInteger (key, value, options, database, expire, password, timeout);
+  EXPORT SetUnsigned(VARSTRING key, UNSIGNED value, UNSIGNED database = 0, UNSIGNED4 expire = 0) := sync.SetUnsigned(key, value, options, database, expire, password, timeout);
+  EXPORT     SetData(VARSTRING key, DATA value,     UNSIGNED database = 0, UNSIGNED4 expire = 0) := sync.SetData    (key, value, options, database, expire, password, timeout);
+
+  EXPORT  GetUnicode(VARSTRING key, UNSIGNED database = 0) :=  sync.GetUnicode(key, options, database, password, timeout);
+  EXPORT   GetString(VARSTRING key, UNSIGNED database = 0) :=   sync.GetString(key, options, database, password, timeout);
+  EXPORT     GetUtf8(VARSTRING key, UNSIGNED database = 0) :=     sync.GetUtf8(key, options, database, password, timeout);
+  EXPORT  GetBoolean(VARSTRING key, UNSIGNED database = 0) :=  sync.GetBoolean(key, options, database, password, timeout);
+  EXPORT     GetReal(VARSTRING key, UNSIGNED database = 0) :=     sync.GetReal(key, options, database, password, timeout);
+  EXPORT  GetInteger(VARSTRING key, UNSIGNED database = 0) :=  sync.GetInteger(key, options, database, password, timeout);
+  EXPORT GetUnsigned(VARSTRING key, UNSIGNED database = 0) := sync.GetUnsigned(key, options, database, password, timeout);
+  EXPORT     GetData(VARSTRING key, UNSIGNED database = 0) :=     sync.GetData(key, options, database, password, timeout);
+
+  EXPORT Exists(VARSTRING key, UNSIGNED database = 0) := sync.Exists(key, options, database, password, timeout);
   EXPORT FlushDB(UNSIGNED database = 0) := sync.FlushDB(options, database, password, timeout);
-  EXPORT Del(CONST VARSTRING key, UNSIGNED database = 0) := sync.Del(key, options, database, password, timeout);
-  EXPORT Delete(CONST VARSTRING key, UNSIGNED database = 0) := sync.Delete(key, options, database, password, timeout);
-  EXPORT Persist(CONST VARSTRING key, UNSIGNED database = 0) := sync.Persist(key, options, database, password, timeout);
-  EXPORT Expire(CONST VARSTRING key, UNSIGNED4 expire, UNSIGNED database = 0)  := sync.Expire(key, options, expire, database, password, timeout);
+  EXPORT Del(VARSTRING key, UNSIGNED database = 0) := sync.Del(key, options, database, password, timeout);
+  EXPORT Delete(VARSTRING key, UNSIGNED database = 0) := sync.Delete(key, options, database, password, timeout);
+  EXPORT Persist(VARSTRING key, UNSIGNED database = 0) := sync.Persist(key, options, database, password, timeout);
+  EXPORT Expire(VARSTRING key, UNSIGNED database = 0, UNSIGNED4 expire)  := sync.Expire(key, options, database, expire, password, timeout);
   EXPORT DBSize(UNSIGNED database = 0) := sync.DBSize(options, database, password, timeout);
+
+  //locking
+  EXPORT locking := MODULE
+    EXPORT  SetUnicode(VARSTRING key, UNICODE value,  UNSIGNED database = 0, UNSIGNED4 expire = 0) := locking.SetUnicode (key, value, options, database, expire, password, timeout);
+    EXPORT   SetString(VARSTRING key, STRING value,   UNSIGNED database = 0, UNSIGNED4 expire = 0) := locking.SetString  (key, value, options, database, expire, password, timeout);
+    EXPORT     SetUtf8(VARSTRING key, UTF8 value,     UNSIGNED database = 0, UNSIGNED4 expire = 0) := locking.SetUtf8    (key, value, options, database, expire, password, timeout);
+
+    EXPORT  GetUnicode(VARSTRING key, UNSIGNED database = 0) :=  locking.GetUnicode(key, options, database, password, timeout);
+    EXPORT   GetString(VARSTRING key, UNSIGNED database = 0) :=   locking.GetString(key, options, database, password, timeout);
+    EXPORT     GetUtf8(VARSTRING key, UNSIGNED database = 0) :=     locking.GetUtf8(key, options, database, password, timeout);
+
+    EXPORT Unlock(VARSTRING key, UNSIGNED database = 0) := locking.unlock(key, options, database, password, timeout);
+   END;
 END;

+ 904 - 0
plugins/redis/redis.cpp

@@ -0,0 +1,904 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#include "platform.h"
+#include "jthread.hpp"
+#include "eclrtl.hpp"
+#include "jstring.hpp"
+#include "redis.hpp"
+#include "hiredis/hiredis.h"
+
+#define REDIS_VERSION "redis plugin 1.0.0"
+ECL_REDIS_API bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb)
+{
+    if (pb->size != sizeof(ECLPluginDefinitionBlock))
+        return false;
+
+    pb->magicVersion = PLUGIN_VERSION;
+    pb->version = REDIS_VERSION;
+    pb->moduleName = "lib_redis";
+    pb->ECL = NULL;
+    pb->flags = PLUGIN_IMPLICIT_MODULE;
+    pb->description = "ECL plugin library for the C API hiredis\n";
+    return true;
+}
+
+namespace RedisPlugin {
+
+class Connection;
+static const char * REDIS_LOCK_PREFIX = "redis_ecl_lock";
+static __thread Connection * cachedConnection;
+static __thread ThreadTermFunc threadHookChain;
+
+StringBuffer & appendExpire(StringBuffer & buffer, unsigned expire)
+{
+    if (expire > 0)
+        buffer.append(" EX ").append(expire/1000000);
+    return buffer;
+}
+class Reply : public CInterface
+{
+public :
+    inline Reply() : reply(NULL) { };
+    inline Reply(void * _reply) : reply((redisReply*)_reply) { }
+    inline Reply(redisReply * _reply) : reply(_reply) { }
+    inline ~Reply()
+    {
+        if (reply)
+            freeReplyObject(reply);
+    }
+
+    static Reply * createReply(void * _reply) { return new Reply(_reply); }
+    inline const redisReply * query() const { return reply; }
+    void setClear(redisReply * _reply)
+    {
+        if (reply)
+            freeReplyObject(reply);
+        reply = _reply;
+    }
+
+private :
+    redisReply * reply;
+};
+typedef Owned<RedisPlugin::Reply> OwnedReply;
+
+class Connection : public CInterface
+{
+public :
+    Connection(ICodeContext * ctx, const char * options, unsigned __int64 database, const char * password, unsigned __int64 _timeout);
+    Connection(ICodeContext * ctx, const char * _options, const char * _ip, int _port, unsigned _serverIpPortPasswordHash, unsigned __int64 _database, const char * password, unsigned __int64 _timeout);
+    ~Connection()
+    {
+        if (context)
+            redisFree(context);
+    }
+    static Connection * createConnection(ICodeContext * ctx, const char * options, unsigned __int64 database, const char * password, unsigned __int64 _timeout);
+
+    //set
+    template <class type> void set(ICodeContext * ctx, const char * key, type value, unsigned expire);
+    template <class type> void set(ICodeContext * ctx, const char * key, size32_t valueSize, const type * value, unsigned expire);
+    //get
+    template <class type> void get(ICodeContext * ctx, const char * key, type & value);
+    template <class type> void get(ICodeContext * ctx, const char * key, size_t & valueSize, type * & value);
+
+    //-------------------------------LOCKING------------------------------------------------
+    void lockSet(ICodeContext * ctx, const char * key, size32_t valueSize, const char * value, unsigned expire);
+    void lockGet(ICodeContext * ctx, const char * key, size_t & valueSize, char * & value, const char * password);
+    void unlock(ICodeContext * ctx, const char * key);
+    //--------------------------------------------------------------------------------------
+
+    void persist(ICodeContext * ctx, const char * key);
+    void expire(ICodeContext * ctx, const char * key, unsigned _expire);
+    void del(ICodeContext * ctx, const char * key);
+    void clear(ICodeContext * ctx);
+    unsigned __int64 dbSize(ICodeContext * ctx);
+    bool exists(ICodeContext * ctx, const char * key);
+
+protected :
+    void parseOptions(ICodeContext * ctx, const char * _options);
+    void connect(ICodeContext * ctx, unsigned __int64 _database, const char * password);
+    void selectDB(ICodeContext * ctx, unsigned __int64 _database);
+    void authenticate(ICodeContext * ctx, const char * password);
+    void resetContextErr();
+    void readReply(Reply * reply);
+    void readReplyAndAssert(Reply * reply, const char * msg);
+    void readReplyAndAssertWithKey(Reply * reply, const char * msg, const char * key);
+    void assertKey(const redisReply * reply, const char * key);
+    void assertOnError(const redisReply * reply, const char * _msg);
+    void assertOnCommandError(const redisReply * reply, const char * cmd);
+    void assertOnCommandErrorWithDatabase(const redisReply * reply, const char * cmd);
+    void assertOnCommandErrorWithKey(const redisReply * reply, const char * cmd, const char * key);
+    void assertConnection();
+    void updateTimeout(unsigned __int64 _timeout);
+    void * allocateAndCopy(const char * src, size_t size);
+    bool isSameConnection(ICodeContext * ctx, const char * password) const;
+
+    //-------------------------------LOCKING------------------------------------------------
+    void handleLockOnSet(ICodeContext * ctx, const char * key, const char * value, size_t size, unsigned expire);
+    void handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAttr * retVal, const char * password);
+    void encodeChannel(StringBuffer & channel, const char * key) const;
+    bool lock(ICodeContext * ctx, const char * key, const char * channel);
+    //--------------------------------------------------------------------------------------
+
+protected :
+    redisContext * context;
+    StringAttr options;
+    StringAttr ip;
+    unsigned serverIpPortPasswordHash;
+    int port;
+    unsigned __int64 timeout;
+    unsigned __int64 database;
+};
+
+//The following class is here to ensure destruction of the cachedConnection within the main thread
+//as this is not handled by the thread hook mechanism.
+static class mainThreadCachedConnection
+{
+public :
+    mainThreadCachedConnection() { }
+    ~mainThreadCachedConnection()
+    {
+        if (cachedConnection)
+            cachedConnection->Release();
+    }
+} mainThread;
+
+static void releaseContext()
+{
+    if (cachedConnection)
+    {
+        cachedConnection->Release();
+        cachedConnection = NULL;
+    }
+    if (threadHookChain)
+    {
+        (*threadHookChain)();
+        threadHookChain = NULL;
+    }
+}
+Connection::Connection(ICodeContext * ctx, const char * _options, unsigned __int64 _database, const char * password, unsigned __int64 _timeout)
+  : database(0), timeout(_timeout), port(0)
+{
+    serverIpPortPasswordHash = hashc((const unsigned char*)password, strlen(password), 0);
+    serverIpPortPasswordHash = hashc((const unsigned char*)_options, strlen(_options), serverIpPortPasswordHash);
+    options.set(_options, strlen(_options));
+    parseOptions(ctx, _options);
+    connect(ctx, _database, password);
+}
+Connection::Connection(ICodeContext * ctx, const char * _options, const char * _ip, int _port, unsigned _serverIpPortPasswordHash, unsigned __int64 _database, const char * password, unsigned __int64 _timeout)
+  : database(0), timeout(_timeout), serverIpPortPasswordHash(_serverIpPortPasswordHash), port(_port)
+{
+    options.set(_options, strlen(_options));
+    ip.set(_ip, strlen(_ip));
+    connect(ctx, _database, password);
+}
+void Connection::connect(ICodeContext * ctx, unsigned __int64 _database, const char * password)
+{
+    struct timeval to = { timeout/1000000, timeout%1000000 };
+    context = redisConnectWithTimeout(ip.str(), port, to);
+    redisSetTimeout(context, to);
+    assertConnection();
+
+    //The following is the dissemination of the two methods authenticate(ctx, password) & selectDB(ctx, _database)
+    //such that they may be pipelined to save an extra round trip to the server and back.
+    if (password && *password)
+      redisAppendCommand(context, "AUTH %b", password, strlen(password));
+
+    if (database != _database)
+    {
+        VStringBuffer cmd("SELECT %" I64F "u", database);
+        redisAppendCommand(context, cmd.str());
+    }
+
+    //Now read replies.
+    OwnedReply reply = new Reply();
+    if (password && *password)
+        readReplyAndAssert(reply, "server authentication failed");
+
+    if (database != _database)
+    {
+        readReplyAndAssert(reply, "request to SELECT database failed");
+        database = _database;
+    }
+}
+bool Connection::isSameConnection(ICodeContext * ctx, const char * password) const
+{
+    unsigned hash = hashc((const unsigned char*)options.str(), options.length(), hashc((const unsigned char*)password, strlen(password), 0));
+    return (serverIpPortPasswordHash == hash);
+}
+void * Connection::allocateAndCopy(const char * src, size_t size)
+{
+    void * value = rtlMalloc(size);
+    return memcpy(value, src, size);
+}
+void Connection::parseOptions(ICodeContext * ctx, const char * _options)
+{
+    StringArray optionStrings;
+    optionStrings.appendList(_options, " ");
+    ForEachItemIn(idx, optionStrings)
+    {
+        const char *opt = optionStrings.item(idx);
+        if (strncmp(opt, "--SERVER=", 9) == 0)
+        {
+            opt += 9;
+            StringArray splitPort;
+            splitPort.appendList(opt, ":");
+            if (splitPort.ordinality()==2)
+            {
+                ip.set(splitPort.item(0));
+                port = atoi(splitPort.item(1));
+            }
+        }
+        else
+        {
+            VStringBuffer err("RedisPlugin: unsupported option string %s", opt);
+            rtlFail(0, err.str());
+        }
+    }
+    if (ip.isEmpty())
+    {
+        ip.set("localhost");
+        port = 6379;
+        if (ctx)
+        {
+            VStringBuffer msg("Redis Plugin: WARNING - using default server (%s:%d)", ip.str(), port);
+            ctx->logString(msg.str());
+        }
+    }
+    return;
+}
+void Connection::authenticate(ICodeContext * ctx, const char * password)
+{
+    if (password && *password)
+    {
+        OwnedReply reply = Reply::createReply(redisCommand(context, "AUTH %b", password, strlen(password)));
+        assertOnError(reply->query(), "server authentication failed");
+    }
+}
+void Connection::resetContextErr()
+{
+    if (context)
+        context->err = REDIS_OK;
+}
+void Connection::readReply(Reply * reply)
+{
+    redisReply * nakedReply = NULL;
+    redisGetReply(context, (void**)&nakedReply);
+    assertex(reply);
+    reply->setClear(nakedReply);
+}
+void Connection::readReplyAndAssert(Reply * reply, const char * msg)
+{
+    readReply(reply);
+    assertex(reply);
+    assertOnError(reply->query(), msg);
+}
+void Connection::readReplyAndAssertWithKey(Reply * reply, const char * msg, const char * key)
+{
+    readReply(reply);
+    assertex(reply);
+    assertOnCommandErrorWithKey(reply->query(), msg, key);
+}
+Connection * Connection::createConnection(ICodeContext * ctx, const char * options, unsigned __int64 _database, const char * password, unsigned __int64 _timeout)
+{
+    if (!cachedConnection)
+    {
+        cachedConnection = new Connection(ctx, options, _database, password, _timeout);
+        threadHookChain = addThreadTermFunc(releaseContext);
+        return LINK(cachedConnection);
+    }
+
+    if (cachedConnection->isSameConnection(ctx, password))
+    {
+        //MORE: should perhaps check that the connection has not expired (think hiredis REDIS_KEEPALIVE_INTERVAL is defaulted to 15s).
+        //At present updateTimeout calls assertConnection.
+        cachedConnection->resetContextErr();//reset the context err to allow reuse when an error previously occurred.
+        cachedConnection->updateTimeout(_timeout);
+        cachedConnection->selectDB(ctx, _database);
+        return LINK(cachedConnection);
+    }
+
+    cachedConnection->Release();
+    cachedConnection = new Connection(ctx, options, _database, password, _timeout);
+    return LINK(cachedConnection);
+}
+void Connection::selectDB(ICodeContext * ctx, unsigned __int64 _database)
+{
+    if (database == _database)
+        return;
+    database = _database;
+    VStringBuffer cmd("SELECT %" I64F "u", database);
+    OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str()));
+    assertOnCommandError(reply->query(), "SELECT");
+}
+void Connection::updateTimeout(unsigned __int64 _timeout)
+{
+    if (timeout == _timeout)
+        return;
+    assertConnection();
+    timeout = _timeout;
+    struct timeval to = { timeout/1000000, timeout%1000000 };
+    assertex(context);
+    if (redisSetTimeout(context, to) != REDIS_OK)
+    {
+        if (context->err)
+        {
+            VStringBuffer msg("RedisPlugin: failed to set timeout - %s", context->errstr);
+            rtlFail(0, msg.str());
+        }
+        else
+            rtlFail(0, "RedisPlugin: failed to set timeout - no message available");
+    }
+}
+void Connection::assertOnError(const redisReply * reply, const char * _msg)
+{
+    if (!reply)//assertex(reply)?
+    {
+        //There should always be a context error if no reply error
+        assertConnection();
+        VStringBuffer msg("Redis Plugin: %s - %s", _msg, "neither 'reply' nor connection error available");
+        rtlFail(0, msg.str());
+    }
+    else if (reply->type == REDIS_REPLY_ERROR)
+    {
+        if (strncmp(reply->str, "NOAUTH", 6) == 0)
+        {
+            VStringBuffer msg("Redis Plugin: server authentication failed - %s", reply->str);
+            rtlFail(0, msg.str());
+        }
+        else
+        {
+            VStringBuffer msg("Redis Plugin: %s - %s", _msg, reply->str);
+            rtlFail(0, msg.str());
+        }
+    }
+}
+void Connection::assertOnCommandErrorWithKey(const redisReply * reply, const char * cmd, const char * key)
+{
+    if (!reply)//assertex(reply)?
+    {
+        //There should always be a context error if no reply error
+        assertConnection();
+        VStringBuffer msg("Redis Plugin: ERROR - %s '%s' on database %" I64F "u failed with neither 'reply' nor connection error available", cmd, key, database);
+        rtlFail(0, msg.str());
+    }
+    else if (reply->type == REDIS_REPLY_ERROR)
+    {
+        if (strncmp(reply->str, "NOAUTH", 6) == 0)
+        {
+            VStringBuffer msg("Redis Plugin: server authentication failed - %s", reply->str);
+            rtlFail(0, msg.str());
+        }
+        else
+        {
+            VStringBuffer msg("Redis Plugin: ERROR - %s '%s' on database %" I64F "u failed : %s", cmd, key, database, reply->str);
+            rtlFail(0, msg.str());
+        }
+    }
+}
+void Connection::assertOnCommandErrorWithDatabase(const redisReply * reply, const char * cmd)
+{
+    if (!reply)//assertex(reply)?
+    {
+        //There should always be a context error if no reply error
+        assertConnection();
+        VStringBuffer msg("Redis Plugin: ERROR - %s on database %" I64F "u failed with neither 'reply' nor connection error available", cmd, database);
+        rtlFail(0, msg.str());
+    }
+    else if (reply->type == REDIS_REPLY_ERROR)
+    {
+        if (strncmp(reply->str, "NOAUTH", 6) == 0)
+        {
+            VStringBuffer msg("Redis Plugin: server authentication failed - %s", reply->str);
+            rtlFail(0, msg.str());
+        }
+        else
+        {
+            VStringBuffer msg("Redis Plugin: ERROR - %s on database %" I64F "u failed : %s", cmd, database, reply->str);
+            rtlFail(0, msg.str());
+        }
+    }
+}
+void Connection::assertOnCommandError(const redisReply * reply, const char * cmd)
+{
+    if (!reply)//assertex(reply)?
+    {
+        //There should always be a context error if no reply error
+        assertConnection();
+        VStringBuffer msg("Redis Plugin: ERROR - %s failed with neither 'reply' nor connection error available", cmd);
+        rtlFail(0, msg.str());
+    }
+    else if (reply->type == REDIS_REPLY_ERROR)
+    {
+        if (strncmp(reply->str, "NOAUTH", 6) == 0)
+        {
+            VStringBuffer msg("Redis Plugin: server authentication failed - %s", reply->str);
+            rtlFail(0, msg.str());
+        }
+        else
+        {
+            VStringBuffer msg("Redis Plugin: ERROR - %s failed : %s", cmd, reply->str);
+            rtlFail(0, msg.str());
+        }
+    }
+}
+void Connection::assertKey(const redisReply * reply, const char * key)
+{
+    if (reply && reply->type == REDIS_REPLY_NIL)
+    {
+        VStringBuffer msg("Redis Plugin: ERROR - the requested key '%s' does not exist on database %" I64F "u", key, database);
+        rtlFail(0, msg.str());
+    }
+}
+void Connection::assertConnection()
+{
+    if (!context)
+        rtlFail(0, "Redis Plugin: 'redisConnect' failed - no error available.");
+    else if (context->err)
+    {
+        VStringBuffer msg("Redis Plugin: Connection failed - %s for %s:%u", context->errstr, ip.str(),  port);
+        rtlFail(0, msg.str());
+    }
+}
+void Connection::clear(ICodeContext * ctx)
+{
+    //NOTE: flush is the actual cache flush/clear/delete and not an io buffer flush.
+    OwnedReply reply = Reply::createReply(redisCommand(context, "FLUSHDB"));//NOTE: FLUSHDB deletes current database where as FLUSHALL deletes all dbs.
+    //NOTE: documented as never failing, but in case
+    assertOnCommandErrorWithDatabase(reply->query(), "FlushDB");
+}
+void Connection::del(ICodeContext * ctx, const char * key)
+{
+    OwnedReply reply = Reply::createReply(redisCommand(context, "DEL %b", key, strlen(key)));
+    assertOnCommandErrorWithKey(reply->query(), "Del", key);
+}
+void Connection::persist(ICodeContext * ctx, const char * key)
+{
+    OwnedReply reply = Reply::createReply(redisCommand(context, "PERSIST %b", key, strlen(key)));
+    assertOnCommandErrorWithKey(reply->query(), "Persist", key);
+}
+void Connection::expire(ICodeContext * ctx, const char * key, unsigned _expire)
+{
+    OwnedReply reply = Reply::createReply(redisCommand(context, "EXPIRE %b %u", key, strlen(key), _expire/1000000));
+    assertOnCommandErrorWithKey(reply->query(), "Expire", key);
+}
+bool Connection::exists(ICodeContext * ctx, const char * key)
+{
+    OwnedReply reply = Reply::createReply(redisCommand(context, "EXISTS %b", key, strlen(key)));
+    assertOnCommandErrorWithKey(reply->query(), "Exists", key);
+    return (reply->query()->integer != 0);
+}
+unsigned __int64 Connection::dbSize(ICodeContext * ctx)
+{
+    OwnedReply reply = Reply::createReply(redisCommand(context, "DBSIZE"));
+    assertOnCommandErrorWithDatabase(reply->query(), "DBSIZE");
+    return reply->query()->integer;
+}
+//-------------------------------------------SET-----------------------------------------
+//--OUTER--
+template<class type> void SyncRSet(ICodeContext * ctx, const char * _options, const char * key, type value, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 _timeout)
+{
+    Owned<Connection> master = Connection::createConnection(ctx, _options, database, password, _timeout);
+    master->set(ctx, key, value, expire);
+}
+//Set pointer types
+template<class type> void SyncRSet(ICodeContext * ctx, const char * _options, const char * key, size32_t valueSize, const type * value, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 _timeout)
+{
+    Owned<Connection> master = Connection::createConnection(ctx, _options, database, password, _timeout);
+    master->set(ctx, key, valueSize, value, expire);
+}
+//--INNER--
+template<class type> void Connection::set(ICodeContext * ctx, const char * key, type value, unsigned expire)
+{
+    const char * _value = reinterpret_cast<const char *>(&value);//Do this even for char * to prevent compiler complaining
+
+    StringBuffer cmd("SET %b %b");
+    appendExpire(cmd, expire);
+
+    OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str(), key, strlen(key), _value, sizeof(type)));
+    assertOnCommandErrorWithKey(reply->query(), "SET", key);
+}
+template<class type> void Connection::set(ICodeContext * ctx, const char * key, size32_t valueSize, const type * value, unsigned expire)
+{
+    const char * _value = reinterpret_cast<const char *>(value);//Do this even for char * to prevent compiler complaining
+
+    StringBuffer cmd("SET %b %b");
+    appendExpire(cmd, expire);
+    OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str(), key, strlen(key), _value, (size_t)valueSize));
+    assertOnCommandErrorWithKey(reply->query(), "SET", key);
+}
+//-------------------------------------------GET-----------------------------------------
+//--OUTER--
+template<class type> void SyncRGet(ICodeContext * ctx, const char * options, const char * key, type & returnValue, unsigned __int64 database, const char * password, unsigned __int64 _timeout)
+{
+    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, _timeout);
+    master->get(ctx, key, returnValue);
+}
+template<class type> void SyncRGet(ICodeContext * ctx, const char * options, const char * key, size_t & returnSize, type * & returnValue, unsigned __int64 database, const char * password, unsigned __int64 _timeout)
+{
+    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, _timeout);
+    master->get(ctx, key, returnSize, returnValue);
+}
+//--INNER--
+template<class type> void Connection::get(ICodeContext * ctx, const char * key, type & returnValue)
+{
+    OwnedReply reply = Reply::createReply(redisCommand(context, "GET %b", key, strlen(key)));
+
+    assertOnError(reply->query(), "GET");
+    assertKey(reply->query(), key);
+
+    size_t returnSize = reply->query()->len;
+    if (sizeof(type)!=returnSize)
+    {
+        VStringBuffer msg("RedisPlugin: ERROR - Requested type of different size (%uB) from that stored (%uB).", (unsigned)sizeof(type), (unsigned)returnSize);
+        rtlFail(0, msg.str());
+    }
+    memcpy(&returnValue, reply->query()->str, returnSize);
+}
+template<class type> void Connection::get(ICodeContext * ctx, const char * key, size_t & returnSize, type * & returnValue)
+{
+    OwnedReply reply = Reply::createReply(redisCommand(context, "GET %b", key, strlen(key)));
+
+    assertOnError(reply->query(), "GET");
+    assertKey(reply->query(), key);
+
+    returnSize = reply->query()->len;
+    returnValue = reinterpret_cast<type*>(allocateAndCopy(reply->query()->str, returnSize));
+}
+//--------------------------------------------------------------------------------
+//                           ECL SERVICE ENTRYPOINTS
+//--------------------------------------------------------------------------------
+ECL_REDIS_API void ECL_REDIS_CALL RClear(ICodeContext * ctx, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
+{
+    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
+    master->clear(ctx);
+}
+ECL_REDIS_API bool ECL_REDIS_CALL RExist(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
+{
+    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
+    return master->exists(ctx, key);
+}
+ECL_REDIS_API void ECL_REDIS_CALL RDel(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
+{
+    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
+    master->del(ctx, key);
+}
+ECL_REDIS_API void ECL_REDIS_CALL RPersist(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
+{
+    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
+    master->persist(ctx, key);
+}
+ECL_REDIS_API void ECL_REDIS_CALL RExpire(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, unsigned _expire, const char * password, unsigned __int64 timeout)
+{
+    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
+    master->expire(ctx, key, _expire);
+}
+ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL RDBSize(ICodeContext * ctx, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
+{
+    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
+    return master->dbSize(ctx);
+}
+//-----------------------------------SET------------------------------------------
+ECL_REDIS_API void ECL_REDIS_CALL SyncRSetStr(ICodeContext * ctx, const char * key, size32_t valueSize, const char * value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
+{
+    SyncRSet(ctx, options, key, valueSize, value, database, expire, password, timeout);
+}
+ECL_REDIS_API void ECL_REDIS_CALL SyncRSetUChar(ICodeContext * ctx, const char * key, size32_t valueLength, const UChar * value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
+{
+    SyncRSet(ctx, options, key, (valueLength)*sizeof(UChar), value, database, expire, password, timeout);
+}
+ECL_REDIS_API void ECL_REDIS_CALL SyncRSetInt(ICodeContext * ctx, const char * key, signed __int64 value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
+{
+    SyncRSet(ctx, options, key, value, database, expire, password, timeout);
+}
+ECL_REDIS_API void ECL_REDIS_CALL SyncRSetUInt(ICodeContext * ctx, const char * key, unsigned __int64 value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
+{
+    SyncRSet(ctx, options, key, value, database, expire, password, timeout);
+}
+ECL_REDIS_API void ECL_REDIS_CALL SyncRSetReal(ICodeContext * ctx, const char * key, double value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
+{
+    SyncRSet(ctx, options, key, value, database, expire, password, timeout);
+}
+ECL_REDIS_API void ECL_REDIS_CALL SyncRSetBool(ICodeContext * ctx, const char * key, bool value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
+{
+    SyncRSet(ctx, options, key, value, database, expire, password, timeout);
+}
+ECL_REDIS_API void ECL_REDIS_CALL SyncRSetData(ICodeContext * ctx, const char * key, size32_t valueSize, const void * value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
+{
+    SyncRSet(ctx, options, key, valueSize, value, database, expire, password, timeout);
+}
+ECL_REDIS_API void ECL_REDIS_CALL SyncRSetUtf8(ICodeContext * ctx, const char * key, size32_t valueLength, const char * value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
+{
+    SyncRSet(ctx, options, key, rtlUtf8Size(valueLength, value), value, database, expire, password, timeout);
+}
+//-------------------------------------GET----------------------------------------
+ECL_REDIS_API bool ECL_REDIS_CALL SyncRGetBool(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
+{
+    bool value;
+    SyncRGet(ctx, options, key, value, database, password, timeout);
+    return value;
+}
+ECL_REDIS_API double ECL_REDIS_CALL SyncRGetDouble(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
+{
+    double value;
+    SyncRGet(ctx, options, key, value, database, password, timeout);
+    return value;
+}
+ECL_REDIS_API signed __int64 ECL_REDIS_CALL SyncRGetInt8(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
+{
+    signed __int64 value;
+    SyncRGet(ctx, options, key, value, database, password, timeout);
+    return value;
+}
+ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL SyncRGetUint8(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
+{
+    unsigned __int64 value;
+    SyncRGet(ctx, options, key, value, database, password, timeout);
+    return value;
+}
+ECL_REDIS_API void ECL_REDIS_CALL SyncRGetStr(ICodeContext * ctx, size32_t & returnSize, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
+{
+    size_t _returnSize;
+    SyncRGet(ctx, options, key, _returnSize, returnValue, database, password, timeout);
+    returnSize = static_cast<size32_t>(_returnSize);
+}
+ECL_REDIS_API void ECL_REDIS_CALL SyncRGetUChar(ICodeContext * ctx, size32_t & returnLength, UChar * & returnValue,  const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
+{
+    size_t returnSize;
+    SyncRGet(ctx, options, key, returnSize, returnValue, database, password, timeout);
+    returnLength = static_cast<size32_t>(returnSize/sizeof(UChar));
+}
+ECL_REDIS_API void ECL_REDIS_CALL SyncRGetUtf8(ICodeContext * ctx, size32_t & returnLength, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
+{
+    size_t returnSize;
+    SyncRGet(ctx, options, key, returnSize, returnValue, database, password, timeout);
+    returnLength = static_cast<size32_t>(rtlUtf8Length(returnSize, returnValue));
+}
+ECL_REDIS_API void ECL_REDIS_CALL SyncRGetData(ICodeContext * ctx, size32_t & returnSize, void * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
+{
+    size_t _returnSize;
+    SyncRGet(ctx, options, key, _returnSize, returnValue, database, password, timeout);
+    returnSize = static_cast<size32_t>(_returnSize);
+}
+//----------------------------------LOCK------------------------------------------
+//-----------------------------------SET-----------------------------------------
+//Set pointer types
+void SyncLockRSet(ICodeContext * ctx, const char * _options, const char * key, size32_t valueSize, const char * value, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 _timeout)
+{
+    Owned<Connection> master = Connection::createConnection(ctx, _options, database, password, _timeout);
+    master->lockSet(ctx, key, valueSize, value, expire);
+}
+//--INNER--
+void Connection::lockSet(ICodeContext * ctx, const char * key, size32_t valueSize, const char * value, unsigned expire)
+{
+    const char * _value = reinterpret_cast<const char *>(value);//Do this even for char * to prevent compiler complaining
+    handleLockOnSet(ctx, key, _value, (size_t)valueSize, expire);
+}
+//-------------------------------------------GET-----------------------------------------
+//--OUTER--
+void SyncLockRGet(ICodeContext * ctx, const char * options, const char * key, size_t & returnSize, char * & returnValue, unsigned __int64 database, const char * password, unsigned __int64 _timeout)
+{
+    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, _timeout);
+    master->lockGet(ctx, key, returnSize, returnValue, password);
+}
+//--INNER--
+void Connection::lockGet(ICodeContext * ctx, const char * key, size_t & returnSize, char * & returnValue, const char * password)
+{
+    MemoryAttr retVal;
+    handleLockOnGet(ctx, key, &retVal, password);
+    returnSize = retVal.length();
+    returnValue = reinterpret_cast<char*>(retVal.detach());
+}
+//---------------------------------------------------------------------------------------
+void Connection::encodeChannel(StringBuffer & channel, const char * key) const
+{
+    channel.append(REDIS_LOCK_PREFIX).append("_").append(key).append("_").append(database).append("_").append(ip.str()).append("_").append(port);
+}
+bool Connection::lock(ICodeContext * ctx, const char * key, const char * channel)
+{
+    StringBuffer cmd("SET %b %b NX EX ");
+    cmd.append(timeout/1000000);
+
+    OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str(), key, strlen(key), channel, strlen(channel)));
+    assertOnError(reply->query(), cmd.append(" of the key '").append(key).append("' failed"));
+
+    if (reply->query()->type == REDIS_REPLY_STATUS && strcmp(reply->query()->str, "OK") == 0)
+        return true;
+    return false;
+}
+void Connection::unlock(ICodeContext * ctx, const char * key)
+{
+    //WATCH key, if altered between WATCH and EXEC abort all commands inbetween
+    redisAppendCommand(context, "WATCH %b", key, strlen(key));
+    redisAppendCommand(context, "GET %b", key, strlen(key));
+
+    //Read replies
+    OwnedReply reply = new Reply();
+    readReplyAndAssertWithKey(reply.get(), "manual unlock", key);//WATCH reply
+    readReplyAndAssertWithKey(reply.get(), "manual unlock", key);//GET reply
+
+    //check if locked
+    if (strncmp(reply->query()->str, REDIS_LOCK_PREFIX, strlen(REDIS_LOCK_PREFIX)) == 0)
+    {
+        //MULTI - all commands between MULTI and EXEC are considered an atomic transaction on the server
+        redisAppendCommand(context, "MULTI");//MULTI
+        redisAppendCommand(context, "DEL %b", key, strlen(key));//DEL
+        redisAppendCommand(context, "EXEC");//EXEC
+#if(0)//Quick draw! You have 10s to manually send (via redis-cli) "set testlock foobar". The second myRedis.Exists('testlock') in redislockingtest.ecl should now return TRUE.
+        sleep(10);
+#endif
+        readReplyAndAssertWithKey(reply.get(), "manual unlock", key);//MULTI reply
+        readReplyAndAssertWithKey(reply.get(), "manual unlock", key);//DEL reply
+        readReplyAndAssertWithKey(reply.get(), "manual unlock", key);//EXEC reply
+    }
+    //If the above is aborted, let the lock expire.
+}
+void Connection::handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAttr * retVal, const char * password)
+{
+    StringBuffer channel;
+    encodeChannel(channel, key);
+
+    //Query key and set lock if non existent
+    if (lock(ctx, key, channel.str()))
+        return;
+
+    //SUB before GET
+    //Requires separate connection from GET so that the replies are not mangled. This could be averted
+    Owned<Connection> subConnection = new Connection(ctx, options.str(), ip.str(), port, serverIpPortPasswordHash, database, password, timeout);
+    OwnedReply reply = Reply::createReply(redisCommand(subConnection->context, "SUBSCRIBE %b", channel.str(), channel.length()));
+    assertOnCommandErrorWithKey(reply->query(), "GET", key);
+    if (reply->query()->type == REDIS_REPLY_ARRAY && strcmp("subscribe", reply->query()->element[0]->str) != 0 )
+    {
+        VStringBuffer msg("Redis Plugin: ERROR - GET '%s' on database %" I64F "u failed : failed to register SUB", key, database);
+        rtlFail(0, msg.str());
+    }
+
+#if(0)
+    {
+    OwnedReply pubReply = Reply::createReply(redisCommand(context, "PUBLISH %b %b", channel.str(), channel.length(), "foo", 3));
+    assertOnError(pubReply->query(), "pub fail");
+    }
+#endif
+
+    //Now GET
+    reply->setClear((redisReply*)redisCommand(context, "GET %b", key, strlen(key)));
+    assertOnCommandErrorWithKey(reply->query(), "GET", key);
+    assertKey(reply->query(), key);
+
+#if(0)
+    {
+    OwnedReply pubReply = Reply::createReply(redisCommand(context, "PUBLISH %b %b", channel.str(), channel.length(), "foo", 3));
+    assertOnError(pubReply->query(), "pub fail");
+    }
+#endif
+
+    //Check if returned value is locked
+    if (strncmp(reply->query()->str, REDIS_LOCK_PREFIX, strlen(REDIS_LOCK_PREFIX)) != 0)
+    {
+        //Not locked so return value
+        retVal->set(reply->query()->len, reply->query()->str);
+        return;
+    }
+    else
+    {
+        //Check that we SUBSCRIBEd to the correct channel (which could have been manually SET).
+        if (strcmp(reply->query()->str, channel) !=0 )
+        {
+            VStringBuffer msg("Redis Plugin: ERROR - the key '%s', on database %" I64F "u, is locked with a channel ('%s') different to that subscribed to.", key, database, reply->query()->str);
+            rtlFail(0, msg.str());
+            //MORE: We could attempt to recover at this stage by subscribing to the channel that the key was actually locked with.
+            //However, we may have missed the massage publication already or by then.
+            //If we ever changed the semantics of the 'timeout' to be that of these plugin functions rather than each redis call, we might as well
+            //subscribe again if there was time left on the clock.
+            //Since they are not, we could, though is this desirable behaviour?
+        }
+#if(0)//Added to allow for manual pub testing via redis-cli
+        struct timeval to = { 10, 0 };//10secs
+        redisSetTimeout(subConnection->context, to);
+#endif
+        //Locked so SUBSCRIBE
+        redisReply * nakedReply = NULL;
+        bool err = redisGetReply(subConnection->context, (void**)&nakedReply);
+        reply->setClear(nakedReply);
+        if (err != REDIS_OK)
+            rtlFail(0, "RedisPlugin: ERROR - GET timed out.");
+        assertOnCommandErrorWithKey(nakedReply, "GET", key);
+        if (nakedReply->type == REDIS_REPLY_ARRAY && strcmp("message", nakedReply->element[0]->str) == 0)
+        {
+            retVal->set(nakedReply->element[2]->len, nakedReply->element[2]->str);//return the published value rather than another (WATCHed) GET.
+            return;
+        }
+    }
+    throwUnexpected();
+}
+void Connection::handleLockOnSet(ICodeContext * ctx, const char * key, const char * value, size_t size, unsigned expire)
+{
+    StringBuffer cmd("SET %b %b");
+    RedisPlugin::appendExpire(cmd, expire);
+
+    //Due to locking logic surfacing into ECL, any locking.set (such as this is) assumes that they own the lock and therefore go ahead and set regardless.
+    //It is possible for a process/call to 'own' a lock and store this info in the LockObject, however, this prevents sharing between clients.
+    redisAppendCommand(context, cmd.str(), key, strlen(key), value, size);//SET
+    StringBuffer channel;
+    encodeChannel(channel, key);
+    redisAppendCommand(context, "PUBLISH %b %b", channel.str(), channel.length(), value, size);//PUB
+
+    //Now read and assert replies
+    OwnedReply replyContainer = new Reply();
+    readReplyAndAssertWithKey(replyContainer, "SET", key);//SET reply
+    readReplyAndAssertWithKey(replyContainer, "PUB for the key", key);//PUB reply
+
+    //NOTE: Pipelining the above commands may not be the desired behaviour but instead only PUBLISH upon a successful SET. Doing both regardless, does however ensure
+    //(assuming only the SET fails) that any subscribers do in fact get their requested key-value even if the SET fails. However, this may not be expected behaviour
+    //as it's now possible for the key-value to actually exists in the cache when it was retrieved via redis plugin get function. This is documented in the README.
+    //Further more, it is possible that the locked value and thus the channel stored within the key is not that expected, i.e. computed via encodeChannel() (e.g.
+    //if set by a non-conforming external client/process). It is however, possible to account for this via using a GETSET instead of just the SET. This returns the old
+    //value stored, this can then be checked if it is a lock (i.e. has at least the "redis_key_lock prefix"), if it doesn't, PUB on the channel from encodeChannel(),
+    //otherwise PUB on the value retrieved from GETSET or possibly only if it at least has the prefix "redis_key_lock".
+    //This would however, prevent the two commands from being pipelined, as the GETSET would need to return before publishing.
+}
+//--------------------------------------------------------------------------------
+//                           ECL SERVICE ENTRYPOINTS
+//--------------------------------------------------------------------------------
+//-----------------------------------SET------------------------------------------
+ECL_REDIS_API void ECL_REDIS_CALL SyncLockRSetStr(ICodeContext * ctx, size32_t & returnSize, char * & returnValue, const char * key, size32_t valueSize, const char * value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
+{
+    SyncLockRSet(ctx, options, key, valueSize, value, expire,  database, password, timeout);
+    returnSize = valueSize;
+    returnValue = (char*)memcpy(rtlMalloc(valueSize), value, valueSize);
+}
+ECL_REDIS_API void ECL_REDIS_CALL SyncLockRSetUChar(ICodeContext * ctx, size32_t & returnLength, UChar * & returnValue, const char * key, size32_t valueLength, const UChar * value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
+{
+    unsigned valueSize = (valueLength)*sizeof(UChar);
+    SyncLockRSet(ctx, options, key, valueSize, (char*)value, expire, database, password, timeout);
+    returnLength= valueLength;
+    returnValue = (UChar*)memcpy(rtlMalloc(valueSize), (void*)value, valueSize);
+}
+ECL_REDIS_API void ECL_REDIS_CALL SyncLockRSetUtf8(ICodeContext * ctx, size32_t & returnLength, char * & returnValue, const char * key, size32_t valueLength, const char * value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
+{
+    unsigned valueSize = rtlUtf8Size(valueLength, value);
+    SyncLockRSet(ctx, options, key, valueSize, value, expire, database, password, timeout);
+    returnLength = valueLength;
+    returnValue = (char*)memcpy(rtlMalloc(valueSize), value, valueSize);
+}
+//-------------------------------------GET----------------------------------------
+ECL_REDIS_API void ECL_REDIS_CALL SyncLockRGetStr(ICodeContext * ctx, size32_t & returnSize, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
+{
+    size_t _returnSize;
+    SyncLockRGet(ctx, options, key, _returnSize, returnValue, database, password, timeout);
+    returnSize = static_cast<size32_t>(_returnSize);
+}
+ECL_REDIS_API void ECL_REDIS_CALL SyncLockRGetUChar(ICodeContext * ctx, size32_t & returnLength, UChar * & returnValue,  const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
+{
+    size_t returnSize;
+    char  * _returnValue;
+    SyncLockRGet(ctx, options, key, returnSize, _returnValue, database, password, timeout);
+    returnValue = (UChar*)_returnValue;
+    returnLength = static_cast<size32_t>(returnSize/sizeof(UChar));
+}
+ECL_REDIS_API void ECL_REDIS_CALL SyncLockRGetUtf8(ICodeContext * ctx, size32_t & returnLength, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
+{
+    size_t returnSize;
+    SyncLockRGet(ctx, options, key, returnSize, returnValue, database, password, timeout);
+    returnLength = static_cast<size32_t>(rtlUtf8Length(returnSize, returnValue));
+}
+ECL_REDIS_API void ECL_REDIS_CALL SyncLockRUnlock(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
+{
+    Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
+    master->unlock(ctx, key);
+}
+}//close namespace

+ 29 - 43
plugins/redis/redissync.hpp

@@ -18,51 +18,26 @@
 #ifndef ECL_REDIS_SYNC_INCL
 #define ECL_REDIS_SYNC_INCL
 
-#include "redisplugin.hpp"
-
-namespace RedisPlugin
-{
-class SyncConnection : public Connection
-{
-public :
-    SyncConnection(ICodeContext * ctx, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 _timeout);
-    SyncConnection(ICodeContext * ctx, RedisServer * _server, unsigned __int64 database, const char * pswd);
-    ~SyncConnection()
-    {
-        if (context)
-            redisFree(context);
-    }
-    static SyncConnection * createConnection(ICodeContext * ctx, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 _timeout);
-
-    //set
-    template <class type> void set(ICodeContext * ctx, const char * key, type value, unsigned expire);
-    template <class type> void set(ICodeContext * ctx, const char * key, size32_t valueLength, const type * value, unsigned expire);
-    //get
-    template <class type> void get(ICodeContext * ctx, const char * key, type & value);
-    template <class type> void get(ICodeContext * ctx, const char * key, size_t & valueLength, type * & value);
-    void getVoidPtrLenPair(ICodeContext * ctx, const char * key, size_t & valueLength, void * & value);
-    void persist(ICodeContext * ctx, const char * key);
-    void expire(ICodeContext * ctx, const char * key, unsigned _expire);
-    void del(ICodeContext * ctx, const char * key);
-    void clear(ICodeContext * ctx);
-    unsigned __int64 dbSize(ICodeContext * ctx);
-    bool exists(ICodeContext * ctx, const char * key);
-
-protected :
-    void connect(ICodeContext * ctx, unsigned __int64 _database, const char * pswd);
-    void selectDB(ICodeContext * ctx, unsigned __int64 _database);
-    void authenticate(ICodeContext * ctx, const char * pswd);
-    void resetContextErr();
+#ifdef _WIN32
+#define ECL_REDIS_CALL _cdecl
+#ifdef ECL_REDIS_EXPORTS
+#define ECL_REDIS_API __declspec(dllexport)
+#else
+#define ECL_REDIS_API __declspec(dllimport)
+#endif
+#else
+#define ECL_REDIS_CALL
+#define ECL_REDIS_API
+#endif
 
-    virtual void updateTimeout(unsigned __int64 _timeout);
-    virtual void assertOnError(const redisReply * reply, const char * _msg);
-    virtual void assertConnection();
-    virtual void logServerStats(ICodeContext * ctx);
+#include "hqlplugins.hpp"
+#include "eclhelper.hpp"
 
-protected :
-    redisContext * context;
-};
-}//close namespace
+extern "C"
+{
+    ECL_REDIS_API bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb);
+    ECL_REDIS_API void setPluginContext(IPluginContext * _ctx);
+}
 
 extern "C++"
 {
@@ -93,6 +68,17 @@ namespace RedisPlugin {
     ECL_REDIS_API void             ECL_REDIS_CALL RPersist(ICodeContext * _ctx, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned timeout);
     ECL_REDIS_API void             ECL_REDIS_CALL RExpire (ICodeContext * _ctx, const char * key, const char * options, unsigned expire, unsigned __int64 database, const char * pswd, unsigned timeout);
     ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL RDBSize (ICodeContext * _ctx, const char * options, unsigned __int64 database, const char * pswd, unsigned timeout);
+
+    //--------------------------SET----------------------------------------
+    ECL_REDIS_API void             ECL_REDIS_CALL SyncLockRSetUtf8 (ICodeContext * _ctx, size32_t & returnLength, char * & returnValue, const char * key, size32_t valueLength, const char * value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned timeout);
+    ECL_REDIS_API void             ECL_REDIS_CALL SyncLockRSetStr  (ICodeContext * _ctx, size32_t & returnLength, char * & returnValue, const char * key, size32_t valueLength, const char * value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned timeout);
+    ECL_REDIS_API void             ECL_REDIS_CALL SyncLockRSetUChar(ICodeContext * _ctx, size32_t & returnLength, UChar * & returnValue, const char * key, size32_t valueLength, const UChar * value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned timeout);
+    //--------------------------GET----------------------------------------
+    ECL_REDIS_API void             ECL_REDIS_CALL SyncLockRGetUtf8  (ICodeContext * _ctx, size32_t & returnLength, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned timeout);
+    ECL_REDIS_API void             ECL_REDIS_CALL SyncLockRGetStr   (ICodeContext * _ctx, size32_t & returnLength, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned timeout);
+    ECL_REDIS_API void             ECL_REDIS_CALL SyncLockRGetUChar (ICodeContext * _ctx, size32_t & returnLength, UChar * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned timeout);
+
+    ECL_REDIS_API bool ECL_REDIS_CALL SyncLockRMissThenLock(ICodeContext * _ctx, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout);
 }
 }
 #endif

+ 0 - 114
plugins/redis/redisplugin.cpp

@@ -1,114 +0,0 @@
-/*##############################################################################
-
-    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems.
-
-    Licensed under the Apache License, Version 2.0 (the "License");
-    you may not use this file except in compliance with the License.
-    You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
-############################################################################## */
-
-#include "platform.h"
-#include "eclrtl.hpp"
-#include "jstring.hpp"
-#include "redisplugin.hpp"
-
-#define REDIS_VERSION "redis plugin 1.0.0"
-
-ECL_REDIS_API bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb)
-{
-    if (pb->size != sizeof(ECLPluginDefinitionBlock))
-        return false;
-
-    pb->magicVersion = PLUGIN_VERSION;
-    pb->version = REDIS_VERSION;
-    pb->moduleName = "lib_redis";
-    pb->ECL = NULL;
-    pb->flags = PLUGIN_IMPLICIT_MODULE;
-    pb->description = "ECL plugin library for the C API hiredis\n";
-    return true;
-}
-
-namespace RedisPlugin {
-
-StringBuffer & appendExpire(StringBuffer & buffer, unsigned expire)
-{
-    if (expire > 0)
-        buffer.append(" EX ").append(expire);
-    return buffer;
-}
-
-void RedisServer::parseOptions(ICodeContext * ctx, const char * _options)
-{
-    StringArray optionStrings;
-    optionStrings.appendList(_options, " ");
-    ForEachItemIn(idx, optionStrings)
-    {
-        const char *opt = optionStrings.item(idx);
-        if (strncmp(opt, "--SERVER=", 9) == 0)
-        {
-            opt += 9;
-            StringArray splitPort;
-            splitPort.appendList(opt, ":");
-            if (splitPort.ordinality()==2)
-            {
-                ip.set(splitPort.item(0));
-                port = atoi(splitPort.item(1));
-            }
-        }
-        else
-        {
-            VStringBuffer err("RedisPlugin: unsupported option string %s", opt);
-            rtlFail(0, err.str());
-        }
-    }
-    if (ip.isEmpty())
-    {
-        ip.set("localhost");
-        port = 6379;
-        if (ctx)
-        {
-            VStringBuffer msg("Redis Plugin: WARNING - using default server (%s:%d)", ip.str(), port);
-            ctx->logString(msg.str());
-        }
-    }
-    return;
-}
-Connection::Connection(ICodeContext * ctx, const char * _options, const char * pswd, unsigned __int64 _timeout) : alreadyInitialized(false), database(0), timeout(_timeout)
-{
-    server.setown(new RedisServer(ctx, _options, pswd));
-}
-Connection::Connection(ICodeContext * ctx, RedisServer * _server) : alreadyInitialized(false), database(0), timeout(0)
-{
-    server.setown(_server);
-}
-bool Connection::isSameConnection(ICodeContext * ctx, unsigned hash) const
-{
-    return server->isSame(ctx, hash);
-}
-void * Connection::allocateAndCopy(const char * src, size_t size)
-{
-    void * value = rtlMalloc(size);
-    return memcpy(value, src, size);
-}
-const char * Connection::appendIfKeyNotFoundMsg(const redisReply * reply, const char * key, StringBuffer & target) const
-{
-    if (reply && reply->type == REDIS_REPLY_NIL)
-        target.append("(key: '").append(key).append("') ");
-    return target.str();
-}
-void Connection::init(ICodeContext * ctx)
-{
-    logServerStats(ctx);
-    alreadyInitialized = true;
-}
-}//close namespace
-
-

+ 0 - 126
plugins/redis/redisplugin.hpp

@@ -1,126 +0,0 @@
-/*##############################################################################
-
-    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems.
-
-    Licensed under the Apache License, Version 2.0 (the "License");
-    you may not use this file except in compliance with the License.
-    You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
-############################################################################## */
-
-#ifndef ECL_REDIS_INCL
-#define ECL_REDIS_INCL
-
-#ifdef _WIN32
-#define ECL_REDIS_CALL _cdecl
-#ifdef ECL_REDIS_EXPORTS
-#define ECL_REDIS_API __declspec(dllexport)
-#else
-#define ECL_REDIS_API __declspec(dllimport)
-#endif
-#else
-#define ECL_REDIS_CALL
-#define ECL_REDIS_API
-#endif
-
-#include "jhash.hpp"
-#include "hqlplugins.hpp"
-#include "eclhelper.hpp"
-#include "jexcept.hpp"
-#include "hiredis/hiredis.h"
-
-extern "C"
-{
-    ECL_REDIS_API bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb);
-    ECL_REDIS_API void setPluginContext(IPluginContext * _ctx);
-}
-
-class StringBuffer;
-
-namespace RedisPlugin {
-#define setFailMsg "'Set' request failed - "
-#define getFailMsg "'Get<type>' request failed - "
-
-StringBuffer & appendExpire(StringBuffer & buffer, unsigned expire);
-
-class RedisServer : public CInterface
-{
-public :
-    RedisServer(ICodeContext * ctx, const char * _options, const char * pswd)
-    {
-        serverIpPortPasswordHash = hashc((const unsigned char*)pswd, strlen(pswd), 0);
-        serverIpPortPasswordHash = hashc((const unsigned char*)_options, strlen(_options), serverIpPortPasswordHash);
-        options.set(_options);
-        parseOptions(ctx, _options);
-    }
-    bool isSame(ICodeContext * ctx, unsigned hash) const
-    {
-        return (serverIpPortPasswordHash == hash);
-    }
-    const char * getIp() { return ip.str(); }
-    int getPort() { return port; }
-    void parseOptions(ICodeContext * ctx, const char * _options);
-
-private :
-    unsigned serverIpPortPasswordHash;
-    StringAttr options;
-    StringAttr ip;
-    int port;
-};
-class Connection : public CInterface
-{
-public :
-    Connection(ICodeContext * ctx, const char * _options, const char * pswd, unsigned __int64 _timeout);
-    Connection(ICodeContext * ctx, RedisServer * _server);
-
-    bool isSameConnection(ICodeContext * ctx, unsigned hash) const;
-    const char * ip() const { return server->getIp(); }
-    int port() const { return server->getPort(); }
-
-protected :
-    virtual void assertOnError(const redisReply * reply, const char * _msg) { }
-    virtual void assertConnection() { }
-    virtual void logServerStats(ICodeContext * ctx) { }
-    virtual void updateTimeout(unsigned __int64 _timeout) { }
-
-    const char * appendIfKeyNotFoundMsg(const redisReply * reply, const char * key, StringBuffer & target) const;
-    void * allocateAndCopy(const char * src, size_t size);
-    void init(ICodeContext * ctx);
-
-protected :
-    Owned<RedisServer> server;
-    unsigned __int64 timeout;
-    unsigned __int64 database;
-    bool alreadyInitialized;
-};
-
-class Reply : public CInterface
-{
-public :
-    inline Reply() { reply = NULL; };
-    inline Reply(void * _reply) { reply = (redisReply*)_reply; }
-    inline Reply(redisReply * _reply) { reply = _reply; }
-    inline ~Reply()
-    {
-        if (reply)
-            freeReplyObject(reply);
-    }
-
-    static Reply * createReply(void * _reply) { return new Reply(_reply); }
-    inline const redisReply * query() const { return reply; }
-
-private :
-    redisReply * reply;
-};
-typedef Owned<RedisPlugin::Reply> OwnedReply;
-
-}//close namespace
-
-#endif

+ 0 - 418
plugins/redis/redissync.cpp

@@ -1,418 +0,0 @@
-/*##############################################################################
-
-    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems.
-
-    Licensed under the Apache License, Version 2.0 (the "License");
-    you may not use this file except in compliance with the License.
-    You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
-############################################################################## */
-
-#include "platform.h"
-#include "jthread.hpp"
-#include "jhash.hpp"
-#include "eclrtl.hpp"
-#include "jstring.hpp"
-#include "redissync.hpp"
-
-namespace RedisPlugin {
-static __thread SyncConnection * cachedConnection;
-static __thread ThreadTermFunc threadHookChain;
-
-//The following class is here to ensure destruction of the cachedConnection within the main thread
-//as this is not handled by the thread hook mechanism.
-static class mainThreadCachedConnection
-{
-public :
-    mainThreadCachedConnection() { }
-    ~mainThreadCachedConnection()
-    {
-        if (cachedConnection)
-            cachedConnection->Release();
-    }
-} mainThread;
-
-static void releaseContext()
-{
-    if (cachedConnection)
-        cachedConnection->Release();
-    if (threadHookChain)
-    {
-        (*threadHookChain)();
-        threadHookChain = NULL;
-    }
-}
-
-SyncConnection::SyncConnection(ICodeContext * ctx, const char * _options, unsigned __int64 _database, const char * pswd, unsigned __int64 _timeout)
-  : Connection(ctx, _options, pswd, _timeout)
-{
-    connect(ctx, _database, pswd);
-}
-SyncConnection::SyncConnection(ICodeContext * ctx, RedisServer * _server, unsigned __int64 _database, const char * pswd)
-  : Connection(ctx, _server)
-{
-    connect(ctx, _database, pswd);
-}
-void SyncConnection::connect(ICodeContext * ctx, unsigned __int64 _database, const char * pswd)
-{
-    struct timeval to = { timeout/1000000, timeout%1000000 };
-    context = redisConnectWithTimeout(server->getIp(), server->getPort(), to);
-    assertConnection();
-    authenticate(ctx, pswd);
-    selectDB(ctx, _database);
-    init(ctx);
-}
-void SyncConnection::authenticate(ICodeContext * ctx, const char * pswd)
-{
-    if (strlen(pswd) > 0)
-    {
-        OwnedReply reply = Reply::createReply(redisCommand(context, "AUTH %b", pswd, strlen(pswd)));
-        assertOnError(reply->query(), "server authentication failed");
-    }
-}
-void SyncConnection::resetContextErr()
-{
-    if (context)
-        context->err = REDIS_OK;
-}
-SyncConnection * SyncConnection::createConnection(ICodeContext * ctx, const char * options, unsigned __int64 _database, const char * pswd, unsigned __int64 _timeout)
-{
-    if (!cachedConnection)
-    {
-        cachedConnection = new SyncConnection(ctx, options, _database, pswd, _timeout);
-        threadHookChain = addThreadTermFunc(releaseContext);
-        return LINK(cachedConnection);
-    }
-
-    unsigned optionsPswdHash = hashc((const unsigned char*)options, strlen(options), hashc((const unsigned char*)pswd, strlen(pswd), 0));
-    if (cachedConnection->isSameConnection(ctx, optionsPswdHash))
-    {
-        //MORE: need to check that the connection has not expired (think hiredis REDIS_KEEPALIVE_INTERVAL is defaulted to 15s).
-        //At present updateTimeout calls assertConnection.
-        cachedConnection->resetContextErr();//reset the context err to allow reuse when an error previously occurred.
-        cachedConnection->updateTimeout(_timeout);
-        cachedConnection->selectDB(ctx, _database);
-        return LINK(cachedConnection);
-    }
-
-    cachedConnection->Release();
-    cachedConnection = new SyncConnection(ctx, options, _database, pswd, _timeout);
-    return LINK(cachedConnection);
-}
-void SyncConnection::selectDB(ICodeContext * ctx, unsigned __int64 _database)
-{
-    if (database == _database)
-        return;
-    database = _database;
-    VStringBuffer cmd("SELECT %" I64F "u", database);
-    OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str()));
-    assertOnError(reply->query(), "'SELECT' request failed");
-}
-void SyncConnection::updateTimeout(unsigned __int64 _timeout)
-{
-    if (timeout == _timeout)
-        return;
-    assertConnection();
-    timeout = _timeout;
-    struct timeval to = { timeout/1000000, timeout%1000000 };
-    if (redisSetTimeout(context, to) != REDIS_OK)
-    {
-        if (context->err)
-        {
-            VStringBuffer msg("RedisPlugin: failed to set timeout - %s", context->errstr);
-            rtlFail(0, msg.str());
-        }
-        else
-            rtlFail(0, "RedisPlugin: failed to set timeout - no message available");
-    }
-}
-void SyncConnection::logServerStats(ICodeContext * ctx)
-{
-    OwnedReply reply = Reply::createReply(redisCommand(context, "INFO"));
-    assertOnError(reply->query(), "'INFO' request failed");
-    StringBuffer stats("Redis Plugin : Server stats - ");
-    stats.newline().append(reply->query()->str).newline();
-    ctx->logString(stats.str());
-}
-void SyncConnection::assertOnError(const redisReply * reply, const char * _msg)
-{
-    if (!reply)
-    {
-        //There should always be a context error if no reply error
-        assertConnection();
-        VStringBuffer msg("Redis Plugin: %s - %s", _msg, "neither 'reply' nor connection error available");
-        rtlFail(0, msg.str());
-    }
-    else if (reply->type == REDIS_REPLY_ERROR)
-    {
-        if (strncmp(reply->str, "NOAUTH", 6) == 0)
-        {
-            VStringBuffer msg("Redis Plugin: server authentication failed - %s", reply->str);
-            rtlFail(0, msg.str());
-        }
-        else
-        {
-            VStringBuffer msg("Redis Plugin: %s - %s", _msg, reply->str);
-            rtlFail(0, msg.str());
-        }
-    }
-}
-void SyncConnection::assertConnection()
-{
-    if (!context)
-        rtlFail(0, "Redis Plugin: 'redisConnect' failed - no error available.");
-    else if (context->err)//This check requires that context->err be reset after a non terminating error was encountered. However, the nature of context->err implies a runtime exception.
-    {
-        VStringBuffer msg("Redis Plugin: Connection failed - %s for %s:%u", context->errstr, ip(), port());
-        rtlFail(0, msg.str());
-    }
-}
-
-void SyncConnection::clear(ICodeContext * ctx)
-{
-    //NOTE: flush is the actual cache flush/clear/delete and not an io buffer flush.
-    OwnedReply reply = Reply::createReply(redisCommand(context, "FLUSHDB"));//NOTE: FLUSHDB deletes current database where as FLUSHALL deletes all dbs.
-    //NOTE: documented as never failing, but in case
-    assertOnError(reply->query(), "'FlushDB' request failed");
-}
-void SyncConnection::del(ICodeContext * ctx, const char * key)
-{
-    OwnedReply reply = Reply::createReply(redisCommand(context, "DEL %b", key, strlen(key)));
-    assertOnError(reply->query(), "'Del' request failed");
-}
-void SyncConnection::persist(ICodeContext * ctx, const char * key)
-{
-    OwnedReply reply = Reply::createReply(redisCommand(context, "PERSIST %b", key, strlen(key)));
-    assertOnError(reply->query(), "'Persist' request failed");
-}
-void SyncConnection::expire(ICodeContext * ctx, const char * key, unsigned _expire)
-{
-    OwnedReply reply = Reply::createReply(redisCommand(context, "DEL %b %u", key, strlen(key), _expire));
-    assertOnError(reply->query(), "'Expire' request failed");
-}
-bool SyncConnection::exists(ICodeContext * ctx, const char * key)
-{
-    OwnedReply reply = Reply::createReply(redisCommand(context, "EXISTS %b", key, strlen(key)));
-    assertOnError(reply->query(), "'Exists' request failed");
-    return (reply->query()->integer != 0);
-}
-unsigned __int64 SyncConnection::dbSize(ICodeContext * ctx)
-{
-    OwnedReply reply = Reply::createReply(redisCommand(context, "DBSIZE"));
-    assertOnError(reply->query(), "'DBSIZE' request failed");
-    return reply->query()->integer;
-}
-//-------------------------------------------SET-----------------------------------------
-//--OUTER--
-template<class type> void SyncRSet(ICodeContext * ctx, const char * _options, const char * key, type value, unsigned __int64 database, unsigned expire, const char * pswd, unsigned __int64 _timeout)
-{
-    Owned<SyncConnection> master = SyncConnection::createConnection(ctx, _options, database, pswd, _timeout);
-    master->set(ctx, key, value, expire);
-}
-//Set pointer types
-template<class type> void SyncRSet(ICodeContext * ctx, const char * _options, const char * key, size32_t valueLength, const type * value, unsigned __int64 database, unsigned expire, const char * pswd, unsigned __int64 _timeout)
-{
-    Owned<SyncConnection> master = SyncConnection::createConnection(ctx, _options, database, pswd, _timeout);
-    master->set(ctx, key, valueLength, value, expire);
-}
-//--INNER--
-template<class type> void SyncConnection::set(ICodeContext * ctx, const char * key, type value, unsigned expire)
-{
-    const char * _value = reinterpret_cast<const char *>(&value);//Do this even for char * to prevent compiler complaining
-    const char * msg = setFailMsg;
-
-    StringBuffer cmd("SET %b %b");
-    appendExpire(cmd, expire);
-
-    OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str(), key, strlen(key), _value, sizeof(type)));
-    assertOnError(reply->query(), msg);
-}
-template<class type> void SyncConnection::set(ICodeContext * ctx, const char * key, size32_t valueLength, const type * value, unsigned expire)
-{
-    const char * _value = reinterpret_cast<const char *>(value);//Do this even for char * to prevent compiler complaining
-    const char * msg = setFailMsg;
-
-    StringBuffer cmd("SET %b %b");
-    appendExpire(cmd, expire);
-    OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str(), key, strlen(key), _value, (size_t)valueLength));
-    assertOnError(reply->query(), msg);
-}
-//-------------------------------------------GET-----------------------------------------
-//--OUTER--
-template<class type> void SyncRGet(ICodeContext * ctx, const char * options, const char * key, type & returnValue, unsigned __int64 database, const char * pswd, unsigned __int64 _timeout)
-{
-    Owned<SyncConnection> master = SyncConnection::createConnection(ctx, options, database, pswd, _timeout);
-    master->get(ctx, key, returnValue);
-}
-template<class type> void SyncRGet(ICodeContext * ctx, const char * options, const char * key, size_t & returnLength, type * & returnValue, unsigned __int64 database, const char * pswd, unsigned __int64 _timeout)
-{
-    Owned<SyncConnection> master = SyncConnection::createConnection(ctx, options, database, pswd, _timeout);
-    master->get(ctx, key, returnLength, returnValue);
-}
-void SyncRGetVoidPtrLenPair(ICodeContext * ctx, const char * options, const char * key, size_t & returnLength, void * & returnValue, unsigned __int64 database, const char * pswd, unsigned __int64 _timeout)
-{
-    Owned<SyncConnection> master = SyncConnection::createConnection(ctx, options, database, pswd, _timeout);
-    master->getVoidPtrLenPair(ctx, key, returnLength, returnValue);
-}
-//--INNER--
-template<class type> void SyncConnection::get(ICodeContext * ctx, const char * key, type & returnValue)
-{
-    OwnedReply reply = Reply::createReply(redisCommand(context, "GET %b", key, strlen(key)));
-
-    StringBuffer keyMsg = getFailMsg;
-    assertOnError(reply->query(), appendIfKeyNotFoundMsg(reply->query(), key, keyMsg));
-
-    size_t returnSize = reply->query()->len;
-    if (sizeof(type)!=returnSize)
-    {
-        VStringBuffer msg("RedisPlugin: ERROR - Requested type of different size (%uB) from that stored (%uB).", (unsigned)sizeof(type), (unsigned)returnSize);
-
-        rtlFail(0, msg.str());
-    }
-    memcpy(&returnValue, reply->query()->str, returnSize);
-}
-template<class type> void SyncConnection::get(ICodeContext * ctx, const char * key, size_t & returnLength, type * & returnValue)
-{
-    OwnedReply reply = Reply::createReply(redisCommand(context, "GET %b", key, strlen(key)));
-
-    StringBuffer keyMsg = getFailMsg;
-    assertOnError(reply->query(), appendIfKeyNotFoundMsg(reply->query(), key, keyMsg));
-
-    returnLength = reply->query()->len;
-    size_t returnSize = returnLength;
-
-    returnValue = reinterpret_cast<type*>(allocateAndCopy(reply->query()->str, returnSize));
-}
-void SyncConnection::getVoidPtrLenPair(ICodeContext * ctx, const char * key, size_t & returnLength, void * & returnValue)
-{
-    OwnedReply reply = Reply::createReply(redisCommand(context, "GET %b", key, strlen(key)));
-    StringBuffer keyMsg = getFailMsg;
-    assertOnError(reply->query(), appendIfKeyNotFoundMsg(reply->query(), key, keyMsg));
-
-    returnLength = reply->query()->len;
-    returnValue = reinterpret_cast<void*>(allocateAndCopy(reply->query()->str, reply->query()->len));
-}
-
-//--------------------------------------------------------------------------------
-//                           ECL SERVICE ENTRYPOINTS
-//--------------------------------------------------------------------------------
-ECL_REDIS_API void ECL_REDIS_CALL RClear(ICodeContext * ctx, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
-{
-    Owned<SyncConnection> master = SyncConnection::createConnection(ctx, options, database, pswd, timeout);
-    master->clear(ctx);
-}
-ECL_REDIS_API bool ECL_REDIS_CALL RExist(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
-{
-    Owned<SyncConnection> master = SyncConnection::createConnection(ctx, options, database, pswd, timeout);
-    return master->exists(ctx, key);
-}
-ECL_REDIS_API void ECL_REDIS_CALL RDel(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
-{
-    Owned<SyncConnection> master = SyncConnection::createConnection(ctx, options, database, pswd, timeout);
-    master->del(ctx, key);
-}
-ECL_REDIS_API void ECL_REDIS_CALL RPersist(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
-{
-    Owned<SyncConnection> master = SyncConnection::createConnection(ctx, options, database, pswd, timeout);
-    master->persist(ctx, key);
-}
-ECL_REDIS_API void ECL_REDIS_CALL RExpire(ICodeContext * ctx, const char * key, const char * options, unsigned _expire, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
-{
-    Owned<SyncConnection> master = SyncConnection::createConnection(ctx, options, database, pswd, timeout);
-    master->expire(ctx, key, _expire);
-}
-ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL RDBSize(ICodeContext * ctx, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
-{
-    Owned<SyncConnection> master = SyncConnection::createConnection(ctx, options, database, pswd, timeout);
-    return master->dbSize(ctx);
-}
-//-----------------------------------SET------------------------------------------
-ECL_REDIS_API void ECL_REDIS_CALL SyncRSetStr(ICodeContext * ctx, const char * key, size32_t valueLength, const char * value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned __int64 timeout)
-{
-    SyncRSet(ctx, options, key, valueLength, value, database, expire, pswd, timeout);
-}
-ECL_REDIS_API void ECL_REDIS_CALL SyncRSetUChar(ICodeContext * ctx, const char * key, size32_t valueLength, const UChar * value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned __int64 timeout)
-{
-    SyncRSet(ctx, options, key, (valueLength)*sizeof(UChar), value, database, expire, pswd, timeout);
-}
-ECL_REDIS_API void ECL_REDIS_CALL SyncRSetInt(ICodeContext * ctx, const char * key, signed __int64 value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned __int64 timeout)
-{
-    SyncRSet(ctx, options, key, value, database, expire, pswd, timeout);
-}
-ECL_REDIS_API void ECL_REDIS_CALL SyncRSetUInt(ICodeContext * ctx, const char * key, unsigned __int64 value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned __int64 timeout)
-{
-    SyncRSet(ctx, options, key, value, database, expire, pswd, timeout);
-}
-ECL_REDIS_API void ECL_REDIS_CALL SyncRSetReal(ICodeContext * ctx, const char * key, double value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned __int64 timeout)
-{
-    SyncRSet(ctx, options, key, value, database, expire, pswd, timeout);
-}
-ECL_REDIS_API void ECL_REDIS_CALL SyncRSetBool(ICodeContext * ctx, const char * key, bool value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned __int64 timeout)
-{
-    SyncRSet(ctx, options, key, value, database, expire, pswd, timeout);
-}
-ECL_REDIS_API void ECL_REDIS_CALL SyncRSetData(ICodeContext * ctx, const char * key, size32_t valueLength, const void * value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned __int64 timeout)
-{
-    SyncRSet(ctx, options, key, valueLength, value, database, expire, pswd, timeout);
-}
-ECL_REDIS_API void ECL_REDIS_CALL SyncRSetUtf8(ICodeContext * ctx, const char * key, size32_t valueLength, const char * value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned __int64 timeout)
-{
-    SyncRSet(ctx, options, key, rtlUtf8Size(valueLength, value), value, database, expire, pswd, timeout);
-}
-//-------------------------------------GET----------------------------------------
-ECL_REDIS_API bool ECL_REDIS_CALL SyncRGetBool(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
-{
-    bool value;
-    SyncRGet(ctx, options, key, value, database, pswd, timeout);
-    return value;
-}
-ECL_REDIS_API double ECL_REDIS_CALL SyncRGetDouble(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
-{
-    double value;
-    SyncRGet(ctx, options, key, value, database, pswd, timeout);
-    return value;
-}
-ECL_REDIS_API signed __int64 ECL_REDIS_CALL SyncRGetInt8(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
-{
-    signed __int64 value;
-    SyncRGet(ctx, options, key, value, database, pswd, timeout);
-    return value;
-}
-ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL SyncRGetUint8(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
-{
-    unsigned __int64 value;
-    SyncRGet(ctx, options, key, value, database, pswd, timeout);
-    return value;
-}
-ECL_REDIS_API void ECL_REDIS_CALL SyncRGetStr(ICodeContext * ctx, size32_t & returnLength, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
-{
-    size_t _returnLength;
-    SyncRGet(ctx, options, key, _returnLength, returnValue, database, pswd, timeout);
-    returnLength = static_cast<size32_t>(_returnLength);
-}
-ECL_REDIS_API void ECL_REDIS_CALL SyncRGetUChar(ICodeContext * ctx, size32_t & returnLength, UChar * & returnValue,  const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
-{
-    size_t _returnLength;
-    SyncRGet(ctx, options, key, _returnLength, returnValue, database, pswd, timeout);
-    returnLength = static_cast<size32_t>(_returnLength/sizeof(UChar));
-}
-ECL_REDIS_API void ECL_REDIS_CALL SyncRGetUtf8(ICodeContext * ctx, size32_t & returnLength, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
-{
-    size_t _returnLength;
-    SyncRGet(ctx, options, key, _returnLength, returnValue, database, pswd, timeout);
-    returnLength = static_cast<size32_t>(rtlUtf8Length(_returnLength, returnValue));
-}
-ECL_REDIS_API void ECL_REDIS_CALL SyncRGetData(ICodeContext * ctx, size32_t & returnLength, void * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
-{
-    size_t _returnLength;
-    SyncRGetVoidPtrLenPair(ctx, options, key, _returnLength, returnValue, database, pswd, timeout);
-    returnLength = static_cast<size32_t>(_returnLength);
-}
-}//close namespace

+ 72 - 0
testing/regress/ecl/key/redislockingtest.xml

@@ -0,0 +1,72 @@
+<Dataset name='Result 1'>
+ <Row><Result_1>אבגדהוזחטיךכלםמןנסעףפץצקרשת</Result_1></Row>
+</Dataset>
+<Dataset name='Result 2'>
+ <Row><Result_2>אבגדהוזחטיךכלםמןנסעףפץצקרשת</Result_2></Row>
+</Dataset>
+<Dataset name='Result 3'>
+ <Row><Result_3>אבגדהוזחטיךכלםמןנסעףפץצקרשת</Result_3></Row>
+</Dataset>
+<Dataset name='Result 4'>
+ <Row><Result_4>אבגדהוזחטיךכלםמןנסעףפץצקרשת</Result_4></Row>
+</Dataset>
+<Dataset name='Result 5'>
+ <Row><Result_5>אבגדהוזחטיךכלםמןנסעףפץצקרשת</Result_5></Row>
+</Dataset>
+<Dataset name='Result 6'>
+ <Row><Result_6>אבגדהוזחטיךכלםמןנסעףפץצקרשת</Result_6></Row>
+</Dataset>
+<Dataset name='Result 7'>
+ <Row><Result_7>אבגדהוזחטיךכלםמןנסעףפץצקרשת</Result_7></Row>
+</Dataset>
+<Dataset name='Result 8'>
+ <Row><Result_8>אבגדהוזחטיךכלםמןנסעףפץצקרשת</Result_8></Row>
+</Dataset>
+<Dataset name='Result 9'>
+ <Row><Result_9>אבגדהוזחטיךכלםמןנסעףפץצקרשת</Result_9></Row>
+</Dataset>
+<Dataset name='Result 10'>
+ <Row><Result_10>אבגדהוזחטיךכלםמןנסעףפץצקרשת</Result_10></Row>
+</Dataset>
+<Dataset name='Result 11'>
+ <Row><Result_11>אבגדהוזחטיךכלםמןנסעףפץצקרשת</Result_11></Row>
+</Dataset>
+<Dataset name='Result 12'>
+ <Row><Result_12>אבגדהוזחטיךכלםמןנסעףפץצקרשת</Result_12></Row>
+</Dataset>
+<Dataset name='Result 13'>
+ <Row><Result_13>אבגדהוזחטיךכלםמןנסעףפץצקרשת</Result_13></Row>
+</Dataset>
+<Dataset name='Result 14'>
+ <Row><Result_14>אבגדהוזחטיךכלםמןנסעףפץצקרשת</Result_14></Row>
+</Dataset>
+<Dataset name='Result 15'>
+ <Row><Result_15>אבגדהוזחטיךכלםמןנסעףפץצקרשת</Result_15></Row>
+</Dataset>
+<Dataset name='Result 16'>
+ <Row><Result_16>אבגדהוזחטיךכלםמןנסעףפץצקרשת</Result_16></Row>
+</Dataset>
+<Dataset name='Result 17'>
+ <Row><Result_17>אבגדהוזחטיךכלםמןנסעףפץצקרשת</Result_17></Row>
+</Dataset>
+<Dataset name='Result 18'>
+ <Row><Result_18>Good boy Einnie!</Result_18></Row>
+</Dataset>
+<Dataset name='Result 19'>
+ <Row><Result_19>This way kido</Result_19></Row>
+</Dataset>
+<Dataset name='Result 20'>
+ <Row><Result_20></Result_20></Row>
+</Dataset>
+<Dataset name='Result 21'>
+ <Row><Result_21>true</Result_21></Row>
+</Dataset>
+<Dataset name='Result 22'>
+ <Row><Result_22>false</Result_22></Row>
+</Dataset>
+<Dataset name='Result 23'>
+ <Row><Result_23>true</Result_23></Row>
+</Dataset>
+<Dataset name='Result 24'>
+ <Row><Result_24>false</Result_24></Row>
+</Dataset>

+ 36 - 6
testing/regress/ecl/key/redissynctest.xml

@@ -38,20 +38,50 @@
  <Row><Result_13>false</Result_13></Row>
 </Dataset>
 <Dataset name='Result 14'>
- <Row><Result_14>4614256656552046314</Result_14></Row>
+ <Row><Result_14>7523094288207667809</Result_14></Row>
 </Dataset>
 <Dataset name='Result 15'>
- <Row><Result_15>6</Result_15></Row>
+ <Row><Result_15>true</Result_15></Row>
 </Dataset>
 <Dataset name='Result 16'>
- <Row><Result_16>1</Result_16></Row>
+ <Row><Result_16>false</Result_16></Row>
 </Dataset>
 <Dataset name='Result 17'>
- <Row><Result_17>0</Result_17></Row>
+ <Row><Result_17>true</Result_17></Row>
 </Dataset>
 <Dataset name='Result 18'>
- <Row><Result_18>false</Result_18></Row>
+ <Row><Result_18>true</Result_18></Row>
 </Dataset>
 <Dataset name='Result 19'>
- <Row><Result_19>300</Result_19></Row>
+ <Row><Result_19>4614256656552046314</Result_19></Row>
+</Dataset>
+<Dataset name='Result 20'>
+ <Row><Result_20>true</Result_20></Row>
+</Dataset>
+<Dataset name='Result 21'>
+ <Row><Result_21>false</Result_21></Row>
+</Dataset>
+<Dataset name='Result 22'>
+ <Row><Result_22>Woof</Result_22></Row>
+</Dataset>
+<Dataset name='Result 23'>
+ <Row><Result_23>Grrrr</Result_23></Row>
+</Dataset>
+<Dataset name='Result 24'>
+ <Row><Result_24>Woof-Woof</Result_24></Row>
+</Dataset>
+<Dataset name='Result 25'>
+ <Row><Result_25>9</Result_25></Row>
+</Dataset>
+<Dataset name='Result 26'>
+ <Row><Result_26>2</Result_26></Row>
+</Dataset>
+<Dataset name='Result 27'>
+ <Row><Result_27>0</Result_27></Row>
+</Dataset>
+<Dataset name='Result 28'>
+ <Row><Result_28>false</Result_28></Row>
+</Dataset>
+<Dataset name='Result 29'>
+ <Row><Result_29>300</Result_29></Row>
 </Dataset>

+ 125 - 0
testing/regress/ecl/redislockingtest.ecl

@@ -0,0 +1,125 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+IMPORT redisServer FROM lib_redis;
+
+STRING server := '--SERVER=127.0.0.1:6379';
+STRING password := 'foobared';
+myRedis := redisServer(server, password);
+
+myFuncStr(STRING key) := FUNCTION
+ value := myRedis.getString(key);
+ return value;
+END;
+myFuncUtf8(STRING key) := FUNCTION
+ value := myRedis.getUtf8(key);
+ return value;
+END;
+myFuncUni(STRING key) := FUNCTION
+ value := myRedis.getUnicode(key);
+ return value;
+END;
+
+getString(STRING key, STRING key2, myFuncStr func) := FUNCTION
+    value := myRedis.locking.GetString(key);
+    RETURN IF (LENGTH(value) = 0, myRedis.locking.SetString(key, func(key2)), value);
+END;
+getUtf8(STRING key, STRING key2, myFuncUtf8 func) := FUNCTION
+    value := myRedis.locking.GetUtf8(key);
+    RETURN IF (LENGTH(value) = 0, myRedis.locking.SetUtf8(key, func(key2)), value);
+END;
+getUnicode(STRING key, STRING key2, myFuncUni func) := FUNCTION
+    value := myRedis.locking.GetUnicode(key);
+    RETURN IF (LENGTH(value) = 0, myRedis.locking.SetUnicode(key, func(key2)), value);
+END;
+
+CSleep(INTEGER duration) := BEGINC++
+    sleep(duration);
+ENDC++;
+
+//Test compatibiltiy between locking and normal functions.
+SEQUENTIAL(
+    myRedis.FlushDB(),
+    myRedis.SetUtf8('utf8', U'אבגדהוזחטיךכלםמןנסעףפץצקרשת'),
+    myRedis.GetUtf8('utf8'),
+    myRedis.locking.GetUtf8('utf8'),
+    myRedis.FlushDB(),
+    myRedis.locking.SetUtf8('utf8', U'אבגדהוזחטיךכלםמןנסעףפץצקרשת'),
+    myRedis.GetUtf8('utf8'),
+    myRedis.locking.GetUtf8('utf8'),
+    myRedis.FlushDB(),
+
+    myRedis.SetUnicode('unicode', U'אבגדהוזחטיךכלםמןנסעףפץצקרשת'),
+    myRedis.GetUnicode('unicode'),
+    myRedis.locking.GetUnicode('unicode'),
+    myRedis.FlushDB(),
+    myRedis.locking.SetUnicode('unicode', U'אבגדהוזחטיךכלםמןנסעףפץצקרשת'),
+    myRedis.GetUnicode('unicode'),
+    myRedis.locking.GetUnicode('unicode'),
+    myRedis.FlushDB()
+    );
+
+SEQUENTIAL(
+    myRedis.FlushDB(),
+    myRedis.SetUtf8('utf8', U'אבגדהוזחטיךכלםמןנסעףפץצקרשת'),
+    myRedis.GetUtf8('utf8'),
+    myRedis.locking.GetUtf8('utf8'),
+    myRedis.FlushDB()
+    );
+
+SEQUENTIAL(
+    myRedis.FlushDB(),
+    myRedis.locking.SetUtf8('utf8', U'אבגדהוזחטיךכלםמןנסעףפץצקרשת'),
+    myRedis.GetUtf8('utf8'),
+    myRedis.locking.GetUtf8('utf8'),
+    myRedis.FlushDB()
+    );
+
+SEQUENTIAL(
+    myRedis.FlushDB(),
+    myRedis.setUtf8('utf8-2', U'אבגדהוזחטיךכלםמןנסעףפץצקרשת'),
+    getUtf8('utf8', 'utf8-2', myFuncUtf8),
+);
+SEQUENTIAL(
+    myRedis.FlushDB(),
+    myRedis.setUnicode('utf8-2', U'אבגדהוזחטיךכלםמןנסעףפץצקרשת'),
+    getUnicode('utf8', 'utf8-2', myFuncUni),
+);
+
+SEQUENTIAL(
+    myRedis.FlushDB(),
+    myRedis.setString('Einnie sit', 'Good boy Einnie!'),
+    getString('Einstein', 'Einnie sit', myFuncStr),
+
+    myRedis.setString('Einstein', 'This way kido'),
+    getString('Einstein', 'Einnie sit', myFuncStr),
+);
+
+//Test unlock
+SEQUENTIAL(
+    myRedis.FlushDB(),
+    myRedis.locking.Getstring('testlock'),/*by default lock expires after 1s*/
+    myRedis.exists('testlock'),
+    CSleep(2),
+    myRedis.exists('testlock'),
+
+    myRedis.setString('testlock', 'redis_ecl_lock_blah_blah_blah'),
+    myRedis.exists('testlock'),
+    myRedis.locking.unlock('testlock'),
+    myRedis.exists('testlock'),
+    myRedis.FlushDB(),
+    );

+ 48 - 13
testing/regress/ecl/redissynctest.ecl

@@ -24,8 +24,8 @@ sync.FlushDB(server, /*database*/, password);
 sync.SetBoolean('b', TRUE, server, /*database*/, /*expire*/, password);
 sync.GetBoolean('b', server, /*database*/, password);
 
-IMPORT redisSync FROM lib_redis;
-myRedis := redisSync(server, password);
+IMPORT redisServer FROM lib_redis;
+myRedis := redisServer(server, password);
 
 REAL pi := 3.14159265359;
 myRedis.SetReal('pi', pi);
@@ -39,7 +39,7 @@ INTEGER i := 123456789;
 myRedis.SetInteger('i', i);
 myRedis.GetInteger('i');
 
-myRedis2 := redisSync('--SERVER=127.0.0.1:6380', 'youarefoobared');
+myRedis2 := redisServer('--SERVER=127.0.0.1:6380', 'youarefoobared');
 
 myRedis2.SetReal('pi', pi2, 1);
 myRedis2.GetReal('pi', 1);
@@ -73,28 +73,63 @@ SEQUENTIAL(
     myRedis.Exists('uft8')
     );
 
-myRedis.Expire('str', 1); 
-myRedis.Persist('str');
+SEQUENTIAL(
+    myRedis.SetString('str2int', 'abcdefgh');//Heterogeneous calls will only result in an exception when the retrieved value does not fit into memory of the requested type.
+    myRedis.GetInteger('str2int');
+    );
+
+CSleep(INTEGER duration) := BEGINC++
+    sleep(duration);
+ENDC++;
+
+SEQUENTIAL(
+    myRedis.Exists('str'),
+    myRedis.Expire('str', , 1000000),/*\mu-s*/
+    CSleep(2),
+    myRedis.Exists('str'),
+
+    myRedis.SetString('str', str),
+    myRedis.Exists('str'),
+    myRedis.Expire('str', , 1000000),/*\mu-s*/
+    myRedis.Persist('str'),
+    CSleep(2),
+    myRedis.Exists('str')
+    );
 
 myRedis.GetInteger('pi');
 
-NOFOLD(myRedis.DBSize());
-NOFOLD(myRedis.DBSize(1));
-NOFOLD(myRedis.DBSize(2));
+SEQUENTIAL(
+    myRedis.SetString('Einnie', 'Woof', 0, 1000000),
+    myRedis.Exists('Einnie');
+    CSleep(2);
+    myRedis.Exists('Einnie');
+
+    myRedis.SetString('Einnie', 'Woof', 0),
+    myRedis.SetString('Einnie', 'Grrrr', 1),
+    myRedis.GetString('Einnie', 0),
+    myRedis.GetString('Einnie', 1),
+    myRedis.SetString('Einnie', 'Woof-Woof'),
+    myRedis.GetString('Einnie'),
+    );
+
+myRedis.DBSize();
+myRedis.DBSize(1);
+myRedis.DBSize(2);
 
 SEQUENTIAL(
     myRedis.FlushDB(),
-    NOFOLD(myRedis.Exists('str'))
+    myRedis.Exists('str')
     );
-myRedis2.FlushDB();
 
 //The follwoing tests the multithreaded caching of the redis connections
 //SUM(NOFOLD(s1 + s2), a) uses two threads
-myRedis.FlushDB();
 INTEGER x := 2;
 INTEGER N := 100;
+myRedis.FlushDB();
 myRedis.SetInteger('i', x);
-s1 :=DATASET(N, TRANSFORM({ integer a }, SELF.a := NOFOLD(myRedis.GetInteger('i'))));
-s2 :=DATASET(N, TRANSFORM({ integer a }, SELF.a := NOFOLD(myRedis.GetInteger('i'))/2));
+s1 := DATASET(N, TRANSFORM({ integer a }, SELF.a := myRedis.GetInteger('i')));
+s2 := DATASET(N, TRANSFORM({ integer a }, SELF.a := myRedis.GetInteger('i')/2));
 SUM(NOFOLD(s1 + s2), a);//answer = (x+x/2)*N, in this case 3N.
+
 myRedis.FlushDB();
+myRedis2.FlushDB();