|
@@ -19,9 +19,7 @@
|
|
|
#include "esploggingservice_esp.ipp"
|
|
|
#include "cassandralogagent.hpp"
|
|
|
|
|
|
-const int DefaultMaxTriesGTS = -1;
|
|
|
-const char* const DefaultloggingTransactionApp = "CassandraloggingTransaction";
|
|
|
-const char* const DefaultTransactionApp = "DefauleTransaction";
|
|
|
+static const int defaultMaxTriesGTS = -1;
|
|
|
|
|
|
static void setCassandraLogAgentOption(StringArray& opts, const char* opt, const char* val)
|
|
|
{
|
|
@@ -38,54 +36,6 @@ static const CassValue* getSingleResult(const CassResult* result)
|
|
|
return row ? cass_row_get_column(row, 0) : NULL;
|
|
|
}
|
|
|
|
|
|
-void ensureInputString(const char* input, bool lowerCase, StringBuffer& inputStr, int code, const char* msg)
|
|
|
-{
|
|
|
- inputStr.set(input).trim();
|
|
|
- if (inputStr.isEmpty())
|
|
|
- throw MakeStringException(code, "%s", msg);
|
|
|
- if (lowerCase)
|
|
|
- inputStr.toLowerCase();
|
|
|
-}
|
|
|
-
|
|
|
-void CLogTable::loadMappings(IPropertyTree& fieldList)
|
|
|
-{
|
|
|
- StringBuffer name, mapTo, fieldType, defaultValue;
|
|
|
- Owned<IPropertyTreeIterator> itr = fieldList.getElements("Field");
|
|
|
- ForEach(*itr)
|
|
|
- {
|
|
|
- IPropertyTree &map = itr->query();
|
|
|
-
|
|
|
- ensureInputString(map.queryProp("@name"), false, name, -1, "Field @name required");
|
|
|
- ensureInputString(map.queryProp("@mapto"), true, mapTo, -1, "Field @mapto required");
|
|
|
- ensureInputString(map.queryProp("@type"), true, fieldType, -1, "Field @type required");
|
|
|
- defaultValue = map.queryProp("@default");
|
|
|
- defaultValue.trim();
|
|
|
-
|
|
|
- Owned<CLogField> field = new CLogField(name.str(), mapTo.str(), fieldType.str());
|
|
|
- if (!defaultValue.isEmpty())
|
|
|
- field->setDefault(defaultValue.str());
|
|
|
- logFields.append(*field.getClear());
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-void CLogGroup::loadMappings(IPropertyTree& fieldList)
|
|
|
-{
|
|
|
- StringBuffer tableName;
|
|
|
- Owned<IPropertyTreeIterator> itr = fieldList.getElements("Fieldmap");
|
|
|
- ForEach(*itr)
|
|
|
- {
|
|
|
- ensureInputString(itr->query().queryProp("@table"), true, tableName, -1, "Fieldmap @table required");
|
|
|
-
|
|
|
- Owned<CLogTable> table = new CLogTable(tableName.str());
|
|
|
- table->loadMappings(itr->query());
|
|
|
- CIArrayOf<CLogField>& logFields = table->getLogFields();
|
|
|
- if (logFields.length() < 1)
|
|
|
- throw MakeStringException(-1,"No Fieldmap for %s", tableName.str());
|
|
|
-
|
|
|
- logTables.append(*table.getClear());
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
bool CCassandraLogAgent::init(const char* name, const char* type, IPropertyTree* cfg, const char* process)
|
|
|
{
|
|
|
if (!name || !*name || !type || !*type)
|
|
@@ -98,53 +48,20 @@ bool CCassandraLogAgent::init(const char* name, const char* type, IPropertyTree*
|
|
|
if(!cassandra)
|
|
|
throw MakeStringException(-1, "Unable to find Cassandra settings for log agent %s:%s", name, type);
|
|
|
|
|
|
- agentName.set(name);
|
|
|
- ensureInputString(cassandra->queryProp("@server"), true, dbServer, -1, "Cassandra server required");
|
|
|
- ensureInputString(cassandra->queryProp("@dbName"), true, defaultDB, -1, "Database name required");
|
|
|
+ readDBCfg(cassandra, dbServer, dbUserID, dbPassword);
|
|
|
|
|
|
- const char* userID = cassandra->queryProp("@dbUser");
|
|
|
- if (userID && *userID)
|
|
|
- {
|
|
|
- const char* encodedPassword = cassandra->queryProp("@dbPassWord");
|
|
|
- if (!encodedPassword || !*encodedPassword)
|
|
|
- throw MakeStringException(-1, "Cassandra Database password required");
|
|
|
-
|
|
|
- decrypt(dbPassword, encodedPassword);
|
|
|
- dbUserID.set(userID);
|
|
|
- }
|
|
|
+ //Read information about data mapping for every log groups
|
|
|
+ readLogGroupCfg(cfg, defaultLogGroup, logGroups);
|
|
|
+ if (defaultLogGroup.isEmpty())
|
|
|
+ throw MakeStringException(-1,"LogGroup not defined");
|
|
|
|
|
|
- {
|
|
|
- //Read information about data mapping
|
|
|
- StringBuffer groupName;
|
|
|
- Owned<IPropertyTreeIterator> iter = cfg->getElements("LogGroup");
|
|
|
- ForEach(*iter)
|
|
|
- {
|
|
|
- ensureInputString(iter->query().queryProp("@name"), true, groupName, -1, "LogGroup @name required");
|
|
|
- Owned<CLogGroup> logGroup = new CLogGroup(groupName.str());
|
|
|
- logGroup->loadMappings(iter->query());
|
|
|
- logGroups.setValue(groupName.str(), logGroup);
|
|
|
- if (defaultLogGroup.isEmpty())
|
|
|
- defaultLogGroup.set(groupName.str());
|
|
|
- }
|
|
|
- }
|
|
|
+ //Read mapping between log sources and log groups
|
|
|
+ readLogSourceCfg(cfg, logSourceCount, logSourcePath, logSources);
|
|
|
|
|
|
- logSourceCount = 0;
|
|
|
- Owned<IPropertyTreeIterator> iter2 = cfg->getElements("LogSourceMap/LogSource");
|
|
|
- ForEach(*iter2)
|
|
|
- {
|
|
|
- StringBuffer name, groupName, dbName;
|
|
|
- ensureInputString(iter2->query().queryProp("@name"), false, name, -1, "LogSource @name required");
|
|
|
- ensureInputString(iter2->query().queryProp("@maptologgroup"), true, groupName, -1, "LogSource @maptologgroup required");
|
|
|
- ensureInputString(iter2->query().queryProp("@maptodb"), true, dbName, -1, "LogSource @maptodb required");
|
|
|
- Owned<CLogSource> logSource = new CLogSource(name.str(), groupName.str(), dbName.str());
|
|
|
- logSources.setValue(name.str(), logSource);
|
|
|
- logSourceCount++;
|
|
|
- }
|
|
|
+ //Read transactions settings
|
|
|
+ readTransactionCfg(cfg);
|
|
|
|
|
|
- maxTriesGTS = cfg->getPropInt("MaxTriesGTS", DefaultMaxTriesGTS);
|
|
|
- loggingTransactionApp.set(cfg->hasProp("loggingTransaction") ? cfg->queryProp("loggingTransaction") : DefaultloggingTransactionApp);
|
|
|
- defaultTransactionApp.set(cfg->hasProp("defaultTransaction") ? cfg->queryProp("defaultTransaction") : DefaultTransactionApp);
|
|
|
- loggingTransactionCount = 0;
|
|
|
+ maxTriesGTS = cfg->getPropInt("MaxTriesGTS", defaultMaxTriesGTS);
|
|
|
|
|
|
//Setup Cassandra
|
|
|
initKeySpace();
|
|
@@ -158,27 +75,19 @@ void CCassandraLogAgent::initKeySpace()
|
|
|
if (!cassSession)
|
|
|
throw MakeStringException(-1,"Unable to create cassandra cassSession session");
|
|
|
|
|
|
- StringArray opts;
|
|
|
- setCassandraLogAgentOption(opts, "contact_points", dbServer.str());
|
|
|
- if (!dbUserID.isEmpty())
|
|
|
- {
|
|
|
- setCassandraLogAgentOption(opts, "user", dbUserID.str());
|
|
|
- setCassandraLogAgentOption(opts, "password", dbPassword.str());
|
|
|
- }
|
|
|
-
|
|
|
- cassSession->setOptions(opts);
|
|
|
+ setSessionOptions(NULL);
|
|
|
|
|
|
- //prepare defaultDB
|
|
|
- ensureKeySpace();
|
|
|
+ //ensure defaultDB
|
|
|
+ ensureDefaultKeySpace();
|
|
|
|
|
|
- //prepare transSeed tables
|
|
|
- initTransSeedTable();
|
|
|
+ //ensure transSeed tables
|
|
|
+ ensureTransSeedTable();
|
|
|
|
|
|
//Read logging transaction seed
|
|
|
- queryTransactionSeed(loggingTransactionApp.get(), transactionSeed);
|
|
|
+ queryTransactionSeed(loggingTransactionApp.get(), loggingTransactionSeed);
|
|
|
}
|
|
|
|
|
|
-void CCassandraLogAgent::ensureKeySpace()
|
|
|
+void CCassandraLogAgent::ensureDefaultKeySpace()
|
|
|
{
|
|
|
CassandraSession s(cass_session_new());
|
|
|
CassandraFuture future1(cass_session_connect(s, cassSession->queryCluster()));
|
|
@@ -193,33 +102,36 @@ void CCassandraLogAgent::ensureKeySpace()
|
|
|
s.set(NULL);
|
|
|
}
|
|
|
|
|
|
-void CCassandraLogAgent::initTransSeedTable()
|
|
|
+void CCassandraLogAgent::ensureTransSeedTable()
|
|
|
{
|
|
|
//Create transaction seed table as needed
|
|
|
StringBuffer transSeedTableKeys;
|
|
|
StringArray transSeedTableColumnNames, transSeedTableColumnTypes;
|
|
|
transSeedTableColumnNames.append("id");
|
|
|
transSeedTableColumnTypes.append("int");
|
|
|
- transSeedTableColumnNames.append("agent_name");
|
|
|
- transSeedTableColumnTypes.append("varchar");
|
|
|
transSeedTableColumnNames.append("application");
|
|
|
transSeedTableColumnTypes.append("varchar");
|
|
|
- transSeedTableColumnNames.append("update_time");
|
|
|
- transSeedTableColumnTypes.append("timestamp");
|
|
|
- transSeedTableKeys.set("application, agent_name"); //primary keys
|
|
|
+ transSeedTableKeys.set("application"); //primary keys
|
|
|
|
|
|
- setKeySpace(defaultDB.str());
|
|
|
+ //The defaultDB has transactions table.
|
|
|
+ setSessionOptions(defaultDB.str());
|
|
|
cassSession->connect();
|
|
|
- createTable(defaultDB.str(), "transactions", transSeedTableColumnNames, transSeedTableColumnTypes, transSeedTableKeys.str());
|
|
|
+ createTable(defaultDB.str(), transactionTable.str(), transSeedTableColumnNames, transSeedTableColumnTypes, transSeedTableKeys.str());
|
|
|
|
|
|
- unsigned transactionCount = 0;
|
|
|
- if (executeSimpleSelectStatement("SELECT COUNT(*) FROM transactions", transactionCount) == 0)
|
|
|
+ unsigned id = 0;
|
|
|
+ VStringBuffer st("SELECT id FROM %s LIMIT 1;", transactionTable.str());
|
|
|
+ if (!executeSimpleSelectStatement(st.str(), id))
|
|
|
{
|
|
|
- VStringBuffer st("INSERT INTO transactions (id, agent_name, application, update_time) values ( 10000, '%s', '%s', toUnixTimestamp(now()));", agentName.get(), loggingTransactionApp.get());
|
|
|
+ st.setf("INSERT INTO %s (id, application) values ( 10000, '%s');",
|
|
|
+ transactionTable.str(), loggingTransactionApp.get());
|
|
|
executeSimpleStatement(st.str());
|
|
|
|
|
|
- st.setf("INSERT INTO transactions (id, agent_name, application, update_time) values ( 10000, '%s', '%s', toUnixTimestamp(now()));", agentName.get(), defaultTransactionApp.get());
|
|
|
- executeSimpleStatement(st.str());
|
|
|
+ if (!strieq(defaultTransactionApp.get(), loggingTransactionApp.get()))
|
|
|
+ {
|
|
|
+ st.setf("INSERT INTO %s (id, application) values ( 10000, '%s');",
|
|
|
+ transactionTable.str(), defaultTransactionApp.get());
|
|
|
+ executeSimpleStatement(st.str());
|
|
|
+ }
|
|
|
}
|
|
|
cassSession->disconnect();
|
|
|
}
|
|
@@ -229,209 +141,42 @@ void CCassandraLogAgent::queryTransactionSeed(const char* appName, StringBuffer&
|
|
|
CriticalBlock b(transactionSeedCrit);
|
|
|
|
|
|
unsigned seedInt = 0;
|
|
|
- VStringBuffer st("SELECT id FROM transactions WHERE agent_name ='%s' AND application = '%s'", agentName.get(), appName);
|
|
|
- setKeySpace(defaultDB.str()); //Switch to defaultDB since it may not be the current keyspace.
|
|
|
+ VStringBuffer st("SELECT id FROM %s WHERE application = '%s'", transactionTable.str(), appName);
|
|
|
+ setSessionOptions(defaultDB.str()); //Switch to defaultDB since it may not be the current keyspace.
|
|
|
cassSession->connect();
|
|
|
executeSimpleSelectStatement(st.str(), seedInt);
|
|
|
seed.setf("%d", seedInt);
|
|
|
|
|
|
//update transactions for the next seed
|
|
|
- VStringBuffer updateQuery("UPDATE transactions SET id=%d WHERE agent_name ='%s' AND application = '%s'", ++seedInt, agentName.get(), appName);
|
|
|
+ VStringBuffer updateQuery("UPDATE %s SET id=%d WHERE application = '%s'",
|
|
|
+ transactionTable.str(), ++seedInt, appName);
|
|
|
executeSimpleStatement(updateQuery.str());
|
|
|
cassSession->disconnect();
|
|
|
}
|
|
|
|
|
|
-bool CCassandraLogAgent::getTransactionSeed(IEspGetTransactionSeedRequest& req, IEspGetTransactionSeedResponse& resp)
|
|
|
+void CCassandraLogAgent::setSessionOptions(const char *keyspace)
|
|
|
{
|
|
|
- unsigned retry = 1;
|
|
|
- while (1)
|
|
|
- {
|
|
|
- try
|
|
|
- {
|
|
|
- const char* appName = req.getApplication();
|
|
|
- if (!appName || !*appName)
|
|
|
- appName = defaultTransactionApp.get();
|
|
|
-
|
|
|
- StringBuffer logSeed;
|
|
|
- queryTransactionSeed(appName, logSeed);
|
|
|
-
|
|
|
- if (!logSeed.length())
|
|
|
- throw MakeStringException(EspLoggingErrors::GetTransactionSeedFailed, "Failed to get TransactionSeed");
|
|
|
-
|
|
|
- resp.setSeedId(logSeed.str());
|
|
|
- resp.setStatusCode(0);
|
|
|
- return true;
|
|
|
- }
|
|
|
- catch (IException* e)
|
|
|
- {
|
|
|
- StringBuffer errorStr, errorMessage;
|
|
|
- errorMessage.append("Failed to get TransactionSeed: error code ").append(e->errorCode()).append(", error message ").append(e->errorMessage(errorStr));
|
|
|
- ERRLOG("%s -- try %d", errorMessage.str(), retry);
|
|
|
- e->Release();
|
|
|
- if (retry < maxTriesGTS)
|
|
|
- {
|
|
|
- Sleep(retry*3000);
|
|
|
- retry++;
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- resp.setStatusCode(-1);
|
|
|
- resp.setStatusMessage(errorMessage.str());
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- return false;
|
|
|
-}
|
|
|
-
|
|
|
-bool CCassandraLogAgent::updateLog(IEspUpdateLogRequestWrap& req, IEspUpdateLogResponse& resp)
|
|
|
-{
|
|
|
- try
|
|
|
- {
|
|
|
- StringBuffer requestBuf = req.getUpdateLogRequest();
|
|
|
- if (requestBuf.isEmpty())
|
|
|
- throw MakeStringException(EspLoggingErrors::UpdateLogFailed, "Failed to read log request.");
|
|
|
-
|
|
|
- StringBuffer logDB, logSource;
|
|
|
- requestBuf.insert(0, "<LogRequest>");
|
|
|
- requestBuf.append("</LogRequest>");
|
|
|
- Owned<IPropertyTree> logRequestTree = createPTreeFromXMLString(requestBuf.length(), requestBuf.str());
|
|
|
- if (!logRequestTree)
|
|
|
- throw MakeStringException(EspLoggingErrors::UpdateLogFailed, "Failed to read log request.");
|
|
|
-
|
|
|
- CLogGroup* logGroup = checkLogSource(logRequestTree, logSource, logDB);
|
|
|
- if (!logGroup)
|
|
|
- throw MakeStringException(EspLoggingErrors::UpdateLogFailed, "Log Group %s undefined.", logSource.str());
|
|
|
-
|
|
|
- StringBuffer logID;
|
|
|
- getLoggingTransactionID(logID);
|
|
|
-
|
|
|
- CIArrayOf<CLogTable>& logTables = logGroup->getLogTables();
|
|
|
- setKeySpace(logDB.str());
|
|
|
- ForEachItemIn(i, logTables)
|
|
|
- {
|
|
|
- CLogTable& table = logTables.item(i);
|
|
|
-
|
|
|
- StringBuffer cqlStatement;
|
|
|
- if(!buildUpdateLogStatement(logRequestTree, logDB.str(), table, logID, cqlStatement))
|
|
|
- throw MakeStringException(EspLoggingErrors::UpdateLogFailed, "Failed in creating SQL statement.");
|
|
|
-
|
|
|
- if (getEspLogLevel() >= LogMax)
|
|
|
- DBGLOG("CQL: %s\n", cqlStatement.str());
|
|
|
-
|
|
|
- cassSession->connect();
|
|
|
- executeSimpleStatement(cqlStatement);
|
|
|
- cassSession->disconnect();
|
|
|
- }
|
|
|
- resp.setStatusCode(0);
|
|
|
- return true;
|
|
|
- }
|
|
|
- catch (IException* e)
|
|
|
+ StringArray opts;
|
|
|
+ setCassandraLogAgentOption(opts, "contact_points", dbServer.str());
|
|
|
+ if (!dbUserID.isEmpty())
|
|
|
{
|
|
|
- StringBuffer errorStr, errorMessage;
|
|
|
- errorMessage.append("Failed to update log: error code ").append(e->errorCode()).append(", error message ").append(e->errorMessage(errorStr));
|
|
|
- ERRLOG("%s", errorMessage.str());
|
|
|
- e->Release();
|
|
|
- resp.setStatusCode(-1);
|
|
|
- resp.setStatusMessage(errorMessage.str());
|
|
|
- }
|
|
|
- return false;
|
|
|
-}
|
|
|
-
|
|
|
-CLogGroup* CCassandraLogAgent::checkLogSource(IPropertyTree* logRequest, StringBuffer& source, StringBuffer& logDB)
|
|
|
-{
|
|
|
- if (logSourceCount == 0)
|
|
|
- {//if no log source is configured, use default Log Group and DB
|
|
|
- logDB.set(defaultDB.str());
|
|
|
- return logGroups.getValue(defaultLogGroup.get());
|
|
|
+ setCassandraLogAgentOption(opts, "user", dbUserID.str());
|
|
|
+ if (!dbPassword.isEmpty())
|
|
|
+ setCassandraLogAgentOption(opts, "password", dbPassword.str());
|
|
|
}
|
|
|
- source = logRequest->queryProp("Source");
|
|
|
- if (source.isEmpty())
|
|
|
- throw MakeStringException(EspLoggingErrors::UpdateLogFailed, "Failed to read log Source from request.");
|
|
|
- CLogSource* logSource = logSources.getValue(source.str());
|
|
|
- if (!logSource)
|
|
|
- throw MakeStringException(EspLoggingErrors::UpdateLogFailed, "Log Source %s undefined.", source.str());
|
|
|
-
|
|
|
- logDB.set(logSource->getDBName());
|
|
|
- return logGroups.getValue(logSource->getGroupName());
|
|
|
-}
|
|
|
-
|
|
|
-void CCassandraLogAgent::getLoggingTransactionID(StringBuffer& id)
|
|
|
-{
|
|
|
- CriticalBlock b(uniqueIDCrit);
|
|
|
- id.set(transactionSeed.str()).append("-").append(++loggingTransactionCount);
|
|
|
+ if (keyspace && *keyspace)
|
|
|
+ setCassandraLogAgentOption(opts, "keyspace", keyspace);
|
|
|
+ cassSession->setOptions(opts);
|
|
|
}
|
|
|
|
|
|
-bool CCassandraLogAgent::buildUpdateLogStatement(IPropertyTree* logRequest, const char* logDB, CLogTable& table, StringBuffer& logID, StringBuffer& cqlStatement)
|
|
|
+void CCassandraLogAgent::createTable(const char *dbName, const char *tableName, StringArray& columnNames, StringArray& columnTypes, const char* keys)
|
|
|
{
|
|
|
- StringBuffer fields, values;
|
|
|
- BoolHash handledFields;
|
|
|
- CIArrayOf<CLogField>& logFields = table.getLogFields();
|
|
|
- ForEachItemIn(i, logFields) //Go through data items to be logged
|
|
|
- {
|
|
|
- CLogField& logField = logFields.item(i);
|
|
|
-
|
|
|
- StringBuffer colName = logField.getMapTo();
|
|
|
- bool* found = handledFields.getValue(colName.str());
|
|
|
- if (found && *found)
|
|
|
- continue;
|
|
|
-
|
|
|
- StringBuffer path = logField.getName();
|
|
|
- if (path.charAt(path.length() - 1) == ']')
|
|
|
- {//Attr filter. Separate the last [] from the path.
|
|
|
- const char* pTr = path.str();
|
|
|
- const char* ppTr = strrchr(pTr, '[');
|
|
|
- if (!ppTr)
|
|
|
- continue;
|
|
|
-
|
|
|
- StringBuffer attr;
|
|
|
- attr.set(ppTr+1);
|
|
|
- attr.setLength(attr.length() - 1);
|
|
|
- path.setLength(ppTr - pTr);
|
|
|
-
|
|
|
- StringBuffer colValue;
|
|
|
- Owned<IPropertyTreeIterator> itr = logRequest->getElements(path.str());
|
|
|
- ForEach(*itr)
|
|
|
- {//Log the first valid match just in case more than one matches.
|
|
|
- IPropertyTree& ppTree = itr->query();
|
|
|
- colValue.set(ppTree.queryProp(attr.str()));
|
|
|
- if (colValue.length())
|
|
|
- {
|
|
|
- addField(logField, colName.str(), colValue, fields, values);
|
|
|
- handledFields.setValue(colName.str(), true);
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- continue;
|
|
|
- }
|
|
|
-
|
|
|
- Owned<IPropertyTreeIterator> itr = logRequest->getElements(path.str());
|
|
|
- ForEach(*itr)
|
|
|
- {
|
|
|
- IPropertyTree& ppTree = itr->query();
|
|
|
-
|
|
|
- StringBuffer colValue;
|
|
|
- if (ppTree.hasChildren()) //This is a tree branch.
|
|
|
- toXML(&ppTree, colValue);
|
|
|
- else
|
|
|
- ppTree.getProp(NULL, colValue);
|
|
|
-
|
|
|
- if (colValue.length())
|
|
|
- {
|
|
|
- addField(logField, colName.str(), colValue, fields, values);
|
|
|
- handledFields.setValue(colName.str(), true);
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- //add any default fields that may be required but not in request.
|
|
|
- addMissingFields(logFields, handledFields, fields, values);
|
|
|
- appendFieldInfo("log_id", logID, fields, values, true);
|
|
|
+ StringBuffer fields;
|
|
|
+ ForEachItemIn(i, columnNames)
|
|
|
+ fields.appendf("%s %s,", columnNames.item(i), columnTypes.item(i));
|
|
|
|
|
|
- cqlStatement.setf("INSERT INTO %s.%s (%s, date_added) values (%s, toUnixTimestamp(now()));",
|
|
|
- logDB, table.getTableName(), fields.str(), values.str());
|
|
|
- return true;
|
|
|
+ VStringBuffer createTableSt("CREATE TABLE IF NOT EXISTS %s.%s (%s PRIMARY KEY (%s));", dbName, tableName, fields.str(), keys);
|
|
|
+ executeSimpleStatement(createTableSt.str());
|
|
|
}
|
|
|
|
|
|
void CCassandraLogAgent::addField(CLogField& logField, const char* name, StringBuffer& value, StringBuffer& fields, StringBuffer& values)
|
|
@@ -466,6 +211,8 @@ void CCassandraLogAgent::addField(CLogField& logField, const char* name, StringB
|
|
|
unsigned char c = str[i];
|
|
|
if(c == '\t' || c == '\n' || c== '\r')
|
|
|
values.append(' ');
|
|
|
+ else if(c == '\'')
|
|
|
+ values.append('"');
|
|
|
else if(c < 32 || c > 126)
|
|
|
values.append('?');
|
|
|
else
|
|
@@ -478,55 +225,10 @@ void CCassandraLogAgent::addField(CLogField& logField, const char* name, StringB
|
|
|
DBGLOG("Unknown format %s", fieldType);
|
|
|
}
|
|
|
|
|
|
-void CCassandraLogAgent::appendFieldInfo(const char* field, StringBuffer& value, StringBuffer& fields, StringBuffer& values, bool quoted)
|
|
|
-{
|
|
|
- if(values.length() != 0)
|
|
|
- values.append(',');
|
|
|
- if (quoted)
|
|
|
- values.append('\'').append(value.length(), value.str()).append('\'');
|
|
|
- else
|
|
|
- values.append(value.length(), value.str());
|
|
|
-
|
|
|
- if(fields.length() != 0)
|
|
|
- fields.append(',');
|
|
|
- fields.append(field);
|
|
|
-}
|
|
|
-
|
|
|
-void CCassandraLogAgent::addMissingFields(CIArrayOf<CLogField>& logFields, BoolHash& handledFields, StringBuffer& fields, StringBuffer& values)
|
|
|
-{
|
|
|
- ForEachItemIn(i, logFields) //Go through data items to be logged
|
|
|
- {
|
|
|
- CLogField& logField = logFields.item(i);
|
|
|
- const char* colName = logField.getMapTo();
|
|
|
- bool* found = handledFields.getValue(colName);
|
|
|
- if (found && *found)
|
|
|
- continue;
|
|
|
- StringBuffer value = logField.getDefault();
|
|
|
- if (!value.isEmpty())
|
|
|
- addField(logField, colName, value, fields, values);
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-void CCassandraLogAgent::filterLogContent(IEspUpdateLogRequestWrap* req)
|
|
|
-{
|
|
|
- return; //No filter in CCassandraLogAgent for now
|
|
|
-}
|
|
|
-
|
|
|
-void CCassandraLogAgent::setKeySpace(const char *keyspace)
|
|
|
-{
|
|
|
- StringArray opts;
|
|
|
- setCassandraLogAgentOption(opts, "keyspace", keyspace);
|
|
|
- cassSession->setOptions(opts);
|
|
|
-}
|
|
|
-
|
|
|
-void CCassandraLogAgent::createTable(const char *dbName, const char *tableName, StringArray& columnNames, StringArray& columnTypes, const char* keys)
|
|
|
+void CCassandraLogAgent::setUpdateLogStatement(const char* dbName, const char* tableName,
|
|
|
+ const char* fields, const char* values, StringBuffer& statement)
|
|
|
{
|
|
|
- StringBuffer fields;
|
|
|
- ForEachItemIn(i, columnNames)
|
|
|
- fields.appendf("%s %s,", columnNames.item(i), columnTypes.item(i));
|
|
|
-
|
|
|
- VStringBuffer createTableSt("CREATE TABLE IF NOT EXISTS %s.%s (%s PRIMARY KEY (%s));", dbName, tableName, fields.str(), keys);
|
|
|
- executeSimpleStatement(createTableSt.str());
|
|
|
+ statement.setf("INSERT INTO %s.%s (%s, date_added) values (%s, toUnixTimestamp(now()));", dbName, tableName, fields, values);
|
|
|
}
|
|
|
|
|
|
void CCassandraLogAgent::executeSimpleStatement(const char* st)
|
|
@@ -536,8 +238,9 @@ void CCassandraLogAgent::executeSimpleStatement(const char* st)
|
|
|
future.wait("execute");
|
|
|
}
|
|
|
|
|
|
-void CCassandraLogAgent::executeSimpleStatement(StringBuffer& st)
|
|
|
+void CCassandraLogAgent::executeUpdateLogStatement(StringBuffer& st)
|
|
|
{
|
|
|
+ cassSession->connect();
|
|
|
CassandraFuture futurePrep(cass_session_prepare_n(cassSession->querySession(), st.str(), st.length()));
|
|
|
futurePrep.wait("prepare statement");
|
|
|
|
|
@@ -545,16 +248,20 @@ void CCassandraLogAgent::executeSimpleStatement(StringBuffer& st)
|
|
|
CassandraStatement statement(prepared.getClear());
|
|
|
CassandraFuture future(cass_session_execute(cassSession->querySession(), statement));
|
|
|
future.wait("execute");
|
|
|
+ cassSession->disconnect();
|
|
|
}
|
|
|
|
|
|
-unsigned CCassandraLogAgent::executeSimpleSelectStatement(const char* st, unsigned& resultValue)
|
|
|
+bool CCassandraLogAgent::executeSimpleSelectStatement(const char* st, unsigned& resultValue)
|
|
|
{
|
|
|
CassandraStatement statement(cassSession->prepareStatement(st, getEspLogLevel()>LogNormal));
|
|
|
CassandraFuture future(cass_session_execute(cassSession->querySession(), statement));
|
|
|
future.wait("execute");
|
|
|
CassandraResult result(cass_future_get_result(future));
|
|
|
+ if (cass_result_row_count(result) == 0)
|
|
|
+ return false;
|
|
|
+
|
|
|
resultValue = getUnsignedResult(NULL, getSingleResult(result));
|
|
|
- return resultValue;
|
|
|
+ return true;
|
|
|
}
|
|
|
|
|
|
extern "C"
|