|
@@ -401,7 +401,10 @@ void Connection::fail(const char * cmd, const char * errmsg, const char * key)
|
|
|
void Connection::assertOnError(const redisReply * reply, const char * _msg)
|
|
|
{
|
|
|
if (!reply)
|
|
|
+ {
|
|
|
assertConnection(_msg);
|
|
|
+ throwUnexpected();
|
|
|
+ }
|
|
|
else if (reply->type == REDIS_REPLY_ERROR)
|
|
|
{
|
|
|
assertAuthorization(reply);
|
|
@@ -412,7 +415,10 @@ void Connection::assertOnError(const redisReply * reply, const char * _msg)
|
|
|
void Connection::assertOnErrorWithCmdMsg(const redisReply * reply, const char * cmd, const char * key)
|
|
|
{
|
|
|
if (!reply)
|
|
|
+ {
|
|
|
assertConnectionWithCmdMsg(cmd, key);
|
|
|
+ throwUnexpected();
|
|
|
+ }
|
|
|
else if (reply->type == REDIS_REPLY_ERROR)
|
|
|
{
|
|
|
assertAuthorization(reply);
|
|
@@ -769,10 +775,8 @@ void Connection::handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAtt
|
|
|
//SUB before GET
|
|
|
//Requires separate connection from GET so that the replies are not mangled. This could be averted
|
|
|
Owned<Connection> subConnection = new Connection(ctx, options.str(), ip.str(), port, serverIpPortPasswordHash, database, password, timeLeft());
|
|
|
- OwnedReply reply = Reply::createReply(redisCommand(subConnection->context, "SUBSCRIBE %b", channel.str(), (size_t)channel.length()));
|
|
|
- assertOnErrorWithCmdMsg(reply->query(), "GetOrLock<type>", key);
|
|
|
- if (reply->query()->type == REDIS_REPLY_ARRAY && strcmp("subscribe", reply->query()->element[0]->str) != 0 )
|
|
|
- fail("GetOrLock<type>", "failed to register SUB", key);
|
|
|
+ OwnedReply subReply = Reply::createReply(redisCommand(subConnection->context, "SUBSCRIBE %b", channel.str(), (size_t)channel.length()));
|
|
|
+ //Defer checking of reply/connection errors until actually needed.
|
|
|
|
|
|
#if(0)//Test publish before GET.
|
|
|
{
|
|
@@ -782,8 +786,8 @@ void Connection::handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAtt
|
|
|
#endif
|
|
|
|
|
|
//Now GET
|
|
|
- reply->setClear((redisReply*)redisCommand(context, "GET %b", key, strlen(key)));
|
|
|
- assertOnErrorWithCmdMsg(reply->query(), "GetOrLock<type>", key);
|
|
|
+ OwnedReply getReply = Reply::createReply((redisReply*)redisCommand(context, "GET %b", key, strlen(key)));
|
|
|
+ assertOnErrorWithCmdMsg(getReply->query(), "GetOrLock<type>", key);
|
|
|
|
|
|
#if(0)//Test publish after GET.
|
|
|
{
|
|
@@ -794,32 +798,38 @@ void Connection::handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAtt
|
|
|
|
|
|
//Only return an actual value, i.e. neither the lock value nor an empty string. The latter is unlikely since we know that lock()
|
|
|
//failed, indicating that the key existed. If this is an actual value, it is however, possible for it to have been DELeted in the interim.
|
|
|
- if (reply->query()->type != REDIS_REPLY_NIL && reply->query()->str && strncmp(reply->query()->str, REDIS_LOCK_PREFIX, strlen(REDIS_LOCK_PREFIX)) != 0)
|
|
|
+ if (getReply->query()->type != REDIS_REPLY_NIL && getReply->query()->str && strncmp(getReply->query()->str, REDIS_LOCK_PREFIX, strlen(REDIS_LOCK_PREFIX)) != 0)
|
|
|
{
|
|
|
- retVal->set(reply->query()->len, reply->query()->str);
|
|
|
+ retVal->set(getReply->query()->len, getReply->query()->str);
|
|
|
return;
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
//Check that the lock was set by this plugin and thus that we subscribed to the expected channel.
|
|
|
- if (reply->query()->str && strcmp(reply->query()->str, channel.str()) !=0 )
|
|
|
+ if (getReply->query()->str && strcmp(getReply->query()->str, channel.str()) !=0 )
|
|
|
{
|
|
|
- VStringBuffer msg("key locked with a channel ('%s') different to that subscribed to (%s).", reply->query()->str, channel.str());
|
|
|
+ VStringBuffer msg("key locked with a channel ('%s') different to that subscribed to (%s).", getReply->query()->str, channel.str());
|
|
|
fail("GetOrLock<type>", msg.str(), key);
|
|
|
//MORE: In theory, it is possible to recover at this stage by subscribing to the channel that the key was actually locked with.
|
|
|
//However, we may have missed the massage publication already or by then, but could SUB again in case we haven't.
|
|
|
//More importantly and furthermore, the publication (in SetAndPublish<type>) will only publish to the channel encoded by
|
|
|
//this plugin, rather than the string retrieved as the lock value (the value of the locked key).
|
|
|
}
|
|
|
+ getReply.clear();
|
|
|
+
|
|
|
#if(0)//Added to allow for manual pub testing via redis-cli
|
|
|
struct timeval to = { 10, 0 };//10secs
|
|
|
::redisSetTimeout(subConnection->context, to);
|
|
|
#endif
|
|
|
+
|
|
|
//Locked so SUBSCRIBE
|
|
|
- subConnection->readReply(reply);
|
|
|
- subConnection->assertOnErrorWithCmdMsg(reply->query(), "GetOrLock<type>", key);
|
|
|
+ subConnection->assertOnErrorWithCmdMsg(subReply->query(), "GetOrLock<type>", key);
|
|
|
+ if (subReply->query()->type != REDIS_REPLY_ARRAY || strcmp("subscribe", subReply->query()->element[0]->str) != 0 )
|
|
|
+ fail("GetOrLock<type>", "failed to register SUB", key);//NOTE: In this instance better to be this->fail rather than subConnection->fail - due to database reported in msg.
|
|
|
+ subConnection->readReply(subReply);
|
|
|
+ subConnection->assertOnErrorWithCmdMsg(subReply->query(), "GetOrLock<type>", key);
|
|
|
|
|
|
- if (reply->query()->type == REDIS_REPLY_ARRAY && strcmp("message", reply->query()->element[0]->str) == 0)
|
|
|
+ if (subReply->query()->type == REDIS_REPLY_ARRAY && strcmp("message", subReply->query()->element[0]->str) == 0)
|
|
|
{
|
|
|
//We are about to return a value, to conform with other Get<type> functions, fail if the key did not exist.
|
|
|
//Since the value is sent via a published message, there is no direct reply struct so assume that an empty
|
|
@@ -827,9 +837,9 @@ void Connection::handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAtt
|
|
|
//More importantly, it is paramount that this routine only return an empty string under one condition, that
|
|
|
//which indicates to the caller that the key was successfully locked.
|
|
|
//NOTE: it is possible for an empty message to have been PUBLISHed.
|
|
|
- if (reply->query()->element[2]->len > 0)
|
|
|
+ if (subReply->query()->element[2]->len > 0)
|
|
|
{
|
|
|
- retVal->set(reply->query()->element[2]->len, reply->query()->element[2]->str);//return the published value rather than another (WATCHed) GET.
|
|
|
+ retVal->set(subReply->query()->element[2]->len, subReply->query()->element[2]->str);//return the published value rather than another (WATCHed) GET.
|
|
|
return;
|
|
|
}
|
|
|
//fail that key does not exist
|