|
@@ -818,8 +818,8 @@ protected:
|
|
|
class CassandraRecordBinder : public CInterfaceOf<IFieldProcessor>
|
|
|
{
|
|
|
public:
|
|
|
- CassandraRecordBinder(const RtlTypeInfo *_typeInfo, const CassandraStatementInfo *_stmtInfo, int _firstParam)
|
|
|
- : typeInfo(_typeInfo), stmtInfo(_stmtInfo), firstParam(_firstParam), dummyField("<row>", NULL, typeInfo), thisParam(_firstParam)
|
|
|
+ CassandraRecordBinder(const IContextLogger &_logctx, const RtlTypeInfo *_typeInfo, const CassandraStatementInfo *_stmtInfo, int _firstParam)
|
|
|
+ : logctx(_logctx), typeInfo(_typeInfo), stmtInfo(_stmtInfo), firstParam(_firstParam), dummyField("<row>", NULL, typeInfo), thisParam(_firstParam)
|
|
|
{
|
|
|
}
|
|
|
int numFields()
|
|
@@ -986,7 +986,8 @@ public:
|
|
|
protected:
|
|
|
inline unsigned checkNextParam(const RtlFieldInfo * field)
|
|
|
{
|
|
|
- DBGLOG("Binding %s to %d", field->name->str(), thisParam);
|
|
|
+ if (logctx.queryTraceLevel() > 4)
|
|
|
+ logctx.CTXLOG("Binding %s to %d", field->name->str(), thisParam);
|
|
|
return thisParam++;
|
|
|
}
|
|
|
inline void checkBind(CassError rc, const RtlFieldInfo * field)
|
|
@@ -999,6 +1000,7 @@ protected:
|
|
|
const RtlTypeInfo *typeInfo;
|
|
|
const CassandraStatementInfo *stmtInfo;
|
|
|
Owned<CassandraCollection> collection;
|
|
|
+ const IContextLogger &logctx;
|
|
|
int firstParam;
|
|
|
RtlFieldStrInfo dummyField;
|
|
|
int thisParam;
|
|
@@ -1008,8 +1010,8 @@ protected:
|
|
|
class CassandraDatasetBinder : public CassandraRecordBinder
|
|
|
{
|
|
|
public:
|
|
|
- CassandraDatasetBinder(IRowStream * _input, const RtlTypeInfo *_typeInfo, const CassandraStatementInfo *_stmt, int _firstParam)
|
|
|
- : input(_input), CassandraRecordBinder(_typeInfo, _stmt, _firstParam)
|
|
|
+ CassandraDatasetBinder(const IContextLogger &_logctx, IRowStream * _input, const RtlTypeInfo *_typeInfo, const CassandraStatementInfo *_stmt, int _firstParam)
|
|
|
+ : input(_input), CassandraRecordBinder(_logctx, _typeInfo, _stmt, _firstParam)
|
|
|
{
|
|
|
}
|
|
|
bool bindNext()
|
|
@@ -1106,17 +1108,18 @@ protected:
|
|
|
|
|
|
static void cassandraLogCallback(cass_uint64_t time, CassLogLevel severity, CassString message, void* data)
|
|
|
{
|
|
|
- DBGLOG("cassandra: %s: %.*s", cass_log_level_string(severity), (int) message.length, message.data);
|
|
|
+ const IContextLogger *logctx = (const IContextLogger *) data;
|
|
|
+ logctx->CTXLOG("cassandra: %s: %.*s", cass_log_level_string(severity), (int) message.length, message.data);
|
|
|
}
|
|
|
|
|
|
class CassandraEmbedFunctionContext : public CInterfaceOf<IEmbedFunctionContext>
|
|
|
{
|
|
|
public:
|
|
|
- CassandraEmbedFunctionContext(unsigned _flags, const char *options)
|
|
|
- : flags(_flags), nextParam(0), numParams(0), batchMode((CassBatchType) -1)
|
|
|
+ CassandraEmbedFunctionContext(const IContextLogger &_logctx, unsigned _flags, const char *options)
|
|
|
+ : logctx(_logctx), flags(_flags), nextParam(0), numParams(0), batchMode((CassBatchType) -1)
|
|
|
{
|
|
|
cluster.setown(new CassandraCluster(cass_cluster_new()));
|
|
|
- cass_cluster_set_log_callback(*cluster, cassandraLogCallback, NULL);
|
|
|
+ cass_cluster_set_log_callback(*cluster, cassandraLogCallback, (void *) &logctx);
|
|
|
const char *contact_points = "localhost";
|
|
|
const char *user = "";
|
|
|
const char *password = "";
|
|
@@ -1413,7 +1416,7 @@ public:
|
|
|
}
|
|
|
virtual void bindRowParam(const char *name, IOutputMetaData & metaVal, byte *val)
|
|
|
{
|
|
|
- CassandraRecordBinder binder(metaVal.queryTypeInfo(), stmtInfo, nextParam);
|
|
|
+ CassandraRecordBinder binder(logctx, metaVal.queryTypeInfo(), stmtInfo, nextParam);
|
|
|
binder.processRow(val);
|
|
|
nextParam += binder.numFields();
|
|
|
}
|
|
@@ -1425,7 +1428,7 @@ public:
|
|
|
{
|
|
|
fail("At most one dataset parameter supported");
|
|
|
}
|
|
|
- inputStream.setown(new CassandraDatasetBinder(LINK(val), metaVal.queryTypeInfo(), stmtInfo, nextParam));
|
|
|
+ inputStream.setown(new CassandraDatasetBinder(logctx, LINK(val), metaVal.queryTypeInfo(), stmtInfo, nextParam));
|
|
|
nextParam += inputStream->numFields();
|
|
|
}
|
|
|
|
|
@@ -1796,6 +1799,7 @@ protected:
|
|
|
Owned<CassandraSession> session;
|
|
|
Owned<CassandraStatementInfo> stmtInfo;
|
|
|
Owned<CassandraDatasetBinder> inputStream;
|
|
|
+ const IContextLogger &logctx;
|
|
|
unsigned flags;
|
|
|
unsigned nextParam;
|
|
|
unsigned numParams;
|
|
@@ -1809,10 +1813,14 @@ class CassandraEmbedContext : public CInterfaceOf<IEmbedContext>
|
|
|
public:
|
|
|
virtual IEmbedFunctionContext *createFunctionContext(unsigned flags, const char *options)
|
|
|
{
|
|
|
+ return createFunctionContextEx(NULL, flags, options);
|
|
|
+ }
|
|
|
+ virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, unsigned flags, const char *options)
|
|
|
+ {
|
|
|
if (flags & EFimport)
|
|
|
UNSUPPORTED("IMPORT");
|
|
|
else
|
|
|
- return new CassandraEmbedFunctionContext(flags, options);
|
|
|
+ return new CassandraEmbedFunctionContext(ctx ? ctx->queryContextLogger() : queryDummyContextLogger(), flags, options);
|
|
|
}
|
|
|
};
|
|
|
|