cassandrawu.cpp 187 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2013 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "cassandra.h"
  15. #include "jexcept.hpp"
  16. #include "jthread.hpp"
  17. #include "hqlplugins.hpp"
  18. #include "deftype.hpp"
  19. #include "eclhelper.hpp"
  20. #include "eclrtl.hpp"
  21. #include "eclrtl_imp.hpp"
  22. #include "rtlds_imp.hpp"
  23. #include "rtlfield.hpp"
  24. #include "rtlembed.hpp"
  25. #include "roxiemem.hpp"
  26. #include "nbcd.hpp"
  27. #include "jsort.hpp"
  28. #include "jptree.hpp"
  29. #include "jlzw.hpp"
  30. #include "jregexp.hpp"
  31. #include "dadfs.hpp"
  32. #include "dasds.hpp"
  33. #include "wuerror.hpp"
  34. #include "workunit.hpp"
  35. #include "workunit.ipp"
  36. #include "cassandraembed.hpp"
  37. #include <list>
  38. #include <string>
  39. #include <algorithm>
  40. #define EXPORT DECL_EXPORT
  41. namespace cassandraembed {
// Tunable constants for the Cassandra workunit store.
// NOTE(review): the uses of these macros are outside this part of the file; the meanings below
// are inferred from the names and should be confirmed against the code that reads them.

// Evaluates to 300000 - presumably a duration in milliseconds (5 minutes); TODO confirm the unit
#define CASS_WU_QUERY_EXPIRES (1000*60*5)
// Presumably an upper bound on the number of workunits that will be sorted after fetching - TODO confirm
#define CASS_WORKUNIT_POSTSORT_LIMIT 10000
// Default / minimum / maximum for the search prefix size (presumably what ICassandraSession::queryPrefixSize() reports)
#define DEFAULT_PREFIX_SIZE 2
#define MIN_PREFIX_SIZE 2
#define MAX_PREFIX_SIZE 8
// Default / minimum / maximum for the partition count (presumably what ICassandraSession::queryPartitions() reports)
#define DEFAULT_PARTITIONS 2
#define MIN_PARTITIONS 1
#define MAX_PARTITIONS 10
  50. static const CassValue *getSingleResult(const CassResult *result)
  51. {
  52. const CassRow *row = cass_result_first_row(result);
  53. if (row)
  54. return cass_row_get_column(row, 0);
  55. else
  56. return NULL;
  57. }
  58. static StringBuffer &getCassString(StringBuffer &str, const CassValue *value)
  59. {
  60. const char *output;
  61. size_t length;
  62. check(cass_value_get_string(value, &output, &length));
  63. return str.append(length, output);
  64. }
// Forward declaration - the full definition follows the column mapper classes
struct CassandraXmlMapping;

// Abstract view of a Cassandra session, passed to the column mappers (see CassandraColumnMapper::fromXML).
// Within the mappers in this file only queryPartitions() and queryPrefixSize() are called;
// the remaining methods are used by code outside this section.
// NOTE(review): per-method semantics below are inferred from the signatures - confirm against the implementation.
interface ICassandraSession : public IInterface // MORE - rename!
{
    virtual CassSession *querySession() const = 0;
    virtual CassandraPrepared *prepareStatement(const char *query) const = 0;
    virtual void executeAsync(CIArrayOf<CassandraStatement> &batch, const char *what) const = 0;
    virtual unsigned queryTraceLevel() const = 0;
    virtual const CassResult *fetchDataForWuid(const CassandraXmlMapping *mappings, const char *wuid, bool includeWuid) const = 0;
    virtual const CassResult *fetchDataForWuidAndKey(const CassandraXmlMapping *mappings, const char *wuid, const char *key) const = 0;
    virtual void deleteChildByWuid(const CassandraXmlMapping *mappings, const char *wuid, CassBatch *batch) const = 0;
    virtual IPTree *cassandraToWorkunitXML(const char *wuid) const = 0;
    // Length passed to bindString_n by PrefixSearchColumnMapper when binding a search prefix
    virtual unsigned queryPrefixSize() const = 0;
    // Modulus applied to the hash of the root name by HashRootNameColumnMapper to pick a partition
    virtual unsigned queryPartitions() const = 0;
};
// Base for the objects that translate one Cassandra column to and from the workunit property tree.
// Each concrete mapper below is a stateless (or constructor-configured) static instance referenced
// from the CassandraXmlMapping tables.
struct CassandraColumnMapper
{
    virtual ~CassandraColumnMapper() {}
    // Transfers a column value read from Cassandra into the tree. The implementations in this file
    // return the row they were given (or throw for columns that are never read back).
    //   row   - tree being populated
    //   name  - the mapping's xpath; its interpretation is mapper specific
    //   value - the column value
    virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) = 0;
    // Binds the value for this column, taken from the tree, to parameter idx of statement.
    // In the implementations in this file a NULL statement means "do not bind", and the return value
    // reports whether the mapper has a value for the column (true) or not (false).
    //   session - used by some mappers for the partition count / prefix size
    //   name    - the mapping's xpath; its interpretation is mapper specific
    //   userVal - caller supplied value, used by some mappers instead of (or as a default for) the tree content
    virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) = 0;
};
  85. static class StringColumnMapper : implements CassandraColumnMapper
  86. {
  87. public:
  88. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
  89. {
  90. rtlDataAttr str;
  91. unsigned chars;
  92. getUTF8Result(NULL, value, chars, str.refstr());
  93. StringAttr s(str.getstr(), rtlUtf8Size(chars, str.getstr()));
  94. row->setProp(name, s);
  95. return row;
  96. }
  97. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  98. {
  99. const char *value = row->queryProp(name);
  100. if (!value)
  101. return false;
  102. if (statement)
  103. statement->bindString(idx, value);
  104. return true;
  105. }
  106. } stringColumnMapper;
  107. static class RequiredStringColumnMapper : public StringColumnMapper
  108. {
  109. public:
  110. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  111. {
  112. const char *value = row->queryProp(name);
  113. if (!value)
  114. value = "";
  115. if (statement)
  116. statement->bindString(idx, value);
  117. return true;
  118. }
  119. } requiredStringColumnMapper;
  120. static class SuppliedStringColumnMapper : public StringColumnMapper
  121. {
  122. public:
  123. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  124. {
  125. if (statement)
  126. statement->bindString(idx, userVal);
  127. return true;
  128. }
  129. } suppliedStringColumnMapper;
  130. static class BlobColumnMapper : implements CassandraColumnMapper
  131. {
  132. public:
  133. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
  134. {
  135. rtlDataAttr str;
  136. unsigned chars;
  137. getDataResult(NULL, value, chars, str.refdata());
  138. row->setPropBin(name, chars, str.getbytes());
  139. return row;
  140. }
  141. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  142. {
  143. MemoryBuffer value;
  144. row->getPropBin(name, value);
  145. if (value.length())
  146. {
  147. if (statement)
  148. statement->bindBytes(idx, (const cass_byte_t *) value.toByteArray(), value.length());
  149. return true;
  150. }
  151. else
  152. return false;
  153. }
  154. } blobColumnMapper;
  155. static class compressTreeColumnMapper : implements CassandraColumnMapper
  156. {
  157. public:
  158. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
  159. {
  160. rtlDataAttr str;
  161. unsigned chars;
  162. getDataResult(NULL, value, chars, str.refdata());
  163. if (chars)
  164. {
  165. MemoryBuffer compressed, decompressed;
  166. compressed.setBuffer(chars, str.getbytes(), false);
  167. decompressToBuffer(decompressed, compressed);
  168. Owned<IPTree> p = createPTree(decompressed);
  169. row->setPropTree(name, p.getClear());
  170. }
  171. return row;
  172. }
  173. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  174. {
  175. IPTree *child = row->queryPropTree(name);
  176. if (child && child->hasChildren())
  177. {
  178. if (statement)
  179. {
  180. MemoryBuffer decompressed, compressed;
  181. child->serialize(decompressed);
  182. compressToBuffer(compressed, decompressed.length(), decompressed.toByteArray());
  183. statement->bindBytes(idx, (const cass_byte_t *) compressed.toByteArray(), compressed.length());
  184. }
  185. return true;
  186. }
  187. else
  188. return false;
  189. }
  190. } compressTreeColumnMapper;
// Placeholder mapper for a timestamp column: it neither copies anything into the tree nor binds anything.
static class TimeStampColumnMapper : implements CassandraColumnMapper
{
public:
    // Returns row unchanged.
    virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
    {
        // never fetched (that may change?)
        return row;
    }
    // Always reports that the column is present, without touching the statement.
    virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
    {
        // never bound, but does need to be included in the ?
        return true;
    }
} timestampColumnMapper;
  205. static class HashRootNameColumnMapper : implements CassandraColumnMapper
  206. {
  207. public:
  208. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
  209. {
  210. throwUnexpected(); // we never return the partition column
  211. }
  212. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  213. {
  214. if (statement)
  215. {
  216. int hash = rtlHash32VStr(row->queryName(), 0) % session->queryPartitions();
  217. statement->bindInt32(idx, hash);
  218. }
  219. return true;
  220. }
  221. } hashRootNameColumnMapper;
  222. static class RootNameColumnMapper : implements CassandraColumnMapper
  223. {
  224. public:
  225. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
  226. {
  227. rtlDataAttr str;
  228. unsigned chars;
  229. getUTF8Result(NULL, value, chars, str.refstr());
  230. StringAttr s(str.getstr(), rtlUtf8Size(chars, str.getstr()));
  231. row->renameProp("/", s);
  232. return row;
  233. }
  234. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  235. {
  236. if (statement)
  237. {
  238. const char *value = row->queryName();
  239. statement->bindString(idx, value);
  240. }
  241. return true;
  242. }
  243. } rootNameColumnMapper;
// WuidColumnMapper is used for columns containing a wuid that is NOT in the resulting XML - it
// is an error to try to map such a column to/from the XML representation
static class WuidColumnMapper : implements CassandraColumnMapper
{
public:
    // Must never be called - reports an internal error via throwUnexpected()
    virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
    {
        throwUnexpected();
    }
    // Must never be called - reports an internal error via throwUnexpected()
    virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
    {
        throwUnexpected();
    }
} wuidColumnMapper;
  258. static class BoolColumnMapper : implements CassandraColumnMapper
  259. {
  260. public:
  261. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
  262. {
  263. row->addPropBool(name, getBooleanResult(NULL, value));
  264. return row;
  265. }
  266. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  267. {
  268. if (row->hasProp(name))
  269. {
  270. if (statement)
  271. {
  272. bool value = row->getPropBool(name, false);
  273. statement->bindBool(idx, value ? cass_true : cass_false);
  274. }
  275. return true;
  276. }
  277. else
  278. return false;
  279. }
  280. } boolColumnMapper;
  281. static class PrefixSearchColumnMapper : implements CassandraColumnMapper
  282. {
  283. public:
  284. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
  285. {
  286. return row;
  287. }
  288. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  289. {
  290. return _fromXML(statement, idx, row, userVal, session->queryPrefixSize(), true);
  291. }
  292. protected:
  293. static bool _fromXML(CassandraStatement *statement, unsigned idx, IPTree *row, const char *xpath, unsigned prefixLength, bool uc)
  294. {
  295. const char *columnVal = row->queryProp(xpath);
  296. if (columnVal)
  297. {
  298. if (statement)
  299. {
  300. StringBuffer buf(columnVal);
  301. if (uc)
  302. buf.toUpperCase();
  303. if (prefixLength)
  304. statement->bindString_n(idx, buf, prefixLength);
  305. else
  306. statement->bindString(idx, buf);
  307. }
  308. return true;
  309. }
  310. else
  311. return false;
  312. }
  313. } prefixSearchColumnMapper;
  314. static class SearchColumnMapper : public PrefixSearchColumnMapper
  315. {
  316. public:
  317. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  318. {
  319. return _fromXML(statement, idx, row, userVal, 0, true);
  320. }
  321. } searchColumnMapper;
  322. static class LCSearchColumnMapper : public PrefixSearchColumnMapper
  323. {
  324. public:
  325. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  326. {
  327. return _fromXML(statement, idx, row, userVal, 0, false);
  328. }
  329. } lcSearchColumnMapper;
  330. static class IntColumnMapper : implements CassandraColumnMapper
  331. {
  332. public:
  333. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
  334. {
  335. if (name)
  336. row->addPropInt(name, getSignedResult(NULL, value));
  337. return row;
  338. }
  339. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  340. {
  341. if (row->hasProp(name))
  342. {
  343. if (statement)
  344. {
  345. int value = row->getPropInt(name);
  346. statement->bindInt32(idx, value);
  347. }
  348. return true;
  349. }
  350. else
  351. return false;
  352. }
  353. } intColumnMapper;
  354. static class DefaultedIntColumnMapper : public IntColumnMapper
  355. {
  356. public:
  357. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  358. {
  359. if (statement)
  360. {
  361. int value = row->getPropInt(name, atoi(userVal));
  362. statement->bindInt32(idx, value);
  363. }
  364. return true;
  365. }
  366. } defaultedIntColumnMapper;
  367. static class BigIntColumnMapper : implements CassandraColumnMapper
  368. {
  369. public:
  370. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
  371. {
  372. row->addPropInt64(name, getSignedResult(NULL, value));
  373. return row;
  374. }
  375. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  376. {
  377. if (row->hasProp(name))
  378. {
  379. if (statement)
  380. {
  381. __int64 value = row->getPropInt64(name);
  382. statement->bindInt64(idx, value);
  383. }
  384. return true;
  385. }
  386. else
  387. return false;
  388. }
  389. } bigintColumnMapper;
  390. static class SimpleMapColumnMapper : implements CassandraColumnMapper
  391. {
  392. public:
  393. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
  394. {
  395. Owned<IPTree> map = createPTree(name);
  396. CassandraIterator elems(cass_iterator_from_map(value));
  397. while (cass_iterator_next(elems))
  398. {
  399. rtlDataAttr str;
  400. unsigned chars;
  401. getStringResult(NULL, cass_iterator_get_map_key(elems), chars, str.refstr());
  402. StringAttr s(str.getstr(), chars);
  403. stringColumnMapper.toXML(map, s, cass_iterator_get_map_value(elems));
  404. }
  405. row->addPropTree(name, map.getClear());
  406. return row;
  407. }
  408. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  409. {
  410. Owned<IPTree> child = row->getPropTree(name);
  411. if (child)
  412. {
  413. unsigned numItems = child->numChildren();
  414. if (numItems)
  415. {
  416. if (statement)
  417. {
  418. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
  419. Owned<IPTreeIterator> items = child->getElements("*");
  420. ForEach(*items)
  421. {
  422. IPTree &item = items->query();
  423. const char *key = item.queryName();
  424. const char *value = item.queryProp(NULL);
  425. if (key && value)
  426. {
  427. check(cass_collection_append_string(collection, key));
  428. check(cass_collection_append_string(collection, value));
  429. }
  430. }
  431. statement->bindCollection(idx, collection);
  432. }
  433. return true;
  434. }
  435. }
  436. return false;
  437. }
  438. } simpleMapColumnMapper;
  439. static class AttributeMapColumnMapper : implements CassandraColumnMapper
  440. {
  441. public:
  442. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
  443. {
  444. CassandraIterator elems(cass_iterator_from_map(value));
  445. while (cass_iterator_next(elems))
  446. {
  447. rtlDataAttr str;
  448. unsigned chars;
  449. getStringResult(NULL, cass_iterator_get_map_key(elems), chars, str.refstr());
  450. StringBuffer s("@");
  451. s.append(chars, str.getstr());
  452. stringColumnMapper.toXML(row, s, cass_iterator_get_map_value(elems));
  453. }
  454. return row;
  455. }
  456. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  457. {
  458. // NOTE - name here provides a list of attributes that we should NOT be mapping
  459. Owned<IAttributeIterator> attrs = row->getAttributes();
  460. unsigned numItems = 0;
  461. ForEach(*attrs)
  462. {
  463. StringBuffer key(attrs->queryName());
  464. key.append('@');
  465. if (strstr(name, key) == NULL)
  466. numItems++;
  467. }
  468. if (numItems)
  469. {
  470. if (statement)
  471. {
  472. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
  473. ForEach(*attrs)
  474. {
  475. StringBuffer key(attrs->queryName());
  476. key.append('@');
  477. if (strstr(name, key) == NULL)
  478. {
  479. const char *value = attrs->queryValue();
  480. check(cass_collection_append_string(collection, attrs->queryName()+1)); // skip the @
  481. check(cass_collection_append_string(collection, value));
  482. }
  483. }
  484. statement->bindCollection(idx, collection);
  485. }
  486. return true;
  487. }
  488. else
  489. return false;
  490. }
  491. } attributeMapColumnMapper;
  492. static class ElementMapColumnMapper : implements CassandraColumnMapper
  493. {
  494. public:
  495. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
  496. {
  497. CassandraIterator elems(cass_iterator_from_map(value));
  498. while (cass_iterator_next(elems))
  499. {
  500. rtlDataAttr str;
  501. unsigned chars;
  502. getStringResult(NULL, cass_iterator_get_map_key(elems), chars, str.refstr());
  503. StringBuffer elemName(chars, str.getstr());
  504. stringColumnMapper.toXML(row, elemName, cass_iterator_get_map_value(elems));
  505. }
  506. return row;
  507. }
  508. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  509. {
  510. // NOTE - name here provides a list of elements that we should NOT be mapping
  511. Owned<IPTreeIterator> elems = row->getElements("*");
  512. unsigned numItems = 0;
  513. ForEach(*elems)
  514. {
  515. IPTree &item = elems->query();
  516. StringBuffer key('@');
  517. key.append(item.queryName());
  518. key.append('@');
  519. if (strstr(name, key) == NULL)
  520. {
  521. const char *value = item.queryProp(".");
  522. if (value)
  523. numItems++;
  524. }
  525. }
  526. if (numItems)
  527. {
  528. if (statement)
  529. {
  530. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
  531. ForEach(*elems)
  532. {
  533. IPTree &item = elems->query();
  534. StringBuffer key('@');
  535. key.append(item.queryName());
  536. key.append('@');
  537. if (strstr(name, key) == NULL)
  538. {
  539. const char *value = item.queryProp(".");
  540. if (value)
  541. {
  542. check(cass_collection_append_string(collection, item.queryName()));
  543. check(cass_collection_append_string(collection, value));
  544. }
  545. }
  546. }
  547. statement->bindCollection(idx, collection);
  548. }
  549. return true;
  550. }
  551. else
  552. return false;
  553. }
  554. } elementMapColumnMapper;
  555. static class SubtreeMapColumnMapper : implements CassandraColumnMapper
  556. {
  557. public:
  558. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
  559. {
  560. CassandraIterator elems(cass_iterator_from_map(value));
  561. while (cass_iterator_next(elems))
  562. {
  563. rtlDataAttr str;
  564. unsigned chars;
  565. getStringResult(NULL, cass_iterator_get_map_key(elems), chars, str.refstr());
  566. StringBuffer elemName(chars, str.getstr());
  567. const CassValue *value = cass_iterator_get_map_value(elems);
  568. StringBuffer valStr;
  569. getCassString(valStr, value);
  570. if (valStr.length() && valStr.charAt(0)== '<')
  571. {
  572. IPTree *sub = createPTreeFromXMLString(valStr);
  573. row->setPropTree(elemName, sub);
  574. }
  575. }
  576. return row;
  577. }
  578. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  579. {
  580. // NOTE - name here provides a list of elements that we SHOULD be mapping
  581. Owned<IPTreeIterator> elems = row->getElements("*");
  582. unsigned numItems = 0;
  583. ForEach(*elems)
  584. {
  585. IPTree &item = elems->query();
  586. StringBuffer key("@");
  587. key.append(item.queryName());
  588. key.append('@');
  589. if (strstr(name, key) != NULL)
  590. numItems++;
  591. }
  592. if (numItems)
  593. {
  594. if (statement)
  595. {
  596. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
  597. ForEach(*elems)
  598. {
  599. IPTree &item = elems->query();
  600. StringBuffer key("@");
  601. key.append(item.queryName());
  602. key.append('@');
  603. if (strstr(name, key) != NULL)
  604. {
  605. StringBuffer x;
  606. ::toXML(&item, x);
  607. check(cass_collection_append_string(collection, item.queryName()));
  608. check(cass_collection_append_string(collection, x));
  609. }
  610. }
  611. statement->bindCollection(idx, collection);
  612. }
  613. return true;
  614. }
  615. else
  616. return false;
  617. }
  618. } subTreeMapColumnMapper;
  619. /*
  620. static class QueryTextColumnMapper : public StringColumnMapper
  621. {
  622. public:
  623. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
  624. {
  625. // Name is "Query/Text ...
  626. IPTree *query = row->queryPropTree("Query");
  627. if (!query)
  628. {
  629. query = createPTree("Query");
  630. query = row->setPropTree("Query", query);
  631. row->setProp("Query/@fetchEntire", "1"); // Compatibility...
  632. }
  633. return StringColumnMapper::toXML(query, "Text", value);
  634. }
  635. } queryTextColumnMapper;
  636. */
  637. static class GraphMapColumnMapper : implements CassandraColumnMapper
  638. {
  639. public:
  640. GraphMapColumnMapper(const char *_elemName, const char *_nameAttr)
  641. : elemName(_elemName), nameAttr(_nameAttr)
  642. {
  643. }
  644. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
  645. {
  646. Owned<IPTree> map = createPTree(name);
  647. CassandraIterator elems(cass_iterator_from_map(value));
  648. while (cass_iterator_next(elems))
  649. {
  650. rtlDataAttr str;
  651. unsigned chars;
  652. getStringResult(NULL, cass_iterator_get_map_value(elems), chars, str.refstr());
  653. Owned<IPTree> child = createPTreeFromXMLString(chars, str.getstr());
  654. map->addPropTree(elemName, child.getClear());
  655. }
  656. row->addPropTree(name, map.getClear());
  657. return row;
  658. }
  659. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  660. {
  661. Owned<IPTree> child = row->getPropTree(name);
  662. if (child)
  663. {
  664. unsigned numItems = child->numChildren();
  665. if (numItems)
  666. {
  667. if (statement)
  668. {
  669. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
  670. Owned<IPTreeIterator> items = child->getElements("*");
  671. ForEach(*items)
  672. {
  673. IPTree &item = items->query();
  674. const char *key = item.queryProp(nameAttr);
  675. // MORE - may need to read, and probably should write, compressed. At least for graphs
  676. StringBuffer value;
  677. ::toXML(&item, value, 0, 0);
  678. if (key && value.length())
  679. {
  680. check(cass_collection_append_string(collection, key));
  681. check(cass_collection_append_string(collection, value));
  682. }
  683. }
  684. statement->bindCollection(idx, collection);
  685. }
  686. return true;
  687. }
  688. }
  689. return false;
  690. }
  691. private:
  692. const char *elemName;
  693. const char *nameAttr;
  694. } graphMapColumnMapper("Graph", "@name"), workflowMapColumnMapper("Item", "@wfid"), associationsMapColumnMapper("File", "@filename"), usedFieldsMapColumnMapper("field", "@name");
  695. static class WarningsMapColumnMapper : implements CassandraColumnMapper
  696. {
  697. public:
  698. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
  699. {
  700. CassandraIterator elems(cass_iterator_from_map(value));
  701. while (cass_iterator_next(elems))
  702. {
  703. unsigned code = getUnsignedResult(NULL, cass_iterator_get_map_key(elems));
  704. VStringBuffer xpath("OnWarnings/OnWarning[@code='%u']", code);
  705. IPropertyTree * mapping = row->queryPropTree(xpath);
  706. if (!mapping)
  707. {
  708. IPropertyTree * onWarnings = ensurePTree(row, "OnWarnings");
  709. mapping = onWarnings->addPropTree("OnWarning", createPTree());
  710. mapping->setPropInt("@code", code);
  711. }
  712. rtlDataAttr str;
  713. unsigned chars;
  714. getStringResult(NULL, cass_iterator_get_map_value(elems), chars, str.refstr());
  715. StringBuffer s(chars, str.getstr());
  716. mapping->setProp("@severity", s);
  717. }
  718. return row;
  719. }
  720. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  721. {
  722. if (!row->hasProp("OnWarnings/OnWarning"))
  723. return false;
  724. else
  725. {
  726. if (statement)
  727. {
  728. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, 5));
  729. Owned<IPTreeIterator> elems = row->getElements("OnWarnings/OnWarning");
  730. ForEach(*elems)
  731. {
  732. IPTree &item = elems->query();
  733. unsigned code = item.getPropInt("@code", 0);
  734. const char *value = item.queryProp("@severity");
  735. if (value)
  736. {
  737. check(cass_collection_append_int32(collection, code));
  738. check(cass_collection_append_string(collection, value));
  739. }
  740. }
  741. statement->bindCollection(idx, collection);
  742. }
  743. return true;
  744. }
  745. }
  746. } warningsMapColumnMapper;
  747. static class PluginListColumnMapper : implements CassandraColumnMapper
  748. {
  749. public:
  750. PluginListColumnMapper(const char *_elemName, const char *_nameAttr)
  751. : elemName(_elemName), nameAttr(_nameAttr)
  752. {
  753. }
  754. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
  755. {
  756. Owned<IPTree> map = name ? createPTree(name) : LINK(row);
  757. CassandraIterator elems(cass_iterator_from_collection(value));
  758. while (cass_iterator_next(elems))
  759. {
  760. Owned<IPTree> child = createPTree(elemName);
  761. stringColumnMapper.toXML(child, nameAttr, cass_iterator_get_value(elems));
  762. map->addPropTree(elemName, child.getClear());
  763. }
  764. if (name)
  765. row->addPropTree(name, map.getClear());
  766. return row;
  767. }
  768. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  769. {
  770. Owned<IPTree> child = row->getPropTree(name);
  771. if (child)
  772. {
  773. unsigned numItems = child->numChildren();
  774. if (numItems)
  775. {
  776. if (statement)
  777. {
  778. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_LIST, numItems));
  779. Owned<IPTreeIterator> items = child->getElements("*");
  780. ForEach(*items)
  781. {
  782. IPTree &item = items->query();
  783. const char *value = item.queryProp(nameAttr);
  784. if (value)
  785. check(cass_collection_append_string(collection, value));
  786. }
  787. statement->bindCollection(idx, collection);
  788. }
  789. return true;
  790. }
  791. }
  792. return false;
  793. }
  794. private:
  795. const char *elemName;
  796. const char *nameAttr;
  797. } pluginListColumnMapper("Plugin", "@dllname"), subfileListColumnMapper("Subfile", "@name");
// One row of a table description: ties a Cassandra column to a location in the workunit tree and to
// the mapper object that converts between the two.
// Each mapping array is terminated by an entry whose columnName is NULL; in the arrays visible in this
// file that terminator carries the table name in columnType and the primary key specification
// (optionally followed by '|' and a clustering clause) in xpath.
struct CassandraXmlMapping
{
    const char *columnName;         // Cassandra column name (NULL in the terminating entry)
    const char *columnType;         // CQL type of the column, e.g. "text" or "map<text, text>"
    const char *xpath;              // passed to the mapper as 'name' - meaning is mapper specific, may be NULL
    CassandraColumnMapper &mapper;  // converter used for this column
};
// Pairs a string with a mapping array describing one table.
// NOTE(review): nothing in this section of the file uses this struct, so the purpose of 'x' cannot be
// determined from here - confirm against its users.
struct CassandraTableInfo
{
    const char *x;
    const CassandraXmlMapping *mappings;    // column descriptions, terminated by an entry with a NULL columnName
};
// Version of the table layout described by the mapping arrays below, compared against the version recorded in the repository.
static const int majorVersion = 1; // If this does not match the value in the repository, you cannot proceed - a conversion tool is needed
static const int minorVersion = 2; // If this is less than the value in the repository, we should be fine (but there may be columns we don't know about and thus don't read - and will write as NULL in new rows)
// If this is greater than the value in the repository, we need to update the repository (using add column) and its version before proceeding
// Make sure to increment this if any column is ever added below
// Full description of the workunits table: one entry per column giving its CQL type, the xpath (or
// suppression/inclusion list) handed to the mapper, and the mapper itself.
// The final entry (NULL column name) supplies the table name and the primary key / clustering clause.
static const CassandraXmlMapping workunitsMappings [] =
{
    {"partition", "int", NULL, hashRootNameColumnMapper},
    {"wuid", "text", NULL, rootNameColumnMapper},
    {"clustername", "text", "@clusterName", stringColumnMapper},
    {"jobname", "text", "@jobName", stringColumnMapper},
    {"priorityclass", "text", "@priorityClass", stringColumnMapper},
    {"prioritylevel", "int", "@priorityLevel", intColumnMapper},
    {"wuScope", "text", "@scope", stringColumnMapper},
    {"submitID", "text", "@submitID", stringColumnMapper},
    {"state", "text", "@state", stringColumnMapper},
    {"action", "text", "Action", stringColumnMapper},
    {"protected", "boolean", "@protected", boolColumnMapper},
    {"scheduled", "text", "@timeScheduled", stringColumnMapper}, // Should store as a date?
    {"totalThorTime", "text", "@totalThorTime", stringColumnMapper}, // We store in the wu ptree as a collatable string (with leading spaces to force to one partition)
    {"appvalues", "map<text, text>", "@Application@", subTreeMapColumnMapper},
    {"agentSession", "bigint", "@agentSession", bigintColumnMapper},
    {"debug", "map<text, text>", "Debug", simpleMapColumnMapper},
    {"attributes", "map<text, text>", "@agentSession@wuid@clusterName@jobName@priorityClass@priorityLevel@protected@scope@submitID@state@timeScheduled@totalThorTime@", attributeMapColumnMapper}, // name is the suppression list, note trailing @
    {"plugins", "list<text>", "Plugins", pluginListColumnMapper},
    {"workflow", "map<text, text>", "Workflow", workflowMapColumnMapper},
    {"onWarnings", "map<int, text>", "OnWarnings/OnWarning", warningsMapColumnMapper},
    // These are catchalls for anything not processed above or in a child table
    {"elements", "map<text, text>", "@Action@Application@Debug@Exceptions@Files@FilesRead@Graphs@Results@Statistics@Plugins@Query@State@Variables@Temporaries@Workflow@", elementMapColumnMapper}, // name is the suppression list, note trailing @
    {"subtrees", "map<text, text>", "@Parameters@Process@Tracing@", subTreeMapColumnMapper}, // name is the INCLUSION list, note trailing @
    { NULL, "workunits", "((partition), wuid)|CLUSTERING ORDER BY (wuid DESC)", stringColumnMapper}
};
  841. static const CassandraXmlMapping workunitInfoMappings [] = // A cut down version of the workunit mappings - used when querying with no key
  842. {
  843. {"partition", "int", NULL, hashRootNameColumnMapper},
  844. {"wuid", "text", NULL, rootNameColumnMapper},
  845. {"clustername", "text", "@clusterName", stringColumnMapper},
  846. {"jobname", "text", "@jobName", stringColumnMapper},
  847. {"priorityclass", "text", "@priorityClass", stringColumnMapper},
  848. {"prioritylevel", "int", "@priorityLevel", intColumnMapper},
  849. {"wuScope", "text", "@scope", stringColumnMapper},
  850. {"submitID", "text", "@submitID", stringColumnMapper},
  851. {"state", "text", "@state", stringColumnMapper},
  852. {"action", "text", "Action", stringColumnMapper},
  853. {"protected", "boolean", "@protected", boolColumnMapper},
  854. {"scheduled", "text", "@timeScheduled", stringColumnMapper}, // Should store as a date?
  855. {"totalThorTime", "text", "@totalThorTime", stringColumnMapper}, // We store in the wu ptree as a collatable string. Need to force to one partition too
  856. {"appvalues", "map<text, text>", "@Application@", subTreeMapColumnMapper},
  857. { NULL, "workunits", "((partition), wuid)|CLUSTERING ORDER BY (wuid DESC)", stringColumnMapper}
  858. };
  859. // The following describes the search table - this contains copies of the basic wu information but keyed by different fields
  860. static const CassandraXmlMapping searchMappings [] =
  861. {
  862. {"xpath", "text", NULL, suppliedStringColumnMapper},
  863. {"fieldPrefix", "text", NULL, prefixSearchColumnMapper},
  864. {"fieldValue", "text", NULL, searchColumnMapper},
  865. {"wuid", "text", NULL, rootNameColumnMapper},
  866. {"clustername", "text", "@clusterName", stringColumnMapper},
  867. {"jobname", "text", "@jobName", stringColumnMapper},
  868. {"priorityclass", "text", "@priorityClass", stringColumnMapper},
  869. {"prioritylevel", "int", "@priorityLevel", intColumnMapper},
  870. {"scope", "text", "@scope", stringColumnMapper},
  871. {"submitID", "text", "@submitID", stringColumnMapper},
  872. {"state", "text", "@state", stringColumnMapper},
  873. {"action", "text", "Action", stringColumnMapper},
  874. {"protected", "boolean", "@protected", boolColumnMapper},
  875. {"scheduled", "text", "@timeScheduled", stringColumnMapper}, // Should store as a date?
  876. {"totalThorTime", "text", "@totalThorTime", stringColumnMapper}, // We store in the wu ptree as a collatable string. Need to force to one partition too
  877. {"appvalues", "map<text, text>", "@Application@", subTreeMapColumnMapper},
  878. { NULL, "workunitsSearch", "((xpath, fieldPrefix), fieldValue, wuid)|CLUSTERING ORDER BY (fieldValue ASC, wuid DESC)", stringColumnMapper}
  879. };
  880. // The fields we can search by. These are a subset of the fields in the basic workunit info that is returned from a search. A row is created in the search table for each of these, for each workunit.
  881. const char * searchPaths[] = { "@submitID", "@clusterName", "@jobName", "@priorityClass", "@protected", "@scope", "@state", "@totalThorTime", NULL};
  882. static const CassandraXmlMapping uniqueSearchMappings [] =
  883. {
  884. {"xpath", "text", NULL, suppliedStringColumnMapper},
  885. {"fieldPrefix", "text", NULL, prefixSearchColumnMapper}, // Leading N chars, upper-cased
  886. {"fieldValue", "text", NULL, searchColumnMapper}, // upper-cased
  887. {"origFieldValue", "text", NULL, lcSearchColumnMapper}, // original case
  888. { NULL, "uniqueSearchValues", "((xpath, fieldPrefix), fieldValue, origFieldValue)|CLUSTERING ORDER BY (fieldValue ASC)", stringColumnMapper}
  889. };
  890. // The fields we can wild search by. We store these in the uniqueSearchMappings table so we can translate wildcards into sets
  891. // We also add application name/key combinations, but we have to special-case that
  892. const char * wildSearchPaths[] = { "@submitID", "@clusterName", "@jobName", NULL};
  893. static const CassandraXmlMapping filesSearchMappings [] =
  894. {
  895. {"name", "text", "@name", stringColumnMapper},
  896. {"read", "boolean", "@read", boolColumnMapper},
  897. {"wuid", "text", NULL, suppliedStringColumnMapper},
  898. { NULL, "filesSearchValues", "((name, read), wuid)|CLUSTERING ORDER BY (wuid DESC)", stringColumnMapper}
  899. };
  900. // The version table is keyed by a partition value because (a) you need to key by something and (b) we can use it to spread the load of
  901. // version lookups (pick a partition at random).
  902. // Note that this table must have the same minimum layout on all versions.
  903. static const CassandraXmlMapping versionMappings [] =
  904. {
  905. {"partition", "int", "@partition", intColumnMapper},
  906. {"major", "int", "@major", intColumnMapper},
  907. {"minor", "int", "@minor", intColumnMapper},
  908. {"attributes", "map<text, text>", "@major@minor@partition@", attributeMapColumnMapper}, // name is the suppression list, note trailing @
  909. { NULL, "version", "((partition))", stringColumnMapper}
  910. };
  911. /*
  912. * Some thoughts on the secondary tables:
  913. * 1. To support (trailing) wildcards we will need to split the key into two - the leading N chars and the rest. Exactly what N is will depend on the installation size.
  914. * Too large and users will complain, but too small would hinder partitioning of the values across Cassandra nodes. 1 or 2 may be enough.
  915. * 2. I could combine all the secondary tables into 1 with a field indicating the type of the key. The key field would be repeated though... Would it help?
  916. * I'm not sure it really changes a lot - adds a bit of noise into the partitioner...
  917. * Actually, it does mean that the updates and deletes can all be done with a single Cassandra query, though whether that has any advantages over multiple in a batch I don't know
  918. * It MAY well make it easier to make sure that searches are case-insensitive, since we'll generally need to separate out the search field from the display field to achieve that
  * 3. Sort orders are tricky - I can use the secondary table to deliver sorted by one field as long as it is the one I am filtering by (but if it is I probably don't need it sorted!)
  920. *
  921. */
  922. // The following describe child tables - all keyed by wuid
  923. enum ChildTablesEnum { WuQueryChild, WuExceptionsChild, WuStatisticsChild, WuGraphsChild, WuResultsChild, WuVariablesChild, WuTemporariesChild, WuFilesReadChild, WuFilesWrittenChild, WuFieldUsage, ChildTablesSize };
  924. struct ChildTableInfo
  925. {
  926. const char *parentElement;
  927. const char *childElement;
  928. ChildTablesEnum index;
  929. const CassandraXmlMapping *mappings;
  930. };
  931. // wuQueries table is slightly unusual among the child tables as is is 1:1 - it is split out for lazy load purposes.
  932. static const CassandraXmlMapping wuQueryMappings [] =
  933. {
  934. {"partition", "int", NULL, hashRootNameColumnMapper},
  935. {"wuid", "text", NULL, rootNameColumnMapper},
  936. {"associations", "map<text, text>", "Associated", associationsMapColumnMapper},
  937. {"attributes", "map<text, text>", "", attributeMapColumnMapper},
  938. {"query", "text", "Text", stringColumnMapper}, // May want to make this even lazier...
  939. {"shortQuery", "text", "ShortText", stringColumnMapper},
  940. { NULL, "wuQueries", "((partition), wuid)", stringColumnMapper}
  941. };
  942. static const ChildTableInfo wuQueriesTable =
  943. {
  944. "Query", NULL,
  945. WuQueryChild,
  946. wuQueryMappings
  947. };
  948. // wuExceptions table holds the exceptions associated with a wuid
  949. static const CassandraXmlMapping wuExceptionsMappings [] =
  950. {
  951. {"partition", "int", NULL, hashRootNameColumnMapper},
  952. {"wuid", "text", NULL, rootNameColumnMapper},
  953. {"sequence", "int", "@sequence", intColumnMapper},
  954. {"attributes", "map<text, text>", "", attributeMapColumnMapper},
  955. {"value", "text", ".", stringColumnMapper},
  956. { NULL, "wuExceptions", "((partition), wuid, sequence)", stringColumnMapper}
  957. };
  958. static const ChildTableInfo wuExceptionsTable =
  959. {
  960. "Exceptions", "Exception",
  961. WuExceptionsChild,
  962. wuExceptionsMappings
  963. };
  964. static const CassandraXmlMapping wuStatisticsMappings [] =
  965. {
  966. {"partition", "int", NULL, hashRootNameColumnMapper},
  967. {"wuid", "text", NULL, rootNameColumnMapper},
  968. {"ts", "bigint", "@ts", bigintColumnMapper}, // MORE - should change this to a timeuuid ?
  969. {"kind", "text", "@kind", stringColumnMapper},
  970. {"creator", "text", "@creator", stringColumnMapper},
  971. {"scope", "text", "@scope", stringColumnMapper},
  972. {"attributes", "map<text, text>", "@ts@kind@creator@scope@", attributeMapColumnMapper},
  973. { NULL, "wuStatistics", "((partition), wuid, ts, kind, creator, scope)", stringColumnMapper}
  974. };
  975. static const ChildTableInfo wuStatisticsTable =
  976. {
  977. "Statistics", "Statistic",
  978. WuStatisticsChild,
  979. wuStatisticsMappings
  980. };
  981. static const CassandraXmlMapping wuGraphsMappings [] =
  982. {
  983. {"partition", "int", NULL, hashRootNameColumnMapper},
  984. {"wuid", "text", NULL, rootNameColumnMapper},
  985. {"name", "text", "@name", stringColumnMapper},
  986. {"attributes", "map<text, text>", "@name@", attributeMapColumnMapper},
  987. {"xgmml", "blob", "xgmml", compressTreeColumnMapper},
  988. { NULL, "wuGraphs", "((partition), wuid, name)", stringColumnMapper} // Note - we do occasionally search by type - but that is done in a postfilter having preloaded/cached all
  989. };
  990. static const ChildTableInfo wuGraphsTable =
  991. {
  992. "Graphs", "Graph",
  993. WuGraphsChild,
  994. wuGraphsMappings
  995. };
  996. // A cut down version of the above - note this does not represent a different table!
  997. static const CassandraXmlMapping wuGraphMetasMappings [] =
  998. {
  999. {"partition", "int", NULL, hashRootNameColumnMapper},
  1000. {"wuid", "text", NULL, rootNameColumnMapper},
  1001. {"name", "text", "@name", stringColumnMapper},
  1002. {"attributes", "map<text, text>", "@name@", attributeMapColumnMapper},
  1003. { NULL, "wuGraphs", "((partition), wuid, name)", stringColumnMapper}
  1004. };
  1005. static const ChildTableInfo wuGraphMetasTable =
  1006. {
  1007. "Graphs", "Graph",
  1008. WuGraphsChild,
  1009. wuGraphMetasMappings
  1010. };
  /* Columns shared by the results, variables and temporaries tables.
   * Expands to a comma-separated run of CassandraXmlMapping rows with no trailing comma, so the user
   * must follow it with a comma before adding further rows or the terminator row. */
  #define resultTableFields \
      {"partition",     "int",             NULL,              hashRootNameColumnMapper}, \
      {"wuid",          "text",            NULL,              rootNameColumnMapper}, \
      {"sequence",      "int",             "@sequence",       defaultedIntColumnMapper}, \
      {"name",          "text",            "@name",           stringColumnMapper}, \
      {"attributes",    "map<text, text>", "@sequence@name@", attributeMapColumnMapper},  /* name is the suppression list */ \
      {"rowcount",      "int",             "rowCount",        intColumnMapper},           /* This is the number of rows in result (which may be stored in a file rather than in value) */ \
      {"totalrowcount", "bigint",          "totalRowCount",   bigintColumnMapper},        /* This is the number of rows in value */ \
      {"schemaRaw",     "blob",            "SchemaRaw",       blobColumnMapper}, \
      {"logicalName",   "text",            "logicalName",     stringColumnMapper},        /* either this or value will be present once result status is "calculated" */ \
      {"value",         "blob",            "Value",           blobColumnMapper}, \
      {"graph",         "text",            "@graph",          stringColumnMapper}, \
      {"activity",      "int",             "@activity",       intColumnMapper}
  1024. static const CassandraXmlMapping wuResultsMappings [] =
  1025. {
  1026. resultTableFields,
  1027. { NULL, "wuResults", "((partition), wuid, sequence)", stringColumnMapper}
  1028. };
  1029. static const ChildTableInfo wuResultsTable =
  1030. {
  1031. "Results", "Result",
  1032. WuResultsChild,
  1033. wuResultsMappings
  1034. };
  1035. // This looks very similar to the above, but the key is different...
  1036. static const CassandraXmlMapping wuVariablesMappings [] =
  1037. {
  1038. resultTableFields,
  1039. {"xmlValue", "text", "xmlValue", stringColumnMapper},
  1040. { NULL, "wuVariables", "((partition), wuid, sequence, name)", stringColumnMapper}
  1041. };
  1042. static const ChildTableInfo wuVariablesTable =
  1043. {
  1044. "Variables", "Variable",
  1045. WuVariablesChild,
  1046. wuVariablesMappings
  1047. };
  1048. // Again, very similar, but mapped to a different area of the XML
  1049. static const CassandraXmlMapping wuTemporariesMappings [] =
  1050. {
  1051. resultTableFields,
  1052. { NULL, "wuTemporaries", "((partition), wuid, sequence, name)", stringColumnMapper}
  1053. };
  1054. static const ChildTableInfo wuTemporariesTable =
  1055. {
  1056. "Temporaries", "Variable",
  1057. WuTemporariesChild,
  1058. wuTemporariesMappings
  1059. };
  1060. static const CassandraXmlMapping wuFilesReadMappings [] =
  1061. {
  1062. {"partition", "int", NULL, hashRootNameColumnMapper},
  1063. {"wuid", "text", NULL, rootNameColumnMapper},
  1064. {"name", "text", "@name", stringColumnMapper},
  1065. {"attributes", "map<text, text>", "@name@", attributeMapColumnMapper}, /* name is the suppression list */
  1066. {"subfiles", "list<text>", NULL, subfileListColumnMapper},
  1067. { NULL, "wuFilesRead", "((partition), wuid, name)", stringColumnMapper}
  1068. };
  1069. static const ChildTableInfo wuFilesReadTable =
  1070. {
  1071. "FilesRead", "File",
  1072. WuFilesReadChild,
  1073. wuFilesReadMappings
  1074. };
  1075. static const CassandraXmlMapping wuFilesWrittenMappings [] =
  1076. {
  1077. {"partition", "int", NULL, hashRootNameColumnMapper},
  1078. {"wuid", "text", NULL, rootNameColumnMapper},
  1079. {"name", "text", "@name", stringColumnMapper},
  1080. {"attributes", "map<text, text>", "@name@", attributeMapColumnMapper}, /* name is the suppression list */
  1081. { NULL, "wuFilesWritten", "((partition), wuid, name)", stringColumnMapper}
  1082. };
  1083. static const ChildTableInfo wuFilesWrittenTable =
  1084. {
  1085. "Files", "File",
  1086. WuFilesWrittenChild,
  1087. wuFilesWrittenMappings
  1088. };
  1089. static const CassandraXmlMapping wuFieldUsageMappings [] =
  1090. {
  1091. {"partition", "int", NULL, hashRootNameColumnMapper},
  1092. {"wuid", "text", NULL, rootNameColumnMapper},
  1093. {"name", "text", "@name", stringColumnMapper},
  1094. {"type", "text", "@type", stringColumnMapper},
  1095. {"numFields", "int", "@numFields", intColumnMapper},
  1096. {"numFieldsUsed", "int", "@numFieldsUsed", intColumnMapper},
  1097. {"fields", "map<text, text>", "fields", usedFieldsMapColumnMapper},
  1098. { NULL, "wuFieldUsage", "((partition), wuid, name)", stringColumnMapper}
  1099. };
  1100. static const ChildTableInfo wuFieldUsageTable =
  1101. {
  1102. "usedsources", "datasource",
  1103. WuFieldUsage,
  1104. wuFieldUsageMappings
  1105. };
  1106. // Order should match the enum above
  1107. static const ChildTableInfo * const childTables [] = { &wuQueriesTable, &wuExceptionsTable, &wuStatisticsTable, &wuGraphsTable, &wuResultsTable, &wuVariablesTable, &wuTemporariesTable, &wuFilesReadTable, &wuFilesWrittenTable, &wuFieldUsageTable, NULL };
  1108. // Graph progress tables are read directly, XML mappers not used
  1109. static const CassandraXmlMapping wuGraphProgressMappings [] =
  1110. {
  1111. {"partition", "int", NULL, hashRootNameColumnMapper},
  1112. {"wuid", "text", NULL, rootNameColumnMapper},
  1113. {"graphID", "text", NULL, stringColumnMapper},
  1114. {"subgraphID", "bigint", NULL, bigintColumnMapper},
  1115. {"creator", "text", NULL, stringColumnMapper},
  1116. {"progress", "blob", NULL, blobColumnMapper},
  1117. { NULL, "wuGraphProgress", "((partition), wuid, graphID, subgraphID, creator)", stringColumnMapper}
  1118. };
  1119. static const CassandraXmlMapping wuGraphStateMappings [] =
  1120. {
  1121. {"partition", "int", NULL, hashRootNameColumnMapper},
  1122. {"wuid", "text", NULL, rootNameColumnMapper},
  1123. {"graphID", "text", NULL, stringColumnMapper},
  1124. {"subgraphID", "bigint", NULL, bigintColumnMapper},
  1125. {"state", "int", NULL, intColumnMapper},
  1126. { NULL, "wuGraphState", "((partition), wuid, graphID, subgraphID)", stringColumnMapper}
  1127. };
  1128. static const CassandraXmlMapping wuGraphRunningMappings [] =
  1129. {
  1130. {"partition", "int", NULL, hashRootNameColumnMapper},
  1131. {"wuid", "text", NULL, rootNameColumnMapper},
  1132. {"graphID", "text", NULL, stringColumnMapper},
  1133. {"subgraphID", "bigint", NULL, bigintColumnMapper},
  1134. { NULL, "wuGraphRunning", "((partition), wuid)", stringColumnMapper}
  1135. };
  1136. void getBoundFieldNames(const ICassandraSession *session, const CassandraXmlMapping *mappings, StringBuffer &names, StringBuffer &bindings, IPTree *inXML, const char *userVal, StringBuffer &tableName)
  1137. {
  1138. while (mappings->columnName)
  1139. {
  1140. if (!inXML || mappings->mapper.fromXML(session, NULL, 0, inXML, mappings->xpath, userVal))
  1141. {
  1142. names.appendf(",%s", mappings->columnName);
  1143. if (strcmp(mappings->columnType, "timeuuid")==0)
  1144. bindings.appendf(",now()");
  1145. else
  1146. bindings.appendf(",?");
  1147. }
  1148. mappings++;
  1149. }
  1150. tableName.append(mappings->columnType);
  1151. }
  1152. void getFieldNames(const CassandraXmlMapping *mappings, StringBuffer &names, StringBuffer &tableName)
  1153. {
  1154. while (mappings->columnName)
  1155. {
  1156. names.appendf(",%s", mappings->columnName);
  1157. mappings++;
  1158. }
  1159. tableName.append(mappings->columnType);
  1160. }
  1161. const char *queryTableName(const CassandraXmlMapping *mappings)
  1162. {
  1163. while (mappings->columnName)
  1164. mappings++;
  1165. return mappings->columnType;
  1166. }
  1167. StringBuffer & describeTable(const CassandraXmlMapping *mappings, StringBuffer &out)
  1168. {
  1169. StringBuffer fields;
  1170. while (mappings->columnName)
  1171. {
  1172. fields.appendf("%s %s,", mappings->columnName, mappings->columnType);
  1173. mappings++;
  1174. }
  1175. StringArray options;
  1176. options.appendList(mappings->xpath, "|");
  1177. assertex(options.length()); // Primary key at least should be present!
  1178. out.appendf("CREATE TABLE IF NOT EXISTS %s (%s PRIMARY KEY %s)", mappings->columnType, fields.str(), options.item(0));
  1179. unsigned idx = 1;
  1180. while (options.isItem(idx))
  1181. {
  1182. if (idx==1)
  1183. out.append(" WITH ");
  1184. else
  1185. out.append(", ");
  1186. out.append(options.item(idx));
  1187. idx++;
  1188. }
  1189. out.append(';');
  1190. return out;
  1191. }
  1192. const CassResult *executeQuery(CassSession *session, CassStatement *statement)
  1193. {
  1194. CassandraFuture future(cass_session_execute(session, statement));
  1195. future.wait("executeQuery");
  1196. return cass_future_get_result(future);
  1197. }
  1198. void deleteSecondaryByKey(const char * xpath, const char *key, const char *wuid, const ICassandraSession *sessionCache, CIArrayOf<CassandraStatement> &batch)
  1199. {
  1200. if (key)
  1201. {
  1202. StringBuffer ucKey(key);
  1203. ucKey.toUpperCase();
  1204. StringBuffer names;
  1205. StringBuffer tableName;
  1206. getFieldNames(searchMappings, names, tableName);
  1207. VStringBuffer deleteQuery("DELETE from %s where xpath=? and fieldPrefix=? and fieldValue=? and wuid=?;", tableName.str());
  1208. CassandraStatement &update = *new CassandraStatement(sessionCache->prepareStatement(deleteQuery));
  1209. update.bindString(0, xpath);
  1210. update.bindString_n(1, ucKey, sessionCache->queryPrefixSize());
  1211. update.bindString(2, ucKey);
  1212. update.bindString(3, wuid);
  1213. batch.append(update);
  1214. }
  1215. }
  1216. void executeSimpleCommand(CassSession *session, const char *command)
  1217. {
  1218. CassandraStatement statement(cass_statement_new(command, 0));
  1219. CassandraFuture future(cass_session_execute(session, statement));
  1220. future.wait("execute");
  1221. }
  1222. void ensureTable(CassSession *session, const CassandraXmlMapping *mappings)
  1223. {
  1224. StringBuffer schema;
  1225. executeSimpleCommand(session, describeTable(mappings, schema));
  1226. }
  1227. extern void simpleXMLtoCassandra(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, IPTree *inXML, const char *userVal = NULL)
  1228. {
  1229. StringBuffer names;
  1230. StringBuffer bindings;
  1231. StringBuffer tableName;
  1232. getBoundFieldNames(session, mappings, names, bindings, inXML, userVal, tableName);
  1233. VStringBuffer insertQuery("INSERT into %s (%s) values (%s);", tableName.str(), names.str()+1, bindings.str()+1);
  1234. CassandraStatement update(session->prepareStatement(insertQuery));
  1235. unsigned bindidx = 0;
  1236. while (mappings->columnName)
  1237. {
  1238. if (mappings->mapper.fromXML(session, &update, bindidx, inXML, mappings->xpath, userVal))
  1239. bindidx++;
  1240. mappings++;
  1241. }
  1242. check(cass_batch_add_statement(batch, update));
  1243. }
  1244. extern void simpleXMLtoCassandra(const ICassandraSession *session, CIArrayOf<CassandraStatement> &batch, const CassandraXmlMapping *mappings, IPTree *inXML, const char *userVal = NULL)
  1245. {
  1246. StringBuffer names;
  1247. StringBuffer bindings;
  1248. StringBuffer tableName;
  1249. getBoundFieldNames(session, mappings, names, bindings, inXML, userVal, tableName);
  1250. VStringBuffer insertQuery("INSERT into %s (%s) values (%s);", tableName.str(), names.str()+1, bindings.str()+1);
  1251. CassandraStatement &update = *new CassandraStatement(session->prepareStatement(insertQuery));
  1252. unsigned bindidx = 0;
  1253. while (mappings->columnName)
  1254. {
  1255. if (mappings->mapper.fromXML(session, &update, bindidx, inXML, mappings->xpath, userVal))
  1256. bindidx++;
  1257. mappings++;
  1258. }
  1259. batch.append(update);
  1260. }
  1261. extern void deleteFileSearch(const ICassandraSession *session, CIArrayOf<CassandraStatement> &batch, const char *name, bool read, const char *wuid)
  1262. {
  1263. StringBuffer names;
  1264. StringBuffer tableName;
  1265. getFieldNames(filesSearchMappings, names, tableName);
  1266. VStringBuffer deleteQuery("DELETE from %s where name=? and read=? and wuid=?", tableName.str());
  1267. CassandraStatement &update = *new CassandraStatement(session->prepareStatement(deleteQuery));
  1268. update.bindString(0, name);
  1269. update.bindBool(1, read ? cass_true : cass_false);
  1270. update.bindString(2, wuid);
  1271. batch.append(update);
  1272. }
  1273. extern void addFileSearch(const ICassandraSession *session, CIArrayOf<CassandraStatement> &batch, const char *name, bool read, const char *wuid)
  1274. {
  1275. StringBuffer bindings;
  1276. StringBuffer names;
  1277. StringBuffer tableName;
  1278. getBoundFieldNames(session, filesSearchMappings, names, bindings, NULL, NULL, tableName);
  1279. VStringBuffer insertQuery("INSERT INTO %s (%s) values (%s)", tableName.str(), names.str()+1, bindings.str()+1);
  1280. CassandraStatement &update = *new CassandraStatement(session->prepareStatement(insertQuery));
  1281. update.bindString(0, name);
  1282. update.bindBool(1, read ? cass_true : cass_false);
  1283. update.bindString(2, wuid);
  1284. batch.append(update);
  1285. }
  1286. extern void addUniqueValue(const ICassandraSession *session, CIArrayOf<CassandraStatement> &batch, const char *xpath, const char *value)
  1287. {
  1288. StringBuffer bindings;
  1289. StringBuffer names;
  1290. StringBuffer tableName;
  1291. getBoundFieldNames(session, uniqueSearchMappings, names, bindings, NULL, NULL, tableName);
  1292. VStringBuffer insertQuery("INSERT into %s (%s) values (%s);", tableName.str(), names.str()+1, bindings.str()+1);
  1293. CassandraStatement &update = *new CassandraStatement(session->prepareStatement(insertQuery));
  1294. update.bindString(0, xpath);
  1295. StringBuffer ucValue(value);
  1296. ucValue.toUpperCase();
  1297. update.bindString_n(1, ucValue, session->queryPrefixSize());
  1298. update.bindString(2, ucValue);
  1299. update.bindString(3, value);
  1300. batch.append(update);
  1301. }
  1302. extern void childXMLRowtoCassandra(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, const char *wuid, IPTree &row, const char *userVal)
  1303. {
  1304. StringBuffer bindings;
  1305. StringBuffer names;
  1306. StringBuffer tableName;
  1307. // Note that we bind all fields, even where there is no value in the XML
  1308. // This ensures that values are correctly deleted where necessary - it also has
  1309. // the fortuitous benefit of reducing the number of variants of the query that we need to prepare and cache.
  1310. getBoundFieldNames(session, mappings, names, bindings, NULL, userVal, tableName);
  1311. VStringBuffer insertQuery("INSERT into %s (%s) values (%s);", tableName.str(), names.str()+1, bindings.str()+1);
  1312. CassandraStatement update(session->prepareStatement(insertQuery));
  1313. update.bindInt32(0, rtlHash32VStr(wuid, 0) % session->queryPartitions());
  1314. update.bindString(1, wuid);
  1315. unsigned colidx = 2; // We already bound wuid and partition
  1316. while (mappings[colidx].columnName)
  1317. {
  1318. if (!mappings[colidx].mapper.fromXML(session, &update, colidx, &row, mappings[colidx].xpath, userVal))
  1319. update.bindNull(colidx);
  1320. colidx++;
  1321. }
  1322. check(cass_batch_add_statement(batch, update));
  1323. }
  1324. extern unsigned childCount(const ICassandraSession *session, const CassandraXmlMapping *mappings, const char *wuid)
  1325. {
  1326. VStringBuffer countQuery("SELECT count(*) FROM %s WHERE partition=? AND wuid=?;", queryTableName(mappings));
  1327. CassandraStatement count(session->prepareStatement(countQuery));
  1328. count.bindInt32(0, rtlHash32VStr(wuid, 0) % session->queryPartitions());
  1329. count.bindString(1, wuid);
  1330. CassandraFuture future(cass_session_execute(session->querySession(), count));
  1331. future.wait("select count(*)");
  1332. CassandraResult result(cass_future_get_result(future));
  1333. return getUnsignedResult(NULL, getSingleResult(result));
  1334. }
  1335. extern void childXMLtoCassandra(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, const char *wuid, IPTreeIterator *elements, const char *userVal)
  1336. {
  1337. if (elements->first())
  1338. {
  1339. do
  1340. {
  1341. childXMLRowtoCassandra(session, batch, mappings, wuid, elements->query(), userVal);
  1342. }
  1343. while (elements->next());
  1344. }
  1345. }
  1346. extern void childXMLtoCassandra(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, IPTree *inXML, const char *xpath, const char *defaultValue)
  1347. {
  1348. Owned<IPTreeIterator> elements = inXML->getElements(xpath);
  1349. childXMLtoCassandra(session, batch, mappings, inXML->queryName(), elements, defaultValue);
  1350. }
  1351. static IPTree *rowToPTree(const char *xpath, const char *key, const CassandraXmlMapping *mappings, const CassRow *row)
  1352. {
  1353. CassandraIterator cols(cass_iterator_from_row(row));
  1354. Owned<IPTree> xml = createPTree("row"); // May be overwritten below if wuid field is processed
  1355. if (xpath && *xpath && key && *key)
  1356. xml->setProp(xpath, key);
  1357. while (cass_iterator_next(cols))
  1358. {
  1359. assertex(mappings->columnName);
  1360. const CassValue *value = cass_iterator_get_column(cols);
  1361. if (value && !cass_value_is_null(value))
  1362. mappings->mapper.toXML(xml, mappings->xpath, value);
  1363. mappings++;
  1364. }
  1365. return xml.getClear();
  1366. }
  1367. /*
  1368. * PostFilter represents a filter to be applied to a ConstWorkUnitInfo tree representation prior to returning it from an iterator
  1369. */
  1370. interface IPostFilter : public IInterface
  1371. {
  1372. virtual bool matches(IPTree &p) const = 0;
  1373. virtual const char *queryValue() const = 0;
  1374. virtual const char *queryXPath() const = 0;
  1375. virtual WUSortField queryField() const = 0;
  1376. };
  1377. class PostFilter : public CInterfaceOf<IPostFilter>
  1378. {
  1379. public:
  1380. PostFilter(WUSortField _field, const char *_value, bool _wild)
  1381. : field(_field), xpath(queryFilterXPath(_field)), wild(_wild)
  1382. {
  1383. setValue(_value);
  1384. }
  1385. virtual bool matches(IPTree &p) const
  1386. {
  1387. const char *val = p.queryProp(xpath);
  1388. if (val)
  1389. return wild ? WildMatch(val, pattern) : strieq(val, pattern);
  1390. else
  1391. return false;
  1392. }
  1393. virtual const char *queryValue() const
  1394. {
  1395. return value.str();
  1396. }
  1397. void setValue(const char *_value)
  1398. {
  1399. if (wild)
  1400. {
  1401. VStringBuffer filter("*%s*", _value);
  1402. pattern.set(filter);
  1403. }
  1404. else
  1405. pattern.set(_value);
  1406. value.set(_value);
  1407. }
  1408. virtual const char *queryXPath() const
  1409. {
  1410. return xpath;
  1411. }
  1412. virtual WUSortField queryField() const
  1413. {
  1414. return field;
  1415. }
  1416. protected:
  1417. const char *xpath;
  1418. StringAttr pattern;
  1419. StringAttr value;
  1420. WUSortField field;
  1421. bool wild;
  1422. };
  1423. class MultiValuePostFilter : public PostFilter
  1424. {
  1425. public:
  1426. MultiValuePostFilter(WUSortField _field, const char *_value)
  1427. : PostFilter(_field, _value, false)
  1428. {
  1429. setValue(_value);
  1430. }
  1431. virtual bool matches(IPTree &p) const
  1432. {
  1433. const char *val = p.queryProp(xpath);
  1434. if (val)
  1435. {
  1436. ForEachItemIn(idx, values)
  1437. {
  1438. if (strieq(val, values.item(idx)))
  1439. return true;
  1440. }
  1441. }
  1442. return false;
  1443. }
  1444. void setValue(const char *_value)
  1445. {
  1446. values.appendList(_value, "|");
  1447. }
  1448. private:
  1449. StringArray values;
  1450. };
  1451. class AppValuePostFilter : public CInterfaceOf<IPostFilter>
  1452. {
  1453. public:
  1454. AppValuePostFilter(const char *_name, const char *_value, bool _wild) : wild(_wild)
  1455. {
  1456. xpath.appendf("Application/%s", _name);
  1457. setValue(_value);
  1458. }
  1459. virtual bool matches(IPTree &p) const
  1460. {
  1461. const char *val = p.queryProp(xpath);
  1462. if (val)
  1463. return wild ? WildMatch(val, pattern) : strieq(val, pattern);
  1464. else
  1465. return false;
  1466. }
  1467. virtual const char *queryValue() const
  1468. {
  1469. return value.str();
  1470. }
  1471. void setValue(const char *_value)
  1472. {
  1473. if (wild)
  1474. {
  1475. VStringBuffer filter("*%s*", _value);
  1476. pattern.set(filter);
  1477. }
  1478. else
  1479. pattern.set(_value);
  1480. value.set(_value);
  1481. }
  1482. virtual const char *queryXPath() const
  1483. {
  1484. return xpath;
  1485. }
  1486. virtual WUSortField queryField() const
  1487. {
  1488. return WUSFappvalue;
  1489. }
  1490. private:
  1491. StringBuffer xpath;
  1492. StringAttr pattern;
  1493. StringAttr value;
  1494. bool wild;
  1495. };
  1496. class CassSortableIterator : public CassandraIterator
  1497. {
  1498. public:
  1499. CassSortableIterator(CassIterator *_iterator, unsigned _idx, int _compareColumn, bool _descending)
  1500. : CassandraIterator(_iterator), idx(_idx), compareColumn(_compareColumn), descending(_descending)
  1501. {
  1502. }
  1503. const CassSortableIterator *nextRow()
  1504. {
  1505. if (iterator && cass_iterator_next(iterator))
  1506. {
  1507. if (compareColumn != -1)
  1508. {
  1509. const CassRow *row = cass_iterator_get_row(iterator);
  1510. getCassString(value.clear(), cass_row_get_column(row, compareColumn));
  1511. }
  1512. return this;
  1513. }
  1514. else
  1515. return NULL;
  1516. }
  1517. void stop()
  1518. {
  1519. value.clear();
  1520. set(NULL);
  1521. }
  1522. int compare(const CassSortableIterator *to) const
  1523. {
  1524. if (compareColumn==-1)
  1525. return idx - to->idx; // concat mode
  1526. int ret = strcmp(value, to->value); // Note - empty StringBuffer always returns ""
  1527. return descending ? -ret : ret;
  1528. }
  1529. private:
  1530. StringBuffer value;
  1531. unsigned idx;
  1532. int compareColumn;
  1533. bool descending;
  1534. };
// Workunit iterator extended with the extra queries that the Cassandra iterators in this file support.
interface IConstWorkUnitIteratorEx : public IConstWorkUnitIterator
{
    // True if the iterator itself applies post-filters to the rows it reads
    virtual bool hasPostFilters() const = 0;
    // True if the iterator is combining more than one underlying result
    virtual bool isMerging() const = 0;
    // Record the current position; implementations in this file either pass it to a query cache entry or do nothing
    virtual void notePosition() const = 0;
};
  1541. /*
  1542. *
  1543. * The cache entries serve two purposes:
  1544. *
  1545. * 1. They allow us to map row numbers to values for the end of each page returned, which can make forward paging efficient when not post-sorting
  1546. * 2. They allow us to preserve post-sort results in order to avoid having to re-retrieve them.
  1547. */
  1548. class CCassandraWuUQueryCacheEntry : public CInterfaceOf<IInterface>
  1549. {
  1550. public:
  1551. CCassandraWuUQueryCacheEntry()
  1552. {
  1553. hint = get_cycles_now(); // MORE - should do better perhaps?
  1554. lastAccess = msTick();
  1555. }
  1556. __int64 queryHint() const
  1557. {
  1558. return hint;
  1559. }
  1560. void noteWuid(const char *wuid, const char *fieldValue, unsigned row)
  1561. {
  1562. CriticalBlock b(crit);
  1563. // NOTE - we store one set of row information per page retrieved - and we normally traverse the pages
  1564. // in order so appending to the end is better than (for example) binchopping
  1565. ForEachItemInRev(idx, rows)
  1566. {
  1567. unsigned foundRow = rows.item(idx);
  1568. if (foundRow==row)
  1569. {
  1570. assert(streq(wuids.item(idx), wuid));
  1571. assert(streq(fieldValues.item(idx), fieldValue));
  1572. return;
  1573. }
  1574. if (foundRow < row)
  1575. break;
  1576. }
  1577. rows.add(row, idx+1);
  1578. wuids.add(wuid, idx+1);
  1579. fieldValues.add(fieldValue ? fieldValue : "", idx+1);
  1580. }
  1581. IConstWorkUnitIteratorEx *getResult() const
  1582. {
  1583. CriticalBlock b(crit);
  1584. return result.getLink();
  1585. }
  1586. void setResult(IConstWorkUnitIteratorEx *_result)
  1587. {
  1588. CriticalBlock b(crit);
  1589. result.set(_result);
  1590. }
  1591. unsigned lookupStartRow(StringBuffer &wuid, StringBuffer &fieldValue, unsigned startOffset) const
  1592. {
  1593. // See if we can provide a base wuid to search above/below
  1594. CriticalBlock b(crit);
  1595. ForEachItemInRev(idx, rows)
  1596. {
  1597. unsigned foundRow = rows.item(idx);
  1598. if (foundRow <= startOffset)
  1599. {
  1600. wuid.set(wuids.item(idx));
  1601. fieldValue.set(fieldValues.item(idx));
  1602. return foundRow;
  1603. }
  1604. }
  1605. return 0;
  1606. }
  1607. void touch()
  1608. {
  1609. lastAccess = msTick();
  1610. }
  1611. inline unsigned queryLastAccess() const
  1612. {
  1613. return lastAccess;
  1614. }
  1615. private:
  1616. mutable CriticalSection crit; // It's POSSIBLE that we could get two queries in hitting the cache at the same time, I think...
  1617. UnsignedArray rows;
  1618. StringArray wuids;
  1619. StringArray fieldValues;
  1620. Owned<IConstWorkUnitIteratorEx> result;
  1621. __uint64 hint;
  1622. unsigned lastAccess;
  1623. };
  1624. class CassMultiIterator : public CInterface, implements IRowProvider, implements ICompare, implements IConstWorkUnitIteratorEx
  1625. {
  1626. public:
  1627. IMPLEMENT_IINTERFACE;
  1628. CassMultiIterator(CCassandraWuUQueryCacheEntry *_cache, unsigned _startRowNum, int _compareColumn, bool _descending)
  1629. : cache(_cache)
  1630. {
  1631. compareColumn = _compareColumn;
  1632. descending = _descending;
  1633. startRowNum = _startRowNum;
  1634. rowNum = 0;
  1635. }
  1636. void setStartOffset(unsigned start)
  1637. {
  1638. startRowNum = start; // we managed to do a seek forward via a filter
  1639. }
  1640. void setCompareColumn(int _compareColumn)
  1641. {
  1642. assert(!inputs.length());
  1643. compareColumn = _compareColumn;
  1644. }
  1645. void addResult(CassandraResult &result)
  1646. {
  1647. results.append(result);
  1648. }
  1649. void addPostFilters(IArrayOf<IPostFilter> &filters, unsigned start)
  1650. {
  1651. unsigned len = filters.length();
  1652. while (start<len)
  1653. postFilters.append(OLINK(filters.item(start++)));
  1654. }
  1655. void addPostFilter(PostFilter &filter)
  1656. {
  1657. postFilters.append(filter);
  1658. }
  1659. virtual bool hasPostFilters() const
  1660. {
  1661. return postFilters.length() != 0;
  1662. }
  1663. virtual bool isMerging() const
  1664. {
  1665. return results.length() > 1;
  1666. }
  1667. virtual bool first()
  1668. {
  1669. inputs.kill();
  1670. ForEachItemIn(idx, results)
  1671. {
  1672. inputs.append(*new CassSortableIterator(cass_iterator_from_result(results.item(idx)), idx, compareColumn, descending));
  1673. }
  1674. merger.setown(createRowStreamMerger(inputs.length(), *this, this, false));
  1675. rowNum = startRowNum;
  1676. return next();
  1677. }
  1678. virtual void notePosition() const
  1679. {
  1680. if (cache && current)
  1681. {
  1682. cache->noteWuid(current->queryWuid(), lastThorTime, rowNum);
  1683. }
  1684. }
  1685. virtual bool next()
  1686. {
  1687. Owned<IConstWorkUnitInfo> last = current.getClear();
  1688. for (;;)
  1689. {
  1690. const CassandraIterator *nextSource = nextMergedSource();
  1691. if (!nextSource)
  1692. {
  1693. if (cache && last)
  1694. {
  1695. cache->noteWuid(last->queryWuid(), lastThorTime, rowNum);
  1696. }
  1697. return false;
  1698. }
  1699. Owned<IPTree> wuXML = rowToPTree(NULL, NULL, workunitInfoMappings+1, cass_iterator_get_row(*nextSource)); // NOTE - this is relying on search mappings and wuInfoMappings being the same
  1700. bool postFiltered = false;
  1701. ForEachItemIn(pfIdx, postFilters)
  1702. {
  1703. if (!postFilters.item(pfIdx).matches(*wuXML))
  1704. {
  1705. postFiltered = true;
  1706. break;
  1707. }
  1708. }
  1709. if (!postFiltered)
  1710. {
  1711. current.setown(createConstWorkUnitInfo(*wuXML));
  1712. lastThorTime.set(wuXML->queryProp("@totalThorTime"));
  1713. rowNum++;
  1714. return true;
  1715. }
  1716. }
  1717. }
  1718. virtual bool isValid()
  1719. {
  1720. return current != NULL;
  1721. }
  1722. virtual IConstWorkUnitInfo & query()
  1723. {
  1724. assertex(current);
  1725. return *current.get();
  1726. }
  1727. const CassandraIterator *nextMergedSource()
  1728. {
  1729. return (const CassSortableIterator *) merger->nextRow();
  1730. }
  1731. protected:
  1732. virtual void linkRow(const void *row) { }
  1733. virtual void releaseRow(const void *row) { }
  1734. virtual const void *nextRow(unsigned idx)
  1735. {
  1736. CassSortableIterator &it = inputs.item(idx);
  1737. return it.nextRow(); // returns either a pointer to the iterator, or NULL
  1738. }
  1739. virtual void stop(unsigned idx)
  1740. {
  1741. inputs.item(idx).stop();
  1742. }
  1743. virtual int docompare(const void *a, const void *b) const
  1744. {
  1745. // a and b point to to CassSortableIterator objects
  1746. const CassSortableIterator *aa = (const CassSortableIterator *) a;
  1747. const CassSortableIterator *bb = (const CassSortableIterator *) b;
  1748. return aa->compare(bb);
  1749. }
  1750. private:
  1751. IArrayOf<CassandraResult> results;
  1752. IArrayOf<CassSortableIterator> inputs;
  1753. Owned<IRowStream> merger; // NOTE - must be destroyed before inputs is destroyed
  1754. IArrayOf<IPostFilter> postFilters;
  1755. Owned<IConstWorkUnitInfo> current;
  1756. Linked<CCassandraWuUQueryCacheEntry> cache;
  1757. StringAttr lastThorTime;
  1758. int compareColumn;
  1759. unsigned startRowNum;
  1760. unsigned rowNum;
  1761. bool descending;
  1762. };
  1763. class CassPostSortIterator : public CInterfaceOf<IConstWorkUnitIteratorEx>, implements ICompare
  1764. {
  1765. public:
  1766. CassPostSortIterator(IConstWorkUnitIterator * _input, unsigned _sortorder, unsigned _limit)
  1767. : input(_input), sortorder(_sortorder), limit(_limit)
  1768. {
  1769. idx = 0;
  1770. }
  1771. virtual bool first()
  1772. {
  1773. if (input)
  1774. {
  1775. readFirst();
  1776. input.clear();
  1777. }
  1778. idx = 0;
  1779. return sorted.isItem(idx);
  1780. }
  1781. virtual bool next()
  1782. {
  1783. idx++;
  1784. if (sorted.isItem(idx))
  1785. return true;
  1786. return false;
  1787. }
  1788. virtual void notePosition() const
  1789. {
  1790. }
  1791. virtual bool isValid()
  1792. {
  1793. return sorted.isItem(idx);
  1794. }
  1795. virtual IConstWorkUnitInfo & query()
  1796. {
  1797. return sorted.item(idx);
  1798. }
  1799. virtual bool hasPostFilters() const
  1800. {
  1801. return false; // they are done by my input. But we may want to rename this function to indicate "may return more than asked" in which case would be true
  1802. }
  1803. virtual bool isMerging() const
  1804. {
  1805. return false;
  1806. }
  1807. private:
  1808. void readFirst()
  1809. {
  1810. ForEach(*input)
  1811. {
  1812. sorted.append(OLINK(input->query()));
  1813. if (sorted.length()>=limit)
  1814. break;
  1815. }
  1816. qsortvec((void **)sorted.getArray(0), sorted.length(), *this);
  1817. }
  1818. virtual int docompare(const void *a, const void *b) const
  1819. {
  1820. // a and b point to to IConstWorkUnitInfo objects
  1821. const IConstWorkUnitInfo *aa = (const IConstWorkUnitInfo *) a;
  1822. const IConstWorkUnitInfo *bb = (const IConstWorkUnitInfo *) b;
  1823. int diff;
  1824. switch (sortorder & 0xff)
  1825. {
  1826. case WUSFuser:
  1827. diff = stricmp(aa->queryUser(), bb->queryUser());
  1828. break;
  1829. case WUSFcluster:
  1830. diff = stricmp(aa->queryClusterName(), bb->queryClusterName());
  1831. break;
  1832. case WUSFjob:
  1833. diff = stricmp(aa->queryJobName(), bb->queryJobName());
  1834. break;
  1835. case WUSFstate:
  1836. diff = stricmp(aa->queryStateDesc(), bb->queryStateDesc());
  1837. break;
  1838. case WUSFprotected:
  1839. diff = (int) bb->isProtected() - (int) aa->isProtected();
  1840. break;
  1841. case WUSFtotalthortime:
  1842. diff = (int) (bb->getTotalThorTime() - bb->getTotalThorTime());
  1843. break;
  1844. case WUSFwuid:
  1845. diff = stricmp(aa->queryWuid(), bb->queryWuid()); // Should never happen, since we always fetch with a wuid sort
  1846. break;
  1847. default:
  1848. throwUnexpected();
  1849. }
  1850. if (sortorder & WUSFreverse)
  1851. return -diff;
  1852. else
  1853. return diff;
  1854. }
  1855. Owned<IConstWorkUnitIterator> input;
  1856. IArrayOf<IConstWorkUnitInfo> sorted;
  1857. unsigned sortorder;
  1858. unsigned idx;
  1859. unsigned limit;
  1860. };
  1861. class SubPageIterator : public CInterfaceOf<IConstWorkUnitIteratorEx>
  1862. {
  1863. public:
  1864. SubPageIterator(IConstWorkUnitIteratorEx *_input, unsigned _startOffset, unsigned _pageSize)
  1865. : input(_input), startOffset(_startOffset), pageSize(_pageSize), idx(0)
  1866. {
  1867. }
  1868. virtual bool first()
  1869. {
  1870. idx = 0;
  1871. // MORE - put a seek into the Ex interface
  1872. if (input->first())
  1873. {
  1874. for (int i = 0; i < startOffset;i++)
  1875. {
  1876. if (!input->next())
  1877. return false;
  1878. }
  1879. return true;
  1880. }
  1881. else
  1882. return false;
  1883. }
  1884. virtual bool next()
  1885. {
  1886. idx++;
  1887. if (idx >= pageSize)
  1888. {
  1889. input->notePosition();
  1890. return false;
  1891. }
  1892. return input->next();
  1893. }
  1894. virtual void notePosition() const
  1895. {
  1896. input->notePosition();
  1897. }
  1898. virtual bool isValid()
  1899. {
  1900. return idx < pageSize && input->isValid();
  1901. }
  1902. virtual IConstWorkUnitInfo & query()
  1903. {
  1904. return input->query();
  1905. }
  1906. virtual bool hasPostFilters() const
  1907. {
  1908. return false;
  1909. }
  1910. virtual bool isMerging() const
  1911. {
  1912. return false;
  1913. }
  1914. private:
  1915. Owned<IConstWorkUnitIteratorEx> input;
  1916. unsigned startOffset;
  1917. unsigned pageSize;
  1918. unsigned idx;
  1919. };
  1920. class CassJoinIterator : implements IConstWorkUnitIteratorEx, public CInterface
  1921. {
  1922. public:
  1923. IMPLEMENT_IINTERFACE;
  1924. CassJoinIterator(unsigned _compareColumn, bool _descending)
  1925. {
  1926. compareColumn = _compareColumn;
  1927. descending = _descending;
  1928. }
  1929. void addResult(CassandraResult &result)
  1930. {
  1931. results.append(result);
  1932. }
  1933. void addPostFilter(IPostFilter &post)
  1934. {
  1935. postFilters.append(post);
  1936. }
  1937. virtual bool first()
  1938. {
  1939. if (!results.length())
  1940. return false;
  1941. inputs.kill();
  1942. ForEachItemIn(idx, results)
  1943. {
  1944. Owned <CassSortableIterator> input = new CassSortableIterator(cass_iterator_from_result(results.item(idx)), idx, compareColumn, descending);
  1945. if (!input->nextRow())
  1946. return false;
  1947. inputs.append(*input.getClear());
  1948. }
  1949. return next();
  1950. }
  1951. virtual bool next()
  1952. {
  1953. current.clear();
  1954. for (;;)
  1955. {
  1956. unsigned idx = 0;
  1957. unsigned target = 0;
  1958. unsigned matches = 1; // I always match myself!
  1959. unsigned sources = inputs.length();
  1960. if (!sources)
  1961. return false;
  1962. while (matches < sources)
  1963. {
  1964. idx++;
  1965. if (idx==sources)
  1966. idx = 0;
  1967. int diff;
  1968. for (;;)
  1969. {
  1970. assert(idx != target);
  1971. diff = inputs.item(idx).compare(&inputs.item(target));
  1972. if (diff >= 0)
  1973. break;
  1974. if (!inputs.item(idx).nextRow())
  1975. {
  1976. inputs.kill(); // Once any reaches EOF, we are done
  1977. return false;
  1978. }
  1979. }
  1980. if (diff > 0)
  1981. {
  1982. target = idx;
  1983. matches = 1;
  1984. }
  1985. else
  1986. matches++;
  1987. }
  1988. Owned<IPTree> wuXML = rowToPTree(NULL, NULL, workunitInfoMappings+1, cass_iterator_get_row(inputs.item(0)));
  1989. bool postFiltered = false;
  1990. ForEachItemIn(pfIdx, postFilters)
  1991. {
  1992. if (!postFilters.item(pfIdx).matches(*wuXML))
  1993. {
  1994. postFiltered = true;
  1995. break;
  1996. }
  1997. }
  1998. if (!postFiltered)
  1999. {
  2000. current.setown(createConstWorkUnitInfo(*wuXML));
  2001. ForEachItemIn(idx2, inputs)
  2002. {
  2003. if (!inputs.item(idx2).nextRow())
  2004. {
  2005. inputs.clear(); // Make sure next() fails next time it is called
  2006. break;
  2007. }
  2008. }
  2009. return true;
  2010. }
  2011. }
  2012. }
  2013. virtual bool isValid()
  2014. {
  2015. return current != NULL;
  2016. }
  2017. virtual IConstWorkUnitInfo & query()
  2018. {
  2019. assertex(current);
  2020. return *current.get();
  2021. }
  2022. private:
  2023. IArrayOf<CassandraResult> results;
  2024. IArrayOf<CassSortableIterator> inputs;
  2025. IArrayOf<IPostFilter> postFilters;
  2026. Owned<IConstWorkUnitInfo> current;
  2027. unsigned compareColumn;
  2028. bool descending;
  2029. };
  2030. static void lockWuid(Owned<IRemoteConnection> &connection, const char *wuid)
  2031. {
  2032. VStringBuffer wuRoot("/WorkUnitLocks/%s", wuid);
  2033. if (connection)
  2034. connection->changeMode(RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT); // Would it ever be anything else?
  2035. else
  2036. connection.setown(querySDS().connect(wuRoot.str(), myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT));
  2037. if (!connection)
  2038. throw makeStringExceptionV(WUERR_LockFailed, "Failed to get connection for xpath %s", wuRoot.str());
  2039. }
  2040. class CCassandraWorkUnit : public CPersistedWorkUnit
  2041. {
  2042. public:
  2043. CCassandraWorkUnit(ICassandraSession *_sessionCache, IPTree *wuXML, ISecManager *secmgr, ISecUser *secuser, IRemoteConnection *_daliLock, bool _allDirty)
  2044. : sessionCache(_sessionCache), CPersistedWorkUnit(secmgr, secuser), daliLock(_daliLock), allDirty(_allDirty)
  2045. {
  2046. CPersistedWorkUnit::loadPTree(wuXML);
  2047. memset(childLoaded, 0, sizeof(childLoaded));
  2048. actionChanged = false;
  2049. stateChanged = false;
  2050. abortDirty = false;
  2051. }
  2052. ~CCassandraWorkUnit()
  2053. {
  2054. }
  2055. virtual void forceReload()
  2056. {
  2057. synchronized sync(locked); // protect locked workunits (uncommitted writes) from reload
  2058. loadPTree(sessionCache->cassandraToWorkunitXML(queryWuid()));
  2059. memset(childLoaded, 0, sizeof(childLoaded));
  2060. allDirty = false;
  2061. actionChanged = false;
  2062. stateChanged = false;
  2063. abortDirty = true;
  2064. }
  2065. void executeBatch(CassandraBatch &batch, const char * what) const
  2066. {
  2067. if (sessionCache->queryTraceLevel() > 1)
  2068. DBGLOG("Executing batch %s", what);
  2069. batch.execute(sessionCache->querySession(), what);
  2070. }
  2071. void executeAsync(CIArrayOf<CassandraStatement> &batch, const char * what) const
  2072. {
  2073. if (sessionCache->queryTraceLevel() > 1)
  2074. DBGLOG("Executing async batch %s (%d elements)", what, batch.length());
  2075. sessionCache->executeAsync(batch, what);
  2076. }
  2077. virtual void cleanupAndDelete(bool deldll, bool deleteOwned, const StringArray *deleteExclusions)
  2078. {
  2079. const char *wuid = queryWuid();
  2080. CPersistedWorkUnit::cleanupAndDelete(deldll, deleteOwned, deleteExclusions);
  2081. // Note we need to gather the information about what secondaries to delete before we delete the parent/children,
  2082. // but we actually do the deletion afterwards
  2083. CIArrayOf<CassandraStatement> deleteSearches;
  2084. deleteSecondaries(wuid, deleteSearches);
  2085. CassandraBatch mainBatch(CASS_BATCH_TYPE_UNLOGGED);
  2086. deleteChildren(wuid, mainBatch);
  2087. sessionCache->deleteChildByWuid(wuGraphProgressMappings, wuid, mainBatch);
  2088. sessionCache->deleteChildByWuid(wuGraphStateMappings, wuid, mainBatch);
  2089. sessionCache->deleteChildByWuid(wuGraphRunningMappings, wuid, mainBatch);
  2090. // If the partitioning of the main workunits table does not match the partitioning of the other tables, then would be better to
  2091. // execute the deletes of the child tables and the main record as two separate batches.
  2092. CassandraStatement update(sessionCache->prepareStatement("DELETE from workunits where partition=? and wuid=?;"));
  2093. update.bindInt32(0, rtlHash32VStr(wuid, 0) % sessionCache->queryPartitions());
  2094. update.bindString(1, wuid);
  2095. check(cass_batch_add_statement(mainBatch, update));
  2096. executeBatch(mainBatch, "delete wu");
  2097. executeAsync(deleteSearches, "delete wu");
  2098. }
  2099. virtual void commit()
  2100. {
  2101. CPersistedWorkUnit::commit();
  2102. if (sessionCache->queryTraceLevel() >= 8)
  2103. {
  2104. StringBuffer s; toXML(p, s); DBGLOG("CCassandraWorkUnit::commit\n%s", s.str());
  2105. }
  2106. CIArrayOf<CassandraStatement> secondaryBatch;
  2107. CassandraBatch batch(CASS_BATCH_TYPE_UNLOGGED);
  2108. Owned<CassandraBatch> deletesBatch;
  2109. const char *wuid = queryWuid();
  2110. bool isGlobal = streq(wuid, GLOBAL_WORKUNIT);
  2111. if (!isGlobal) // Global workunit only has child rows, no parent
  2112. {
  2113. if (prev) // Holds the values of the "basic" info at the last commit
  2114. updateSecondaries(wuid, secondaryBatch);
  2115. simpleXMLtoCassandra(sessionCache, batch, workunitsMappings, p); // This just does the parent row
  2116. }
  2117. if (allDirty && !isGlobal)
  2118. {
  2119. // MORE - this delete is technically correct, but if we assert that the only place that copyWorkUnit is used is to populate an
  2120. // empty newly-created WU, it is unnecessary.
  2121. // deleteChildren(wuid, deletesBatch);
  2122. // MORE can use the table?
  2123. childXMLtoCassandra(sessionCache, batch, wuGraphsMappings, p, "Graphs/Graph", 0);
  2124. childXMLtoCassandra(sessionCache, batch, wuResultsMappings, p, "Results/Result", "0");
  2125. childXMLtoCassandra(sessionCache, batch, wuVariablesMappings, p, "Variables/Variable", "-1"); // ResultSequenceStored
  2126. childXMLtoCassandra(sessionCache, batch, wuTemporariesMappings, p, "Temporaries/Variable", "-3"); // ResultSequenceInternal // NOTE - lookups may also request ResultSequenceOnce
  2127. childXMLtoCassandra(sessionCache, batch, wuExceptionsMappings, p, "Exceptions/Exception", 0);
  2128. childXMLtoCassandra(sessionCache, batch, wuStatisticsMappings, p, "Statistics/Statistic", 0);
  2129. childXMLtoCassandra(sessionCache, batch, wuFilesReadMappings, p, "FilesRead/File", 0);
  2130. childXMLtoCassandra(sessionCache, batch, wuFilesWrittenMappings, p, "Files/File", 0);
  2131. childXMLtoCassandra(sessionCache, batch, wuFieldUsageMappings, p, "usedsources/datasource", 0);
  2132. IPTree *query = p->queryPropTree("Query");
  2133. if (query)
  2134. childXMLRowtoCassandra(sessionCache, batch, wuQueryMappings, wuid, *query, 0);
  2135. }
  2136. else
  2137. {
  2138. HashIterator iter(dirtyPaths);
  2139. ForEach (iter)
  2140. {
  2141. const char *path = (const char *) iter.query().getKey();
  2142. const CassandraXmlMapping *table = *dirtyPaths.mapToValue(&iter.query());
  2143. if (sessionCache->queryTraceLevel()>2)
  2144. DBGLOG("Updating dirty path %s", path);
  2145. if (*path == '*')
  2146. {
  2147. if (!deletesBatch)
  2148. deletesBatch.setown(new CassandraBatch(CASS_BATCH_TYPE_UNLOGGED));
  2149. sessionCache->deleteChildByWuid(table, wuid, *deletesBatch);
  2150. childXMLtoCassandra(sessionCache, batch, table, p, path+1, 0);
  2151. }
  2152. else
  2153. {
  2154. IPTree *dirty = p->queryPropTree(path);
  2155. if (dirty)
  2156. childXMLRowtoCassandra(sessionCache, batch, table, wuid, *dirty, 0);
  2157. else if (sessionCache->queryTraceLevel())
  2158. {
  2159. StringBuffer xml;
  2160. toXML(p, xml);
  2161. DBGLOG("Missing dirty element %s in %s", path, xml.str());
  2162. }
  2163. }
  2164. }
  2165. ForEachItemIn(d, dirtyResults)
  2166. {
  2167. IWUResult &result = dirtyResults.item(d);
  2168. switch (result.getResultSequence())
  2169. {
  2170. case ResultSequenceStored:
  2171. childXMLRowtoCassandra(sessionCache, batch, wuVariablesMappings, wuid, *result.queryPTree(), "-1");
  2172. break;
  2173. case ResultSequenceInternal:
  2174. case ResultSequenceOnce:
  2175. childXMLRowtoCassandra(sessionCache, batch, wuTemporariesMappings, wuid, *result.queryPTree(), "-3");
  2176. break;
  2177. default:
  2178. childXMLRowtoCassandra(sessionCache, batch, wuResultsMappings, wuid, *result.queryPTree(), "0");
  2179. break;
  2180. }
  2181. }
  2182. }
  2183. if (sessionCache->queryTraceLevel() > 1)
  2184. DBGLOG("Executing commit batches");
  2185. if (deletesBatch)
  2186. {
  2187. CassandraFuture futureBatch(cass_session_execute_batch(sessionCache->querySession(), *deletesBatch));
  2188. futureBatch.wait("commit deletes");
  2189. }
  2190. CassandraFuture futureBatch(cass_session_execute_batch(sessionCache->querySession(), batch));
  2191. futureBatch.wait("commit updates");
  2192. executeAsync(secondaryBatch, "commit");
  2193. if (stateChanged)
  2194. {
  2195. // Signal changes to state to anyone that might be watching via Dali
  2196. VStringBuffer xpath("/WorkUnitStates/%s/State", wuid);
  2197. Owned<IRemoteConnection> conn = querySDS().connect(xpath.str(), myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE, SDS_LOCK_TIMEOUT);
  2198. conn->queryRoot()->setProp(NULL, p->queryProp("@state"));
  2199. }
  2200. if (actionChanged)
  2201. {
  2202. // Signal changes to action to anyone that might be watching via Dali
  2203. VStringBuffer xpath("/WorkUnitStates/%s/Action", wuid);
  2204. Owned<IRemoteConnection> conn = querySDS().connect(xpath.str(), myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE, SDS_LOCK_TIMEOUT);
  2205. conn->queryRoot()->setProp(NULL, p->queryProp("Action"));
  2206. }
  2207. prev.clear();
  2208. allDirty = false;
  2209. stateChanged = false;
  2210. actionChanged = false;
  2211. dirtyPaths.kill();
  2212. dirtyResults.kill();
  2213. }
  2214. virtual void import(IPropertyTree *wuTree, IPropertyTree *graphProgressTree)
  2215. {
  2216. CPersistedWorkUnit::loadPTree(LINK(wuTree));
  2217. if (sessionCache->queryTraceLevel() >= 8)
  2218. {
  2219. StringBuffer s; toXML(wuTree, s); DBGLOG("CCassandraWorkUnit::import\n%s", s.str());
  2220. }
  2221. CIArrayOf<CassandraStatement> secondaryBatch;
  2222. CassandraBatch batch(CASS_BATCH_TYPE_UNLOGGED);
  2223. updateSecondaries(secondaryBatch);
  2224. // MORE can use the table?
  2225. childXMLtoCassandra(sessionCache, batch, wuGraphsMappings, wuTree, "Graphs/Graph", 0);
  2226. childXMLtoCassandra(sessionCache, batch, wuResultsMappings, wuTree, "Results/Result", "0");
  2227. childXMLtoCassandra(sessionCache, batch, wuVariablesMappings, wuTree, "Variables/Variable", "-1"); // ResultSequenceStored
  2228. childXMLtoCassandra(sessionCache, batch, wuTemporariesMappings, wuTree, "Temporaries/Variable", "-3"); // ResultSequenceInternal // NOTE - lookups may also request ResultSequenceOnce
  2229. childXMLtoCassandra(sessionCache, batch, wuExceptionsMappings, wuTree, "Exceptions/Exception", 0);
  2230. childXMLtoCassandra(sessionCache, batch, wuStatisticsMappings, wuTree, "Statistics/Statistic", 0);
  2231. childXMLtoCassandra(sessionCache, batch, wuFilesReadMappings, wuTree, "FilesRead/File", 0);
  2232. childXMLtoCassandra(sessionCache, batch, wuFilesWrittenMappings, wuTree, "Files/File", 0);
  2233. childXMLtoCassandra(sessionCache, batch, wuFieldUsageMappings, wuTree, "usedsources/datasource", 0);
  2234. IPTree *query = wuTree->queryPropTree("Query");
  2235. if (query)
  2236. childXMLRowtoCassandra(sessionCache, batch, wuQueryMappings, queryWuid(), *query, 0);
  2237. if (sessionCache->queryTraceLevel() > 1)
  2238. DBGLOG("Executing commit batches");
  2239. CassandraFuture futureBatch(cass_session_execute_batch(sessionCache->querySession(), batch));
  2240. futureBatch.wait("commit updates");
  2241. executeAsync(secondaryBatch, "commit");
  2242. if (!graphProgressTree)
  2243. return;
  2244. if (sessionCache->queryTraceLevel() >= 8)
  2245. {
  2246. StringBuffer s; toXML(graphProgressTree, s); DBGLOG("CCassandraWorkUnit::import\n%s", s.str());
  2247. }
  2248. Owned<IPTreeIterator> graphs = graphProgressTree->getElements("*");
  2249. ForEach(*graphs)
  2250. {
  2251. IPTree &graph = graphs->query();
  2252. const char *graphName = graph.queryName();
  2253. Owned<IPTreeIterator> subs = graph.getElements("*");
  2254. ForEach(*subs)
  2255. {
  2256. IPTree &sub = subs->query();
  2257. const char *name=sub.queryName();
  2258. if (name[0]=='s' && name[1]=='g')
  2259. {
  2260. setGraphProgress(&graph, graphName, atoi(name+2), sub.queryProp("@creator"), false);
  2261. }
  2262. else if (streq(name, "node"))
  2263. {
  2264. unsigned subid = sub.getPropInt("@id");
  2265. if (subid)
  2266. {
  2267. if (sub.hasChildren()) // Old format
  2268. setGraphProgress(&sub, graphName, subid, sub.queryProp("@creator"), false);
  2269. if (sub.hasProp("@_state"))
  2270. setNodeState(graphName, subid, (WUGraphState) sub.getPropInt("@_state"));
  2271. }
  2272. }
  2273. }
  2274. if (graph.hasProp("@_state"))
  2275. setGraphState(graphName, graph.getPropInt("@wfid"), (WUGraphState) graph.getPropInt("@_state"));
  2276. }
  2277. }
  2278. virtual IConstWUGraph *getGraph(const char *qname) const
  2279. {
  2280. // Just because we read one graph, does not mean we are likely to read more. So don't cache this result.
  2281. // Also note that graphs are generally read-only
  2282. CassandraResult result(sessionCache->fetchDataForWuidAndKey(wuGraphsMappings, queryWuid(), qname));
  2283. const CassRow *row = cass_result_first_row(result);
  2284. if (row)
  2285. {
  2286. Owned<IPTree> graph = createPTree("Graph");
  2287. unsigned colidx = 2; // We did not fetch wuid or partition
  2288. CassandraIterator cols(cass_iterator_from_row(row));
  2289. while (cass_iterator_next(cols))
  2290. {
  2291. assertex(wuGraphsMappings[colidx].columnName);
  2292. const CassValue *value = cass_iterator_get_column(cols);
  2293. if (value && !cass_value_is_null(value))
  2294. wuGraphsMappings[colidx].mapper.toXML(graph, wuGraphsMappings[colidx].xpath, value);
  2295. colidx++;
  2296. }
  2297. return new CLocalWUGraph(*this, graph.getClear());
  2298. }
  2299. else
  2300. return NULL;
  2301. }
  2302. virtual unsigned getResultCount() const
  2303. {
  2304. return childCount(sessionCache, wuResultsMappings, queryWuid());
  2305. }
  2306. virtual unsigned getGraphCount() const
  2307. {
  2308. return childCount(sessionCache, wuGraphsMappings, queryWuid());
  2309. }
  2310. virtual unsigned getSourceFileCount() const
  2311. {
  2312. return childCount(sessionCache, wuFilesReadMappings, queryWuid());
  2313. }
  2314. virtual unsigned getVariableCount() const
  2315. {
  2316. return childCount(sessionCache, wuVariablesMappings, queryWuid());
  2317. }
  2318. virtual void setUser(const char *user)
  2319. {
  2320. if (trackSecondaryChange(user, "@submitID"))
  2321. CPersistedWorkUnit::setUser(user);
  2322. }
  2323. virtual void setClusterName(const char *cluster)
  2324. {
  2325. if (trackSecondaryChange(cluster, "@clusterName"))
  2326. CPersistedWorkUnit::setClusterName(cluster);
  2327. }
  2328. virtual void setJobName(const char *jobname)
  2329. {
  2330. if (trackSecondaryChange(jobname, "@jobName"))
  2331. CPersistedWorkUnit::setJobName(jobname);
  2332. }
  2333. virtual void setState(WUState state)
  2334. {
  2335. if (trackSecondaryChange(getWorkunitStateStr(state), "@state"))
  2336. {
  2337. stateChanged = true;
  2338. CPersistedWorkUnit::setState(state);
  2339. }
  2340. }
  2341. virtual void setAction(WUAction action)
  2342. {
  2343. actionChanged = true;
  2344. CPersistedWorkUnit::setAction(action);
  2345. }
  2346. virtual void setApplicationValue(const char *app, const char *propname, const char *value, bool overwrite)
  2347. {
  2348. VStringBuffer xpath("Application/%s/%s", app, propname);
  2349. if (trackSecondaryChange(value, xpath))
  2350. CPersistedWorkUnit::setApplicationValue(app, propname, value, overwrite);
  2351. }
  2352. virtual void _lockRemote()
  2353. {
  2354. lockWuid(daliLock, queryWuid());
  2355. }
  2356. virtual void _unlockRemote()
  2357. {
  2358. commit();
  2359. if (daliLock)
  2360. {
  2361. daliLock->close(true);
  2362. daliLock.clear();
  2363. }
  2364. }
  2365. virtual void createGraph(const char * name, const char *label, WUGraphType type, IPropertyTree *xgmml, unsigned wfid)
  2366. {
  2367. CPersistedWorkUnit::createGraph(name, label, type, xgmml, wfid);
  2368. VStringBuffer xpath("Graphs/Graph[@name='%s']", name);
  2369. noteDirty(xpath, wuGraphsMappings);
  2370. }
  2371. virtual IWUResult * updateResultByName(const char * name)
  2372. {
  2373. return noteDirty(CPersistedWorkUnit::updateResultByName(name));
  2374. }
  2375. virtual IWUResult * updateResultBySequence(unsigned seq)
  2376. {
  2377. return noteDirty(CPersistedWorkUnit::updateResultBySequence(seq));
  2378. }
  2379. virtual IWUResult * updateTemporaryByName(const char * name)
  2380. {
  2381. return noteDirty(CPersistedWorkUnit::updateTemporaryByName(name));
  2382. }
  2383. virtual IWUResult * updateVariableByName(const char * name)
  2384. {
  2385. return noteDirty(CPersistedWorkUnit::updateVariableByName(name));
  2386. }
  2387. virtual IWUQuery * updateQuery()
  2388. {
  2389. noteDirty("Query", wuQueryMappings);
  2390. return CPersistedWorkUnit::updateQuery();
  2391. }
  2392. virtual IConstWUQuery *getQuery() const
  2393. {
  2394. checkChildLoaded(wuQueriesTable);
  2395. return CPersistedWorkUnit::getQuery();
  2396. }
  2397. virtual IConstWUFileUsageIterator * getFieldUsage() const
  2398. {
  2399. checkChildLoaded(wuFieldUsageTable);
  2400. return CPersistedWorkUnit::getFieldUsage();
  2401. }
  2402. virtual IWUException *createException()
  2403. {
  2404. IWUException *result = CPersistedWorkUnit::createException();
  2405. VStringBuffer xpath("Exceptions/Exception[@sequence='%d']", result->getSequence());
  2406. noteDirty(xpath, wuExceptionsMappings);
  2407. return result;
  2408. }
  2409. virtual void copyWorkUnit(IConstWorkUnit *cached, bool copyStats, bool all)
  2410. {
  2411. // Make sure that any required updates to the secondary files happen
  2412. IPropertyTree *fromP = queryExtendedWU(cached)->queryPTree();
  2413. for (const char * const *search = searchPaths; *search; search++)
  2414. trackSecondaryChange(fromP->queryProp(*search), *search);
  2415. for (const ChildTableInfo * const * table = childTables; *table != NULL; table++)
  2416. checkChildLoaded(**table);
  2417. CPersistedWorkUnit::copyWorkUnit(cached, copyStats, all);
  2418. memset(childLoaded, 1, sizeof(childLoaded));
  2419. allDirty = true;
  2420. actionChanged = true;
  2421. stateChanged = true;
  2422. }
  2423. virtual void noteFileRead(IDistributedFile *file)
  2424. {
  2425. if (file)
  2426. {
  2427. childLoaded[WuFilesReadChild] = true; // Prevent duplicates if someone tries to read back files read (unlikely)
  2428. CPersistedWorkUnit::noteFileRead(file);
  2429. VStringBuffer xpath("FilesRead/File[@name='%s']", file->queryLogicalName());
  2430. noteDirty(xpath, wuFilesReadMappings);
  2431. }
  2432. else
  2433. {
  2434. // A hack for testing!
  2435. Owned<IPropertyTreeIterator> files = p->getElements("FilesRead/File");
  2436. ForEach(*files)
  2437. {
  2438. VStringBuffer xpath("FilesRead/File[@name='%s']", files->query().queryProp("@name"));
  2439. noteDirty(xpath, wuFilesReadMappings);
  2440. }
  2441. }
  2442. }
  2443. virtual void addFile(const char *fileName, StringArray *clusters, unsigned usageCount, WUFileKind fileKind, const char *graphOwner)
  2444. {
  2445. if (fileName)
  2446. {
  2447. childLoaded[WuFilesWrittenChild] = true; // Prevent duplicates if someone tries to read back files written from same object (unlikely)
  2448. CPersistedWorkUnit::addFile(fileName, clusters, usageCount, fileKind, graphOwner);
  2449. VStringBuffer xpath("Files/File[@name='%s']", fileName);
  2450. noteDirty(xpath, wuFilesWrittenMappings);
  2451. }
  2452. }
  2453. virtual void clearGraphProgress() const
  2454. {
  2455. const char *wuid = queryWuid();
  2456. CassandraBatch batch(CASS_BATCH_TYPE_UNLOGGED);
  2457. sessionCache->deleteChildByWuid(wuGraphProgressMappings, wuid, batch);
  2458. sessionCache->deleteChildByWuid(wuGraphStateMappings, wuid, batch);
  2459. sessionCache->deleteChildByWuid(wuGraphRunningMappings, wuid, batch);
  2460. executeBatch(batch, "clearGraphProgress");
  2461. }
  2462. virtual bool getRunningGraph(IStringVal &graphName, WUGraphIDType &subId) const
  2463. {
  2464. CassandraStatement statement(sessionCache->prepareStatement("SELECT graphID, subgraphID FROM wuGraphRunning where partition=? and wuid=?;"));
  2465. const char *wuid = queryWuid();
  2466. statement.bindInt32(0, rtlHash32VStr(wuid, 0) % sessionCache->queryPartitions());
  2467. statement.bindString(1, wuid);
  2468. CassandraFuture future(cass_session_execute(sessionCache->querySession(), statement));
  2469. future.wait("getRunningGraph");
  2470. CassandraResult result(cass_future_get_result(future));
  2471. if (cass_result_row_count(result))
  2472. {
  2473. const CassRow *row = cass_result_first_row(result);
  2474. assertex(row);
  2475. StringBuffer b;
  2476. getCassString(b, cass_row_get_column(row, 0));
  2477. graphName.set(b);
  2478. subId = getUnsignedResult(NULL, cass_row_get_column(row, 1));
  2479. return true;
  2480. }
  2481. else
  2482. return false;
  2483. }
  2484. virtual IConstWUGraphProgress *getGraphProgress(const char *graphName) const
  2485. {
  2486. CassandraStatement statement(sessionCache->prepareStatement("SELECT subgraphID, creator, progress FROM wuGraphProgress where partition=? and wuid=? and graphID=?;"));
  2487. const char *wuid = queryWuid();
  2488. statement.bindInt32(0, rtlHash32VStr(wuid, 0) % sessionCache->queryPartitions());
  2489. statement.bindString(1, wuid);
  2490. statement.bindString(2, graphName);
  2491. CassandraFuture future(cass_session_execute(sessionCache->querySession(), statement));
  2492. future.wait("getGraphProgress");
  2493. CassandraResult result(cass_future_get_result(future));
  2494. CassandraIterator rows(cass_iterator_from_result(result));
  2495. if (!cass_result_row_count(result))
  2496. return NULL;
  2497. Owned<IPropertyTree> progress = createPTree(graphName);
  2498. progress->setPropBool("@stats", true);
  2499. progress->setPropInt("@format", PROGRESS_FORMAT_V);
  2500. while (cass_iterator_next(rows))
  2501. {
  2502. const CassRow *row = cass_iterator_get_row(rows);
  2503. WUGraphIDType subId = getUnsignedResult(NULL, cass_row_get_column(row, 0));
  2504. StringBuffer creator, xml;
  2505. getCassString(creator, cass_row_get_column(row, 1));
  2506. getCassString(xml, cass_row_get_column(row, 2));
  2507. IPTree *stats = createPTreeFromXMLString(xml);
  2508. // We could check that atoi(stats->queryName()+2)==subgraphID, and that stats->queryProp(@creator)==creator)....
  2509. progress->addPropTree(stats->queryName(), stats);
  2510. }
  2511. return createConstGraphProgress(queryWuid(), graphName, progress); // Links progress
  2512. }
  2513. WUGraphState queryGraphState(const char *graphName) const
  2514. {
  2515. return queryNodeState(graphName, 0);
  2516. }
  2517. WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const
  2518. {
  2519. CassandraStatement statement(sessionCache->prepareStatement("SELECT state FROM wuGraphState where partition=? and wuid=? and graphID=? and subgraphID=?;"));
  2520. const char *wuid = queryWuid();
  2521. statement.bindInt32(0, rtlHash32VStr(wuid, 0) % sessionCache->queryPartitions());
  2522. statement.bindString(1, wuid);
  2523. statement.bindString(2, graphName);
  2524. statement.bindInt64(3, nodeId);
  2525. CassandraFuture future(cass_session_execute(sessionCache->querySession(), statement));
  2526. future.wait("queryNodeState");
  2527. CassandraResult result(cass_future_get_result(future));
  2528. if (cass_result_row_count(result))
  2529. return (WUGraphState) getUnsignedResult(NULL, getSingleResult(result));
  2530. else
  2531. return WUGraphUnknown;
  2532. }
  2533. void setGraphState(const char *graphName, unsigned wfid, WUGraphState state) const
  2534. {
  2535. setNodeState(graphName, 0, state);
  2536. }
  2537. void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const
  2538. {
  2539. CassandraStatement statement(sessionCache->prepareStatement("INSERT INTO wuGraphState (partition, wuid, graphID, subgraphID, state) values (?,?,?,?,?);"));
  2540. const char *wuid = queryWuid();
  2541. statement.bindInt32(0, rtlHash32VStr(wuid, 0) % sessionCache->queryPartitions());
  2542. statement.bindString(1, wuid);
  2543. statement.bindString(2, graphName);
  2544. statement.bindInt64(3, nodeId);
  2545. statement.bindInt32(4, (int) state);
  2546. CassandraFuture future(cass_session_execute(sessionCache->querySession(), statement));
  2547. future.wait("setNodeState update state");
  2548. if (nodeId)
  2549. {
  2550. switch (state)
  2551. {
  2552. case WUGraphRunning:
  2553. {
  2554. CassandraStatement statement2(sessionCache->prepareStatement("INSERT INTO wuGraphRunning (partition, wuid, graphID, subgraphID) values (?,?,?,?);"));
  2555. statement2.bindInt32(0, rtlHash32VStr(wuid, 0) % sessionCache->queryPartitions());
  2556. statement2.bindString(1, wuid);
  2557. statement2.bindString(2, graphName);
  2558. statement2.bindInt64(3, nodeId);
  2559. CassandraFuture future(cass_session_execute(sessionCache->querySession(), statement2));
  2560. future.wait("setNodeState update running");
  2561. break;
  2562. }
  2563. case WUGraphComplete:
  2564. {
  2565. CassandraStatement statement3(sessionCache->prepareStatement("DELETE FROM wuGraphRunning where partition=? and wuid=?;"));
  2566. statement3.bindInt32(0, rtlHash32VStr(wuid, 0) % sessionCache->queryPartitions());
  2567. statement3.bindString(1, wuid);
  2568. CassandraFuture future(cass_session_execute(sessionCache->querySession(), statement3));
  2569. future.wait("setNodeState remove running");
  2570. break;
  2571. }
  2572. }
  2573. }
  2574. }
  2575. class CCassandraWuGraphStats : public CWuGraphStats
  2576. {
  2577. public:
  2578. CCassandraWuGraphStats(const CCassandraWorkUnit *_parent, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge)
  2579. : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge),
  2580. progress(createPTree(_rootScope)), parent(_parent)
  2581. {
  2582. }
  2583. virtual IPropertyTree &queryProgressTree() override
  2584. {
  2585. return *progress.get();
  2586. }
  2587. virtual void beforeDispose() override
  2588. {
  2589. CWuGraphStats::beforeDispose(); // Sets up progress - should contain a single child tree sqNN where nn==id
  2590. parent->setGraphProgress(progress, progress->queryName(), id, creator, merge);
  2591. }
  2592. protected:
  2593. Owned<IPropertyTree> progress;
  2594. Linked<const CCassandraWorkUnit> parent;
  2595. StringAttr wuid;
  2596. };
  2597. IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph, bool merge) const override
  2598. {
  2599. return new CCassandraWuGraphStats(this, creatorType, creator, wfid, graphName, subgraph, merge);
  2600. }
  2601. virtual void _loadFilesRead() const
  2602. {
  2603. checkChildLoaded(wuFilesReadTable); // Lazy populate the FilesRead branch of p from Cassandra
  2604. CPersistedWorkUnit::_loadFilesRead();
  2605. }
  2606. virtual void _loadFilesWritten() const
  2607. {
  2608. checkChildLoaded(wuFilesWrittenTable); // Lazy populate the Files branch of p from Cassandra
  2609. CPersistedWorkUnit::_loadFilesWritten();
  2610. }
  2611. virtual void _loadResults() const
  2612. {
  2613. checkChildLoaded(wuResultsTable); // Lazy populate the Results branch of p from Cassandra
  2614. CPersistedWorkUnit::_loadResults();
  2615. }
  2616. virtual void _loadGraphs(bool heavy) const
  2617. {
  2618. // Lazy populate the Graphs branch of p from Cassandra
  2619. if (heavy)
  2620. {
  2621. // If we loaded light before, and are now loading heavy, we need to force the reload. Unlikely to happen in practice.
  2622. if (graphsCached==1)
  2623. {
  2624. p->removeProp("Graphs");
  2625. childLoaded[WuGraphsChild] = false;
  2626. }
  2627. checkChildLoaded(wuGraphsTable);
  2628. }
  2629. else
  2630. {
  2631. checkChildLoaded(wuGraphMetasTable);
  2632. }
  2633. CPersistedWorkUnit::_loadGraphs(heavy);
  2634. }
  2635. virtual void _loadVariables() const
  2636. {
  2637. checkChildLoaded(wuVariablesTable); // Lazy populate the Variables branch of p from Cassandra
  2638. CPersistedWorkUnit::_loadVariables();
  2639. }
  2640. virtual void _loadTemporaries() const
  2641. {
  2642. checkChildLoaded(wuTemporariesTable); // Lazy populate the Temporaries branch of p from Cassandra
  2643. CPersistedWorkUnit::_loadTemporaries();
  2644. }
  2645. virtual void _loadStatistics() const
  2646. {
  2647. checkChildLoaded(wuStatisticsTable); // Lazy populate the Statistics branch of p from Cassandra
  2648. CPersistedWorkUnit::_loadStatistics();
  2649. }
  2650. virtual void _loadExceptions() const
  2651. {
  2652. checkChildLoaded(wuExceptionsTable); // Lazy populate the Exceptions branch of p from Cassandra
  2653. CPersistedWorkUnit::_loadExceptions();
  2654. }
  2655. virtual void clearExceptions(const char * source=nullptr)
  2656. {
  2657. CriticalBlock b(crit);
  2658. noteDirty("*Exceptions/Exception", wuExceptionsMappings);
  2659. CPersistedWorkUnit::clearExceptions(source);
  2660. }
  2661. virtual IPropertyTree *getUnpackedTree(bool includeProgress) const
  2662. {
  2663. // If anyone wants the whole ptree, we'd better make sure we have fully loaded it...
  2664. CriticalBlock b(crit);
  2665. for (const ChildTableInfo * const * table = childTables; *table != NULL; table++)
  2666. checkChildLoaded(**table);
  2667. return CPersistedWorkUnit::getUnpackedTree(includeProgress);
  2668. }
  2669. virtual IPropertyTree *queryPTree() const
  2670. {
  2671. // If anyone wants the whole ptree, we'd better make sure we have fully loaded it...
  2672. CriticalBlock b(crit);
  2673. for (const ChildTableInfo * const * table = childTables; *table != NULL; table++)
  2674. checkChildLoaded(**table);
  2675. // And a hack for the fact that Dali stores state in both @state and <state>
  2676. const char *stateStr = p->queryProp("@state");
  2677. if (stateStr)
  2678. p->setProp("State", stateStr);
  2679. return p;
  2680. }
  2681. void setGraphProgress(IPropertyTree *progress, const char *gid, unsigned subid, const char *creator, bool merge) const
  2682. {
  2683. if (merge)
  2684. UNIMPLEMENTED;
  2685. const char *wuid=queryWuid();
  2686. CassandraStatement statement(sessionCache->prepareStatement("INSERT INTO wuGraphProgress (partition, wuid, graphID, subgraphID, creator, progress) values (?,?,?,?,?,?);"));
  2687. statement.bindInt32(0, rtlHash32VStr(wuid, 0) % sessionCache->queryPartitions());
  2688. statement.bindString(1, wuid);
  2689. statement.bindString(2, gid);
  2690. statement.bindInt64(3, subid);
  2691. statement.bindString(4, creator);
  2692. StringBuffer tag;
  2693. tag.append("sg").append(subid);
  2694. IPTree *sq = progress->queryPropTree(tag);
  2695. assertex(sq);
  2696. StringBuffer xml;
  2697. toXML(sq, xml);
  2698. statement.bindString(5, xml);
  2699. CassandraFuture future(cass_session_execute(sessionCache->querySession(), statement));
  2700. future.wait("update stats");
  2701. }
  2702. virtual IPropertyTree *getGraphProgressTree() const
  2703. {
  2704. CassandraStatement graphQuery(sessionCache->prepareStatement("SELECT graphId, subgraphID, creator, progress FROM wuGraphProgress where partition=? and wuid=?;"));
  2705. const char *wuid = queryWuid();
  2706. graphQuery.bindInt32(0, rtlHash32VStr(wuid, 0) % sessionCache->queryPartitions());
  2707. graphQuery.bindString(1, wuid);
  2708. CassandraFuture future(cass_session_execute(sessionCache->querySession(), graphQuery));
  2709. future.wait("getGraphProgress");
  2710. CassandraResult result(cass_future_get_result(future));
  2711. if (!cass_result_row_count(result))
  2712. return NULL;
  2713. Owned<IPTree> progress = createPTree("GraphProgress");
  2714. CassandraIterator rows(cass_iterator_from_result(result));
  2715. while (cass_iterator_next(rows))
  2716. {
  2717. const CassRow *row = cass_iterator_get_row(rows);
  2718. StringBuffer graphName, creator, xml;
  2719. getCassString(graphName, cass_row_get_column(row, 0));
  2720. WUGraphIDType subId = getUnsignedResult(NULL, cass_row_get_column(row, 1));
  2721. getCassString(creator, cass_row_get_column(row, 2));
  2722. getCassString(xml, cass_row_get_column(row, 3));
  2723. if (!progress->hasProp(graphName))
  2724. progress->setPropTree(graphName, createPTree(graphName));
  2725. IPTree *graph = progress->queryPropTree(graphName);
  2726. graph->setPropBool("@stats", true);
  2727. graph->setPropInt("@format", PROGRESS_FORMAT_V);
  2728. IPTree *stats = createPTreeFromXMLString(xml);
  2729. // We could check that atoi(stats->queryName()+2)==subgraphID, and that stats->queryProp(@creator)==creator)....
  2730. graph->addPropTree(stats->queryName(), stats);
  2731. }
  2732. // Now fill in the graph/node states
  2733. CassandraStatement stateQuery(sessionCache->prepareStatement("SELECT graphId, subgraphId, state FROM wuGraphState where partition=? and wuid=?;"));
  2734. stateQuery.bindInt32(0, rtlHash32VStr(wuid, 0) % sessionCache->queryPartitions());
  2735. stateQuery.bindString(1, wuid);
  2736. CassandraFuture stateFuture(cass_session_execute(sessionCache->querySession(), stateQuery));
  2737. stateFuture.wait("getGraphStateProgress");
  2738. CassandraResult stateResult(cass_future_get_result(stateFuture));
  2739. CassandraIterator stateRows(cass_iterator_from_result(stateResult));
  2740. if (cass_result_row_count(stateResult))
  2741. {
  2742. CassandraIterator stateRows(cass_iterator_from_result(stateResult));
  2743. while (cass_iterator_next(stateRows))
  2744. {
  2745. const CassRow *row = cass_iterator_get_row(stateRows);
  2746. StringBuffer graphName;
  2747. getCassString(graphName, cass_row_get_column(row, 0));
  2748. WUGraphIDType subId = getUnsignedResult(NULL, cass_row_get_column(row, 1));
  2749. unsigned state = getUnsignedResult(NULL, cass_row_get_column(row, 2));
  2750. IPTree *node = progress->queryPropTree(graphName);
  2751. if (node)
  2752. {
  2753. if (subId)
  2754. {
  2755. // This is what you might expect it to say...
  2756. //StringBuffer sg("sg");
  2757. //sg.append(subId);
  2758. //node = node->queryPropTree(sg);
  2759. // but in fact the node states are stored in separate elements. I need to see if that is something I broke.
  2760. StringBuffer xpath("node[@id='");
  2761. xpath.append(subId).append("'])");
  2762. node->removeProp(xpath); // Shouldn't be one, just playing safe
  2763. node = node->addPropTree("node", createPTree("node"));
  2764. node->setPropInt("@id", subId);
  2765. node->setPropInt("@_state", state);
  2766. }
  2767. else
  2768. node->setPropInt("@_state", state);
  2769. }
  2770. }
  2771. }
  2772. return progress.getClear();
  2773. }
  2774. protected:
  2775. // Delete child table rows
  2776. void deleteChildren(const char *wuid, CassBatch *useBatch)
  2777. {
  2778. for (const ChildTableInfo * const * table = childTables; *table != NULL; table++)
  2779. sessionCache->deleteChildByWuid(table[0]->mappings, wuid, useBatch);
  2780. }
  2781. // Lazy-populate a portion of WU xml from a child table
  2782. void checkChildLoaded(const ChildTableInfo &childTable) const
  2783. {
  2784. // NOTE - should be called inside critsec
  2785. if (!childLoaded[childTable.index])
  2786. {
  2787. const CassResult* cassResult;
  2788. try
  2789. {
  2790. cassResult = sessionCache->fetchDataForWuid(childTable.mappings, queryWuid(), false);
  2791. }
  2792. catch (IException* e)
  2793. {
  2794. int errorCode = e->errorCode();
  2795. StringBuffer origErrorMsg;
  2796. e->errorMessage(origErrorMsg);
  2797. e->Release();
  2798. const char* tableName = queryTableName(childTable.mappings);
  2799. VStringBuffer newErrorMsg("Failed to read from cassandra table '%s' (Have you run wutool to initialize cassandra repository?), [%s]", tableName, origErrorMsg.str());
  2800. rtlFail(errorCode, newErrorMsg);
  2801. }
  2802. CassandraResult result(cassResult);
  2803. IPTree *results = p->queryPropTree(childTable.parentElement);
  2804. CassandraIterator rows(cass_iterator_from_result(result));
  2805. while (cass_iterator_next(rows))
  2806. {
  2807. CassandraIterator cols(cass_iterator_from_row(cass_iterator_get_row(rows)));
  2808. Owned<IPTree> child;
  2809. if (!results)
  2810. results = ensurePTree(p, childTable.parentElement);
  2811. if (childTable.childElement)
  2812. child.setown(createPTree(childTable.childElement));
  2813. else
  2814. child.set(results);
  2815. unsigned colidx = 2; // We did not fetch wuid or partition
  2816. while (cass_iterator_next(cols))
  2817. {
  2818. assertex(childTable.mappings[colidx].columnName);
  2819. const CassValue *value = cass_iterator_get_column(cols);
  2820. if (value && !cass_value_is_null(value))
  2821. childTable.mappings[colidx].mapper.toXML(child, childTable.mappings[colidx].xpath, value);
  2822. colidx++;
  2823. }
  2824. if (childTable.childElement)
  2825. {
  2826. const char *childName = child->queryName();
  2827. results->addPropTree(childName, child.getClear());
  2828. }
  2829. }
  2830. childLoaded[childTable.index] = true;
  2831. }
  2832. }
  2833. // Update secondary tables (used to search wuids by owner, state, jobname etc)
  2834. void updateSecondaryTable(const char *xpath, const char *prevKey, const char *wuid, CIArrayOf<CassandraStatement> &batch)
  2835. {
  2836. if (prevKey && *prevKey)
  2837. deleteSecondaryByKey(xpath, prevKey, wuid, sessionCache, batch);
  2838. const char *value = p->queryProp(xpath);
  2839. if (value && *value)
  2840. simpleXMLtoCassandra(sessionCache, batch, searchMappings, p, xpath);
  2841. }
  2842. void updateSecondaryTable(const char *xpath, const char *wuid, CIArrayOf<CassandraStatement> &batch)
  2843. {
  2844. const char *value = p->queryProp(xpath);
  2845. if (value && *value)
  2846. simpleXMLtoCassandra(sessionCache, batch, searchMappings, p, xpath);
  2847. }
  2848. void deleteAppSecondaries(IPTree &pt, const char *wuid, CIArrayOf<CassandraStatement> &batch)
  2849. {
  2850. Owned<IPTreeIterator> apps = pt.getElements("Application");
  2851. ForEach(*apps)
  2852. {
  2853. IPTree &app = apps->query();
  2854. Owned<IPTreeIterator> names = app.getElements("*");
  2855. ForEach(*names)
  2856. {
  2857. IPTree &name = names->query();
  2858. Owned<IPTreeIterator> values = name.getElements("*");
  2859. ForEach(*values)
  2860. {
  2861. IPTree &value = values->query();
  2862. const char *appValue = value.queryProp(".");
  2863. if (appValue && *appValue)
  2864. {
  2865. VStringBuffer xpath("%s/%s/%s", app.queryName(), name.queryName(), value.queryName());
  2866. deleteSecondaryByKey(xpath, appValue, wuid, sessionCache, batch);
  2867. }
  2868. }
  2869. }
  2870. }
  2871. }
  2872. void deleteSecondaries(const char *wuid, CIArrayOf<CassandraStatement> &batch)
  2873. {
  2874. for (const char * const *search = searchPaths; *search; search++)
  2875. deleteSecondaryByKey(*search, p->queryProp(*search), wuid, sessionCache, batch);
  2876. deleteAppSecondaries(*p, wuid, batch);
  2877. Owned<IPropertyTreeIterator> filesRead = &getFilesReadIterator();
  2878. ForEach(*filesRead)
  2879. {
  2880. deleteFileSearch(sessionCache, batch, filesRead->query().queryProp("@name"), true, wuid);
  2881. }
  2882. Owned<IPropertyTreeIterator> filesWritten = &getFileIterator();
  2883. ForEach(*filesWritten)
  2884. {
  2885. deleteFileSearch(sessionCache, batch, filesWritten->query().queryProp("@name"), false, wuid);
  2886. }
  2887. }
  2888. void updateSecondaries(const char *wuid, CIArrayOf<CassandraStatement> &batch)
  2889. {
  2890. const char * const *search;
  2891. for (search = searchPaths; *search; search++)
  2892. updateSecondaryTable(*search, prev->queryProp(*search), wuid, batch);
  2893. for (search = wildSearchPaths; *search; search++)
  2894. {
  2895. const char *value = p->queryProp(*search);
  2896. if (value && *value)
  2897. addUniqueValue(sessionCache, batch, *search, value);
  2898. }
  2899. deleteAppSecondaries(*prev, wuid, batch);
  2900. Owned<IConstWUAppValueIterator> appValues = &getApplicationValues();
  2901. ForEach(*appValues)
  2902. {
  2903. IConstWUAppValue& val=appValues->query();
  2904. addUniqueValue(sessionCache, batch, "Application", val.queryApplication()); // Used to populate droplists of applications
  2905. VStringBuffer key("@@%s", val.queryApplication());
  2906. addUniqueValue(sessionCache, batch, key, val.queryName()); // Used to populate droplists of value names for a given application
  2907. VStringBuffer xpath("Application/%s/%s", val.queryApplication(), val.queryName());
  2908. addUniqueValue(sessionCache, batch, xpath, val.queryValue()); // Used to get lists of values for a given app and name, and for filtering
  2909. simpleXMLtoCassandra(sessionCache, batch, searchMappings, p, xpath);
  2910. }
  2911. Owned<IPropertyTreeIterator> filesRead = &getFilesReadIterator();
  2912. ForEach(*filesRead)
  2913. {
  2914. addFileSearch(sessionCache, batch, filesRead->query().queryProp("@name"), true, wuid);
  2915. }
  2916. Owned<IPropertyTreeIterator> filesWritten = &getFileIterator();
  2917. ForEach(*filesWritten)
  2918. {
  2919. addFileSearch(sessionCache, batch, filesWritten->query().queryProp("@name"), false, wuid);
  2920. }
  2921. }
  2922. void updateSecondaries(CIArrayOf<CassandraStatement> &batch)
  2923. {
  2924. const char *wuid = queryWuid();
  2925. const char * const *search;
  2926. for (search = searchPaths; *search; search++)
  2927. updateSecondaryTable(*search, wuid, batch);
  2928. for (search = wildSearchPaths; *search; search++)
  2929. {
  2930. const char *value = p->queryProp(*search);
  2931. if (value && *value)
  2932. addUniqueValue(sessionCache, batch, *search, value);
  2933. }
  2934. Owned<IConstWUAppValueIterator> appValues = &getApplicationValues();
  2935. ForEach(*appValues)
  2936. {
  2937. IConstWUAppValue& val=appValues->query();
  2938. addUniqueValue(sessionCache, batch, "Application", val.queryApplication()); // Used to populate droplists of applications
  2939. VStringBuffer key("@@%s", val.queryApplication());
  2940. addUniqueValue(sessionCache, batch, key, val.queryName()); // Used to populate droplists of value names for a given application
  2941. VStringBuffer xpath("Application/%s/%s", val.queryApplication(), val.queryName());
  2942. addUniqueValue(sessionCache, batch, xpath, val.queryValue()); // Used to get lists of values for a given app and name, and for filtering
  2943. simpleXMLtoCassandra(sessionCache, batch, searchMappings, p, xpath);
  2944. }
  2945. Owned<IPropertyTreeIterator> filesRead = &getFilesReadIterator();
  2946. ForEach(*filesRead)
  2947. {
  2948. addFileSearch(sessionCache, batch, filesRead->query().queryProp("@name"), true, wuid);
  2949. }
  2950. Owned<IPropertyTreeIterator> filesWritten = &getFileIterator();
  2951. ForEach(*filesWritten)
  2952. {
  2953. addFileSearch(sessionCache, batch, filesWritten->query().queryProp("@name"), false, wuid);
  2954. }
  2955. }
  2956. // Keep track of previously committed values for fields that we have a secondary table for, so that we can update them appropriately when we commit
  2957. bool trackSecondaryChange(const char *newval, const char *xpath)
  2958. {
  2959. if (!newval)
  2960. newval = "";
  2961. const char *oldval = p->queryProp(xpath);
  2962. if (!oldval)
  2963. oldval = "";
  2964. if (streq(newval, oldval))
  2965. return false; // No change
  2966. bool add;
  2967. if (!prev)
  2968. {
  2969. prev.setown(createPTree());
  2970. add = true;
  2971. }
  2972. else add = !prev->hasProp(xpath);
  2973. if (add)
  2974. {
  2975. const char *tailptr = strrchr(xpath, '/');
  2976. if (tailptr)
  2977. {
  2978. StringBuffer head(tailptr-xpath, xpath);
  2979. ensurePTree(prev, head)->setProp(tailptr+1, oldval);
  2980. }
  2981. else
  2982. prev->setProp(xpath, oldval);
  2983. }
  2984. return true;
  2985. }
  2986. IWUResult *noteDirty(IWUResult *result)
  2987. {
  2988. if (result)
  2989. dirtyResults.append(*LINK(result));
  2990. return result;
  2991. }
  2992. void noteDirty(const char *xpath, const CassandraXmlMapping *table)
  2993. {
  2994. dirtyPaths.setValue(xpath, table);
  2995. }
  2996. Linked<const ICassandraSession> sessionCache; // Session used to prepare statements and to fetch/delete rows for this workunit
  2997. mutable bool childLoaded[ChildTablesSize]; // Indexed by child table index; set once that table has been loaded into p (see checkChildLoaded)
  2998. bool allDirty; // Set by copyWorkUnit after the whole workunit has been copied in
  2999. bool stateChanged; // Set by setState (when the state differs) and by copyWorkUnit
  3000. bool actionChanged; // Set by setAction and by copyWorkUnit
  3001. Owned<IPTree> prev; // Values that tracked fields had before they were changed (see trackSecondaryChange)
  3002. MapStringTo<const CassandraXmlMapping *> dirtyPaths; // xpath -> mapping table, as recorded by noteDirty(xpath, table)
  3003. IArrayOf<IWUResult> dirtyResults; // Results handed out by the update*By* methods, as recorded by noteDirty(result)
  3004. Owned<IRemoteConnection> daliLock; // We still use dali for locking
  3005. };
  3006. class CCassandraWorkUnitWatcher : public CWorkUnitWatcher
  3007. {
  3008. public:
  3009. CCassandraWorkUnitWatcher(IWorkUnitSubscriber *_subscriber, WUSubscribeOptions flags, const char *wuid)
  3010. : CWorkUnitWatcher(_subscriber, (WUSubscribeOptions) (flags & SubscribeOptionAbort), wuid)
  3011. {
  3012. if (flags & SubscribeOptionState)
  3013. {
  3014. VStringBuffer xpath("/WorkUnitStates/%s/State", wuid);
  3015. stateId = querySDS().subscribe(xpath.str(), *this);
  3016. }
  3017. if (flags & SubscribeOptionAction)
  3018. {
  3019. VStringBuffer xpath("/WorkUnitStates/%s/Action", wuid);
  3020. actionId = querySDS().subscribe(xpath.str(), *this);
  3021. }
  3022. }
  3023. };
  3024. class CCasssandraWorkUnitFactory : public CWorkUnitFactory, implements ICassandraSession
  3025. {
  3026. IMPLEMENT_IINTERFACE;
  3027. public:
  3028. CCasssandraWorkUnitFactory(const SharedObject *_dll, const IPropertyTree *props) : cluster(cass_cluster_new()), randomizeSuffix(0), randState((unsigned) get_cycles_now()), cacheRetirer(*this)
  3029. {
  3030. StringArray options;
  3031. options.append("write_bytes_high_water_mark=1000000"); // Set the default HWM - workunits get big. This can be overridden by supplied options
  3032. Owned<IPTreeIterator> it = props->getElements("Option");
  3033. ForEach(*it)
  3034. {
  3035. IPTree &item = it->query();
  3036. const char *opt = item.queryProp("@name");
  3037. const char *val = item.queryProp("@value");
  3038. if (opt && val)
  3039. {
  3040. if (strieq(opt, "randomWuidSuffix"))
  3041. randomizeSuffix = atoi(val);
  3042. else if (strieq(opt, "traceLevel"))
  3043. traceLevel = atoi(val);
  3044. else if (strieq(opt, "partitions"))
  3045. {
  3046. partitions = atoi(val); // Note this value is only used when creating a new repo
  3047. if (partitions < MIN_PARTITIONS)
  3048. partitions = MIN_PARTITIONS;
  3049. else if (partitions > MAX_PARTITIONS)
  3050. partitions = MAX_PARTITIONS;
  3051. }
  3052. else if (strieq(opt, "prefixSize"))
  3053. {
  3054. prefixSize = atoi(val); // Note this value is only used when creating a new repo
  3055. if (prefixSize < MIN_PREFIX_SIZE)
  3056. prefixSize = MIN_PREFIX_SIZE;
  3057. else if (prefixSize > MAX_PREFIX_SIZE)
  3058. prefixSize = MAX_PREFIX_SIZE;
  3059. }
  3060. else
  3061. {
  3062. VStringBuffer optstr("%s=%s", opt, val);
  3063. options.append(optstr);
  3064. }
  3065. }
  3066. }
  3067. cluster.setOptions(options);
  3068. if (!cluster.queryKeySpace())
  3069. cluster.setKeySpace("hpcc");
  3070. try
  3071. {
  3072. cluster.connect();
  3073. Owned<IPTree> versionInfo = getVersionInfo();
  3074. if (versionInfo)
  3075. {
  3076. int major = versionInfo->getPropInt("@major", 0);
  3077. int minor = versionInfo->getPropInt("@minor", 0);
  3078. partitions = versionInfo->getPropInt("@numPartitions", DEFAULT_PARTITIONS);
  3079. prefixSize = versionInfo->getPropInt("@searchPrefixSize", DEFAULT_PREFIX_SIZE);
  3080. if (major && minor)
  3081. {
  3082. // Note that if there is no version info at all, we have to assume that the repository is not yet created. We don't fail, otherwise no-one can call createRepository the first time...
  3083. if (major != majorVersion)
  3084. throw makeStringExceptionV(WUERR_WorkunitVersionMismatch, "Incompatible workunit repository version (wanted %d.%d, found %d.%d)", majorVersion, minorVersion, major, minor);
  3085. if (minor != minorVersion)
  3086. {
  3087. if (minor < minorVersion)
  3088. {
  3089. DBGLOG("WARNING: repository version %d.%d is older than current version %d.%d - adding required columns", major, minor, majorVersion, minorVersion);
  3090. switch (minor)
  3091. {
  3092. case 1:
  3093. executeSimpleCommand(querySession(), "ALTER TABLE wuresults ADD graph text;");
  3094. executeSimpleCommand(querySession(), "ALTER TABLE wuresults ADD activity int;");
  3095. executeSimpleCommand(querySession(), "ALTER TABLE wuvariables ADD graph text;");
  3096. executeSimpleCommand(querySession(), "ALTER TABLE wuvariables ADD activity int;");
  3097. executeSimpleCommand(querySession(), "ALTER TABLE wutemporaries ADD graph text;");
  3098. executeSimpleCommand(querySession(), "ALTER TABLE wutemporaries ADD activity int;");
  3099. break;
  3100. }
  3101. createVersionTable(true);
  3102. }
  3103. else
  3104. DBGLOG("WARNING: repository version %d.%d is newer than current version %d.%d - some columns will not be updated", major, minor, majorVersion, minorVersion);
  3105. }
  3106. }
  3107. }
  3108. else
  3109. {
  3110. DBGLOG("WARNING: repository version could not be retrieved (repository not yet created?)");
  3111. cluster.disconnect();
  3112. }
  3113. }
  3114. catch (IException *E)
  3115. {
  3116. EXCLOG(E);
  3117. E->Release();
  3118. DBGLOG("WARNING: repository version could not be retrieved (repository not yet created?)");
  3119. }
  3120. cacheRetirer.start();
  3121. LINK(_dll); // Yes, this leaks. Not really sure how to avoid that.
  3122. }
  3123. ~CCasssandraWorkUnitFactory()
  3124. {
  3125. cacheRetirer.stop();
  3126. cacheRetirer.join();
  3127. if (traceLevel)
  3128. DBGLOG("CCasssandraWorkUnitFactory destroyed");
  3129. }
  3130. virtual bool initializeStore()
  3131. {
  3132. createRepository();
  3133. return true;
  3134. }
  3135. virtual IWorkUnitWatcher *getWatcher(IWorkUnitSubscriber *subscriber, WUSubscribeOptions options, const char *wuid) const
  3136. {
  3137. return new CCassandraWorkUnitWatcher(subscriber, options, wuid);
  3138. }
  3139. virtual CLocalWorkUnit* _createWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser)
  3140. {
  3141. unsigned suffix;
  3142. unsigned suffixLength;
  3143. if (randomizeSuffix) // May need to enable this option if you are expecting to create hundreds of workunits / second
  3144. {
  3145. suffix = rand_r(&randState);
  3146. suffixLength = randomizeSuffix;
  3147. }
  3148. else
  3149. {
  3150. suffix = 0;
  3151. suffixLength = 0;
  3152. }
  3153. Owned<CassandraPrepared> prepared = prepareStatement("INSERT INTO workunits (partition, wuid) VALUES (?,?) IF NOT EXISTS;");
  3154. for (;;)
  3155. {
  3156. // Create a unique WUID by adding suffixes until we managed to add a new value
  3157. StringBuffer useWuid(wuid);
  3158. if (suffix)
  3159. {
  3160. useWuid.append("-");
  3161. for (unsigned i = 0; i < suffixLength; i++)
  3162. {
  3163. useWuid.appendf("%c", '0'+suffix%10);
  3164. suffix /= 10;
  3165. }
  3166. }
  3167. CassandraStatement statement(prepared.getLink());
  3168. statement.bindInt32(0, rtlHash32VStr(useWuid.str(), 0) % partitions);
  3169. statement.bindString(1, useWuid.str());
  3170. if (traceLevel >= 2)
  3171. DBGLOG("Try creating %s", useWuid.str());
  3172. CassandraFuture future(cass_session_execute(querySession(), statement));
  3173. future.wait("execute");
  3174. CassandraResult result(cass_future_get_result(future));
  3175. if (cass_result_column_count(result)==1)
  3176. {
  3177. // A single column result indicates success, - the single column should be called '[applied]' and have the value 'true'
  3178. // If there are multiple columns it will be '[applied]' (value false) and the fields of the existing row
  3179. Owned<IPTree> wuXML = createPTree(useWuid);
  3180. wuXML->setProp("@xmlns:xsi", "http://www.w3.org/1999/XMLSchema-instance");
  3181. wuXML->setPropInt("@wuidVersion", WUID_VERSION); // we implement the latest version.
  3182. wuXML->setProp("@totalThorTime", ""); // must be non null, otherwise sorting by thor time excludes the values
  3183. Owned<IRemoteConnection> daliLock;
  3184. lockWuid(daliLock, useWuid);
  3185. Owned<CLocalWorkUnit> wu = new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser, daliLock.getClear(), false);
  3186. return wu.getClear();
  3187. }
  3188. suffix = rand_r(&randState);
  3189. if (suffixLength<9)
  3190. suffixLength++;
  3191. }
  3192. }
  3193. virtual CLocalWorkUnit* _openWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser)
  3194. {
  3195. Owned<IPTree> wuXML = cassandraToWorkunitXML(wuid);
  3196. if (wuXML)
  3197. return new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser, NULL, false);
  3198. else
  3199. return NULL;
  3200. }
  3201. virtual CLocalWorkUnit* _updateWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser)
  3202. {
  3203. // We still use dali for the locks
  3204. Owned<IRemoteConnection> daliLock;
  3205. lockWuid(daliLock, wuid);
  3206. Owned<IPTree> wuXML = cassandraToWorkunitXML(wuid);
  3207. Owned<CLocalWorkUnit> wu = new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser, daliLock.getClear(), false);
  3208. return wu.getClear();
  3209. }
  3210. virtual bool _restoreWorkUnit(IPTree *_pt, const char *wuid)
  3211. {
  3212. Owned<IPTree> pt(_pt);
  3213. try
  3214. {
  3215. Owned<IRemoteConnection> daliLock;
  3216. lockWuid(daliLock, wuid);
  3217. Owned<IPropertyTree> gProgress = pruneBranch(pt, "GraphProgress[1]");
  3218. Owned<CCassandraWorkUnit> wu = new CCassandraWorkUnit(this, pt.getClear(), NULL, NULL, daliLock.getClear(), true);
  3219. if (gProgress)
  3220. {
  3221. Owned<IPTreeIterator> graphs = gProgress->getElements("*");
  3222. ForEach(*graphs)
  3223. {
  3224. IPTree &graph = graphs->query();
  3225. const char *graphName = graph.queryName();
  3226. Owned<IPTreeIterator> subs = graph.getElements("*");
  3227. ForEach(*subs)
  3228. {
  3229. IPTree &sub = subs->query();
  3230. const char *name=sub.queryName();
  3231. if (name[0]=='s' && name[1]=='g')
  3232. {
  3233. wu->setGraphProgress(&graph, graphName, atoi(name+2), sub.queryProp("@creator"), false);
  3234. }
  3235. else if (streq(name, "node"))
  3236. {
  3237. unsigned subid = sub.getPropInt("@id");
  3238. if (subid)
  3239. {
  3240. if (sub.hasChildren()) // Old format
  3241. wu->setGraphProgress(&sub, graphName, subid, sub.queryProp("@creator"), false);
  3242. if (sub.hasProp("@_state"))
  3243. wu->setNodeState(graphName, subid, (WUGraphState) sub.getPropInt("@_state"));
  3244. }
  3245. }
  3246. }
  3247. if (graph.hasProp("@_state"))
  3248. wu->setGraphState(graphName, graph.getPropInt("@wfid"), (WUGraphState) graph.getPropInt("@_state"));
  3249. }
  3250. }
  3251. wu->commit();
  3252. return true;
  3253. }
  3254. catch (IException *E)
  3255. {
  3256. EXCLOG(E);
  3257. ::Release(E);
  3258. return false;
  3259. }
  3260. }
  3261. virtual IWorkUnit * getGlobalWorkUnit(ISecManager *secmgr = NULL, ISecUser *secuser = NULL)
  3262. {
  3263. // MORE - should it check security? Dali version never did...
  3264. Owned<IRemoteConnection> daliLock;
  3265. lockWuid(daliLock, GLOBAL_WORKUNIT);
  3266. Owned<IPTree> wuXML = createPTree(GLOBAL_WORKUNIT);
  3267. Owned<CLocalWorkUnit> wu = new CCassandraWorkUnit(this, wuXML.getClear(), NULL, NULL, daliLock.getClear(), false);
  3268. return &wu->lockRemote(false);
  3269. }
  3270. virtual IConstWorkUnitIterator * getWorkUnitsByOwner(const char * owner, ISecManager *secmgr, ISecUser *secuser)
  3271. {
  3272. return getWorkUnitsByXXX("@submitID", owner, secmgr, secuser);
  3273. }
  3274. virtual IConstWorkUnitIterator * getScheduledWorkUnits(ISecManager *secmgr, ISecUser *secuser)
  3275. {
  3276. return getWorkUnitsByXXX("@state", getWorkunitStateStr(WUStateScheduled), secmgr, secuser); // MORE - there may be more efficient ways to do this?
  3277. }
  3278. virtual IConstWorkUnitIterator * getWorkUnitsSorted(WUSortField sortorder, WUSortField * filters, const void * filterbuf,
  3279. unsigned startOffset, unsigned pageSize, __int64 * cachehint, unsigned *total,
  3280. ISecManager *secmgr, ISecUser *secuser)
  3281. {
  3282. // To assist in the efficient implementation of this function without requiring local sorting and filtering,
  3283. // we maintain a couple of additional search tables in addition to the main workunit table.
  3284. //
  3285. // The workunitsSearch table allows us to map from a given field's value to a workunit - to avoid the need
  3286. // for a second lookup this table contains a copy of all the 'lightweight' fields in the workunit. The table
  3287. // has a partition key of xpath, searchPrefix allowing it to be used for range lookups provided at least
  3288. // 2 characters are provided, while hopefully spreading the load a little between Cassandra partitions.
  3289. //
  3290. // The uniqueValues table is used to track what values are present for some wild-searchable fields, so we do
  3291. // two lookups - one to translate the wildcard to a set, then others to retrieve the wus matching each value
  3292. // in the set. These are done as N parallel reads rather than a single query (which might naively be expected
  3293. // to be more efficient) for two reasons. Firstly, we can get them back sorted that way and merge the results
  3294. // on the fly. Secondly, it is actually more efficient, at least in the case when there are multiple Cassandra
  3295. // partitions, since it in-effect cuts out the step of talking to a coordinator node which would talk to
  3296. // multiple other nodes to get the data.
  3297. //
  3298. // We go to some lengths to avoid post-sorting if we can, but any sort order other than by wuid or totalThorTime
  3299. // will post-sort it. If a post-sort is required, we will fetch up to WUID_LOCALSORT_LIMIT rows, - if there are
  3300. // more then we should fail, and the user should be invited to add filters.
  3301. //
  3302. // We can do at most one 'hard' filter, plus a filter on wuid range - anything else will require post-filtering.
  3303. // Most 'wild' searches can only be done with post-filtering, but some can be translated to multiple hard values
  3304. // using the unique values table. In such cases we merge results in the fly to avoid a post-sort if possible
  3305. //
  3306. // Note that Cassandra does not presently support filtering before returning the values except where a
  3307. // key or secondary index is available - even if ALLOW FILTERING is specified. If it did, some of the post-
  3308. // filtering would be better off done at the Cassandra side.
  3309. //
  3310. // We should encourage the UI to present drop-lists of users for filtering, to avoid the use of wildcard
  3311. // searches just because people can't remember the name.
  3312. //
  3313. // Searching by files probably needs to be done differently - a separate table mapping filenames to wuids.
  3314. // This can perhaps be join-merged if other filters are present. This is still TBD at the moment.
  3315. Owned<CCassandraWuUQueryCacheEntry> cached;
  3316. if (cachehint && *cachehint)
  3317. {
  3318. CriticalBlock b(cacheCrit);
  3319. cached.set(cacheIdMap.getValue(*cachehint));
  3320. }
  3321. if (cached)
  3322. cached->touch();
  3323. else
  3324. cached.setown(new CCassandraWuUQueryCacheEntry());
  3325. if (pageSize > INT_MAX)
  3326. pageSize = INT_MAX;
  3327. const WUSortField *thisFilter = filters;
  3328. IArrayOf<IPostFilter> goodFilters;
  3329. IArrayOf<IPostFilter> wuidFilters;
  3330. IArrayOf<IPostFilter> poorFilters;
  3331. IArrayOf<IPostFilter> fileFilters;
  3332. IArrayOf<IPostFilter> remoteWildFilters;
  3333. Owned<IConstWorkUnitIteratorEx> result;
  3334. WUSortField baseSort = (WUSortField) (sortorder & 0xff);
  3335. StringBuffer thorTimeThreshold;
  3336. bool sortByThorTime = (baseSort == WUSFtotalthortime);
  3337. bool needsPostSort = (baseSort != WUSFwuid && baseSort != WUSFtotalthortime);
  3338. bool sortDescending = (sortorder & WUSFreverse) || needsPostSort;
  3339. bool keepThorTimeFilter = sortByThorTime;
  3340. if (!keepThorTimeFilter)
  3341. {
  3342. const WUSortField *filterPtr = filters;
  3343. while (filterPtr && *filterPtr)
  3344. {
  3345. WUSortField field = (WUSortField) (*filterPtr & 0xff);
  3346. if (field == WUSFtotalthortime)
  3347. {
  3348. keepThorTimeFilter = true;
  3349. break;
  3350. }
  3351. filterPtr++;
  3352. }
  3353. }
  3354. if (!result)
  3355. {
  3356. Owned<CassMultiIterator> merger = new CassMultiIterator(needsPostSort ? NULL : cached, 0, 0, sortDescending); // We always merge by wuid (except when we merge by thor time... we turn the compare off then to make it an appender)
  3357. if (startOffset)
  3358. {
  3359. StringBuffer startWuid;
  3360. unsigned found = cached->lookupStartRow(startWuid, thorTimeThreshold, startOffset);
  3361. if (found)
  3362. {
  3363. if (!keepThorTimeFilter)
  3364. {
  3365. if (sortDescending)
  3366. startWuid.setCharAt(startWuid.length()-1, startWuid.charAt(startWuid.length()-1)-1); // we want to find the last wuid BEFORE
  3367. else
  3368. startWuid.append('\x21'); // we want to find the first wuid AFTER. This is printable but not going to be in any wuid
  3369. thorTimeThreshold.clear();
  3370. }
  3371. wuidFilters.append(*new PostFilter(sortorder==WUSFwuid ? WUSFwuid : WUSFwuidhigh, startWuid, true));
  3372. startOffset -= found;
  3373. merger->setStartOffset(found);
  3374. }
  3375. }
  3376. const char *fv = (const char *) filterbuf;
  3377. while (thisFilter && *thisFilter)
  3378. {
  3379. WUSortField field = (WUSortField) (*thisFilter & 0xff);
  3380. bool isWild = (*thisFilter & WUSFwild) != 0;
  3381. switch (field)
  3382. {
  3383. case WUSFappvalue:
  3384. {
  3385. assertex(fv);
  3386. const char *name = fv;
  3387. fv = fv + strlen(fv)+1;
  3388. if (isWild)
  3389. {
  3390. StringBuffer s(fv);
  3391. if (s.charAt(s.length()-1)== '*')
  3392. s.remove(s.length()-1, 1);
  3393. if (s.length())
  3394. remoteWildFilters.append(*new AppValuePostFilter(name, s, true)); // Should we allow wild on the app and/or name too? Not at the moment
  3395. }
  3396. else
  3397. goodFilters.append(*new AppValuePostFilter(name, fv, false));
  3398. break;
  3399. }
  3400. case WUSFuser:
  3401. case WUSFcluster:
  3402. case WUSFjob:
  3403. if (isWild)
  3404. {
  3405. StringBuffer s(fv);
  3406. if (s.charAt(s.length()-1)== '*')
  3407. s.remove(s.length()-1, 1);
  3408. if (s.length())
  3409. remoteWildFilters.append(*new PostFilter(field, s, true)); // Trailing-only wildcards can be done remotely
  3410. }
  3411. else if (strchr(fv, '|'))
  3412. goodFilters.append(*new MultiValuePostFilter(field, fv));
  3413. else
  3414. goodFilters.append(*new PostFilter(field, fv, false));
  3415. break;
  3416. case WUSFstate:
  3417. case WUSFpriority:
  3418. case WUSFprotected:
  3419. // These can't be wild, but are not very good filters
  3420. if (strchr(fv, '|'))
  3421. poorFilters.append(*new MultiValuePostFilter(field, fv));
  3422. else
  3423. poorFilters.append(*new PostFilter(field, fv, false));
  3424. break;
  3425. case WUSFwuid: // Acts as wuidLo when specified as a filter
  3426. case WUSFwuidhigh:
  3427. // Wuid filters can be added to good and poor filters, and to remoteWild if they are done via merged sets rather than ranges...
  3428. if (keepThorTimeFilter)
  3429. remoteWildFilters.append(*new PostFilter(field, fv, true));
  3430. else
  3431. mergeFilter(wuidFilters, field, fv);
  3432. break;
  3433. case WUSFfileread:
  3434. case WUSFfilewritten:
  3435. fileFilters.append(*new PostFilter(field, fv, true));
  3436. break;
  3437. case WUSFtotalthortime:
  3438. // This should be treated as a low value - i.e. return only wu's that took longer than the supplied value
  3439. if (thorTimeThreshold.isEmpty()) // If not a continuation
  3440. formatTimeCollatable(thorTimeThreshold, milliToNano(atoi(fv)), false);
  3441. break;
  3442. case WUSFwildwuid:
  3443. // Translate into a range - note that we only support trailing * wildcard.
  3444. if (fv && *fv)
  3445. {
  3446. StringBuffer s(fv);
  3447. if (s.charAt(s.length()-1)== '*')
  3448. s.remove(s.length()-1, 1);
  3449. if (s.length())
  3450. {
  3451. mergeFilter(wuidFilters, WUSFwuid, s);
  3452. s.append('\x7e'); // '~' - higher than anything that should occur in a wuid (but still printable)
  3453. mergeFilter(wuidFilters, WUSFwuidhigh, s);
  3454. }
  3455. }
  3456. break;
  3457. case WUSFecl: // This is different...
  3458. if (isWild)
  3459. merger->addPostFilter(*new PostFilter(field, fv, true)); // Wildcards on ECL are trailing and leading - no way to do remotely
  3460. else
  3461. goodFilters.append(*new PostFilter(field, fv, false)); // A hard filter on exact ecl match is possible but very unlikely
  3462. break;
  3463. default:
  3464. UNSUPPORTED("Workunit filter criteria");
  3465. }
  3466. thisFilter++;
  3467. if (fv)
  3468. fv = fv + strlen(fv)+1;
  3469. }
  3470. if (fileFilters.length())
  3471. {
  3472. // We can't postfilter by these - we COULD in some cases do a join between these and some other filtered set
  3473. // but we will leave that as an exercise to the reader. So if there is a fileFilter, read it first, and turn it into a merge set of the resulting wus.
  3474. // MORE read and written are not the same
  3475. assertex(fileFilters.length()==1); // If we supported more there would be a join phase here
  3476. merger->addPostFilters(goodFilters, 0);
  3477. merger->addPostFilters(poorFilters, 0);
  3478. merger->addPostFilters(remoteWildFilters, 0);
  3479. const IPostFilter &fileFilter = fileFilters.item(0);
  3480. CassandraResult wuids(fetchDataForFiles(fileFilter.queryValue(), wuidFilters, fileFilter.queryField()==WUSFfileread));
  3481. CassandraIterator rows(cass_iterator_from_result(wuids));
  3482. StringBuffer value;
  3483. while (cass_iterator_next(rows))
  3484. {
  3485. const CassRow *row = cass_iterator_get_row(rows);
  3486. getCassString(value.clear(), cass_row_get_column(row, 0));
  3487. merger->addResult(*new CassandraResult(fetchDataForWuid(workunitInfoMappings, value, true)));
  3488. }
  3489. }
  3490. else if (sortByThorTime || !thorTimeThreshold.isEmpty())
  3491. {
  3492. merger->addPostFilters(goodFilters, 0);
  3493. merger->addPostFilters(poorFilters, 0);
  3494. merger->addPostFilters(remoteWildFilters, 0);
  3495. if (wuidFilters.length())
  3496. {
  3497. // We are doing a continuation of a prior search that is sorted by a searchField, which may not be unique
  3498. // We need two queries - one where searchField==startSearchField and wuid > startWuid,
  3499. // and one where searchField > startSearchField. We know that there are no other filters in play (as Cassandra would not support them)
  3500. // though there may be postfilters
  3501. assertex(wuidFilters.length()==1);
  3502. merger->addResult(*new CassandraResult(fetchMoreDataByThorTime(thorTimeThreshold, wuidFilters.item(0).queryValue(), sortDescending, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
  3503. merger->addResult(*new CassandraResult(fetchMoreDataByThorTime(thorTimeThreshold, NULL, sortDescending, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
  3504. merger->setCompareColumn(-1); // we want to preserve the order of these two results
  3505. }
  3506. else
  3507. merger->addResult(*new CassandraResult(fetchDataByThorTime(thorTimeThreshold, sortDescending, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
  3508. }
  3509. else if (goodFilters.length())
  3510. {
  3511. merger->addPostFilters(goodFilters, 1);
  3512. merger->addPostFilters(poorFilters, 0);
  3513. merger->addPostFilters(remoteWildFilters, 0);
  3514. const IPostFilter &best = goodFilters.item(0);
  3515. const char *queryValue = best.queryValue();
  3516. if (strchr(queryValue, '|'))
  3517. {
  3518. StringArray values;
  3519. values.appendListUniq(queryValue, "|");
  3520. ForEachItemIn(vidx, values)
  3521. {
  3522. const char *thisValue = values.item(vidx);
  3523. if (!isEmptyString(thisValue))
  3524. merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(best.queryXPath(), thisValue, wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
  3525. }
  3526. }
  3527. else
  3528. merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(best.queryXPath(), best.queryValue(), wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
  3529. }
  3530. else if (poorFilters.length())
  3531. {
  3532. merger->addPostFilters(poorFilters, 1);
  3533. merger->addPostFilters(remoteWildFilters, 0);
  3534. const IPostFilter &best= poorFilters.item(0);
  3535. const char *queryValue =best.queryValue();
  3536. if (strchr(queryValue, '|'))
  3537. {
  3538. StringArray values;
  3539. values.appendListUniq(queryValue, "|");
  3540. ForEachItemIn(vidx, values)
  3541. {
  3542. const char *thisValue = values.item(vidx);
  3543. if (!isEmptyString(thisValue))
  3544. merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(best.queryXPath(), thisValue, wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
  3545. }
  3546. }
  3547. else
  3548. merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(best.queryXPath(), best.queryValue(), wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
  3549. }
  3550. else if (remoteWildFilters.length())
  3551. {
  3552. merger->addPostFilters(remoteWildFilters, 1); // Any other filters have to be done locally
  3553. // Convert into a value IN [] which we do via a merge
  3554. // NOTE - If we want sorted by filter (or don't care about sort order), we could do directly as a range - but the wuid range filters then don't work, and the merger would be invalid
  3555. StringArray fieldValues;
  3556. const IPostFilter &best= remoteWildFilters.item(0);
  3557. _getUniqueValues(best.queryXPath(), best.queryValue(), fieldValues);
  3558. ForEachItemIn(idx, fieldValues)
  3559. {
  3560. merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(best.queryXPath(), fieldValues.item(idx), wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
  3561. }
  3562. }
  3563. else
  3564. {
  3565. // If all we have is a wuid range (or nothing), search the wuid table and/or return everything
  3566. for (int i = 0; i < partitions; i++)
  3567. {
  3568. merger->addResult(*new CassandraResult(fetchDataByPartition(workunitInfoMappings, i, wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
  3569. }
  3570. }
  3571. // The result we have will be sorted by wuid (ascending or descending)
  3572. if (needsPostSort)
  3573. {
  3574. // A post-sort will be required.
  3575. // Result should be limited in (to CASS_WORKUNIT_POSTSORT_LIMIT * number of results being merged)
  3576. result.setown(new CassPostSortIterator(merger.getClear(), sortorder, pageSize > CASS_WORKUNIT_POSTSORT_LIMIT ? pageSize : CASS_WORKUNIT_POSTSORT_LIMIT));
  3577. cached->setResult(result);
  3578. }
  3579. else
  3580. result.setown(merger.getClear());
  3581. }
  3582. if (startOffset || needsPostSort || result->hasPostFilters() || result->isMerging()) // we need a subpage if we have fetched anything other than exactly the rows requested
  3583. result.setown(new SubPageIterator(result.getClear(), startOffset, pageSize));
  3584. if (cachehint)
  3585. {
  3586. *cachehint = cached->queryHint();
  3587. CriticalBlock b(cacheCrit);
  3588. cacheIdMap.setValue(*cachehint, cached); // Links its parameter
  3589. }
  3590. if (total)
  3591. *total = 0; // We don't know
  3592. return result.getClear();
  3593. }
  3594. virtual StringArray &getUniqueValues(WUSortField field, const char *prefix, StringArray &result) const
  3595. {
  3596. return _getUniqueValues(queryFilterXPath(field), prefix, result);
  3597. }
  3598. virtual unsigned numWorkUnits()
  3599. {
  3600. unsigned total = 0;
  3601. CIArrayOf<CassandraFuture> futures;
  3602. for (int i = 0; i < partitions; i++)
  3603. {
  3604. CassandraStatement statement(prepareStatement("SELECT COUNT(*) FROM workunits where partition=?;"));
  3605. statement.bindInt32(0, i);
  3606. futures.append(*new CassandraFuture(cass_session_execute(querySession(), statement)));
  3607. }
  3608. ForEachItemIn(idx, futures)
  3609. {
  3610. CassandraFuture &future = futures.item(idx);
  3611. future.wait("select count(*)");
  3612. CassandraResult result(cass_future_get_result(future));
  3613. total += getUnsignedResult(NULL, getSingleResult(result));
  3614. }
  3615. return total;
  3616. }
  3617. /*
  3618. virtual bool isAborting(const char *wuid) const - done in the base class using dali
  3619. virtual void clearAborting(const char *wuid) - done in the base class using dali
  3620. */
  3621. virtual WUState waitForWorkUnit(const char * wuid, unsigned timeout, bool compiled, std::list<WUState> expectedStates)
  3622. {
  3623. Owned<WorkUnitWaiter> waiter = new WorkUnitWaiter(wuid, SubscribeOptionState);
  3624. LocalIAbortHandler abortHandler(*waiter);
  3625. CassandraStatement statement(prepareStatement("select state, agentSession from workunits where partition=? and wuid=?;"));
  3626. statement.bindInt32(0, rtlHash32VStr(wuid, 0) % partitions);
  3627. statement.bindString(1, wuid);
  3628. SessionId agent = 0;
  3629. bool agentSessionStopped = false;
  3630. unsigned start = msTick();
  3631. for (;;)
  3632. {
  3633. CassandraFuture future(cass_session_execute(querySession(), statement));
  3634. future.wait("Lookup wu state");
  3635. CassandraResult result(cass_future_get_result(future));
  3636. const CassRow *row = cass_result_first_row(result);
  3637. if (!row)
  3638. return WUStateUnknown;
  3639. const CassValue *stateVal = cass_row_get_column(row, 0);
  3640. if (!stateVal)
  3641. return WUStateUnknown;
  3642. StringBuffer stateStr;
  3643. getCassString(stateStr, stateVal);
  3644. WUState state = getWorkUnitState(stateStr);
  3645. auto it = std::find(expectedStates.begin(), expectedStates.end(), state);
  3646. if (it != expectedStates.end())
  3647. return state;
  3648. switch (state)
  3649. {
  3650. case WUStateCompiled:
  3651. case WUStateUploadingFiles:
  3652. if (compiled)
  3653. return state;
  3654. break;
  3655. case WUStateCompleted:
  3656. case WUStateFailed:
  3657. case WUStateAborted:
  3658. return state;
  3659. case WUStateWait:
  3660. break;
  3661. case WUStateCompiling:
  3662. case WUStateRunning:
  3663. case WUStateDebugPaused:
  3664. case WUStateDebugRunning:
  3665. case WUStateBlocked:
  3666. case WUStateAborting:
  3667. if (agentSessionStopped)
  3668. {
  3669. reportAbnormalTermination(wuid, state, agent);
  3670. return state;
  3671. }
  3672. if (queryDaliServerVersion().compare("2.1")>=0)
  3673. {
  3674. agent = getUnsignedResult(NULL, cass_row_get_column(row, 1));
  3675. if(agent && querySessionManager().sessionStopped(agent, 0))
  3676. {
  3677. agentSessionStopped = true;
  3678. continue;
  3679. }
  3680. }
  3681. break;
  3682. }
  3683. agentSessionStopped = false; // reset for state changes such as WUStateWait then WUStateRunning again
  3684. unsigned waited = msTick() - start;
  3685. if (timeout==-1 || waited + 20000 < timeout)
  3686. {
  3687. waiter->wait(20000); // recheck state every 20 seconds, in case eclagent has crashed.
  3688. if (waiter->isAborted())
  3689. return WUStateUnknown; // MORE - throw an exception?
  3690. }
  3691. else if (waited > timeout || !waiter->wait(timeout-waited))
  3692. return WUStateUnknown; // MORE - throw an exception?
  3693. }
  3694. }
  3695. virtual WUAction waitForWorkUnitAction(const char * wuid, WUAction original)
  3696. {
  3697. StringAttr origStr(getWorkunitActionStr(original));
  3698. Owned<WorkUnitWaiter> waiter = new WorkUnitWaiter(wuid, SubscribeOptionAction);
  3699. LocalIAbortHandler abortHandler(*waiter);
  3700. CassandraStatement statement(prepareStatement("select action from workunits where partition=? and wuid=?;"));
  3701. statement.bindInt32(0, rtlHash32VStr(wuid, 0) % partitions);
  3702. statement.bindString(1, wuid);
  3703. WUAction ret = WUActionUnknown;
  3704. for (;;)
  3705. {
  3706. CassandraFuture future(cass_session_execute(querySession(), statement));
  3707. future.wait("Lookup wu action");
  3708. CassandraResult result(cass_future_get_result(future));
  3709. const CassRow *row = cass_result_first_row(result);
  3710. if (!row)
  3711. {
  3712. PROGLOG("While waiting for job %s, WU no longer exists", wuid);
  3713. break;
  3714. }
  3715. const CassValue *actionVal = cass_row_get_column(row, 0);
  3716. if (!actionVal)
  3717. {
  3718. PROGLOG("While waiting for job %s, WU action cannot be read", wuid);
  3719. break;
  3720. }
  3721. StringBuffer actionStr;
  3722. getCassString(actionStr, actionVal);
  3723. if (!streq(actionStr, origStr))
  3724. {
  3725. ret = getWorkunitAction(actionStr);
  3726. break;
  3727. }
  3728. waiter->wait(10000); // recheck state every 20 seconds even if no notifications... just because we used to before
  3729. if (waiter->isAborted())
  3730. break;
  3731. }
  3732. return ret;
  3733. }
  3734. unsigned validateRepository(bool fix)
  3735. {
  3736. unsigned errCount = 0;
  3737. // 1. Check that every entry in main wu table has matching entries in secondary tables
  3738. CassandraResult result(fetchData(workunitInfoMappings+1));
  3739. CassandraIterator rows(cass_iterator_from_result(result));
  3740. if (fix)
  3741. {
  3742. // Delete the unique values table - the validate process recreates it afresh
  3743. executeSimpleCommand(querySession(), "TRUNCATE uniqueSearchValues;");
  3744. }
  3745. while (cass_iterator_next(rows))
  3746. {
  3747. Owned<IPTree> wuXML = rowToPTree(NULL, NULL, workunitInfoMappings+1, cass_iterator_get_row(rows));
  3748. const char *wuid = wuXML->queryName();
  3749. // For each search entry, check that we get matching XML
  3750. for (const char * const *search = searchPaths; *search; search++)
  3751. errCount += validateSearch(*search, wuid, wuXML, fix);
  3752. }
  3753. // 2. Check that there are no orphaned entries in search or child tables
  3754. errCount += checkOrphans(searchMappings, 3, fix);
  3755. for (const ChildTableInfo * const * table = childTables; *table != NULL; table++)
  3756. errCount += checkOrphans(table[0]->mappings, 1, fix);
  3757. errCount += checkOrphans(wuGraphProgressMappings, 1, fix);
  3758. errCount += checkOrphans(wuGraphStateMappings, 1, fix);
  3759. errCount += checkOrphans(wuGraphRunningMappings, 1, fix);
  3760. return errCount;
  3761. }
  3762. virtual void deleteRepository(bool recreate)
  3763. {
  3764. // USE WITH CARE!
  3765. CassandraSession s(cass_session_new());
  3766. CassandraFuture future(cass_session_connect(s, cluster.queryCluster()));
  3767. future.wait("connect without keyspace to delete");
  3768. VStringBuffer deleteKeyspace("DROP KEYSPACE IF EXISTS %s;", cluster.queryKeySpace());
  3769. executeSimpleCommand(s, deleteKeyspace);
  3770. s.set(NULL);
  3771. cluster.disconnect();
  3772. if (recreate)
  3773. createRepository();
  3774. }
  3775. virtual void createRepository()
  3776. {
  3777. cluster.disconnect();
  3778. CassandraSession s(cass_session_new());
  3779. CassandraFuture future(cass_session_connect(s, cluster.queryCluster()));
  3780. future.wait("connect without keyspace");
  3781. VStringBuffer create("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '1' };", cluster.queryKeySpace()); // MORE - options from props? Not 100% sure if they are appropriate.
  3782. executeSimpleCommand(s, create);
  3783. s.set(NULL);
  3784. cluster.connect();
  3785. createVersionTable(false);
  3786. ensureTable(querySession(), workunitsMappings);
  3787. ensureTable(querySession(), searchMappings);
  3788. ensureTable(querySession(), uniqueSearchMappings);
  3789. ensureTable(querySession(), filesSearchMappings);
  3790. for (const ChildTableInfo * const * table = childTables; *table != NULL; table++)
  3791. ensureTable(querySession(), table[0]->mappings);
  3792. ensureTable(querySession(), wuGraphProgressMappings);
  3793. ensureTable(querySession(), wuGraphStateMappings);
  3794. ensureTable(querySession(), wuGraphRunningMappings);
  3795. }
  3796. virtual const char *queryStoreType() const
  3797. {
  3798. return "Cassandra";
  3799. }
  3800. // Interface ICassandraSession
  3801. virtual CassSession *querySession() const { return cluster.querySession(); };
  3802. virtual unsigned queryTraceLevel() const { return traceLevel; };
  3803. virtual CassandraPrepared *prepareStatement(const char *query) const
  3804. {
  3805. return cluster.prepareStatement(query, traceLevel>=2);
  3806. }
  3807. virtual void executeAsync(CIArrayOf<CassandraStatement> &batch, const char *what) const override
  3808. {
  3809. if (batch.ordinality())
  3810. {
  3811. if (queryTraceLevel() > 1)
  3812. DBGLOG("Executing async batch %s", what);
  3813. cluster.executeAsync(batch, what);
  3814. }
  3815. }
  3816. virtual unsigned queryPartitions() const override
  3817. {
  3818. return partitions;
  3819. }
  3820. virtual unsigned queryPrefixSize() const override
  3821. {
  3822. return prefixSize;
  3823. }
  3824. private:
  3825. virtual void executeBatch(CassandraBatch &batch, const char *what) const
  3826. {
  3827. if (queryTraceLevel() > 1)
  3828. DBGLOG("Executing batch %s", what);
  3829. CassandraFuture futureBatch(cass_session_execute_batch(querySession(), batch));
  3830. futureBatch.wait(what);
  3831. }
  3832. void createVersionTable(bool force)
  3833. {
  3834. StringBuffer schema;
  3835. executeSimpleCommand(querySession(), describeTable(versionMappings, schema));
  3836. Owned<IPTree> oldVersion = getVersionInfo();
  3837. if (force || !oldVersion)
  3838. {
  3839. VStringBuffer versionInfo("<Version major='%d' minor='%d' numPartitions='%d' searchPrefixSize='%d'/>", majorVersion, minorVersion, partitions, prefixSize);
  3840. CassandraBatch versionBatch(CASS_BATCH_TYPE_LOGGED);
  3841. Owned<IPTree> pt = createPTreeFromXMLString(versionInfo);
  3842. for (int i = 0; i < DEFAULT_PARTITIONS; i++) // NOTE - version table always has DEFAULT_PARTITIONS partitions
  3843. {
  3844. pt->setPropInt("@partition", i);
  3845. simpleXMLtoCassandra(this, versionBatch, versionMappings, pt, NULL);
  3846. }
  3847. executeBatch(versionBatch, "createVersionTable");
  3848. }
  3849. }
  3850. IPTree *getVersionInfo()
  3851. {
  3852. try
  3853. {
  3854. StringBuffer names;
  3855. StringBuffer tableName;
  3856. getFieldNames(versionMappings, names, tableName);
  3857. VStringBuffer selectQuery("select %s from %s where partition=?;", names.str()+1, tableName.str());
  3858. CassandraStatement select(prepareStatement(selectQuery));
  3859. select.bindInt32(0, rand_r(&randState) % DEFAULT_PARTITIONS); // NOTE - version table always has DEFAULT_PARTITIONS partitions
  3860. CassandraFuture future(cass_session_execute(querySession(), select));
  3861. future.wait("read version");
  3862. CassandraResult result(cass_future_get_result(future));
  3863. const CassRow *row = cass_result_first_row(result);
  3864. if (row)
  3865. return rowToPTree(NULL, NULL, versionMappings, row);
  3866. }
  3867. catch (IException *E)
  3868. {
  3869. EXCLOG(E);
  3870. E->Release();
  3871. }
  3872. catch (...)
  3873. {
  3874. DBGLOG("WARNING: Unknown exception caught while trying to retrieve Cassandra repository version information");
  3875. }
  3876. return NULL;
  3877. }
  3878. bool checkWuExists(const char *wuid)
  3879. {
  3880. CassandraStatement statement(prepareStatement("SELECT COUNT(*) FROM workunits where partition=? and wuid=?;"));
  3881. statement.bindInt32(0, rtlHash32VStr(wuid, 0) % partitions);
  3882. statement.bindString(1, wuid);
  3883. CassandraFuture future(cass_session_execute(querySession(), statement));
  3884. future.wait("select count(*)");
  3885. CassandraResult result(cass_future_get_result(future));
  3886. return getUnsignedResult(NULL, getSingleResult(result)) != 0; // Shouldn't be more than 1, either
  3887. }
  3888. void mergeFilter(IArrayOf<IPostFilter> &filters, WUSortField field, const char *value)
  3889. {
  3890. // Combine multiple filters on wuid - Cassandra doesn't like seeing more than one.
  3891. ForEachItemIn(idx, filters)
  3892. {
  3893. PostFilter &filter = static_cast<PostFilter &>(filters.item(idx));
  3894. if (filter.queryField()==field)
  3895. {
  3896. const char *prevLimit = filter.queryValue();
  3897. int diff = strcmp(prevLimit, value);
  3898. if (diff && ((diff < 0) == (field==WUSFwuid)))
  3899. filter.setValue(value);
  3900. return;
  3901. }
  3902. }
  3903. // Not found - add new filter
  3904. filters.append(*new PostFilter(field, value, true));
  3905. }
  3906. IConstWorkUnitIterator * getWorkUnitsByXXX(const char *xpath, const char *key, ISecManager *secmgr, ISecUser *secuser)
  3907. {
  3908. Owned<CassMultiIterator> merger = new CassMultiIterator(NULL, 0, 0, true); // Merge by wuid
  3909. if (!key || !*key)
  3910. {
  3911. IArrayOf<IPostFilter> wuidFilters;
  3912. for (int i = 0; i < partitions; i++)
  3913. {
  3914. merger->addResult(*new CassandraResult(fetchDataByPartition(workunitInfoMappings, i, wuidFilters)));
  3915. }
  3916. }
  3917. else
  3918. merger->addResult(*new CassandraResult(fetchDataForKey(xpath, key)));
  3919. return createSecureConstWUIterator(merger.getClear(), secmgr, secuser);
  3920. }
  3921. StringArray &_getUniqueValues(const char *xpath, const char *prefix, StringArray &result) const
  3922. {
  3923. if (prefix && strlen(prefix) >= prefixSize)
  3924. {
  3925. CassandraResult r(fetchDataForWildSearch(xpath, prefix, uniqueSearchMappings));
  3926. CassandraIterator rows(cass_iterator_from_result(r));
  3927. StringBuffer value;
  3928. while (cass_iterator_next(rows))
  3929. {
  3930. const CassRow *row = cass_iterator_get_row(rows);
  3931. getCassString(value.clear(), cass_row_get_column(row, 0));
  3932. result.append(value);
  3933. }
  3934. }
  3935. return result;
  3936. }
  3937. unsigned validateSearch(const char *xpath, const char *wuid, IPTree *wuXML, bool fix)
  3938. {
  3939. unsigned errCount = 0;
  3940. const char *childKey = wuXML->queryProp(xpath);
  3941. if (childKey && *childKey)
  3942. {
  3943. CIArrayOf<CassandraStatement> batch;
  3944. CIArrayOf<CassandraStatement> deletes;
  3945. CassandraResult result(fetchDataForKeyAndWuid(xpath, childKey, wuid));
  3946. if (fix)
  3947. simpleXMLtoCassandra(this, batch, uniqueSearchMappings, wuXML, xpath);
  3948. switch (cass_result_row_count(result))
  3949. {
  3950. case 0:
  3951. DBGLOG("Missing search data for %s for wuid=%s key=%s", xpath, wuid, childKey);
  3952. if (fix)
  3953. simpleXMLtoCassandra(this, batch, searchMappings, wuXML, xpath);
  3954. errCount++;
  3955. break;
  3956. case 1:
  3957. {
  3958. Owned<IPTree> secXML = rowToPTree(xpath, childKey, searchMappings+4, cass_result_first_row(result)); // type, prefix, key, and wuid are not returned
  3959. secXML->renameProp("/", wuid);
  3960. if (!areMatchingPTrees(wuXML, secXML))
  3961. {
  3962. DBGLOG("Mismatched search data for %s for wuid %s", xpath, wuid);
  3963. if (fix)
  3964. simpleXMLtoCassandra(this, batch, searchMappings, wuXML, xpath);
  3965. errCount++;
  3966. }
  3967. break;
  3968. }
  3969. default:
  3970. DBGLOG("Multiple secondary data %d for %s for wuid %s", (int) cass_result_row_count(result), xpath, wuid); // This should be impossible!
  3971. if (fix)
  3972. {
  3973. deleteSecondaryByKey(xpath, childKey, wuid, this, deletes);
  3974. simpleXMLtoCassandra(this, batch, searchMappings, wuXML, xpath);
  3975. }
  3976. break;
  3977. }
  3978. if (fix)
  3979. {
  3980. executeAsync(deletes, "delete search");
  3981. executeAsync(batch, "fix search");
  3982. }
  3983. }
  3984. return errCount;
  3985. }
  3986. unsigned checkOrphans(const CassandraXmlMapping *mappings, unsigned wuidIndex, bool fix)
  3987. {
  3988. unsigned errCount = 0;
  3989. CassandraResult result(fetchData(mappings));
  3990. CassandraIterator rows(cass_iterator_from_result(result));
  3991. while (cass_iterator_next(rows))
  3992. {
  3993. const CassRow *row = cass_iterator_get_row(rows);
  3994. StringBuffer wuid;
  3995. getCassString(wuid, cass_row_get_column(row, wuidIndex));
  3996. if (!streq(wuid, GLOBAL_WORKUNIT) && !checkWuExists(wuid))
  3997. {
  3998. DBGLOG("Orphaned data in %s for wuid=%s", queryTableName(mappings), wuid.str());
  3999. if (fix)
  4000. {
  4001. if (wuidIndex)
  4002. {
  4003. CIArrayOf<CassandraStatement> secondaryBatch;
  4004. StringBuffer xpath, fieldValue;
  4005. getCassString(xpath, cass_row_get_column(row, 0));
  4006. getCassString(fieldValue, cass_row_get_column(row, 2));
  4007. deleteSecondaryByKey(xpath, fieldValue, wuid, this, secondaryBatch);
  4008. executeAsync(secondaryBatch, "Delete orphans");
  4009. }
  4010. else
  4011. {
  4012. CassandraBatch batch(CASS_BATCH_TYPE_UNLOGGED);
  4013. deleteChildByWuid(mappings, wuid, batch);
  4014. executeBatch(batch, "Delete orphans");
  4015. }
  4016. }
  4017. errCount++;
  4018. }
  4019. }
  4020. return errCount;
  4021. }
  4022. IPTree *cassandraToWorkunitXML(const char *wuid) const
  4023. {
  4024. CassandraResult result(fetchDataForWuid(workunitsMappings, wuid, false));
  4025. CassandraIterator rows(cass_iterator_from_result(result));
  4026. if (cass_iterator_next(rows)) // should just be one
  4027. {
  4028. Owned<IPTree> wuXML = createPTree(wuid);
  4029. wuXML->setProp("@xmlns:xsi", "http://www.w3.org/1999/XMLSchema-instance");
  4030. CassandraIterator cols(cass_iterator_from_row(cass_iterator_get_row(rows)));
  4031. unsigned colidx = 2; // wuid and partition are not returned
  4032. while (cass_iterator_next(cols))
  4033. {
  4034. assertex(workunitsMappings[colidx].columnName);
  4035. const CassValue *value = cass_iterator_get_column(cols);
  4036. if (value && !cass_value_is_null(value))
  4037. workunitsMappings[colidx].mapper.toXML(wuXML, workunitsMappings[colidx].xpath, value);
  4038. colidx++;
  4039. }
  4040. return wuXML.getClear();
  4041. }
  4042. else
  4043. return NULL;
  4044. }
  4045. // Fetch all rows from a table
  4046. const CassResult *fetchData(const CassandraXmlMapping *mappings) const
  4047. {
  4048. StringBuffer names;
  4049. StringBuffer tableName;
  4050. getFieldNames(mappings, names, tableName);
  4051. VStringBuffer selectQuery("select %s from %s;", names.str()+1, tableName.str());
  4052. if (traceLevel >= 2)
  4053. DBGLOG("%s", selectQuery.str());
  4054. CassandraStatement statement(cass_statement_new(selectQuery.str(), 0));
  4055. return executeQuery(querySession(), statement);
  4056. }
  4057. // Fetch all rows from a single partition of a table
  4058. const CassResult *fetchDataByPartition(const CassandraXmlMapping *mappings, int partition, const IArrayOf<IPostFilter> &wuidFilters, unsigned sortOrder=WUSFwuid|WUSFreverse, unsigned limit=0) const
  4059. {
  4060. StringBuffer names;
  4061. StringBuffer tableName;
  4062. getFieldNames(mappings+1, names, tableName); // Don't fetch partition column
  4063. VStringBuffer selectQuery("select %s from %s where partition=?", names.str()+1, tableName.str());
  4064. ForEachItemIn(idx, wuidFilters)
  4065. {
  4066. const IPostFilter &wuidFilter = wuidFilters.item(idx);
  4067. selectQuery.appendf(" and wuid %s ?", wuidFilter.queryField()==WUSFwuidhigh ? "<=" : ">=");
  4068. }
  4069. switch (sortOrder)
  4070. {
  4071. case WUSFwuid:
  4072. selectQuery.append(" ORDER BY WUID ASC");
  4073. break;
  4074. case WUSFwuid|WUSFreverse:
  4075. // If not wuid, descending, we will have to post-sort
  4076. selectQuery.append(" ORDER BY WUID DESC");
  4077. break;
  4078. default:
  4079. // If not wuid, descending, we will have to post-sort. We still need in wuid desc order for the merge though.
  4080. selectQuery.append(" ORDER BY WUID DESC");
  4081. if (!limit)
  4082. limit = CASS_WORKUNIT_POSTSORT_LIMIT;
  4083. break;
  4084. }
  4085. if (limit)
  4086. selectQuery.appendf(" LIMIT %u", limit);
  4087. selectQuery.append(';');
  4088. CassandraStatement select(prepareStatement(selectQuery));
  4089. select.bindInt32(0, partition);
  4090. ForEachItemIn(idx2, wuidFilters)
  4091. {
  4092. const IPostFilter &wuidFilter = wuidFilters.item(idx2);
  4093. select.bindString(idx2+1, wuidFilter.queryValue());
  4094. }
  4095. return executeQuery(querySession(), select);
  4096. }
  4097. // Fetch matching rows from a child table, or the main wu table
  4098. const CassResult *fetchDataForWuid(const CassandraXmlMapping *mappings, const char *wuid, bool includeWuid) const
  4099. {
  4100. assertex(wuid && *wuid);
  4101. StringBuffer names;
  4102. StringBuffer tableName;
  4103. getFieldNames(mappings + (includeWuid ? 1 : 2), names, tableName); // mappings+2 means we don't return the partition or wuid columns
  4104. VStringBuffer selectQuery("select %s from %s where partition=? and wuid=?;", names.str()+1, tableName.str());
  4105. CassandraStatement select(prepareStatement(selectQuery));
  4106. select.bindInt32(0, rtlHash32VStr(wuid, 0) % partitions);
  4107. select.bindString(1, wuid);
  4108. return executeQuery(querySession(), select);
  4109. }
  4110. const CassResult *fetchDataForWuidAndKey(const CassandraXmlMapping *mappings, const char *wuid, const char *key) const
  4111. {
  4112. assertex(wuid && *wuid);
  4113. StringBuffer names;
  4114. StringBuffer tableName;
  4115. getFieldNames(mappings+2, names, tableName); // mappings+2 means we don't return the partition or wuid columns. We do return the key.
  4116. VStringBuffer selectQuery("select %s from %s where partition=? and wuid=? and %s=?;", names.str()+1, tableName.str(), mappings[2].columnName);
  4117. CassandraStatement select(prepareStatement(selectQuery));
  4118. select.bindInt32(0, rtlHash32VStr(wuid, 0) % partitions);
  4119. select.bindString(1, wuid);
  4120. select.bindString(2, key);
  4121. return executeQuery(querySession(), select);
  4122. }
  4123. // Fetch matching rows from the search table, for all wuids, sorted by wuid
  4124. const CassResult *fetchDataForKey(const char *xpath, const char *key) const
  4125. {
  4126. assertex(key);
  4127. StringBuffer names;
  4128. StringBuffer tableName;
  4129. StringBuffer ucKey(key);
  4130. ucKey.toUpperCase();
  4131. getFieldNames(searchMappings+3, names, tableName); // mappings+3 means we don't return the key columns (xpath, upper(keyPrefix), upper(key))
  4132. VStringBuffer selectQuery("select %s from %s where xpath=? and fieldPrefix=? and fieldValue=?", names.str()+1, tableName.str());
  4133. selectQuery.append(" ORDER BY fieldValue ASC, WUID desc;");
  4134. CassandraStatement select(prepareStatement(selectQuery));
  4135. select.bindString(0, xpath);
  4136. select.bindString_n(1, ucKey, prefixSize);
  4137. select.bindString(2, ucKey);
  4138. return executeQuery(querySession(), select);
  4139. }
  4140. // Fetch matching rows from the search table, for all wuids, sorted by wuid
  4141. const CassResult *fetchDataForKeyWithFilter(const char *xpath, const char *key, const IArrayOf<IPostFilter> &wuidFilters, unsigned sortOrder, unsigned limit) const
  4142. {
  4143. StringBuffer names;
  4144. StringBuffer tableName;
  4145. StringBuffer ucKey(key);
  4146. ucKey.toUpperCase();
  4147. getFieldNames(searchMappings+3, names, tableName); // mappings+3 means we don't return the key columns (xpath, upper(keyPrefix), upper(key))
  4148. VStringBuffer selectQuery("select %s from %s where xpath=? and fieldPrefix=? and fieldValue=?", names.str()+1, tableName.str());
  4149. ForEachItemIn(idx, wuidFilters)
  4150. {
  4151. const IPostFilter &wuidFilter = wuidFilters.item(idx);
  4152. selectQuery.appendf(" and wuid %s ?", wuidFilter.queryField()==WUSFwuidhigh ? "<=" : ">=");
  4153. }
  4154. switch (sortOrder)
  4155. {
  4156. case WUSFwuid:
  4157. selectQuery.append(" ORDER BY fieldValue DESC, WUID ASC");
  4158. break;
  4159. case WUSFwuid|WUSFreverse:
  4160. selectQuery.append(" ORDER BY fieldValue ASC, WUID DESC");
  4161. break;
  4162. default:
  4163. // If not wuid, descending, we will have to post-sort. We still need in wuid desc order for the merge though.
  4164. selectQuery.appendf(" ORDER BY fieldvalue ASC, WUID DESC");
  4165. limit = CASS_WORKUNIT_POSTSORT_LIMIT;
  4166. break;
  4167. }
  4168. if (limit)
  4169. selectQuery.appendf(" LIMIT %u", limit);
  4170. CassandraStatement select(prepareStatement(selectQuery));
  4171. select.bindString(0, xpath);
  4172. select.bindString_n(1, ucKey, prefixSize);
  4173. select.bindString(2, ucKey);
  4174. ForEachItemIn(idx2, wuidFilters)
  4175. {
  4176. const IPostFilter &wuidFilter = wuidFilters.item(idx2);
  4177. select.bindString(3+idx2, wuidFilter.queryValue());
  4178. }
  4179. return executeQuery(querySession(), select);
  4180. }
  4181. // Fetch matching rows from the search or uniqueSearch table, for a given prefix
  4182. const CassResult *fetchDataForWildSearch(const char *xpath, const char *prefix, const CassandraXmlMapping *mappings) const
  4183. {
  4184. assertex(prefix && *prefix);
  4185. StringBuffer names;
  4186. StringBuffer tableName;
  4187. StringBuffer ucKey(prefix);
  4188. ucKey.toUpperCase();
  4189. StringBuffer ucKeyEnd(ucKey);
  4190. size32_t len = ucKeyEnd.length();
  4191. assertex(len);
  4192. ucKeyEnd.setCharAt(len-1, ucKeyEnd.charAt(len-1)+1);
  4193. getFieldNames(mappings+3, names, tableName); // mappings+3 means we don't return the key columns (xpath, upper(keyPrefix), upper(key))
  4194. VStringBuffer selectQuery("select %s from %s where xpath=? and fieldPrefix=? and fieldValue>=? and fieldValue<?;", names.str()+1, tableName.str());
  4195. CassandraStatement select(prepareStatement(selectQuery));
  4196. select.bindString(0, xpath);
  4197. select.bindString_n(1, ucKey, prefixSize);
  4198. select.bindString(2, ucKey);
  4199. select.bindString(3, ucKeyEnd);
  4200. return executeQuery(querySession(), select);
  4201. }
  4202. // Fetch rows from the search table, by thorTime, above a threshold
  4203. const CassResult *fetchDataByThorTime(const char *threshold, bool descending, unsigned limit) const
  4204. {
  4205. StringBuffer names;
  4206. StringBuffer tableName;
  4207. getFieldNames(searchMappings+3, names, tableName); // mappings+3 means we don't return the key columns (xpath, upper(keyPrefix), upper(key))
  4208. VStringBuffer selectQuery("select %s from %s where xpath=? and fieldPrefix=?", names.str()+1, tableName.str());
  4209. if (threshold && *threshold)
  4210. selectQuery.appendf(" and fieldValue >= ?");
  4211. if (descending)
  4212. selectQuery.append(" ORDER BY fieldValue DESC, wuid ASC");
  4213. else
  4214. selectQuery.append(" ORDER BY fieldValue ASC, wuid DESC");
  4215. if (limit)
  4216. selectQuery.appendf(" LIMIT %u", limit);
  4217. selectQuery.append(';');
  4218. CassandraStatement select(prepareStatement(selectQuery));
  4219. select.bindString(0, "@totalThorTime");
  4220. select.bindString_n(1, " ", prefixSize); // This would stop working if we ever set the search prefix to > 8 chars. So don't.
  4221. if (threshold && *threshold)
  4222. select.bindString(2, threshold);
  4223. return executeQuery(querySession(), select);
  4224. }
  4225. // Fetch rows from the search table, continuing a previous query that was sorted by thor time - part one
  4226. // This technique only works for thor time where we have forced to a single partition. Otherwise it gets even more complicated, and not worth it.
  4227. const CassResult *fetchMoreDataByThorTime(const char *threshold, const char *wuid, bool descending, unsigned limit) const
  4228. {
  4229. StringBuffer names;
  4230. StringBuffer tableName;
  4231. getFieldNames(searchMappings+3, names, tableName); // mappings+3 means we don't return the key columns (xpath, upper(keyPrefix), upper(key))
  4232. const char *wuidTest;
  4233. const char *fieldTest;
  4234. if (descending)
  4235. {
  4236. wuidTest = ">";
  4237. fieldTest = wuid ? "=" : "<";
  4238. }
  4239. else
  4240. {
  4241. wuidTest = "<";
  4242. fieldTest = wuid ? "=" : ">";
  4243. }
  4244. VStringBuffer selectQuery("select %s from %s where xpath=? and fieldPrefix=? and fieldValue %s ?", names.str()+1, tableName.str(), fieldTest);
  4245. if (wuid)
  4246. selectQuery.appendf(" and wuid %s ?", wuidTest);
  4247. if (descending)
  4248. selectQuery.append(" ORDER BY fieldValue DESC, WUID ASC");
  4249. else
  4250. selectQuery.append(" ORDER BY fieldValue ASC, WUID DESC");
  4251. if (limit)
  4252. selectQuery.appendf(" LIMIT %u", limit);
  4253. selectQuery.append(';');
  4254. CassandraStatement select(prepareStatement(selectQuery));
  4255. select.bindString(0, "@totalThorTime");
  4256. select.bindString_n(1, threshold, prefixSize);
  4257. select.bindString(2, threshold);
  4258. if (wuid)
  4259. select.bindString(3, wuid);
  4260. return executeQuery(querySession(), select);
  4261. }
  4262. // Fetch rows from the file search table (covers files read and files written)
  4263. const CassResult *fetchDataForFiles(const char *name, const IArrayOf<IPostFilter> &wuidFilters, bool read) const
  4264. {
  4265. StringBuffer names;
  4266. StringBuffer tableName;
  4267. getFieldNames(filesSearchMappings+2, names, tableName); // mappings+2 means we don't return the key columns (name and readmode)
  4268. VStringBuffer selectQuery("select %s from %s where name=? and read=?", names.str()+1, tableName.str());
  4269. ForEachItemIn(idx, wuidFilters)
  4270. {
  4271. const IPostFilter &wuidFilter = wuidFilters.item(idx);
  4272. selectQuery.appendf(" and wuid %s ?", wuidFilter.queryField()==WUSFwuidhigh ? "<=" : ">=");
  4273. }
  4274. CassandraStatement select(prepareStatement(selectQuery));
  4275. select.bindString(0, name);
  4276. select.bindBool(1, read ? cass_true : cass_false);
  4277. ForEachItemIn(idx2, wuidFilters)
  4278. {
  4279. const IPostFilter &wuidFilter = wuidFilters.item(idx2);
  4280. select.bindString(idx2+2, wuidFilter.queryValue());
  4281. }
  4282. return executeQuery(querySession(), select);
  4283. }
  4284. // Fetch matching rows from the search table, for a single wuid
  4285. const CassResult *fetchDataForKeyAndWuid(const char *xpath, const char *key, const char *wuid) const
  4286. {
  4287. assertex(key);
  4288. StringBuffer names;
  4289. StringBuffer tableName;
  4290. StringBuffer ucKey(key);
  4291. ucKey.toUpperCase();
  4292. getFieldNames(searchMappings+4, names, tableName); // mappings+4 means we don't return the key columns (xpath, upper(keyPrefix), upper(key), and wuid)
  4293. VStringBuffer selectQuery("select %s from %s where xpath=? and fieldPrefix=? and fieldValue =? and wuid=?;", names.str()+1, tableName.str());
  4294. CassandraStatement select(prepareStatement(selectQuery));
  4295. select.bindString(0, xpath);
  4296. select.bindString_n(1, ucKey, prefixSize);
  4297. select.bindString(2, ucKey);
  4298. select.bindString(3, wuid);
  4299. return executeQuery(querySession(), select);
  4300. }
  4301. // Delete matching rows from a child table
  4302. virtual void deleteChildByWuid(const CassandraXmlMapping *mappings, const char *wuid, CassBatch *batch) const
  4303. {
  4304. StringBuffer names;
  4305. StringBuffer tableName;
  4306. getFieldNames(mappings, names, tableName);
  4307. VStringBuffer insertQuery("DELETE from %s where partition=? and wuid=?;", tableName.str());
  4308. CassandraStatement update(prepareStatement(insertQuery));
  4309. update.bindInt32(0, rtlHash32VStr(wuid, 0) % partitions);
  4310. update.bindString(1, wuid);
  4311. check(cass_batch_add_statement(batch, update));
  4312. }
  4313. unsigned retireCache()
  4314. {
  4315. CriticalBlock b(cacheCrit); // Is this too coarse-grained?
  4316. unsigned expires = CASS_WU_QUERY_EXPIRES;
  4317. unsigned now = msTick();
  4318. ICopyArrayOf<CCassandraWuUQueryCacheEntry> goers;
  4319. HashIterator iter(cacheIdMap);
  4320. ForEach(iter)
  4321. {
  4322. CCassandraWuUQueryCacheEntry *entry = cacheIdMap.mapToValue(&iter.query());
  4323. unsigned age = now - entry->queryLastAccess();
  4324. int ttl = CASS_WU_QUERY_EXPIRES-age;
  4325. if (ttl<= 0)
  4326. goers.append(*entry);
  4327. else if (ttl< expires)
  4328. expires = ttl;
  4329. }
  4330. ForEachItemIn(idx, goers)
  4331. {
  4332. DBGLOG("Expiring cache entry %p", &goers.item(idx));
  4333. cacheIdMap.remove(goers.item(idx).queryHint());
  4334. }
  4335. return expires;
  4336. }
  4337. class CacheRetirer : public Thread
  4338. {
  4339. public:
  4340. CacheRetirer(CCasssandraWorkUnitFactory &_parent) : Thread("WorkunitListCacheRetirer"), parent(_parent)
  4341. {
  4342. stopping = false;
  4343. }
  4344. virtual int run()
  4345. {
  4346. while (!stopping)
  4347. {
  4348. unsigned delay = parent.retireCache();
  4349. sem.wait(delay);
  4350. }
  4351. return 0;
  4352. }
  4353. void stop()
  4354. {
  4355. stopping = true;
  4356. sem.signal();
  4357. }
  4358. private:
  4359. Semaphore sem;
  4360. CCasssandraWorkUnitFactory &parent;
  4361. bool stopping;
  4362. } cacheRetirer;
  // NOTE(review): randomizeSuffix is not referenced in this part of the file - confirm its use elsewhere
  unsigned randomizeSuffix;
  // Gates DBGLOG tracing: statements/batches are logged when this is 2 or more
  unsigned traceLevel;
  // Seed state handed to rand_r() when choosing a version-table partition to read
  unsigned randState;
  // Number of partitions wuids are hashed over (rtlHash32VStr(wuid, 0) % partitions)
  int partitions = DEFAULT_PARTITIONS;
  // Number of leading characters of a search value bound as the fieldPrefix column
  int prefixSize = DEFAULT_PREFIX_SIZE;
  // Cluster connection; supplies the session used by every query in this class
  CassandraClusterSession cluster;
  // Held by retireCache() while it scans and prunes cacheIdMap
  mutable CriticalSection cacheCrit;
  // Cached query entries, removed by retireCache() using each entry's queryHint()
  mutable MapXToMyClass<__uint64, __uint64, CCassandraWuUQueryCacheEntry> cacheIdMap;
  4371. };
  4372. } // namespace
  4373. extern "C" EXPORT IWorkUnitFactory *createWorkUnitFactory(const SharedObject *dll, const IPropertyTree *props)
  4374. {
  4375. return new cassandraembed::CCasssandraWorkUnitFactory(dll, props);
  4376. }