Browse Source

HPCC-13047 Redis plugin sync module

Signed-off-by: james <james.noss@lexisnexis.com>
james 10 years ago
parent
commit
fa2f212f3c

+ 42 - 0
cmake_modules/FindREDIS.cmake

@@ -0,0 +1,42 @@
+################################################################################
+#    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License");
+#    you may not use this file except in compliance with the License.
+#    You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+################################################################################
+
+# - Try to find the libhiredislibrary
+# Once done this will define
+#
+#  LIBHEDIS_FOUND - system has the libhiredis library
+#  LIBHIREDIS_INCLUDE_DIR - the libhiredis include directory(s)
+#  LIBHIREDIS_LIBRARY - The library needed to use hiredis
+
+IF (NOT LIBREDIS_FOUND)
+  IF (WIN32)
+    SET (libhiredis "libhiredis")
+  ELSE()
+    SET (libhiredis "hiredis")
+  ENDIF()
+
+  FIND_PATH(LIBHIREDIS_INCLUDE_DIR hiredis/hiredis.h PATHS /usr/include /usr/share/include /usr/local/include PATH_SUFFIXES hiredis)
+  FIND_LIBRARY(LIBHIREDIS_LIBRARY NAMES ${libhiredis} PATHS /usr/lib /usr/share /usr/lib64 /usr/local/lib /usr/local/lib64)
+
+  include(FindPackageHandleStandardArgs)
+  find_package_handle_standard_args(redis DEFAULT_MSG
+    LIBHIREDIS_LIBRARY
+    LIBHIREDIS_INCLUDE_DIR
+  )
+
+  MARK_AS_ADVANCED(LIBHIREDIS_INCLUDE_DIR LIBHIREDIS_LIBRARY)
+ENDIF()
+

+ 2 - 1
cmake_modules/commonSetup.cmake

@@ -89,10 +89,11 @@ IF ("${COMMONSETUP_DONE}" STREQUAL "")
   option(USE_JNI "Enable Java JNI support" ON)
   option(USE_RINSIDE "Enable R support" ON)
   option(USE_MEMCACHED "Enable Memcached support" ON)
+  option(USE_REDIS "Enable Redis support" ON)
 
   option(USE_OPTIONAL "Automatically disable requested features with missing dependencies" ON)
 
-  if ( USE_PYTHON OR USE_V8 OR USE_JNI OR USE_RINSIDE OR USE_SQLITE3 OR USE_MYSQL OR USE_CASSANDRA OR USE_MEMCACHED)
+  if ( USE_PYTHON OR USE_V8 OR USE_JNI OR USE_RINSIDE OR USE_SQLITE3 OR USE_MYSQL OR USE_CASSANDRA OR USE_MEMCACHED OR USE_REDIS)
       set( WITH_PLUGINS ON )
   endif()
 

+ 1 - 0
plugins/CMakeLists.txt

@@ -32,3 +32,4 @@ add_subdirectory (javaembed)
 add_subdirectory (Rembed)
 add_subdirectory (cassandra)
 add_subdirectory (memcached)
+add_subdirectory (redis)

+ 67 - 0
plugins/redis/CMakeLists.txt

@@ -0,0 +1,67 @@
+################################################################################
+#    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License");
+#    you may not use this file except in compliance with the License.
+#    You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+################################################################################
+
+# Component: redis
+
+#####################################################
+# Description:
+# ------------
+#    Cmake Input File for redis
+#####################################################
+
+project( redis )
+
+if (USE_REDIS)
+  ADD_PLUGIN(redis PACKAGES REDIS OPTION MAKE_REDIS)
+  if ( MAKE_REDIS )
+    set (    SRCS
+             redisplugin.hpp
+             redissync.hpp
+
+             redisplugin.cpp
+             redissync.cpp
+        )
+
+    include_directories (
+             ./../../system/include
+             ./../../rtl/eclrtl
+             ./../../rtl/include
+             ./../../common/deftype
+             ./../../system/jlib
+             ${LIBHIREDIS_INCLUDE_DIR}
+        )
+
+    ADD_DEFINITIONS( -D_USRDLL -DECL_REDIS_EXPORTS)
+
+    HPCC_ADD_LIBRARY( redis SHARED ${SRCS} )
+    if (${CMAKE_VERSION} VERSION_LESS "2.8.9")
+      message("WARNING: Cannot set NO_SONAME. shlibdeps will give warnings when package is installed")
+    elif(NOT APPLE)
+      set_target_properties( redis PROPERTIES NO_SONAME 1 )
+    endif()
+
+    install ( TARGETS redis DESTINATION plugins)
+
+    target_link_libraries ( redis
+        eclrtl
+        jlib
+        ${LIBHIREDIS_LIBRARY}
+        )
+  endif()
+endif()
+
+#Even if not making the redis plugin, we want to install the header
+install ( FILES ${CMAKE_CURRENT_SOURCE_DIR}/lib_redis.ecllib DESTINATION plugins COMPONENT Runtime)

+ 71 - 0
plugins/redis/lib_redis.ecllib

@@ -0,0 +1,71 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the License);
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an AS IS BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+
+EXPORT sync := SERVICE : plugin('redis'), namespace('RedisPlugin')
+  SetUnicode( CONST VARSTRING key, CONST UNICODE value, CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,action,context,entrypoint='SyncRSetUChar';
+  SetString(  CONST VARSTRING key, CONST STRING value,  CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,action,context,entrypoint='SyncRSetStr';
+  SetUtf8(    CONST VARSTRING key, CONST UTF8 value,    CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,action,context,entrypoint='SyncRSetUtf8';
+  SetBoolean( CONST VARSTRING key, BOOLEAN value,       CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,action,context,entrypoint='SyncRSetBool';
+  SetReal(    CONST VARSTRING key, REAL value,          CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,action,context,entrypoint='SyncRSetReal';
+  SetInteger( CONST VARSTRING key, INTEGER value,       CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,action,context,entrypoint='SyncRSetInt';
+  SetUnsigned(CONST VARSTRING key, UNSIGNED value,      CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,action,context,entrypoint='SyncRSetUInt';
+  SetData(    CONST VARSTRING key, CONST DATA value,    CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,action,context,entrypoint='SyncRSetData';
+
+  INTEGER8   GetInteger(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,once,context,entrypoint='SyncRGetInt8';
+  UNSIGNED8 GetUnsigned(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,once,context,entrypoint='SyncRGetUint8';
+  STRING      GetString(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,once,context,entrypoint='SyncRGetStr';
+  UNICODE    GetUnicode(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,once,context,entrypoint='SyncRGetUChar';
+  UTF8          GetUtf8(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,once,context,entrypoint='SyncRGetUtf8';
+  BOOLEAN    GetBoolean(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,once,context,entrypoint='SyncRGetBool';
+  REAL          GetReal(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,once,context,entrypoint='SyncRGetDouble';
+  DATA          GetData(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,once,context,entrypoint='SyncRGetData';
+
+  BOOLEAN Exists(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,once,context,entrypoint='RExist';
+  FlushDB(CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,action,context,entrypoint='RClear';
+  Del(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,action,context,entrypoint='RDel';
+  Persist(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,action,context,entrypoint='RPersist';
+  Expire(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED4 expire, UNSIGNED database = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,action,context,entrypoint='RExpire';
+  INTEGER DBSize(CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', unsigned timeout = 1000000) : cpp,once,context,entrypoint='RDBSize';
+END;
+
+EXPORT RedisSync    (CONST VARSTRING options, CONST VARSTRING password = '', unsigned timeout = 1000000) := MODULE
+  EXPORT  SetUnicode(CONST VARSTRING key, CONST UNICODE value,  UNSIGNED database = 0, UNSIGNED4 expire = 0) := sync.SetUnicode (key, value, options, database, expire, password, timeout);
+  EXPORT   SetString(CONST VARSTRING key, CONST STRING value,   UNSIGNED database = 0, UNSIGNED4 expire = 0) := sync.SetString  (key, value, options, database, expire, password, timeout);
+  EXPORT     SetUtf8(CONST VARSTRING key, CONST UTF8 value,     UNSIGNED database = 0, UNSIGNED4 expire = 0) := sync.SetUtf8    (key, value, options, database, expire, password, timeout);
+  EXPORT  SetBoolean(CONST VARSTRING key, CONST BOOLEAN value,  UNSIGNED database = 0, UNSIGNED4 expire = 0) := sync.SetBoolean (key, value, options, database, expire, password, timeout);
+  EXPORT     SetReal(CONST VARSTRING key, CONST REAL value,     UNSIGNED database = 0, UNSIGNED4 expire = 0) := sync.SetReal    (key, value, options, database, expire, password, timeout);
+  EXPORT  SetInteger(CONST VARSTRING key, CONST INTEGER value,  UNSIGNED database = 0, UNSIGNED4 expire = 0) := sync.SetInteger (key, value, options, database, expire, password, timeout);
+  EXPORT SetUnsigned(CONST VARSTRING key, CONST UNSIGNED value, UNSIGNED database = 0, UNSIGNED4 expire = 0) := sync.SetUnsigned(key, value, options, database, expire, password, timeout);
+  EXPORT     SetData(CONST VARSTRING key, CONST DATA value,     UNSIGNED database = 0, UNSIGNED4 expire = 0) := sync.SetData    (key, value, options, database, expire, password, timeout);
+
+  EXPORT  GetUnicode(CONST VARSTRING key, UNSIGNED database = 0) :=  sync.GetUnicode(key, options, database, password, timeout);
+  EXPORT   GetString(CONST VARSTRING key, UNSIGNED database = 0) :=   sync.GetString(key, options, database, password, timeout);
+  EXPORT     GetUtf8(CONST VARSTRING key, UNSIGNED database = 0) :=     sync.GetUtf8(key, options, database, password, timeout);
+  EXPORT  GetBoolean(CONST VARSTRING key, UNSIGNED database = 0) :=  sync.GetBoolean(key, options, database, password, timeout);
+  EXPORT     GetReal(CONST VARSTRING key, UNSIGNED database = 0) :=     sync.GetReal(key, options, database, password, timeout);
+  EXPORT  GetInteger(CONST VARSTRING key, UNSIGNED database = 0) :=  sync.GetInteger(key, options, database, password, timeout);
+  EXPORT GetUnsigned(CONST VARSTRING key, UNSIGNED database = 0) := sync.GetUnsigned(key, options, database, password, timeout);
+  EXPORT     GetData(CONST VARSTRING key, UNSIGNED database = 0) :=     sync.GetData(key, options, database, password, timeout);
+
+  EXPORT Exists(CONST VARSTRING key, UNSIGNED database = 0) := sync.Exists(key, options, database, password, timeout);
+  EXPORT FlushDB(UNSIGNED database = 0) := sync.FlushDB(options, database, password, timeout);
+  EXPORT Del(CONST VARSTRING key, UNSIGNED database = 0) := sync.Del(key, options, database, password, timeout);
+  EXPORT Persist(CONST VARSTRING key, UNSIGNED database = 0) := sync.Persist(key, options, database, password, timeout);
+  EXPORT Expire(CONST VARSTRING key, UNSIGNED4 expire, UNSIGNED database = 0)  := sync.Expire(key, options, expire, database, password, timeout);
+  EXPORT DBSize(UNSIGNED database = 0) := sync.DBSize(options, database, password, timeout);
+END;

+ 114 - 0
plugins/redis/redisplugin.cpp

@@ -0,0 +1,114 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#include "platform.h"
+#include "eclrtl.hpp"
+#include "jstring.hpp"
+#include "redisplugin.hpp"
+
+#define REDIS_VERSION "redis plugin 1.0.0"
+
+ECL_REDIS_API bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb)
+{
+    if (pb->size != sizeof(ECLPluginDefinitionBlock))
+        return false;
+
+    pb->magicVersion = PLUGIN_VERSION;
+    pb->version = REDIS_VERSION;
+    pb->moduleName = "lib_redis";
+    pb->ECL = NULL;
+    pb->flags = PLUGIN_IMPLICIT_MODULE;
+    pb->description = "ECL plugin library for the C API hiredis\n";
+    return true;
+}
+
+namespace RedisPlugin {
+
+StringBuffer & appendExpire(StringBuffer & buffer, unsigned expire)
+{
+    if (expire > 0)
+        buffer.append(" EX ").append(expire*unitExpire);
+    return buffer;
+}
+
+void RedisServer::parseOptions(ICodeContext * ctx, const char * _options)
+{
+    StringArray optionStrings;
+    optionStrings.appendList(_options, " ");
+    ForEachItemIn(idx, optionStrings)
+    {
+        const char *opt = optionStrings.item(idx);
+        if (strncmp(opt, "--SERVER=", 9) == 0)
+        {
+            opt += 9;
+            StringArray splitPort;
+            splitPort.appendList(opt, ":");
+            if (splitPort.ordinality()==2)
+            {
+                ip.set(splitPort.item(0));
+                port = atoi(splitPort.item(1));
+            }
+        }
+        else
+        {
+            VStringBuffer err("RedisPlugin: unsupported option string %s", opt);
+            rtlFail(0, err.str());
+        }
+    }
+    if (ip.isEmpty())
+    {
+        ip.set("localhost");
+        port = 6379;
+        if (ctx)
+        {
+            VStringBuffer msg("Redis Plugin: WARNING - using default server (%s:%d)", ip.str(), port);
+            ctx->logString(msg.str());
+        }
+    }
+    return;
+}
+Connection::Connection(ICodeContext * ctx, const char * _options, const char * pswd, unsigned __int64 _timeout) : alreadyInitialized(false), database(0), timeout(_timeout)
+{
+    server.set(new RedisServer(ctx, _options, pswd));
+}
+Connection::Connection(ICodeContext * ctx, RedisServer * _server) : alreadyInitialized(false), database(0), timeout(0)
+{
+    server.setown(_server);
+}
+bool Connection::isSameConnection(ICodeContext * ctx, unsigned hash) const
+{
+    return server->isSame(ctx, hash);
+}
+void * Connection::allocateAndCopy(const char * src, size_t size)
+{
+    void * value = rtlMalloc(size);
+    return memcpy(value, src, size);
+}
+const char * Connection::appendIfKeyNotFoundMsg(const redisReply * reply, const char * key, StringBuffer & target) const
+{
+    if (reply && reply->type == REDIS_REPLY_NIL)
+        target.append("(key: '").append(key).append("') ");
+    return target.str();
+}
+void Connection::init(ICodeContext * ctx)
+{
+    logServerStats(ctx);
+    alreadyInitialized = true;
+}
+}//close namespace
+
+

+ 128 - 0
plugins/redis/redisplugin.hpp

@@ -0,0 +1,128 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#ifndef ECL_REDIS_INCL
+#define ECL_REDIS_INCL
+
+#ifdef _WIN32
+#define ECL_REDIS_CALL _cdecl
+#ifdef ECL_REDIS_EXPORTS
+#define ECL_REDIS_API __declspec(dllexport)
+#else
+#define ECL_REDIS_API __declspec(dllimport)
+#endif
+#else
+#define ECL_REDIS_CALL
+#define ECL_REDIS_API
+#endif
+
+#include "jhash.hpp"
+#include "hqlplugins.hpp"
+#include "eclhelper.hpp"
+#include "jexcept.hpp"
+#include "hiredis/hiredis.h"
+
+extern "C"
+{
+    ECL_REDIS_API bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb);
+    ECL_REDIS_API void setPluginContext(IPluginContext * _ctx);
+}
+
+class StringBuffer;
+
+namespace RedisPlugin {
+static const unsigned unitExpire = 86400;//1 day (secs)
+
+#define setFailMsg "'Set' request failed - "
+#define getFailMsg "'Get<type>' request failed - "
+
+StringBuffer & appendExpire(StringBuffer & buffer, unsigned expire);
+
+class RedisServer : public CInterface
+{
+public :
+    RedisServer(ICodeContext * ctx, const char * _options, const char * pswd)
+    {
+        serverIpPortPasswordHash = hashc((const unsigned char*)pswd, strlen(pswd), 0);
+        serverIpPortPasswordHash = hashc((const unsigned char*)_options, strlen(_options), serverIpPortPasswordHash);
+        options.set(_options);
+        parseOptions(ctx, _options);
+    }
+    bool isSame(ICodeContext * ctx, unsigned hash) const
+    {
+        return (serverIpPortPasswordHash == hash);
+    }
+    const char * getIp() { return ip.str(); }
+    int getPort() { return port; }
+    void parseOptions(ICodeContext * ctx, const char * _options);
+
+private :
+    unsigned serverIpPortPasswordHash;
+    StringAttr options;
+    StringAttr ip;
+    int port;
+};
+class Connection : public CInterface
+{
+public :
+    Connection(ICodeContext * ctx, const char * _options, const char * pswd, unsigned __int64 _timeout);
+    Connection(ICodeContext * ctx, RedisServer * _server);
+
+    bool isSameConnection(ICodeContext * ctx, unsigned hash) const;
+    const char * ip() const { return server->getIp(); }
+    int port() const { return server->getPort(); }
+
+protected :
+    virtual void assertOnError(const redisReply * reply, const char * _msg) { }
+    virtual void assertConnection() { }
+    virtual void logServerStats(ICodeContext * ctx) { }
+    virtual void updateTimeout(unsigned __int64 _timeout) { }
+
+    const char * appendIfKeyNotFoundMsg(const redisReply * reply, const char * key, StringBuffer & target) const;
+    void * allocateAndCopy(const char * src, size_t size);
+    void init(ICodeContext * ctx);
+
+protected :
+    Owned<RedisServer> server;
+    unsigned __int64 timeout;
+    unsigned __int64 database;
+    bool alreadyInitialized;
+};
+
+class Reply : public CInterface
+{
+public :
+    inline Reply() { reply = NULL; };
+    inline Reply(void * _reply) { reply = (redisReply*)_reply; }
+    inline Reply(redisReply * _reply) { reply = _reply; }
+    inline ~Reply()
+    {
+        if (reply)
+            freeReplyObject(reply);
+    }
+
+    static Reply * createReply(void * _reply) { return new Reply(_reply); }
+    inline const redisReply * query() const { return reply; }
+
+private :
+    redisReply * reply;
+};
+typedef Owned<RedisPlugin::Reply> OwnedReply;
+
+}//close namespace
+
+#endif

+ 412 - 0
plugins/redis/redissync.cpp

@@ -0,0 +1,412 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#include "platform.h"
+#include "jthread.hpp"
+#include "jhash.hpp"
+#include "eclrtl.hpp"
+#include "jstring.hpp"
+#include "redissync.hpp"
+
+namespace RedisPlugin {
+static __thread SyncConnection * cachedConnection;
+static __thread ThreadTermFunc threadHookChain;
+
+//The following class is here to ensure destruction of the cachedConnection within the main thread
+//as this is not handled by the thread hook mechanism.
+static class mainThreadCachedConnection
+{
+public :
+    mainThreadCachedConnection() { }
+    ~mainThreadCachedConnection()
+    {
+        if (cachedConnection)
+            cachedConnection->Release();
+    }
+} mainThread;
+
+static void releaseContext()
+{
+    if (cachedConnection)
+        cachedConnection->Release();
+    if (threadHookChain)
+    {
+        (*threadHookChain)();
+        threadHookChain = NULL;
+    }
+}
+
+SyncConnection::SyncConnection(ICodeContext * ctx, const char * _options, unsigned __int64 _database, const char * pswd, unsigned __int64 _timeout)
+  : Connection(ctx, _options, pswd, _timeout)
+{
+    connect(ctx, _database, pswd);
+}
+SyncConnection::SyncConnection(ICodeContext * ctx, RedisServer * _server, unsigned __int64 _database, const char * pswd)
+  : Connection(ctx, _server)
+{
+    connect(ctx, _database, pswd);
+}
+void SyncConnection::connect(ICodeContext * ctx, unsigned __int64 _database, const char * pswd)
+{
+    struct timeval to = { timeout/1000000, timeout%1000000 };
+    context = redisConnectWithTimeout(server->getIp(), server->getPort(), to);
+    assertConnection();
+    authenticate(ctx, pswd);
+    selectDB(ctx, _database);
+    init(ctx);
+}
+void SyncConnection::authenticate(ICodeContext * ctx, const char * pswd)
+{
+    if (strlen(pswd) > 0)
+    {
+        OwnedReply reply = Reply::createReply(redisCommand(context, "AUTH %b", pswd, strlen(pswd)));
+        assertOnError(reply->query(), "server authentication failed");
+    }
+}
+SyncConnection * SyncConnection::createConnection(ICodeContext * ctx, const char * options, unsigned __int64 _database, const char * pswd, unsigned __int64 _timeout)
+{
+    if (!cachedConnection)
+    {
+        cachedConnection = new SyncConnection(ctx, options, _database, pswd, _timeout);
+        threadHookChain = addThreadTermFunc(releaseContext);
+        return LINK(cachedConnection);
+    }
+
+    unsigned optionsPswdHash = hashc((const unsigned char*)options, strlen(options), hashc((const unsigned char*)pswd, strlen(pswd), 0));
+    if (cachedConnection->isSameConnection(ctx, optionsPswdHash))
+    {
+        //MORE: need to check that the connection has not expired (think hiredis REDIS_KEEPALIVE_INTERVAL is defaulted to 15s).
+        //At present updateTimeout calls assertConnection.
+        cachedConnection->updateTimeout(_timeout);
+        cachedConnection->selectDB(ctx, _database);
+        return LINK(cachedConnection);
+    }
+
+    cachedConnection->Release();
+    cachedConnection = new SyncConnection(ctx, options, _database, pswd, _timeout);
+    return LINK(cachedConnection);
+}
+void SyncConnection::selectDB(ICodeContext * ctx, unsigned __int64 _database)
+{
+    if (database == _database)
+        return;
+    database = _database;
+    VStringBuffer cmd("SELECT %" I64F "u", database);
+    OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str()));
+    assertOnError(reply->query(), "'SELECT' request failed");
+}
+void SyncConnection::updateTimeout(unsigned __int64 _timeout)
+{
+    if (timeout == _timeout)
+        return;
+    assertConnection();
+    timeout = _timeout;
+    struct timeval to = { timeout/1000000, timeout%1000000 };
+    if (redisSetTimeout(context, to) != REDIS_OK)
+    {
+        if (context->err)
+        {
+            VStringBuffer msg("RedisPlugin: failed to set timeout - %s", context->errstr);
+            rtlFail(0, msg.str());
+        }
+        else
+            rtlFail(0, "RedisPlugin: failed to set timeout - no message available");
+    }
+}
+void SyncConnection::logServerStats(ICodeContext * ctx)
+{
+    OwnedReply reply = Reply::createReply(redisCommand(context, "INFO"));
+    assertOnError(reply->query(), "'INFO' request failed");
+    StringBuffer stats("Redis Plugin : Server stats - ");
+    stats.newline().append(reply->query()->str).newline();
+    ctx->logString(stats.str());
+}
+void SyncConnection::assertOnError(const redisReply * reply, const char * _msg)
+{
+    if (!reply)
+    {
+        //There should always be a context error if no reply error
+        assertConnection();
+        VStringBuffer msg("Redis Plugin: %s - %s", _msg, "neither 'reply' nor connection error available");
+        rtlFail(0, msg.str());
+    }
+    else if (reply->type == REDIS_REPLY_ERROR)
+    {
+        if (strncmp(reply->str, "NOAUTH", 6) == 0)
+        {
+            VStringBuffer msg("Redis Plugin: server authentication failed - %s", reply->str);
+            rtlFail(0, msg.str());
+        }
+        else
+        {
+            VStringBuffer msg("Redis Plugin: %s - %s", _msg, reply->str);
+            rtlFail(0, msg.str());
+        }
+    }
+}
+void SyncConnection::assertConnection()
+{
+    if (!context)
+        rtlFail(0, "Redis Plugin: 'redisConnect' failed - no error available.");
+    else if (context->err)
+    {
+        VStringBuffer msg("Redis Plugin: Connection failed - %s for %s:%u", context->errstr, ip(), port());
+        rtlFail(0, msg.str());
+    }
+}
+
+void SyncConnection::clear(ICodeContext * ctx)
+{
+    //NOTE: flush is the actual cache flush/clear/delete and not an io buffer flush.
+    OwnedReply reply = Reply::createReply(redisCommand(context, "FLUSHDB"));//NOTE: FLUSHDB deletes current database where as FLUSHALL deletes all dbs.
+    //NOTE: documented as never failing, but in case
+    assertOnError(reply->query(), "'FlushDB' request failed");
+}
+void SyncConnection::del(ICodeContext * ctx, const char * key)
+{
+    OwnedReply reply = Reply::createReply(redisCommand(context, "DEL %b", key, strlen(key)));
+    assertOnError(reply->query(), "'Del' request failed");
+}
+void SyncConnection::persist(ICodeContext * ctx, const char * key)
+{
+    OwnedReply reply = Reply::createReply(redisCommand(context, "PERSIST %b", key, strlen(key)));
+    assertOnError(reply->query(), "'Persist' request failed");
+}
+void SyncConnection::expire(ICodeContext * ctx, const char * key, unsigned _expire)
+{
+    OwnedReply reply = Reply::createReply(redisCommand(context, "DEL %b %u", key, strlen(key), _expire));
+    assertOnError(reply->query(), "'Expire' request failed");
+}
+bool SyncConnection::exists(ICodeContext * ctx, const char * key)
+{
+    OwnedReply reply = Reply::createReply(redisCommand(context, "EXISTS %b", key, strlen(key)));
+    assertOnError(reply->query(), "'Exists' request failed");
+    return (reply->query()->integer != 0);
+}
+unsigned __int64 SyncConnection::dbSize(ICodeContext * ctx)
+{
+    OwnedReply reply = Reply::createReply(redisCommand(context, "DBSIZE"));
+    assertOnError(reply->query(), "'DBSIZE' request failed");
+    return reply->query()->integer;
+}
+//-------------------------------------------SET-----------------------------------------
+//--OUTER--
+template<class type> void SyncRSet(ICodeContext * ctx, const char * _options, const char * key, type value, unsigned __int64 database, unsigned expire, const char * pswd, unsigned __int64 _timeout)
+{
+    Owned<SyncConnection> master = SyncConnection::createConnection(ctx, _options, database, pswd, _timeout);
+    master->set(ctx, key, value, expire);
+}
+//Set pointer types
+template<class type> void SyncRSet(ICodeContext * ctx, const char * _options, const char * key, size32_t valueLength, const type * value, unsigned __int64 database, unsigned expire, const char * pswd, unsigned __int64 _timeout)
+{
+    Owned<SyncConnection> master = SyncConnection::createConnection(ctx, _options, database, pswd, _timeout);
+    master->set(ctx, key, valueLength, value, expire);
+}
+//--INNER--
+template<class type> void SyncConnection::set(ICodeContext * ctx, const char * key, type value, unsigned expire)
+{
+    const char * _value = reinterpret_cast<const char *>(&value);//Do this even for char * to prevent compiler complaining
+    const char * msg = setFailMsg;
+
+    StringBuffer cmd("SET %b %b");
+    appendExpire(cmd, expire);
+
+    OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str(), key, strlen(key), _value, sizeof(type)));
+    assertOnError(reply->query(), msg);
+}
+template<class type> void SyncConnection::set(ICodeContext * ctx, const char * key, size32_t valueLength, const type * value, unsigned expire)
+{
+    const char * _value = reinterpret_cast<const char *>(value);//Do this even for char * to prevent compiler complaining
+    const char * msg = setFailMsg;
+
+    StringBuffer cmd("SET %b %b");
+    appendExpire(cmd, expire);
+    OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str(), key, strlen(key), _value, (size_t)valueLength));
+    assertOnError(reply->query(), msg);
+}
+//-------------------------------------------GET-----------------------------------------
+//--OUTER--
+template<class type> void SyncRGet(ICodeContext * ctx, const char * options, const char * key, type & returnValue, unsigned __int64 database, const char * pswd, unsigned __int64 _timeout)
+{
+    Owned<SyncConnection> master = SyncConnection::createConnection(ctx, options, database, pswd, _timeout);
+    master->get(ctx, key, returnValue);
+}
+template<class type> void SyncRGet(ICodeContext * ctx, const char * options, const char * key, size_t & returnLength, type * & returnValue, unsigned __int64 database, const char * pswd, unsigned __int64 _timeout)
+{
+    Owned<SyncConnection> master = SyncConnection::createConnection(ctx, options, database, pswd, _timeout);
+    master->get(ctx, key, returnLength, returnValue);
+}
+void SyncRGetVoidPtrLenPair(ICodeContext * ctx, const char * options, const char * key, size_t & returnLength, void * & returnValue, unsigned __int64 database, const char * pswd, unsigned __int64 _timeout)
+{
+    Owned<SyncConnection> master = SyncConnection::createConnection(ctx, options, database, pswd, _timeout);
+    master->getVoidPtrLenPair(ctx, key, returnLength, returnValue);
+}
+//--INNER--
+template<class type> void SyncConnection::get(ICodeContext * ctx, const char * key, type & returnValue)
+{
+    OwnedReply reply = Reply::createReply(redisCommand(context, "GET %b", key, strlen(key)));
+
+    StringBuffer keyMsg = getFailMsg;
+    assertOnError(reply->query(), appendIfKeyNotFoundMsg(reply->query(), key, keyMsg));
+
+    size_t returnSize = reply->query()->len;
+    if (sizeof(type)!=returnSize)
+    {
+        VStringBuffer msg("RedisPlugin: ERROR - Requested type of different size (%uB) from that stored (%uB).", (unsigned)sizeof(type), (unsigned)returnSize);
+
+        rtlFail(0, msg.str());
+    }
+    memcpy(&returnValue, reply->query()->str, returnSize);
+}
+template<class type> void SyncConnection::get(ICodeContext * ctx, const char * key, size_t & returnLength, type * & returnValue)
+{
+    OwnedReply reply = Reply::createReply(redisCommand(context, "GET %b", key, strlen(key)));
+
+    StringBuffer keyMsg = getFailMsg;
+    assertOnError(reply->query(), appendIfKeyNotFoundMsg(reply->query(), key, keyMsg));
+
+    returnLength = reply->query()->len;
+    size_t returnSize = returnLength;
+
+    returnValue = reinterpret_cast<type*>(allocateAndCopy(reply->query()->str, returnSize));
+}
+void SyncConnection::getVoidPtrLenPair(ICodeContext * ctx, const char * key, size_t & returnLength, void * & returnValue)
+{
+    OwnedReply reply = Reply::createReply(redisCommand(context, "GET %b", key, strlen(key)));
+    StringBuffer keyMsg = getFailMsg;
+    assertOnError(reply->query(), appendIfKeyNotFoundMsg(reply->query(), key, keyMsg));
+
+    returnLength = reply->query()->len;
+    returnValue = reinterpret_cast<void*>(allocateAndCopy(reply->query()->str, reply->query()->len));
+}
+
+//--------------------------------------------------------------------------------
+//                           ECL SERVICE ENTRYPOINTS
+//--------------------------------------------------------------------------------
+ECL_REDIS_API void ECL_REDIS_CALL RClear(ICodeContext * ctx, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
+{
+    Owned<SyncConnection> master = SyncConnection::createConnection(ctx, options, database, pswd, timeout);
+    master->clear(ctx);
+}
+ECL_REDIS_API bool ECL_REDIS_CALL RExist(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
+{
+    Owned<SyncConnection> master = SyncConnection::createConnection(ctx, options, database, pswd, timeout);
+    return master->exists(ctx, key);
+}
+ECL_REDIS_API void ECL_REDIS_CALL RDel(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
+{
+    Owned<SyncConnection> master = SyncConnection::createConnection(ctx, options, database, pswd, timeout);
+    master->del(ctx, key);
+}
+ECL_REDIS_API void ECL_REDIS_CALL RPersist(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
+{
+    Owned<SyncConnection> master = SyncConnection::createConnection(ctx, options, database, pswd, timeout);
+    master->persist(ctx, key);
+}
+ECL_REDIS_API void ECL_REDIS_CALL RExpire(ICodeContext * ctx, const char * key, const char * options, unsigned _expire, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
+{
+    Owned<SyncConnection> master = SyncConnection::createConnection(ctx, options, database, pswd, timeout);
+    master->expire(ctx, key, _expire*unitExpire);
+}
+ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL RDBSize(ICodeContext * ctx, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
+{
+    Owned<SyncConnection> master = SyncConnection::createConnection(ctx, options, database, pswd, timeout);
+    return master->dbSize(ctx);
+}
+//-----------------------------------SET------------------------------------------
+ECL_REDIS_API void ECL_REDIS_CALL SyncRSetStr(ICodeContext * ctx, const char * key, size32_t valueLength, const char * value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned __int64 timeout)
+{
+    SyncRSet(ctx, options, key, valueLength, value, database, expire, pswd, timeout);
+}
+ECL_REDIS_API void ECL_REDIS_CALL SyncRSetUChar(ICodeContext * ctx, const char * key, size32_t valueLength, const UChar * value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned __int64 timeout)
+{
+    SyncRSet(ctx, options, key, (valueLength)*sizeof(UChar), value, database, expire, pswd, timeout);
+}
+ECL_REDIS_API void ECL_REDIS_CALL SyncRSetInt(ICodeContext * ctx, const char * key, signed __int64 value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned __int64 timeout)
+{
+    SyncRSet(ctx, options, key, value, database, expire, pswd, timeout);
+}
+ECL_REDIS_API void ECL_REDIS_CALL SyncRSetUInt(ICodeContext * ctx, const char * key, unsigned __int64 value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned __int64 timeout)
+{
+    SyncRSet(ctx, options, key, value, database, expire, pswd, timeout);
+}
+ECL_REDIS_API void ECL_REDIS_CALL SyncRSetReal(ICodeContext * ctx, const char * key, double value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned __int64 timeout)
+{
+    SyncRSet(ctx, options, key, value, database, expire, pswd, timeout);
+}
+ECL_REDIS_API void ECL_REDIS_CALL SyncRSetBool(ICodeContext * ctx, const char * key, bool value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned __int64 timeout)
+{
+    SyncRSet(ctx, options, key, value, database, expire, pswd, timeout);
+}
+ECL_REDIS_API void ECL_REDIS_CALL SyncRSetData(ICodeContext * ctx, const char * key, size32_t valueLength, const void * value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned __int64 timeout)
+{
+    SyncRSet(ctx, options, key, valueLength, value, database, expire, pswd, timeout);
+}
+ECL_REDIS_API void ECL_REDIS_CALL SyncRSetUtf8(ICodeContext * ctx, const char * key, size32_t valueLength, const char * value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned __int64 timeout)
+{
+    SyncRSet(ctx, options, key, rtlUtf8Size(valueLength, value), value, database, expire, pswd, timeout);
+}
+//-------------------------------------GET----------------------------------------
+ECL_REDIS_API bool ECL_REDIS_CALL SyncRGetBool(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
+{
+    bool value;
+    SyncRGet(ctx, options, key, value, database, pswd, timeout);
+    return value;
+}
+ECL_REDIS_API double ECL_REDIS_CALL SyncRGetDouble(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
+{
+    double value;
+    SyncRGet(ctx, options, key, value, database, pswd, timeout);
+    return value;
+}
+ECL_REDIS_API signed __int64 ECL_REDIS_CALL SyncRGetInt8(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
+{
+    signed __int64 value;
+    SyncRGet(ctx, options, key, value, database, pswd, timeout);
+    return value;
+}
+ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL SyncRGetUint8(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
+{
+    unsigned __int64 value;
+    SyncRGet(ctx, options, key, value, database, pswd, timeout);
+    return value;
+}
+ECL_REDIS_API void ECL_REDIS_CALL SyncRGetStr(ICodeContext * ctx, size32_t & returnLength, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
+{
+    size_t _returnLength;
+    SyncRGet(ctx, options, key, _returnLength, returnValue, database, pswd, timeout);
+    returnLength = static_cast<size32_t>(_returnLength);
+}
+ECL_REDIS_API void ECL_REDIS_CALL SyncRGetUChar(ICodeContext * ctx, size32_t & returnLength, UChar * & returnValue,  const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
+{
+    size_t _returnLength;
+    SyncRGet(ctx, options, key, _returnLength, returnValue, database, pswd, timeout);
+    returnLength = static_cast<size32_t>(_returnLength/sizeof(UChar));
+}
+ECL_REDIS_API void ECL_REDIS_CALL SyncRGetUtf8(ICodeContext * ctx, size32_t & returnLength, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
+{
+    size_t _returnLength;
+    SyncRGet(ctx, options, key, _returnLength, returnValue, database, pswd, timeout);
+    returnLength = static_cast<size32_t>(rtlUtf8Length(_returnLength, returnValue));
+}
+ECL_REDIS_API void ECL_REDIS_CALL SyncRGetData(ICodeContext * ctx, size32_t & returnLength, void * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 timeout)
+{
+    size_t _returnLength;
+    SyncRGetVoidPtrLenPair(ctx, options, key, _returnLength, returnValue, database, pswd, timeout);
+    returnLength = static_cast<size32_t>(_returnLength);
+}
+}//close namespace

+ 97 - 0
plugins/redis/redissync.hpp

@@ -0,0 +1,97 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#ifndef ECL_REDIS_SYNC_INCL
+#define ECL_REDIS_SYNC_INCL
+
+#include "redisplugin.hpp"
+
+namespace RedisPlugin
+{
+class SyncConnection : public Connection
+{
+public :
+    SyncConnection(ICodeContext * ctx, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 _timeout);
+    SyncConnection(ICodeContext * ctx, RedisServer * _server, unsigned __int64 database, const char * pswd);
+    ~SyncConnection()
+    {
+        if (context)
+            redisFree(context);
+    }
+    static SyncConnection * createConnection(ICodeContext * ctx, const char * options, unsigned __int64 database, const char * pswd, unsigned __int64 _timeout);
+
+    //set
+    template <class type> void set(ICodeContext * ctx, const char * key, type value, unsigned expire);
+    template <class type> void set(ICodeContext * ctx, const char * key, size32_t valueLength, const type * value, unsigned expire);
+    //get
+    template <class type> void get(ICodeContext * ctx, const char * key, type & value);
+    template <class type> void get(ICodeContext * ctx, const char * key, size_t & valueLength, type * & value);
+    void getVoidPtrLenPair(ICodeContext * ctx, const char * key, size_t & valueLength, void * & value);
+    void persist(ICodeContext * ctx, const char * key);
+    void expire(ICodeContext * ctx, const char * key, unsigned _expire);
+    void del(ICodeContext * ctx, const char * key);
+    void clear(ICodeContext * ctx);
+    unsigned __int64 dbSize(ICodeContext * ctx);
+    bool exists(ICodeContext * ctx, const char * key);
+
+protected :
+    void connect(ICodeContext * ctx, unsigned __int64 _database, const char * pswd);
+    void selectDB(ICodeContext * ctx, unsigned __int64 _database);
+    void authenticate(ICodeContext * ctx, const char * pswd);
+
+    virtual void updateTimeout(unsigned __int64 _timeout);
+    virtual void assertOnError(const redisReply * reply, const char * _msg);
+    virtual void assertConnection();
+    virtual void logServerStats(ICodeContext * ctx);
+
+protected :
+    redisContext * context;
+};
+}//close namespace
+
+extern "C++"
+{
+namespace RedisPlugin {
+    //--------------------------SET----------------------------------------
+    ECL_REDIS_API void ECL_REDIS_CALL SyncRSetBool (ICodeContext * _ctx, const char * key, bool value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned timeout);
+    ECL_REDIS_API void ECL_REDIS_CALL SyncRSetInt  (ICodeContext * _ctx, const char * key, signed __int64 value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned timeout);
+    ECL_REDIS_API void ECL_REDIS_CALL SyncRSetUInt (ICodeContext * _ctx, const char * key, unsigned __int64 value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned timeout);
+    ECL_REDIS_API void ECL_REDIS_CALL SyncRSetReal (ICodeContext * _ctx, const char * key, double value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned timeout);
+    ECL_REDIS_API void ECL_REDIS_CALL SyncRSetUtf8 (ICodeContext * _ctx, const char * key, size32_t valueLength, const char * value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned timeout);
+    ECL_REDIS_API void ECL_REDIS_CALL SyncRSetStr  (ICodeContext * _ctx, const char * key, size32_t valueLength, const char * value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned timeout);
+    ECL_REDIS_API void ECL_REDIS_CALL SyncRSetUChar(ICodeContext * _ctx, const char * key, size32_t valueLength, const UChar * value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned timeout);
+    ECL_REDIS_API void ECL_REDIS_CALL SyncRSetData (ICodeContext * _ctx, const char * key, size32_t valueLength, const void * value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned timeout);
+    //--------------------------GET----------------------------------------
+    ECL_REDIS_API bool             ECL_REDIS_CALL SyncRGetBool  (ICodeContext * _ctx, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned timeout);
+    ECL_REDIS_API signed __int64   ECL_REDIS_CALL SyncRGetInt8  (ICodeContext * _ctx, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned timeout);
+    ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL SyncRGetUint8 (ICodeContext * _ctx, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned timeout);
+    ECL_REDIS_API double           ECL_REDIS_CALL SyncRGetDouble(ICodeContext * _ctx, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned timeout);
+    ECL_REDIS_API void             ECL_REDIS_CALL SyncRGetUtf8  (ICodeContext * _ctx, size32_t & returnLength, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned timeout);
+    ECL_REDIS_API void             ECL_REDIS_CALL SyncRGetStr   (ICodeContext * _ctx, size32_t & returnLength, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned timeout);
+    ECL_REDIS_API void             ECL_REDIS_CALL SyncRGetUChar (ICodeContext * _ctx, size32_t & returnLength, UChar * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned timeout);
+    ECL_REDIS_API void             ECL_REDIS_CALL SyncRGetData  (ICodeContext * _ctx,size32_t & returnLength, void * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned timeout);
+
+    //--------------------------------AUXILLARIES---------------------------
+    ECL_REDIS_API bool             ECL_REDIS_CALL RExist  (ICodeContext * _ctx, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned timeout);
+    ECL_REDIS_API void             ECL_REDIS_CALL RClear  (ICodeContext * _ctx, const char * options, unsigned __int64 database, const char * pswd, unsigned timeout);
+    ECL_REDIS_API void             ECL_REDIS_CALL RDel    (ICodeContext * _ctx, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned timeout);
+    ECL_REDIS_API void             ECL_REDIS_CALL RPersist(ICodeContext * _ctx, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned timeout);
+    ECL_REDIS_API void             ECL_REDIS_CALL RExpire (ICodeContext * _ctx, const char * key, const char * options, unsigned expire, unsigned __int64 database, const char * pswd, unsigned timeout);
+    ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL RDBSize (ICodeContext * _ctx, const char * options, unsigned __int64 database, const char * pswd, unsigned timeout);
+}
+}
+#endif

+ 57 - 0
testing/regress/ecl/key/redissynctest.xml

@@ -0,0 +1,57 @@
+<Dataset name='Result 1'>
+ <Row><Result_1>true</Result_1></Row>
+</Dataset>
+<Dataset name='Result 2'>
+ <Row><Result_2>3.14159265359</Result_2></Row>
+</Dataset>
+<Dataset name='Result 3'>
+ <Row><Result_3>9.869604401090658</Result_3></Row>
+</Dataset>
+<Dataset name='Result 4'>
+ <Row><Result_4>123456789</Result_4></Row>
+</Dataset>
+<Dataset name='Result 5'>
+ <Row><Result_5>9.869604401090658</Result_5></Row>
+</Dataset>
+<Dataset name='Result 6'>
+ <Row><Result_6>123456789</Result_6></Row>
+</Dataset>
+<Dataset name='Result 7'>
+ <Row><Result_7>7</Result_7></Row>
+</Dataset>
+<Dataset name='Result 8'>
+ <Row><Result_8>supercalifragilisticexpialidocious</Result_8></Row>
+</Dataset>
+<Dataset name='Result 9'>
+ <Row><Result_9>אבגדהוזחטיךכלםמןנסעףפץצקרשת</Result_9></Row>
+</Dataset>
+<Dataset name='Result 10'>
+ <Row><Result_10>אבגדהוזחטיךכלםמןנסעףפץצקרשת</Result_10></Row>
+</Dataset>
+<Dataset name='Result 11'>
+ <Row><Result_11>D790D791D792D793D794D795D796D798D799D79AD79BD79CD79DD79DD79ED79FD7A0D7A1D7A2D7A3D7A4D7A5D7A6D7A7D7A8D7A9D7AA</Result_11></Row>
+</Dataset>
+<Dataset name='Result 12'>
+ <Row><Result_12>true</Result_12></Row>
+</Dataset>
+<Dataset name='Result 13'>
+ <Row><Result_13>false</Result_13></Row>
+</Dataset>
+<Dataset name='Result 14'>
+ <Row><Result_14>4614256656552046314</Result_14></Row>
+</Dataset>
+<Dataset name='Result 15'>
+ <Row><Result_15>6</Result_15></Row>
+</Dataset>
+<Dataset name='Result 16'>
+ <Row><Result_16>1</Result_16></Row>
+</Dataset>
+<Dataset name='Result 17'>
+ <Row><Result_17>0</Result_17></Row>
+</Dataset>
+<Dataset name='Result 18'>
+ <Row><Result_18>false</Result_18></Row>
+</Dataset>
+<Dataset name='Result 19'>
+ <Row><Result_19>300</Result_19></Row>
+</Dataset>

+ 100 - 0
testing/regress/ecl/redissynctest.ecl

@@ -0,0 +1,100 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+IMPORT sync FROM lib_redis;
+
+STRING server := '--SERVER=127.0.0.1:6379';
+STRING password := 'foobared';
+sync.FlushDB(server, /*database*/, password);
+
+sync.SetBoolean('b', TRUE, server, /*database*/, /*expire*/, password);
+sync.GetBoolean('b', server, /*database*/, password);
+
+IMPORT redisSync FROM lib_redis;
+myRedis := redisSync(server, password);
+
+REAL pi := 3.14159265359;
+myRedis.SetReal('pi', pi);
+myRedis.GetReal('pi');
+
+REAL pi2 := pi*pi;
+myRedis.SetReal('pi', pi2, 1);
+myRedis.GetReal('pi', 1);
+
+INTEGER i := 123456789;
+myRedis.SetInteger('i', i);
+myRedis.GetInteger('i');
+
+myRedis2 := redisSync('--SERVER=127.0.0.1:6380', 'youarefoobared');
+
+myRedis2.SetReal('pi', pi2, 1);
+myRedis2.GetReal('pi', 1);
+
+myRedis2.SetInteger('i', i);
+myRedis2.GetInteger('i');
+
+UNSIGNED u := 7;
+myRedis.SetUnsigned('u', u);
+myRedis.GetUnsigned('u');
+
+STRING str  := 'supercalifragilisticexpialidocious';
+myRedis.SetString('str', str);
+myRedis.GetString('str');
+
+UNICODE uni := U'אבגדהוזחטיךכלםמןנסעףפץצקרשת';
+myRedis.setUnicode('uni', uni);
+myRedis.getUnicode('uni');
+
+UTF8 utf := U'אבגדהוזחטיךכלםמןנסעףפץצקרשת';
+myRedis.SetUtf8('utf8', utf);
+myRedis.GetUtf8('utf8');
+
+DATA mydata := x'd790d791d792d793d794d795d796d798d799d79ad79bd79cd79dd79dd79ed79fd7a0d7a1d7a2d7a3d7a4d7a5d7a6d7a7d7a8d7a9d7aa';
+myRedis.SetData('data', mydata);
+myRedis.GetData('data');
+
+SEQUENTIAL(
+    myRedis.Exists('utf8'),
+    myRedis.Del('utf8'),
+    myRedis.Exists('uft8')
+    );
+
+myRedis.Expire('str', 1); 
+myRedis.Persist('str');
+
+myRedis.GetInteger('pi');
+
+NOFOLD(myRedis.DBSize());
+NOFOLD(myRedis.DBSize(1));
+NOFOLD(myRedis.DBSize(2));
+
+SEQUENTIAL(
+    myRedis.FlushDB(),
+    NOFOLD(myRedis.Exists('str'))
+    );
+myRedis2.FlushDB();
+
+//The follwoing tests the multithreaded caching of the redis connections
+//SUM(NOFOLD(s1 + s2), a) uses two threads
+myRedis.FlushDB();
+INTEGER x := 2;
+INTEGER N := 100;
+myRedis.SetInteger('i', x);
+s1 :=DATASET(N, TRANSFORM({ integer a }, SELF.a := NOFOLD(myRedis.GetInteger('i'))));
+s2 :=DATASET(N, TRANSFORM({ integer a }, SELF.a := NOFOLD(myRedis.GetInteger('i'))/2));
+SUM(NOFOLD(s1 + s2), a);//answer = (x+x/2)*N, in this case 3N.
+myRedis.FlushDB();