|
@@ -44,7 +44,11 @@ static const char * REDIS_LOCK_PREFIX = "redis_ecl_lock";
|
|
|
static __thread Connection * cachedConnection;
|
|
|
static __thread ThreadTermFunc threadHookChain;
|
|
|
|
|
|
-StringBuffer & appendExpire(StringBuffer & buffer, unsigned expire)
|
|
|
+static void * allocateAndCopy(const void * src, size_t size)
|
|
|
+{
|
|
|
+ return memcpy(rtlMalloc(size), src, size);
|
|
|
+}
|
|
|
+static StringBuffer & appendExpire(StringBuffer & buffer, unsigned expire)
|
|
|
{
|
|
|
if (expire > 0)
|
|
|
buffer.append(" EX ").append(expire/1000);
|
|
@@ -112,20 +116,20 @@ protected :
|
|
|
void parseOptions(ICodeContext * ctx, const char * _options);
|
|
|
void connect(ICodeContext * ctx, unsigned __int64 _database, const char * password);
|
|
|
void selectDB(ICodeContext * ctx, unsigned __int64 _database);
|
|
|
- void authenticate(ICodeContext * ctx, const char * password);
|
|
|
void resetContextErr();
|
|
|
void readReply(Reply * reply);
|
|
|
void readReplyAndAssert(Reply * reply, const char * msg);
|
|
|
void readReplyAndAssertWithKey(Reply * reply, const char * msg, const char * key);
|
|
|
void assertKey(const redisReply * reply, const char * key);
|
|
|
+ void assertAuthorization(const redisReply * reply);
|
|
|
void assertOnError(const redisReply * reply, const char * _msg);
|
|
|
void assertOnCommandError(const redisReply * reply, const char * cmd);
|
|
|
void assertOnCommandErrorWithDatabase(const redisReply * reply, const char * cmd);
|
|
|
void assertOnCommandErrorWithKey(const redisReply * reply, const char * cmd, const char * key);
|
|
|
void assertConnection();
|
|
|
void updateTimeout(unsigned __int64 _timeout);
|
|
|
- void * allocateAndCopy(const char * src, size_t size);
|
|
|
- bool isSameConnection(ICodeContext * ctx, const char * password) const;
|
|
|
+ static unsigned hashServerIpPortPassword(ICodeContext * ctx, const char * _options, const char * password);
|
|
|
+ bool isSameConnection(ICodeContext * ctx, const char * _options, const char * password) const;
|
|
|
|
|
|
//-------------------------------LOCKING------------------------------------------------
|
|
|
void handleLockOnSet(ICodeContext * ctx, const char * key, const char * value, size_t size, unsigned expire);
|
|
@@ -146,11 +150,11 @@ protected :
|
|
|
|
|
|
//The following class is here to ensure destruction of the cachedConnection within the main thread
|
|
|
//as this is not handled by the thread hook mechanism.
|
|
|
-static class mainThreadCachedConnection
|
|
|
+static class MainThreadCachedConnection
|
|
|
{
|
|
|
public :
|
|
|
- mainThreadCachedConnection() { }
|
|
|
- ~mainThreadCachedConnection()
|
|
|
+ MainThreadCachedConnection() { }
|
|
|
+ ~MainThreadCachedConnection()
|
|
|
{
|
|
|
if (cachedConnection)
|
|
|
cachedConnection->Release();
|
|
@@ -171,10 +175,8 @@ static void releaseContext()
|
|
|
}
|
|
|
}
|
|
|
Connection::Connection(ICodeContext * ctx, const char * _options, unsigned __int64 _database, const char * password, unsigned __int64 _timeout)
|
|
|
- : database(0), timeout(_timeout), port(0)
|
|
|
+ : database(0), timeout(_timeout), port(0), serverIpPortPasswordHash(hashServerIpPortPassword(ctx, _options, password))
|
|
|
{
|
|
|
- serverIpPortPasswordHash = hashc((const unsigned char*)password, strlen(password), 0);
|
|
|
- serverIpPortPasswordHash = hashc((const unsigned char*)_options, strlen(_options), serverIpPortPasswordHash);
|
|
|
options.set(_options, strlen(_options));
|
|
|
parseOptions(ctx, _options);
|
|
|
connect(ctx, _database, password);
|
|
@@ -188,7 +190,7 @@ Connection::Connection(ICodeContext * ctx, const char * _options, const char * _
|
|
|
}
|
|
|
void Connection::connect(ICodeContext * ctx, unsigned __int64 _database, const char * password)
|
|
|
{
|
|
|
- struct timeval to = { timeout/1000, timeout%1000 };
|
|
|
+ struct timeval to = { timeout/1000, (timeout%1000)*1000 };
|
|
|
context = redisConnectWithTimeout(ip.str(), port, to);
|
|
|
redisSetTimeout(context, to);
|
|
|
assertConnection();
|
|
@@ -196,11 +198,11 @@ void Connection::connect(ICodeContext * ctx, unsigned __int64 _database, const c
|
|
|
//The following is the dissemination of the two methods authenticate(ctx, password) & selectDB(ctx, _database)
|
|
|
//such that they may be pipelined to save an extra round trip to the server and back.
|
|
|
if (password && *password)
|
|
|
- redisAppendCommand(context, "AUTH %b", password, strlen(password));
|
|
|
+ redisAppendCommand(context, "AUTH %b", password, strlen(password));
|
|
|
|
|
|
if (database != _database)
|
|
|
{
|
|
|
- VStringBuffer cmd("SELECT %" I64F "u", database);
|
|
|
+ VStringBuffer cmd("SELECT %" I64F "u", _database);
|
|
|
redisAppendCommand(context, cmd.str());
|
|
|
}
|
|
|
|
|
@@ -215,15 +217,13 @@ void Connection::connect(ICodeContext * ctx, unsigned __int64 _database, const c
|
|
|
database = _database;
|
|
|
}
|
|
|
}
|
|
|
-bool Connection::isSameConnection(ICodeContext * ctx, const char * password) const
|
|
|
+bool Connection::isSameConnection(ICodeContext * ctx, const char * _options, const char * password) const
|
|
|
{
|
|
|
- unsigned hash = hashc((const unsigned char*)options.str(), options.length(), hashc((const unsigned char*)password, strlen(password), 0));
|
|
|
- return (serverIpPortPasswordHash == hash);
|
|
|
+ return (hashServerIpPortPassword(ctx, _options, password) == serverIpPortPasswordHash);
|
|
|
}
|
|
|
-void * Connection::allocateAndCopy(const char * src, size_t size)
|
|
|
+unsigned Connection::hashServerIpPortPassword(ICodeContext * ctx, const char * _options, const char * password)
|
|
|
{
|
|
|
- void * value = rtlMalloc(size);
|
|
|
- return memcpy(value, src, size);
|
|
|
+ return hashc((const unsigned char*)_options, strlen(_options), hashc((const unsigned char*)password, strlen(password), 0));
|
|
|
}
|
|
|
void Connection::parseOptions(ICodeContext * ctx, const char * _options)
|
|
|
{
|
|
@@ -259,15 +259,6 @@ void Connection::parseOptions(ICodeContext * ctx, const char * _options)
|
|
|
ctx->logString(msg.str());
|
|
|
}
|
|
|
}
|
|
|
- return;
|
|
|
-}
|
|
|
-void Connection::authenticate(ICodeContext * ctx, const char * password)
|
|
|
-{
|
|
|
- if (password && *password)
|
|
|
- {
|
|
|
- OwnedReply reply = Reply::createReply(redisCommand(context, "AUTH %b", password, strlen(password)));
|
|
|
- assertOnError(reply->query(), "server authentication failed");
|
|
|
- }
|
|
|
}
|
|
|
void Connection::resetContextErr()
|
|
|
{
|
|
@@ -302,7 +293,7 @@ Connection * Connection::createConnection(ICodeContext * ctx, const char * optio
|
|
|
return LINK(cachedConnection);
|
|
|
}
|
|
|
|
|
|
- if (cachedConnection->isSameConnection(ctx, password))
|
|
|
+ if (cachedConnection->isSameConnection(ctx, options, password))
|
|
|
{
|
|
|
//MORE: should perhaps check that the connection has not expired (think hiredis REDIS_KEEPALIVE_INTERVAL is defaulted to 15s).
|
|
|
//At present updateTimeout calls assertConnection.
|
|
@@ -313,6 +304,7 @@ Connection * Connection::createConnection(ICodeContext * ctx, const char * optio
|
|
|
}
|
|
|
|
|
|
cachedConnection->Release();
|
|
|
+ cachedConnection = NULL;
|
|
|
cachedConnection = new Connection(ctx, options, _database, password, _timeout);
|
|
|
return LINK(cachedConnection);
|
|
|
}
|
|
@@ -331,7 +323,7 @@ void Connection::updateTimeout(unsigned __int64 _timeout)
|
|
|
return;
|
|
|
assertConnection();
|
|
|
timeout = _timeout;
|
|
|
- struct timeval to = { timeout/1000, timeout%1000 };
|
|
|
+ struct timeval to = { timeout/1000, (timeout%1000)*1000 };
|
|
|
assertex(context);
|
|
|
if (redisSetTimeout(context, to) != REDIS_OK)
|
|
|
{
|
|
@@ -346,7 +338,7 @@ void Connection::updateTimeout(unsigned __int64 _timeout)
|
|
|
}
|
|
|
void Connection::assertOnError(const redisReply * reply, const char * _msg)
|
|
|
{
|
|
|
- if (!reply)//assertex(reply)?
|
|
|
+ if (!reply)//MORE: should this be assertex(reply) instead?
|
|
|
{
|
|
|
//There should always be a context error if no reply error
|
|
|
assertConnection();
|
|
@@ -355,21 +347,14 @@ void Connection::assertOnError(const redisReply * reply, const char * _msg)
|
|
|
}
|
|
|
else if (reply->type == REDIS_REPLY_ERROR)
|
|
|
{
|
|
|
- if (strncmp(reply->str, "NOAUTH", 6) == 0)
|
|
|
- {
|
|
|
- VStringBuffer msg("Redis Plugin: server authentication failed - %s", reply->str);
|
|
|
- rtlFail(0, msg.str());
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- VStringBuffer msg("Redis Plugin: %s - %s", _msg, reply->str);
|
|
|
- rtlFail(0, msg.str());
|
|
|
- }
|
|
|
+ assertAuthorization(reply);
|
|
|
+ VStringBuffer msg("Redis Plugin: %s - %s", _msg, reply->str);
|
|
|
+ rtlFail(0, msg.str());
|
|
|
}
|
|
|
}
|
|
|
void Connection::assertOnCommandErrorWithKey(const redisReply * reply, const char * cmd, const char * key)
|
|
|
{
|
|
|
- if (!reply)//assertex(reply)?
|
|
|
+ if (!reply)//MORE: should this be assertex(reply) instead?
|
|
|
{
|
|
|
//There should always be a context error if no reply error
|
|
|
assertConnection();
|
|
@@ -378,16 +363,9 @@ void Connection::assertOnCommandErrorWithKey(const redisReply * reply, const cha
|
|
|
}
|
|
|
else if (reply->type == REDIS_REPLY_ERROR)
|
|
|
{
|
|
|
- if (strncmp(reply->str, "NOAUTH", 6) == 0)
|
|
|
- {
|
|
|
- VStringBuffer msg("Redis Plugin: server authentication failed - %s", reply->str);
|
|
|
- rtlFail(0, msg.str());
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- VStringBuffer msg("Redis Plugin: ERROR - %s '%s' on database %" I64F "u failed : %s", cmd, key, database, reply->str);
|
|
|
- rtlFail(0, msg.str());
|
|
|
- }
|
|
|
+ assertAuthorization(reply);
|
|
|
+ VStringBuffer msg("Redis Plugin: ERROR - %s '%s' on database %" I64F "u failed : %s", cmd, key, database, reply->str);
|
|
|
+ rtlFail(0, msg.str());
|
|
|
}
|
|
|
}
|
|
|
void Connection::assertOnCommandErrorWithDatabase(const redisReply * reply, const char * cmd)
|
|
@@ -401,16 +379,9 @@ void Connection::assertOnCommandErrorWithDatabase(const redisReply * reply, cons
|
|
|
}
|
|
|
else if (reply->type == REDIS_REPLY_ERROR)
|
|
|
{
|
|
|
- if (strncmp(reply->str, "NOAUTH", 6) == 0)
|
|
|
- {
|
|
|
- VStringBuffer msg("Redis Plugin: server authentication failed - %s", reply->str);
|
|
|
- rtlFail(0, msg.str());
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- VStringBuffer msg("Redis Plugin: ERROR - %s on database %" I64F "u failed : %s", cmd, database, reply->str);
|
|
|
- rtlFail(0, msg.str());
|
|
|
- }
|
|
|
+ assertAuthorization(reply);
|
|
|
+ VStringBuffer msg("Redis Plugin: ERROR - %s on database %" I64F "u failed : %s", cmd, database, reply->str);
|
|
|
+ rtlFail(0, msg.str());
|
|
|
}
|
|
|
}
|
|
|
void Connection::assertOnCommandError(const redisReply * reply, const char * cmd)
|
|
@@ -424,16 +395,17 @@ void Connection::assertOnCommandError(const redisReply * reply, const char * cmd
|
|
|
}
|
|
|
else if (reply->type == REDIS_REPLY_ERROR)
|
|
|
{
|
|
|
- if (strncmp(reply->str, "NOAUTH", 6) == 0)
|
|
|
- {
|
|
|
- VStringBuffer msg("Redis Plugin: server authentication failed - %s", reply->str);
|
|
|
- rtlFail(0, msg.str());
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- VStringBuffer msg("Redis Plugin: ERROR - %s failed : %s", cmd, reply->str);
|
|
|
- rtlFail(0, msg.str());
|
|
|
- }
|
|
|
+ assertAuthorization(reply);
|
|
|
+ VStringBuffer msg("Redis Plugin: ERROR - %s failed : %s", cmd, reply->str);
|
|
|
+ rtlFail(0, msg.str());
|
|
|
+ }
|
|
|
+}
|
|
|
+void Connection::assertAuthorization(const redisReply * reply)
|
|
|
+{
|
|
|
+ if (strncmp(reply->str, "NOAUTH", 6) == 0)
|
|
|
+ {
|
|
|
+ VStringBuffer msg("Redis Plugin: server authentication failed - %s", reply->str);
|
|
|
+ rtlFail(0, msg.str());
|
|
|
}
|
|
|
}
|
|
|
void Connection::assertKey(const redisReply * reply, const char * key)
|
|
@@ -706,7 +678,7 @@ void Connection::lockGet(ICodeContext * ctx, const char * key, size_t & returnSi
|
|
|
//---------------------------------------------------------------------------------------
|
|
|
void Connection::encodeChannel(StringBuffer & channel, const char * key) const
|
|
|
{
|
|
|
- channel.append(REDIS_LOCK_PREFIX).append("_").append(key).append("_").append(database).append("_").append(ip.str()).append("_").append(port);
|
|
|
+ channel.append(REDIS_LOCK_PREFIX).append("_").append(key).append("_").append(database);
|
|
|
}
|
|
|
bool Connection::lock(ICodeContext * ctx, const char * key, const char * channel)
|
|
|
{
|
|
@@ -716,9 +688,7 @@ bool Connection::lock(ICodeContext * ctx, const char * key, const char * channel
|
|
|
OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str(), key, strlen(key), channel, strlen(channel)));
|
|
|
assertOnError(reply->query(), cmd.append(" of the key '").append(key).append("' failed"));
|
|
|
|
|
|
- if (reply->query()->type == REDIS_REPLY_STATUS && strcmp(reply->query()->str, "OK") == 0)
|
|
|
- return true;
|
|
|
- return false;
|
|
|
+ return (reply->query()->type == REDIS_REPLY_STATUS && strcmp(reply->query()->str, "OK") == 0);
|
|
|
}
|
|
|
void Connection::unlock(ICodeContext * ctx, const char * key)
|
|
|
{
|
|
@@ -796,9 +766,9 @@ void Connection::handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAtt
|
|
|
else
|
|
|
{
|
|
|
//Check that we SUBSCRIBEd to the correct channel (which could have been manually SET).
|
|
|
- if (strcmp(reply->query()->str, channel) !=0 )
|
|
|
+ if (strcmp(reply->query()->str, channel.str()) !=0 )
|
|
|
{
|
|
|
- VStringBuffer msg("Redis Plugin: ERROR - the key '%s', on database %" I64F "u, is locked with a channel ('%s') different to that subscribed to.", key, database, reply->query()->str);
|
|
|
+ VStringBuffer msg("Redis Plugin: ERROR - the key '%s', on database %" I64F "u, is locked with a channel ('%s') different to that subscribed to (%s).", key, database, reply->query()->str, channel.str());
|
|
|
rtlFail(0, msg.str());
|
|
|
//MORE: We could attempt to recover at this stage by subscribing to the channel that the key was actually locked with.
|
|
|
//However, we may have missed the massage publication already or by then.
|
|
@@ -855,25 +825,25 @@ void Connection::handleLockOnSet(ICodeContext * ctx, const char * key, const cha
|
|
|
// ECL SERVICE ENTRYPOINTS
|
|
|
//--------------------------------------------------------------------------------
|
|
|
//-----------------------------------SET------------------------------------------
|
|
|
-ECL_REDIS_API void ECL_REDIS_CALL SyncLockRSetStr(ICodeContext * ctx, size32_t & returnSize, char * & returnValue, const char * key, size32_t valueSize, const char * value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
|
|
|
+ECL_REDIS_API void ECL_REDIS_CALL SyncLockRSetStr(ICodeContext * ctx, size32_t & returnLength, char * & returnValue, const char * key, size32_t valueLength, const char * value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
|
|
|
{
|
|
|
- SyncLockRSet(ctx, options, key, valueSize, value, expire, database, password, timeout);
|
|
|
- returnSize = valueSize;
|
|
|
- returnValue = (char*)memcpy(rtlMalloc(valueSize), value, valueSize);
|
|
|
+ SyncLockRSet(ctx, options, key, valueLength, value, expire, database, password, timeout);
|
|
|
+ returnLength = valueLength;
|
|
|
+ returnValue = (char*)allocateAndCopy(value, valueLength);
|
|
|
}
|
|
|
ECL_REDIS_API void ECL_REDIS_CALL SyncLockRSetUChar(ICodeContext * ctx, size32_t & returnLength, UChar * & returnValue, const char * key, size32_t valueLength, const UChar * value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
|
|
|
{
|
|
|
unsigned valueSize = (valueLength)*sizeof(UChar);
|
|
|
SyncLockRSet(ctx, options, key, valueSize, (char*)value, expire, database, password, timeout);
|
|
|
returnLength= valueLength;
|
|
|
- returnValue = (UChar*)memcpy(rtlMalloc(valueSize), (void*)value, valueSize);
|
|
|
+ returnValue = (UChar*)allocateAndCopy(value, valueSize);
|
|
|
}
|
|
|
ECL_REDIS_API void ECL_REDIS_CALL SyncLockRSetUtf8(ICodeContext * ctx, size32_t & returnLength, char * & returnValue, const char * key, size32_t valueLength, const char * value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
|
|
|
{
|
|
|
unsigned valueSize = rtlUtf8Size(valueLength, value);
|
|
|
SyncLockRSet(ctx, options, key, valueSize, value, expire, database, password, timeout);
|
|
|
returnLength = valueLength;
|
|
|
- returnValue = (char*)memcpy(rtlMalloc(valueSize), value, valueSize);
|
|
|
+ returnValue = (char*)allocateAndCopy(value, valueSize);
|
|
|
}
|
|
|
//-------------------------------------GET----------------------------------------
|
|
|
ECL_REDIS_API void ECL_REDIS_CALL SyncLockRGetStr(ICodeContext * ctx, size32_t & returnSize, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
|