123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #ifndef _CASSANDRAEMBED_INCL
- #define _CASSANDRAEMBED_INCL
// CASSANDRAEMBED_API decorates the classes and functions declared in this header.
// It expands to DECL_EXPORT when CASSANDRAEMBED_EXPORTS is defined and to
// DECL_IMPORT otherwise.
#if defined(CASSANDRAEMBED_EXPORTS)
    #define CASSANDRAEMBED_API DECL_EXPORT
#else
    #define CASSANDRAEMBED_API DECL_IMPORT
#endif
- namespace cassandraembed {
- __declspec(noreturn) extern void UNSUPPORTED(const char *feature) __attribute__((noreturn));
- __declspec(noreturn) extern void failx(const char *msg, ...) __attribute__((format(printf, 1, 2), noreturn));
- __declspec(noreturn) extern void fail(const char *msg) __attribute__((noreturn));
- extern void check(CassError rc);
- extern bool isInteger(const CassValueType t);
- extern bool isString(CassValueType t);
- // Wrappers to Cassandra structures that require corresponding releases
- class CASSANDRAEMBED_API CassandraPrepared : public CInterfaceOf<IInterface>
- {
- public:
- inline CassandraPrepared(const CassPrepared *_prepared, const char *_queryString)
- : prepared(_prepared), queryString(_queryString)
- {
- }
- inline ~CassandraPrepared()
- {
- if (prepared)
- cass_prepared_free(prepared);
- }
- inline operator const CassPrepared *() const
- {
- return prepared;
- }
- inline const char *queryQueryString() const
- {
- return queryString;
- }
- private:
- CassandraPrepared(const CassandraPrepared &);
- StringAttr queryString;
- const CassPrepared *prepared;
- };
- class CASSANDRAEMBED_API CassandraSession : public CInterface
- {
- public:
- inline CassandraSession() : session(NULL) {}
- inline CassandraSession(CassSession *_session) : session(_session)
- {
- }
- inline ~CassandraSession()
- {
- set(NULL);
- }
- void set(CassSession *_session);
- inline operator CassSession *() const
- {
- return session;
- }
- private:
- CassandraSession(const CassandraSession &);
- CassSession *session;
- };
- class CassandraStatementInfo;
- class CassandraStatement;
- class CASSANDRAEMBED_API CassandraClusterSession : public CInterface
- {
- public:
- inline CassandraClusterSession(CassCluster *_cluster)
- : cluster(_cluster), semaphore(NULL), maxFutures(0), maxRetries(0)
- {
- }
- void setOptions(const StringArray &options);
- inline ~CassandraClusterSession()
- {
- session.clear(); // Should do this before freeing cluster
- if (cluster)
- cass_cluster_free(cluster);
- if (semaphore)
- delete semaphore;
- }
- inline CassSession *querySession() const
- {
- return *session;
- }
- inline const CassCluster *queryCluster() const
- {
- return cluster;
- }
- inline const char *queryKeySpace() const
- {
- return keyspace;
- }
- inline void setKeySpace(const char *val)
- {
- keyspace.set(val);
- }
- void connect();
- void disconnect();
- CassandraPrepared *prepareStatement(const char *query, bool trace) const;
- CassandraStatementInfo *createStatementInfo(const char *script, unsigned numParams, CassBatchType batchMode, unsigned pageSize) const;
- void executeAsync(CIArrayOf<CassandraStatement> &batch, const char *what) const;
- private:
- void checkSetOption(CassError rc, const char *name);
- cass_bool_t getBoolOption(const char *val, const char *option);
- unsigned getUnsignedOption(const char *val, const char *option);
- unsigned getDoubleOption(const char *val, const char *option);
- __uint64 getUnsigned64Option(const char *val, const char *option);
- CassandraClusterSession(const CassandraClusterSession &);
- CassCluster *cluster;
- Owned<CassandraSession> session;
- mutable MapStringToMyClass<CassandraPrepared> preparedCache;
- mutable CriticalSection cacheCrit;
- Semaphore *semaphore;
- unsigned maxFutures;
- unsigned maxRetries;
- StringAttr keyspace;
- };
- class CASSANDRAEMBED_API CassandraFuture : public CInterface
- {
- public:
- inline CassandraFuture(CassFuture *_future) : future(_future)
- {
- }
- inline ~CassandraFuture()
- {
- if (future)
- cass_future_free(future);
- }
- inline operator CassFuture *() const
- {
- return future;
- }
- void wait(const char *why) const;
- inline void set(CassFuture *_future)
- {
- if (future)
- cass_future_free(future);
- future = _future;
- }
- protected:
- CassandraFuture(const CassandraFuture &);
- CassFuture *future;
- };
- class CASSANDRAEMBED_API CassandraFutureResult : public CassandraFuture
- {
- public:
- inline CassandraFutureResult(CassFuture *_future) : CassandraFuture(_future)
- {
- result = NULL;
- }
- inline ~CassandraFutureResult()
- {
- if (result)
- cass_result_free(result);
- }
- inline operator const CassResult *() const
- {
- if (!result)
- {
- wait("FutureResult");
- result = cass_future_get_result(future);
- }
- return result;
- }
- private:
- CassandraFutureResult(const CassandraFutureResult &);
- mutable const CassResult *result;
- };
- class CASSANDRAEMBED_API CassandraBatch : public CInterface
- {
- public:
- inline CassandraBatch(CassBatchType _type)
- {
- batch = cass_batch_new(_type);
- }
- inline ~CassandraBatch()
- {
- if (batch)
- cass_batch_free(batch);
- }
- inline operator CassBatch *() const
- {
- return batch;
- }
- void execute(CassSession *session, const char *what)
- {
- CassandraFuture futureBatch(cass_session_execute_batch(session, batch));
- futureBatch.wait(what);
- }
- private:
- CassandraBatch(const CassandraBatch &);
- CassBatch *batch;
- };
- class CASSANDRAEMBED_API CassandraStatement : public CInterface
- {
- public:
- inline CassandraStatement(CassStatement *_statement) : statement(_statement)
- {
- }
- inline CassandraStatement(const char *simple) : statement(cass_statement_new(simple, 0))
- {
- }
- inline CassandraStatement(const CassandraPrepared *_prepared) : statement(cass_prepared_bind(*_prepared))
- {
- const char *queryString = _prepared->queryQueryString(); // Only set when tracing..
- if (queryString)
- query.set(queryString);
- _prepared->Release();
- }
- inline ~CassandraStatement()
- {
- if (statement)
- cass_statement_free(statement);
- }
- operator CassStatement *() const
- {
- if (query.length())
- DBGLOG("Executing %s", query.str());
- return statement;
- }
- inline CassStatement *getClear()
- {
- CassStatement *ret = statement;
- statement = NULL;
- return ret;
- }
- void bindNull(unsigned idx)
- {
- if (query.length())
- traceBind(idx, "null");
- check(cass_statement_bind_null(statement, idx));
- }
- void bindBool(unsigned idx, cass_bool_t value)
- {
- if (query.length())
- traceBind(idx, "%s", value ? "true" : "false");
- check(cass_statement_bind_bool(statement, idx, value));
- }
- void bindInt32(unsigned idx, __int32 value)
- {
- if (query.length())
- traceBind(idx, "%d", value);
- check(cass_statement_bind_int32(statement, idx, value));
- }
- void bindInt64(unsigned idx, __int64 value)
- {
- if (query.length())
- traceBind(idx, "%" I64F "d", value);
- check(cass_statement_bind_int64(statement, idx, value));
- }
- void bindString(unsigned idx, const char *value)
- {
- if (query.length())
- {
- unsigned l = strlen(value);
- if (l > 100)
- l = 100;
- traceBind(idx, "'%.*s'", l, value);
- }
- check(cass_statement_bind_string(statement, idx, value));
- }
- void bindString_n(unsigned idx, const char *value, unsigned len)
- {
- if (strlen(value)<len)
- {
- if (query.length())
- traceBind(idx, "'%s'", value);
- check(cass_statement_bind_string(statement, idx, value));
- }
- else
- {
- if (query.length())
- traceBind(idx, "'%.*s'", len>100?100:len, value);
- check(cass_statement_bind_string_n(statement, idx, value, len));
- }
- }
- void bindBytes(unsigned idx, const cass_byte_t *value, unsigned len)
- {
- if (query.length())
- traceBind(idx, "(bytes)");
- check(cass_statement_bind_bytes(statement, idx, value, len));
- }
- void bindCollection(unsigned idx, const CassCollection *value)
- {
- if (query.length())
- traceBind(idx, "(collection)");
- check(cass_statement_bind_collection(statement, idx, value));
- }
- private:
- void traceBind(unsigned idx, const char *format, ...) __attribute__((format(printf,3,4)))
- {
- assert(query.length());
- StringBuffer bound;
- va_list args;
- va_start(args, format);
- bound.valist_appendf(format, args);
- va_end(args);
- const char *pos = query;
- do
- {
- pos = strchr(pos, '?');
- assert(pos);
- if (!pos)
- return;
- pos++;
- } while (idx--);
- query.insert(pos-query, bound);
- }
- CassandraStatement(const CassandraStatement &);
- StringBuffer query;
- CassStatement *statement;
- };
- class CASSANDRAEMBED_API CassandraRetryingFuture : public CInterface
- {
- public:
- CassandraRetryingFuture(CassSession *_session, CassStatement *_statement, Semaphore *_limiter, unsigned _retries);
- ~CassandraRetryingFuture();
- inline operator CassFuture *() const
- {
- return future;
- }
- void wait(const char *why);
- private:
- bool retry(const char *why);
- void execute();
- static void signaller(CassFuture *future, void *data);
- CassandraRetryingFuture(const CassandraFuture &);
- CassFuture *future;
- CassSession *session;
- CassandraStatement statement;
- unsigned retries;
- Semaphore *limiter;
- };
- class CASSANDRAEMBED_API CassandraResult : public CInterfaceOf<IInterface>
- {
- public:
- inline CassandraResult(const CassResult *_result) : result(_result)
- {
- }
- inline ~CassandraResult()
- {
- if (result)
- cass_result_free(result);
- }
- inline operator const CassResult *() const
- {
- return result;
- }
- private:
- CassandraResult(const CassandraResult &);
- const CassResult *result;
- };
- class CASSANDRAEMBED_API CassandraIterator : public CInterfaceOf<IInterface>
- {
- public:
- inline CassandraIterator(CassIterator *_iterator) : iterator(_iterator)
- {
- }
- inline ~CassandraIterator()
- {
- if (iterator)
- cass_iterator_free(iterator);
- }
- inline void set(CassIterator *_iterator)
- {
- if (iterator)
- cass_iterator_free(iterator);
- iterator = _iterator;
- }
- inline operator CassIterator *() const
- {
- return iterator;
- }
- protected:
- CassandraIterator(const CassandraIterator &);
- CassIterator *iterator;
- };
- class CASSANDRAEMBED_API CassandraCollection : public CInterface
- {
- public:
- inline CassandraCollection(CassCollection *_collection) : collection(_collection)
- {
- }
- inline ~CassandraCollection()
- {
- if (collection)
- cass_collection_free(collection);
- }
- inline operator CassCollection *() const
- {
- return collection;
- }
- private:
- CassandraCollection(const CassandraCollection &);
- CassCollection *collection;
- };
- class CASSANDRAEMBED_API CassandraStatementInfo : public CInterface
- {
- public:
- CassandraStatementInfo(CassandraSession *_session, CassandraPrepared *_prepared, unsigned _numBindings, CassBatchType _batchMode, unsigned pageSize, Semaphore *_semaphore, unsigned _maxRetries);
- ~CassandraStatementInfo();
- void stop();
- bool next();
- void startStream();
- void endStream();
- void execute();
- inline size_t rowCount() const
- {
- return cass_result_row_count(*result);
- }
- inline bool hasResult() const
- {
- return result != NULL;
- }
- inline const CassRow *queryRow() const
- {
- assertex(iterator && *iterator);
- return cass_iterator_get_row(*iterator);
- }
- inline CassStatement *queryStatement() const
- {
- assertex(statement && *statement);
- return *statement;
- }
- protected:
- Linked<CassandraSession> session;
- Linked<CassandraPrepared> prepared;
- Owned<CassandraBatch> batch;
- Owned<CassandraStatement> statement;
- Owned<CassandraFutureResult> result;
- Owned<CassandraIterator> iterator;
- unsigned numBindings;
- CIArrayOf<CassandraRetryingFuture> futures;
- Semaphore *semaphore;
- unsigned maxRetries;
- bool inBatch;
- CassBatchType batchMode;
- };
- extern CASSANDRAEMBED_API bool getBooleanResult(const RtlFieldInfo *field, const CassValue *value);
- extern CASSANDRAEMBED_API void getDataResult(const RtlFieldInfo *field, const CassValue *value, size32_t &chars, void * &result);
- extern CASSANDRAEMBED_API __int64 getSignedResult(const RtlFieldInfo *field, const CassValue *value);
- extern CASSANDRAEMBED_API unsigned __int64 getUnsignedResult(const RtlFieldInfo *field, const CassValue *value);
- extern CASSANDRAEMBED_API double getRealResult(const RtlFieldInfo *field, const CassValue *value);
- extern CASSANDRAEMBED_API void getStringResult(const RtlFieldInfo *field, const CassValue *value, size32_t &chars, char * &result);
- extern CASSANDRAEMBED_API void getUTF8Result(const RtlFieldInfo *field, const CassValue *value, size32_t &chars, char * &result);
- extern CASSANDRAEMBED_API void getUnicodeResult(const RtlFieldInfo *field, const CassValue *value, size32_t &chars, UChar * &result);
- extern CASSANDRAEMBED_API void getDecimalResult(const RtlFieldInfo *field, const CassValue *value, Decimal &result);
- } // namespace
- #endif
|