ElasticStackLogAccess.cpp 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2021 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "ElasticStackLogAccess.hpp"
  14. #include "platform.h"
  15. #include <string>
  16. #include <vector>
  17. #include <iostream>
  18. #include <json/json.h>
  19. #ifdef _CONTAINERIZED
  20. //In containerized world, most likely Elastic Search host is their default k8s hostname
  21. static constexpr const char * DEFAULT_ES_HOST = "elasticsearch-master";
  22. #else
  23. //In baremetal, localhost is good guess as any
  24. static constexpr const char * DEFAULT_ES_HOST = "localhost";
  25. #endif
  26. static constexpr const char * DEFAULT_ES_PROTOCOL = "http";
  27. static constexpr const char * DEFAULT_ES_DOC_TYPE = "_doc";
  28. static constexpr const char * DEFAULT_ES_PORT = "9200";
  29. static constexpr int DEFAULT_ES_DOC_LIMIT = 100;
  30. static constexpr int DEFAULT_ES_DOC_START = 0;
  31. static constexpr const char * DEFAULT_TS_NAME = "@timestamp";
  32. static constexpr const char * DEFAULT_INDEX_PATTERN = "filebeat*";
  33. static constexpr const char * DEFAULT_HPCC_LOG_SEQ_COL = "hpcc.log.sequence";
  34. static constexpr const char * DEFAULT_HPCC_LOG_TIMESTAMP_COL = "hpcc.log.timestamp";
  35. static constexpr const char * DEFAULT_HPCC_LOG_PROCID_COL = "hpcc.log.procid";
  36. static constexpr const char * DEFAULT_HPCC_LOG_THREADID_COL = "hpcc.log.threadid";
  37. static constexpr const char * DEFAULT_HPCC_LOG_MESSAGE_COL = "hpcc.log.message";
  38. static constexpr const char * DEFAULT_HPCC_LOG_JOBID_COL = "hpcc.log.jobid";
  39. static constexpr const char * DEFAULT_HPCC_LOG_COMPONENT_COL = "kubernetes.container.name";
  40. static constexpr const char * DEFAULT_HPCC_LOG_TYPE_COL = "hpcc.log.class";
  41. static constexpr const char * DEFAULT_HPCC_LOG_AUD_COL = "hpcc.log.audience";
  42. static constexpr const char * LOGMAP_INDEXPATTERN_ATT = "@storename";
  43. static constexpr const char * LOGMAP_SEARCHCOL_ATT = "@searchcolumn";
  44. static constexpr const char * LOGMAP_TIMESTAMPCOL_ATT = "@timestampcolumn";
  45. ElasticStackLogAccess::ElasticStackLogAccess(const std::vector<std::string> &hostUrlList, IPropertyTree & logAccessPluginConfig) : m_esClient(hostUrlList)
  46. {
  47. if (!hostUrlList.at(0).empty())
  48. m_esConnectionStr.set(hostUrlList.at(0).c_str());
  49. m_pluginCfg.set(&logAccessPluginConfig);
  50. m_globalIndexTimestampField.set(DEFAULT_TS_NAME);
  51. m_globalIndexSearchPattern.set(DEFAULT_INDEX_PATTERN);
  52. m_globalSearchColName.set(DEFAULT_HPCC_LOG_MESSAGE_COL);
  53. m_classSearchColName.set(DEFAULT_HPCC_LOG_TYPE_COL);
  54. m_workunitSearchColName.set(DEFAULT_HPCC_LOG_JOBID_COL);
  55. m_componentsSearchColName.set(DEFAULT_HPCC_LOG_COMPONENT_COL);
  56. m_audienceSearchColName.set(DEFAULT_HPCC_LOG_AUD_COL);
  57. Owned<IPropertyTreeIterator> logMapIter = m_pluginCfg->getElements("logMaps");
  58. ForEach(*logMapIter)
  59. {
  60. IPropertyTree & logMap = logMapIter->query();
  61. const char * logMapType = logMap.queryProp("@type");
  62. if (streq(logMapType, "global"))
  63. {
  64. if (logMap.hasProp(LOGMAP_INDEXPATTERN_ATT))
  65. m_globalIndexSearchPattern = logMap.queryProp(LOGMAP_INDEXPATTERN_ATT);
  66. if (logMap.hasProp(LOGMAP_SEARCHCOL_ATT))
  67. m_globalSearchColName = logMap.queryProp(LOGMAP_SEARCHCOL_ATT);
  68. if (logMap.hasProp(LOGMAP_TIMESTAMPCOL_ATT))
  69. m_globalIndexTimestampField = logMap.queryProp(LOGMAP_TIMESTAMPCOL_ATT);
  70. }
  71. else if (streq(logMapType, "workunits"))
  72. {
  73. if (logMap.hasProp(LOGMAP_INDEXPATTERN_ATT))
  74. m_workunitIndexSearchPattern = logMap.queryProp(LOGMAP_INDEXPATTERN_ATT);
  75. if (logMap.hasProp(LOGMAP_SEARCHCOL_ATT))
  76. m_workunitSearchColName = logMap.queryProp(LOGMAP_SEARCHCOL_ATT);
  77. }
  78. else if (streq(logMapType, "components"))
  79. {
  80. if (logMap.hasProp(LOGMAP_INDEXPATTERN_ATT))
  81. m_componentsIndexSearchPattern = logMap.queryProp(LOGMAP_INDEXPATTERN_ATT);
  82. if (logMap.hasProp(LOGMAP_SEARCHCOL_ATT))
  83. m_componentsSearchColName = logMap.queryProp(LOGMAP_SEARCHCOL_ATT);
  84. }
  85. else if (streq(logMapType, "class"))
  86. {
  87. if (logMap.hasProp(LOGMAP_INDEXPATTERN_ATT))
  88. m_classIndexSearchPattern = logMap.queryProp(LOGMAP_INDEXPATTERN_ATT);
  89. if (logMap.hasProp(LOGMAP_SEARCHCOL_ATT))
  90. m_classSearchColName = logMap.queryProp(LOGMAP_SEARCHCOL_ATT);
  91. }
  92. else if (streq(logMapType, "audience"))
  93. {
  94. if (logMap.hasProp(LOGMAP_INDEXPATTERN_ATT))
  95. m_audienceIndexSearchPattern = logMap.queryProp(LOGMAP_INDEXPATTERN_ATT);
  96. if (logMap.hasProp(LOGMAP_SEARCHCOL_ATT))
  97. m_audienceSearchColName = logMap.queryProp(LOGMAP_SEARCHCOL_ATT);
  98. }
  99. }
  100. #ifdef LOGACCESSDEBUG
  101. StringBuffer out;
  102. const IPropertyTree * status = getESStatus();
  103. toXML(status, out);
  104. fprintf(stdout, "ES Status: %s", out.str());
  105. const IPropertyTree * is = getIndexSearchStatus(m_globalIndexSearchPattern);
  106. toXML(is, out);
  107. fprintf(stdout, "ES available indexes: %s", out.str());
  108. const IPropertyTree * ts = getTimestampTypeFormat(m_globalIndexSearchPattern, m_globalIndexTimestampField);
  109. toXML(ts, out);
  110. fprintf(stdout, "ES %s timestamp info: '%s'", m_globalIndexSearchPattern.str(), out.str());
  111. #endif
  112. }
  113. const IPropertyTree * ElasticStackLogAccess::performAndLogESRequest(Client::HTTPMethod httpmethod, const char * url, const char * reqbody, const char * logmessageprefix, LogMsgCategory reqloglevel = MCdebugProgress, LogMsgCategory resploglevel = MCdebugProgress)
  114. {
  115. try
  116. {
  117. LOG(reqloglevel,"ESLogAccess: Requesting '%s'... ", logmessageprefix );
  118. cpr::Response esREsponse = m_esClient.performRequest(httpmethod,url,reqbody);
  119. Owned<IPropertyTree> response = createPTreeFromJSONString(esREsponse.text.c_str());
  120. LOG(resploglevel,"ESLogAccess: '%s' response: '%s'", logmessageprefix, esREsponse.text.c_str());
  121. return response.getClear();
  122. }
  123. catch (ConnectionException & ce)//std::runtime_error
  124. {
  125. LOG(MCuserError, "ESLogAccess: Encountered error requesting '%s': '%s'", logmessageprefix, ce.what());
  126. }
  127. catch (...)
  128. {
  129. LOG(MCuserError, "ESLogAccess: Encountered error requesting '%s'", logmessageprefix);
  130. }
  131. return nullptr;
  132. }
  133. const IPropertyTree * ElasticStackLogAccess::getTimestampTypeFormat(const char * indexpattern, const char * fieldname)
  134. {
  135. if (isEmptyString(indexpattern))
  136. throw makeStringException(-1, "ElasticStackLogAccess::getTimestampTypeFormat: indexpattern must be provided");
  137. if (isEmptyString(fieldname))
  138. throw makeStringException(-1, "ElasticStackLogAccess::getTimestampTypeFormat: fieldname must be provided");
  139. VStringBuffer timestampformatreq("%s/_mapping/field/created_ts?include_type_name=true&format=JSON", indexpattern);
  140. return performAndLogESRequest(Client::HTTPMethod::GET, timestampformatreq.str(), "", "getTimestampTypeFormat");
  141. }
  142. const IPropertyTree * ElasticStackLogAccess::getIndexSearchStatus(const char * indexpattern)
  143. {
  144. if (!indexpattern || !*indexpattern)
  145. throw makeStringException(-1, "ElasticStackLogAccess::getIndexSearchStatus: indexpattern must be provided");
  146. VStringBuffer indexsearch("_cat/indices/%s?format=JSON", indexpattern);
  147. return performAndLogESRequest(Client::HTTPMethod::GET, indexsearch.str(), "", "List of available indexes");
  148. }
  149. const IPropertyTree * ElasticStackLogAccess::getESStatus()
  150. {
  151. return performAndLogESRequest(Client::HTTPMethod::GET, "_cluster/health", "", "Target cluster health");
  152. }
  153. /*
  154. * Transform ES query response to back-end agnostic response
  155. *
  156. */
  157. void processESJsonResp(const cpr::Response & retrievedDocument, StringBuffer & returnbuf, LogAccessLogFormat format)
  158. {
  159. if (retrievedDocument.status_code != 200)
  160. throw makeStringExceptionV(-1, "ElasticSearch request failed: %s", retrievedDocument.status_line.c_str());
  161. DBGLOG("Retrieved ES JSON DOC: %s", retrievedDocument.text.c_str());
  162. Owned<IPropertyTree> tree = createPTreeFromJSONString(retrievedDocument.text.c_str());
  163. if (!tree)
  164. throw makeStringExceptionV(-1, "%s: Could not parse ElasticSearch query response", COMPONENT_NAME);
  165. if (tree->getPropBool("timed_out", 0))
  166. LOG(MCuserProgress,"ES Log Access: timeout reported");
  167. if (tree->getPropInt("_shards/failed",0) > 0)
  168. LOG(MCuserProgress,"ES Log Access: failed _shards reported");
  169. DBGLOG("ES Log Access: hit count: '%d'", tree->getPropInt("hits/total/value"));
  170. Owned<IPropertyTreeIterator> iter = tree->getElements("hits/hits/fields");
  171. switch (format)
  172. {
  173. case LOGACCESS_LOGFORMAT_xml:
  174. {
  175. returnbuf.append("<lines>");
  176. ForEach(*iter)
  177. {
  178. IPropertyTree & cur = iter->query();
  179. returnbuf.append("<line>");
  180. toXML(&cur,returnbuf);
  181. returnbuf.append("</line>");
  182. }
  183. returnbuf.append("</lines>");
  184. break;
  185. }
  186. case LOGACCESS_LOGFORMAT_json:
  187. {
  188. returnbuf.append("{\"lines\": [");
  189. StringBuffer hitchildjson;
  190. bool first = true;
  191. ForEach(*iter)
  192. {
  193. IPropertyTree & cur = iter->query();
  194. toJSON(&cur,hitchildjson.clear());
  195. if (!first)
  196. returnbuf.append(", ");
  197. first = false;
  198. returnbuf.appendf("{\"fields\": [ %s ]}", hitchildjson.str());
  199. }
  200. returnbuf.append("]}");
  201. break;
  202. }
  203. case LOGACCESS_LOGFORMAT_csv:
  204. {
  205. ForEach(*iter)
  206. {
  207. IPropertyTree & cur = iter->query();
  208. bool first = true;
  209. Owned<IPropertyTreeIterator> fieldelementsitr = cur.getElements("*");
  210. ForEach(*fieldelementsitr)
  211. {
  212. if (!first)
  213. returnbuf.append(", ");
  214. else
  215. first = false;
  216. returnbuf.append(fieldelementsitr->query().queryProp(".")); // commas in data should be escaped
  217. }
  218. returnbuf.append("\n");
  219. }
  220. break;
  221. }
  222. default:
  223. break;
  224. }
  225. }
  226. void esTimestampQueryRangeString(std::string & range, const char * timestampfield, std::time_t from, std::time_t to)
  227. {
  228. if (isEmptyString(timestampfield))
  229. throw makeStringException(-1, "ES Log Access: TimeStamp Field must be provided");
  230. //Elastic Search Date formats can be customized, but if no format is specified then it uses the default:
  231. //"strict_date_optional_time||epoch_millis"
  232. // "%Y-%m-%d"'T'"%H:%M:%S"
  233. //We'll report the timestamps as epoch_millis
  234. range = "\"range\": { \"";
  235. range += timestampfield;
  236. range += "\": {";
  237. range += "\"gte\": \"";
  238. range += std::to_string(from*1000);
  239. range += "\"";
  240. if (to != -1) //aka 'to' has been initialized
  241. {
  242. range += ",\"lte\": \"";
  243. range += std::to_string(to*1000);
  244. range += "\"";
  245. }
  246. range += "} }";
  247. }
  248. /*
  249. * Constructs ElasticSearch match clause
  250. * Use for exact term matches such as a price, a product ID, or a username.
  251. */
  252. void esTermQueryString(std::string & search, const char *searchval, const char *searchfield)
  253. {
  254. //https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-term-query.html
  255. //You can use the term query to find documents based on a precise value such as a price, a product ID, or a username.
  256. //Avoid using the term query for text fields.
  257. //By default, Elasticsearch changes the values of text fields as part of analysis. This can make finding exact matches for text field values difficult.
  258. if (isEmptyString(searchval) || isEmptyString(searchfield))
  259. throw makeStringException(-1, "Could not create ES term query string: Either search value or search field is empty");
  260. search += "\"term\": { \"";
  261. search += searchfield;
  262. search += "\" : { \"value\": \"";
  263. search += searchval;
  264. search += "\" } }";
  265. }
  266. /*
  267. * Constructs ElasticSearch match clause
  268. * Use for full-text search
  269. */
  270. void esMatchQueryString(std::string & search, const char *searchval, const char *searchfield)
  271. {
  272. //https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-match-query.html
  273. //Returns documents that match a provided text, number, date or boolean value. The provided text is analyzed before matching.
  274. //The match query is the standard query for performing a full-text search, including options for fuzzy matching.
  275. if (isEmptyString(searchval) || isEmptyString(searchfield))
  276. throw makeStringException(-1, "Could not create ES match query string: Either search value or search field is empty");
  277. search += "\"match\": { \"";
  278. search += searchfield;
  279. search += "\" : \"";
  280. search += searchval;
  281. search += "\" }";
  282. }
  283. /*
  284. * Construct Elasticsearch query directives string
  285. */
  286. void esSearchMetaData(std::string & search, const StringArray & selectcols, unsigned size = DEFAULT_ES_DOC_LIMIT, offset_t from = DEFAULT_ES_DOC_START)
  287. {
  288. //Query parameters:
  289. //https://www.elastic.co/guide/en/elasticsearch/reference/6.8/search-request-body.html
  290. //_source: https://www.elastic.co/guide/en/elasticsearch/reference/6.8/search-request-source-filtering.html
  291. search += "\"_source\": false, \"fields\": [" ;
  292. if (selectcols.length() > 0)
  293. {
  294. StringBuffer sourcecols;
  295. ForEachItemIn(idx, selectcols)
  296. {
  297. sourcecols.appendf("\"%s\"", selectcols.item(idx));
  298. if (idx < selectcols.length() -1)
  299. sourcecols.append(",");
  300. }
  301. if (!sourcecols.isEmpty())
  302. search += sourcecols.str() ;
  303. else
  304. search += "\"*\"";
  305. //search += "!*.keyword"; //all fields ignoring the .keyword postfixed fields
  306. }
  307. else
  308. //search += "!*.keyword"; //all fields ignoring the .keyword postfixed fields
  309. search += "\"*\"";
  310. search += "],";
  311. search += "\"from\": ";
  312. search += std::to_string(from);
  313. search += ", \"size\": ";
  314. search += std::to_string(size);
  315. search += ", ";
  316. }
  317. /*
  318. * Construct ES query string, execute query
  319. */
  320. cpr::Response ElasticStackLogAccess::performESQuery(const LogAccessConditions & options)
  321. {
  322. try
  323. {
  324. StringBuffer queryValue;
  325. std::string queryField = m_globalSearchColName.str();
  326. std::string queryIndex = m_globalIndexSearchPattern.str();
  327. bool fullTextSearch = true;
  328. bool wildCardSearch = false;
  329. options.queryFilter()->toString(queryValue);
  330. switch (options.queryFilter()->filterType())
  331. {
  332. case LOGACCESS_FILTER_jobid:
  333. {
  334. if (!m_workunitSearchColName.isEmpty())
  335. {
  336. queryField = m_workunitSearchColName.str();
  337. fullTextSearch = false; //found dedicated components column
  338. }
  339. if (!m_workunitIndexSearchPattern.isEmpty())
  340. {
  341. queryIndex = m_workunitIndexSearchPattern.str();
  342. }
  343. DBGLOG("%s: Searching log entries by jobid: '%s'...", COMPONENT_NAME, queryValue.str() );
  344. break;
  345. }
  346. case LOGACCESS_FILTER_class:
  347. {
  348. if (!m_classSearchColName.isEmpty())
  349. {
  350. queryField = m_classSearchColName.str();
  351. fullTextSearch = false; //found dedicated components column
  352. }
  353. if (!m_classIndexSearchPattern.isEmpty())
  354. {
  355. queryIndex = m_classIndexSearchPattern.str();
  356. }
  357. DBGLOG("%s: Searching log entries by class: '%s'...", COMPONENT_NAME, queryValue.str() );
  358. break;
  359. }
  360. case LOGACCESS_FILTER_audience:
  361. {
  362. if (!m_audienceSearchColName.isEmpty())
  363. {
  364. queryField = m_audienceSearchColName.str();
  365. fullTextSearch = false; //found dedicated components column
  366. }
  367. if (!m_audienceIndexSearchPattern.isEmpty())
  368. {
  369. queryIndex = m_audienceIndexSearchPattern.str();
  370. }
  371. DBGLOG("%s: Searching log entries by target audience: '%s'...", COMPONENT_NAME, queryValue.str() );
  372. break;
  373. }
  374. case LOGACCESS_FILTER_component:
  375. {
  376. if (!m_componentsSearchColName.isEmpty())
  377. {
  378. queryField = m_componentsSearchColName.str();
  379. fullTextSearch = false; //found dedicated components column
  380. }
  381. if (!m_componentsIndexSearchPattern.isEmpty())
  382. {
  383. queryIndex = m_componentsIndexSearchPattern.str();
  384. }
  385. DBGLOG("%s: Searching '%s' component log entries...", COMPONENT_NAME, queryValue.str() );
  386. break;
  387. }
  388. case LOGACCESS_FILTER_wildcard:
  389. {
  390. wildCardSearch = true;
  391. DBGLOG("%s: Performing wildcard log entry search...", COMPONENT_NAME);
  392. break;
  393. }
  394. case LOGACCESS_FILTER_or:
  395. throw makeStringExceptionV(-1, "%s: Compound query criteria not currently supported: '%s'", COMPONENT_NAME, queryValue.str());
  396. //"query":{"bool":{"must":[{"match":{"kubernetes.container.name.keyword":{"query":"eclwatch","operator":"or"}}},{"match":{"container.image.name.keyword":"hpccsystems\\core"}}]} }
  397. case LOGACCESS_FILTER_and:
  398. throw makeStringExceptionV(-1, "%s: Compound query criteria not currently supported: '%s'", COMPONENT_NAME, queryValue.str());
  399. //"query":{"bool":{"must":[{"match":{"kubernetes.container.name.keyword":{"query":"eclwatch","operator":"and"}}},{"match":{"created_ts":"2021-08-25T20:23:04.923Z"}}]} }
  400. default:
  401. throw makeStringExceptionV(-1, "%s: Unknown query criteria type encountered: '%s'", COMPONENT_NAME, queryValue.str());
  402. }
  403. std::string fullRequest = "{";
  404. esSearchMetaData(fullRequest, options.getLogFieldNames(), options.getLimit(), options.getStartFrom());
  405. fullRequest += "\"query\": { \"bool\": {";
  406. if(!wildCardSearch)
  407. {
  408. fullRequest += " \"must\": { ";
  409. std::string criteria;
  410. if (fullTextSearch) //are we performing a query on a blob, or exact term match?
  411. esMatchQueryString(criteria, queryValue.str(), queryField.c_str());
  412. else
  413. esTermQueryString(criteria, queryValue.str(), queryField.c_str());
  414. fullRequest += criteria;
  415. fullRequest += "}, "; //end must, expect filter to follow
  416. }
  417. std::string filter = "\"filter\": {";
  418. std::string range;
  419. const LogAccessTimeRange & trange = options.getTimeRange();
  420. //Bail out earlier?
  421. if (trange.getStartt().isNull())
  422. throw makeStringExceptionV(-1, "%s: start time must be provided!", COMPONENT_NAME);
  423. esTimestampQueryRangeString(range, m_globalIndexTimestampField.str(), trange.getStartt().getSimple(),trange.getEndt().isNull() ? -1 : trange.getEndt().getSimple());
  424. filter += range;
  425. filter += "}"; //end filter
  426. fullRequest += filter;
  427. fullRequest += "}}}"; //end bool and query
  428. DBGLOG("%s: Search string '%s'", COMPONENT_NAME, fullRequest.c_str());
  429. return m_esClient.search(queryIndex.c_str(), DEFAULT_ES_DOC_TYPE, fullRequest);
  430. }
  431. catch (std::runtime_error &e)
  432. {
  433. const char * wha = e.what();
  434. throw makeStringExceptionV(-1, "%s: fetchLog: Error searching doc: %s", COMPONENT_NAME, wha);
  435. }
  436. catch (IException * e)
  437. {
  438. StringBuffer mess;
  439. e->errorMessage(mess);
  440. e->Release();
  441. throw makeStringExceptionV(-1, "%s: fetchLog: Error searching doc: %s", COMPONENT_NAME, mess.str());
  442. }
  443. }
  444. bool ElasticStackLogAccess::fetchLog(const LogAccessConditions & options, StringBuffer & returnbuf, LogAccessLogFormat format)
  445. {
  446. cpr::Response esresp = performESQuery(options);
  447. processESJsonResp(esresp, returnbuf, format);
  448. return true;
  449. }
  450. extern "C" IRemoteLogAccess * createInstance(IPropertyTree & logAccessPluginConfig)
  451. {
  452. //constructing ES Connection string(s) here b/c ES Client explicit ctr requires conn string array
  453. const char * protocol = logAccessPluginConfig.queryProp("connection/@protocol");
  454. const char * host = logAccessPluginConfig.queryProp("connection/@host");
  455. const char * port = logAccessPluginConfig.queryProp("connection/@port");
  456. std::string elasticSearchConnString;
  457. elasticSearchConnString = isEmptyString(protocol) ? DEFAULT_ES_PROTOCOL : protocol;
  458. elasticSearchConnString.append("://");
  459. elasticSearchConnString.append(isEmptyString(host) ? DEFAULT_ES_HOST : host);
  460. elasticSearchConnString.append(":").append((!port || !*port) ? DEFAULT_ES_PORT : port);
  461. elasticSearchConnString.append("/"); // required!
  462. return new ElasticStackLogAccess({elasticSearchConnString}, logAccessPluginConfig);
  463. }