memcachedplugin.cpp 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2014 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "memcachedplugin.hpp"
  15. #include "eclrtl.hpp"
  16. #include "jexcept.hpp"
  17. #include "jstring.hpp"
  18. #include "workunit.hpp"
  19. #include <libmemcached/memcached.hpp>
  20. #include <libmemcached/util.h>
  21. #define MEMCACHED_VERSION "memcached plugin 1.0.0"
  22. ECL_MEMCACHED_API bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb)
  23. {
  24. if (pb->size != sizeof(ECLPluginDefinitionBlock))
  25. return false;
  26. pb->magicVersion = PLUGIN_VERSION;
  27. pb->version = MEMCACHED_VERSION;
  28. pb->moduleName = "lib_memcached";
  29. pb->ECL = NULL;
  30. pb->flags = PLUGIN_IMPLICIT_MODULE;
  31. pb->description = "ECL plugin library for the C/C++ API libmemcached (http://libmemcached.org/)\n";
  32. return true;
  33. }
  34. namespace MemCachedPlugin {
  35. IPluginContext * parentCtx = NULL;
  36. static const unsigned unitExpire = 86400;//1 day (secs)
  37. enum eclDataType {
  38. ECL_BOOLEAN,
  39. ECL_DATA,
  40. ECL_INTEGER,
  41. ECL_REAL,
  42. ECL_STRING,
  43. ECL_UTF8,
  44. ECL_UNICODE,
  45. ECL_UNSIGNED,
  46. ECL_NONE
  47. };
  48. const char * enumToStr(eclDataType type)
  49. {
  50. switch(type)
  51. {
  52. case ECL_BOOLEAN:
  53. return "BOOLEAN";
  54. case ECL_INTEGER:
  55. return "INTEGER";
  56. case ECL_UNSIGNED:
  57. return "UNSIGNED";
  58. case ECL_REAL:
  59. return "REAL";
  60. case ECL_STRING:
  61. return "STRING";
  62. case ECL_UTF8:
  63. return "UTF8";
  64. case ECL_UNICODE:
  65. return "UNICODE";
  66. case ECL_DATA:
  67. return "DATA";
  68. case ECL_NONE:
  69. return "Nonexistent";
  70. default:
  71. return "UNKNOWN";
  72. }
  73. }
  74. class MCached : public CInterface
  75. {
  76. public :
  77. MCached(ICodeContext * ctx, const char * servers);
  78. ~MCached();
  79. //set
  80. template <class type> bool set(ICodeContext * ctx, const char * partitionKey, const char * key, type value, unsigned expire, eclDataType eclType);
  81. template <class type> bool set(ICodeContext * ctx, const char * partitionKey, const char * key, size32_t valueLength, const type * value, unsigned expire, eclDataType eclType);
  82. //get
  83. template <class type> void get(ICodeContext * ctx, const char * partitionKey, const char * key, type & value, eclDataType eclType);
  84. template <class type> void get(ICodeContext * ctx, const char * partitionKey, const char * key, size_t & valueLength, type * & value, eclDataType eclType);
  85. void getVoidPtrLenPair(ICodeContext * ctx, const char * partitionKey, const char * key, size_t & valueLength, void * & value, eclDataType eclType);
  86. bool clear(ICodeContext * ctx, unsigned when);
  87. bool exist(ICodeContext * ctx, const char * key, const char * partitionKey);
  88. eclDataType getKeyType(const char * key, const char * partitionKey);
  89. bool isSameConnection(const char * _servers) const;
  90. private :
  91. void checkServersUp(ICodeContext * ctx);
  92. void assertOnError(memcached_return_t error, const char * msgSuffix = "");
  93. const char * keyNotFoundMsg(memcached_return_t error, const char * key, StringBuffer & target) const;
  94. void connect(ICodeContext * ctx);
  95. bool reportErrorOnFail(ICodeContext * ctx, memcached_return_t error);
  96. void reportKeyTypeMismatch(ICodeContext * ctx, const char * key, uint32_t flag, eclDataType eclType);
  97. void * cpy(const char * src, size_t length);
  98. void logServerStats(ICodeContext * ctx);
  99. void init(ICodeContext * ctx);
  100. void invokePoolSecurity(ICodeContext * ctx);
  101. void invokeConnectionSecurity(ICodeContext * ctx);
  102. void setPoolSettings();
  103. void assertPool();//For internal purposes to insure correct order of the above processes and instantiation.
  104. private :
  105. memcached_st * connection;
  106. memcached_pool_st * pool;
  107. StringAttr options;
  108. bool alreadyInitialized;
  109. unsigned typeMismatchCount;
  110. };
  111. #define OwnedMCached Owned<MemCachedPlugin::MCached>
  112. #define MAX_TYPEMISMATCHCOUNT 10
  113. static CriticalSection crit;
  114. static OwnedMCached cachedConnection;
  115. MCached * createConnection(ICodeContext * ctx, const char * servers)
  116. {
  117. CriticalBlock block(crit);
  118. if (!cachedConnection)
  119. {
  120. cachedConnection.setown(new MemCachedPlugin::MCached(ctx, servers));
  121. return LINK(cachedConnection);
  122. }
  123. if (cachedConnection->isSameConnection(servers))
  124. return LINK(cachedConnection);
  125. cachedConnection.setown(new MemCachedPlugin::MCached(ctx, servers));
  126. return LINK(cachedConnection);
  127. }
  128. //-------------------------------------------SET-----------------------------------------
  129. template<class type> bool MSet(ICodeContext * ctx, const char * _servers, const char * partitionKey, const char * key, type value, unsigned expire, eclDataType eclType)
  130. {
  131. OwnedMCached serverPool = createConnection(ctx, _servers);
  132. bool success = serverPool->set(ctx, partitionKey, key, value, expire, eclType);
  133. return success;
  134. }
  135. //Set pointer types
  136. template<class type> bool MSet(ICodeContext * ctx, const char * _servers, const char * partitionKey, const char * key, size32_t valueLength, const type * value, unsigned expire, eclDataType eclType)
  137. {
  138. OwnedMCached serverPool = createConnection(ctx, _servers);
  139. bool success = serverPool->set(ctx, partitionKey, key, valueLength, value, expire, eclType);
  140. return success;
  141. }
  142. //-------------------------------------------GET-----------------------------------------
  143. template<class type> void MGet(ICodeContext * ctx, const char * servers, const char * partitionKey, const char * key, type & returnValue, eclDataType eclType)
  144. {
  145. OwnedMCached serverPool = createConnection(ctx, servers);
  146. serverPool->get(ctx, partitionKey, key, returnValue, eclType);
  147. }
  148. template<class type> void MGet(ICodeContext * ctx, const char * servers, const char * partitionKey, const char * key, size_t & returnLength, type * & returnValue, eclDataType eclType)
  149. {
  150. OwnedMCached serverPool = createConnection(ctx, servers);
  151. serverPool->get(ctx, partitionKey, key, returnLength, returnValue, eclType);
  152. }
  153. void MGetVoidPtrLenPair(ICodeContext * ctx, const char * servers, const char * partitionKey, const char * key, size_t & returnLength, void * & returnValue, eclDataType eclType)
  154. {
  155. OwnedMCached serverPool = createConnection(ctx, servers);
  156. serverPool->getVoidPtrLenPair(ctx, partitionKey, key, returnLength, returnValue, eclType);
  157. }
  158. }//close namespace
  159. //----------------------------------SET----------------------------------------
  160. template<class type> bool MemCachedPlugin::MCached::set(ICodeContext * ctx, const char * partitionKey, const char * key, type value, unsigned expire, eclDataType eclType)
  161. {
  162. const char * _value = reinterpret_cast<const char *>(&value);//Do this even for char * to prevent compiler complaining
  163. size_t partitionKeyLength = strlen(partitionKey);
  164. if (partitionKeyLength)
  165. return !reportErrorOnFail(ctx, memcached_set_by_key(connection, partitionKey, partitionKeyLength, key, strlen(key), _value, sizeof(value), (time_t)(expire*unitExpire), (uint32_t)eclType));
  166. else
  167. return !reportErrorOnFail(ctx, memcached_set(connection, key, strlen(key), _value, sizeof(value), (time_t)(expire*unitExpire), (uint32_t)eclType));
  168. }
  169. template<class type> bool MemCachedPlugin::MCached::set(ICodeContext * ctx, const char * partitionKey, const char * key, size32_t valueLength, const type * value, unsigned expire, eclDataType eclType)
  170. {
  171. const char * _value = reinterpret_cast<const char *>(value);//Do this even for char * to prevent compiler complaining
  172. size_t partitionKeyLength = strlen(partitionKey);
  173. if (partitionKeyLength)
  174. return !reportErrorOnFail(ctx, memcached_set_by_key(connection, partitionKey, partitionKeyLength, key, strlen(key), _value, (size_t)(valueLength), (time_t)(expire*unitExpire), (uint32_t)eclType));
  175. else
  176. return !reportErrorOnFail(ctx, memcached_set(connection, key, strlen(key), _value, (size_t)(valueLength), (time_t)(expire*unitExpire), (uint32_t)eclType));
  177. }
  178. //----------------------------------GET----------------------------------------
  179. template<class type> void MemCachedPlugin::MCached::get(ICodeContext * ctx, const char * partitionKey, const char * key, type & returnValue, eclDataType eclType)
  180. {
  181. uint32_t flag = 0;
  182. size_t returnLength = 0;
  183. memcached_return_t error;
  184. OwnedMalloc<char> value;
  185. size_t partitionKeyLength = strlen(partitionKey);
  186. if (partitionKeyLength)
  187. value.setown(memcached_get_by_key(connection, partitionKey, partitionKeyLength, key, strlen(key), &returnLength, &flag, &error));
  188. else
  189. value.setown(memcached_get(connection, key, strlen(key), &returnLength, &flag, &error));
  190. StringBuffer keyMsg;
  191. assertOnError(error, keyNotFoundMsg(error, key, keyMsg));
  192. reportKeyTypeMismatch(ctx, key, flag, eclType);
  193. if (sizeof(type)!=returnLength)
  194. {
  195. VStringBuffer msg("MemCachedPlugin: ERROR - Requested type of different size (%uB) from that stored (%uB). Check logs for more information.", (unsigned)sizeof(type), (unsigned)returnLength);
  196. rtlFail(0, msg.str());
  197. }
  198. memcpy(&returnValue, value, returnLength);
  199. }
  200. template<class type> void MemCachedPlugin::MCached::get(ICodeContext * ctx, const char * partitionKey, const char * key, size_t & returnLength, type * & returnValue, eclDataType eclType)
  201. {
  202. uint32_t flag = 0;
  203. memcached_return_t error;
  204. OwnedMalloc<char> value;
  205. size_t partitionKeyLength = strlen(partitionKey);
  206. if (partitionKeyLength)
  207. value.setown(memcached_get_by_key(connection, partitionKey, partitionKeyLength, key, strlen(key), &returnLength, &flag, &error));
  208. else
  209. value.setown(memcached_get(connection, key, strlen(key), &returnLength, &flag, &error));
  210. StringBuffer keyMsg;
  211. assertOnError(error, keyNotFoundMsg(error, key, keyMsg));
  212. reportKeyTypeMismatch(ctx, key, flag, eclType);
  213. returnValue = reinterpret_cast<type*>(cpy(value, returnLength));
  214. }
  215. void MemCachedPlugin::MCached::getVoidPtrLenPair(ICodeContext * ctx, const char * partitionKey, const char * key, size_t & returnLength, void * & returnValue, eclDataType eclType)
  216. {
  217. uint32_t flag = 0;
  218. size_t returnValueLength = 0;
  219. memcached_return_t error;
  220. OwnedMalloc<char> value;
  221. size_t partitionKeyLength = strlen(partitionKey);
  222. if (partitionKeyLength)
  223. value.setown(memcached_get_by_key(connection, partitionKey, partitionKeyLength, key, strlen(key), &returnValueLength, &flag, &error));
  224. else
  225. value.setown(memcached_get(connection, key, strlen(key), &returnValueLength, &flag, &error));
  226. StringBuffer keyMsg;
  227. assertOnError(error, keyNotFoundMsg(error, key, keyMsg));
  228. reportKeyTypeMismatch(ctx, key, flag, eclType);
  229. returnLength = (size32_t)(returnValueLength);
  230. returnValue = reinterpret_cast<void*>(cpy(value, returnLength));
  231. }
  232. ECL_MEMCACHED_API void setPluginContext(IPluginContext * ctx) { MemCachedPlugin::parentCtx = ctx; }
  233. MemCachedPlugin::MCached::MCached(ICodeContext * ctx, const char * _options)
  234. {
  235. alreadyInitialized = false;
  236. connection = NULL;
  237. pool = NULL;
  238. options.set(_options);
  239. typeMismatchCount = 0;
  240. #if (LIBMEMCACHED_VERSION_HEX<0x53000)
  241. memcached_st *memc = memcached_create(NULL);
  242. memcached_return_t error;
  243. memcached_server_st *servers = NULL;
  244. try
  245. {
  246. unsigned pool_min = 1;
  247. unsigned pool_max = 1;
  248. StringArray optionStrings;
  249. optionStrings.appendList(_options, " ");
  250. ForEachItemIn(idx, optionStrings)
  251. {
  252. const char *opt = optionStrings.item(idx);
  253. if (strncmp(opt, "--SERVER=", 9) ==0)
  254. {
  255. opt += 9;
  256. StringArray splitPort;
  257. splitPort.appendList(opt, ":");
  258. unsigned port;
  259. if (splitPort.ordinality()==2)
  260. port = atoi(splitPort.item(1));
  261. else
  262. port = 11211;
  263. servers = memcached_server_list_append(NULL, splitPort.item(0), port, &error);
  264. assertOnError(error, "memcached_server_list_append failed");
  265. }
  266. else if (strncmp(opt, "--POOL-MIN=", 11) ==0)
  267. pool_min = atoi(opt+11);
  268. else if (strncmp(opt, "--POOL-MAX=", 11) ==0)
  269. pool_max = atoi(opt+11);
  270. else
  271. {
  272. VStringBuffer err("MemCachedPlugin: unsupported option string %s", opt);
  273. rtlFail(0, err.str());
  274. }
  275. }
  276. if (!servers)
  277. rtlFail(0, "No servers specified");
  278. error = memcached_server_push(memc, servers);
  279. memcached_server_list_free(servers);
  280. assertOnError(error, "memcached_server_push failed");
  281. pool = memcached_pool_create(memc, pool_min, pool_max); // takes ownership of memc
  282. }
  283. catch (...)
  284. {
  285. if (servers)
  286. memcached_server_list_free(servers);
  287. if (memc)
  288. memcached_free(memc);
  289. throw;
  290. }
  291. #else
  292. pool = memcached_pool(_options, strlen(_options));
  293. #endif
  294. assertPool();
  295. setPoolSettings();
  296. invokePoolSecurity(ctx);
  297. connect(ctx);
  298. checkServersUp(ctx);
  299. }
  300. //-----------------------------------------------------------------------------
  301. MemCachedPlugin::MCached::~MCached()
  302. {
  303. if (pool)
  304. {
  305. #if (LIBMEMCACHED_VERSION_HEX<0x53000)
  306. memcached_pool_push(pool, connection);
  307. #else
  308. memcached_pool_release(pool, connection);
  309. #endif
  310. connection = NULL;//For safety (from changing this destructor) as not implicit in either the above or below.
  311. memcached_st *memc = memcached_pool_destroy(pool);
  312. if (memc)
  313. memcached_free(memc);
  314. }
  315. else if (connection)//This should never be needed but just in case.
  316. {
  317. memcached_free(connection);
  318. }
  319. }
  320. bool MemCachedPlugin::MCached::isSameConnection(const char * _servers) const
  321. {
  322. if (!_servers)
  323. return false;
  324. return stricmp(options.get(), _servers) == 0;
  325. }
  326. void MemCachedPlugin::MCached::assertPool()
  327. {
  328. if (!pool)
  329. {
  330. StringBuffer msg = "Memcached Plugin: Failed to instantiate server pool with:";
  331. msg.newline().append(options);
  332. rtlFail(0, msg.str());
  333. }
  334. }
  335. void * MemCachedPlugin::MCached::cpy(const char * src, size_t length)
  336. {
  337. void * value = rtlMalloc(length);
  338. return memcpy(value, src, length);
  339. }
  340. void MemCachedPlugin::MCached::checkServersUp(ICodeContext * ctx)
  341. {
  342. memcached_return_t error;
  343. char * args = NULL;
  344. OwnedMalloc<memcached_stat_st> stats;
  345. stats.setown(memcached_stat(connection, args, &error));
  346. assertex(stats);
  347. unsigned int numberOfServers = memcached_server_count(connection);
  348. unsigned int numberOfServersDown = 0;
  349. for (unsigned i = 0; i < numberOfServers; ++i)
  350. {
  351. if (stats[i].pid == -1)//perhaps not the best test?
  352. {
  353. numberOfServersDown++;
  354. VStringBuffer msg("Memcached Plugin: Failed connecting to entry %u\nwithin the server list: %s", i+1, options.str());
  355. ctx->addWuException(msg.str(), WRN_FROM_PLUGIN, ExceptionSeverityWarning, "");
  356. }
  357. }
  358. if (numberOfServersDown == numberOfServers)
  359. rtlFail(0,"Memcached Plugin: Failed connecting to ALL servers. Check memcached on all servers and \"memcached -B ascii\" not used.");
  360. //check memcached version homogeneity
  361. for (unsigned i = 0; i < numberOfServers-1; ++i)
  362. {
  363. if (strcmp(stats[i].version, stats[i+1].version) != 0)
  364. ctx->addWuException("Memcached Plugin: Inhomogeneous versions of memcached across servers.", WRN_FROM_PLUGIN, ExceptionSeverityInformation, "");
  365. }
  366. }
  367. bool MemCachedPlugin::MCached::reportErrorOnFail(ICodeContext * ctx, memcached_return_t error)
  368. {
  369. if (error == MEMCACHED_SUCCESS)
  370. return false;
  371. VStringBuffer msg("Memcached Plugin: %s", memcached_strerror(connection, error));
  372. ctx->addWuException(msg.str(), ERR_FROM_PLUGIN, ExceptionSeverityInformation, "");
  373. return true;
  374. }
  375. void MemCachedPlugin::MCached::assertOnError(memcached_return_t error, const char * msgSuffix)
  376. {
  377. if (error != MEMCACHED_SUCCESS)
  378. {
  379. VStringBuffer msg("Memcached Plugin: %s%s", memcached_strerror(connection, error), msgSuffix);
  380. rtlFail(0, msg.str());
  381. }
  382. }
  383. const char * MemCachedPlugin::MCached::keyNotFoundMsg(memcached_return_t error, const char * key, StringBuffer & target) const
  384. {
  385. target.clear();
  386. if (error == MEMCACHED_NOTFOUND)
  387. {
  388. target = " (key: '";
  389. target.append(key).append("') ");
  390. }
  391. return target.str();
  392. }
  393. bool MemCachedPlugin::MCached::clear(ICodeContext * ctx, unsigned when)
  394. {
  395. //NOTE: memcached_flush is the actual cache flush/clear/delete and not an io buffer flush.
  396. return !reportErrorOnFail(ctx, memcached_flush(connection, (time_t)(when)));
  397. }
  398. bool MemCachedPlugin::MCached::exist(ICodeContext * ctx, const char * key, const char * partitionKey)
  399. {
  400. #if (LIBMEMCACHED_VERSION_HEX<0x53000)
  401. throw makeStringException(0, "memcached_exist not supported in this version of libmemcached");
  402. #else
  403. memcached_return_t error;
  404. size_t partitionKeyLength = strlen(partitionKey);
  405. if (partitionKeyLength)
  406. error = memcached_exist_by_key(connection, partitionKey, partitionKeyLength, key, strlen(key));
  407. else
  408. error = memcached_exist(connection, key, strlen(key));
  409. if (error == MEMCACHED_SUCCESS)
  410. return true;
  411. else if (error == MEMCACHED_NOTFOUND)
  412. return false;
  413. reportErrorOnFail(ctx, error);
  414. return false;
  415. #endif
  416. }
  417. MemCachedPlugin::eclDataType MemCachedPlugin::MCached::getKeyType(const char * key, const char * partitionKey)
  418. {
  419. size_t returnValueLength;
  420. uint32_t flag;
  421. memcached_return_t error;
  422. size_t partitionKeyLength = strlen(partitionKey);
  423. if (partitionKeyLength)
  424. memcached_get_by_key(connection, partitionKey, partitionKeyLength, key, strlen(key), &returnValueLength, &flag, &error);
  425. else
  426. memcached_get(connection, key, strlen(key), &returnValueLength, &flag, &error);
  427. if (error == MEMCACHED_SUCCESS)
  428. return (MemCachedPlugin::eclDataType)(flag);
  429. else if (error == MEMCACHED_NOTFOUND)
  430. return ECL_NONE;
  431. else
  432. {
  433. StringBuffer msg = "Memcached Plugin: ";
  434. rtlFail(0, msg.append(memcached_strerror(connection, error)).str());
  435. }
  436. }
  437. void MemCachedPlugin::MCached::reportKeyTypeMismatch(ICodeContext * ctx, const char * key, uint32_t flag, eclDataType eclType)
  438. {
  439. if (flag && eclType != ECL_DATA && flag != eclType)
  440. {
  441. VStringBuffer msg("Memcached Plugin: The requested key '%s' is of type %s, not %s as requested.", key, enumToStr((eclDataType)(flag)), enumToStr(eclType));
  442. if (++typeMismatchCount <= MAX_TYPEMISMATCHCOUNT)
  443. ctx->logString(msg.str());//NOTE: logging locally, rather than calling ctx->addWuException, to prevent flooding the WU if this is called multiple times by every node
  444. }
  445. }
  446. void MemCachedPlugin::MCached::logServerStats(ICodeContext * ctx)
  447. {
  448. //NOTE: errors are ignored here so that at least some info is reported, such as non-connection related libmemcached version numbers
  449. memcached_return_t error;
  450. char * args = NULL;
  451. OwnedMalloc<memcached_stat_st> stats;
  452. stats.setown(memcached_stat(connection, args, &error));
  453. OwnedMalloc<char*> keys;
  454. keys.setown(memcached_stat_get_keys(connection, stats, &error));
  455. unsigned int numberOfServers = memcached_server_count(connection);
  456. for (unsigned int i = 0; i < numberOfServers; ++i)
  457. {
  458. StringBuffer statsStr;
  459. unsigned j = 0;
  460. do
  461. {
  462. OwnedMalloc<char> value;
  463. value.setown(memcached_stat_get_value(connection, &stats[i], keys[j], &error));
  464. statsStr.newline().append("libmemcached server stat - ").append(keys[j]).append(":").append(value);
  465. } while (keys[++j]);
  466. statsStr.newline().append("libmemcached client stat - libmemcached version:").append(memcached_lib_version());
  467. ctx->logString(statsStr.str());
  468. }
  469. }
  470. void MemCachedPlugin::MCached::init(ICodeContext * ctx)
  471. {
  472. logServerStats(ctx);
  473. }
  474. void MemCachedPlugin::MCached::setPoolSettings()
  475. {
  476. assertPool();
  477. assertOnError(memcached_pool_behavior_set(pool, MEMCACHED_BEHAVIOR_HASH_WITH_PREFIX_KEY, 1));//key set in invokeConnectionSecurity. Only hashed with keys and not partitionKeys
  478. assertOnError(memcached_pool_behavior_set(pool, MEMCACHED_BEHAVIOR_KETAMA, 1));//NOTE: alias of MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA amongst others.
  479. memcached_pool_behavior_set(pool, MEMCACHED_BEHAVIOR_USE_UDP, 0); // Note that this fails on early versions of libmemcached, so ignore result
  480. assertOnError(memcached_pool_behavior_set(pool, MEMCACHED_BEHAVIOR_SERVER_FAILURE_LIMIT, 1));
  481. #if (LIBMEMCACHED_VERSION_HEX>=0x50000)
  482. assertOnError(memcached_pool_behavior_set(pool, MEMCACHED_BEHAVIOR_REMOVE_FAILED_SERVERS, 1));
  483. #endif
  484. assertOnError(memcached_pool_behavior_set(pool, MEMCACHED_BEHAVIOR_NO_BLOCK, 0));
  485. assertOnError(memcached_pool_behavior_set(pool, MEMCACHED_BEHAVIOR_CONNECT_TIMEOUT, 100));//units of ms MORE: What should I set this to or get from?
  486. assertOnError(memcached_pool_behavior_set(pool, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 0));// Buffering does not work with the ecl runtime paradigm
  487. }
  488. void MemCachedPlugin::MCached::invokePoolSecurity(ICodeContext * ctx)
  489. {
  490. assertPool();
  491. assertOnError(memcached_pool_behavior_set(pool, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, 1));
  492. }
  493. void MemCachedPlugin::MCached::invokeConnectionSecurity(ICodeContext * ctx)
  494. {
  495. //NOTE: Whether to assert or just report? This depends on when this is called. If before checkServersUp() and
  496. //a server is down, it will cause the following to fail if asserted with only a 'poor' libmemcached error message.
  497. //Reporting means that these 'security' measures may not be carried out. Moving checkServersUp() to here is probably the best
  498. //soln. however, this comes with extra overhead.
  499. reportErrorOnFail(ctx, memcached_verbosity(connection, (uint32_t)(0)));
  500. //reportErrorOnFail(ctx, memcached_callback_set(connection, MEMCACHED_CALLBACK_PREFIX_KEY, "ecl"));//NOTE: MEMCACHED_CALLBACK_PREFIX_KEY is an alias of MEMCACHED_CALLBACK_NAMESPACE
  501. }
  502. void MemCachedPlugin::MCached::connect(ICodeContext * ctx)
  503. {
  504. assertPool();
  505. if (connection)
  506. #if (LIBMEMCACHED_VERSION_HEX<0x53000)
  507. memcached_pool_push(pool, connection);
  508. #else
  509. memcached_pool_release(pool, connection);
  510. #endif
  511. memcached_return_t error;
  512. #if (LIBMEMCACHED_VERSION_HEX<0x53000)
  513. connection = memcached_pool_pop(pool, (struct timespec *)0 , &error);
  514. #else
  515. connection = memcached_pool_fetch(pool, (struct timespec *)0 , &error);
  516. #endif
  517. invokeConnectionSecurity(ctx);
  518. if (!alreadyInitialized)//Do this now rather than after assert. Better to have something even if it could be jiberish.
  519. {
  520. init(ctx);//doesn't necessarily initialize anything, instead outputs specs etc for debugging
  521. alreadyInitialized = true;
  522. }
  523. assertOnError(error);
  524. }
  525. //--------------------------------------------------------------------------------
  526. // ECL SERVICE ENTRYPOINTS
  527. //--------------------------------------------------------------------------------
  528. ECL_MEMCACHED_API bool ECL_MEMCACHED_CALL MClear(ICodeContext * ctx, const char * servers)
  529. {
  530. OwnedMCached serverPool = MemCachedPlugin::createConnection(ctx, servers);
  531. bool returnValue = serverPool->clear(ctx, 0);
  532. return returnValue;
  533. }
  534. ECL_MEMCACHED_API bool ECL_MEMCACHED_CALL MExist(ICodeContext * ctx, const char * servers, const char * key, const char * partitionKey)
  535. {
  536. OwnedMCached serverPool = MemCachedPlugin::createConnection(ctx, servers);
  537. bool returnValue = serverPool->exist(ctx, key, partitionKey);
  538. return returnValue;
  539. }
  540. ECL_MEMCACHED_API const char * ECL_MEMCACHED_CALL MKeyType(ICodeContext * ctx, const char * servers, const char * key, const char * partitionKey)
  541. {
  542. OwnedMCached serverPool = MemCachedPlugin::createConnection(ctx, servers);
  543. const char * keyType = enumToStr(serverPool->getKeyType(key, partitionKey));
  544. return keyType;
  545. }
  546. //-----------------------------------SET------------------------------------------
  547. //NOTE: These were all overloaded by 'value' type, however; this caused problems since ecl implicitly casts and doesn't type check.
  548. ECL_MEMCACHED_API bool ECL_MEMCACHED_CALL MSet(ICodeContext * ctx, const char * servers, const char * key, size32_t valueLength, const char * value, const char * partitionKey, unsigned expire /* = 0 (ECL default)*/)
  549. {
  550. return MemCachedPlugin::MSet(ctx, servers, partitionKey, key, valueLength, value, expire, MemCachedPlugin::ECL_STRING);
  551. }
  552. ECL_MEMCACHED_API bool ECL_MEMCACHED_CALL MSet(ICodeContext * ctx, const char * servers, const char * key, size32_t valueLength, const UChar * value, const char * partitionKey, unsigned expire /* = 0 (ECL default)*/)
  553. {
  554. return MemCachedPlugin::MSet(ctx, servers, partitionKey, key, (valueLength)*sizeof(UChar), value, expire, MemCachedPlugin::ECL_UNICODE);
  555. }
  556. ECL_MEMCACHED_API bool ECL_MEMCACHED_CALL MSet(ICodeContext * ctx, const char * servers, const char * key, signed __int64 value, const char * partitionKey, unsigned expire /* = 0 (ECL default)*/)
  557. {
  558. return MemCachedPlugin::MSet(ctx, servers, partitionKey, key, value, expire, MemCachedPlugin::ECL_INTEGER);
  559. }
  560. ECL_MEMCACHED_API bool ECL_MEMCACHED_CALL MSet(ICodeContext * ctx, const char * servers, const char * key, unsigned __int64 value, const char * partitionKey, unsigned expire /* = 0 (ECL default)*/)
  561. {
  562. return MemCachedPlugin::MSet(ctx, servers, partitionKey, key, value, expire, MemCachedPlugin::ECL_UNSIGNED);
  563. }
  564. ECL_MEMCACHED_API bool ECL_MEMCACHED_CALL MSet(ICodeContext * ctx, const char * servers, const char * key, double value, const char * partitionKey, unsigned expire /* = 0 (ECL default)*/)
  565. {
  566. return MemCachedPlugin::MSet(ctx, servers, partitionKey, key, value, expire, MemCachedPlugin::ECL_REAL);
  567. }
  568. ECL_MEMCACHED_API bool ECL_MEMCACHED_CALL MSet(ICodeContext * ctx, const char * servers, const char * key, bool value, const char * partitionKey, unsigned expire)
  569. {
  570. return MemCachedPlugin::MSet(ctx, servers, partitionKey, key, value, expire, MemCachedPlugin::ECL_BOOLEAN);
  571. }
  572. ECL_MEMCACHED_API bool ECL_MEMCACHED_CALL MSetData(ICodeContext * ctx, const char * servers, const char * key, size32_t valueLength, const void * value, const char * partitionKey, unsigned expire)
  573. {
  574. return MemCachedPlugin::MSet(ctx, servers, partitionKey, key, valueLength, value, expire, MemCachedPlugin::ECL_DATA);
  575. }
  576. ECL_MEMCACHED_API bool ECL_MEMCACHED_CALL MSetUtf8(ICodeContext * ctx, const char * servers, const char * key, size32_t valueLength, const char * value, const char * partitionKey, unsigned expire /* = 0 (ECL default)*/)
  577. {
  578. return MemCachedPlugin::MSet(ctx, servers, partitionKey, key, rtlUtf8Size(valueLength, value), value, expire, MemCachedPlugin::ECL_UTF8);
  579. }
  580. //-------------------------------------GET----------------------------------------
  581. ECL_MEMCACHED_API bool ECL_MEMCACHED_CALL MGetBool(ICodeContext * ctx, const char * servers, const char * key, const char * partitionKey)
  582. {
  583. bool value;
  584. MemCachedPlugin::MGet(ctx, servers, partitionKey, key, value, MemCachedPlugin::ECL_BOOLEAN);
  585. return value;
  586. }
  587. ECL_MEMCACHED_API double ECL_MEMCACHED_CALL MGetDouble(ICodeContext * ctx, const char * servers, const char * key, const char * partitionKey)
  588. {
  589. double value;
  590. MemCachedPlugin::MGet(ctx, servers, partitionKey, key, value, MemCachedPlugin::ECL_REAL);
  591. return value;
  592. }
  593. ECL_MEMCACHED_API signed __int64 ECL_MEMCACHED_CALL MGetInt8(ICodeContext * ctx, const char * servers, const char * key, const char * partitionKey)
  594. {
  595. signed __int64 value;
  596. MemCachedPlugin::MGet(ctx, servers, partitionKey, key, value, MemCachedPlugin::ECL_INTEGER);
  597. return value;
  598. }
  599. ECL_MEMCACHED_API unsigned __int64 ECL_MEMCACHED_CALL MGetUint8(ICodeContext * ctx, const char * servers, const char * key, const char * partitionKey)
  600. {
  601. unsigned __int64 value;
  602. MemCachedPlugin::MGet(ctx, servers, partitionKey, key, value, MemCachedPlugin::ECL_UNSIGNED);
  603. return value;
  604. }
  605. ECL_MEMCACHED_API void ECL_MEMCACHED_CALL MGetStr(ICodeContext * ctx, size32_t & returnLength, char * & returnValue, const char * servers, const char * key, const char * partitionKey)
  606. {
  607. size_t _returnLength;
  608. MemCachedPlugin::MGet(ctx, servers, partitionKey, key, _returnLength, returnValue, MemCachedPlugin::ECL_STRING);
  609. returnLength = static_cast<size32_t>(_returnLength);
  610. }
  611. ECL_MEMCACHED_API void ECL_MEMCACHED_CALL MGetUChar(ICodeContext * ctx, size32_t & returnLength, UChar * & returnValue, const char * servers, const char * key, const char * partitionKey)
  612. {
  613. size_t _returnSize;
  614. MemCachedPlugin::MGet(ctx, servers, partitionKey, key, _returnSize, returnValue, MemCachedPlugin::ECL_UNICODE);
  615. returnLength = static_cast<size32_t>(_returnSize/sizeof(UChar));
  616. }
  617. ECL_MEMCACHED_API void ECL_MEMCACHED_CALL MGetUtf8(ICodeContext * ctx, size32_t & returnLength, char * & returnValue, const char * servers, const char * key, const char * partitionKey)
  618. {
  619. size_t returnSize;
  620. MemCachedPlugin::MGet(ctx, servers, partitionKey, key, returnSize, returnValue, MemCachedPlugin::ECL_UTF8);
  621. returnLength = static_cast<size32_t>(rtlUtf8Length(returnSize, returnValue));
  622. }
  623. ECL_MEMCACHED_API void ECL_MEMCACHED_CALL MGetData(ICodeContext * ctx, size32_t & returnLength, void * & returnValue, const char * servers, const char * key, const char * partitionKey)
  624. {
  625. size_t _returnLength;
  626. MemCachedPlugin::MGetVoidPtrLenPair(ctx, servers, partitionKey, key, _returnLength, returnValue, MemCachedPlugin::ECL_DATA);
  627. returnLength = static_cast<size32_t>(_returnLength);
  628. }