Parcourir la source

HPCC-14085 Redis Plugin - Add independent lock expiration

Currently a locked key will expire with the same duration as that passed
to the function as its timeout.

Signed-off-by: James Noss <james.noss@lexisnexis.com>
James Noss il y a 9 ans
Parent
commit
af359a8fd9

+ 1 - 1
plugins/redis/README.md

@@ -17,7 +17,7 @@ The redis server and client software can be obtained via either - [binaries](htt
 sudo apt-get install redis-server
 ```
 
-*Note:* redis-server 2.6.12 or greater is required to use this plugin as intended. For efficiency, such version requirments are not checked as this is a runtime check only. The use of a
+*Note:* redis-server 2.6.12 or greater is required to use this plugin as intended. For efficiency, such version requirements are not checked as this is a runtime check only. The use of a
 lesser version will result in an exception, normally indicating that either a given command does not exist or that the wrong number of arguments was passed to it. The Set<type>
 plugin functions will not work when setting with an expiration for a version less than 2.6.12. In addition, whilst it is possible to use `Expire` with a version less than
 2.1.3 it is not advised due to [the change in its semantics](http://redis.io/commands/expire).

+ 6 - 6
plugins/redis/lib_redis.ecllib

@@ -47,9 +47,9 @@ EXPORT redis := SERVICE : plugin('redis'), namespace('RedisPlugin')
   UNICODE  SetAndPublishUnicode( CONST VARSTRING key, CONST UNICODE value, CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000) : cpp,once,context,entrypoint='SyncLockRSetUChar';
   UTF8     SetAndPublishUtf8(    CONST VARSTRING key, CONST UTF8 value,    CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000) : cpp,once,context,entrypoint='SyncLockRSetUtf8';
 
-  STRING      GetOrLockString(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000) : cpp,once,context,entrypoint='SyncLockRGetStr';
-  UNICODE    GetOrLockUnicode(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000) : cpp,once,context,entrypoint='SyncLockRGetUChar';
-  UTF8          GetOrLockUtf8(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000) : cpp,once,context,entrypoint='SyncLockRGetUtf8';
+  STRING      GetOrLockString(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000, UNSIGNED4 expire = 1000) : cpp,once,context,entrypoint='SyncLockRGetStr';
+  UNICODE    GetOrLockUnicode(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000, UNSIGNED4 expire = 1000) : cpp,once,context,entrypoint='SyncLockRGetUChar';
+  UTF8          GetOrLockUtf8(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000, UNSIGNED4 expire = 1000) : cpp,once,context,entrypoint='SyncLockRGetUtf8';
 
   Unlock(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000) : cpp,action,context,entrypoint='SyncLockRUnlock';
 END;
@@ -85,9 +85,9 @@ EXPORT RedisServer(VARSTRING options, VARSTRING password = '', UNSIGNED timeout
   EXPORT   SetAndPublishString(VARSTRING key, STRING value,   UNSIGNED database = 0, UNSIGNED4 expire = 0) := redis.SetAndPublishString  (key, value, options, database, expire, password, timeout);
   EXPORT     SetAndPublishUtf8(VARSTRING key, UTF8 value,     UNSIGNED database = 0, UNSIGNED4 expire = 0) := redis.SetAndPublishUtf8    (key, value, options, database, expire, password, timeout);
 
-  EXPORT  GetOrLockUnicode(VARSTRING key, UNSIGNED database = 0) :=  redis.GetOrLockUnicode(key, options, database, password, timeout);
-  EXPORT   GetOrLockString(VARSTRING key, UNSIGNED database = 0) :=   redis.GetOrLockString(key, options, database, password, timeout);
-  EXPORT     GetOrLockUtf8(VARSTRING key, UNSIGNED database = 0) :=     redis.GetOrLockUtf8(key, options, database, password, timeout);
+  EXPORT  GetOrLockUnicode(VARSTRING key, UNSIGNED database = 0, UNSIGNED4 expire = 1000) :=  redis.GetOrLockUnicode(key, options, database, password, timeout, expire);
+  EXPORT   GetOrLockString(VARSTRING key, UNSIGNED database = 0, UNSIGNED4 expire = 1000) :=   redis.GetOrLockString(key, options, database, password, timeout, expire);
+  EXPORT     GetOrLockUtf8(VARSTRING key, UNSIGNED database = 0, UNSIGNED4 expire = 1000) :=     redis.GetOrLockUtf8(key, options, database, password, timeout, expire);
 
   EXPORT Unlock(VARSTRING key, UNSIGNED database = 0) := redis.Unlock(key, options, database, password, timeout);
 END;

+ 17 - 17
plugins/redis/redis.cpp

@@ -102,7 +102,7 @@ public :
 
     //-------------------------------LOCKING------------------------------------------------
     void lockSet(ICodeContext * ctx, const char * key, size32_t valueSize, const char * value, unsigned expire);
-    void lockGet(ICodeContext * ctx, const char * key, size_t & valueSize, char * & value, const char * password);
+    void lockGet(ICodeContext * ctx, const char * key, size_t & valueSize, char * & value, const char * password, unsigned expire);
     void unlock(ICodeContext * ctx, const char * key);
     //--------------------------------------------------------------------------------------
 
@@ -134,10 +134,10 @@ protected :
 
     //-------------------------------LOCKING------------------------------------------------
     void handleLockOnSet(ICodeContext * ctx, const char * key, const char * value, size_t size, unsigned expire);
-    void handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAttr * retVal, const char * password);
+    void handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAttr * retVal, const char * password, unsigned expire);
     void encodeChannel(StringBuffer & channel, const char * key) const;
     bool noScript(const redisReply * reply) const;
-    bool lock(ICodeContext * ctx, const char * key, const char * channel);
+    bool lock(ICodeContext * ctx, const char * key, const char * channel, unsigned expire);
     //--------------------------------------------------------------------------------------
 
 protected :
@@ -672,16 +672,16 @@ void Connection::lockSet(ICodeContext * ctx, const char * key, size32_t valueSiz
 }
 //-------------------------------------------GET-----------------------------------------
 //--OUTER--
-void SyncLockRGet(ICodeContext * ctx, const char * options, const char * key, size_t & returnSize, char * & returnValue, unsigned __int64 database, const char * password, unsigned __int64 _timeout)
+void SyncLockRGet(ICodeContext * ctx, const char * options, const char * key, size_t & returnSize, char * & returnValue, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 _timeout)
 {
     Owned<Connection> master = Connection::createConnection(ctx, options, database, password, _timeout);
-    master->lockGet(ctx, key, returnSize, returnValue, password);
+    master->lockGet(ctx, key, returnSize, returnValue, password, expire);
 }
 //--INNER--
-void Connection::lockGet(ICodeContext * ctx, const char * key, size_t & returnSize, char * & returnValue, const char * password)
+void Connection::lockGet(ICodeContext * ctx, const char * key, size_t & returnSize, char * & returnValue, const char * password, unsigned expire)
 {
     MemoryAttr retVal;
-    handleLockOnGet(ctx, key, &retVal, password);
+    handleLockOnGet(ctx, key, &retVal, password, expire);
     returnSize = retVal.length();
     returnValue = reinterpret_cast<char*>(retVal.detach());
 }
@@ -690,10 +690,10 @@ void Connection::encodeChannel(StringBuffer & channel, const char * key) const
 {
     channel.append(REDIS_LOCK_PREFIX).append("_").append(key).append("_").append(database);
 }
-bool Connection::lock(ICodeContext * ctx, const char * key, const char * channel)
+bool Connection::lock(ICodeContext * ctx, const char * key, const char * channel, unsigned expire)
 {
     StringBuffer cmd("SET %b %b NX EX ");
-    cmd.append(timeout/1000);
+    cmd.append(expire);
 
     OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str(), key, strlen(key), channel, strlen(channel)));
     assertOnError(reply->query(), cmd.append(" of the key '").append(key).append("' failed"));
@@ -727,7 +727,7 @@ void Connection::unlock(ICodeContext * ctx, const char * key)
     }
     //If the above is aborted, let the lock expire.
 }
-void Connection::handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAttr * retVal, const char * password)
+void Connection::handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAttr * retVal, const char * password, unsigned expire)
 {
     //NOTE: This routine can only return an empty string under one condition, that which indicates to the caller that the key was successfully locked.
 
@@ -735,7 +735,7 @@ void Connection::handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAtt
     encodeChannel(channel, key);
 
     //Query key and set lock if non existent
-    if (lock(ctx, key, channel.str()))
+    if (lock(ctx, key, channel.str(), expire))
         return;
 
 #if(0)//Test empty string handling by deleting the lock/value, and thus GET returns REDIS_REPLY_NIL as the reply type and an empty string.
@@ -916,24 +916,24 @@ ECL_REDIS_API void ECL_REDIS_CALL SyncLockRSetUtf8(ICodeContext * ctx, size32_t
     returnValue = (char*)allocateAndCopy(value, valueSize);
 }
 //-------------------------------------GET----------------------------------------
-ECL_REDIS_API void ECL_REDIS_CALL SyncLockRGetStr(ICodeContext * ctx, size32_t & returnSize, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
+ECL_REDIS_API void ECL_REDIS_CALL SyncLockRGetStr(ICodeContext * ctx, size32_t & returnSize, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout, unsigned expire)
 {
     size_t _returnSize;
-    SyncLockRGet(ctx, options, key, _returnSize, returnValue, database, password, timeout);
+    SyncLockRGet(ctx, options, key, _returnSize, returnValue, database, expire, password, timeout);
     returnSize = static_cast<size32_t>(_returnSize);
 }
-ECL_REDIS_API void ECL_REDIS_CALL SyncLockRGetUChar(ICodeContext * ctx, size32_t & returnLength, UChar * & returnValue,  const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
+ECL_REDIS_API void ECL_REDIS_CALL SyncLockRGetUChar(ICodeContext * ctx, size32_t & returnLength, UChar * & returnValue,  const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout, unsigned expire)
 {
     size_t returnSize;
     char  * _returnValue;
-    SyncLockRGet(ctx, options, key, returnSize, _returnValue, database, password, timeout);
+    SyncLockRGet(ctx, options, key, returnSize, _returnValue, database, expire, password, timeout);
     returnValue = (UChar*)_returnValue;
     returnLength = static_cast<size32_t>(returnSize/sizeof(UChar));
 }
-ECL_REDIS_API void ECL_REDIS_CALL SyncLockRGetUtf8(ICodeContext * ctx, size32_t & returnLength, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
+ECL_REDIS_API void ECL_REDIS_CALL SyncLockRGetUtf8(ICodeContext * ctx, size32_t & returnLength, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout, unsigned expire)
 {
     size_t returnSize;
-    SyncLockRGet(ctx, options, key, returnSize, returnValue, database, password, timeout);
+    SyncLockRGet(ctx, options, key, returnSize, returnValue, database, expire, password, timeout);
     returnLength = static_cast<size32_t>(rtlUtf8Length(returnSize, returnValue));
 }
 ECL_REDIS_API void ECL_REDIS_CALL SyncLockRUnlock(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)

+ 3 - 3
plugins/redis/redis.hpp

@@ -74,9 +74,9 @@ namespace RedisPlugin {
     ECL_REDIS_API void             ECL_REDIS_CALL SyncLockRSetStr  (ICodeContext * _ctx, size32_t & returnLength, char * & returnValue, const char * key, size32_t valueLength, const char * value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned timeout);
     ECL_REDIS_API void             ECL_REDIS_CALL SyncLockRSetUChar(ICodeContext * _ctx, size32_t & returnLength, UChar * & returnValue, const char * key, size32_t valueLength, const UChar * value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned timeout);
     //--------------------------GET----------------------------------------
-    ECL_REDIS_API void             ECL_REDIS_CALL SyncLockRGetUtf8  (ICodeContext * _ctx, size32_t & returnLength, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned timeout);
-    ECL_REDIS_API void             ECL_REDIS_CALL SyncLockRGetStr   (ICodeContext * _ctx, size32_t & returnLength, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned timeout);
-    ECL_REDIS_API void             ECL_REDIS_CALL SyncLockRGetUChar (ICodeContext * _ctx, size32_t & returnLength, UChar * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned timeout);
+    ECL_REDIS_API void             ECL_REDIS_CALL SyncLockRGetUtf8  (ICodeContext * _ctx, size32_t & returnLength, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned timeout, unsigned expire);
+    ECL_REDIS_API void             ECL_REDIS_CALL SyncLockRGetStr   (ICodeContext * _ctx, size32_t & returnLength, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned timeout, unsigned expire);
+    ECL_REDIS_API void             ECL_REDIS_CALL SyncLockRGetUChar (ICodeContext * _ctx, size32_t & returnLength, UChar * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned timeout, unsigned expire);
 
     ECL_REDIS_API bool ECL_REDIS_CALL SyncLockRMissThenLock(ICodeContext * _ctx, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout);
 }

+ 1 - 1
testing/regress/ecl/redislockingtest.ecl

@@ -111,7 +111,7 @@ SEQUENTIAL(
 //Test unlock
 SEQUENTIAL(
     myRedis.FlushDB(),
-    myRedis.GetOrLockString('testlock'),/*by default lock expires after 1s*/
+    myRedis.GetOrLockString('testlock',, 1000),
     myRedis.Exists('testlock'),
     Std.System.Debug.Sleep(2000),
     myRedis.Exists('testlock'),