|
@@ -81,17 +81,36 @@ private :
|
|
};
|
|
};
|
|
typedef Owned<RedisPlugin::Reply> OwnedReply;
|
|
typedef Owned<RedisPlugin::Reply> OwnedReply;
|
|
|
|
|
|
|
|
+class TimeoutHandler
|
|
|
|
+{
|
|
|
|
+public :
|
|
|
|
+ TimeoutHandler(unsigned _timeout) : timeout(_timeout), t0(msTick()) { }
|
|
|
|
+ inline void reset(unsigned _timeout) { timeout = _timeout; t0 = msTick(); }
|
|
|
|
+ unsigned timeLeft() const
|
|
|
|
+ {
|
|
|
|
+ unsigned dt = msTick() - t0;
|
|
|
|
+ if (dt < timeout)
|
|
|
|
+ return timeout - dt;
|
|
|
|
+ return 0;
|
|
|
|
+ }
|
|
|
|
+ inline unsigned getTimeout() { return timeout; }
|
|
|
|
+
|
|
|
|
+private :
|
|
|
|
+ unsigned timeout;
|
|
|
|
+ unsigned t0;
|
|
|
|
+};
|
|
|
|
+
|
|
class Connection : public CInterface
|
|
class Connection : public CInterface
|
|
{
|
|
{
|
|
public :
|
|
public :
|
|
- Connection(ICodeContext * ctx, const char * options, unsigned __int64 database, const char * password, unsigned __int64 _timeout);
|
|
|
|
- Connection(ICodeContext * ctx, const char * _options, const char * _ip, int _port, unsigned _serverIpPortPasswordHash, unsigned __int64 _database, const char * password, unsigned __int64 _timeout);
|
|
|
|
|
|
+ Connection(ICodeContext * ctx, const char * options, int database, const char * password, unsigned _timeout);
|
|
|
|
+ Connection(ICodeContext * ctx, const char * _options, const char * _ip, int _port, unsigned _serverIpPortPasswordHash, int _database, const char * password, unsigned _timeout);
|
|
~Connection()
|
|
~Connection()
|
|
{
|
|
{
|
|
if (context)
|
|
if (context)
|
|
redisFree(context);
|
|
redisFree(context);
|
|
}
|
|
}
|
|
- static Connection * createConnection(ICodeContext * ctx, const char * options, unsigned __int64 database, const char * password, unsigned __int64 _timeout);
|
|
|
|
|
|
+ static Connection * createConnection(ICodeContext * ctx, const char * options, int database, const char * password, unsigned _timeout);
|
|
|
|
|
|
//set
|
|
//set
|
|
template <class type> void set(ICodeContext * ctx, const char * key, type value, unsigned expire);
|
|
template <class type> void set(ICodeContext * ctx, const char * key, type value, unsigned expire);
|
|
@@ -114,21 +133,24 @@ public :
|
|
bool exists(ICodeContext * ctx, const char * key);
|
|
bool exists(ICodeContext * ctx, const char * key);
|
|
|
|
|
|
protected :
|
|
protected :
|
|
|
|
+ void redisSetTimeout();
|
|
|
|
+ void redisConnect();
|
|
|
|
+ unsigned timeLeft();
|
|
void parseOptions(ICodeContext * ctx, const char * _options);
|
|
void parseOptions(ICodeContext * ctx, const char * _options);
|
|
- void connect(ICodeContext * ctx, unsigned __int64 _database, const char * password);
|
|
|
|
- void selectDB(ICodeContext * ctx, unsigned __int64 _database);
|
|
|
|
|
|
+ void connect(ICodeContext * ctx, int _database, const char * password);
|
|
|
|
+ void selectDB(ICodeContext * ctx, int _database);
|
|
void resetContextErr();
|
|
void resetContextErr();
|
|
void readReply(Reply * reply);
|
|
void readReply(Reply * reply);
|
|
void readReplyAndAssert(Reply * reply, const char * msg);
|
|
void readReplyAndAssert(Reply * reply, const char * msg);
|
|
- void readReplyAndAssertWithKey(Reply * reply, const char * msg, const char * key);
|
|
|
|
|
|
+ void readReplyAndAssertWithCmdMsg(Reply * reply, const char * msg, const char * key = NULL);
|
|
void assertKey(const redisReply * reply, const char * key);
|
|
void assertKey(const redisReply * reply, const char * key);
|
|
void assertAuthorization(const redisReply * reply);
|
|
void assertAuthorization(const redisReply * reply);
|
|
void assertOnError(const redisReply * reply, const char * _msg);
|
|
void assertOnError(const redisReply * reply, const char * _msg);
|
|
- void assertOnCommandError(const redisReply * reply, const char * cmd);
|
|
|
|
- void assertOnCommandErrorWithDatabase(const redisReply * reply, const char * cmd);
|
|
|
|
- void assertOnCommandErrorWithKey(const redisReply * reply, const char * cmd, const char * key);
|
|
|
|
- void assertConnection();
|
|
|
|
- void updateTimeout(unsigned __int64 _timeout);
|
|
|
|
|
|
+ void assertOnErrorWithCmdMsg(const redisReply * reply, const char * cmd, const char * key = NULL);
|
|
|
|
+ void assertConnection(const char * _msg);
|
|
|
|
+ void assertConnectionWithCmdMsg(const char * cmd, const char * key = NULL);
|
|
|
|
+ void fail(const char * cmd, const char * errmsg, const char * key = NULL);
|
|
|
|
+ void * redisCommand(redisContext * context, const char * format, ...);
|
|
static unsigned hashServerIpPortPassword(ICodeContext * ctx, const char * _options, const char * password);
|
|
static unsigned hashServerIpPortPassword(ICodeContext * ctx, const char * _options, const char * password);
|
|
bool isSameConnection(ICodeContext * ctx, const char * _options, const char * password) const;
|
|
bool isSameConnection(ICodeContext * ctx, const char * _options, const char * password) const;
|
|
|
|
|
|
@@ -146,8 +168,8 @@ protected :
|
|
StringAttr ip;
|
|
StringAttr ip;
|
|
unsigned serverIpPortPasswordHash;
|
|
unsigned serverIpPortPasswordHash;
|
|
int port;
|
|
int port;
|
|
- unsigned __int64 timeout;
|
|
|
|
- unsigned __int64 database;
|
|
|
|
|
|
+ TimeoutHandler timeout;
|
|
|
|
+ int database; //NOTE: redis stores the maximum number of dbs as an 'int'.
|
|
};
|
|
};
|
|
|
|
|
|
//The following class is here to ensure destruction of the cachedConnection within the main thread
|
|
//The following class is here to ensure destruction of the cachedConnection within the main thread
|
|
@@ -180,26 +202,35 @@ static void releaseContext()
|
|
}
|
|
}
|
|
threadHooked = false;
|
|
threadHooked = false;
|
|
}
|
|
}
|
|
-Connection::Connection(ICodeContext * ctx, const char * _options, unsigned __int64 _database, const char * password, unsigned __int64 _timeout)
|
|
|
|
|
|
+Connection::Connection(ICodeContext * ctx, const char * _options, int _database, const char * password, unsigned _timeout)
|
|
: database(0), timeout(_timeout), port(0), serverIpPortPasswordHash(hashServerIpPortPassword(ctx, _options, password))
|
|
: database(0), timeout(_timeout), port(0), serverIpPortPasswordHash(hashServerIpPortPassword(ctx, _options, password))
|
|
{
|
|
{
|
|
options.set(_options, strlen(_options));
|
|
options.set(_options, strlen(_options));
|
|
parseOptions(ctx, _options);
|
|
parseOptions(ctx, _options);
|
|
connect(ctx, _database, password);
|
|
connect(ctx, _database, password);
|
|
}
|
|
}
|
|
-Connection::Connection(ICodeContext * ctx, const char * _options, const char * _ip, int _port, unsigned _serverIpPortPasswordHash, unsigned __int64 _database, const char * password, unsigned __int64 _timeout)
|
|
|
|
|
|
+Connection::Connection(ICodeContext * ctx, const char * _options, const char * _ip, int _port, unsigned _serverIpPortPasswordHash, int _database, const char * password, unsigned _timeout)
|
|
: database(0), timeout(_timeout), serverIpPortPasswordHash(_serverIpPortPasswordHash), port(_port)
|
|
: database(0), timeout(_timeout), serverIpPortPasswordHash(_serverIpPortPasswordHash), port(_port)
|
|
{
|
|
{
|
|
options.set(_options, strlen(_options));
|
|
options.set(_options, strlen(_options));
|
|
ip.set(_ip, strlen(_ip));
|
|
ip.set(_ip, strlen(_ip));
|
|
connect(ctx, _database, password);
|
|
connect(ctx, _database, password);
|
|
}
|
|
}
|
|
-void Connection::connect(ICodeContext * ctx, unsigned __int64 _database, const char * password)
|
|
|
|
|
|
+void Connection::redisConnect()
|
|
{
|
|
{
|
|
- struct timeval to = { timeout/1000, (timeout%1000)*1000 };
|
|
|
|
- context = redisConnectWithTimeout(ip.str(), port, to);
|
|
|
|
- assertConnection();
|
|
|
|
- redisSetTimeout(context, to);
|
|
|
|
|
|
+ if (timeout.getTimeout() == 0)
|
|
|
|
+ context = ::redisConnect(ip.str(), port);
|
|
|
|
+ else
|
|
|
|
+ {
|
|
|
|
+ unsigned _timeLeft = timeLeft();
|
|
|
|
+ struct timeval to = { _timeLeft/1000, (_timeLeft%1000)*1000 };
|
|
|
|
+ context = ::redisConnectWithTimeout(ip.str(), port, to);
|
|
|
|
+ }
|
|
|
|
+ assertConnection("connection");
|
|
|
|
+}
|
|
|
|
+void Connection::connect(ICodeContext * ctx, int _database, const char * password)
|
|
|
|
+{
|
|
|
|
+ redisConnect();
|
|
|
|
|
|
//The following is the dissemination of the two methods authenticate(ctx, password) & selectDB(ctx, _database)
|
|
//The following is the dissemination of the two methods authenticate(ctx, password) & selectDB(ctx, _database)
|
|
//such that they may be pipelined to save an extra round trip to the server and back.
|
|
//such that they may be pipelined to save an extra round trip to the server and back.
|
|
@@ -208,21 +239,54 @@ void Connection::connect(ICodeContext * ctx, unsigned __int64 _database, const c
|
|
|
|
|
|
if (database != _database)
|
|
if (database != _database)
|
|
{
|
|
{
|
|
- VStringBuffer cmd("SELECT %" I64F "u", _database);
|
|
|
|
|
|
+ VStringBuffer cmd("SELECT %d", _database);
|
|
redisAppendCommand(context, cmd.str());
|
|
redisAppendCommand(context, cmd.str());
|
|
}
|
|
}
|
|
|
|
|
|
//Now read replies.
|
|
//Now read replies.
|
|
OwnedReply reply = new Reply();
|
|
OwnedReply reply = new Reply();
|
|
if (password && *password)
|
|
if (password && *password)
|
|
- readReplyAndAssert(reply, "server authentication failed");
|
|
|
|
|
|
+ readReplyAndAssert(reply, "server authentication");
|
|
|
|
|
|
if (database != _database)
|
|
if (database != _database)
|
|
{
|
|
{
|
|
- readReplyAndAssert(reply, "request to SELECT database failed");
|
|
|
|
|
|
+ VStringBuffer cmd("SELECT %d", _database);
|
|
|
|
+ readReplyAndAssertWithCmdMsg(reply, cmd.str());
|
|
database = _database;
|
|
database = _database;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+void * Connection::redisCommand(redisContext * context, const char * format, ...)
|
|
|
|
+{
|
|
|
|
+ //Copied from https://github.com/redis/hiredis/blob/master/hiredis.c ~line:1008 void * redisCommand(redisContext * context, const char * format, ...)
|
|
|
|
+ //with redisSetTimeout(); added.
|
|
|
|
+ va_list parameters;
|
|
|
|
+ void * reply = NULL;
|
|
|
|
+ va_start(parameters, format);
|
|
|
|
+ redisSetTimeout();
|
|
|
|
+ reply = ::redisvCommand(context, format, parameters);
|
|
|
|
+ va_end(parameters);
|
|
|
|
+ return reply;
|
|
|
|
+}
|
|
|
|
+unsigned Connection::timeLeft()
|
|
|
|
+{
|
|
|
|
+ unsigned _timeLeft = timeout.timeLeft();
|
|
|
|
+ if (_timeLeft == 0 && timeout.getTimeout() != 0)
|
|
|
|
+ ::rtlFail(0, "Redis Plugin: ERROR - function timed out internally.");
|
|
|
|
+ return _timeLeft;
|
|
|
|
+}
|
|
|
|
+void Connection::redisSetTimeout()
|
|
|
|
+{
|
|
|
|
+ unsigned _timeLeft = timeLeft();
|
|
|
|
+ if (_timeLeft == 0)
|
|
|
|
+ return;
|
|
|
|
+ struct timeval to = { _timeLeft/1000, (_timeLeft%1000)*1000 };
|
|
|
|
+ assertex(context);
|
|
|
|
+ if (::redisSetTimeout(context, to) != REDIS_OK)
|
|
|
|
+ {
|
|
|
|
+ assertConnection("request to set timeout");
|
|
|
|
+ throwUnexpected();//In case there is a bug in hiredis such that the above err is not reflected in the 'context' (checked in assertConnection) as expected.
|
|
|
|
+ }
|
|
|
|
+}
|
|
bool Connection::isSameConnection(ICodeContext * ctx, const char * _options, const char * password) const
|
|
bool Connection::isSameConnection(ICodeContext * ctx, const char * _options, const char * password) const
|
|
{
|
|
{
|
|
return (hashServerIpPortPassword(ctx, _options, password) == serverIpPortPasswordHash);
|
|
return (hashServerIpPortPassword(ctx, _options, password) == serverIpPortPasswordHash);
|
|
@@ -251,8 +315,8 @@ void Connection::parseOptions(ICodeContext * ctx, const char * _options)
|
|
}
|
|
}
|
|
else
|
|
else
|
|
{
|
|
{
|
|
- VStringBuffer err("Redis Plugin: unsupported option string %s", opt);
|
|
|
|
- rtlFail(0, err.str());
|
|
|
|
|
|
+ VStringBuffer err("Redis Plugin: ERROR - unsupported option string '%s'", opt);
|
|
|
|
+ ::rtlFail(0, err.str());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if (ip.isEmpty())
|
|
if (ip.isEmpty())
|
|
@@ -261,7 +325,7 @@ void Connection::parseOptions(ICodeContext * ctx, const char * _options)
|
|
port = 6379;
|
|
port = 6379;
|
|
if (ctx)
|
|
if (ctx)
|
|
{
|
|
{
|
|
- VStringBuffer msg("Redis Plugin: WARNING - using default server (%s:%d)", ip.str(), port);
|
|
|
|
|
|
+ VStringBuffer msg("Redis Plugin: WARNING - using default cache (%s:%d)", ip.str(), port);
|
|
ctx->logString(msg.str());
|
|
ctx->logString(msg.str());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -274,23 +338,21 @@ void Connection::resetContextErr()
|
|
void Connection::readReply(Reply * reply)
|
|
void Connection::readReply(Reply * reply)
|
|
{
|
|
{
|
|
redisReply * nakedReply = NULL;
|
|
redisReply * nakedReply = NULL;
|
|
|
|
+ redisSetTimeout();
|
|
redisGetReply(context, (void**)&nakedReply);
|
|
redisGetReply(context, (void**)&nakedReply);
|
|
- assertex(reply);
|
|
|
|
reply->setClear(nakedReply);
|
|
reply->setClear(nakedReply);
|
|
}
|
|
}
|
|
void Connection::readReplyAndAssert(Reply * reply, const char * msg)
|
|
void Connection::readReplyAndAssert(Reply * reply, const char * msg)
|
|
{
|
|
{
|
|
readReply(reply);
|
|
readReply(reply);
|
|
- assertex(reply);
|
|
|
|
assertOnError(reply->query(), msg);
|
|
assertOnError(reply->query(), msg);
|
|
}
|
|
}
|
|
-void Connection::readReplyAndAssertWithKey(Reply * reply, const char * msg, const char * key)
|
|
|
|
|
|
+void Connection::readReplyAndAssertWithCmdMsg(Reply * reply, const char * msg, const char * key)
|
|
{
|
|
{
|
|
readReply(reply);
|
|
readReply(reply);
|
|
- assertex(reply);
|
|
|
|
- assertOnCommandErrorWithKey(reply->query(), msg, key);
|
|
|
|
|
|
+ assertOnErrorWithCmdMsg(reply->query(), msg, key);
|
|
}
|
|
}
|
|
-Connection * Connection::createConnection(ICodeContext * ctx, const char * options, unsigned __int64 _database, const char * password, unsigned __int64 _timeout)
|
|
|
|
|
|
+Connection * Connection::createConnection(ICodeContext * ctx, const char * options, int _database, const char * password, unsigned _timeout)
|
|
{
|
|
{
|
|
if (!cachedConnection)
|
|
if (!cachedConnection)
|
|
{
|
|
{
|
|
@@ -306,9 +368,8 @@ Connection * Connection::createConnection(ICodeContext * ctx, const char * optio
|
|
if (cachedConnection->isSameConnection(ctx, options, password))
|
|
if (cachedConnection->isSameConnection(ctx, options, password))
|
|
{
|
|
{
|
|
//MORE: should perhaps check that the connection has not expired (think hiredis REDIS_KEEPALIVE_INTERVAL is defaulted to 15s).
|
|
//MORE: should perhaps check that the connection has not expired (think hiredis REDIS_KEEPALIVE_INTERVAL is defaulted to 15s).
|
|
- //At present updateTimeout calls assertConnection.
|
|
|
|
cachedConnection->resetContextErr();//reset the context err to allow reuse when an error previously occurred.
|
|
cachedConnection->resetContextErr();//reset the context err to allow reuse when an error previously occurred.
|
|
- cachedConnection->updateTimeout(_timeout);
|
|
|
|
|
|
+ cachedConnection->timeout.reset(_timeout);
|
|
cachedConnection->selectDB(ctx, _database);
|
|
cachedConnection->selectDB(ctx, _database);
|
|
return LINK(cachedConnection);
|
|
return LINK(cachedConnection);
|
|
}
|
|
}
|
|
@@ -318,122 +379,80 @@ Connection * Connection::createConnection(ICodeContext * ctx, const char * optio
|
|
cachedConnection = new Connection(ctx, options, _database, password, _timeout);
|
|
cachedConnection = new Connection(ctx, options, _database, password, _timeout);
|
|
return LINK(cachedConnection);
|
|
return LINK(cachedConnection);
|
|
}
|
|
}
|
|
-void Connection::selectDB(ICodeContext * ctx, unsigned __int64 _database)
|
|
|
|
|
|
+void Connection::selectDB(ICodeContext * ctx, int _database)
|
|
{
|
|
{
|
|
if (database == _database)
|
|
if (database == _database)
|
|
return;
|
|
return;
|
|
database = _database;
|
|
database = _database;
|
|
- VStringBuffer cmd("SELECT %" I64F "u", database);
|
|
|
|
|
|
+ VStringBuffer cmd("SELECT %d", database);
|
|
OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str()));
|
|
OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str()));
|
|
- assertOnCommandError(reply->query(), "SELECT");
|
|
|
|
|
|
+ assertOnErrorWithCmdMsg(reply->query(), cmd.str());
|
|
}
|
|
}
|
|
-void Connection::updateTimeout(unsigned __int64 _timeout)
|
|
|
|
|
|
+void Connection::fail(const char * cmd, const char * errmsg, const char * key)
|
|
{
|
|
{
|
|
- if (timeout == _timeout)
|
|
|
|
- return;
|
|
|
|
- assertConnection();
|
|
|
|
- timeout = _timeout;
|
|
|
|
- struct timeval to = { timeout/1000, (timeout%1000)*1000 };
|
|
|
|
- assertex(context);
|
|
|
|
- if (redisSetTimeout(context, to) != REDIS_OK)
|
|
|
|
|
|
+ if (key)
|
|
{
|
|
{
|
|
- if (context->err)
|
|
|
|
- {
|
|
|
|
- VStringBuffer msg("Redis Plugin: failed to set timeout - %s", context->errstr);
|
|
|
|
- rtlFail(0, msg.str());
|
|
|
|
- }
|
|
|
|
- else
|
|
|
|
- rtlFail(0, "Redis Plugin: failed to set timeout - no message available");
|
|
|
|
|
|
+ VStringBuffer msg("Redis Plugin: ERROR - %s '%s' on database %d for %s:%d failed : %s", cmd, key, database, ip.str(), port, errmsg);
|
|
|
|
+ ::rtlFail(0, msg.str());
|
|
}
|
|
}
|
|
|
|
+ VStringBuffer msg("Redis Plugin: ERROR - %s on database %d for %s:%d failed : %s", cmd, database, ip.str(), port, errmsg);
|
|
|
|
+ ::rtlFail(0, msg.str());
|
|
}
|
|
}
|
|
void Connection::assertOnError(const redisReply * reply, const char * _msg)
|
|
void Connection::assertOnError(const redisReply * reply, const char * _msg)
|
|
{
|
|
{
|
|
- if (!reply)//MORE: should this be assertex(reply) instead?
|
|
|
|
- {
|
|
|
|
- //There should always be a context error if no reply error
|
|
|
|
- assertConnection();
|
|
|
|
- VStringBuffer msg("Redis Plugin: %s - %s", _msg, "neither 'reply' nor connection error available");
|
|
|
|
- rtlFail(0, msg.str());
|
|
|
|
- }
|
|
|
|
|
|
+ if (!reply)
|
|
|
|
+ assertConnection(_msg);
|
|
else if (reply->type == REDIS_REPLY_ERROR)
|
|
else if (reply->type == REDIS_REPLY_ERROR)
|
|
{
|
|
{
|
|
assertAuthorization(reply);
|
|
assertAuthorization(reply);
|
|
VStringBuffer msg("Redis Plugin: %s - %s", _msg, reply->str);
|
|
VStringBuffer msg("Redis Plugin: %s - %s", _msg, reply->str);
|
|
- rtlFail(0, msg.str());
|
|
|
|
- }
|
|
|
|
-}
|
|
|
|
-void Connection::assertOnCommandErrorWithKey(const redisReply * reply, const char * cmd, const char * key)
|
|
|
|
-{
|
|
|
|
- if (!reply)//MORE: should this be assertex(reply) instead?
|
|
|
|
- {
|
|
|
|
- //There should always be a context error if no reply error
|
|
|
|
- assertConnection();
|
|
|
|
- VStringBuffer msg("Redis Plugin: ERROR - %s '%s' on database %" I64F "u failed with neither 'reply' nor connection error available", cmd, key, database);
|
|
|
|
- rtlFail(0, msg.str());
|
|
|
|
- }
|
|
|
|
- else if (reply->type == REDIS_REPLY_ERROR)
|
|
|
|
- {
|
|
|
|
- assertAuthorization(reply);
|
|
|
|
- VStringBuffer msg("Redis Plugin: ERROR - %s '%s' on database %" I64F "u failed : %s", cmd, key, database, reply->str);
|
|
|
|
- rtlFail(0, msg.str());
|
|
|
|
- }
|
|
|
|
-}
|
|
|
|
-void Connection::assertOnCommandErrorWithDatabase(const redisReply * reply, const char * cmd)
|
|
|
|
-{
|
|
|
|
- if (!reply)//assertex(reply)?
|
|
|
|
- {
|
|
|
|
- //There should always be a context error if no reply error
|
|
|
|
- assertConnection();
|
|
|
|
- VStringBuffer msg("Redis Plugin: ERROR - %s on database %" I64F "u failed with neither 'reply' nor connection error available", cmd, database);
|
|
|
|
- rtlFail(0, msg.str());
|
|
|
|
- }
|
|
|
|
- else if (reply->type == REDIS_REPLY_ERROR)
|
|
|
|
- {
|
|
|
|
- assertAuthorization(reply);
|
|
|
|
- VStringBuffer msg("Redis Plugin: ERROR - %s on database %" I64F "u failed : %s", cmd, database, reply->str);
|
|
|
|
- rtlFail(0, msg.str());
|
|
|
|
|
|
+ ::rtlFail(0, msg.str());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-void Connection::assertOnCommandError(const redisReply * reply, const char * cmd)
|
|
|
|
|
|
+void Connection::assertOnErrorWithCmdMsg(const redisReply * reply, const char * cmd, const char * key)
|
|
{
|
|
{
|
|
- if (!reply)//assertex(reply)?
|
|
|
|
- {
|
|
|
|
- //There should always be a context error if no reply error
|
|
|
|
- assertConnection();
|
|
|
|
- VStringBuffer msg("Redis Plugin: ERROR - %s failed with neither 'reply' nor connection error available", cmd);
|
|
|
|
- rtlFail(0, msg.str());
|
|
|
|
- }
|
|
|
|
|
|
+ if (!reply)
|
|
|
|
+ assertConnectionWithCmdMsg(cmd, key);
|
|
else if (reply->type == REDIS_REPLY_ERROR)
|
|
else if (reply->type == REDIS_REPLY_ERROR)
|
|
{
|
|
{
|
|
assertAuthorization(reply);
|
|
assertAuthorization(reply);
|
|
- VStringBuffer msg("Redis Plugin: ERROR - %s failed : %s", cmd, reply->str);
|
|
|
|
- rtlFail(0, msg.str());
|
|
|
|
|
|
+ fail(cmd, reply->str, key);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
void Connection::assertAuthorization(const redisReply * reply)
|
|
void Connection::assertAuthorization(const redisReply * reply)
|
|
{
|
|
{
|
|
if (reply && reply->str && ( strncmp(reply->str, "NOAUTH", 6) == 0 || strncmp(reply->str, "ERR operation not permitted", 27) == 0 ))
|
|
if (reply && reply->str && ( strncmp(reply->str, "NOAUTH", 6) == 0 || strncmp(reply->str, "ERR operation not permitted", 27) == 0 ))
|
|
{
|
|
{
|
|
- VStringBuffer msg("Redis Plugin: server authentication failed - %s", reply->str);
|
|
|
|
- rtlFail(0, msg.str());
|
|
|
|
|
|
+ VStringBuffer msg("Redis Plugin: ERROR - authentication for %s:%d failed : %s", ip.str(), port, reply->str);
|
|
|
|
+ ::rtlFail(0, msg.str());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
void Connection::assertKey(const redisReply * reply, const char * key)
|
|
void Connection::assertKey(const redisReply * reply, const char * key)
|
|
{
|
|
{
|
|
if (reply && reply->type == REDIS_REPLY_NIL)
|
|
if (reply && reply->type == REDIS_REPLY_NIL)
|
|
{
|
|
{
|
|
- VStringBuffer msg("Redis Plugin: ERROR - the requested key '%s' does not exist on database %" I64F "u", key, database);
|
|
|
|
- rtlFail(0, msg.str());
|
|
|
|
|
|
+ VStringBuffer msg("Redis Plugin: ERROR - the requested key '%s' does not exist on database %d on %s:%d", key, database, ip.str(), port);
|
|
|
|
+ ::rtlFail(0, msg.str());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-void Connection::assertConnection()
|
|
|
|
|
|
+void Connection::assertConnectionWithCmdMsg(const char * cmd, const char * key)
|
|
|
|
+{
|
|
|
|
+ if (!context)
|
|
|
|
+ fail(cmd, "neither 'reply' nor connection error available", key);
|
|
|
|
+ else if (context->err)
|
|
|
|
+ fail(cmd, context->errstr, key);
|
|
|
|
+}
|
|
|
|
+void Connection::assertConnection(const char * _msg)
|
|
{
|
|
{
|
|
if (!context)
|
|
if (!context)
|
|
- rtlFail(0, "Redis Plugin: 'redisConnect' failed - no error available.");
|
|
|
|
|
|
+ {
|
|
|
|
+ VStringBuffer msg("Redis Plugin: ERROR - %s for %s:%d failed : neither 'reply' nor connection error available", _msg, ip.str(), port);
|
|
|
|
+ ::rtlFail(0, msg.str());
|
|
|
|
+ }
|
|
else if (context->err)
|
|
else if (context->err)
|
|
{
|
|
{
|
|
- VStringBuffer msg("Redis Plugin: Connection failed - %s for %s:%u", context->errstr, ip.str(), port);
|
|
|
|
- rtlFail(0, msg.str());
|
|
|
|
|
|
+ VStringBuffer msg("Redis Plugin: ERROR - %s for %s:%d failed : %s", _msg, ip.str(), port, context->errstr);
|
|
|
|
+ ::rtlFail(0, msg.str());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
void Connection::clear(ICodeContext * ctx)
|
|
void Connection::clear(ICodeContext * ctx)
|
|
@@ -441,44 +460,44 @@ void Connection::clear(ICodeContext * ctx)
|
|
//NOTE: flush is the actual cache flush/clear/delete and not an io buffer flush.
|
|
//NOTE: flush is the actual cache flush/clear/delete and not an io buffer flush.
|
|
OwnedReply reply = Reply::createReply(redisCommand(context, "FLUSHDB"));//NOTE: FLUSHDB deletes current database where as FLUSHALL deletes all dbs.
|
|
OwnedReply reply = Reply::createReply(redisCommand(context, "FLUSHDB"));//NOTE: FLUSHDB deletes current database where as FLUSHALL deletes all dbs.
|
|
//NOTE: documented as never failing, but in case
|
|
//NOTE: documented as never failing, but in case
|
|
- assertOnCommandErrorWithDatabase(reply->query(), "FlushDB");
|
|
|
|
|
|
+ assertOnErrorWithCmdMsg(reply->query(), "FlushDB");
|
|
}
|
|
}
|
|
void Connection::del(ICodeContext * ctx, const char * key)
|
|
void Connection::del(ICodeContext * ctx, const char * key)
|
|
{
|
|
{
|
|
OwnedReply reply = Reply::createReply(redisCommand(context, "DEL %b", key, strlen(key)));
|
|
OwnedReply reply = Reply::createReply(redisCommand(context, "DEL %b", key, strlen(key)));
|
|
- assertOnCommandErrorWithKey(reply->query(), "Del", key);
|
|
|
|
|
|
+ assertOnErrorWithCmdMsg(reply->query(), "Del", key);
|
|
}
|
|
}
|
|
void Connection::persist(ICodeContext * ctx, const char * key)
|
|
void Connection::persist(ICodeContext * ctx, const char * key)
|
|
{
|
|
{
|
|
OwnedReply reply = Reply::createReply(redisCommand(context, "PERSIST %b", key, strlen(key)));
|
|
OwnedReply reply = Reply::createReply(redisCommand(context, "PERSIST %b", key, strlen(key)));
|
|
- assertOnCommandErrorWithKey(reply->query(), "Persist", key);
|
|
|
|
|
|
+ assertOnErrorWithCmdMsg(reply->query(), "Persist", key);
|
|
}
|
|
}
|
|
void Connection::expire(ICodeContext * ctx, const char * key, unsigned _expire)
|
|
void Connection::expire(ICodeContext * ctx, const char * key, unsigned _expire)
|
|
{
|
|
{
|
|
OwnedReply reply = Reply::createReply(redisCommand(context, "PEXPIRE %b %u", key, strlen(key), _expire));
|
|
OwnedReply reply = Reply::createReply(redisCommand(context, "PEXPIRE %b %u", key, strlen(key), _expire));
|
|
- assertOnCommandErrorWithKey(reply->query(), "Expire", key);
|
|
|
|
|
|
+ assertOnErrorWithCmdMsg(reply->query(), "Expire", key);
|
|
}
|
|
}
|
|
bool Connection::exists(ICodeContext * ctx, const char * key)
|
|
bool Connection::exists(ICodeContext * ctx, const char * key)
|
|
{
|
|
{
|
|
OwnedReply reply = Reply::createReply(redisCommand(context, "EXISTS %b", key, strlen(key)));
|
|
OwnedReply reply = Reply::createReply(redisCommand(context, "EXISTS %b", key, strlen(key)));
|
|
- assertOnCommandErrorWithKey(reply->query(), "Exists", key);
|
|
|
|
|
|
+ assertOnErrorWithCmdMsg(reply->query(), "Exists", key);
|
|
return (reply->query()->integer != 0);
|
|
return (reply->query()->integer != 0);
|
|
}
|
|
}
|
|
unsigned __int64 Connection::dbSize(ICodeContext * ctx)
|
|
unsigned __int64 Connection::dbSize(ICodeContext * ctx)
|
|
{
|
|
{
|
|
OwnedReply reply = Reply::createReply(redisCommand(context, "DBSIZE"));
|
|
OwnedReply reply = Reply::createReply(redisCommand(context, "DBSIZE"));
|
|
- assertOnCommandErrorWithDatabase(reply->query(), "DBSIZE");
|
|
|
|
|
|
+ assertOnErrorWithCmdMsg(reply->query(), "DBSIZE");
|
|
return reply->query()->integer;
|
|
return reply->query()->integer;
|
|
}
|
|
}
|
|
//-------------------------------------------SET-----------------------------------------
|
|
//-------------------------------------------SET-----------------------------------------
|
|
//--OUTER--
|
|
//--OUTER--
|
|
-template<class type> void SyncRSet(ICodeContext * ctx, const char * _options, const char * key, type value, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 _timeout)
|
|
|
|
|
|
+template<class type> void SyncRSet(ICodeContext * ctx, const char * _options, const char * key, type value, int database, unsigned expire, const char * password, unsigned _timeout)
|
|
{
|
|
{
|
|
Owned<Connection> master = Connection::createConnection(ctx, _options, database, password, _timeout);
|
|
Owned<Connection> master = Connection::createConnection(ctx, _options, database, password, _timeout);
|
|
master->set(ctx, key, value, expire);
|
|
master->set(ctx, key, value, expire);
|
|
}
|
|
}
|
|
//Set pointer types
|
|
//Set pointer types
|
|
-template<class type> void SyncRSet(ICodeContext * ctx, const char * _options, const char * key, size32_t valueSize, const type * value, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 _timeout)
|
|
|
|
|
|
+template<class type> void SyncRSet(ICodeContext * ctx, const char * _options, const char * key, size32_t valueSize, const type * value, int database, unsigned expire, const char * password, unsigned _timeout)
|
|
{
|
|
{
|
|
Owned<Connection> master = Connection::createConnection(ctx, _options, database, password, _timeout);
|
|
Owned<Connection> master = Connection::createConnection(ctx, _options, database, password, _timeout);
|
|
master->set(ctx, key, valueSize, value, expire);
|
|
master->set(ctx, key, valueSize, value, expire);
|
|
@@ -492,7 +511,7 @@ template<class type> void Connection::set(ICodeContext * ctx, const char * key,
|
|
appendExpire(cmd, expire);
|
|
appendExpire(cmd, expire);
|
|
|
|
|
|
OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str(), key, strlen(key), _value, sizeof(type)));
|
|
OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str(), key, strlen(key), _value, sizeof(type)));
|
|
- assertOnCommandErrorWithKey(reply->query(), "SET", key);
|
|
|
|
|
|
+ assertOnErrorWithCmdMsg(reply->query(), "SET", key);
|
|
}
|
|
}
|
|
template<class type> void Connection::set(ICodeContext * ctx, const char * key, size32_t valueSize, const type * value, unsigned expire)
|
|
template<class type> void Connection::set(ICodeContext * ctx, const char * key, size32_t valueSize, const type * value, unsigned expire)
|
|
{
|
|
{
|
|
@@ -501,16 +520,16 @@ template<class type> void Connection::set(ICodeContext * ctx, const char * key,
|
|
StringBuffer cmd("SET %b %b");
|
|
StringBuffer cmd("SET %b %b");
|
|
appendExpire(cmd, expire);
|
|
appendExpire(cmd, expire);
|
|
OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str(), key, strlen(key), _value, (size_t)valueSize));
|
|
OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str(), key, strlen(key), _value, (size_t)valueSize));
|
|
- assertOnCommandErrorWithKey(reply->query(), "SET", key);
|
|
|
|
|
|
+ assertOnErrorWithCmdMsg(reply->query(), "SET", key);
|
|
}
|
|
}
|
|
//-------------------------------------------GET-----------------------------------------
|
|
//-------------------------------------------GET-----------------------------------------
|
|
//--OUTER--
|
|
//--OUTER--
|
|
-template<class type> void SyncRGet(ICodeContext * ctx, const char * options, const char * key, type & returnValue, unsigned __int64 database, const char * password, unsigned __int64 _timeout)
|
|
|
|
|
|
+template<class type> void SyncRGet(ICodeContext * ctx, const char * options, const char * key, type & returnValue, int database, const char * password, unsigned _timeout)
|
|
{
|
|
{
|
|
Owned<Connection> master = Connection::createConnection(ctx, options, database, password, _timeout);
|
|
Owned<Connection> master = Connection::createConnection(ctx, options, database, password, _timeout);
|
|
master->get(ctx, key, returnValue);
|
|
master->get(ctx, key, returnValue);
|
|
}
|
|
}
|
|
-template<class type> void SyncRGet(ICodeContext * ctx, const char * options, const char * key, size_t & returnSize, type * & returnValue, unsigned __int64 database, const char * password, unsigned __int64 _timeout)
|
|
|
|
|
|
+template<class type> void SyncRGet(ICodeContext * ctx, const char * options, const char * key, size_t & returnSize, type * & returnValue, int database, const char * password, unsigned _timeout)
|
|
{
|
|
{
|
|
Owned<Connection> master = Connection::createConnection(ctx, options, database, password, _timeout);
|
|
Owned<Connection> master = Connection::createConnection(ctx, options, database, password, _timeout);
|
|
master->get(ctx, key, returnSize, returnValue);
|
|
master->get(ctx, key, returnSize, returnValue);
|
|
@@ -520,14 +539,14 @@ template<class type> void Connection::get(ICodeContext * ctx, const char * key,
|
|
{
|
|
{
|
|
OwnedReply reply = Reply::createReply(redisCommand(context, "GET %b", key, strlen(key)));
|
|
OwnedReply reply = Reply::createReply(redisCommand(context, "GET %b", key, strlen(key)));
|
|
|
|
|
|
- assertOnError(reply->query(), "GET");
|
|
|
|
|
|
+ assertOnErrorWithCmdMsg(reply->query(), "GET", key);
|
|
assertKey(reply->query(), key);
|
|
assertKey(reply->query(), key);
|
|
|
|
|
|
size_t returnSize = reply->query()->len;
|
|
size_t returnSize = reply->query()->len;
|
|
if (sizeof(type)!=returnSize)
|
|
if (sizeof(type)!=returnSize)
|
|
{
|
|
{
|
|
- VStringBuffer msg("Redis Plugin: ERROR - Requested type of different size (%uB) from that stored (%uB).", (unsigned)sizeof(type), (unsigned)returnSize);
|
|
|
|
- rtlFail(0, msg.str());
|
|
|
|
|
|
+ VStringBuffer msg("requested type of different size (%uB) from that stored (%uB)", (unsigned)sizeof(type), (unsigned)returnSize);
|
|
|
|
+ fail("GET", msg.str(), key);
|
|
}
|
|
}
|
|
memcpy(&returnValue, reply->query()->str, returnSize);
|
|
memcpy(&returnValue, reply->query()->str, returnSize);
|
|
}
|
|
}
|
|
@@ -535,7 +554,7 @@ template<class type> void Connection::get(ICodeContext * ctx, const char * key,
|
|
{
|
|
{
|
|
OwnedReply reply = Reply::createReply(redisCommand(context, "GET %b", key, strlen(key)));
|
|
OwnedReply reply = Reply::createReply(redisCommand(context, "GET %b", key, strlen(key)));
|
|
|
|
|
|
- assertOnError(reply->query(), "GET");
|
|
|
|
|
|
+ assertOnErrorWithCmdMsg(reply->query(), "GET", key);
|
|
assertKey(reply->query(), key);
|
|
assertKey(reply->query(), key);
|
|
|
|
|
|
returnSize = reply->query()->len;
|
|
returnSize = reply->query()->len;
|
|
@@ -544,113 +563,113 @@ template<class type> void Connection::get(ICodeContext * ctx, const char * key,
|
|
//--------------------------------------------------------------------------------
|
|
//--------------------------------------------------------------------------------
|
|
// ECL SERVICE ENTRYPOINTS
|
|
// ECL SERVICE ENTRYPOINTS
|
|
//--------------------------------------------------------------------------------
|
|
//--------------------------------------------------------------------------------
|
|
-ECL_REDIS_API void ECL_REDIS_CALL RClear(ICodeContext * ctx, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
|
|
|
|
|
|
+ECL_REDIS_API void ECL_REDIS_CALL RClear(ICodeContext * ctx, const char * options, int database, const char * password, unsigned timeout)
|
|
{
|
|
{
|
|
Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
|
|
Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
|
|
master->clear(ctx);
|
|
master->clear(ctx);
|
|
}
|
|
}
|
|
-ECL_REDIS_API bool ECL_REDIS_CALL RExist(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
|
|
|
|
|
|
+ECL_REDIS_API bool ECL_REDIS_CALL RExist(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout)
|
|
{
|
|
{
|
|
Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
|
|
Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
|
|
return master->exists(ctx, key);
|
|
return master->exists(ctx, key);
|
|
}
|
|
}
|
|
-ECL_REDIS_API void ECL_REDIS_CALL RDel(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
|
|
|
|
|
|
+ECL_REDIS_API void ECL_REDIS_CALL RDel(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout)
|
|
{
|
|
{
|
|
Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
|
|
Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
|
|
master->del(ctx, key);
|
|
master->del(ctx, key);
|
|
}
|
|
}
|
|
-ECL_REDIS_API void ECL_REDIS_CALL RPersist(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
|
|
|
|
|
|
+ECL_REDIS_API void ECL_REDIS_CALL RPersist(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout)
|
|
{
|
|
{
|
|
Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
|
|
Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
|
|
master->persist(ctx, key);
|
|
master->persist(ctx, key);
|
|
}
|
|
}
|
|
-ECL_REDIS_API void ECL_REDIS_CALL RExpire(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, unsigned _expire, const char * password, unsigned __int64 timeout)
|
|
|
|
|
|
+ECL_REDIS_API void ECL_REDIS_CALL RExpire(ICodeContext * ctx, const char * key, const char * options, int database, unsigned _expire, const char * password, unsigned timeout)
|
|
{
|
|
{
|
|
Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
|
|
Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
|
|
master->expire(ctx, key, _expire);
|
|
master->expire(ctx, key, _expire);
|
|
}
|
|
}
|
|
-ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL RDBSize(ICodeContext * ctx, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
|
|
|
|
|
|
+ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL RDBSize(ICodeContext * ctx, const char * options, int database, const char * password, unsigned timeout)
|
|
{
|
|
{
|
|
Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
|
|
Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
|
|
return master->dbSize(ctx);
|
|
return master->dbSize(ctx);
|
|
}
|
|
}
|
|
//-----------------------------------SET------------------------------------------
|
|
//-----------------------------------SET------------------------------------------
|
|
-ECL_REDIS_API void ECL_REDIS_CALL SyncRSetStr(ICodeContext * ctx, const char * key, size32_t valueSize, const char * value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
|
|
|
|
|
|
+ECL_REDIS_API void ECL_REDIS_CALL SyncRSetStr(ICodeContext * ctx, const char * key, size32_t valueSize, const char * value, const char * options, int database, unsigned expire, const char * password, unsigned timeout)
|
|
{
|
|
{
|
|
SyncRSet(ctx, options, key, valueSize, value, database, expire, password, timeout);
|
|
SyncRSet(ctx, options, key, valueSize, value, database, expire, password, timeout);
|
|
}
|
|
}
|
|
-ECL_REDIS_API void ECL_REDIS_CALL SyncRSetUChar(ICodeContext * ctx, const char * key, size32_t valueLength, const UChar * value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
|
|
|
|
|
|
+ECL_REDIS_API void ECL_REDIS_CALL SyncRSetUChar(ICodeContext * ctx, const char * key, size32_t valueLength, const UChar * value, const char * options, int database, unsigned expire, const char * password, unsigned timeout)
|
|
{
|
|
{
|
|
SyncRSet(ctx, options, key, (valueLength)*sizeof(UChar), value, database, expire, password, timeout);
|
|
SyncRSet(ctx, options, key, (valueLength)*sizeof(UChar), value, database, expire, password, timeout);
|
|
}
|
|
}
|
|
-ECL_REDIS_API void ECL_REDIS_CALL SyncRSetInt(ICodeContext * ctx, const char * key, signed __int64 value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
|
|
|
|
|
|
+ECL_REDIS_API void ECL_REDIS_CALL SyncRSetInt(ICodeContext * ctx, const char * key, signed __int64 value, const char * options, int database, unsigned expire, const char * password, unsigned timeout)
|
|
{
|
|
{
|
|
SyncRSet(ctx, options, key, value, database, expire, password, timeout);
|
|
SyncRSet(ctx, options, key, value, database, expire, password, timeout);
|
|
}
|
|
}
|
|
-ECL_REDIS_API void ECL_REDIS_CALL SyncRSetUInt(ICodeContext * ctx, const char * key, unsigned __int64 value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
|
|
|
|
|
|
+ECL_REDIS_API void ECL_REDIS_CALL SyncRSetUInt(ICodeContext * ctx, const char * key, unsigned __int64 value, const char * options, int database, unsigned expire, const char * password, unsigned timeout)
|
|
{
|
|
{
|
|
SyncRSet(ctx, options, key, value, database, expire, password, timeout);
|
|
SyncRSet(ctx, options, key, value, database, expire, password, timeout);
|
|
}
|
|
}
|
|
-ECL_REDIS_API void ECL_REDIS_CALL SyncRSetReal(ICodeContext * ctx, const char * key, double value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
|
|
|
|
|
|
+ECL_REDIS_API void ECL_REDIS_CALL SyncRSetReal(ICodeContext * ctx, const char * key, double value, const char * options, int database, unsigned expire, const char * password, unsigned timeout)
|
|
{
|
|
{
|
|
SyncRSet(ctx, options, key, value, database, expire, password, timeout);
|
|
SyncRSet(ctx, options, key, value, database, expire, password, timeout);
|
|
}
|
|
}
|
|
-ECL_REDIS_API void ECL_REDIS_CALL SyncRSetBool(ICodeContext * ctx, const char * key, bool value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
|
|
|
|
|
|
+ECL_REDIS_API void ECL_REDIS_CALL SyncRSetBool(ICodeContext * ctx, const char * key, bool value, const char * options, int database, unsigned expire, const char * password, unsigned timeout)
|
|
{
|
|
{
|
|
SyncRSet(ctx, options, key, value, database, expire, password, timeout);
|
|
SyncRSet(ctx, options, key, value, database, expire, password, timeout);
|
|
}
|
|
}
|
|
-ECL_REDIS_API void ECL_REDIS_CALL SyncRSetData(ICodeContext * ctx, const char * key, size32_t valueSize, const void * value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
|
|
|
|
|
|
+ECL_REDIS_API void ECL_REDIS_CALL SyncRSetData(ICodeContext * ctx, const char * key, size32_t valueSize, const void * value, const char * options, int database, unsigned expire, const char * password, unsigned timeout)
|
|
{
|
|
{
|
|
SyncRSet(ctx, options, key, valueSize, value, database, expire, password, timeout);
|
|
SyncRSet(ctx, options, key, valueSize, value, database, expire, password, timeout);
|
|
}
|
|
}
|
|
-ECL_REDIS_API void ECL_REDIS_CALL SyncRSetUtf8(ICodeContext * ctx, const char * key, size32_t valueLength, const char * value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
|
|
|
|
|
|
+ECL_REDIS_API void ECL_REDIS_CALL SyncRSetUtf8(ICodeContext * ctx, const char * key, size32_t valueLength, const char * value, const char * options, int database, unsigned expire, const char * password, unsigned timeout)
|
|
{
|
|
{
|
|
SyncRSet(ctx, options, key, rtlUtf8Size(valueLength, value), value, database, expire, password, timeout);
|
|
SyncRSet(ctx, options, key, rtlUtf8Size(valueLength, value), value, database, expire, password, timeout);
|
|
}
|
|
}
|
|
//-------------------------------------GET----------------------------------------
|
|
//-------------------------------------GET----------------------------------------
|
|
-ECL_REDIS_API bool ECL_REDIS_CALL SyncRGetBool(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
|
|
|
|
|
|
+ECL_REDIS_API bool ECL_REDIS_CALL SyncRGetBool(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout)
|
|
{
|
|
{
|
|
bool value;
|
|
bool value;
|
|
SyncRGet(ctx, options, key, value, database, password, timeout);
|
|
SyncRGet(ctx, options, key, value, database, password, timeout);
|
|
return value;
|
|
return value;
|
|
}
|
|
}
|
|
-ECL_REDIS_API double ECL_REDIS_CALL SyncRGetDouble(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
|
|
|
|
|
|
+ECL_REDIS_API double ECL_REDIS_CALL SyncRGetDouble(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout)
|
|
{
|
|
{
|
|
double value;
|
|
double value;
|
|
SyncRGet(ctx, options, key, value, database, password, timeout);
|
|
SyncRGet(ctx, options, key, value, database, password, timeout);
|
|
return value;
|
|
return value;
|
|
}
|
|
}
|
|
-ECL_REDIS_API signed __int64 ECL_REDIS_CALL SyncRGetInt8(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
|
|
|
|
|
|
+ECL_REDIS_API signed __int64 ECL_REDIS_CALL SyncRGetInt8(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout)
|
|
{
|
|
{
|
|
signed __int64 value;
|
|
signed __int64 value;
|
|
SyncRGet(ctx, options, key, value, database, password, timeout);
|
|
SyncRGet(ctx, options, key, value, database, password, timeout);
|
|
return value;
|
|
return value;
|
|
}
|
|
}
|
|
-ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL SyncRGetUint8(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
|
|
|
|
|
|
+ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL SyncRGetUint8(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout)
|
|
{
|
|
{
|
|
unsigned __int64 value;
|
|
unsigned __int64 value;
|
|
SyncRGet(ctx, options, key, value, database, password, timeout);
|
|
SyncRGet(ctx, options, key, value, database, password, timeout);
|
|
return value;
|
|
return value;
|
|
}
|
|
}
|
|
-ECL_REDIS_API void ECL_REDIS_CALL SyncRGetStr(ICodeContext * ctx, size32_t & returnSize, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
|
|
|
|
|
|
+ECL_REDIS_API void ECL_REDIS_CALL SyncRGetStr(ICodeContext * ctx, size32_t & returnSize, char * & returnValue, const char * key, const char * options, int database, const char * password, unsigned timeout)
|
|
{
|
|
{
|
|
size_t _returnSize;
|
|
size_t _returnSize;
|
|
SyncRGet(ctx, options, key, _returnSize, returnValue, database, password, timeout);
|
|
SyncRGet(ctx, options, key, _returnSize, returnValue, database, password, timeout);
|
|
returnSize = static_cast<size32_t>(_returnSize);
|
|
returnSize = static_cast<size32_t>(_returnSize);
|
|
}
|
|
}
|
|
-ECL_REDIS_API void ECL_REDIS_CALL SyncRGetUChar(ICodeContext * ctx, size32_t & returnLength, UChar * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
|
|
|
|
|
|
+ECL_REDIS_API void ECL_REDIS_CALL SyncRGetUChar(ICodeContext * ctx, size32_t & returnLength, UChar * & returnValue, const char * key, const char * options, int database, const char * password, unsigned timeout)
|
|
{
|
|
{
|
|
size_t returnSize;
|
|
size_t returnSize;
|
|
SyncRGet(ctx, options, key, returnSize, returnValue, database, password, timeout);
|
|
SyncRGet(ctx, options, key, returnSize, returnValue, database, password, timeout);
|
|
returnLength = static_cast<size32_t>(returnSize/sizeof(UChar));
|
|
returnLength = static_cast<size32_t>(returnSize/sizeof(UChar));
|
|
}
|
|
}
|
|
-ECL_REDIS_API void ECL_REDIS_CALL SyncRGetUtf8(ICodeContext * ctx, size32_t & returnLength, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
|
|
|
|
|
|
+ECL_REDIS_API void ECL_REDIS_CALL SyncRGetUtf8(ICodeContext * ctx, size32_t & returnLength, char * & returnValue, const char * key, const char * options, int database, const char * password, unsigned timeout)
|
|
{
|
|
{
|
|
size_t returnSize;
|
|
size_t returnSize;
|
|
SyncRGet(ctx, options, key, returnSize, returnValue, database, password, timeout);
|
|
SyncRGet(ctx, options, key, returnSize, returnValue, database, password, timeout);
|
|
returnLength = static_cast<size32_t>(rtlUtf8Length(returnSize, returnValue));
|
|
returnLength = static_cast<size32_t>(rtlUtf8Length(returnSize, returnValue));
|
|
}
|
|
}
|
|
-ECL_REDIS_API void ECL_REDIS_CALL SyncRGetData(ICodeContext * ctx, size32_t & returnSize, void * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
|
|
|
|
|
|
+ECL_REDIS_API void ECL_REDIS_CALL SyncRGetData(ICodeContext * ctx, size32_t & returnSize, void * & returnValue, const char * key, const char * options, int database, const char * password, unsigned timeout)
|
|
{
|
|
{
|
|
size_t _returnSize;
|
|
size_t _returnSize;
|
|
SyncRGet(ctx, options, key, _returnSize, returnValue, database, password, timeout);
|
|
SyncRGet(ctx, options, key, _returnSize, returnValue, database, password, timeout);
|
|
@@ -659,7 +678,7 @@ ECL_REDIS_API void ECL_REDIS_CALL SyncRGetData(ICodeContext * ctx, size32_t & re
|
|
//----------------------------------LOCK------------------------------------------
|
|
//----------------------------------LOCK------------------------------------------
|
|
//-----------------------------------SET-----------------------------------------
|
|
//-----------------------------------SET-----------------------------------------
|
|
//Set pointer types
|
|
//Set pointer types
|
|
-void SyncLockRSet(ICodeContext * ctx, const char * _options, const char * key, size32_t valueSize, const char * value, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 _timeout)
|
|
|
|
|
|
+void SyncLockRSet(ICodeContext * ctx, const char * _options, const char * key, size32_t valueSize, const char * value, int database, unsigned expire, const char * password, unsigned _timeout)
|
|
{
|
|
{
|
|
Owned<Connection> master = Connection::createConnection(ctx, _options, database, password, _timeout);
|
|
Owned<Connection> master = Connection::createConnection(ctx, _options, database, password, _timeout);
|
|
master->lockSet(ctx, key, valueSize, value, expire);
|
|
master->lockSet(ctx, key, valueSize, value, expire);
|
|
@@ -672,7 +691,7 @@ void Connection::lockSet(ICodeContext * ctx, const char * key, size32_t valueSiz
|
|
}
|
|
}
|
|
//-------------------------------------------GET-----------------------------------------
|
|
//-------------------------------------------GET-----------------------------------------
|
|
//--OUTER--
|
|
//--OUTER--
|
|
-void SyncLockRGet(ICodeContext * ctx, const char * options, const char * key, size_t & returnSize, char * & returnValue, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 _timeout)
|
|
|
|
|
|
+void SyncLockRGet(ICodeContext * ctx, const char * options, const char * key, size_t & returnSize, char * & returnValue, int database, unsigned expire, const char * password, unsigned _timeout)
|
|
{
|
|
{
|
|
Owned<Connection> master = Connection::createConnection(ctx, options, database, password, _timeout);
|
|
Owned<Connection> master = Connection::createConnection(ctx, options, database, password, _timeout);
|
|
master->lockGet(ctx, key, returnSize, returnValue, password, expire);
|
|
master->lockGet(ctx, key, returnSize, returnValue, password, expire);
|
|
@@ -693,12 +712,12 @@ void Connection::encodeChannel(StringBuffer & channel, const char * key) const
|
|
bool Connection::lock(ICodeContext * ctx, const char * key, const char * channel, unsigned expire)
|
|
bool Connection::lock(ICodeContext * ctx, const char * key, const char * channel, unsigned expire)
|
|
{
|
|
{
|
|
if (expire == 0)
|
|
if (expire == 0)
|
|
- rtlFail(0, "Redis Plugin: ERROR - invalid value for 'expire', persistent locks not allowed.");
|
|
|
|
|
|
+ fail("GetOrLock<type>", "invalid value for 'expire', persistent locks not allowed.", key);
|
|
StringBuffer cmd("SET %b %b NX PX ");
|
|
StringBuffer cmd("SET %b %b NX PX ");
|
|
cmd.append(expire);
|
|
cmd.append(expire);
|
|
|
|
|
|
OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str(), key, strlen(key), channel, strlen(channel)));
|
|
OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str(), key, strlen(key), channel, strlen(channel)));
|
|
- assertOnError(reply->query(), cmd.append(" of the key '").append(key).append("' failed"));
|
|
|
|
|
|
+ assertOnErrorWithCmdMsg(reply->query(), cmd.str(), key);
|
|
|
|
|
|
return (reply->query()->type == REDIS_REPLY_STATUS && strcmp(reply->query()->str, "OK") == 0);
|
|
return (reply->query()->type == REDIS_REPLY_STATUS && strcmp(reply->query()->str, "OK") == 0);
|
|
}
|
|
}
|
|
@@ -710,8 +729,8 @@ void Connection::unlock(ICodeContext * ctx, const char * key)
|
|
|
|
|
|
//Read replies
|
|
//Read replies
|
|
OwnedReply reply = new Reply();
|
|
OwnedReply reply = new Reply();
|
|
- readReplyAndAssertWithKey(reply.get(), "manual unlock", key);//WATCH reply
|
|
|
|
- readReplyAndAssertWithKey(reply.get(), "manual unlock", key);//GET reply
|
|
|
|
|
|
+ readReplyAndAssertWithCmdMsg(reply.get(), "manual unlock", key);//WATCH reply
|
|
|
|
+ readReplyAndAssertWithCmdMsg(reply.get(), "manual unlock", key);//GET reply
|
|
|
|
|
|
//check if locked
|
|
//check if locked
|
|
if (strncmp(reply->query()->str, REDIS_LOCK_PREFIX, strlen(REDIS_LOCK_PREFIX)) == 0)
|
|
if (strncmp(reply->query()->str, REDIS_LOCK_PREFIX, strlen(REDIS_LOCK_PREFIX)) == 0)
|
|
@@ -723,9 +742,9 @@ void Connection::unlock(ICodeContext * ctx, const char * key)
|
|
#if(0)//Quick draw! You have 10s to manually send (via redis-cli) "set testlock foobar". The second myRedis.Exists('testlock') in redislockingtest.ecl should now return TRUE.
|
|
#if(0)//Quick draw! You have 10s to manually send (via redis-cli) "set testlock foobar". The second myRedis.Exists('testlock') in redislockingtest.ecl should now return TRUE.
|
|
sleep(10);
|
|
sleep(10);
|
|
#endif
|
|
#endif
|
|
- readReplyAndAssertWithKey(reply.get(), "manual unlock", key);//MULTI reply
|
|
|
|
- readReplyAndAssertWithKey(reply.get(), "manual unlock", key);//DEL reply
|
|
|
|
- readReplyAndAssertWithKey(reply.get(), "manual unlock", key);//EXEC reply
|
|
|
|
|
|
+ readReplyAndAssertWithCmdMsg(reply.get(), "manual unlock", key);//MULTI reply
|
|
|
|
+ readReplyAndAssertWithCmdMsg(reply.get(), "manual unlock", key);//DEL reply
|
|
|
|
+ readReplyAndAssertWithCmdMsg(reply.get(), "manual unlock", key);//EXEC reply
|
|
}
|
|
}
|
|
//If the above is aborted, let the lock expire.
|
|
//If the above is aborted, let the lock expire.
|
|
}
|
|
}
|
|
@@ -749,14 +768,11 @@ void Connection::handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAtt
|
|
|
|
|
|
//SUB before GET
|
|
//SUB before GET
|
|
//Requires separate connection from GET so that the replies are not mangled. This could be averted
|
|
//Requires separate connection from GET so that the replies are not mangled. This could be averted
|
|
- Owned<Connection> subConnection = new Connection(ctx, options.str(), ip.str(), port, serverIpPortPasswordHash, database, password, timeout);
|
|
|
|
|
|
+ Owned<Connection> subConnection = new Connection(ctx, options.str(), ip.str(), port, serverIpPortPasswordHash, database, password, timeLeft());
|
|
OwnedReply reply = Reply::createReply(redisCommand(subConnection->context, "SUBSCRIBE %b", channel.str(), (size_t)channel.length()));
|
|
OwnedReply reply = Reply::createReply(redisCommand(subConnection->context, "SUBSCRIBE %b", channel.str(), (size_t)channel.length()));
|
|
- assertOnCommandErrorWithKey(reply->query(), "GET", key);
|
|
|
|
|
|
+ assertOnErrorWithCmdMsg(reply->query(), "GetOrLock<type>", key);
|
|
if (reply->query()->type == REDIS_REPLY_ARRAY && strcmp("subscribe", reply->query()->element[0]->str) != 0 )
|
|
if (reply->query()->type == REDIS_REPLY_ARRAY && strcmp("subscribe", reply->query()->element[0]->str) != 0 )
|
|
- {
|
|
|
|
- VStringBuffer msg("Redis Plugin: ERROR - GET '%s' on database %" I64F "u failed : failed to register SUB", key, database);
|
|
|
|
- rtlFail(0, msg.str());
|
|
|
|
- }
|
|
|
|
|
|
+ fail("GetOrLock<type>", "failed to register SUB", key);
|
|
|
|
|
|
#if(0)//Test publish before GET.
|
|
#if(0)//Test publish before GET.
|
|
{
|
|
{
|
|
@@ -767,7 +783,7 @@ void Connection::handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAtt
|
|
|
|
|
|
//Now GET
|
|
//Now GET
|
|
reply->setClear((redisReply*)redisCommand(context, "GET %b", key, strlen(key)));
|
|
reply->setClear((redisReply*)redisCommand(context, "GET %b", key, strlen(key)));
|
|
- assertOnCommandErrorWithKey(reply->query(), "GET", key);
|
|
|
|
|
|
+ assertOnErrorWithCmdMsg(reply->query(), "GetOrLock<type>", key);
|
|
|
|
|
|
#if(0)//Test publish after GET.
|
|
#if(0)//Test publish after GET.
|
|
{
|
|
{
|
|
@@ -788,8 +804,8 @@ void Connection::handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAtt
|
|
//Check that the lock was set by this plugin and thus that we subscribed to the expected channel.
|
|
//Check that the lock was set by this plugin and thus that we subscribed to the expected channel.
|
|
if (reply->query()->str && strcmp(reply->query()->str, channel.str()) !=0 )
|
|
if (reply->query()->str && strcmp(reply->query()->str, channel.str()) !=0 )
|
|
{
|
|
{
|
|
- VStringBuffer msg("Redis Plugin: ERROR - the key '%s', on database %" I64F "u, is locked with a channel ('%s') different to that subscribed to (%s).", key, database, reply->query()->str, channel.str());
|
|
|
|
- rtlFail(0, msg.str());
|
|
|
|
|
|
+ VStringBuffer msg("key locked with a channel ('%s') different to that subscribed to (%s).", reply->query()->str, channel.str());
|
|
|
|
+ fail("GetOrLock<type>", msg.str(), key);
|
|
//MORE: In theory, it is possible to recover at this stage by subscribing to the channel that the key was actually locked with.
|
|
//MORE: In theory, it is possible to recover at this stage by subscribing to the channel that the key was actually locked with.
|
|
//However, we may have missed the massage publication already or by then, but could SUB again in case we haven't.
|
|
//However, we may have missed the massage publication already or by then, but could SUB again in case we haven't.
|
|
//More importantly and furthermore, the publication (in SetAndPublish<type>) will only publish to the channel encoded by
|
|
//More importantly and furthermore, the publication (in SetAndPublish<type>) will only publish to the channel encoded by
|
|
@@ -797,16 +813,13 @@ void Connection::handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAtt
|
|
}
|
|
}
|
|
#if(0)//Added to allow for manual pub testing via redis-cli
|
|
#if(0)//Added to allow for manual pub testing via redis-cli
|
|
struct timeval to = { 10, 0 };//10secs
|
|
struct timeval to = { 10, 0 };//10secs
|
|
- redisSetTimeout(subConnection->context, to);
|
|
|
|
|
|
+ ::redisSetTimeout(subConnection->context, to);
|
|
#endif
|
|
#endif
|
|
//Locked so SUBSCRIBE
|
|
//Locked so SUBSCRIBE
|
|
- redisReply * nakedReply = NULL;
|
|
|
|
- bool err = redisGetReply(subConnection->context, (void**)&nakedReply);
|
|
|
|
- reply->setClear(nakedReply);
|
|
|
|
- if (err != REDIS_OK)
|
|
|
|
- rtlFail(0, "Redis Plugin: ERROR - GET timed out.");
|
|
|
|
- assertOnCommandErrorWithKey(nakedReply, "GET", key);
|
|
|
|
- if (nakedReply->type == REDIS_REPLY_ARRAY && strcmp("message", nakedReply->element[0]->str) == 0)
|
|
|
|
|
|
+ subConnection->readReply(reply);
|
|
|
|
+ subConnection->assertOnErrorWithCmdMsg(reply->query(), "GetOrLock<type>", key);
|
|
|
|
+
|
|
|
|
+ if (reply->query()->type == REDIS_REPLY_ARRAY && strcmp("message", reply->query()->element[0]->str) == 0)
|
|
{
|
|
{
|
|
//We are about to return a value, to conform with other Get<type> functions, fail if the key did not exist.
|
|
//We are about to return a value, to conform with other Get<type> functions, fail if the key did not exist.
|
|
//Since the value is sent via a published message, there is no direct reply struct so assume that an empty
|
|
//Since the value is sent via a published message, there is no direct reply struct so assume that an empty
|
|
@@ -814,13 +827,15 @@ void Connection::handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAtt
|
|
//More importantly, it is paramount that this routine only return an empty string under one condition, that
|
|
//More importantly, it is paramount that this routine only return an empty string under one condition, that
|
|
//which indicates to the caller that the key was successfully locked.
|
|
//which indicates to the caller that the key was successfully locked.
|
|
//NOTE: it is possible for an empty message to have been PUBLISHed.
|
|
//NOTE: it is possible for an empty message to have been PUBLISHed.
|
|
- if (nakedReply->element[2]->len > 0)
|
|
|
|
|
|
+ if (reply->query()->element[2]->len > 0)
|
|
{
|
|
{
|
|
- retVal->set(nakedReply->element[2]->len, nakedReply->element[2]->str);//return the published value rather than another (WATCHed) GET.
|
|
|
|
|
|
+ retVal->set(reply->query()->element[2]->len, reply->query()->element[2]->str);//return the published value rather than another (WATCHed) GET.
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
- VStringBuffer msg("Redis Plugin: ERROR - the requested key '%s' does not exist on database %" I64F "u", key, database);
|
|
|
|
- rtlFail(0, msg.str());
|
|
|
|
|
|
+ //fail that key does not exist
|
|
|
|
+ redisReply fakeReply;
|
|
|
|
+ fakeReply.type = REDIS_REPLY_NIL;
|
|
|
|
+ assertKey(&fakeReply, key);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
throwUnexpected();
|
|
throwUnexpected();
|
|
@@ -854,7 +869,7 @@ void Connection::handleLockOnSet(ICodeContext * ctx, const char * key, const cha
|
|
replyContainer->setClear((redisReply*)redisCommand(context, "EVAL %b %d %b %b %b %d", luaScriptWithExpire, strlen(luaScriptWithExpire), 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size, expire));
|
|
replyContainer->setClear((redisReply*)redisCommand(context, "EVAL %b %d %b %b %b %d", luaScriptWithExpire, strlen(luaScriptWithExpire), 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size, expire));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- assertOnCommandErrorWithKey(replyContainer->query(), "SET", key);
|
|
|
|
|
|
+ assertOnErrorWithCmdMsg(replyContainer->query(), "SET", key);
|
|
}
|
|
}
|
|
else
|
|
else
|
|
{
|
|
{
|
|
@@ -867,10 +882,10 @@ void Connection::handleLockOnSet(ICodeContext * ctx, const char * key, const cha
|
|
|
|
|
|
//Now read and assert replies
|
|
//Now read and assert replies
|
|
OwnedReply reply = new Reply();
|
|
OwnedReply reply = new Reply();
|
|
- readReplyAndAssertWithKey(reply, "SET", key);//MULTI reply
|
|
|
|
- readReplyAndAssertWithKey(reply, "SET", key);//SET reply
|
|
|
|
- readReplyAndAssertWithKey(reply, "PUB for the key", key);//PUB reply
|
|
|
|
- readReplyAndAssertWithKey(reply, "SET", key);//EXEC reply
|
|
|
|
|
|
+ readReplyAndAssertWithCmdMsg(reply, "SET", key);//MULTI reply
|
|
|
|
+ readReplyAndAssertWithCmdMsg(reply, "SET", key);//SET reply
|
|
|
|
+ readReplyAndAssertWithCmdMsg(reply, "PUB for the key", key);//PUB reply
|
|
|
|
+ readReplyAndAssertWithCmdMsg(reply, "SET", key);//EXEC reply
|
|
}
|
|
}
|
|
|
|
|
|
//NOTE: When setting and publishing the data with a pipelined MULTI-SET-PUB-EXEC, the data is sent twice, once with the SET and again with the PUBLISH.
|
|
//NOTE: When setting and publishing the data with a pipelined MULTI-SET-PUB-EXEC, the data is sent twice, once with the SET and again with the PUBLISH.
|
|
@@ -897,20 +912,20 @@ bool Connection::noScript(const redisReply * reply) const
|
|
// ECL SERVICE ENTRYPOINTS
|
|
// ECL SERVICE ENTRYPOINTS
|
|
//--------------------------------------------------------------------------------
|
|
//--------------------------------------------------------------------------------
|
|
//-----------------------------------SET------------------------------------------
|
|
//-----------------------------------SET------------------------------------------
|
|
-ECL_REDIS_API void ECL_REDIS_CALL SyncLockRSetStr(ICodeContext * ctx, size32_t & returnLength, char * & returnValue, const char * key, size32_t valueLength, const char * value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
|
|
|
|
|
|
+ECL_REDIS_API void ECL_REDIS_CALL SyncLockRSetStr(ICodeContext * ctx, size32_t & returnLength, char * & returnValue, const char * key, size32_t valueLength, const char * value, const char * options, int database, unsigned expire, const char * password, unsigned timeout)
|
|
{
|
|
{
|
|
SyncLockRSet(ctx, options, key, valueLength, value, database, expire, password, timeout);
|
|
SyncLockRSet(ctx, options, key, valueLength, value, database, expire, password, timeout);
|
|
returnLength = valueLength;
|
|
returnLength = valueLength;
|
|
returnValue = (char*)allocateAndCopy(value, valueLength);
|
|
returnValue = (char*)allocateAndCopy(value, valueLength);
|
|
}
|
|
}
|
|
-ECL_REDIS_API void ECL_REDIS_CALL SyncLockRSetUChar(ICodeContext * ctx, size32_t & returnLength, UChar * & returnValue, const char * key, size32_t valueLength, const UChar * value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
|
|
|
|
|
|
+ECL_REDIS_API void ECL_REDIS_CALL SyncLockRSetUChar(ICodeContext * ctx, size32_t & returnLength, UChar * & returnValue, const char * key, size32_t valueLength, const UChar * value, const char * options, int database, unsigned expire, const char * password, unsigned timeout)
|
|
{
|
|
{
|
|
unsigned valueSize = (valueLength)*sizeof(UChar);
|
|
unsigned valueSize = (valueLength)*sizeof(UChar);
|
|
SyncLockRSet(ctx, options, key, valueSize, (char*)value, database, expire, password, timeout);
|
|
SyncLockRSet(ctx, options, key, valueSize, (char*)value, database, expire, password, timeout);
|
|
returnLength= valueLength;
|
|
returnLength= valueLength;
|
|
returnValue = (UChar*)allocateAndCopy(value, valueSize);
|
|
returnValue = (UChar*)allocateAndCopy(value, valueSize);
|
|
}
|
|
}
|
|
-ECL_REDIS_API void ECL_REDIS_CALL SyncLockRSetUtf8(ICodeContext * ctx, size32_t & returnLength, char * & returnValue, const char * key, size32_t valueLength, const char * value, const char * options, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 timeout)
|
|
|
|
|
|
+ECL_REDIS_API void ECL_REDIS_CALL SyncLockRSetUtf8(ICodeContext * ctx, size32_t & returnLength, char * & returnValue, const char * key, size32_t valueLength, const char * value, const char * options, int database, unsigned expire, const char * password, unsigned timeout)
|
|
{
|
|
{
|
|
unsigned valueSize = rtlUtf8Size(valueLength, value);
|
|
unsigned valueSize = rtlUtf8Size(valueLength, value);
|
|
SyncLockRSet(ctx, options, key, valueSize, value, database, expire, password, timeout);
|
|
SyncLockRSet(ctx, options, key, valueSize, value, database, expire, password, timeout);
|
|
@@ -918,13 +933,13 @@ ECL_REDIS_API void ECL_REDIS_CALL SyncLockRSetUtf8(ICodeContext * ctx, size32_t
|
|
returnValue = (char*)allocateAndCopy(value, valueSize);
|
|
returnValue = (char*)allocateAndCopy(value, valueSize);
|
|
}
|
|
}
|
|
//-------------------------------------GET----------------------------------------
|
|
//-------------------------------------GET----------------------------------------
|
|
-ECL_REDIS_API void ECL_REDIS_CALL SyncLockRGetStr(ICodeContext * ctx, size32_t & returnSize, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout, unsigned expire)
|
|
|
|
|
|
+ECL_REDIS_API void ECL_REDIS_CALL SyncLockRGetStr(ICodeContext * ctx, size32_t & returnSize, char * & returnValue, const char * key, const char * options, int database, const char * password, unsigned timeout, unsigned expire)
|
|
{
|
|
{
|
|
size_t _returnSize;
|
|
size_t _returnSize;
|
|
SyncLockRGet(ctx, options, key, _returnSize, returnValue, database, expire, password, timeout);
|
|
SyncLockRGet(ctx, options, key, _returnSize, returnValue, database, expire, password, timeout);
|
|
returnSize = static_cast<size32_t>(_returnSize);
|
|
returnSize = static_cast<size32_t>(_returnSize);
|
|
}
|
|
}
|
|
-ECL_REDIS_API void ECL_REDIS_CALL SyncLockRGetUChar(ICodeContext * ctx, size32_t & returnLength, UChar * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout, unsigned expire)
|
|
|
|
|
|
+ECL_REDIS_API void ECL_REDIS_CALL SyncLockRGetUChar(ICodeContext * ctx, size32_t & returnLength, UChar * & returnValue, const char * key, const char * options, int database, const char * password, unsigned timeout, unsigned expire)
|
|
{
|
|
{
|
|
size_t returnSize;
|
|
size_t returnSize;
|
|
char * _returnValue;
|
|
char * _returnValue;
|
|
@@ -932,13 +947,13 @@ ECL_REDIS_API void ECL_REDIS_CALL SyncLockRGetUChar(ICodeContext * ctx, size32_t
|
|
returnValue = (UChar*)_returnValue;
|
|
returnValue = (UChar*)_returnValue;
|
|
returnLength = static_cast<size32_t>(returnSize/sizeof(UChar));
|
|
returnLength = static_cast<size32_t>(returnSize/sizeof(UChar));
|
|
}
|
|
}
|
|
-ECL_REDIS_API void ECL_REDIS_CALL SyncLockRGetUtf8(ICodeContext * ctx, size32_t & returnLength, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout, unsigned expire)
|
|
|
|
|
|
+ECL_REDIS_API void ECL_REDIS_CALL SyncLockRGetUtf8(ICodeContext * ctx, size32_t & returnLength, char * & returnValue, const char * key, const char * options, int database, const char * password, unsigned timeout, unsigned expire)
|
|
{
|
|
{
|
|
size_t returnSize;
|
|
size_t returnSize;
|
|
SyncLockRGet(ctx, options, key, returnSize, returnValue, database, expire, password, timeout);
|
|
SyncLockRGet(ctx, options, key, returnSize, returnValue, database, expire, password, timeout);
|
|
returnLength = static_cast<size32_t>(rtlUtf8Length(returnSize, returnValue));
|
|
returnLength = static_cast<size32_t>(rtlUtf8Length(returnSize, returnValue));
|
|
}
|
|
}
|
|
-ECL_REDIS_API void ECL_REDIS_CALL SyncLockRUnlock(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
|
|
|
|
|
|
+ECL_REDIS_API void ECL_REDIS_CALL SyncLockRUnlock(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout)
|
|
{
|
|
{
|
|
Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
|
|
Owned<Connection> master = Connection::createConnection(ctx, options, database, password, timeout);
|
|
master->unlock(ctx, key);
|
|
master->unlock(ctx, key);
|