ElasticStackLogAccess.cpp 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2021 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  #include "ElasticStackLogAccess.hpp"
  #include "platform.h"
  #include <ctime>
  #include <iostream>
  #include <sstream>
  #include <string>
  #include <vector>
  #include <json/json.h>
  #include <json/writer.h>
  // File-local configuration defaults for the Elastic Stack log access plugin.
  namespace
  {
  // ---- Connection defaults (used by createInstance() when the plugin config omits a value) ----
  #ifdef _CONTAINERIZED
  // In a containerized deployment Elasticsearch is most likely reachable via its default k8s hostname
  constexpr const char * DEFAULT_ES_HOST = "elasticsearch-master";
  #else
  // On bare metal, localhost is as good a guess as any
  constexpr const char * DEFAULT_ES_HOST = "localhost";
  #endif
  constexpr const char * DEFAULT_ES_PROTOCOL = "http";
  constexpr const char * DEFAULT_ES_PORT = "9200";
  constexpr const char * DEFAULT_ES_DOC_TYPE = "_doc";

  // ---- Paging defaults ----
  constexpr int DEFAULT_ES_DOC_LIMIT = 100;
  constexpr int DEFAULT_ES_DOC_START = 0;
  constexpr std::size_t DEFAULT_MAX_RECORDS_PER_FETCH = 100;
  constexpr const char * DEFAULT_SCROLL_TIMEOUT = "1m"; // Elastic time units (i.e. 1m = 1 minute)

  // ---- Default index pattern and timestamp field ----
  constexpr const char * DEFAULT_TS_NAME = "@timestamp";
  constexpr const char * DEFAULT_INDEX_PATTERN = "filebeat*";

  // ---- Default document column names ----
  constexpr const char * DEFAULT_HPCC_LOG_SEQ_COL = "hpcc.log.sequence";
  constexpr const char * DEFAULT_HPCC_LOG_TIMESTAMP_COL = "hpcc.log.timestamp";
  constexpr const char * DEFAULT_HPCC_LOG_PROCID_COL = "hpcc.log.procid";
  constexpr const char * DEFAULT_HPCC_LOG_THREADID_COL = "hpcc.log.threadid";
  constexpr const char * DEFAULT_HPCC_LOG_MESSAGE_COL = "hpcc.log.message";
  constexpr const char * DEFAULT_HPCC_LOG_JOBID_COL = "hpcc.log.jobid";
  constexpr const char * DEFAULT_HPCC_LOG_COMPONENT_COL = "kubernetes.container.name";
  constexpr const char * DEFAULT_HPCC_LOG_TYPE_COL = "hpcc.log.class";
  constexpr const char * DEFAULT_HPCC_LOG_AUD_COL = "hpcc.log.audience";

  // ---- Attribute names read from each "logMaps" configuration entry ----
  constexpr const char * LOGMAP_INDEXPATTERN_ATT = "@storename";
  constexpr const char * LOGMAP_SEARCHCOL_ATT = "@searchcolumn";
  constexpr const char * LOGMAP_TIMESTAMPCOL_ATT = "@timestampcolumn";
  }
  48. void ElasticStackLogAccess::getMinReturnColumns(std::string & columns)
  49. {
  50. //timestamp, source component, message
  51. columns.append(" \"").append(DEFAULT_HPCC_LOG_TIMESTAMP_COL).append("\", \"").append(m_componentsSearchColName.str()).append("\", \"").append(m_globalSearchColName).append("\" ");
  52. }
  53. void ElasticStackLogAccess::getDefaultReturnColumns(std::string & columns)
  54. {
  55. //timestamp, source component, all hpcc.log fields
  56. columns.append(" \"").append(DEFAULT_HPCC_LOG_TIMESTAMP_COL).append("\", \"").append(m_componentsSearchColName.str()).append("\", \"hpcc.log.*\" ");
  57. }
  58. void ElasticStackLogAccess::getAllColumns(std::string & columns)
  59. {
  60. columns.append( " \"*\" ");
  61. }
  62. ElasticStackLogAccess::ElasticStackLogAccess(const std::vector<std::string> &hostUrlList, IPropertyTree & logAccessPluginConfig) : m_esClient(hostUrlList)
  63. {
  64. if (!hostUrlList.at(0).empty())
  65. m_esConnectionStr.set(hostUrlList.at(0).c_str());
  66. m_pluginCfg.set(&logAccessPluginConfig);
  67. m_globalIndexTimestampField.set(DEFAULT_TS_NAME);
  68. m_globalIndexSearchPattern.set(DEFAULT_INDEX_PATTERN);
  69. m_globalSearchColName.set(DEFAULT_HPCC_LOG_MESSAGE_COL);
  70. m_classSearchColName.set(DEFAULT_HPCC_LOG_TYPE_COL);
  71. m_workunitSearchColName.set(DEFAULT_HPCC_LOG_JOBID_COL);
  72. m_componentsSearchColName.set(DEFAULT_HPCC_LOG_COMPONENT_COL);
  73. m_audienceSearchColName.set(DEFAULT_HPCC_LOG_AUD_COL);
  74. Owned<IPropertyTreeIterator> logMapIter = m_pluginCfg->getElements("logMaps");
  75. ForEach(*logMapIter)
  76. {
  77. IPropertyTree & logMap = logMapIter->query();
  78. const char * logMapType = logMap.queryProp("@type");
  79. if (streq(logMapType, "global"))
  80. {
  81. if (logMap.hasProp(LOGMAP_INDEXPATTERN_ATT))
  82. m_globalIndexSearchPattern = logMap.queryProp(LOGMAP_INDEXPATTERN_ATT);
  83. if (logMap.hasProp(LOGMAP_SEARCHCOL_ATT))
  84. m_globalSearchColName = logMap.queryProp(LOGMAP_SEARCHCOL_ATT);
  85. if (logMap.hasProp(LOGMAP_TIMESTAMPCOL_ATT))
  86. m_globalIndexTimestampField = logMap.queryProp(LOGMAP_TIMESTAMPCOL_ATT);
  87. }
  88. else if (streq(logMapType, "workunits"))
  89. {
  90. if (logMap.hasProp(LOGMAP_INDEXPATTERN_ATT))
  91. m_workunitIndexSearchPattern = logMap.queryProp(LOGMAP_INDEXPATTERN_ATT);
  92. if (logMap.hasProp(LOGMAP_SEARCHCOL_ATT))
  93. m_workunitSearchColName = logMap.queryProp(LOGMAP_SEARCHCOL_ATT);
  94. }
  95. else if (streq(logMapType, "components"))
  96. {
  97. if (logMap.hasProp(LOGMAP_INDEXPATTERN_ATT))
  98. m_componentsIndexSearchPattern = logMap.queryProp(LOGMAP_INDEXPATTERN_ATT);
  99. if (logMap.hasProp(LOGMAP_SEARCHCOL_ATT))
  100. m_componentsSearchColName = logMap.queryProp(LOGMAP_SEARCHCOL_ATT);
  101. }
  102. else if (streq(logMapType, "class"))
  103. {
  104. if (logMap.hasProp(LOGMAP_INDEXPATTERN_ATT))
  105. m_classIndexSearchPattern = logMap.queryProp(LOGMAP_INDEXPATTERN_ATT);
  106. if (logMap.hasProp(LOGMAP_SEARCHCOL_ATT))
  107. m_classSearchColName = logMap.queryProp(LOGMAP_SEARCHCOL_ATT);
  108. }
  109. else if (streq(logMapType, "audience"))
  110. {
  111. if (logMap.hasProp(LOGMAP_INDEXPATTERN_ATT))
  112. m_audienceIndexSearchPattern = logMap.queryProp(LOGMAP_INDEXPATTERN_ATT);
  113. if (logMap.hasProp(LOGMAP_SEARCHCOL_ATT))
  114. m_audienceSearchColName = logMap.queryProp(LOGMAP_SEARCHCOL_ATT);
  115. }
  116. }
  117. #ifdef LOGACCESSDEBUG
  118. StringBuffer out;
  119. const IPropertyTree * status = getESStatus();
  120. toXML(status, out);
  121. fprintf(stdout, "ES Status: %s", out.str());
  122. const IPropertyTree * is = getIndexSearchStatus(m_globalIndexSearchPattern);
  123. toXML(is, out);
  124. fprintf(stdout, "ES available indexes: %s", out.str());
  125. const IPropertyTree * ts = getTimestampTypeFormat(m_globalIndexSearchPattern, m_globalIndexTimestampField);
  126. toXML(ts, out);
  127. fprintf(stdout, "ES %s timestamp info: '%s'", m_globalIndexSearchPattern.str(), out.str());
  128. #endif
  129. }
  130. const IPropertyTree * ElasticStackLogAccess::performAndLogESRequest(Client::HTTPMethod httpmethod, const char * url, const char * reqbody, const char * logmessageprefix, LogMsgCategory reqloglevel = MCdebugProgress, LogMsgCategory resploglevel = MCdebugProgress)
  131. {
  132. try
  133. {
  134. LOG(reqloglevel,"ESLogAccess: Requesting '%s'... ", logmessageprefix );
  135. cpr::Response esREsponse = m_esClient.performRequest(httpmethod,url,reqbody);
  136. Owned<IPropertyTree> response = createPTreeFromJSONString(esREsponse.text.c_str());
  137. LOG(resploglevel,"ESLogAccess: '%s' response: '%s'", logmessageprefix, esREsponse.text.c_str());
  138. return response.getClear();
  139. }
  140. catch (ConnectionException & ce)//std::runtime_error
  141. {
  142. LOG(MCuserError, "ESLogAccess: Encountered error requesting '%s': '%s'", logmessageprefix, ce.what());
  143. }
  144. catch (...)
  145. {
  146. LOG(MCuserError, "ESLogAccess: Encountered error requesting '%s'", logmessageprefix);
  147. }
  148. return nullptr;
  149. }
  150. const IPropertyTree * ElasticStackLogAccess::getTimestampTypeFormat(const char * indexpattern, const char * fieldname)
  151. {
  152. if (isEmptyString(indexpattern))
  153. throw makeStringException(-1, "ElasticStackLogAccess::getTimestampTypeFormat: indexpattern must be provided");
  154. if (isEmptyString(fieldname))
  155. throw makeStringException(-1, "ElasticStackLogAccess::getTimestampTypeFormat: fieldname must be provided");
  156. VStringBuffer timestampformatreq("%s/_mapping/field/created_ts?include_type_name=true&format=JSON", indexpattern);
  157. return performAndLogESRequest(Client::HTTPMethod::GET, timestampformatreq.str(), "", "getTimestampTypeFormat");
  158. }
  159. const IPropertyTree * ElasticStackLogAccess::getIndexSearchStatus(const char * indexpattern)
  160. {
  161. if (!indexpattern || !*indexpattern)
  162. throw makeStringException(-1, "ElasticStackLogAccess::getIndexSearchStatus: indexpattern must be provided");
  163. VStringBuffer indexsearch("_cat/indices/%s?format=JSON", indexpattern);
  164. return performAndLogESRequest(Client::HTTPMethod::GET, indexsearch.str(), "", "List of available indexes");
  165. }
  166. const IPropertyTree * ElasticStackLogAccess::getESStatus()
  167. {
  168. return performAndLogESRequest(Client::HTTPMethod::GET, "_cluster/health", "", "Target cluster health");
  169. }
  170. /*
  171. * Transform iterator of hits/fields to back-end agnostic response
  172. *
  173. */
  174. void processHitsJsonResp(IPropertyTreeIterator * iter, StringBuffer & returnbuf, LogAccessLogFormat format, bool wrapped)
  175. {
  176. if (!iter)
  177. throw makeStringExceptionV(-1, "%s: Detected null 'hits' ElasticSearch response", COMPONENT_NAME);
  178. switch (format)
  179. {
  180. case LOGACCESS_LOGFORMAT_xml:
  181. {
  182. if (wrapped)
  183. returnbuf.append("<lines>");
  184. ForEach(*iter)
  185. {
  186. IPropertyTree & cur = iter->query();
  187. returnbuf.append("<line>");
  188. toXML(&cur,returnbuf);
  189. returnbuf.append("</line>");
  190. }
  191. if (wrapped)
  192. returnbuf.append("</lines>");
  193. break;
  194. }
  195. case LOGACCESS_LOGFORMAT_json:
  196. {
  197. if (wrapped)
  198. returnbuf.append("{\"lines\": [");
  199. StringBuffer hitchildjson;
  200. bool first = true;
  201. ForEach(*iter)
  202. {
  203. IPropertyTree & cur = iter->query();
  204. toJSON(&cur,hitchildjson.clear());
  205. if (!first)
  206. returnbuf.append(", ");
  207. first = false;
  208. returnbuf.appendf("{\"fields\": [ %s ]}", hitchildjson.str());
  209. }
  210. if (wrapped)
  211. returnbuf.append("]}");
  212. break;
  213. }
  214. case LOGACCESS_LOGFORMAT_csv:
  215. {
  216. ForEach(*iter)
  217. {
  218. IPropertyTree & cur = iter->query();
  219. bool first = true;
  220. Owned<IPropertyTreeIterator> fieldelementsitr = cur.getElements("*");
  221. ForEach(*fieldelementsitr)
  222. {
  223. if (!first)
  224. returnbuf.append(", ");
  225. else
  226. first = false;
  227. returnbuf.append(fieldelementsitr->query().queryProp(".")); // commas in data should be escaped
  228. }
  229. returnbuf.append("\n");
  230. }
  231. break;
  232. }
  233. default:
  234. break;
  235. }
  236. }
  237. /*
  238. * Transform ES query response to back-end agnostic response
  239. *
  240. */
  241. void processESSearchJsonResp(const cpr::Response & retrievedDocument, StringBuffer & returnbuf, LogAccessLogFormat format)
  242. {
  243. if (retrievedDocument.status_code != 200)
  244. throw makeStringExceptionV(-1, "ElasticSearch request failed: %s", retrievedDocument.text.c_str());
  245. #ifdef _DEBUG
  246. DBGLOG("Retrieved ES JSON DOC: %s", retrievedDocument.text.c_str());
  247. #endif
  248. Owned<IPropertyTree> tree = createPTreeFromJSONString(retrievedDocument.text.c_str());
  249. if (!tree)
  250. throw makeStringExceptionV(-1, "%s: Could not parse ElasticSearch query response", COMPONENT_NAME);
  251. if (tree->getPropBool("timed_out", false))
  252. LOG(MCuserProgress,"ES Log Access: timeout reported");
  253. if (tree->getPropInt("_shards/failed",0) > 0)
  254. LOG(MCuserProgress,"ES Log Access: failed _shards reported");
  255. DBGLOG("ES Log Access: hit count: '%d'", tree->getPropInt("hits/total/value"));
  256. Owned<IPropertyTreeIterator> hitsFieldsElements = tree->getElements("hits/hits/fields");
  257. processHitsJsonResp(hitsFieldsElements, returnbuf, format, true);
  258. }
  259. /*
  260. * Transform ES scroll query response to back-end agnostic response
  261. *
  262. */
  263. void processESScrollJsonResp(const char * retValue, StringBuffer & returnbuf, LogAccessLogFormat format, bool wrapped)
  264. {
  265. Owned<IPropertyTree> tree = createPTreeFromJSONString(retValue);
  266. if (!tree)
  267. throw makeStringExceptionV(-1, "%s: Could not parse ElasticSearch query response", COMPONENT_NAME);
  268. Owned<IPropertyTreeIterator> hitsFieldsElements = tree->getElements("hits/fields");
  269. processHitsJsonResp(hitsFieldsElements, returnbuf, format, wrapped);
  270. }
  271. void esTimestampQueryRangeString(std::string & range, const char * timestampfield, std::time_t from, std::time_t to)
  272. {
  273. if (isEmptyString(timestampfield))
  274. throw makeStringException(-1, "ES Log Access: TimeStamp Field must be provided");
  275. //Elastic Search Date formats can be customized, but if no format is specified then it uses the default:
  276. //"strict_date_optional_time||epoch_millis"
  277. // "%Y-%m-%d"'T'"%H:%M:%S"
  278. //We'll report the timestamps as epoch_millis
  279. range = "\"range\": { \"";
  280. range += timestampfield;
  281. range += "\": {";
  282. range += "\"gte\": \"";
  283. range += std::to_string(from*1000);
  284. range += "\"";
  285. if (to != -1) //aka 'to' has been initialized
  286. {
  287. range += ",\"lte\": \"";
  288. range += std::to_string(to*1000);
  289. range += "\"";
  290. }
  291. range += "} }";
  292. }
  293. /*
  294. * Constructs ElasticSearch match clause
  295. * Use for exact term matches such as a price, a product ID, or a username.
  296. */
  297. void esTermQueryString(std::string & search, const char *searchval, const char *searchfield)
  298. {
  299. //https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-term-query.html
  300. //You can use the term query to find documents based on a precise value such as a price, a product ID, or a username.
  301. //Avoid using the term query for text fields.
  302. //By default, Elasticsearch changes the values of text fields as part of analysis. This can make finding exact matches for text field values difficult.
  303. if (isEmptyString(searchval) || isEmptyString(searchfield))
  304. throw makeStringException(-1, "Could not create ES term query string: Either search value or search field is empty");
  305. search += "\"term\": { \"";
  306. search += searchfield;
  307. search += "\" : { \"value\": \"";
  308. search += searchval;
  309. search += "\" } }";
  310. }
  311. /*
  312. * Constructs ElasticSearch match clause
  313. * Use for full-text search
  314. */
  315. void esMatchQueryString(std::string & search, const char *searchval, const char *searchfield)
  316. {
  317. //https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-match-query.html
  318. //Returns documents that match a provided text, number, date or boolean value. The provided text is analyzed before matching.
  319. //The match query is the standard query for performing a full-text search, including options for fuzzy matching.
  320. if (isEmptyString(searchval) || isEmptyString(searchfield))
  321. throw makeStringException(-1, "Could not create ES match query string: Either search value or search field is empty");
  322. search += "\"match\": { \"";
  323. search += searchfield;
  324. search += "\" : \"";
  325. search += searchval;
  326. search += "\" }";
  327. }
  328. /*
  329. * Construct Elasticsearch query directives string
  330. */
  331. void ElasticStackLogAccess::esSearchMetaData(std::string & search, const LogAccessReturnColsMode retcolmode, const StringArray & selectcols, unsigned size = DEFAULT_ES_DOC_LIMIT, offset_t from = DEFAULT_ES_DOC_START)
  332. {
  333. //Query parameters:
  334. //https://www.elastic.co/guide/en/elasticsearch/reference/6.8/search-request-body.html
  335. //_source: https://www.elastic.co/guide/en/elasticsearch/reference/6.8/search-request-source-filtering.html
  336. search += "\"_source\": false, \"fields\": [" ;
  337. switch (retcolmode)
  338. {
  339. case RETURNCOLS_MODE_all:
  340. getAllColumns(search);
  341. break;
  342. case RETURNCOLS_MODE_min:
  343. getMinReturnColumns(search);
  344. break;
  345. case RETURNCOLS_MODE_default:
  346. getDefaultReturnColumns(search);
  347. break;
  348. case RETURNCOLS_MODE_custom:
  349. {
  350. if (selectcols.length() > 0)
  351. {
  352. StringBuffer sourcecols;
  353. ForEachItemIn(idx, selectcols)
  354. {
  355. sourcecols.appendf("\"%s\"", selectcols.item(idx));
  356. if (idx < selectcols.length() -1)
  357. sourcecols.append(",");
  358. }
  359. search += sourcecols.str();
  360. }
  361. else
  362. {
  363. throw makeStringExceptionV(-1, "%s: Custom return columns specified, but no columns provided", COMPONENT_NAME);
  364. }
  365. break;
  366. }
  367. default:
  368. throw makeStringExceptionV(-1, "%s: Could not determine return colums mode", COMPONENT_NAME);
  369. }
  370. search += "],";
  371. search += "\"from\": ";
  372. search += std::to_string(from);
  373. search += ", \"size\": ";
  374. search += std::to_string(size);
  375. search += ", ";
  376. }
  377. void ElasticStackLogAccess::populateQueryStringAndQueryIndex(std::string & queryString, std::string & queryIndex, const LogAccessConditions & options)
  378. {
  379. try
  380. {
  381. StringBuffer queryValue;
  382. std::string queryField = m_globalSearchColName.str();
  383. queryIndex = m_globalIndexSearchPattern.str();
  384. bool fullTextSearch = true;
  385. bool wildCardSearch = false;
  386. options.queryFilter()->toString(queryValue);
  387. switch (options.queryFilter()->filterType())
  388. {
  389. case LOGACCESS_FILTER_jobid:
  390. {
  391. if (!m_workunitSearchColName.isEmpty())
  392. {
  393. queryField = m_workunitSearchColName.str();
  394. fullTextSearch = false; //found dedicated components column
  395. }
  396. if (!m_workunitIndexSearchPattern.isEmpty())
  397. {
  398. queryIndex = m_workunitIndexSearchPattern.str();
  399. }
  400. DBGLOG("%s: Searching log entries by jobid: '%s'...", COMPONENT_NAME, queryValue.str() );
  401. break;
  402. }
  403. case LOGACCESS_FILTER_class:
  404. {
  405. if (!m_classSearchColName.isEmpty())
  406. {
  407. queryField = m_classSearchColName.str();
  408. fullTextSearch = false; //found dedicated components column
  409. }
  410. if (!m_classIndexSearchPattern.isEmpty())
  411. {
  412. queryIndex = m_classIndexSearchPattern.str();
  413. }
  414. DBGLOG("%s: Searching log entries by class: '%s'...", COMPONENT_NAME, queryValue.str() );
  415. break;
  416. }
  417. case LOGACCESS_FILTER_audience:
  418. {
  419. if (!m_audienceSearchColName.isEmpty())
  420. {
  421. queryField = m_audienceSearchColName.str();
  422. fullTextSearch = false; //found dedicated components column
  423. }
  424. if (!m_audienceIndexSearchPattern.isEmpty())
  425. {
  426. queryIndex = m_audienceIndexSearchPattern.str();
  427. }
  428. DBGLOG("%s: Searching log entries by target audience: '%s'...", COMPONENT_NAME, queryValue.str() );
  429. break;
  430. }
  431. case LOGACCESS_FILTER_component:
  432. {
  433. if (!m_componentsSearchColName.isEmpty())
  434. {
  435. queryField = m_componentsSearchColName.str();
  436. fullTextSearch = false; //found dedicated components column
  437. }
  438. if (!m_componentsIndexSearchPattern.isEmpty())
  439. {
  440. queryIndex = m_componentsIndexSearchPattern.str();
  441. }
  442. DBGLOG("%s: Searching '%s' component log entries...", COMPONENT_NAME, queryValue.str() );
  443. break;
  444. }
  445. case LOGACCESS_FILTER_wildcard:
  446. {
  447. wildCardSearch = true;
  448. DBGLOG("%s: Performing wildcard log entry search...", COMPONENT_NAME);
  449. break;
  450. }
  451. case LOGACCESS_FILTER_or:
  452. throw makeStringExceptionV(-1, "%s: Compound query criteria not currently supported: '%s'", COMPONENT_NAME, queryValue.str());
  453. //"query":{"bool":{"must":[{"match":{"kubernetes.container.name.keyword":{"query":"eclwatch","operator":"or"}}},{"match":{"container.image.name.keyword":"hpccsystems\\core"}}]} }
  454. case LOGACCESS_FILTER_and:
  455. throw makeStringExceptionV(-1, "%s: Compound query criteria not currently supported: '%s'", COMPONENT_NAME, queryValue.str());
  456. //"query":{"bool":{"must":[{"match":{"kubernetes.container.name.keyword":{"query":"eclwatch","operator":"and"}}},{"match":{"created_ts":"2021-08-25T20:23:04.923Z"}}]} }
  457. default:
  458. throw makeStringExceptionV(-1, "%s: Unknown query criteria type encountered: '%s'", COMPONENT_NAME, queryValue.str());
  459. }
  460. queryString = "{";
  461. esSearchMetaData(queryString, options.getReturnColsMode(), options.getLogFieldNames(), options.getLimit(), options.getStartFrom());
  462. queryString += "\"query\": { \"bool\": {";
  463. if(!wildCardSearch)
  464. {
  465. queryString += " \"must\": { ";
  466. std::string criteria;
  467. if (fullTextSearch) //are we performing a query on a blob, or exact term match?
  468. esMatchQueryString(criteria, queryValue.str(), queryField.c_str());
  469. else
  470. esTermQueryString(criteria, queryValue.str(), queryField.c_str());
  471. queryString += criteria;
  472. queryString += "}, "; //end must, expect filter to follow
  473. }
  474. std::string filter = "\"filter\": {";
  475. std::string range;
  476. const LogAccessTimeRange & trange = options.getTimeRange();
  477. //Bail out earlier?
  478. if (trange.getStartt().isNull())
  479. throw makeStringExceptionV(-1, "%s: start time must be provided!", COMPONENT_NAME);
  480. esTimestampQueryRangeString(range, m_globalIndexTimestampField.str(), trange.getStartt().getSimple(),trange.getEndt().isNull() ? -1 : trange.getEndt().getSimple());
  481. filter += range;
  482. filter += "}"; //end filter
  483. queryString += filter;
  484. queryString += "}}}"; //end bool and query
  485. DBGLOG("%s: Search string '%s'", COMPONENT_NAME, queryString.c_str());
  486. }
  487. catch (std::runtime_error &e)
  488. {
  489. const char * wha = e.what();
  490. throw makeStringExceptionV(-1, "%s: fetchLog: Error searching doc: %s", COMPONENT_NAME, wha);
  491. }
  492. catch (IException * e)
  493. {
  494. StringBuffer mess;
  495. e->errorMessage(mess);
  496. e->Release();
  497. throw makeStringExceptionV(-1, "%s: fetchLog: Error searching doc: %s", COMPONENT_NAME, mess.str());
  498. }
  499. }
  500. /*
  501. * Construct ES query string, execute query
  502. */
  503. cpr::Response ElasticStackLogAccess::performESQuery(const LogAccessConditions & options)
  504. {
  505. try
  506. {
  507. std::string queryString;
  508. std::string queryIndex;
  509. populateQueryStringAndQueryIndex(queryString, queryIndex, options);
  510. return m_esClient.search(queryIndex.c_str(), DEFAULT_ES_DOC_TYPE, queryString);
  511. }
  512. catch (std::runtime_error &e)
  513. {
  514. const char * wha = e.what();
  515. throw makeStringExceptionV(-1, "%s: fetchLog: Error searching doc: %s", COMPONENT_NAME, wha);
  516. }
  517. catch (IException * e)
  518. {
  519. StringBuffer mess;
  520. e->errorMessage(mess);
  521. e->Release();
  522. throw makeStringExceptionV(-1, "%s: fetchLog: Error searching doc: %s", COMPONENT_NAME, mess.str());
  523. }
  524. }
  525. bool ElasticStackLogAccess::fetchLog(const LogAccessConditions & options, StringBuffer & returnbuf, LogAccessLogFormat format)
  526. {
  527. cpr::Response esresp = performESQuery(options);
  528. processESSearchJsonResp(esresp, returnbuf, format);
  529. return true;
  530. }
  531. class ELASTICSTACKLOGACCESS_API ElasticStackLogStream : public CInterfaceOf<IRemoteLogAccessStream>
  532. {
  533. public:
  534. virtual bool readLogEntries(StringBuffer & record, unsigned & recsRead) override
  535. {
  536. Json::Value res;
  537. recsRead = 0;
  538. if (m_esSroller.next(res))
  539. {
  540. if (!res["hits"].empty())
  541. {
  542. recsRead = res["hits"].size();
  543. std::ostringstream sout;
  544. m_jsonWriter->write(res, &sout); // serialize Json object to string for processing
  545. processESScrollJsonResp(sout.str().c_str(), record, m_outputFormat, false); // convert Json string to target format
  546. return true;
  547. }
  548. }
  549. return false;
  550. }
  551. ElasticStackLogStream(std::string & queryString, const char * connstr, const char * indexsearchpattern, LogAccessLogFormat format, std::size_t pageSize, std::string scrollTo)
  552. : m_esSroller(std::make_shared<elasticlient::Client>(std::vector<std::string>({connstr})), pageSize, scrollTo)
  553. {
  554. m_outputFormat = format;
  555. m_esSroller.init(indexsearchpattern, DEFAULT_ES_DOC_TYPE, queryString);
  556. m_jsonWriter.reset(m_jsonStreamBuilder.newStreamWriter());
  557. }
  558. virtual ~ElasticStackLogStream() override = default;
  559. private:
  560. elasticlient::Scroll m_esSroller;
  561. LogAccessLogFormat m_outputFormat;
  562. Json::StreamWriterBuilder m_jsonStreamBuilder;
  563. std::unique_ptr<Json::StreamWriter> m_jsonWriter;
  564. };
  565. IRemoteLogAccessStream * ElasticStackLogAccess::getLogReader(const LogAccessConditions & options, LogAccessLogFormat format)
  566. {
  567. return getLogReader(options, format, DEFAULT_MAX_RECORDS_PER_FETCH);
  568. }
  569. IRemoteLogAccessStream * ElasticStackLogAccess::getLogReader(const LogAccessConditions & options, LogAccessLogFormat format, unsigned int pageSize)
  570. {
  571. std::string queryString;
  572. std::string queryIndex;
  573. populateQueryStringAndQueryIndex(queryString, queryIndex, options);
  574. return new ElasticStackLogStream(queryString, m_esConnectionStr.str(), queryIndex.c_str(), format, pageSize, DEFAULT_SCROLL_TIMEOUT);
  575. }
  576. extern "C" IRemoteLogAccess * createInstance(IPropertyTree & logAccessPluginConfig)
  577. {
  578. //constructing ES Connection string(s) here b/c ES Client explicit ctr requires conn string array
  579. const char * protocol = logAccessPluginConfig.queryProp("connection/@protocol");
  580. const char * host = logAccessPluginConfig.queryProp("connection/@host");
  581. const char * port = logAccessPluginConfig.queryProp("connection/@port");
  582. std::string elasticSearchConnString;
  583. elasticSearchConnString = isEmptyString(protocol) ? DEFAULT_ES_PROTOCOL : protocol;
  584. elasticSearchConnString.append("://");
  585. elasticSearchConnString.append(isEmptyString(host) ? DEFAULT_ES_HOST : host);
  586. elasticSearchConnString.append(":").append((!port || !*port) ? DEFAULT_ES_PORT : port);
  587. elasticSearchConnString.append("/"); // required!
  588. return new ElasticStackLogAccess({elasticSearchConnString}, logAccessPluginConfig);
  589. }