|
@@ -44,7 +44,12 @@ static const char * REDIS_LOCK_PREFIX = "redis_ecl_lock";
|
|
static __thread Connection * cachedConnection;
|
|
static __thread Connection * cachedConnection;
|
|
static __thread ThreadTermFunc threadHookChain;
|
|
static __thread ThreadTermFunc threadHookChain;
|
|
|
|
|
|
-StringBuffer & appendExpire(StringBuffer & buffer, unsigned expire)
|
|
|
|
|
|
+static void * allocateAndCopy(const char * src, size_t size)
|
|
|
|
+{
|
|
|
|
+ void * value = rtlMalloc(size);
|
|
|
|
+ return memcpy(value, src, size);
|
|
|
|
+}
|
|
|
|
+static StringBuffer & appendExpire(StringBuffer & buffer, unsigned expire)
|
|
{
|
|
{
|
|
if (expire > 0)
|
|
if (expire > 0)
|
|
buffer.append(" EX ").append(expire/1000);
|
|
buffer.append(" EX ").append(expire/1000);
|
|
@@ -112,20 +117,20 @@ protected :
|
|
void parseOptions(ICodeContext * ctx, const char * _options);
|
|
void parseOptions(ICodeContext * ctx, const char * _options);
|
|
void connect(ICodeContext * ctx, unsigned __int64 _database, const char * password);
|
|
void connect(ICodeContext * ctx, unsigned __int64 _database, const char * password);
|
|
void selectDB(ICodeContext * ctx, unsigned __int64 _database);
|
|
void selectDB(ICodeContext * ctx, unsigned __int64 _database);
|
|
- void authenticate(ICodeContext * ctx, const char * password);
|
|
|
|
void resetContextErr();
|
|
void resetContextErr();
|
|
void readReply(Reply * reply);
|
|
void readReply(Reply * reply);
|
|
void readReplyAndAssert(Reply * reply, const char * msg);
|
|
void readReplyAndAssert(Reply * reply, const char * msg);
|
|
void readReplyAndAssertWithKey(Reply * reply, const char * msg, const char * key);
|
|
void readReplyAndAssertWithKey(Reply * reply, const char * msg, const char * key);
|
|
void assertKey(const redisReply * reply, const char * key);
|
|
void assertKey(const redisReply * reply, const char * key);
|
|
|
|
+ void assertAuthorization(const redisReply * reply);
|
|
void assertOnError(const redisReply * reply, const char * _msg);
|
|
void assertOnError(const redisReply * reply, const char * _msg);
|
|
void assertOnCommandError(const redisReply * reply, const char * cmd);
|
|
void assertOnCommandError(const redisReply * reply, const char * cmd);
|
|
void assertOnCommandErrorWithDatabase(const redisReply * reply, const char * cmd);
|
|
void assertOnCommandErrorWithDatabase(const redisReply * reply, const char * cmd);
|
|
void assertOnCommandErrorWithKey(const redisReply * reply, const char * cmd, const char * key);
|
|
void assertOnCommandErrorWithKey(const redisReply * reply, const char * cmd, const char * key);
|
|
void assertConnection();
|
|
void assertConnection();
|
|
void updateTimeout(unsigned __int64 _timeout);
|
|
void updateTimeout(unsigned __int64 _timeout);
|
|
- void * allocateAndCopy(const char * src, size_t size);
|
|
|
|
- bool isSameConnection(ICodeContext * ctx, const char * password) const;
|
|
|
|
|
|
+ unsigned hashServerIpPortPassword(ICodeContext * ctx, const char * _options, const char * password) const;
|
|
|
|
+ bool isSameConnection(ICodeContext * ctx, const char * _options, const char * password) const;
|
|
|
|
|
|
//-------------------------------LOCKING------------------------------------------------
|
|
//-------------------------------LOCKING------------------------------------------------
|
|
void handleLockOnSet(ICodeContext * ctx, const char * key, const char * value, size_t size, unsigned expire);
|
|
void handleLockOnSet(ICodeContext * ctx, const char * key, const char * value, size_t size, unsigned expire);
|
|
@@ -146,11 +151,11 @@ protected :
|
|
|
|
|
|
//The following class is here to ensure destruction of the cachedConnection within the main thread
|
|
//The following class is here to ensure destruction of the cachedConnection within the main thread
|
|
//as this is not handled by the thread hook mechanism.
|
|
//as this is not handled by the thread hook mechanism.
|
|
-static class mainThreadCachedConnection
|
|
|
|
|
|
+static class MainThreadCachedConnection
|
|
{
|
|
{
|
|
public :
|
|
public :
|
|
- mainThreadCachedConnection() { }
|
|
|
|
- ~mainThreadCachedConnection()
|
|
|
|
|
|
+ MainThreadCachedConnection() { }
|
|
|
|
+ ~MainThreadCachedConnection()
|
|
{
|
|
{
|
|
if (cachedConnection)
|
|
if (cachedConnection)
|
|
cachedConnection->Release();
|
|
cachedConnection->Release();
|
|
@@ -171,10 +176,8 @@ static void releaseContext()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
Connection::Connection(ICodeContext * ctx, const char * _options, unsigned __int64 _database, const char * password, unsigned __int64 _timeout)
|
|
Connection::Connection(ICodeContext * ctx, const char * _options, unsigned __int64 _database, const char * password, unsigned __int64 _timeout)
|
|
- : database(0), timeout(_timeout), port(0)
|
|
|
|
|
|
+ : database(0), timeout(_timeout), port(0), serverIpPortPasswordHash(hashServerIpPortPassword(ctx, _options, password))
|
|
{
|
|
{
|
|
- serverIpPortPasswordHash = hashc((const unsigned char*)password, strlen(password), 0);
|
|
|
|
- serverIpPortPasswordHash = hashc((const unsigned char*)_options, strlen(_options), serverIpPortPasswordHash);
|
|
|
|
options.set(_options, strlen(_options));
|
|
options.set(_options, strlen(_options));
|
|
parseOptions(ctx, _options);
|
|
parseOptions(ctx, _options);
|
|
connect(ctx, _database, password);
|
|
connect(ctx, _database, password);
|
|
@@ -188,7 +191,7 @@ Connection::Connection(ICodeContext * ctx, const char * _options, const char * _
|
|
}
|
|
}
|
|
void Connection::connect(ICodeContext * ctx, unsigned __int64 _database, const char * password)
|
|
void Connection::connect(ICodeContext * ctx, unsigned __int64 _database, const char * password)
|
|
{
|
|
{
|
|
- struct timeval to = { timeout/1000, timeout%1000 };
|
|
|
|
|
|
+ struct timeval to = { timeout/1000, (timeout%1000)*1000 };
|
|
context = redisConnectWithTimeout(ip.str(), port, to);
|
|
context = redisConnectWithTimeout(ip.str(), port, to);
|
|
redisSetTimeout(context, to);
|
|
redisSetTimeout(context, to);
|
|
assertConnection();
|
|
assertConnection();
|
|
@@ -196,11 +199,11 @@ void Connection::connect(ICodeContext * ctx, unsigned __int64 _database, const c
|
|
//The following is the dissemination of the two methods authenticate(ctx, password) & selectDB(ctx, _database)
|
|
//The following is the dissemination of the two methods authenticate(ctx, password) & selectDB(ctx, _database)
|
|
//such that they may be pipelined to save an extra round trip to the server and back.
|
|
//such that they may be pipelined to save an extra round trip to the server and back.
|
|
if (password && *password)
|
|
if (password && *password)
|
|
- redisAppendCommand(context, "AUTH %b", password, strlen(password));
|
|
|
|
|
|
+ redisAppendCommand(context, "AUTH %b", password, strlen(password));
|
|
|
|
|
|
if (database != _database)
|
|
if (database != _database)
|
|
{
|
|
{
|
|
- VStringBuffer cmd("SELECT %" I64F "u", database);
|
|
|
|
|
|
+ VStringBuffer cmd("SELECT %" I64F "u", _database);
|
|
redisAppendCommand(context, cmd.str());
|
|
redisAppendCommand(context, cmd.str());
|
|
}
|
|
}
|
|
|
|
|
|
@@ -215,15 +218,13 @@ void Connection::connect(ICodeContext * ctx, unsigned __int64 _database, const c
|
|
database = _database;
|
|
database = _database;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-bool Connection::isSameConnection(ICodeContext * ctx, const char * password) const
|
|
|
|
|
|
+bool Connection::isSameConnection(ICodeContext * ctx, const char * _options, const char * password) const
|
|
{
|
|
{
|
|
- unsigned hash = hashc((const unsigned char*)options.str(), options.length(), hashc((const unsigned char*)password, strlen(password), 0));
|
|
|
|
- return (serverIpPortPasswordHash == hash);
|
|
|
|
|
|
+ return (hashServerIpPortPassword(ctx, _options, password) == serverIpPortPasswordHash);
|
|
}
|
|
}
|
|
-void * Connection::allocateAndCopy(const char * src, size_t size)
|
|
|
|
|
|
+unsigned Connection::hashServerIpPortPassword(ICodeContext * ctx, const char * _options, const char * password) const
|
|
{
|
|
{
|
|
- void * value = rtlMalloc(size);
|
|
|
|
- return memcpy(value, src, size);
|
|
|
|
|
|
+ return hashc((const unsigned char*)_options, strlen(_options), hashc((const unsigned char*)password, strlen(password), 0));
|
|
}
|
|
}
|
|
void Connection::parseOptions(ICodeContext * ctx, const char * _options)
|
|
void Connection::parseOptions(ICodeContext * ctx, const char * _options)
|
|
{
|
|
{
|
|
@@ -259,15 +260,6 @@ void Connection::parseOptions(ICodeContext * ctx, const char * _options)
|
|
ctx->logString(msg.str());
|
|
ctx->logString(msg.str());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- return;
|
|
|
|
-}
|
|
|
|
-void Connection::authenticate(ICodeContext * ctx, const char * password)
|
|
|
|
-{
|
|
|
|
- if (password && *password)
|
|
|
|
- {
|
|
|
|
- OwnedReply reply = Reply::createReply(redisCommand(context, "AUTH %b", password, strlen(password)));
|
|
|
|
- assertOnError(reply->query(), "server authentication failed");
|
|
|
|
- }
|
|
|
|
}
|
|
}
|
|
void Connection::resetContextErr()
|
|
void Connection::resetContextErr()
|
|
{
|
|
{
|
|
@@ -302,7 +294,7 @@ Connection * Connection::createConnection(ICodeContext * ctx, const char * optio
|
|
return LINK(cachedConnection);
|
|
return LINK(cachedConnection);
|
|
}
|
|
}
|
|
|
|
|
|
- if (cachedConnection->isSameConnection(ctx, password))
|
|
|
|
|
|
+ if (cachedConnection->isSameConnection(ctx, options, password))
|
|
{
|
|
{
|
|
//MORE: should perhaps check that the connection has not expired (think hiredis REDIS_KEEPALIVE_INTERVAL is defaulted to 15s).
|
|
//MORE: should perhaps check that the connection has not expired (think hiredis REDIS_KEEPALIVE_INTERVAL is defaulted to 15s).
|
|
//At present updateTimeout calls assertConnection.
|
|
//At present updateTimeout calls assertConnection.
|
|
@@ -313,6 +305,7 @@ Connection * Connection::createConnection(ICodeContext * ctx, const char * optio
|
|
}
|
|
}
|
|
|
|
|
|
cachedConnection->Release();
|
|
cachedConnection->Release();
|
|
|
|
+ cachedConnection = NULL;
|
|
cachedConnection = new Connection(ctx, options, _database, password, _timeout);
|
|
cachedConnection = new Connection(ctx, options, _database, password, _timeout);
|
|
return LINK(cachedConnection);
|
|
return LINK(cachedConnection);
|
|
}
|
|
}
|
|
@@ -331,7 +324,7 @@ void Connection::updateTimeout(unsigned __int64 _timeout)
|
|
return;
|
|
return;
|
|
assertConnection();
|
|
assertConnection();
|
|
timeout = _timeout;
|
|
timeout = _timeout;
|
|
- struct timeval to = { timeout/1000, timeout%1000 };
|
|
|
|
|
|
+ struct timeval to = { timeout/1000, (timeout%1000)*1000 };
|
|
assertex(context);
|
|
assertex(context);
|
|
if (redisSetTimeout(context, to) != REDIS_OK)
|
|
if (redisSetTimeout(context, to) != REDIS_OK)
|
|
{
|
|
{
|
|
@@ -346,7 +339,7 @@ void Connection::updateTimeout(unsigned __int64 _timeout)
|
|
}
|
|
}
|
|
void Connection::assertOnError(const redisReply * reply, const char * _msg)
|
|
void Connection::assertOnError(const redisReply * reply, const char * _msg)
|
|
{
|
|
{
|
|
- if (!reply)//assertex(reply)?
|
|
|
|
|
|
+ if (!reply)//MORE: should this be assertex(reply) instead?
|
|
{
|
|
{
|
|
//There should always be a context error if no reply error
|
|
//There should always be a context error if no reply error
|
|
assertConnection();
|
|
assertConnection();
|
|
@@ -355,21 +348,14 @@ void Connection::assertOnError(const redisReply * reply, const char * _msg)
|
|
}
|
|
}
|
|
else if (reply->type == REDIS_REPLY_ERROR)
|
|
else if (reply->type == REDIS_REPLY_ERROR)
|
|
{
|
|
{
|
|
- if (strncmp(reply->str, "NOAUTH", 6) == 0)
|
|
|
|
- {
|
|
|
|
- VStringBuffer msg("Redis Plugin: server authentication failed - %s", reply->str);
|
|
|
|
- rtlFail(0, msg.str());
|
|
|
|
- }
|
|
|
|
- else
|
|
|
|
- {
|
|
|
|
- VStringBuffer msg("Redis Plugin: %s - %s", _msg, reply->str);
|
|
|
|
- rtlFail(0, msg.str());
|
|
|
|
- }
|
|
|
|
|
|
+ assertAuthorization(reply);
|
|
|
|
+ VStringBuffer msg("Redis Plugin: %s - %s", _msg, reply->str);
|
|
|
|
+ rtlFail(0, msg.str());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
void Connection::assertOnCommandErrorWithKey(const redisReply * reply, const char * cmd, const char * key)
|
|
void Connection::assertOnCommandErrorWithKey(const redisReply * reply, const char * cmd, const char * key)
|
|
{
|
|
{
|
|
- if (!reply)//assertex(reply)?
|
|
|
|
|
|
+ if (!reply)//MORE: should this be assertex(reply) instead?
|
|
{
|
|
{
|
|
//There should always be a context error if no reply error
|
|
//There should always be a context error if no reply error
|
|
assertConnection();
|
|
assertConnection();
|
|
@@ -378,16 +364,9 @@ void Connection::assertOnCommandErrorWithKey(const redisReply * reply, const cha
|
|
}
|
|
}
|
|
else if (reply->type == REDIS_REPLY_ERROR)
|
|
else if (reply->type == REDIS_REPLY_ERROR)
|
|
{
|
|
{
|
|
- if (strncmp(reply->str, "NOAUTH", 6) == 0)
|
|
|
|
- {
|
|
|
|
- VStringBuffer msg("Redis Plugin: server authentication failed - %s", reply->str);
|
|
|
|
- rtlFail(0, msg.str());
|
|
|
|
- }
|
|
|
|
- else
|
|
|
|
- {
|
|
|
|
- VStringBuffer msg("Redis Plugin: ERROR - %s '%s' on database %" I64F "u failed : %s", cmd, key, database, reply->str);
|
|
|
|
- rtlFail(0, msg.str());
|
|
|
|
- }
|
|
|
|
|
|
+ assertAuthorization(reply);
|
|
|
|
+ VStringBuffer msg("Redis Plugin: ERROR - %s '%s' on database %" I64F "u failed : %s", cmd, key, database, reply->str);
|
|
|
|
+ rtlFail(0, msg.str());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
void Connection::assertOnCommandErrorWithDatabase(const redisReply * reply, const char * cmd)
|
|
void Connection::assertOnCommandErrorWithDatabase(const redisReply * reply, const char * cmd)
|
|
@@ -401,16 +380,9 @@ void Connection::assertOnCommandErrorWithDatabase(const redisReply * reply, cons
|
|
}
|
|
}
|
|
else if (reply->type == REDIS_REPLY_ERROR)
|
|
else if (reply->type == REDIS_REPLY_ERROR)
|
|
{
|
|
{
|
|
- if (strncmp(reply->str, "NOAUTH", 6) == 0)
|
|
|
|
- {
|
|
|
|
- VStringBuffer msg("Redis Plugin: server authentication failed - %s", reply->str);
|
|
|
|
- rtlFail(0, msg.str());
|
|
|
|
- }
|
|
|
|
- else
|
|
|
|
- {
|
|
|
|
- VStringBuffer msg("Redis Plugin: ERROR - %s on database %" I64F "u failed : %s", cmd, database, reply->str);
|
|
|
|
- rtlFail(0, msg.str());
|
|
|
|
- }
|
|
|
|
|
|
+ assertAuthorization(reply);
|
|
|
|
+ VStringBuffer msg("Redis Plugin: ERROR - %s on database %" I64F "u failed : %s", cmd, database, reply->str);
|
|
|
|
+ rtlFail(0, msg.str());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
void Connection::assertOnCommandError(const redisReply * reply, const char * cmd)
|
|
void Connection::assertOnCommandError(const redisReply * reply, const char * cmd)
|
|
@@ -424,16 +396,17 @@ void Connection::assertOnCommandError(const redisReply * reply, const char * cmd
|
|
}
|
|
}
|
|
else if (reply->type == REDIS_REPLY_ERROR)
|
|
else if (reply->type == REDIS_REPLY_ERROR)
|
|
{
|
|
{
|
|
- if (strncmp(reply->str, "NOAUTH", 6) == 0)
|
|
|
|
- {
|
|
|
|
- VStringBuffer msg("Redis Plugin: server authentication failed - %s", reply->str);
|
|
|
|
- rtlFail(0, msg.str());
|
|
|
|
- }
|
|
|
|
- else
|
|
|
|
- {
|
|
|
|
- VStringBuffer msg("Redis Plugin: ERROR - %s failed : %s", cmd, reply->str);
|
|
|
|
- rtlFail(0, msg.str());
|
|
|
|
- }
|
|
|
|
|
|
+ assertAuthorization(reply);
|
|
|
|
+ VStringBuffer msg("Redis Plugin: ERROR - %s failed : %s", cmd, reply->str);
|
|
|
|
+ rtlFail(0, msg.str());
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+void Connection::assertAuthorization(const redisReply * reply)
|
|
|
|
+{
|
|
|
|
+ if (strncmp(reply->str, "NOAUTH", 6) == 0)
|
|
|
|
+ {
|
|
|
|
+ VStringBuffer msg("Redis Plugin: server authentication failed - %s", reply->str);
|
|
|
|
+ rtlFail(0, msg.str());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
void Connection::assertKey(const redisReply * reply, const char * key)
|
|
void Connection::assertKey(const redisReply * reply, const char * key)
|