cassandralogagent.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2016 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "LoggingErrors.hpp"
  14. #include "cassandralogagent.hpp"
  15. static const int defaultMaxTriesGTS = -1;
  16. static void setCassandraLogAgentOption(StringArray& opts, const char* opt, const char* val)
  17. {
  18. if (opt && *opt && val)
  19. {
  20. VStringBuffer optstr("%s=%s", opt, val);
  21. opts.append(optstr);
  22. }
  23. }
  24. static const CassValue* getSingleResult(const CassResult* result)
  25. {
  26. const CassRow* row = cass_result_first_row(result);
  27. return row ? cass_row_get_column(row, 0) : NULL;
  28. }
  29. bool CCassandraLogAgent::init(const char* name, const char* type, IPropertyTree* cfg, const char* process)
  30. {
  31. if (!name || !*name || !type || !*type)
  32. throw MakeStringException(-1, "Name or type not specified for CassandraLogAgent");
  33. if (!cfg)
  34. throw MakeStringException(-1, "Unable to find configuration for log agent %s:%s", name, type);
  35. agentName.set(name);
  36. const char* servicesConfig = cfg->queryProp("@services");
  37. if (isEmptyString(servicesConfig))
  38. throw MakeStringException(-1,"No Logging Service defined for %s", agentName.get());
  39. setServices(servicesConfig);
  40. IPropertyTree* cassandra = cfg->queryBranch("Cassandra");
  41. if(!cassandra)
  42. throw MakeStringException(-1, "Unable to find Cassandra settings for log agent %s:%s", name, type);
  43. readDBCfg(cassandra, dbServer, dbUserID, dbPassword);
  44. if (hasService(LGSTUpdateLOG))
  45. {
  46. //Read information about data mapping for every log groups
  47. readLogGroupCfg(cfg, defaultLogGroup, logGroups);
  48. if (defaultLogGroup.isEmpty())
  49. throw MakeStringException(-1,"LogGroup not defined");
  50. //Read mapping between log sources and log groups
  51. readLogSourceCfg(cfg, logSourceCount, logSourcePath, logSources);
  52. }
  53. //Read transactions settings
  54. if (hasService(LGSTGetTransactionSeed))
  55. {
  56. readTransactionCfg(cfg);
  57. maxTriesGTS = cfg->getPropInt("MaxTriesGTS", defaultMaxTriesGTS);
  58. }
  59. //Setup Cassandra
  60. initKeySpace();
  61. return true;
  62. }
  63. void CCassandraLogAgent::initKeySpace()
  64. {
  65. //Initialize Cassandra Cluster Session
  66. cassSession.setown(new CassandraClusterSession(cass_cluster_new()));
  67. if (!cassSession)
  68. throw MakeStringException(-1,"Unable to create cassandra cassSession session");
  69. setSessionOptions(NULL);
  70. //ensure defaultDB
  71. ensureDefaultKeySpace();
  72. //ensure transSeed tables
  73. ensureTransSeedTable();
  74. //Read logging transaction seed
  75. queryTransactionSeed(loggingTransactionApp.get(), loggingTransactionSeed);
  76. }
  77. void CCassandraLogAgent::ensureDefaultKeySpace()
  78. {
  79. CassandraSession s(cass_session_new());
  80. CassandraFuture future1(cass_session_connect(s, cassSession->queryCluster()));
  81. future1.wait("connect without keyspace");
  82. VStringBuffer st("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '1' };",
  83. defaultDB.str());
  84. CassandraStatement statement(cass_statement_new(st.str(), 0));
  85. CassandraFuture future2(cass_session_execute(s, statement));
  86. future2.wait("execute");
  87. s.set(NULL);
  88. }
  89. void CCassandraLogAgent::ensureTransSeedTable()
  90. {
  91. //Create transaction seed table as needed
  92. StringBuffer transSeedTableKeys;
  93. StringArray transSeedTableColumnNames, transSeedTableColumnTypes;
  94. transSeedTableColumnNames.append("id");
  95. transSeedTableColumnTypes.append("int");
  96. transSeedTableColumnNames.append("application");
  97. transSeedTableColumnTypes.append("varchar");
  98. transSeedTableKeys.set("application"); //primary keys
  99. //The defaultDB has transactions table.
  100. setSessionOptions(defaultDB.str());
  101. cassSession->connect();
  102. createTable(defaultDB.str(), transactionTable.str(), transSeedTableColumnNames, transSeedTableColumnTypes, transSeedTableKeys.str());
  103. unsigned id = 0;
  104. VStringBuffer st("SELECT id FROM %s LIMIT 1;", transactionTable.str());
  105. if (!executeSimpleSelectStatement(st.str(), id))
  106. {
  107. st.setf("INSERT INTO %s (id, application) values ( 10000, '%s');",
  108. transactionTable.str(), loggingTransactionApp.get());
  109. executeSimpleStatement(st.str());
  110. if (!strieq(defaultTransactionApp.get(), loggingTransactionApp.get()))
  111. {
  112. st.setf("INSERT INTO %s (id, application) values ( 10000, '%s');",
  113. transactionTable.str(), defaultTransactionApp.get());
  114. executeSimpleStatement(st.str());
  115. }
  116. }
  117. cassSession->disconnect();
  118. }
  119. void CCassandraLogAgent::queryTransactionSeed(const char* appName, StringBuffer& seed)
  120. {
  121. CriticalBlock b(transactionSeedCrit);
  122. unsigned seedInt = 0;
  123. VStringBuffer st("SELECT id FROM %s WHERE application = '%s'", transactionTable.str(), appName);
  124. setSessionOptions(defaultDB.str()); //Switch to defaultDB since it may not be the current keyspace.
  125. cassSession->connect();
  126. executeSimpleSelectStatement(st.str(), seedInt);
  127. seed.setf("%d", seedInt);
  128. //update transactions for the next seed
  129. VStringBuffer updateQuery("UPDATE %s SET id=%d WHERE application = '%s'",
  130. transactionTable.str(), ++seedInt, appName);
  131. executeSimpleStatement(updateQuery.str());
  132. cassSession->disconnect();
  133. }
  134. void CCassandraLogAgent::setSessionOptions(const char *keyspace)
  135. {
  136. StringArray opts;
  137. setCassandraLogAgentOption(opts, "contact_points", dbServer.str());
  138. if (!dbUserID.isEmpty())
  139. {
  140. setCassandraLogAgentOption(opts, "user", dbUserID.str());
  141. if (!dbPassword.isEmpty())
  142. setCassandraLogAgentOption(opts, "password", dbPassword.str());
  143. }
  144. if (keyspace && *keyspace)
  145. setCassandraLogAgentOption(opts, "keyspace", keyspace);
  146. cassSession->setOptions(opts);
  147. }
  148. void CCassandraLogAgent::createTable(const char *dbName, const char *tableName, StringArray& columnNames, StringArray& columnTypes, const char* keys)
  149. {
  150. StringBuffer fields;
  151. ForEachItemIn(i, columnNames)
  152. fields.appendf("%s %s,", columnNames.item(i), columnTypes.item(i));
  153. VStringBuffer createTableSt("CREATE TABLE IF NOT EXISTS %s.%s (%s PRIMARY KEY (%s));", dbName, tableName, fields.str(), keys);
  154. executeSimpleStatement(createTableSt.str());
  155. }
  156. void CCassandraLogAgent::addField(CLogField& logField, const char* name, StringBuffer& value, StringBuffer& fields, StringBuffer& values)
  157. {
  158. const char* fieldType = logField.getType();
  159. if(strieq(fieldType, "int"))
  160. {
  161. appendFieldInfo(logField.getMapTo(), value, fields, values, false);
  162. return;
  163. }
  164. if(strieq(fieldType, "raw"))
  165. {
  166. appendFieldInfo(logField.getMapTo(), value, fields, values, true);;
  167. return;
  168. }
  169. if(strieq(fieldType, "varchar") || strieq(fieldType, "text"))
  170. {
  171. if(fields.length() != 0)
  172. fields.append(',');
  173. fields.append(logField.getMapTo());
  174. if(values.length() != 0)
  175. values.append(',');
  176. values.append('\'');
  177. const char* str = value.str();
  178. int length = value.length();
  179. for(int i = 0; i < length; i++)
  180. {
  181. unsigned char c = str[i];
  182. if(c == '\t' || c == '\n' || c== '\r')
  183. values.append(' ');
  184. else if(c == '\'')
  185. values.append('"');
  186. else if(c < 32 || c > 126)
  187. values.append('?');
  188. else
  189. values.append(c);
  190. }
  191. values.append('\'');
  192. return;
  193. }
  194. DBGLOG("Unknown format %s", fieldType);
  195. }
  196. void CCassandraLogAgent::setUpdateLogStatement(const char* dbName, const char* tableName,
  197. const char* fields, const char* values, StringBuffer& statement)
  198. {
  199. statement.setf("INSERT INTO %s.%s (%s, date_added) values (%s, toUnixTimestamp(now()));", dbName, tableName, fields, values);
  200. }
  201. void CCassandraLogAgent::executeSimpleStatement(const char* st)
  202. {
  203. CassandraStatement statement(cassSession->prepareStatement(st, getEspLogLevel()>LogNormal));
  204. CassandraFuture future(cass_session_execute(cassSession->querySession(), statement));
  205. future.wait("execute");
  206. }
  207. void CCassandraLogAgent::executeUpdateLogStatement(StringBuffer& st)
  208. {
  209. cassSession->connect();
  210. CassandraFuture futurePrep(cass_session_prepare_n(cassSession->querySession(), st.str(), st.length()));
  211. futurePrep.wait("prepare statement");
  212. Owned<CassandraPrepared> prepared = new CassandraPrepared(cass_future_get_prepared(futurePrep), NULL);
  213. CassandraStatement statement(prepared.getClear());
  214. CassandraFuture future(cass_session_execute(cassSession->querySession(), statement));
  215. future.wait("execute");
  216. cassSession->disconnect();
  217. }
  218. bool CCassandraLogAgent::executeSimpleSelectStatement(const char* st, unsigned& resultValue)
  219. {
  220. CassandraStatement statement(cassSession->prepareStatement(st, getEspLogLevel()>LogNormal));
  221. CassandraFuture future(cass_session_execute(cassSession->querySession(), statement));
  222. future.wait("execute");
  223. CassandraResult result(cass_future_get_result(future));
  224. if (cass_result_row_count(result) == 0)
  225. return false;
  226. resultValue = getUnsignedResult(NULL, getSingleResult(result));
  227. return true;
  228. }
  229. extern "C"
  230. {
  231. CASSANDRALOGAGENT_API IEspLogAgent* newLoggingAgent()
  232. {
  233. return new CCassandraLogAgent();
  234. }
  235. }
  236. #ifdef SET_LOGTABLE
  237. //Keep this for now just in case. We may remove after a few releases.
  238. void CCassandraLogAgent::ensureKeySpace()
  239. {
  240. CassandraSession s(cass_session_new());
  241. CassandraFuture future(cass_session_connect(s, cassSession->queryCluster()));
  242. future.wait("connect without keyspace");
  243. VStringBuffer createKeySpace("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '1' };",
  244. cassSession->queryKeySpace());
  245. executeSimpleStatement(createKeySpace.str());
  246. s.set(NULL);
  247. //prepare transSeedTable
  248. StringBuffer transSeedTableKeys;
  249. StringArray transSeedTableColumnNames, transSeedTableColumnTypes;
  250. transSeedTableColumnNames.append("id");
  251. transSeedTableColumnTypes.append("int");
  252. transSeedTableColumnNames.append("application");
  253. transSeedTableColumnTypes.append("varchar");
  254. transSeedTableColumnNames.append("update_time");
  255. transSeedTableColumnTypes.append("timestamp");
  256. transSeedTableKeys.set("application");
  257. cassSession->connect();
  258. createTable("transactions", transSeedTableColumnNames, transSeedTableColumnTypes, transSeedTableKeys.str());
  259. //prepare log tables
  260. ForEachItemIn(i, logDBTables)
  261. {
  262. CDBTable& table = logDBTables.item(i);
  263. StringBuffer logTableKeys;
  264. StringArray logTableColumnNames, logTableColumnTypes;
  265. DBFieldMap* fieldMap = table.getFieldMap();
  266. StringArray& logTableColumnNameArray = fieldMap->getMapToNames();
  267. logTableColumnNames.append("log_id");
  268. logTableColumnTypes.append("varchar");
  269. ForEachItemIn(ii, logTableColumnNameArray)
  270. {
  271. logTableColumnNames.append(logTableColumnNameArray.item(ii));
  272. logTableColumnTypes.append(fieldMap->getMapToTypes().item(ii));
  273. }
  274. logTableColumnNames.append("date_added");
  275. logTableColumnTypes.append("timestamp");
  276. logTableKeys.set("log_id");
  277. createTable(table.getTableName(), logTableColumnNames, logTableColumnTypes, logTableKeys.str());
  278. }
  279. initTransSeedTable();
  280. cassSession->disconnect();
  281. }
  282. #endif