|
@@ -143,15 +143,15 @@ public :
|
|
|
static Connection * createConnection(ICodeContext * ctx, Connection * & _cachedConnection, const char * options, const char * _ip, int _port, bool parseOptions, int _database, const char * password, unsigned _timeout, bool cachedConnectionRequested, bool isSubscription = false);
|
|
|
|
|
|
//set
|
|
|
- template <class type> void set(ICodeContext * ctx, const char * key, type value, unsigned expire);
|
|
|
- template <class type> void set(ICodeContext * ctx, const char * key, size32_t valueSize, const type * value, unsigned expire);
|
|
|
- void setInt(ICodeContext * ctx, const char * key, signed __int64 value, unsigned expire, bool _unsigned);
|
|
|
- void setReal(ICodeContext * ctx, const char * key, double value, unsigned expire);
|
|
|
+ template <class type> void setKey(ICodeContext * ctx, const char * key, type value, unsigned expire);
|
|
|
+ template <class type> void setKey(ICodeContext * ctx, const char * key, size32_t valueSize, const type * value, unsigned expire);
|
|
|
+ void setIntKey(ICodeContext * ctx, const char * key, signed __int64 value, unsigned expire, bool _unsigned);
|
|
|
+ void setRealKey(ICodeContext * ctx, const char * key, double value, unsigned expire);
|
|
|
|
|
|
//get
|
|
|
- template <class type> void get(ICodeContext * ctx, const char * key, type & value);
|
|
|
- template <class type> void get(ICodeContext * ctx, const char * key, size_t & valueSize, type * & value);
|
|
|
- template <class type> void getNumeric(ICodeContext * ctx, const char * key, type & value);
|
|
|
+ template <class type> void getKey(ICodeContext * ctx, const char * key, type & value);
|
|
|
+ template <class type> void getKey(ICodeContext * ctx, const char * key, size_t & valueSize, type * & value);
|
|
|
+ template <class type> void getNumericKey(ICodeContext * ctx, const char * key, type & value);
|
|
|
signed __int64 returnInt(const char * key, const char * cmd, const redisReply * reply);
|
|
|
|
|
|
//-------------------------------LOCKING------------------------------------------------
|
|
@@ -201,7 +201,7 @@ protected :
|
|
|
void assertOnErrorWithCmdMsg(const redisReply * reply, const char * cmd, const char * key = nullptr);
|
|
|
void assertConnection(const char * _msg);
|
|
|
void assertConnectionWithCmdMsg(const char * cmd, const char * key = nullptr);
|
|
|
- void fail(const char * cmd, const char * errmsg, const char * key = nullptr);
|
|
|
+ __declspec(noreturn) void fail(const char * cmd, const char * errmsg, const char * key = nullptr) __attribute__((noreturn));
|
|
|
void * redisCommand(const char * format, ...);
|
|
|
void fromStr(const char * str, const char * key, double & ret);
|
|
|
void fromStr(const char * str, const char * key, signed __int64 & ret);
|
|
@@ -237,6 +237,7 @@ protected :
|
|
|
class ConnectionContainer : public CInterface
|
|
|
{
|
|
|
public :
|
|
|
+ ConnectionContainer() { }
|
|
|
ConnectionContainer(Connection * _connection)
|
|
|
{
|
|
|
connection.setown(_connection);
|
|
@@ -247,6 +248,13 @@ public :
|
|
|
connection->unsubscribe();
|
|
|
}
|
|
|
inline Connection * operator -> () const { return connection.get(); }
|
|
|
+ inline void setown(Connection * _connection) { connection.setown(_connection); }
|
|
|
+ __declspec(noreturn) void handleException(IException * error) __attribute__((noreturn))
|
|
|
+ {
|
|
|
+ if (connection)
|
|
|
+ connection->freeContext();
|
|
|
+ throw error;
|
|
|
+ }
|
|
|
|
|
|
Owned<Connection> connection;
|
|
|
};
|
|
@@ -438,6 +446,7 @@ void Connection::freeContext()
|
|
|
{
|
|
|
redisFree(context);
|
|
|
context = nullptr;
|
|
|
+ database = 0;
|
|
|
}
|
|
|
}
|
|
|
const char * Connection::encodeChannel(StringBuffer & buffer, const char * keyOrChannel, int _database, bool lockedKey) const
|
|
@@ -908,7 +917,7 @@ signed __int64 Connection::incrBy(ICodeContext * ctx, const char * key, signed _
|
|
|
return returnInt(key, "INCRBY", reply->query());
|
|
|
}
|
|
|
//-------------------------------------------SET-----------------------------------------
|
|
|
-void Connection::setInt(ICodeContext * ctx, const char * key, signed __int64 value, unsigned expire, bool _unsigned)
|
|
|
+void Connection::setIntKey(ICodeContext * ctx, const char * key, signed __int64 value, unsigned expire, bool _unsigned)
|
|
|
{
|
|
|
StringBuffer cmd("SET %b %" I64F);
|
|
|
if (_unsigned)
|
|
@@ -920,7 +929,7 @@ void Connection::setInt(ICodeContext * ctx, const char * key, signed __int64 val
|
|
|
OwnedReply reply = Reply::createReply(redisCommand(cmd.str(), key, strlen(key), value));
|
|
|
assertOnErrorWithCmdMsg(reply->query(), "SET", key);
|
|
|
}
|
|
|
-void Connection::setReal(ICodeContext * ctx, const char * key, double value, unsigned expire)
|
|
|
+void Connection::setRealKey(ICodeContext * ctx, const char * key, double value, unsigned expire)
|
|
|
{
|
|
|
StringBuffer cmd("SET %b %.16g");
|
|
|
appendExpire(cmd, expire);
|
|
@@ -930,17 +939,33 @@ void Connection::setReal(ICodeContext * ctx, const char * key, double value, uns
|
|
|
//--OUTER--
|
|
|
template<class type> void SyncRSet(ICodeContext * ctx, const char * _options, const char * key, type value, int database, unsigned expire, const char * password, unsigned _timeout, bool cachedConnectionRequested)
|
|
|
{
|
|
|
- Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, _options, DUMMY_IP, DUMMY_PORT, true, database, password, _timeout, cachedConnectionRequested);
|
|
|
- master->set(ctx, key, value, expire);
|
|
|
+ ConnectionContainer master;
|
|
|
+ try
|
|
|
+ {
|
|
|
+ master.setown(Connection::createConnection(ctx, cachedConnection, _options, DUMMY_IP, DUMMY_PORT, true, database, password, _timeout, cachedConnectionRequested));
|
|
|
+ master->setKey(ctx, key, value, expire);
|
|
|
+ }
|
|
|
+ catch (IException * error)
|
|
|
+ {
|
|
|
+ master.handleException(error);
|
|
|
+ }
|
|
|
}
|
|
|
//Set pointer types
|
|
|
template<class type> void SyncRSet(ICodeContext * ctx, const char * _options, const char * key, size32_t valueSize, const type * value, int database, unsigned expire, const char * password, unsigned _timeout, bool cachedConnectionRequested)
|
|
|
{
|
|
|
- Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, _options, DUMMY_IP, DUMMY_PORT, true, database, password, _timeout, cachedConnectionRequested);
|
|
|
- master->set(ctx, key, valueSize, value, expire);
|
|
|
+ ConnectionContainer master;
|
|
|
+ try
|
|
|
+ {
|
|
|
+ master.setown(Connection::createConnection(ctx, cachedConnection, _options, DUMMY_IP, DUMMY_PORT, true, database, password, _timeout, cachedConnectionRequested));
|
|
|
+ master->setKey(ctx, key, valueSize, value, expire);
|
|
|
+ }
|
|
|
+ catch (IException * error)
|
|
|
+ {
|
|
|
+ master.handleException(error);
|
|
|
+ }
|
|
|
}
|
|
|
//--INNER--
|
|
|
-template<class type> void Connection::set(ICodeContext * ctx, const char * key, type value, unsigned expire)
|
|
|
+template<class type> void Connection::setKey(ICodeContext * ctx, const char * key, type value, unsigned expire)
|
|
|
{
|
|
|
const char * _value = reinterpret_cast<const char *>(&value);//Do this even for char * to prevent compiler complaining
|
|
|
|
|
@@ -950,7 +975,7 @@ template<class type> void Connection::set(ICodeContext * ctx, const char * key,
|
|
|
OwnedReply reply = Reply::createReply(redisCommand(cmd.str(), key, strlen(key), _value, sizeof(type)));
|
|
|
assertOnErrorWithCmdMsg(reply->query(), "SET", key);
|
|
|
}
|
|
|
-template<class type> void Connection::set(ICodeContext * ctx, const char * key, size32_t valueSize, const type * value, unsigned expire)
|
|
|
+template<class type> void Connection::setKey(ICodeContext * ctx, const char * key, size32_t valueSize, const type * value, unsigned expire)
|
|
|
{
|
|
|
const char * _value = reinterpret_cast<const char *>(value);//Do this even for char * to prevent compiler complaining
|
|
|
|
|
@@ -973,18 +998,42 @@ signed __int64 Connection::returnInt(const char * key, const char * cmd, const r
|
|
|
//--OUTER--
|
|
|
template<class type> void SyncRGetNumeric(ICodeContext * ctx, const char * options, const char * key, type & returnValue, int database, const char * password, unsigned _timeout, bool cachedConnectionRequested)
|
|
|
{
|
|
|
- Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, _timeout, cachedConnectionRequested);
|
|
|
- master->getNumeric(ctx, key, returnValue);
|
|
|
+ ConnectionContainer master;
|
|
|
+ try
|
|
|
+ {
|
|
|
+ master.setown(Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, _timeout, cachedConnectionRequested));
|
|
|
+ master->getNumericKey(ctx, key, returnValue);
|
|
|
+ }
|
|
|
+ catch (IException * error)
|
|
|
+ {
|
|
|
+ master.handleException(error);
|
|
|
+ }
|
|
|
}
|
|
|
template<class type> void SyncRGet(ICodeContext * ctx, const char * options, const char * key, type & returnValue, int database, const char * password, unsigned _timeout, bool cachedConnectionRequested)
|
|
|
{
|
|
|
- Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, _timeout, cachedConnectionRequested);
|
|
|
- master->get(ctx, key, returnValue);
|
|
|
+ ConnectionContainer master;
|
|
|
+ try
|
|
|
+ {
|
|
|
+ master.setown(Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, _timeout, cachedConnectionRequested));
|
|
|
+ master->getKey(ctx, key, returnValue);
|
|
|
+ }
|
|
|
+ catch (IException * error)
|
|
|
+ {
|
|
|
+ master.handleException(error);
|
|
|
+ }
|
|
|
}
|
|
|
template<class type> void SyncRGet(ICodeContext * ctx, const char * options, const char * key, size_t & returnSize, type * & returnValue, int database, const char * password, unsigned _timeout, bool cachedConnectionRequested)
|
|
|
{
|
|
|
- Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, _timeout, cachedConnectionRequested);
|
|
|
- master->get(ctx, key, returnSize, returnValue);
|
|
|
+ ConnectionContainer master;
|
|
|
+ try
|
|
|
+ {
|
|
|
+ master.setown(Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, _timeout, cachedConnectionRequested));
|
|
|
+ master->getKey(ctx, key, returnSize, returnValue);
|
|
|
+ }
|
|
|
+ catch (IException * error)
|
|
|
+ {
|
|
|
+ master.handleException(error);
|
|
|
+ }
|
|
|
}
|
|
|
void Connection::fromStr(const char * str, const char * key, double & ret)
|
|
|
{
|
|
@@ -1008,7 +1057,7 @@ void Connection::fromStr(const char * str, const char * key, unsigned __int64 &
|
|
|
fail("GetUnsigned", "value returned out of range", key);
|
|
|
}
|
|
|
//--INNER--
|
|
|
-template<class type> void Connection::getNumeric(ICodeContext * ctx, const char * key, type & returnValue)
|
|
|
+template<class type> void Connection::getNumericKey(ICodeContext * ctx, const char * key, type & returnValue)
|
|
|
{
|
|
|
OwnedReply reply = Reply::createReply(redisCommand("GET %b", key, strlen(key)));
|
|
|
|
|
@@ -1016,7 +1065,7 @@ template<class type> void Connection::getNumeric(ICodeContext * ctx, const char
|
|
|
assertKey(reply->query(), key);
|
|
|
fromStr(reply->query()->str, key, returnValue);
|
|
|
}
|
|
|
-template<class type> void Connection::get(ICodeContext * ctx, const char * key, type & returnValue)
|
|
|
+template<class type> void Connection::getKey(ICodeContext * ctx, const char * key, type & returnValue)
|
|
|
{
|
|
|
OwnedReply reply = Reply::createReply(redisCommand("GET %b", key, strlen(key)));
|
|
|
|
|
@@ -1031,7 +1080,7 @@ template<class type> void Connection::get(ICodeContext * ctx, const char * key,
|
|
|
}
|
|
|
memcpy(&returnValue, reply->query()->str, returnSize);
|
|
|
}
|
|
|
-template<class type> void Connection::get(ICodeContext * ctx, const char * key, size_t & returnSize, type * & returnValue)
|
|
|
+template<class type> void Connection::getKey(ICodeContext * ctx, const char * key, size_t & returnSize, type * & returnValue)
|
|
|
{
|
|
|
OwnedReply reply = Reply::createReply(redisCommand("GET %b", key, strlen(key)));
|
|
|
|
|
@@ -1062,50 +1111,122 @@ unsigned __int64 Connection::publish(ICodeContext * ctx, const char * keyOrChann
|
|
|
//--------------------------------------------------------------------------------
|
|
|
ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL SyncRPub(ICodeContext * ctx, const char * keyOrChannel, size32_t messageSize, const char * message, const char * options, int database, const char * password, unsigned timeout, bool lockedKey, bool cachedConnectionRequested)
|
|
|
{
|
|
|
- Owned<Connection> master = Connection::createConnection(ctx, cachedPubConnection, options, DUMMY_IP, DUMMY_PORT, true, 0, password, timeout, cachedConnectionRequested);
|
|
|
- return master->publish(ctx, keyOrChannel, messageSize, message, database, lockedKey);
|
|
|
+ ConnectionContainer master;
|
|
|
+ try
|
|
|
+ {
|
|
|
+ master.setown(Connection::createConnection(ctx, cachedPubConnection, options, DUMMY_IP, DUMMY_PORT, true, 0, password, timeout, cachedConnectionRequested));
|
|
|
+ return master->publish(ctx, keyOrChannel, messageSize, message, database, lockedKey);
|
|
|
+ }
|
|
|
+ catch (IException * error)
|
|
|
+ {
|
|
|
+ master.handleException(error);
|
|
|
+ }
|
|
|
}
|
|
|
ECL_REDIS_API void ECL_REDIS_CALL SyncRSub(ICodeContext * ctx, size32_t & messageSize, char * & message, const char * keyOrChannel, const char * options, int database, const char * password, unsigned timeout, bool lockedKey, bool cachedConnectionRequested)
|
|
|
{
|
|
|
size_t _messageSize = 0;
|
|
|
- ConnectionContainer master(Connection::createConnection(ctx, cachedSubscriptionConnection, options, DUMMY_IP, DUMMY_PORT, true, 0, password, timeout, cachedConnectionRequested, true));
|
|
|
- master->subAndWaitForSinglePub(ctx, keyOrChannel, _messageSize, message, database, lockedKey);
|
|
|
- messageSize = static_cast<size32_t>(_messageSize);
|
|
|
+ ConnectionContainer master;
|
|
|
+ try
|
|
|
+ {
|
|
|
+ master.setown(Connection::createConnection(ctx, cachedSubscriptionConnection, options, DUMMY_IP, DUMMY_PORT, true, 0, password, timeout, cachedConnectionRequested, true));
|
|
|
+ master->subAndWaitForSinglePub(ctx, keyOrChannel, _messageSize, message, database, lockedKey);
|
|
|
+ messageSize = static_cast<size32_t>(_messageSize);
|
|
|
+ }
|
|
|
+ catch (IException * error)
|
|
|
+ {
|
|
|
+ master.handleException(error);
|
|
|
+ }
|
|
|
}
|
|
|
ECL_REDIS_API void ECL_REDIS_CALL RClear(ICodeContext * ctx, const char * options, int database, const char * password, unsigned timeout, bool cachedConnectionRequested)
|
|
|
{
|
|
|
- Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, timeout, cachedConnectionRequested);
|
|
|
- master->clear(ctx);
|
|
|
+ ConnectionContainer master;
|
|
|
+ try
|
|
|
+ {
|
|
|
+ master.setown(Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, timeout, cachedConnectionRequested));
|
|
|
+ master->clear(ctx);
|
|
|
+ }
|
|
|
+ catch (IException * error)
|
|
|
+ {
|
|
|
+ master.handleException(error);
|
|
|
+ }
|
|
|
}
|
|
|
ECL_REDIS_API bool ECL_REDIS_CALL RExist(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout, bool cachedConnectionRequested)
|
|
|
{
|
|
|
- Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, timeout, cachedConnectionRequested);
|
|
|
- return master->exists(ctx, key);
|
|
|
+ ConnectionContainer master;
|
|
|
+ try
|
|
|
+ {
|
|
|
+ master.setown(Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, timeout, cachedConnectionRequested));
|
|
|
+ return master->exists(ctx, key);
|
|
|
+ }
|
|
|
+ catch (IException * error)
|
|
|
+ {
|
|
|
+ master.handleException(error);
|
|
|
+ }
|
|
|
}
|
|
|
ECL_REDIS_API void ECL_REDIS_CALL RDel(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout, bool cachedConnectionRequested)
|
|
|
{
|
|
|
- Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, timeout, cachedConnectionRequested);
|
|
|
- master->del(ctx, key);
|
|
|
+ ConnectionContainer master;
|
|
|
+ try
|
|
|
+ {
|
|
|
+ master.setown(Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, timeout, cachedConnectionRequested));
|
|
|
+ master->del(ctx, key);
|
|
|
+ }
|
|
|
+ catch (IException * error)
|
|
|
+ {
|
|
|
+ master.handleException(error);
|
|
|
+ }
|
|
|
}
|
|
|
ECL_REDIS_API void ECL_REDIS_CALL RPersist(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout, bool cachedConnectionRequested)
|
|
|
{
|
|
|
- Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, timeout, cachedConnectionRequested);
|
|
|
- master->persist(ctx, key);
|
|
|
+ ConnectionContainer master;
|
|
|
+ try
|
|
|
+ {
|
|
|
+ master.setown(Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, timeout, cachedConnectionRequested));
|
|
|
+ master->persist(ctx, key);
|
|
|
+ }
|
|
|
+ catch (IException * error)
|
|
|
+ {
|
|
|
+ master.handleException(error);
|
|
|
+ }
|
|
|
}
|
|
|
ECL_REDIS_API void ECL_REDIS_CALL RExpire(ICodeContext * ctx, const char * key, const char * options, int database, unsigned _expire, const char * password, unsigned timeout, bool cachedConnectionRequested)
|
|
|
{
|
|
|
- Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, timeout, cachedConnectionRequested);
|
|
|
- master->expire(ctx, key, _expire);
|
|
|
+ ConnectionContainer master;
|
|
|
+ try
|
|
|
+ {
|
|
|
+ master.setown(Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, timeout, cachedConnectionRequested));
|
|
|
+ master->expire(ctx, key, _expire);
|
|
|
+ }
|
|
|
+ catch (IException * error)
|
|
|
+ {
|
|
|
+ master.handleException(error);
|
|
|
+ }
|
|
|
}
|
|
|
ECL_REDIS_API unsigned __int64 ECL_REDIS_CALL RDBSize(ICodeContext * ctx, const char * options, int database, const char * password, unsigned timeout, bool cachedConnectionRequested)
|
|
|
{
|
|
|
- Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, timeout, cachedConnectionRequested);
|
|
|
- return master->dbSize(ctx);
|
|
|
+ ConnectionContainer master;
|
|
|
+ try
|
|
|
+ {
|
|
|
+ master.setown(Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, timeout, cachedConnectionRequested));
|
|
|
+ return master->dbSize(ctx);
|
|
|
+ }
|
|
|
+ catch (IException * error)
|
|
|
+ {
|
|
|
+ master.handleException(error);
|
|
|
+ }
|
|
|
}
|
|
|
ECL_REDIS_API signed __int64 ECL_REDIS_CALL SyncRINCRBY(ICodeContext * ctx, const char * key, signed __int64 value, const char * options, int database, const char * password, unsigned timeout, bool cachedConnectionRequested)
|
|
|
{
|
|
|
- Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, timeout, cachedConnectionRequested);
|
|
|
- return master->incrBy(ctx, key, value);
|
|
|
+ ConnectionContainer master;
|
|
|
+ try
|
|
|
+ {
|
|
|
+ master.setown(Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, timeout, cachedConnectionRequested));
|
|
|
+ return master->incrBy(ctx, key, value);
|
|
|
+ }
|
|
|
+ catch (IException * error)
|
|
|
+ {
|
|
|
+ master.handleException(error);
|
|
|
+ }
|
|
|
}
|
|
|
//-----------------------------------SET------------------------------------------
|
|
|
ECL_REDIS_API void ECL_REDIS_CALL SyncRSetStr(ICodeContext * ctx, const char * key, size32_t valueSize, const char * value, const char * options, int database, unsigned expire, const char * password, unsigned timeout, bool cachedConnectionRequested)
|
|
@@ -1118,18 +1239,42 @@ ECL_REDIS_API void ECL_REDIS_CALL SyncRSetUChar(ICodeContext * ctx, const char *
|
|
|
}
|
|
|
ECL_REDIS_API void ECL_REDIS_CALL SyncRSetInt(ICodeContext * ctx, const char * key, signed __int64 value, const char * options, int database, unsigned expire, const char * password, unsigned timeout, bool cachedConnectionRequested)
|
|
|
{
|
|
|
- Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, timeout, cachedConnectionRequested);
|
|
|
- master->setInt(ctx, key, value, expire, false);
|
|
|
+ ConnectionContainer master;
|
|
|
+ try
|
|
|
+ {
|
|
|
+ master.setown(Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, timeout, cachedConnectionRequested));
|
|
|
+ master->setIntKey(ctx, key, value, expire, false);
|
|
|
+ }
|
|
|
+ catch (IException * error)
|
|
|
+ {
|
|
|
+ master.handleException(error);
|
|
|
+ }
|
|
|
}
|
|
|
ECL_REDIS_API void ECL_REDIS_CALL SyncRSetUInt(ICodeContext * ctx, const char * key, unsigned __int64 value, const char * options, int database, unsigned expire, const char * password, unsigned timeout, bool cachedConnectionRequested)
|
|
|
{
|
|
|
- Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, timeout, cachedConnectionRequested);
|
|
|
- master->setInt(ctx, key, value, expire, true);
|
|
|
+ ConnectionContainer master;
|
|
|
+ try
|
|
|
+ {
|
|
|
+ master.setown(Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, timeout, cachedConnectionRequested));
|
|
|
+ master->setIntKey(ctx, key, value, expire, true);
|
|
|
+ }
|
|
|
+ catch (IException * error)
|
|
|
+ {
|
|
|
+ master.handleException(error);
|
|
|
+ }
|
|
|
}
|
|
|
ECL_REDIS_API void ECL_REDIS_CALL SyncRSetReal(ICodeContext * ctx, const char * key, double value, const char * options, int database, unsigned expire, const char * password, unsigned timeout, bool cachedConnectionRequested)
|
|
|
{
|
|
|
- Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, timeout, cachedConnectionRequested);
|
|
|
- master->setReal(ctx, key, value, expire);
|
|
|
+ ConnectionContainer master;
|
|
|
+ try
|
|
|
+ {
|
|
|
+ master.setown(Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, timeout, cachedConnectionRequested));
|
|
|
+ master->setRealKey(ctx, key, value, expire);
|
|
|
+ }
|
|
|
+ catch (IException * error)
|
|
|
+ {
|
|
|
+ master.handleException(error);
|
|
|
+ }
|
|
|
}
|
|
|
ECL_REDIS_API void ECL_REDIS_CALL SyncRSetBool(ICodeContext * ctx, const char * key, bool value, const char * options, int database, unsigned expire, const char * password, unsigned timeout, bool cachedConnectionRequested)
|
|
|
{
|
|
@@ -1197,8 +1342,16 @@ ECL_REDIS_API void ECL_REDIS_CALL SyncRGetData(ICodeContext * ctx, size32_t & re
|
|
|
//Set pointer types
|
|
|
void SyncLockRSet(ICodeContext * ctx, const char * _options, const char * key, size32_t valueSize, const char * value, int database, unsigned expire, const char * password, unsigned _timeout, bool cachedConnectionRequested)
|
|
|
{
|
|
|
- Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, _options, DUMMY_IP, DUMMY_PORT, true, database, password, _timeout, cachedConnectionRequested);
|
|
|
- master->lockSet(ctx, key, valueSize, value, expire);
|
|
|
+ ConnectionContainer master;
|
|
|
+ try
|
|
|
+ {
|
|
|
+ master.setown(Connection::createConnection(ctx, cachedConnection, _options, DUMMY_IP, DUMMY_PORT, true, database, password, _timeout, cachedConnectionRequested));
|
|
|
+ master->lockSet(ctx, key, valueSize, value, expire);
|
|
|
+ }
|
|
|
+ catch (IException * error)
|
|
|
+ {
|
|
|
+ master.handleException(error);
|
|
|
+ }
|
|
|
}
|
|
|
//--INNER--
|
|
|
void Connection::lockSet(ICodeContext * ctx, const char * key, size32_t valueSize, const char * value, unsigned expire)
|
|
@@ -1210,8 +1363,16 @@ void Connection::lockSet(ICodeContext * ctx, const char * key, size32_t valueSiz
|
|
|
//--OUTER--
|
|
|
void SyncLockRGet(ICodeContext * ctx, const char * options, const char * key, size_t & returnSize, char * & returnValue, int database, unsigned expire, const char * password, unsigned _timeout, bool cachedConnectionRequested)
|
|
|
{
|
|
|
- Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, _timeout, cachedConnectionRequested);
|
|
|
- master->lockGet(ctx, key, returnSize, returnValue, password, expire);
|
|
|
+ ConnectionContainer master;
|
|
|
+ try
|
|
|
+ {
|
|
|
+ master.setown(Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, _timeout, cachedConnectionRequested));
|
|
|
+ master->lockGet(ctx, key, returnSize, returnValue, password, expire);
|
|
|
+ }
|
|
|
+ catch (IException * error)
|
|
|
+ {
|
|
|
+ master.handleException(error);
|
|
|
+ }
|
|
|
}
|
|
|
//--INNER--
|
|
|
void Connection::lockGet(ICodeContext * ctx, const char * key, size_t & returnSize, char * & returnValue, const char * password, unsigned expire)
|
|
@@ -1284,77 +1445,86 @@ void Connection::handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAtt
|
|
|
int _timeLeft = (int) timeLeft();//createConnection requires a timeout value, so create it here.
|
|
|
if (_timeLeft == 0 && timeout.getTimeout() != 0)//Disambiguate between zero time left and timeout = 0 => infinity.
|
|
|
rtlFail(0, "Redis Plugin: ERROR - function timed out internally.");
|
|
|
- ConnectionContainer subscriptionConnection(createConnection(ctx, cachedSubscriptionConnection, options.str(), ip.str(), port, false, 0, password, _timeLeft, canCacheConnections(isCachedConnection(), true), true));
|
|
|
- subscriptionConnection->subscribe(ctx, channel.str());
|
|
|
|
|
|
-#if(0)//Test publish before GET.
|
|
|
+ ConnectionContainer subscriptionConnection;
|
|
|
+ try
|
|
|
{
|
|
|
- OwnedReply pubReply = Reply::createReply(redisCommand("PUBLISH %b %b", channel.str(), (size_t)channel.length(), "foo", (size_t)3));
|
|
|
- assertOnError(pubReply->query(), "pub fail");
|
|
|
- }
|
|
|
+ subscriptionConnection.setown(createConnection(ctx, cachedSubscriptionConnection, options.str(), ip.str(), port, false, 0, password, _timeLeft, canCacheConnections(isCachedConnection(), true), true));
|
|
|
+ subscriptionConnection->subscribe(ctx, channel.str());
|
|
|
+
|
|
|
+#if(0)//Test publish before GET.
|
|
|
+ {
|
|
|
+ OwnedReply pubReply = Reply::createReply(redisCommand("PUBLISH %b %b", channel.str(), (size_t)channel.length(), "foo", (size_t)3));
|
|
|
+ assertOnError(pubReply->query(), "pub fail");
|
|
|
+ }
|
|
|
#endif
|
|
|
|
|
|
- //Now GET
|
|
|
- OwnedReply getReply = Reply::createReply((redisReply*)redisCommand("GET %b", key, strlen(key)));
|
|
|
- assertOnErrorWithCmdMsg(getReply->query(), "GetOrLock<type>", key);
|
|
|
+ //Now GET
|
|
|
+ OwnedReply getReply = Reply::createReply((redisReply*)redisCommand("GET %b", key, strlen(key)));
|
|
|
+ assertOnErrorWithCmdMsg(getReply->query(), "GetOrLock<type>", key);
|
|
|
|
|
|
#if(0)//Test publish after GET.
|
|
|
- {
|
|
|
- OwnedReply pubReply = Reply::createReply(redisCommand("PUBLISH %b %b", channel.str(), (size_t)channel.length(), "foo", (size_t)3));
|
|
|
- assertOnError(pubReply->query(), "pub fail");
|
|
|
- }
|
|
|
+ {
|
|
|
+ OwnedReply pubReply = Reply::createReply(redisCommand("PUBLISH %b %b", channel.str(), (size_t)channel.length(), "foo", (size_t)3));
|
|
|
+ assertOnError(pubReply->query(), "pub fail");
|
|
|
+ }
|
|
|
#endif
|
|
|
|
|
|
- //Only return an actual value, i.e. neither the lock value nor an empty string. The latter is unlikely since we know that lock()
|
|
|
- //failed, indicating that the key existed. If this is an actual value, it is however, possible for it to have been DELeted in the interim.
|
|
|
- if (getReply->query()->type != REDIS_REPLY_NIL && getReply->query()->str && strncmp(getReply->query()->str, REDIS_LOCK_PREFIX, strlen(REDIS_LOCK_PREFIX)) != 0)
|
|
|
- {
|
|
|
- retVal->set(getReply->query()->len, getReply->query()->str);
|
|
|
- return;
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- //Check that the lock was set by this plugin and thus that we subscribed to the expected channel.
|
|
|
- if (getReply->query()->str && strcmp(getReply->query()->str, channel.str()) !=0 )
|
|
|
+ //Only return an actual value, i.e. neither the lock value nor an empty string. The latter is unlikely since we know that lock()
|
|
|
+ //failed, indicating that the key existed. If this is an actual value, it is however, possible for it to have been DELeted in the interim.
|
|
|
+ if (getReply->query()->type != REDIS_REPLY_NIL && getReply->query()->str && strncmp(getReply->query()->str, REDIS_LOCK_PREFIX, strlen(REDIS_LOCK_PREFIX)) != 0)
|
|
|
{
|
|
|
- VStringBuffer msg("key locked with a channel ('%s') different to that subscribed to (%s).", getReply->query()->str, channel.str());
|
|
|
- fail("GetOrLock<type>", msg.str(), key);
|
|
|
- //MORE: In theory, it is possible to recover at this stage by subscribing to the channel that the key was actually locked with.
|
|
|
- //However, we may have missed the massage publication already or by then, but could SUB again in case we haven't.
|
|
|
- //More importantly and furthermore, the publication (in SetAndPublish<type>) will only publish to the channel encoded by
|
|
|
- //this plugin, rather than the string retrieved as the lock value (the value of the locked key).
|
|
|
+ retVal->set(getReply->query()->len, getReply->query()->str);
|
|
|
+ return;
|
|
|
}
|
|
|
- getReply.clear();
|
|
|
+ else
|
|
|
+ {
|
|
|
+ //Check that the lock was set by this plugin and thus that we subscribed to the expected channel.
|
|
|
+ if (getReply->query()->str && strcmp(getReply->query()->str, channel.str()) !=0 )
|
|
|
+ {
|
|
|
+ VStringBuffer msg("key locked with a channel ('%s') different to that subscribed to (%s).", getReply->query()->str, channel.str());
|
|
|
+ fail("GetOrLock<type>", msg.str(), key);
|
|
|
+ //MORE: In theory, it is possible to recover at this stage by subscribing to the channel that the key was actually locked with.
|
|
|
+ //However, we may have missed the massage publication already or by then, but could SUB again in case we haven't.
|
|
|
+ //More importantly and furthermore, the publication (in SetAndPublish<type>) will only publish to the channel encoded by
|
|
|
+ //this plugin, rather than the string retrieved as the lock value (the value of the locked key).
|
|
|
+ }
|
|
|
+ getReply.clear();
|
|
|
|
|
|
#if(0)//Added to allow for manual pub testing via redis-cli
|
|
|
- struct timeval to = { 10, 0 };//10secs
|
|
|
- ::redisSetTimeout(subscriptionConnection->context, to);
|
|
|
+ struct timeval to = { 10, 0 };//10secs
|
|
|
+ ::redisSetTimeout(subscriptionConnection->context, to);
|
|
|
#endif
|
|
|
|
|
|
- OwnedReply subReply = new Reply();
|
|
|
- subscriptionConnection->readReply(subReply);
|
|
|
- subscriptionConnection->assertOnErrorWithCmdMsg(subReply->query(), "GetOrLock<type>", key);
|
|
|
+ OwnedReply subReply = new Reply();
|
|
|
+ subscriptionConnection->readReply(subReply);
|
|
|
+ subscriptionConnection->assertOnErrorWithCmdMsg(subReply->query(), "GetOrLock<type>", key);
|
|
|
|
|
|
- if (subscriptionConnection->isCorrectChannel(subReply->query(), "message"))
|
|
|
- {
|
|
|
- //We are about to return a value, to conform with other Get<type> functions, fail if the key did not exist.
|
|
|
- //Since the value is sent via a published message, there is no direct reply struct so assume that an empty
|
|
|
- //string is equivalent to a non-existent key.
|
|
|
- //More importantly, it is paramount that this routine only return an empty string under one condition, that
|
|
|
- //which indicates to the caller that the key was successfully locked.
|
|
|
- //NOTE: it is possible for an empty message to have been PUBLISHed.
|
|
|
- if (subReply->query()->element[2]->len > 0)
|
|
|
+ if (subscriptionConnection->isCorrectChannel(subReply->query(), "message"))
|
|
|
{
|
|
|
- retVal->set(subReply->query()->element[2]->len, subReply->query()->element[2]->str);//return the published value rather than another (WATCHed) GET.
|
|
|
- return;
|
|
|
+ //We are about to return a value, to conform with other Get<type> functions, fail if the key did not exist.
|
|
|
+ //Since the value is sent via a published message, there is no direct reply struct so assume that an empty
|
|
|
+ //string is equivalent to a non-existent key.
|
|
|
+ //More importantly, it is paramount that this routine only return an empty string under one condition, that
|
|
|
+ //which indicates to the caller that the key was successfully locked.
|
|
|
+ //NOTE: it is possible for an empty message to have been PUBLISHed.
|
|
|
+ if (subReply->query()->element[2]->len > 0)
|
|
|
+ {
|
|
|
+ retVal->set(subReply->query()->element[2]->len, subReply->query()->element[2]->str);//return the published value rather than another (WATCHed) GET.
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ //fail that key does not exist
|
|
|
+ redisReply fakeReply;
|
|
|
+ fakeReply.type = REDIS_REPLY_NIL;
|
|
|
+ assertKey(&fakeReply, key);
|
|
|
}
|
|
|
- //fail that key does not exist
|
|
|
- redisReply fakeReply;
|
|
|
- fakeReply.type = REDIS_REPLY_NIL;
|
|
|
- assertKey(&fakeReply, key);
|
|
|
}
|
|
|
+ throwUnexpected();
|
|
|
+ }
|
|
|
+ catch (IException * error)
|
|
|
+ {
|
|
|
+ subscriptionConnection.handleException(error);
|
|
|
}
|
|
|
- throwUnexpected();
|
|
|
}
|
|
|
void Connection::handleLockOnSet(ICodeContext * ctx, const char * key, const char * value, size_t size, unsigned expire)
|
|
|
{
|
|
@@ -1471,7 +1641,15 @@ ECL_REDIS_API void ECL_REDIS_CALL SyncLockRGetUtf8(ICodeContext * ctx, size32_t
|
|
|
}
|
|
|
ECL_REDIS_API void ECL_REDIS_CALL SyncLockRUnlock(ICodeContext * ctx, const char * key, const char * options, int database, const char * password, unsigned timeout, bool cachedConnectionRequested)
|
|
|
{
|
|
|
- Owned<Connection> master = Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, timeout, cachedConnectionRequested);
|
|
|
- master->unlock(ctx, key);
|
|
|
+ ConnectionContainer master;
|
|
|
+ try
|
|
|
+ {
|
|
|
+ master.setown(Connection::createConnection(ctx, cachedConnection, options, DUMMY_IP, DUMMY_PORT, true, database, password, timeout, cachedConnectionRequested));
|
|
|
+ master->unlock(ctx, key);
|
|
|
+ }
|
|
|
+ catch (IException * error)
|
|
|
+ {
|
|
|
+ master.handleException(error);
|
|
|
+ }
|
|
|
}
|
|
|
}//close namespace
|