cassandraembed.hpp 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef _CASSANDRAEMBED_INCL
  14. #define _CASSANDRAEMBED_INCL
  15. namespace cassandraembed {
  16. extern void UNSUPPORTED(const char *feature) __attribute__((noreturn));
  17. extern void failx(const char *msg, ...) __attribute__((noreturn)) __attribute__((format(printf, 1, 2)));
  18. extern void fail(const char *msg) __attribute__((noreturn));
  19. extern void check(CassError rc);
  20. extern bool isInteger(const CassValueType t);
  21. extern bool isString(CassValueType t);
  22. // Wrappers to Cassandra structures that require corresponding releases
  23. class CassandraCluster : public CInterface
  24. {
  25. public:
  26. inline CassandraCluster(CassCluster *_cluster)
  27. : cluster(_cluster), batchMode((CassBatchType) -1), pageSize(0), maxFutures(0), maxRetries(0)
  28. {
  29. }
  30. void setOptions(const StringArray &options);
  31. inline ~CassandraCluster()
  32. {
  33. if (cluster)
  34. cass_cluster_free(cluster);
  35. }
  36. inline operator CassCluster *() const
  37. {
  38. return cluster;
  39. }
  40. private:
  41. void checkSetOption(CassError rc, const char *name);
  42. cass_bool_t getBoolOption(const char *val, const char *option);
  43. unsigned getUnsignedOption(const char *val, const char *option);
  44. unsigned getDoubleOption(const char *val, const char *option);
  45. __uint64 getUnsigned64Option(const char *val, const char *option);
  46. CassandraCluster(const CassandraCluster &);
  47. CassCluster *cluster;
  48. public:
  49. // These are here as convenient to set from same options string. They are really properties of the session
  50. // or query rather than the cluster, but we have one session per cluster so we get away with it at the moment.
  51. CassBatchType batchMode;
  52. unsigned pageSize;
  53. unsigned maxFutures;
  54. unsigned maxRetries;
  55. StringAttr keyspace;
  56. };
  57. class CassandraFuture : public CInterface
  58. {
  59. public:
  60. inline CassandraFuture(CassFuture *_future) : future(_future)
  61. {
  62. }
  63. inline ~CassandraFuture()
  64. {
  65. if (future)
  66. cass_future_free(future);
  67. }
  68. inline operator CassFuture *() const
  69. {
  70. return future;
  71. }
  72. void wait(const char *why) const;
  73. inline void set(CassFuture *_future)
  74. {
  75. if (future)
  76. cass_future_free(future);
  77. future = _future;
  78. }
  79. protected:
  80. CassandraFuture(const CassandraFuture &);
  81. CassFuture *future;
  82. };
  83. class CassandraFutureResult : public CassandraFuture
  84. {
  85. public:
  86. inline CassandraFutureResult(CassFuture *_future) : CassandraFuture(_future)
  87. {
  88. result = NULL;
  89. }
  90. inline ~CassandraFutureResult()
  91. {
  92. if (result)
  93. cass_result_free(result);
  94. }
  95. inline operator const CassResult *() const
  96. {
  97. if (!result)
  98. {
  99. wait("FutureResult");
  100. result = cass_future_get_result(future);
  101. }
  102. return result;
  103. }
  104. private:
  105. CassandraFutureResult(const CassandraFutureResult &);
  106. mutable const CassResult *result;
  107. };
  108. class CassandraSession : public CInterface
  109. {
  110. public:
  111. inline CassandraSession() : session(NULL) {}
  112. inline CassandraSession(CassSession *_session) : session(_session)
  113. {
  114. }
  115. inline ~CassandraSession()
  116. {
  117. set(NULL);
  118. }
  119. void set(CassSession *_session);
  120. inline operator CassSession *() const
  121. {
  122. return session;
  123. }
  124. private:
  125. CassandraSession(const CassandraSession &);
  126. CassSession *session;
  127. };
  128. class CassandraBatch : public CInterface
  129. {
  130. public:
  131. inline CassandraBatch(CassBatch *_batch) : batch(_batch)
  132. {
  133. }
  134. inline ~CassandraBatch()
  135. {
  136. if (batch)
  137. cass_batch_free(batch);
  138. }
  139. inline operator CassBatch *() const
  140. {
  141. return batch;
  142. }
  143. private:
  144. CassandraBatch(const CassandraBatch &);
  145. CassBatch *batch;
  146. };
  147. class CassandraPrepared : public CInterfaceOf<IInterface>
  148. {
  149. public:
  150. inline CassandraPrepared(const CassPrepared *_prepared)
  151. : prepared(_prepared)
  152. {
  153. }
  154. inline ~CassandraPrepared()
  155. {
  156. if (prepared)
  157. cass_prepared_free(prepared);
  158. }
  159. inline operator const CassPrepared *() const
  160. {
  161. return prepared;
  162. }
  163. private:
  164. CassandraPrepared(const CassandraPrepared &);
  165. const CassPrepared *prepared;
  166. };
  167. class CassandraStatement : public CInterface
  168. {
  169. public:
  170. inline CassandraStatement(CassStatement *_statement) : statement(_statement)
  171. {
  172. }
  173. inline CassandraStatement(const char *simple) : statement(cass_statement_new(simple, 0))
  174. {
  175. }
  176. inline ~CassandraStatement()
  177. {
  178. if (statement)
  179. cass_statement_free(statement);
  180. }
  181. operator CassStatement *() const
  182. {
  183. return statement;
  184. }
  185. inline CassStatement *getClear()
  186. {
  187. CassStatement *ret = statement;
  188. statement = NULL;
  189. return ret;
  190. }
  191. private:
  192. CassandraStatement(const CassandraStatement &);
  193. CassStatement *statement;
  194. };
  195. class LinkedSemaphore : public CInterfaceOf<IInterface>, public Semaphore
  196. {
  197. public:
  198. LinkedSemaphore(unsigned initialCount) : Semaphore(initialCount) {}
  199. };
  200. class CassandraRetryingFuture : public CInterface
  201. {
  202. public:
  203. CassandraRetryingFuture(CassSession *_session, CassStatement *_statement, LinkedSemaphore *_limiter = NULL, unsigned _retries = 10);
  204. ~CassandraRetryingFuture();
  205. inline operator CassFuture *() const
  206. {
  207. return future;
  208. }
  209. void wait(const char *why);
  210. private:
  211. bool retry(const char *why);
  212. void execute();
  213. static void signaller(CassFuture *future, void *data);
  214. CassandraRetryingFuture(const CassandraFuture &);
  215. CassFuture *future;
  216. CassSession *session;
  217. CassandraStatement statement;
  218. unsigned retries;
  219. LinkedSemaphore *limiter;
  220. };
  221. class CassandraResult : public CInterfaceOf<IInterface>
  222. {
  223. public:
  224. inline CassandraResult(const CassResult *_result) : result(_result)
  225. {
  226. }
  227. inline ~CassandraResult()
  228. {
  229. if (result)
  230. cass_result_free(result);
  231. }
  232. inline operator const CassResult *() const
  233. {
  234. return result;
  235. }
  236. private:
  237. CassandraResult(const CassandraResult &);
  238. const CassResult *result;
  239. };
  240. class CassandraIterator : public CInterfaceOf<IInterface>
  241. {
  242. public:
  243. inline CassandraIterator(CassIterator *_iterator) : iterator(_iterator)
  244. {
  245. }
  246. inline ~CassandraIterator()
  247. {
  248. if (iterator)
  249. cass_iterator_free(iterator);
  250. }
  251. inline void set(CassIterator *_iterator)
  252. {
  253. if (iterator)
  254. cass_iterator_free(iterator);
  255. iterator = _iterator;
  256. }
  257. inline operator CassIterator *() const
  258. {
  259. return iterator;
  260. }
  261. protected:
  262. CassandraIterator(const CassandraIterator &);
  263. CassIterator *iterator;
  264. };
  265. class CassandraCollection : public CInterface
  266. {
  267. public:
  268. inline CassandraCollection(CassCollection *_collection) : collection(_collection)
  269. {
  270. }
  271. inline ~CassandraCollection()
  272. {
  273. if (collection)
  274. cass_collection_free(collection);
  275. }
  276. inline operator CassCollection *() const
  277. {
  278. return collection;
  279. }
  280. private:
  281. CassandraCollection(const CassandraCollection &);
  282. CassCollection *collection;
  283. };
  284. class CassandraStatementInfo : public CInterface
  285. {
  286. public:
  287. IMPLEMENT_IINTERFACE;
  288. CassandraStatementInfo(CassandraSession *_session, CassandraPrepared *_prepared, unsigned _numBindings, CassBatchType _batchMode, unsigned pageSize, unsigned _maxFutures, unsigned _maxRetries);
  289. ~CassandraStatementInfo();
  290. void stop();
  291. bool next();
  292. void startStream();
  293. void endStream();
  294. void execute();
  295. inline size_t rowCount() const
  296. {
  297. return cass_result_row_count(*result);
  298. }
  299. inline bool hasResult() const
  300. {
  301. return result != NULL;
  302. }
  303. inline const CassRow *queryRow() const
  304. {
  305. assertex(iterator && *iterator);
  306. return cass_iterator_get_row(*iterator);
  307. }
  308. inline CassStatement *queryStatement() const
  309. {
  310. assertex(statement && *statement);
  311. return *statement;
  312. }
  313. protected:
  314. Linked<CassandraSession> session;
  315. Linked<CassandraPrepared> prepared;
  316. Owned<CassandraBatch> batch;
  317. Owned<CassandraStatement> statement;
  318. Owned<CassandraFutureResult> result;
  319. Owned<CassandraIterator> iterator;
  320. unsigned numBindings;
  321. CIArrayOf<CassandraRetryingFuture> futures;
  322. Owned<LinkedSemaphore> semaphore;
  323. unsigned maxFutures;
  324. unsigned maxRetries;
  325. bool inBatch;
  326. CassBatchType batchMode;
  327. };
  328. extern bool getBooleanResult(const RtlFieldInfo *field, const CassValue *value);
  329. extern void getDataResult(const RtlFieldInfo *field, const CassValue *value, size32_t &chars, void * &result);
  330. extern __int64 getSignedResult(const RtlFieldInfo *field, const CassValue *value);
  331. extern unsigned __int64 getUnsignedResult(const RtlFieldInfo *field, const CassValue *value);
  332. extern double getRealResult(const RtlFieldInfo *field, const CassValue *value);
  333. extern void getStringResult(const RtlFieldInfo *field, const CassValue *value, size32_t &chars, char * &result);
  334. extern void getUTF8Result(const RtlFieldInfo *field, const CassValue *value, size32_t &chars, char * &result);
  335. extern void getUnicodeResult(const RtlFieldInfo *field, const CassValue *value, size32_t &chars, UChar * &result);
  336. extern void getDecimalResult(const RtlFieldInfo *field, const CassValue *value, Decimal &result);
  337. } // namespace
  338. #endif