瀏覽代碼

HPCC-15499 Redis Plugin: add redis command INCRBY

Signed-off-by: James Noss <james.noss@lexisnexis.com>
James Noss 9 年之前
父節點
當前提交
642ad2a005

+ 5 - 0
plugins/redis/README.md

@@ -76,6 +76,11 @@ REAL          GetReal(CONST VARSTRING key, CONST VARSTRING options, INTEGER4 dat
 DATA          GetData(CONST VARSTRING key, CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000, BOOLEAN cacheConnections = TRUE)
 ```
 
+###Numeric
+```
+INTEGER8 INCRBY(CONST VARSTRING key, INTEGER8 value = 1, CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000, BOOLEAN cacheConnections = TRUE)
+```
+
 ###Utility
 ```
 BOOLEAN Exists(CONST VARSTRING key, CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000)

+ 6 - 0
plugins/redis/lib_redis.ecllib

@@ -35,6 +35,8 @@ EXPORT redis := SERVICE : plugin('redis'), namespace('RedisPlugin')
   REAL          GetReal(CONST VARSTRING key, CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000, BOOLEAN cacheConnections = TRUE) : cpp,once,context,entrypoint='SyncRGetDouble';
   DATA          GetData(CONST VARSTRING key, CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000, BOOLEAN cacheConnections = TRUE) : cpp,once,context,entrypoint='SyncRGetData';
 
+  INTEGER8 INCRBY(CONST VARSTRING key, INTEGER8 value = 1, CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000, BOOLEAN cacheConnections = TRUE) : cpp,once,context,entrypoint='SyncRINCRBY';
+
   BOOLEAN Exists(CONST VARSTRING key, CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000, BOOLEAN cacheConnections = TRUE) : cpp,once,context,entrypoint='RExist';
   FlushDB(CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000, BOOLEAN cacheConnections = TRUE) : cpp,action,context,entrypoint='RClear';
   Delete(CONST VARSTRING key, CONST VARSTRING options, INTEGER4 database = 0, CONST VARSTRING password = '', UNSIGNED4 timeout = 1000, BOOLEAN cacheConnections = TRUE) : cpp,action,context,entrypoint='RDel';
@@ -76,6 +78,8 @@ EXPORT RedisServer(VARSTRING options, VARSTRING password = '', UNSIGNED4 timeout
   EXPORT GetUnsigned(VARSTRING key, INTEGER4 database = 0) := redis.GetUnsigned(key, options, database, password, timeout, cacheConnections);
   EXPORT     GetData(VARSTRING key, INTEGER4 database = 0) :=     redis.GetData(key, options, database, password, timeout, cacheConnections);
 
+  EXPORT INCRBY(VARSTRING key, INTEGER8 value = 1) := redis.INCRBY(key, value, options, database, password, timeout, cacheConnections);
+
   EXPORT Exists(VARSTRING key, INTEGER4 database = 0) := redis.Exists(key, options, database, password, timeout, cacheConnections);
   EXPORT FlushDB(INTEGER4 database = 0) := redis.FlushDB(options, database, password, timeout, cacheConnections);
   EXPORT Delete(VARSTRING key, INTEGER4 database = 0) := redis.Delete(key, options, database, password, timeout, cacheConnections);
@@ -117,6 +121,8 @@ EXPORT RedisServerWithoutTimeout(VARSTRING options, VARSTRING password = '', BOO
   EXPORT GetUnsigned(VARSTRING key, INTEGER4 database = 0, UNSIGNED4 timeout = 1000) := redis.GetUnsigned(key, options, database, password, timeout, cacheConnections);
   EXPORT     GetData(VARSTRING key, INTEGER4 database = 0, UNSIGNED4 timeout = 1000) :=     redis.GetData(key, options, database, password, timeout, cacheConnections);
 
+  EXPORT INCRBY(VARSTRING key, INTEGER8 value = 1, INTEGER4 database = 0, UNSIGNED4 timeout = 1000) := redis.INCRBY(key, value, options, database, password, timeout, cacheConnections);
+
   EXPORT Exists(VARSTRING key, INTEGER4 database = 0, UNSIGNED4 timeout = 1000) := redis.Exists(key, options, database, password, timeout, cacheConnections);
   EXPORT FlushDB(INTEGER4 database = 0, UNSIGNED4 timeout = 1000) := redis.FlushDB(options, database, password, timeout, cacheConnections);
   EXPORT Delete(VARSTRING key, INTEGER4 database = 0, UNSIGNED4 timeout = 1000) := redis.Delete(key, options, database, password, timeout, cacheConnections);

+ 118 - 32
plugins/redis/redis.cpp

@@ -128,9 +128,15 @@ public :
     //set
     template <class type> void set(ICodeContext * ctx, const char * key, type value, unsigned expire);
     template <class type> void set(ICodeContext * ctx, const char * key, size32_t valueSize, const type * value, unsigned expire);
+    void setInt(ICodeContext * ctx, const char * key, signed __int64 value, unsigned expire, bool _unsigned);
+    void setReal(ICodeContext * ctx, const char * key, double value, unsigned expire);
+
     //get
     template <class type> void get(ICodeContext * ctx, const char * key, type & value);
     template <class type> void get(ICodeContext * ctx, const char * key, size_t & valueSize, type * & value);
+    template <class type> void getNumeric(ICodeContext * ctx, const char * key, type & value);
+    signed __int64 returnInt(const char * key, const char * cmd, const redisReply * reply);
+
 
     //-------------------------------LOCKING------------------------------------------------
     void lockSet(ICodeContext * ctx, const char * key, size32_t valueSize, const char * value, unsigned expire);
@@ -149,6 +155,7 @@ public :
     void clear(ICodeContext * ctx);
     unsigned __int64 dbSize(ICodeContext * ctx);
     bool exists(ICodeContext * ctx, const char * key);
+    signed __int64 incrBy(ICodeContext * ctx, const char * key, signed __int64 value);
 
 protected :
     void redisSetTimeout();
@@ -167,7 +174,10 @@ protected :
     void assertConnection(const char * _msg);
     void assertConnectionWithCmdMsg(const char * cmd, const char * key = NULL);
     void fail(const char * cmd, const char * errmsg, const char * key = NULL);
-    void * redisCommand(redisContext * context, const char * format, ...);
+    void * redisCommand(const char * format, ...);
+    void fromStr(const char * str, const char * key, double & ret);
+    void fromStr(const char * str, const char * key, signed __int64 & ret);
+    void fromStr(const char * str, const char * key, unsigned __int64 & ret);
 #if HIREDIS_VERSION_OK_FOR_CACHING
     static unsigned hashServerIpPortPassword(ICodeContext * ctx, const char * _options, const char * password);
     bool isSameConnection(ICodeContext * ctx, const char * _options, const char * password) const;
@@ -278,7 +288,7 @@ void Connection::connect(ICodeContext * ctx, int _database, const char * passwor
         database = _database;
     }
 }
-void * Connection::redisCommand(redisContext * context, const char * format, ...)
+void * Connection::redisCommand(const char * format, ...)
 {
     //Copied from https://github.com/redis/hiredis/blob/master/hiredis.c ~line:1008 void * redisCommand(redisContext * context, const char * format, ...)
     //with redisSetTimeout(); added.
@@ -436,7 +446,7 @@ void Connection::selectDB(ICodeContext * ctx, int _database)
         return;
     database = _database;
     VStringBuffer cmd("SELECT %d", database);
-    OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str()));
+    OwnedReply reply = Reply::createReply(redisCommand(cmd.str()));
     assertOnErrorWithCmdMsg(reply->query(), cmd.str());
 }
 void Connection::fail(const char * cmd, const char * errmsg, const char * key)
@@ -515,38 +525,62 @@ void Connection::assertConnection(const char * _msg)
 void Connection::clear(ICodeContext * ctx)
 {
     //NOTE: flush is the actual cache flush/clear/delete and not an io buffer flush.
-    OwnedReply reply = Reply::createReply(redisCommand(context, "FLUSHDB"));//NOTE: FLUSHDB deletes current database where as FLUSHALL deletes all dbs.
+    OwnedReply reply = Reply::createReply(redisCommand("FLUSHDB"));//NOTE: FLUSHDB deletes current database where as FLUSHALL deletes all dbs.
     //NOTE: documented as never failing, but in case
     assertOnErrorWithCmdMsg(reply->query(), "FlushDB");
 }
 void Connection::del(ICodeContext * ctx, const char * key)
 {
-    OwnedReply reply = Reply::createReply(redisCommand(context, "DEL %b", key, strlen(key)));
+    OwnedReply reply = Reply::createReply(redisCommand("DEL %b", key, strlen(key)));
     assertOnErrorWithCmdMsg(reply->query(), "Del", key);
 }
 void Connection::persist(ICodeContext * ctx, const char * key)
 {
-    OwnedReply reply = Reply::createReply(redisCommand(context, "PERSIST %b", key, strlen(key)));
+    OwnedReply reply = Reply::createReply(redisCommand("PERSIST %b", key, strlen(key)));
     assertOnErrorWithCmdMsg(reply->query(), "Persist", key);
 }
 void Connection::expire(ICodeContext * ctx, const char * key, unsigned _expire)
 {
-    OwnedReply reply = Reply::createReply(redisCommand(context, "PEXPIRE %b %u", key, strlen(key), _expire));
+    OwnedReply reply = Reply::createReply(redisCommand("PEXPIRE %b %u", key, strlen(key), _expire));
     assertOnErrorWithCmdMsg(reply->query(), "Expire", key);
 }
 bool Connection::exists(ICodeContext * ctx, const char * key)
 {
-    OwnedReply reply = Reply::createReply(redisCommand(context, "EXISTS %b", key, strlen(key)));
+    OwnedReply reply = Reply::createReply(redisCommand("EXISTS %b", key, strlen(key)));
     assertOnErrorWithCmdMsg(reply->query(), "Exists", key);
     return (reply->query()->integer != 0);
 }
 unsigned __int64 Connection::dbSize(ICodeContext * ctx)
 {
-    OwnedReply reply = Reply::createReply(redisCommand(context, "DBSIZE"));
+    OwnedReply reply = Reply::createReply(redisCommand("DBSIZE"));
     assertOnErrorWithCmdMsg(reply->query(), "DBSIZE");
     return reply->query()->integer;
 }
+signed __int64 Connection::incrBy(ICodeContext * ctx, const char * key, signed __int64 value)
+{
+    OwnedReply reply = Reply::createReply(redisCommand("INCRBY %b %" I64F "d", key, strlen(key), value));
+    return returnInt(key, "INCRBY", reply->query());
+}
 //-------------------------------------------SET-----------------------------------------
+void Connection::setInt(ICodeContext * ctx, const char * key, signed __int64 value, unsigned expire, bool _unsigned)
+{
+    StringBuffer cmd("SET %b %" I64F);
+    if (_unsigned)
+        cmd.append("u");
+    else
+        cmd.append("d");
+    appendExpire(cmd, expire);
+
+    OwnedReply reply = Reply::createReply(redisCommand(cmd.str(), key, strlen(key), value));
+    assertOnErrorWithCmdMsg(reply->query(), "SET", key);
+}
+void Connection::setReal(ICodeContext * ctx, const char * key, double value, unsigned expire)
+{
+    StringBuffer cmd("SET %b %.16g");
+    appendExpire(cmd, expire);
+    OwnedReply reply = Reply::createReply(redisCommand(cmd.str(), key, strlen(key), value));
+    assertOnErrorWithCmdMsg(reply->query(), "SET", key);
+}
 //--OUTER--
 template<class type> void SyncRSet(ICodeContext * ctx, const char * _options, const char * key, type value, int database, unsigned expire, const char * password, unsigned _timeout, bool cacheConnections)
 {
@@ -567,7 +601,7 @@ template<class type> void Connection::set(ICodeContext * ctx, const char * key,
     StringBuffer cmd("SET %b %b");
     appendExpire(cmd, expire);
 
-    OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str(), key, strlen(key), _value, sizeof(type)));
+    OwnedReply reply = Reply::createReply(redisCommand(cmd.str(), key, strlen(key), _value, sizeof(type)));
     assertOnErrorWithCmdMsg(reply->query(), "SET", key);
 }
 template<class type> void Connection::set(ICodeContext * ctx, const char * key, size32_t valueSize, const type * value, unsigned expire)
@@ -576,11 +610,26 @@ template<class type> void Connection::set(ICodeContext * ctx, const char * key,
 
     StringBuffer cmd("SET %b %b");
     appendExpire(cmd, expire);
-    OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str(), key, strlen(key), _value, (size_t)valueSize));
+    OwnedReply reply = Reply::createReply(redisCommand(cmd.str(), key, strlen(key), _value, (size_t)valueSize));
     assertOnErrorWithCmdMsg(reply->query(), "SET", key);
 }
 //-------------------------------------------GET-----------------------------------------
+signed __int64 Connection::returnInt(const char * key, const char * cmd, const redisReply * reply)
+{
+    assertOnErrorWithCmdMsg(reply, cmd, key);
+    assertKey(reply, key);
+    if (reply->type == REDIS_REPLY_INTEGER)
+        return reply->integer;
+
+    fail(cmd, "expected RESP integer from redis", key);
+    throwUnexpected(); //stop compiler complaining
+}
 //--OUTER--
+template<class type> void SyncRGetNumeric(ICodeContext * ctx, const char * options, const char * key, type & returnValue, int database, const char * password, unsigned _timeout, bool cacheConnections)
+{
+    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, _timeout, cacheConnections);
+    master->getNumeric(ctx, key, returnValue);
+}
 template<class type> void SyncRGet(ICodeContext * ctx, const char * options, const char * key, type & returnValue, int database, const char * password, unsigned _timeout, bool cacheConnections)
 {
     Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, _timeout, cacheConnections);
@@ -591,10 +640,39 @@ template<class type> void SyncRGet(ICodeContext * ctx, const char * options, con
     Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, _timeout, cacheConnections);
     master->get(ctx, key, returnSize, returnValue);
 }
+void Connection::fromStr(const char * str, const char * key, double & ret)
+{
+    char * end = nullptr;
+    ret = strtod(str, &end);
+    if (errno == ERANGE)
+        fail("GetReal", "value returned out of range", key);
+}
+void Connection::fromStr(const char * str, const char * key, signed __int64 & ret)
+{
+    char* end = nullptr;
+    ret = strtoll(str, &end, 10);
+    if (errno == ERANGE)
+        fail("GetInteger", "value returned out of range", key);
+}
+void Connection::fromStr(const char * str, const char * key, unsigned __int64 & ret)
+{
+    char* end = nullptr;
+    ret = strtoull(str, &end, 10);
+    if (errno == ERANGE)
+        fail("GetUnsigned", "value returned out of range", key);
+}
 //--INNER--
+template<class type> void Connection::getNumeric(ICodeContext * ctx, const char * key, type & returnValue)
+{
+    OwnedReply reply = Reply::createReply(redisCommand("GET %b", key, strlen(key)));
+
+    assertOnErrorWithCmdMsg(reply->query(), "GET", key);
+    assertKey(reply->query(), key);
+    fromStr(reply->query()->str, key, returnValue);
+}
 template<class type> void Connection::get(ICodeContext * ctx, const char * key, type & returnValue)
 {
-    OwnedReply reply = Reply::createReply(redisCommand(context, "GET %b", key, strlen(key)));
+    OwnedReply reply = Reply::createReply(redisCommand("GET %b", key, strlen(key)));
 
     assertOnErrorWithCmdMsg(reply->query(), "GET", key);
     assertKey(reply->query(), key);
@@ -609,7 +687,7 @@ template<class type> void Connection::get(ICodeContext * ctx, const char * key,
 }
 template<class type> void Connection::get(ICodeContext * ctx, const char * key, size_t & returnSize, type * & returnValue)
 {
-    OwnedReply reply = Reply::createReply(redisCommand(context, "GET %b", key, strlen(key)));
+    OwnedReply reply = Reply::createReply(redisCommand("GET %b", key, strlen(key)));
 
     assertOnErrorWithCmdMsg(reply->query(), "GET", key);
     assertKey(reply->query(), key);
@@ -625,7 +703,7 @@ unsigned __int64 Connection::publish(ICodeContext * ctx, const char * keyOrChann
     else
         channel.set(keyOrChannel);
 
-    OwnedReply reply = Reply::createReply(redisCommand(context, "PUBLISH %b %b", channel.str(), (size_t)channel.length(), message, (size_t)messageSize));
+    OwnedReply reply = Reply::createReply(redisCommand("PUBLISH %b %b", channel.str(), (size_t)channel.length(), message, (size_t)messageSize));
     assertOnErrorWithCmdMsg(reply->query(), "PUBLISH", channel.str());
     if (reply->query()->type == REDIS_REPLY_INTEGER)
     {
@@ -646,12 +724,12 @@ void Connection::subscribe(ICodeContext * ctx, const char * keyOrChannel, size_t
 
 #if(0)//Replicate a lingering SUBSCRIBE to test channel comparison test when reading message.
     {
-    OwnedReply reply = Reply::createReply(redisCommand(context, "SUBSCRIBE oldChannel"));
+    OwnedReply reply = Reply::createReply(redisCommand("SUBSCRIBE oldChannel"));
     assertOnErrorWithCmdMsg(reply->query(), "Test lingering SUBSCRIBE", "oldChannel");
     }
 #endif
 
-    OwnedReply reply = Reply::createReply(redisCommand(context, "SUBSCRIBE %b", channel.str(), (size_t)channel.length()));
+    OwnedReply reply = Reply::createReply(redisCommand("SUBSCRIBE %b", channel.str(), (size_t)channel.length()));
     assertOnErrorWithCmdMsg(reply->query(), "SUBSCRIBE", channel.str());
     if (reply->query()->type != REDIS_REPLY_ARRAY || strcmp("subscribe", reply->query()->element[0]->str) != 0 )
         fail("SUBSCRIBE", "failed to register SUB", channel.str());
@@ -720,6 +798,11 @@ ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL RDBSize(ICodeContext * ctx, const
     Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, timeout, cacheConnections);
     return master->dbSize(ctx);
 }
+ECL_REDIS_API signed __int64 ECL_REDIS_CALL SyncRINCRBY(ICodeContext * ctx, const char * key, signed __int64 value, const char * options, int database, const char * password, unsigned timeout, bool cacheConnections)
+{
+    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, timeout, cacheConnections);
+    return master->incrBy(ctx, key, value);
+}
 //-----------------------------------SET------------------------------------------
 ECL_REDIS_API void ECL_REDIS_CALL SyncRSetStr(ICodeContext * ctx, const char * key, size32_t valueSize, const char * value, const char * options, int database, unsigned expire, const char * password, unsigned timeout, bool cacheConnections)
 {
@@ -731,15 +814,18 @@ ECL_REDIS_API void ECL_REDIS_CALL SyncRSetUChar(ICodeContext * ctx, const char *
 }
 ECL_REDIS_API void ECL_REDIS_CALL SyncRSetInt(ICodeContext * ctx, const char * key, signed __int64 value, const char * options, int database, unsigned expire, const char * password, unsigned timeout, bool cacheConnections)
 {
-    SyncRSet(ctx, options, key, value, database, expire, password, timeout, cacheConnections);
+    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, timeout, cacheConnections);
+    master->setInt(ctx, key, value, expire, false);
 }
 ECL_REDIS_API void ECL_REDIS_CALL SyncRSetUInt(ICodeContext * ctx, const char * key, unsigned __int64 value, const char * options, int database, unsigned expire, const char * password, unsigned timeout, bool cacheConnections)
 {
-    SyncRSet(ctx, options, key, value, database, expire, password, timeout, cacheConnections);
+    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, timeout, cacheConnections);
+    master->setInt(ctx, key, value, expire, true);
 }
 ECL_REDIS_API void ECL_REDIS_CALL SyncRSetReal(ICodeContext * ctx, const char * key, double value, const char * options, int database, unsigned expire, const char * password, unsigned timeout, bool cacheConnections)
 {
-    SyncRSet(ctx, options, key, value, database, expire, password, timeout, cacheConnections);
+    Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, database, password, timeout, cacheConnections);
+    master->setReal(ctx, key, value, expire);
 }
 ECL_REDIS_API void ECL_REDIS_CALL SyncRSetBool(ICodeContext * ctx, const char * key, bool value, const char * options, int database, unsigned expire, const char * password, unsigned timeout, bool cacheConnections)
 {
@@ -763,19 +849,19 @@ ECL_REDIS_API bool ECL_REDIS_CALL SyncRGetBool(ICodeContext * ctx, const char *
 ECL_REDIS_API double ECL_REDIS_CALL SyncRGetDouble(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout, bool cacheConnections)
 {
     double value;
-    SyncRGet(ctx, options, key, value, database, password, timeout, cacheConnections);
+    SyncRGetNumeric(ctx, options, key, value, database, password, timeout, cacheConnections);
     return value;
 }
 ECL_REDIS_API signed __int64 ECL_REDIS_CALL SyncRGetInt8(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout, bool cacheConnections)
 {
     signed __int64 value;
-    SyncRGet(ctx, options, key, value, database, password, timeout, cacheConnections);
+    SyncRGetNumeric(ctx, options, key, value, database, password, timeout, cacheConnections);
     return value;
 }
 ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL SyncRGetUint8(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout, bool cacheConnections)
 {
     unsigned __int64 value;
-    SyncRGet(ctx, options, key, value, database, password, timeout, cacheConnections);
+    SyncRGetNumeric(ctx, options, key, value, database, password, timeout, cacheConnections);
     return value;
 }
 ECL_REDIS_API void ECL_REDIS_CALL SyncRGetStr(ICodeContext * ctx, size32_t & returnSize, char * & returnValue, const char * key, const char * options, int database, const char * password, unsigned timeout, bool cacheConnections)
@@ -843,7 +929,7 @@ bool Connection::lock(ICodeContext * ctx, const char * key, const char * channel
     StringBuffer cmd("SET %b %b NX PX ");
     cmd.append(expire);
 
-    OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str(), key, strlen(key), channel, strlen(channel)));
+    OwnedReply reply = Reply::createReply(redisCommand(cmd.str(), key, strlen(key), channel, strlen(channel)));
     assertOnErrorWithCmdMsg(reply->query(), cmd.str(), key);
 
     return (reply->query()->type == REDIS_REPLY_STATUS && strcmp(reply->query()->str, "OK") == 0);
@@ -888,7 +974,7 @@ void Connection::handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAtt
 
 #if(0)//Test empty string handling by deleting the lock/value, and thus GET returns REDIS_REPLY_NIL as the reply type and an empty string.
     {
-    OwnedReply pubReply = Reply::createReply(redisCommand(context, "DEL %b", key, strlen(key)));
+    OwnedReply pubReply = Reply::createReply(redisCommand("DEL %b", key, strlen(key)));
     assertOnError(pubReply->query(), "del fail");
     }
 #endif
@@ -896,23 +982,23 @@ void Connection::handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAtt
     //SUB before GET
     //Requires separate connection from GET so that the replies are not mangled. This could be averted
     Owned<Connection> subConnection = new Connection(ctx, options.str(), ip.str(), port, serverIpPortPasswordHash, database, password, timeLeft());
-    OwnedReply subReply = Reply::createReply(redisCommand(subConnection->context, "SUBSCRIBE %b", channel.str(), (size_t)channel.length()));
+    OwnedReply subReply = Reply::createReply(subConnection->redisCommand("SUBSCRIBE %b", channel.str(), (size_t)channel.length()));
     //Defer checking of reply/connection errors until actually needed.
 
 #if(0)//Test publish before GET.
     {
-    OwnedReply pubReply = Reply::createReply(redisCommand(context, "PUBLISH %b %b", channel.str(), (size_t)channel.length(), "foo", (size_t)3));
+    OwnedReply pubReply = Reply::createReply(redisCommand("PUBLISH %b %b", channel.str(), (size_t)channel.length(), "foo", (size_t)3));
     assertOnError(pubReply->query(), "pub fail");
     }
 #endif
 
     //Now GET
-    OwnedReply getReply = Reply::createReply((redisReply*)redisCommand(context, "GET %b", key, strlen(key)));
+    OwnedReply getReply = Reply::createReply((redisReply*)redisCommand("GET %b", key, strlen(key)));
     assertOnErrorWithCmdMsg(getReply->query(), "GetOrLock<type>", key);
 
 #if(0)//Test publish after GET.
     {
-    OwnedReply pubReply = Reply::createReply(redisCommand(context, "PUBLISH %b %b", channel.str(), (size_t)channel.length(), "foo", (size_t)3));
+    OwnedReply pubReply = Reply::createReply(redisCommand("PUBLISH %b %b", channel.str(), (size_t)channel.length(), "foo", (size_t)3));
     assertOnError(pubReply->query(), "pub fail");
     }
 #endif
@@ -983,21 +1069,21 @@ void Connection::handleLockOnSet(ICodeContext * ctx, const char * key, const cha
         if (expire == 0)
         {
             const char * luaScriptSHA1 = "2a4a976d9bbd806756b2c7fc1e2bc2cb905e68c3"; //NOTE: update this if luaScript is updated!
-            replyContainer->setClear(redisCommand(context, "EVALSHA %b %d %b %b %b", luaScriptSHA1, (size_t)40, 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size));
+            replyContainer->setClear(redisCommand("EVALSHA %b %d %b %b %b", luaScriptSHA1, (size_t)40, 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size));
             if (noScript(replyContainer->query()))
             {
                 const char * luaScript = "redis.call('SET', KEYS[1], ARGV[2]) redis.call('PUBLISH', ARGV[1], ARGV[2]) return";//NOTE: MUST update luaScriptSHA1 if luaScript is updated!
-                replyContainer->setClear(redisCommand(context, "EVAL %b %d %b %b %b", luaScript, strlen(luaScript), 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size));
+                replyContainer->setClear(redisCommand("EVAL %b %d %b %b %b", luaScript, strlen(luaScript), 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size));
             }
         }
         else
         {
             const char * luaScriptWithExpireSHA1 = "6f6bc88ccea7c6853ccc395eaa7abd8cb91fb2d8"; //NOTE: update this if luaScriptWithExpire is updated!
-            replyContainer->setClear(redisCommand(context, "EVALSHA %b %d %b %b %b %d", luaScriptWithExpireSHA1, (size_t)40, 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size, expire));
+            replyContainer->setClear(redisCommand("EVALSHA %b %d %b %b %b %d", luaScriptWithExpireSHA1, (size_t)40, 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size, expire));
             if (noScript(replyContainer->query()))
             {
                 const char * luaScriptWithExpire = "redis.call('SET', KEYS[1], ARGV[2], 'PX', ARGV[3]) redis.call('PUBLISH', ARGV[1], ARGV[2]) return";//NOTE: MUST update luaScriptWithExpireSHA1 if luaScriptWithExpire is updated!
-                replyContainer->setClear(redisCommand(context, "EVAL %b %d %b %b %b %d", luaScriptWithExpire, strlen(luaScriptWithExpire), 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size, expire));
+                replyContainer->setClear(redisCommand("EVAL %b %d %b %b %b %d", luaScriptWithExpire, strlen(luaScriptWithExpire), 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size, expire));
             }
         }
         assertOnErrorWithCmdMsg(replyContainer->query(), "SET", key);

+ 2 - 0
plugins/redis/redis.hpp

@@ -70,6 +70,8 @@ namespace RedisPlugin {
     ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL SyncRPub(ICodeContext * _ctx, const char * keyOrChannel, size32_t messageLength, const char * message, const char * options, int database, const char * pswd, unsigned timeout, bool lockedKey, bool cacheConnections);
     ECL_REDIS_API void ECL_REDIS_CALL SyncRSub(ICodeContext * _ctx, size32_t & messageLength, char * & message, const char * keyOrChannel, const char * options, int database, const char * pswd, unsigned timeout, bool lockedKey, bool cacheConnections);
 
+    ECL_REDIS_API signed __int64 ECL_REDIS_CALL SyncRINCRBY(ICodeContext * _ctx, const char * key, signed __int64 value, const char * options, int database, const char * pswd, unsigned timeout, bool cacheConnections);
+
     //--------------------------------AUXILLARIES---------------------------
     ECL_REDIS_API bool             ECL_REDIS_CALL RExist  (ICodeContext * _ctx, const char * key, const char * options, int database, const char * pswd, unsigned timeout, bool cacheConnections);
     ECL_REDIS_API void             ECL_REDIS_CALL RClear  (ICodeContext * _ctx, const char * options, int database, const char * pswd, unsigned timeout, bool cacheConnections);

+ 6 - 0
testing/regress/ecl/key/rediserrortests.xml

@@ -31,3 +31,9 @@
 <Dataset name='Result 11'>
  <Row><value>Timed Out</value></Row>
 </Dataset>
+<Dataset name='Result 12'>
+ <Row><value>1</value></Row>
+</Dataset>
+<Dataset name='Result 13'>
+ <Row><value>1</value></Row>
+</Dataset>

+ 20 - 2
testing/regress/ecl/key/redissynctest.xml

@@ -5,7 +5,7 @@
  <Row><Result_2>3.14159265359</Result_2></Row>
 </Dataset>
 <Dataset name='Result 3'>
- <Row><Result_3>4614256656552046314</Result_3></Row>
+ <Row><Result_3>3</Result_3></Row>
 </Dataset>
 <Dataset name='Result 4'>
  <Row><Result_4>9.869604401090658</Result_4></Row>
@@ -47,7 +47,7 @@
  <Row><Result_16>D790D791D792D793D794D795D796D798D799D79AD79BD79CD79DD79DD79ED79FD7A0D7A1D7A2D7A3D7A4D7A5D7A6D7A7D7A8D7A9D7AA</Result_16></Row>
 </Dataset>
 <Dataset name='Result 17'>
- <Row><Result_17>7523094288207667809</Result_17></Row>
+ <Row><Result_17>0</Result_17></Row>
 </Dataset>
 <Dataset name='Result 18'>
  <Row><Result_18>false</Result_18></Row>
@@ -94,3 +94,21 @@
 <Dataset name='Result 32'>
  <Row><Result_32>OK</Result_32></Row>
 </Dataset>
+<Dataset name='Result 33'>
+ <Row><Result_33>21</Result_33></Row>
+</Dataset>
+<Dataset name='Result 34'>
+ <Row><Result_34>21</Result_34></Row>
+</Dataset>
+<Dataset name='Result 35'>
+ <Row><Result_35>489</Result_35></Row>
+</Dataset>
+<Dataset name='Result 36'>
+ <Row><Result_36>489</Result_36></Row>
+</Dataset>
+<Dataset name='Result 37'>
+ <Row><Result_37>611</Result_37></Row>
+</Dataset>
+<Dataset name='Result 38'>
+ <Row><Result_38>611</Result_38></Row>
+</Dataset>

+ 16 - 0
testing/regress/ecl/rediserrortests.ecl

@@ -126,4 +126,20 @@ SEQUENTIAL(
                                                                          'Timed Out', 'Unexpected Error - ' + FAILMESSAGE)))));
     );
 
+STRING pluginIntExpected := 'Redis Plugin: ERROR - INCRBY \'testINCRBY1\' on database 0 for 127.0.0.1:6379 failed : ERR value is not an integer or out of range';
+dsINCRBY := DATASET(NOFOLD(1), TRANSFORM({INTEGER value}, SELF.value := myRedis.INCRBY('testINCRBY' + (string)COUNTER, 11)));
+SEQUENTIAL(
+    myRedis.FlushDB();
+    myRedis.setString('testINCRBY1', 'this is a string and not an integer'),
+    OUTPUT(CATCH(dsINCRBY, ONFAIL(TRANSFORM({ INTEGER value }, SELF.value := IF(FAILMESSAGE = pluginIntExpected, 1, 0) ))));
+    );
+
+STRING erange := 'Redis Plugin: ERROR - INCRBY \'testINCRBY11\' on database 0 for 127.0.0.1:6379 failed : ERR value is not an integer or out of range';
+dsINCRBY2 := DATASET(NOFOLD(1), TRANSFORM({INTEGER value}, SELF.value := myRedis.INCRBY('testINCRBY1' + (string)COUNTER, -11)));
+SEQUENTIAL(
+    myRedis.FlushDB();
+    myRedis.setUnsigned('testINCRBY11', -1),
+    OUTPUT(CATCH(dsINCRBY2, ONFAIL(TRANSFORM({ INTEGER value }, SELF.value := IF(FAILMESSAGE = erange, 1, 0) ))));
+    );
+
 myRedis.FlushDB();

+ 21 - 0
testing/regress/ecl/redissynctest.ecl

@@ -189,5 +189,26 @@ SEQUENTIAL(
     );
     // N*3 > result < N*4 => all subs received a pub, however, there were result-N*3 subs still open for the second pub. result > N*4 => gremlins.
 
+SEQUENTIAL(
+    myRedis.FlushDB(),
+    myRedis.SetInteger('testINCRBY', 10),
+    myRedis.INCRBY('testINCRBY', 11),
+    myRedis.GetInteger('testINCRBY')
+);
+
+SEQUENTIAL(
+    myRedis.FlushDB(),
+    myRedis.SetString('testINCRBY2', '500'),
+    myRedis.INCRBY('testINCRBY2', -11),
+    myRedis.GetInteger('testINCRBY2')
+);
+
+SEQUENTIAL(
+    myRedis.FlushDB(),
+    myRedis.SetUnsigned('testINCRBY3', 600),
+    myRedis.INCRBY('testINCRBY3', 11),
+    myRedis.GetUnsigned('testINCRBY3')
+);
+
 myRedis.FlushDB();
 myRedis2.FlushDB();