cassandraembed.hpp 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef _CASSANDRAEMBED_INCL
  14. #define _CASSANDRAEMBED_INCL
  15. #ifdef CASSANDRAEMBED_EXPORTS
  16. #define CASSANDRAEMBED_API DECL_EXPORT
  17. #else
  18. #define CASSANDRAEMBED_API DECL_IMPORT
  19. #endif
  20. namespace cassandraembed {
  21. __declspec(noreturn) extern void UNSUPPORTED(const char *feature) __attribute__((noreturn));
  22. __declspec(noreturn) extern void failx(const char *msg, ...) __attribute__((format(printf, 1, 2), noreturn));
  23. __declspec(noreturn) extern void fail(const char *msg) __attribute__((noreturn));
  24. extern void check(CassError rc);
  25. extern bool isInteger(const CassValueType t);
  26. extern bool isString(CassValueType t);
  27. // Wrappers to Cassandra structures that require corresponding releases
  28. class CASSANDRAEMBED_API CassandraPrepared : public CInterfaceOf<IInterface>
  29. {
  30. public:
  31. inline CassandraPrepared(const CassPrepared *_prepared, const char *_queryString)
  32. : prepared(_prepared), queryString(_queryString)
  33. {
  34. }
  35. inline ~CassandraPrepared()
  36. {
  37. if (prepared)
  38. cass_prepared_free(prepared);
  39. }
  40. inline operator const CassPrepared *() const
  41. {
  42. return prepared;
  43. }
  44. inline const char *queryQueryString() const
  45. {
  46. return queryString;
  47. }
  48. private:
  49. CassandraPrepared(const CassandraPrepared &);
  50. StringAttr queryString;
  51. const CassPrepared *prepared;
  52. };
  53. class CASSANDRAEMBED_API CassandraSession : public CInterface
  54. {
  55. public:
  56. inline CassandraSession() : session(NULL) {}
  57. inline CassandraSession(CassSession *_session) : session(_session)
  58. {
  59. }
  60. inline ~CassandraSession()
  61. {
  62. set(NULL);
  63. }
  64. void set(CassSession *_session);
  65. inline operator CassSession *() const
  66. {
  67. return session;
  68. }
  69. private:
  70. CassandraSession(const CassandraSession &);
  71. CassSession *session;
  72. };
  73. class CassandraStatementInfo;
  74. class CassandraStatement;
  75. class CASSANDRAEMBED_API CassandraClusterSession : public CInterface
  76. {
  77. public:
  78. inline CassandraClusterSession(CassCluster *_cluster)
  79. : cluster(_cluster), semaphore(NULL), maxFutures(0), maxRetries(0)
  80. {
  81. }
  82. void setOptions(const StringArray &options);
  83. inline ~CassandraClusterSession()
  84. {
  85. session.clear(); // Should do this before freeing cluster
  86. if (cluster)
  87. cass_cluster_free(cluster);
  88. if (semaphore)
  89. delete semaphore;
  90. }
  91. inline CassSession *querySession() const
  92. {
  93. return *session;
  94. }
  95. inline const CassCluster *queryCluster() const
  96. {
  97. return cluster;
  98. }
  99. inline const char *queryKeySpace() const
  100. {
  101. return keyspace;
  102. }
  103. inline void setKeySpace(const char *val)
  104. {
  105. keyspace.set(val);
  106. }
  107. void connect();
  108. void disconnect();
  109. CassandraPrepared *prepareStatement(const char *query, bool trace) const;
  110. CassandraStatementInfo *createStatementInfo(const char *script, unsigned numParams, CassBatchType batchMode, unsigned pageSize) const;
  111. void executeAsync(CIArrayOf<CassandraStatement> &batch, const char *what) const;
  112. private:
  113. void checkSetOption(CassError rc, const char *name);
  114. cass_bool_t getBoolOption(const char *val, const char *option);
  115. unsigned getUnsignedOption(const char *val, const char *option);
  116. unsigned getDoubleOption(const char *val, const char *option);
  117. __uint64 getUnsigned64Option(const char *val, const char *option);
  118. CassandraClusterSession(const CassandraClusterSession &);
  119. CassCluster *cluster;
  120. Owned<CassandraSession> session;
  121. mutable MapStringToMyClass<CassandraPrepared> preparedCache;
  122. mutable CriticalSection cacheCrit;
  123. Semaphore *semaphore;
  124. unsigned maxFutures;
  125. unsigned maxRetries;
  126. StringAttr keyspace;
  127. };
  128. class CASSANDRAEMBED_API CassandraFuture : public CInterface
  129. {
  130. public:
  131. inline CassandraFuture(CassFuture *_future) : future(_future)
  132. {
  133. }
  134. inline ~CassandraFuture()
  135. {
  136. if (future)
  137. cass_future_free(future);
  138. }
  139. inline operator CassFuture *() const
  140. {
  141. return future;
  142. }
  143. void wait(const char *why) const;
  144. inline void set(CassFuture *_future)
  145. {
  146. if (future)
  147. cass_future_free(future);
  148. future = _future;
  149. }
  150. protected:
  151. CassandraFuture(const CassandraFuture &);
  152. CassFuture *future;
  153. };
  154. class CASSANDRAEMBED_API CassandraFutureResult : public CassandraFuture
  155. {
  156. public:
  157. inline CassandraFutureResult(CassFuture *_future) : CassandraFuture(_future)
  158. {
  159. result = NULL;
  160. }
  161. inline ~CassandraFutureResult()
  162. {
  163. if (result)
  164. cass_result_free(result);
  165. }
  166. inline operator const CassResult *() const
  167. {
  168. if (!result)
  169. {
  170. wait("FutureResult");
  171. result = cass_future_get_result(future);
  172. }
  173. return result;
  174. }
  175. private:
  176. CassandraFutureResult(const CassandraFutureResult &);
  177. mutable const CassResult *result;
  178. };
  179. class CASSANDRAEMBED_API CassandraBatch : public CInterface
  180. {
  181. public:
  182. inline CassandraBatch(CassBatchType _type)
  183. {
  184. batch = cass_batch_new(_type);
  185. }
  186. inline ~CassandraBatch()
  187. {
  188. if (batch)
  189. cass_batch_free(batch);
  190. }
  191. inline operator CassBatch *() const
  192. {
  193. return batch;
  194. }
  195. void execute(CassSession *session, const char *what)
  196. {
  197. CassandraFuture futureBatch(cass_session_execute_batch(session, batch));
  198. futureBatch.wait(what);
  199. }
  200. private:
  201. CassandraBatch(const CassandraBatch &);
  202. CassBatch *batch;
  203. };
  204. class CASSANDRAEMBED_API CassandraStatement : public CInterface
  205. {
  206. public:
  207. inline CassandraStatement(CassStatement *_statement) : statement(_statement)
  208. {
  209. }
  210. inline CassandraStatement(const char *simple) : statement(cass_statement_new(simple, 0))
  211. {
  212. }
  213. inline CassandraStatement(const CassandraPrepared *_prepared) : statement(cass_prepared_bind(*_prepared))
  214. {
  215. const char *queryString = _prepared->queryQueryString(); // Only set when tracing..
  216. if (queryString)
  217. query.set(queryString);
  218. _prepared->Release();
  219. }
  220. inline ~CassandraStatement()
  221. {
  222. if (statement)
  223. cass_statement_free(statement);
  224. }
  225. operator CassStatement *() const
  226. {
  227. if (query.length())
  228. DBGLOG("Executing %s", query.str());
  229. return statement;
  230. }
  231. inline CassStatement *getClear()
  232. {
  233. CassStatement *ret = statement;
  234. statement = NULL;
  235. return ret;
  236. }
  237. void bindNull(unsigned idx)
  238. {
  239. if (query.length())
  240. traceBind(idx, "null");
  241. check(cass_statement_bind_null(statement, idx));
  242. }
  243. void bindBool(unsigned idx, cass_bool_t value)
  244. {
  245. if (query.length())
  246. traceBind(idx, "%s", value ? "true" : "false");
  247. check(cass_statement_bind_bool(statement, idx, value));
  248. }
  249. void bindInt32(unsigned idx, __int32 value)
  250. {
  251. if (query.length())
  252. traceBind(idx, "%d", value);
  253. check(cass_statement_bind_int32(statement, idx, value));
  254. }
  255. void bindInt64(unsigned idx, __int64 value)
  256. {
  257. if (query.length())
  258. traceBind(idx, "%" I64F "d", value);
  259. check(cass_statement_bind_int64(statement, idx, value));
  260. }
  261. void bindString(unsigned idx, const char *value)
  262. {
  263. if (query.length())
  264. {
  265. unsigned l = strlen(value);
  266. if (l > 100)
  267. l = 100;
  268. traceBind(idx, "'%.*s'", l, value);
  269. }
  270. check(cass_statement_bind_string(statement, idx, value));
  271. }
  272. void bindString_n(unsigned idx, const char *value, unsigned len)
  273. {
  274. if (strlen(value)<len)
  275. {
  276. if (query.length())
  277. traceBind(idx, "'%s'", value);
  278. check(cass_statement_bind_string(statement, idx, value));
  279. }
  280. else
  281. {
  282. if (query.length())
  283. traceBind(idx, "'%.*s'", len>100?100:len, value);
  284. check(cass_statement_bind_string_n(statement, idx, value, len));
  285. }
  286. }
  287. void bindBytes(unsigned idx, const cass_byte_t *value, unsigned len)
  288. {
  289. if (query.length())
  290. traceBind(idx, "(bytes)");
  291. check(cass_statement_bind_bytes(statement, idx, value, len));
  292. }
  293. void bindCollection(unsigned idx, const CassCollection *value)
  294. {
  295. if (query.length())
  296. traceBind(idx, "(collection)");
  297. check(cass_statement_bind_collection(statement, idx, value));
  298. }
  299. private:
  300. void traceBind(unsigned idx, const char *format, ...) __attribute__((format(printf,3,4)))
  301. {
  302. assert(query.length());
  303. StringBuffer bound;
  304. va_list args;
  305. va_start(args, format);
  306. bound.valist_appendf(format, args);
  307. va_end(args);
  308. const char *pos = query;
  309. do
  310. {
  311. pos = strchr(pos, '?');
  312. assert(pos);
  313. if (!pos)
  314. return;
  315. pos++;
  316. } while (idx--);
  317. query.insert(pos-query, bound);
  318. }
  319. CassandraStatement(const CassandraStatement &);
  320. StringBuffer query;
  321. CassStatement *statement;
  322. };
  323. class CASSANDRAEMBED_API CassandraRetryingFuture : public CInterface
  324. {
  325. public:
  326. CassandraRetryingFuture(CassSession *_session, CassStatement *_statement, Semaphore *_limiter, unsigned _retries);
  327. ~CassandraRetryingFuture();
  328. inline operator CassFuture *() const
  329. {
  330. return future;
  331. }
  332. void wait(const char *why);
  333. private:
  334. bool retry(const char *why);
  335. void execute();
  336. static void signaller(CassFuture *future, void *data);
  337. CassandraRetryingFuture(const CassandraFuture &);
  338. CassFuture *future;
  339. CassSession *session;
  340. CassandraStatement statement;
  341. unsigned retries;
  342. Semaphore *limiter;
  343. };
  344. class CASSANDRAEMBED_API CassandraResult : public CInterfaceOf<IInterface>
  345. {
  346. public:
  347. inline CassandraResult(const CassResult *_result) : result(_result)
  348. {
  349. }
  350. inline ~CassandraResult()
  351. {
  352. if (result)
  353. cass_result_free(result);
  354. }
  355. inline operator const CassResult *() const
  356. {
  357. return result;
  358. }
  359. private:
  360. CassandraResult(const CassandraResult &);
  361. const CassResult *result;
  362. };
  363. class CASSANDRAEMBED_API CassandraIterator : public CInterfaceOf<IInterface>
  364. {
  365. public:
  366. inline CassandraIterator(CassIterator *_iterator) : iterator(_iterator)
  367. {
  368. }
  369. inline ~CassandraIterator()
  370. {
  371. if (iterator)
  372. cass_iterator_free(iterator);
  373. }
  374. inline void set(CassIterator *_iterator)
  375. {
  376. if (iterator)
  377. cass_iterator_free(iterator);
  378. iterator = _iterator;
  379. }
  380. inline operator CassIterator *() const
  381. {
  382. return iterator;
  383. }
  384. protected:
  385. CassandraIterator(const CassandraIterator &);
  386. CassIterator *iterator;
  387. };
  388. class CASSANDRAEMBED_API CassandraCollection : public CInterface
  389. {
  390. public:
  391. inline CassandraCollection(CassCollection *_collection) : collection(_collection)
  392. {
  393. }
  394. inline ~CassandraCollection()
  395. {
  396. if (collection)
  397. cass_collection_free(collection);
  398. }
  399. inline operator CassCollection *() const
  400. {
  401. return collection;
  402. }
  403. private:
  404. CassandraCollection(const CassandraCollection &);
  405. CassCollection *collection;
  406. };
  407. class CASSANDRAEMBED_API CassandraStatementInfo : public CInterface
  408. {
  409. public:
  410. CassandraStatementInfo(CassandraSession *_session, CassandraPrepared *_prepared, unsigned _numBindings, CassBatchType _batchMode, unsigned pageSize, Semaphore *_semaphore, unsigned _maxRetries);
  411. ~CassandraStatementInfo();
  412. void stop();
  413. bool next();
  414. void startStream();
  415. void endStream();
  416. void execute();
  417. inline size_t rowCount() const
  418. {
  419. return cass_result_row_count(*result);
  420. }
  421. inline bool hasResult() const
  422. {
  423. return result != NULL;
  424. }
  425. inline const CassRow *queryRow() const
  426. {
  427. assertex(iterator && *iterator);
  428. return cass_iterator_get_row(*iterator);
  429. }
  430. inline CassStatement *queryStatement() const
  431. {
  432. assertex(statement && *statement);
  433. return *statement;
  434. }
  435. protected:
  436. Linked<CassandraSession> session;
  437. Linked<CassandraPrepared> prepared;
  438. Owned<CassandraBatch> batch;
  439. Owned<CassandraStatement> statement;
  440. Owned<CassandraFutureResult> result;
  441. Owned<CassandraIterator> iterator;
  442. unsigned numBindings;
  443. CIArrayOf<CassandraRetryingFuture> futures;
  444. Semaphore *semaphore;
  445. unsigned maxRetries;
  446. bool inBatch;
  447. CassBatchType batchMode;
  448. };
  449. extern CASSANDRAEMBED_API bool getBooleanResult(const RtlFieldInfo *field, const CassValue *value);
  450. extern CASSANDRAEMBED_API void getDataResult(const RtlFieldInfo *field, const CassValue *value, size32_t &chars, void * &result);
  451. extern CASSANDRAEMBED_API __int64 getSignedResult(const RtlFieldInfo *field, const CassValue *value);
  452. extern CASSANDRAEMBED_API unsigned __int64 getUnsignedResult(const RtlFieldInfo *field, const CassValue *value);
  453. extern CASSANDRAEMBED_API double getRealResult(const RtlFieldInfo *field, const CassValue *value);
  454. extern CASSANDRAEMBED_API void getStringResult(const RtlFieldInfo *field, const CassValue *value, size32_t &chars, char * &result);
  455. extern CASSANDRAEMBED_API void getUTF8Result(const RtlFieldInfo *field, const CassValue *value, size32_t &chars, char * &result);
  456. extern CASSANDRAEMBED_API void getUnicodeResult(const RtlFieldInfo *field, const CassValue *value, size32_t &chars, UChar * &result);
  457. extern CASSANDRAEMBED_API void getDecimalResult(const RtlFieldInfo *field, const CassValue *value, Decimal &result);
  458. } // namespace
  459. #endif