|
@@ -135,6 +135,7 @@ protected :
|
|
|
void handleLockOnSet(ICodeContext * ctx, const char * key, const char * value, size_t size, unsigned expire);
|
|
|
void handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAttr * retVal, const char * password);
|
|
|
void encodeChannel(StringBuffer & channel, const char * key) const;
|
|
|
+ bool noScript(const redisReply * reply) const;
|
|
|
bool lock(ICodeContext * ctx, const char * key, const char * channel);
|
|
|
//--------------------------------------------------------------------------------------
|
|
|
|
|
@@ -719,6 +720,8 @@ void Connection::unlock(ICodeContext * ctx, const char * key)
|
|
|
}
|
|
|
void Connection::handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAttr * retVal, const char * password)
|
|
|
{
|
|
|
+ //NOTE: This routine can only return an empty string under one condition, that which indicates to the caller that the key was successfully locked.
|
|
|
+
|
|
|
StringBuffer channel;
|
|
|
encodeChannel(channel, key);
|
|
|
|
|
@@ -726,10 +729,17 @@ void Connection::handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAtt
|
|
|
if (lock(ctx, key, channel.str()))
|
|
|
return;
|
|
|
|
|
|
+#if(0)
|
|
|
+ {
|
|
|
+ OwnedReply pubReply = Reply::createReply(redisCommand(context, "DEL %b", key, strlen(key)));
|
|
|
+ assertOnError(pubReply->query(), "del fail");
|
|
|
+ }
|
|
|
+#endif
|
|
|
+
|
|
|
//SUB before GET
|
|
|
//Requires separate connection from GET so that the replies are not mangled. This could be averted
|
|
|
Owned<Connection> subConnection = new Connection(ctx, options.str(), ip.str(), port, serverIpPortPasswordHash, database, password, timeout);
|
|
|
- OwnedReply reply = Reply::createReply(redisCommand(subConnection->context, "SUBSCRIBE %b", channel.str(), channel.length()));
|
|
|
+ OwnedReply reply = Reply::createReply(redisCommand(subConnection->context, "SUBSCRIBE %b", channel.str(), (size_t)channel.length()));
|
|
|
assertOnCommandErrorWithKey(reply->query(), "GET", key);
|
|
|
if (reply->query()->type == REDIS_REPLY_ARRAY && strcmp("subscribe", reply->query()->element[0]->str) != 0 )
|
|
|
{
|
|
@@ -739,7 +749,7 @@ void Connection::handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAtt
|
|
|
|
|
|
#if(0)
|
|
|
{
|
|
|
- OwnedReply pubReply = Reply::createReply(redisCommand(context, "PUBLISH %b %b", channel.str(), channel.length(), "foo", 3));
|
|
|
+ OwnedReply pubReply = Reply::createReply(redisCommand(context, "PUBLISH %b %b", channel.str(), (size_t)channel.length(), "foo", 3));
|
|
|
assertOnError(pubReply->query(), "pub fail");
|
|
|
}
|
|
|
#endif
|
|
@@ -747,34 +757,32 @@ void Connection::handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAtt
|
|
|
//Now GET
|
|
|
reply->setClear((redisReply*)redisCommand(context, "GET %b", key, strlen(key)));
|
|
|
assertOnCommandErrorWithKey(reply->query(), "GET", key);
|
|
|
- assertKey(reply->query(), key);
|
|
|
|
|
|
#if(0)
|
|
|
{
|
|
|
- OwnedReply pubReply = Reply::createReply(redisCommand(context, "PUBLISH %b %b", channel.str(), channel.length(), "foo", 3));
|
|
|
+ OwnedReply pubReply = Reply::createReply(redisCommand(context, "PUBLISH %b %b", channel.str(), (size_t)channel.length(), "foo", 3));
|
|
|
assertOnError(pubReply->query(), "pub fail");
|
|
|
}
|
|
|
#endif
|
|
|
|
|
|
- //Check if returned value is locked
|
|
|
- if (strncmp(reply->query()->str, REDIS_LOCK_PREFIX, strlen(REDIS_LOCK_PREFIX)) != 0)
|
|
|
+ //Only return an actual value, i.e. neither the lock value nor an empty string. The latter is unlikely since we know that lock()
|
|
|
+ //failed, indicating that the key existed. If this is an actual value, it is however, possible for it to have been DELeted in the interim.
|
|
|
+ if (reply->query()->type != REDIS_REPLY_NIL && reply->query()->str && strncmp(reply->query()->str, REDIS_LOCK_PREFIX, strlen(REDIS_LOCK_PREFIX)) != 0)
|
|
|
{
|
|
|
- //Not locked so return value
|
|
|
retVal->set(reply->query()->len, reply->query()->str);
|
|
|
return;
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- //Check that we SUBSCRIBEd to the correct channel (which could have been manually SET).
|
|
|
- if (strcmp(reply->query()->str, channel.str()) !=0 )
|
|
|
+ //Check that the lock was set by this plugin and thus that we subscribed to the expected channel.
|
|
|
+ if (reply->query()->str && strcmp(reply->query()->str, channel.str()) !=0 )
|
|
|
{
|
|
|
VStringBuffer msg("Redis Plugin: ERROR - the key '%s', on database %" I64F "u, is locked with a channel ('%s') different to that subscribed to (%s).", key, database, reply->query()->str, channel.str());
|
|
|
rtlFail(0, msg.str());
|
|
|
- //MORE: We could attempt to recover at this stage by subscribing to the channel that the key was actually locked with.
|
|
|
- //However, we may have missed the massage publication already or by then.
|
|
|
- //If we ever changed the semantics of the 'timeout' to be that of these plugin functions rather than each redis call, we might as well
|
|
|
- //subscribe again if there was time left on the clock.
|
|
|
- //Since they are not, we could, though is this desirable behaviour?
|
|
|
+ //MORE: In theory, it is possible to recover at this stage by subscribing to the channel that the key was actually locked with.
|
|
|
+ //However, we may have missed the massage publication already or by then, but could SUB again in case we haven't.
|
|
|
+ //More importantly and furthermore, the publication (in SetAndPublish<type>) will only publish to the channel encoded by
|
|
|
+ //this plugin, rather than the string retrieved as the lock value (the value of the locked key).
|
|
|
}
|
|
|
#if(0)//Added to allow for manual pub testing via redis-cli
|
|
|
struct timeval to = { 10, 0 };//10secs
|
|
@@ -789,37 +797,91 @@ void Connection::handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAtt
|
|
|
assertOnCommandErrorWithKey(nakedReply, "GET", key);
|
|
|
if (nakedReply->type == REDIS_REPLY_ARRAY && strcmp("message", nakedReply->element[0]->str) == 0)
|
|
|
{
|
|
|
- retVal->set(nakedReply->element[2]->len, nakedReply->element[2]->str);//return the published value rather than another (WATCHed) GET.
|
|
|
- return;
|
|
|
+ //We are about to return a value, to conform with other Get<type> functions, fail if the key did not exist.
|
|
|
+ //Since the value is sent via a published message, there is no direct reply struct so assume that an empty
|
|
|
+ //string is equivalent to a non-existent key.
|
|
|
+ //More importantly, it is paramount that this routine only return an empty string under one condition, that
|
|
|
+ //which indicates to the caller that the key was successfully locked.
|
|
|
+ //NOTE: it is possible for an empty message to have been PUBLISHed.
|
|
|
+ const char * message = nakedReply->element[2]->str;
|
|
|
+ if (message && *message)
|
|
|
+ {
|
|
|
+ retVal->set(nakedReply->element[2]->len, nakedReply->element[2]->str);//return the published value rather than another (WATCHed) GET.
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ VStringBuffer msg("Redis Plugin: ERROR - the requested key '%s' does not exist on database %" I64F "u", key, database);
|
|
|
+ rtlFail(0, msg.str());
|
|
|
}
|
|
|
}
|
|
|
throwUnexpected();
|
|
|
}
|
|
|
void Connection::handleLockOnSet(ICodeContext * ctx, const char * key, const char * value, size_t size, unsigned expire)
|
|
|
{
|
|
|
- StringBuffer cmd("SET %b %b");
|
|
|
- RedisPlugin::appendExpire(cmd, expire);
|
|
|
-
|
|
|
//Due to locking logic surfacing into ECL, any locking.set (such as this is) assumes that they own the lock and therefore go ahead and set regardless.
|
|
|
- //It is possible for a process/call to 'own' a lock and store this info in the LockObject, however, this prevents sharing between clients.
|
|
|
- redisAppendCommand(context, cmd.str(), key, strlen(key), value, size);//SET
|
|
|
StringBuffer channel;
|
|
|
encodeChannel(channel, key);
|
|
|
- redisAppendCommand(context, "PUBLISH %b %b", channel.str(), channel.length(), value, size);//PUB
|
|
|
|
|
|
- //Now read and assert replies
|
|
|
- OwnedReply replyContainer = new Reply();
|
|
|
- readReplyAndAssertWithKey(replyContainer, "SET", key);//SET reply
|
|
|
- readReplyAndAssertWithKey(replyContainer, "PUB for the key", key);//PUB reply
|
|
|
+ if (size > 29)//c.f. 1st note below.
|
|
|
+ {
|
|
|
+ OwnedReply replyContainer = new Reply();
|
|
|
+ if (expire == 0)
|
|
|
+ {
|
|
|
+ const char * luaScriptSHA1 = "2a4a976d9bbd806756b2c7fc1e2bc2cb905e68c3"; //NOTE: update this if luaScript is updated!
|
|
|
+ replyContainer->setClear((redisReply*)redisCommand(context, "EVALSHA %b %d %b %b %b", luaScriptSHA1, (size_t)40, 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size));
|
|
|
+ if (noScript(replyContainer->query()))
|
|
|
+ {
|
|
|
+ const char * luaScript = "redis.call('SET', KEYS[1], ARGV[2]) redis.call('PUBLISH', ARGV[1], ARGV[2]) return";
|
|
|
+ replyContainer->setClear((redisReply*)redisCommand(context, "EVAL %b %d %b %b %b", luaScript, strlen(luaScript), 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ const char * luaScriptWithExpireSHA1 = "c68d1706d7dc6342d5fc1d651e238931bd75320d"; //NOTE: update this if luaScriptWithExpire is updated!
|
|
|
+ replyContainer->setClear((redisReply*)redisCommand(context, "EVALSHA %b %d %b %b %b %d", luaScriptWithExpireSHA1, (size_t)40, 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size, expire/1000));
|
|
|
+ if (noScript(replyContainer->query()))
|
|
|
+ {
|
|
|
+ const char * luaScriptWithExpire = "redis.call('SET', KEYS[1], ARGV[2], 'EX', ARGV[3]) redis.call('PUBLISH', ARGV[1], ARGV[2]) return";
|
|
|
+ replyContainer->setClear((redisReply*)redisCommand(context, "EVAL %b %d %b %b %b %d", luaScriptWithExpire, strlen(luaScriptWithExpire), 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size, expire/1000));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ assertOnCommandErrorWithKey(replyContainer->query(), "SET", key);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ StringBuffer cmd("SET %b %b");
|
|
|
+ RedisPlugin::appendExpire(cmd, expire);
|
|
|
+ redisAppendCommand(context, "MULTI");
|
|
|
+ redisAppendCommand(context, cmd.str(), key, strlen(key), value, size);//SET
|
|
|
+ redisAppendCommand(context, "PUBLISH %b %b", channel.str(), (size_t)channel.length(), value, size);//PUB
|
|
|
+ redisAppendCommand(context, "EXEC");
|
|
|
+
|
|
|
+ //Now read and assert replies
|
|
|
+ OwnedReply reply = new Reply();
|
|
|
+ readReplyAndAssertWithKey(reply, "SET", key);//MULTI reply
|
|
|
+ readReplyAndAssertWithKey(reply, "SET", key);//SET reply
|
|
|
+ readReplyAndAssertWithKey(reply, "PUB for the key", key);//PUB reply
|
|
|
+ readReplyAndAssertWithKey(reply, "SET", key);//EXEC reply
|
|
|
+ }
|
|
|
|
|
|
- //NOTE: Pipelining the above commands may not be the desired behaviour but instead only PUBLISH upon a successful SET. Doing both regardless, does however ensure
|
|
|
- //(assuming only the SET fails) that any subscribers do in fact get their requested key-value even if the SET fails. However, this may not be expected behaviour
|
|
|
- //as it's now possible for the key-value to actually exists in the cache when it was retrieved via redis plugin get function. This is documented in the README.
|
|
|
+ //NOTE: When setting and publishing the data with a pipelined MULTI-SET-PUB-EXEC, the data is sent twice, once with the SET and again with the PUBLISH.
|
|
|
+ //To prevent this, send the data to the server only once with a server-side lua script that then sets and publishes the data from the server.
|
|
|
+ //However, there is a transmission overhead for this method that may still be larger than sending the data twice if it is small enough.
|
|
|
+ //multi-set-pub-exec (via strings) has a transmission length of - "MULTI SET" + key + value + "PUBLISH" + channel + value = 5 + 3 + key + 7 + value + channel + value + 4
|
|
|
+ //The lua script (assuming the script already exists on the server) a length of - "EVALSHA" + digest + "1" + key + channel + value = 7 + 40 + 1 + key + channel + value
|
|
|
+ //Therefore, they have same length when: 19 + value = 48 => value = 29.
|
|
|
+
|
|
|
+ //NOTE: Pipelining the above commands may not be the expected behaviour, instead only PUBLISH upon a successful SET. Doing both regardless, does however ensure
|
|
|
+ //(assuming only the SET fails) that any subscribers do in fact get their requested key-value even if the SET fails. This may not be expected behaviour
|
|
|
+ //as it is now possible for the key-value to NOT actually exist in the cache though it was retrieved via a redis plugin get function. This is documented in the README.
|
|
|
//Further more, it is possible that the locked value and thus the channel stored within the key is not that expected, i.e. computed via encodeChannel() (e.g.
|
|
|
//if set by a non-conforming external client/process). It is however, possible to account for this via using a GETSET instead of just the SET. This returns the old
|
|
|
//value stored, this can then be checked if it is a lock (i.e. has at least the "redis_key_lock prefix"), if it doesn't, PUB on the channel from encodeChannel(),
|
|
|
//otherwise PUB on the value retrieved from GETSET or possibly only if it at least has the prefix "redis_key_lock".
|
|
|
- //This would however, prevent the two commands from being pipelined, as the GETSET would need to return before publishing.
|
|
|
+ //This would however, prevent the two commands from being pipelined, as the GETSET would need to return before publishing. It would also mean sending the data twice.
|
|
|
+}
|
|
|
+bool Connection::noScript(const redisReply * reply) const
|
|
|
+{
|
|
|
+ return (reply && reply->type == REDIS_REPLY_ERROR && strncmp(reply->str, "NOSCRIPT", 8) == 0);
|
|
|
}
|
|
|
//--------------------------------------------------------------------------------
|
|
|
// ECL SERVICE ENTRYPOINTS
|