cassandraembed.cpp 210 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2013 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "cassandra.h"
  15. #include "jexcept.hpp"
  16. #include "jthread.hpp"
  17. #include "hqlplugins.hpp"
  18. #include "deftype.hpp"
  19. #include "eclhelper.hpp"
  20. #include "eclrtl.hpp"
  21. #include "eclrtl_imp.hpp"
  22. #include "rtlds_imp.hpp"
  23. #include "rtlfield_imp.hpp"
  24. #include "rtlembed.hpp"
  25. #include "roxiemem.hpp"
  26. #include "nbcd.hpp"
  27. #include "jsort.hpp"
  28. #include "jptree.hpp"
  29. #include "jregexp.hpp"
  30. #include "workunit.hpp"
  31. #include "workunit.ipp"
  32. #ifdef _WIN32
  33. #define EXPORT __declspec(dllexport)
  34. #else
  35. #define EXPORT
  36. #endif
  37. static void UNSUPPORTED(const char *feature) __attribute__((noreturn));
  38. static void UNSUPPORTED(const char *feature)
  39. {
  40. throw MakeStringException(-1, "UNSUPPORTED feature: %s not supported in Cassandra plugin", feature);
  41. }
  42. static const char * compatibleVersions[] = {
  43. "Cassandra Embed Helper 1.0.0",
  44. NULL };
  45. static const char *version = "Cassandra Embed Helper 1.0.0";
  46. extern "C" EXPORT bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb)
  47. {
  48. if (pb->size == sizeof(ECLPluginDefinitionBlockEx))
  49. {
  50. ECLPluginDefinitionBlockEx * pbx = (ECLPluginDefinitionBlockEx *) pb;
  51. pbx->compatibleVersions = compatibleVersions;
  52. }
  53. else if (pb->size != sizeof(ECLPluginDefinitionBlock))
  54. return false;
  55. pb->magicVersion = PLUGIN_VERSION;
  56. pb->version = version;
  57. pb->moduleName = "cassandra";
  58. pb->ECL = NULL;
  59. pb->flags = PLUGIN_MULTIPLE_VERSIONS;
  60. pb->description = "Cassandra Embed Helper";
  61. return true;
  62. }
  63. namespace cassandraembed {
  64. static void logCallBack(const CassLogMessage *message, void *data)
  65. {
  66. DBGLOG("cassandra: %s - %s", cass_log_level_string(message->severity), message->message);
  67. }
  68. MODULE_INIT(INIT_PRIORITY_STANDARD)
  69. {
  70. cass_log_set_callback(logCallBack, NULL);
  71. cass_log_set_level(CASS_LOG_WARN);
  72. return true;
  73. }
  74. static void failx(const char *msg, ...) __attribute__((noreturn)) __attribute__((format(printf, 1, 2)));
  75. static void fail(const char *msg) __attribute__((noreturn));
  76. static void failx(const char *message, ...)
  77. {
  78. va_list args;
  79. va_start(args,message);
  80. StringBuffer msg;
  81. msg.append("cassandra: ").valist_appendf(message,args);
  82. va_end(args);
  83. rtlFail(0, msg.str());
  84. }
  85. static void fail(const char *message)
  86. {
  87. StringBuffer msg;
  88. msg.append("cassandra: ").append(message);
  89. rtlFail(0, msg.str());
  90. }
  91. // Wrappers to Cassandra structures that require corresponding releases
  92. class CassandraCluster : public CInterface
  93. {
  94. public:
  95. CassandraCluster(CassCluster *_cluster) : cluster(_cluster), batchMode((CassBatchType) -1)
  96. {
  97. }
  98. void setOptions(const StringArray &options)
  99. {
  100. const char *contact_points = "localhost";
  101. const char *user = "";
  102. const char *password = "";
  103. ForEachItemIn(idx, options)
  104. {
  105. const char *opt = options.item(idx);
  106. const char *val = strchr(opt, '=');
  107. if (val)
  108. {
  109. StringBuffer optName(val-opt, opt);
  110. val++;
  111. if (stricmp(optName, "contact_points")==0 || stricmp(optName, "server")==0)
  112. contact_points = val; // Note that lifetime of val is adequate for this to be safe
  113. else if (stricmp(optName, "user")==0)
  114. user = val;
  115. else if (stricmp(optName, "password")==0)
  116. password = val;
  117. else if (stricmp(optName, "keyspace")==0)
  118. keyspace.set(val);
  119. else if (stricmp(optName, "batch")==0)
  120. {
  121. if (stricmp(val, "LOGGED")==0)
  122. batchMode = CASS_BATCH_TYPE_LOGGED;
  123. else if (stricmp(val, "UNLOGGED")==0)
  124. batchMode = CASS_BATCH_TYPE_UNLOGGED;
  125. else if (stricmp(val, "COUNTER")==0)
  126. batchMode = CASS_BATCH_TYPE_COUNTER;
  127. }
  128. else if (stricmp(optName, "port")==0)
  129. {
  130. unsigned port = getUnsignedOption(val, "port");
  131. checkSetOption(cass_cluster_set_port(cluster, port), "port");
  132. }
  133. else if (stricmp(optName, "protocol_version")==0)
  134. {
  135. unsigned protocol_version = getUnsignedOption(val, "protocol_version");
  136. checkSetOption(cass_cluster_set_protocol_version(cluster, protocol_version), "protocol_version");
  137. }
  138. else if (stricmp(optName, "num_threads_io")==0)
  139. {
  140. unsigned num_threads_io = getUnsignedOption(val, "num_threads_io");
  141. cass_cluster_set_num_threads_io(cluster, num_threads_io); // No status return
  142. }
  143. else if (stricmp(optName, "queue_size_io")==0)
  144. {
  145. unsigned queue_size_io = getUnsignedOption(val, "queue_size_io");
  146. checkSetOption(cass_cluster_set_queue_size_io(cluster, queue_size_io), "queue_size_io");
  147. }
  148. else if (stricmp(optName, "core_connections_per_host")==0)
  149. {
  150. unsigned core_connections_per_host = getUnsignedOption(val, "core_connections_per_host");
  151. checkSetOption(cass_cluster_set_core_connections_per_host(cluster, core_connections_per_host), "core_connections_per_host");
  152. }
  153. else if (stricmp(optName, "max_connections_per_host")==0)
  154. {
  155. unsigned max_connections_per_host = getUnsignedOption(val, "max_connections_per_host");
  156. checkSetOption(cass_cluster_set_max_connections_per_host(cluster, max_connections_per_host), "max_connections_per_host");
  157. }
  158. else if (stricmp(optName, "max_concurrent_creation")==0)
  159. {
  160. unsigned max_concurrent_creation = getUnsignedOption(val, "max_concurrent_creation");
  161. checkSetOption(cass_cluster_set_max_concurrent_creation(cluster, max_concurrent_creation), "max_concurrent_creation");
  162. }
  163. else if (stricmp(optName, "pending_requests_high_water_mark")==0)
  164. {
  165. unsigned pending_requests_high_water_mark = getUnsignedOption(val, "pending_requests_high_water_mark");
  166. checkSetOption(cass_cluster_set_pending_requests_high_water_mark(cluster, pending_requests_high_water_mark), "pending_requests_high_water_mark");
  167. }
  168. else if (stricmp(optName, "pending_requests_low_water_mark")==0)
  169. {
  170. unsigned pending_requests_low_water_mark = getUnsignedOption(val, "pending_requests_low_water_mark");
  171. checkSetOption(cass_cluster_set_pending_requests_low_water_mark(cluster, pending_requests_low_water_mark), "pending_requests_low_water_mark");
  172. }
  173. else if (stricmp(optName, "max_concurrent_requests_threshold")==0)
  174. {
  175. unsigned max_concurrent_requests_threshold = getUnsignedOption(val, "max_concurrent_requests_threshold");
  176. checkSetOption(cass_cluster_set_max_concurrent_requests_threshold(cluster, max_concurrent_requests_threshold), "max_concurrent_requests_threshold");
  177. }
  178. else if (stricmp(optName, "connect_timeout")==0)
  179. {
  180. unsigned connect_timeout = getUnsignedOption(val, "connect_timeout");
  181. cass_cluster_set_connect_timeout(cluster, connect_timeout);
  182. }
  183. else if (stricmp(optName, "request_timeout")==0)
  184. {
  185. unsigned request_timeout = getUnsignedOption(val, "request_timeout");
  186. cass_cluster_set_request_timeout(cluster, request_timeout);
  187. }
  188. else if (stricmp(optName, "load_balance_round_robin")==0)
  189. {
  190. cass_bool_t enable = getBoolOption(val, "load_balance_round_robin");
  191. if (enable==cass_true)
  192. cass_cluster_set_load_balance_round_robin(cluster);
  193. }
  194. else if (stricmp(optName, "load_balance_dc_aware")==0)
  195. {
  196. StringArray lbargs;
  197. lbargs.appendList(val, "|");
  198. if (lbargs.length() != 3)
  199. failx("Invalid value '%s' for option %s - expected 3 subvalues (separate with |)", val, optName.str());
  200. unsigned usedPerRemote = getUnsignedOption(lbargs.item(2), "load_balance_dc_aware");
  201. cass_bool_t allowRemote = getBoolOption(lbargs.item(2), "load_balance_dc_aware");
  202. checkSetOption(cass_cluster_set_load_balance_dc_aware(cluster, lbargs.item(0), usedPerRemote, allowRemote), "load_balance_dc_aware");
  203. }
  204. else if (stricmp(optName, "token_aware_routing")==0)
  205. {
  206. cass_bool_t enable = getBoolOption(val, "token_aware_routing");
  207. cass_cluster_set_token_aware_routing(cluster, enable);
  208. }
  209. else if (stricmp(optName, "latency_aware_routing")==0)
  210. {
  211. cass_bool_t enable = getBoolOption(val, "latency_aware_routing");
  212. cass_cluster_set_latency_aware_routing(cluster, enable);
  213. }
  214. else if (stricmp(optName, "latency_aware_routing_settings")==0)
  215. {
  216. StringArray subargs;
  217. subargs.appendList(val, "|");
  218. if (subargs.length() != 5)
  219. failx("Invalid value '%s' for option %s - expected 5 subvalues (separate with |)", val, optName.str());
  220. cass_double_t exclusion_threshold = getDoubleOption(subargs.item(0), "exclusion_threshold");
  221. cass_uint64_t scale_ms = getUnsigned64Option(subargs.item(1), "scale_ms");
  222. cass_uint64_t retry_period_ms = getUnsigned64Option(subargs.item(2), "retry_period_ms");
  223. cass_uint64_t update_rate_ms = getUnsigned64Option(subargs.item(3), "update_rate_ms");
  224. cass_uint64_t min_measured = getUnsigned64Option(subargs.item(4), "min_measured");
  225. cass_cluster_set_latency_aware_routing_settings(cluster, exclusion_threshold, scale_ms, retry_period_ms, update_rate_ms, min_measured);
  226. }
  227. else if (stricmp(optName, "tcp_nodelay")==0)
  228. {
  229. cass_bool_t enable = getBoolOption(val, "tcp_nodelay");
  230. cass_cluster_set_tcp_nodelay(cluster, enable);
  231. }
  232. else if (stricmp(optName, "tcp_keepalive")==0)
  233. {
  234. StringArray subargs;
  235. subargs.appendList(val, "|");
  236. if (subargs.length() != 2)
  237. failx("Invalid value '%s' for option %s - expected 2 subvalues (separate with |)", val, optName.str());
  238. cass_bool_t enabled = getBoolOption(subargs.item(0), "enabled");
  239. unsigned delay_secs = getUnsignedOption(subargs.item(0), "delay_secs");
  240. cass_cluster_set_tcp_keepalive(cluster, enabled, delay_secs);
  241. }
  242. else
  243. failx("Unrecognized option %s", optName.str());
  244. }
  245. }
  246. cass_cluster_set_contact_points(cluster, contact_points);
  247. if (*user || *password)
  248. cass_cluster_set_credentials(cluster, user, password);
  249. }
  250. ~CassandraCluster()
  251. {
  252. if (cluster)
  253. cass_cluster_free(cluster);
  254. }
  255. inline operator CassCluster *() const
  256. {
  257. return cluster;
  258. }
  259. private:
  260. void checkSetOption(CassError rc, const char *name)
  261. {
  262. if (rc != CASS_OK)
  263. {
  264. failx("While setting option %s: %s", name, cass_error_desc(rc));
  265. }
  266. }
  267. cass_bool_t getBoolOption(const char *val, const char *option)
  268. {
  269. return strToBool(val) ? cass_true : cass_false;
  270. }
  271. unsigned getUnsignedOption(const char *val, const char *option)
  272. {
  273. char *endp;
  274. long value = strtoul(val, &endp, 0);
  275. if (endp==val || *endp != '\0' || value > UINT_MAX || value < 0)
  276. failx("Invalid value '%s' for option %s", val, option);
  277. return (unsigned) value;
  278. }
  279. unsigned getDoubleOption(const char *val, const char *option)
  280. {
  281. char *endp;
  282. double value = strtod(val, &endp);
  283. if (endp==val || *endp != '\0')
  284. failx("Invalid value '%s' for option %s", val, option);
  285. return value;
  286. }
  287. __uint64 getUnsigned64Option(const char *val, const char *option)
  288. {
  289. // MORE - could check it's all digits (with optional leading spaces...), if we cared.
  290. return rtlVStrToUInt8(val);
  291. }
  292. CassandraCluster(const CassandraCluster &);
  293. CassCluster *cluster;
  294. public:
  295. // These are here as convenient to set from same options string. They are really properties of the session
  296. // rather than the cluster, but we have one session per cluster so we get away with it.
  297. CassBatchType batchMode;
  298. StringAttr keyspace;
  299. };
  300. class CassandraFuture : public CInterface
  301. {
  302. public:
  303. CassandraFuture(CassFuture *_future) : future(_future)
  304. {
  305. }
  306. ~CassandraFuture()
  307. {
  308. if (future)
  309. cass_future_free(future);
  310. }
  311. inline operator CassFuture *() const
  312. {
  313. return future;
  314. }
  315. void wait(const char *why)
  316. {
  317. cass_future_wait(future);
  318. CassError rc = cass_future_error_code(future);
  319. if(rc != CASS_OK)
  320. {
  321. const char *message;
  322. size_t length;
  323. cass_future_error_message(future, &message, &length);
  324. VStringBuffer err("cassandra: failed to %s (%.*s)", why, (int) length, message);
  325. #ifdef _DEBUG
  326. DBGLOG("%s", err.str());
  327. #endif
  328. rtlFail(0, err.str());
  329. }
  330. }
  331. void set(CassFuture *_future)
  332. {
  333. if (future)
  334. cass_future_free(future);
  335. future = _future;
  336. }
  337. private:
  338. CassandraFuture(const CassandraFuture &);
  339. CassFuture *future;
  340. };
  341. class CassandraSession : public CInterface
  342. {
  343. public:
  344. CassandraSession() : session(NULL) {}
  345. CassandraSession(CassSession *_session) : session(_session)
  346. {
  347. }
  348. ~CassandraSession()
  349. {
  350. set(NULL);
  351. }
  352. void set(CassSession *_session)
  353. {
  354. if (session)
  355. {
  356. CassandraFuture close_future(cass_session_close(session));
  357. cass_future_wait(close_future);
  358. cass_session_free(session);
  359. }
  360. session = _session;
  361. }
  362. inline operator CassSession *() const
  363. {
  364. return session;
  365. }
  366. private:
  367. CassandraSession(const CassandraSession &);
  368. CassSession *session;
  369. };
  370. class CassandraBatch : public CInterface
  371. {
  372. public:
  373. CassandraBatch(CassBatch *_batch) : batch(_batch)
  374. {
  375. }
  376. ~CassandraBatch()
  377. {
  378. if (batch)
  379. cass_batch_free(batch);
  380. }
  381. inline operator CassBatch *() const
  382. {
  383. return batch;
  384. }
  385. private:
  386. CassandraBatch(const CassandraBatch &);
  387. CassBatch *batch;
  388. };
  389. class CassandraStatement : public CInterface
  390. {
  391. public:
  392. CassandraStatement(CassStatement *_statement) : statement(_statement)
  393. {
  394. }
  395. CassandraStatement(const char *simple) : statement(cass_statement_new(simple, 0))
  396. {
  397. }
  398. ~CassandraStatement()
  399. {
  400. if (statement)
  401. cass_statement_free(statement);
  402. }
  403. inline operator CassStatement *() const
  404. {
  405. return statement;
  406. }
  407. private:
  408. CassandraStatement(const CassandraStatement &);
  409. CassStatement *statement;
  410. };
  411. class CassandraPrepared : public CInterfaceOf<IInterface>
  412. {
  413. public:
  414. CassandraPrepared(const CassPrepared *_prepared) : prepared(_prepared)
  415. {
  416. }
  417. ~CassandraPrepared()
  418. {
  419. if (prepared)
  420. cass_prepared_free(prepared);
  421. }
  422. inline operator const CassPrepared *() const
  423. {
  424. return prepared;
  425. }
  426. private:
  427. CassandraPrepared(const CassandraPrepared &);
  428. const CassPrepared *prepared;
  429. };
  430. class CassandraResult : public CInterfaceOf<IInterface>
  431. {
  432. public:
  433. CassandraResult(const CassResult *_result) : result(_result)
  434. {
  435. }
  436. ~CassandraResult()
  437. {
  438. if (result)
  439. cass_result_free(result);
  440. }
  441. inline operator const CassResult *() const
  442. {
  443. return result;
  444. }
  445. private:
  446. CassandraResult(const CassandraResult &);
  447. const CassResult *result;
  448. };
  449. class CassandraIterator : public CInterfaceOf<IInterface>
  450. {
  451. public:
  452. CassandraIterator(CassIterator *_iterator) : iterator(_iterator)
  453. {
  454. }
  455. ~CassandraIterator()
  456. {
  457. if (iterator)
  458. cass_iterator_free(iterator);
  459. }
  460. inline void set(CassIterator *_iterator)
  461. {
  462. if (iterator)
  463. cass_iterator_free(iterator);
  464. iterator = _iterator;
  465. }
  466. inline operator CassIterator *() const
  467. {
  468. return iterator;
  469. }
  470. protected:
  471. CassandraIterator(const CassandraIterator &);
  472. CassIterator *iterator;
  473. };
  474. class CassandraCollection : public CInterface
  475. {
  476. public:
  477. CassandraCollection(CassCollection *_collection) : collection(_collection)
  478. {
  479. }
  480. ~CassandraCollection()
  481. {
  482. if (collection)
  483. cass_collection_free(collection);
  484. }
  485. inline operator CassCollection *() const
  486. {
  487. return collection;
  488. }
  489. private:
  490. CassandraCollection(const CassandraCollection &);
  491. CassCollection *collection;
  492. };
  493. void check(CassError rc)
  494. {
  495. if (rc != CASS_OK)
  496. {
  497. fail(cass_error_desc(rc));
  498. }
  499. }
  500. class CassandraStatementInfo : public CInterface
  501. {
  502. public:
  503. IMPLEMENT_IINTERFACE;
  504. CassandraStatementInfo(CassandraSession *_session, CassandraPrepared *_prepared, unsigned _numBindings, CassBatchType _batchMode)
  505. : session(_session), prepared(_prepared), numBindings(_numBindings), batchMode(_batchMode)
  506. {
  507. assertex(prepared && *prepared);
  508. statement.setown(new CassandraStatement(cass_prepared_bind(*prepared)));
  509. }
  510. ~CassandraStatementInfo()
  511. {
  512. stop();
  513. }
  514. inline void stop()
  515. {
  516. iterator.clear();
  517. result.clear();
  518. prepared.clear();
  519. }
  520. bool next()
  521. {
  522. if (!iterator)
  523. return false;
  524. return cass_iterator_next(*iterator);
  525. }
  526. void startStream()
  527. {
  528. if (batchMode != (CassBatchType) -1)
  529. {
  530. batch.setown(new CassandraBatch(cass_batch_new(batchMode)));
  531. statement.setown(new CassandraStatement(cass_prepared_bind(*prepared)));
  532. }
  533. }
  534. void endStream()
  535. {
  536. if (batch)
  537. {
  538. CassandraFuture future(cass_session_execute_batch(*session, *batch));
  539. future.wait("execute");
  540. result.setown(new CassandraResult(cass_future_get_result(future)));
  541. assertex (rowCount() == 0);
  542. }
  543. }
  544. void execute()
  545. {
  546. assertex(statement && *statement);
  547. if (batch)
  548. {
  549. check(cass_batch_add_statement(*batch, *statement));
  550. statement.setown(new CassandraStatement(cass_prepared_bind(*prepared)));
  551. }
  552. else
  553. {
  554. CassandraFuture future(cass_session_execute(*session, *statement));
  555. future.wait("execute");
  556. result.setown(new CassandraResult(cass_future_get_result(future)));
  557. if (rowCount() > 0)
  558. iterator.setown(new CassandraIterator(cass_iterator_from_result(*result)));
  559. }
  560. }
  561. inline size_t rowCount() const
  562. {
  563. return cass_result_row_count(*result);
  564. }
  565. inline bool hasResult() const
  566. {
  567. return result != NULL;
  568. }
  569. inline const CassRow *queryRow() const
  570. {
  571. assertex(iterator && *iterator);
  572. return cass_iterator_get_row(*iterator);
  573. }
  574. inline CassStatement *queryStatement() const
  575. {
  576. assertex(statement && *statement);
  577. return *statement;
  578. }
  579. protected:
  580. Linked<CassandraSession> session;
  581. Linked<CassandraPrepared> prepared;
  582. Owned<CassandraBatch> batch;
  583. Owned<CassandraStatement> statement;
  584. Owned<CassandraResult> result;
  585. Owned<CassandraIterator> iterator;
  586. unsigned numBindings;
  587. CassBatchType batchMode;
  588. };
  589. // Conversions from Cassandra values to ECL data
  590. static const char *getTypeName(CassValueType type)
  591. {
  592. switch (type)
  593. {
  594. case CASS_VALUE_TYPE_CUSTOM: return "CUSTOM";
  595. case CASS_VALUE_TYPE_ASCII: return "ASCII";
  596. case CASS_VALUE_TYPE_BIGINT: return "BIGINT";
  597. case CASS_VALUE_TYPE_BLOB: return "BLOB";
  598. case CASS_VALUE_TYPE_BOOLEAN: return "BOOLEAN";
  599. case CASS_VALUE_TYPE_COUNTER: return "COUNTER";
  600. case CASS_VALUE_TYPE_DECIMAL: return "DECIMAL";
  601. case CASS_VALUE_TYPE_DOUBLE: return "DOUBLE";
  602. case CASS_VALUE_TYPE_FLOAT: return "FLOAT";
  603. case CASS_VALUE_TYPE_INT: return "INT";
  604. case CASS_VALUE_TYPE_TEXT: return "TEXT";
  605. case CASS_VALUE_TYPE_TIMESTAMP: return "TIMESTAMP";
  606. case CASS_VALUE_TYPE_UUID: return "UUID";
  607. case CASS_VALUE_TYPE_VARCHAR: return "VARCHAR";
  608. case CASS_VALUE_TYPE_VARINT: return "VARINT";
  609. case CASS_VALUE_TYPE_TIMEUUID: return "TIMEUUID";
  610. case CASS_VALUE_TYPE_INET: return "INET";
  611. case CASS_VALUE_TYPE_LIST: return "LIST";
  612. case CASS_VALUE_TYPE_MAP: return "MAP";
  613. case CASS_VALUE_TYPE_SET: return "SET";
  614. default: return "UNKNOWN";
  615. }
  616. }
  617. static void typeError(const char *expected, const CassValue *value, const RtlFieldInfo *field) __attribute__((noreturn));
  618. static void typeError(const char *expected, const CassValue *value, const RtlFieldInfo *field)
  619. {
  620. VStringBuffer msg("cassandra: type mismatch - %s expected", expected);
  621. if (field)
  622. msg.appendf(" for field %s", field->name->str());
  623. if (value)
  624. msg.appendf(", received %s", getTypeName(cass_value_type(value)));
  625. rtlFail(0, msg.str());
  626. }
  627. static bool isInteger(const CassValueType t)
  628. {
  629. switch (t)
  630. {
  631. case CASS_VALUE_TYPE_TIMESTAMP:
  632. case CASS_VALUE_TYPE_INT:
  633. case CASS_VALUE_TYPE_BIGINT:
  634. case CASS_VALUE_TYPE_COUNTER:
  635. case CASS_VALUE_TYPE_VARINT:
  636. return true;
  637. default:
  638. return false;
  639. }
  640. }
  641. static bool isString(CassValueType t)
  642. {
  643. switch (t)
  644. {
  645. case CASS_VALUE_TYPE_VARCHAR:
  646. case CASS_VALUE_TYPE_TEXT:
  647. case CASS_VALUE_TYPE_ASCII:
  648. return true;
  649. default:
  650. return false;
  651. }
  652. }
  653. // when extracting elements of a set, field will point at the SET info- we want to get the typeInfo for the element type
  654. static const RtlTypeInfo *getFieldBaseType(const RtlFieldInfo *field)
  655. {
  656. const RtlTypeInfo *type = field->type;
  657. if ((type->fieldType & RFTMkind) == type_set)
  658. return type->queryChildType();
  659. else
  660. return type;
  661. }
  662. static int getNumFields(const RtlTypeInfo *record)
  663. {
  664. int count = 0;
  665. const RtlFieldInfo * const *fields = record->queryFields();
  666. assertex(fields);
  667. while (*fields++)
  668. count++;
  669. return count;
  670. }
  671. static bool getBooleanResult(const RtlFieldInfo *field, const CassValue *value)
  672. {
  673. if (cass_value_is_null(value))
  674. {
  675. NullFieldProcessor p(field);
  676. return p.boolResult;
  677. }
  678. if (cass_value_type(value) != CASS_VALUE_TYPE_BOOLEAN)
  679. typeError("boolean", value, field);
  680. cass_bool_t output;
  681. check(cass_value_get_bool(value, &output));
  682. return output != cass_false;
  683. }
  684. static void getDataResult(const RtlFieldInfo *field, const CassValue *value, size32_t &chars, void * &result)
  685. {
  686. if (cass_value_is_null(value))
  687. {
  688. NullFieldProcessor p(field);
  689. rtlStrToDataX(chars, result, p.resultChars, p.stringResult);
  690. return;
  691. }
  692. // We COULD require that the field being retrieved is a blob - but Cassandra seems happy to use any field here, and
  693. // it seems like it could be more useful to support anything
  694. // if (cass_value_type(value) != CASS_VALUE_TYPE_BLOB)
  695. // typeError("blob", value, field);
  696. const cass_byte_t *bytes;
  697. size_t size;
  698. check(cass_value_get_bytes(value, &bytes, &size));
  699. rtlStrToDataX(chars, result, size, bytes);
  700. }
  701. static __int64 getSignedResult(const RtlFieldInfo *field, const CassValue *value);
  702. static unsigned __int64 getUnsignedResult(const RtlFieldInfo *field, const CassValue *value);
  703. static double getRealResult(const RtlFieldInfo *field, const CassValue *value)
  704. {
  705. if (cass_value_is_null(value))
  706. {
  707. NullFieldProcessor p(field);
  708. return p.doubleResult;
  709. }
  710. else if (isInteger(cass_value_type(value)))
  711. return (double) getSignedResult(field, value);
  712. else switch (cass_value_type(value))
  713. {
  714. case CASS_VALUE_TYPE_FLOAT:
  715. {
  716. cass_float_t output_f;
  717. check(cass_value_get_float(value, &output_f));
  718. return output_f;
  719. }
  720. case CASS_VALUE_TYPE_DOUBLE:
  721. {
  722. cass_double_t output_d;
  723. check(cass_value_get_double(value, &output_d));
  724. return output_d;
  725. }
  726. default:
  727. typeError("double", value, field);
  728. }
  729. }
  730. static __int64 getSignedResult(const RtlFieldInfo *field, const CassValue *value)
  731. {
  732. if (cass_value_is_null(value))
  733. {
  734. NullFieldProcessor p(field);
  735. return p.intResult;
  736. }
  737. switch (cass_value_type(value))
  738. {
  739. case CASS_VALUE_TYPE_INT:
  740. {
  741. cass_int32_t output;
  742. check(cass_value_get_int32(value, &output));
  743. return output;
  744. }
  745. case CASS_VALUE_TYPE_TIMESTAMP:
  746. case CASS_VALUE_TYPE_BIGINT:
  747. case CASS_VALUE_TYPE_COUNTER:
  748. case CASS_VALUE_TYPE_VARINT:
  749. {
  750. cass_int64_t output;
  751. check(cass_value_get_int64(value, &output));
  752. return output;
  753. }
  754. default:
  755. typeError("integer", value, field);
  756. }
  757. }
  758. static unsigned __int64 getUnsignedResult(const RtlFieldInfo *field, const CassValue *value)
  759. {
  760. if (cass_value_is_null(value))
  761. {
  762. NullFieldProcessor p(field);
  763. return p.uintResult;
  764. }
  765. return (__uint64) getSignedResult(field, value);
  766. }
  767. static void getStringResult(const RtlFieldInfo *field, const CassValue *value, size32_t &chars, char * &result)
  768. {
  769. if (cass_value_is_null(value))
  770. {
  771. NullFieldProcessor p(field);
  772. rtlStrToStrX(chars, result, p.resultChars, p.stringResult);
  773. return;
  774. }
  775. switch (cass_value_type(value))
  776. {
  777. case CASS_VALUE_TYPE_ASCII:
  778. {
  779. const char *output;
  780. size_t length;
  781. check(cass_value_get_string(value, &output, &length));
  782. rtlStrToStrX(chars, result, length, output);
  783. break;
  784. }
  785. case CASS_VALUE_TYPE_VARCHAR:
  786. case CASS_VALUE_TYPE_TEXT:
  787. {
  788. const char *output;
  789. size_t length;
  790. check(cass_value_get_string(value, &output, &length));
  791. unsigned numchars = rtlUtf8Length(length, output);
  792. rtlUtf8ToStrX(chars, result, numchars, output);
  793. break;
  794. }
  795. default:
  796. typeError("string", value, field);
  797. }
  798. }
  799. static void getUTF8Result(const RtlFieldInfo *field, const CassValue *value, size32_t &chars, char * &result)
  800. {
  801. if (cass_value_is_null(value))
  802. {
  803. NullFieldProcessor p(field);
  804. rtlUtf8ToUtf8X(chars, result, p.resultChars, p.stringResult);
  805. return;
  806. }
  807. switch (cass_value_type(value))
  808. {
  809. case CASS_VALUE_TYPE_ASCII:
  810. {
  811. const char *output;
  812. size_t length;
  813. check(cass_value_get_string(value, &output, &length));
  814. rtlStrToUtf8X(chars, result, length, output);
  815. break;
  816. }
  817. case CASS_VALUE_TYPE_VARCHAR:
  818. case CASS_VALUE_TYPE_TEXT:
  819. {
  820. const char * output;
  821. size_t length;
  822. check(cass_value_get_string(value, &output, &length));
  823. unsigned numchars = rtlUtf8Length(length, output);
  824. rtlUtf8ToUtf8X(chars, result, numchars, output);
  825. break;
  826. }
  827. default:
  828. typeError("string", value, field);
  829. }
  830. }
  831. static void getUnicodeResult(const RtlFieldInfo *field, const CassValue *value, size32_t &chars, UChar * &result)
  832. {
  833. if (cass_value_is_null(value))
  834. {
  835. NullFieldProcessor p(field);
  836. rtlUnicodeToUnicodeX(chars, result, p.resultChars, p.unicodeResult);
  837. return;
  838. }
  839. switch (cass_value_type(value))
  840. {
  841. case CASS_VALUE_TYPE_ASCII:
  842. {
  843. const char * output;
  844. size_t length;
  845. check(cass_value_get_string(value, &output, &length));
  846. rtlStrToUnicodeX(chars, result, length, output);
  847. break;
  848. }
  849. case CASS_VALUE_TYPE_VARCHAR:
  850. case CASS_VALUE_TYPE_TEXT:
  851. {
  852. const char * output;
  853. size_t length;
  854. check(cass_value_get_string(value, &output, &length));
  855. unsigned numchars = rtlUtf8Length(length, output);
  856. rtlUtf8ToUnicodeX(chars, result, numchars, output);
  857. break;
  858. }
  859. default:
  860. typeError("string", value, field);
  861. }
  862. }
  863. static void getDecimalResult(const RtlFieldInfo *field, const CassValue *value, Decimal &result)
  864. {
  865. // Note - Cassandra has a decimal type, but it's not particularly similar to the ecl one. Map to string for now, as we do in MySQL
  866. if (cass_value_is_null(value))
  867. {
  868. NullFieldProcessor p(field);
  869. result.set(p.decimalResult);
  870. return;
  871. }
  872. size32_t chars;
  873. rtlDataAttr tempStr;
  874. cassandraembed::getStringResult(field, value, chars, tempStr.refstr());
  875. result.setString(chars, tempStr.getstr());
  876. if (field)
  877. {
  878. RtlDecimalTypeInfo *dtype = (RtlDecimalTypeInfo *) field->type;
  879. result.setPrecision(dtype->getDecimalDigits(), dtype->getDecimalPrecision());
  880. }
  881. }
  882. // A CassandraRowBuilder object is used to construct an ECL row from a Cassandra row
  883. class CassandraRowBuilder : public CInterfaceOf<IFieldSource>
  884. {
  885. public:
  886. CassandraRowBuilder(const CassandraStatementInfo *_stmtInfo)
  887. : stmtInfo(_stmtInfo), colIdx(0), numIteratorFields(0), nextIteratedField(0)
  888. {
  889. }
  890. virtual bool getBooleanResult(const RtlFieldInfo *field)
  891. {
  892. return cassandraembed::getBooleanResult(field, nextField(field));
  893. }
  894. virtual void getDataResult(const RtlFieldInfo *field, size32_t &len, void * &result)
  895. {
  896. cassandraembed::getDataResult(field, nextField(field), len, result);
  897. }
  898. virtual double getRealResult(const RtlFieldInfo *field)
  899. {
  900. return cassandraembed::getRealResult(field, nextField(field));
  901. }
  902. virtual __int64 getSignedResult(const RtlFieldInfo *field)
  903. {
  904. return cassandraembed::getSignedResult(field, nextField(field));
  905. }
  906. virtual unsigned __int64 getUnsignedResult(const RtlFieldInfo *field)
  907. {
  908. return cassandraembed::getUnsignedResult(field, nextField(field));
  909. }
  910. virtual void getStringResult(const RtlFieldInfo *field, size32_t &chars, char * &result)
  911. {
  912. cassandraembed::getStringResult(field, nextField(field), chars, result);
  913. }
  914. virtual void getUTF8Result(const RtlFieldInfo *field, size32_t &chars, char * &result)
  915. {
  916. cassandraembed::getUTF8Result(field, nextField(field), chars, result);
  917. }
  918. virtual void getUnicodeResult(const RtlFieldInfo *field, size32_t &chars, UChar * &result)
  919. {
  920. cassandraembed::getUnicodeResult(field, nextField(field), chars, result);
  921. }
  922. virtual void getDecimalResult(const RtlFieldInfo *field, Decimal &value)
  923. {
  924. cassandraembed::getDecimalResult(field, nextField(field), value);
  925. }
  926. virtual void processBeginSet(const RtlFieldInfo * field, bool &isAll)
  927. {
  928. isAll = false;
  929. iterator.setown(new CassandraIterator(cass_iterator_from_collection(nextField(field))));
  930. }
  931. virtual bool processNextSet(const RtlFieldInfo * field)
  932. {
  933. numIteratorFields = 1;
  934. return *iterator && cass_iterator_next(*iterator); // If field was NULL, we'll have a NULL iterator (representing an empty set/list)
  935. // Can't distinguish empty set from NULL field, so assume the former (rather than trying to deliver the default value for the set field)
  936. }
  937. virtual void processBeginDataset(const RtlFieldInfo * field)
  938. {
  939. numIteratorFields = getNumFields(field->type->queryChildType());
  940. switch (numIteratorFields)
  941. {
  942. case 1:
  943. iterator.setown(new CassandraIterator(cass_iterator_from_collection(nextField(field))));
  944. break;
  945. case 2:
  946. iterator.setown(new CassandraIterator(cass_iterator_from_map(nextField(field))));
  947. break;
  948. default:
  949. UNSUPPORTED("Nested datasets with > 2 fields");
  950. }
  951. }
  952. virtual void processBeginRow(const RtlFieldInfo * field)
  953. {
  954. }
  955. virtual bool processNextRow(const RtlFieldInfo * field)
  956. {
  957. nextIteratedField = 0;
  958. return *iterator && cass_iterator_next(*iterator); // If field was NULL, we'll have a NULL iterator (representing an empty set/list/map)
  959. // Can't distinguish empty set from NULL field, so assume the former (rather than trying to deliver the default value for the set field)
  960. }
  961. virtual void processEndSet(const RtlFieldInfo * field)
  962. {
  963. iterator.clear();
  964. numIteratorFields = 0;
  965. }
  966. virtual void processEndDataset(const RtlFieldInfo * field)
  967. {
  968. iterator.clear();
  969. numIteratorFields = 0;
  970. }
  971. virtual void processEndRow(const RtlFieldInfo * field)
  972. {
  973. }
  974. protected:
  975. const CassValue *nextField(const RtlFieldInfo * field)
  976. {
  977. const CassValue *ret;
  978. if (iterator)
  979. {
  980. switch (numIteratorFields)
  981. {
  982. case 1:
  983. ret = cass_iterator_get_value(*iterator);
  984. break;
  985. case 2:
  986. if (nextIteratedField==0)
  987. ret = cass_iterator_get_map_key(*iterator);
  988. else
  989. ret = cass_iterator_get_map_value(*iterator);
  990. nextIteratedField++;
  991. break;
  992. default:
  993. throwUnexpected();
  994. }
  995. }
  996. else
  997. ret = cass_row_get_column(stmtInfo->queryRow(), colIdx++);
  998. if (!ret)
  999. failx("Too many fields in ECL output row, reading field %s", field->name->getAtomNamePtr());
  1000. return ret;
  1001. }
  1002. const CassandraStatementInfo *stmtInfo;
  1003. Owned<CassandraIterator> iterator;
  1004. int colIdx;
  1005. int numIteratorFields;
  1006. int nextIteratedField;
  1007. };
  1008. // Bind Cassandra columns from an ECL record
  1009. class CassandraRecordBinder : public CInterfaceOf<IFieldProcessor>
  1010. {
  1011. public:
  1012. CassandraRecordBinder(const IContextLogger &_logctx, const RtlTypeInfo *_typeInfo, const CassandraStatementInfo *_stmtInfo, int _firstParam)
  1013. : logctx(_logctx), typeInfo(_typeInfo), stmtInfo(_stmtInfo), firstParam(_firstParam), dummyField("<row>", NULL, typeInfo), thisParam(_firstParam)
  1014. {
  1015. }
  1016. int numFields()
  1017. {
  1018. int count = 0;
  1019. const RtlFieldInfo * const *fields = typeInfo->queryFields();
  1020. assertex(fields);
  1021. while (*fields++)
  1022. count++;
  1023. return count;
  1024. }
  1025. void processRow(const byte *row)
  1026. {
  1027. thisParam = firstParam;
  1028. typeInfo->process(row, row, &dummyField, *this); // Bind the variables for the current row
  1029. }
  1030. virtual void processString(unsigned len, const char *value, const RtlFieldInfo * field)
  1031. {
  1032. size32_t utf8chars;
  1033. rtlDataAttr utfText;
  1034. rtlStrToUtf8X(utf8chars, utfText.refstr(), len, value);
  1035. if (collection)
  1036. checkBind(cass_collection_append_string_n(*collection, utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())),
  1037. field);
  1038. else
  1039. checkBind(cass_statement_bind_string_n(stmtInfo->queryStatement(),
  1040. checkNextParam(field),
  1041. utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())),
  1042. field);
  1043. }
  1044. virtual void processBool(bool value, const RtlFieldInfo * field)
  1045. {
  1046. if (collection)
  1047. checkBind(cass_collection_append_bool(*collection, value ? cass_true : cass_false), field);
  1048. else
  1049. checkBind(cass_statement_bind_bool(stmtInfo->queryStatement(), checkNextParam(field), value ? cass_true : cass_false), field);
  1050. }
  1051. virtual void processData(unsigned len, const void *value, const RtlFieldInfo * field)
  1052. {
  1053. if (collection)
  1054. checkBind(cass_collection_append_bytes(*collection, (const cass_byte_t*) value, len), field);
  1055. else
  1056. checkBind(cass_statement_bind_bytes(stmtInfo->queryStatement(), checkNextParam(field), (const cass_byte_t*) value, len), field);
  1057. }
  1058. virtual void processInt(__int64 value, const RtlFieldInfo * field)
  1059. {
  1060. if (getFieldBaseType(field)->size(NULL,NULL)>4)
  1061. {
  1062. if (collection)
  1063. checkBind(cass_collection_append_int64(*collection, value), field);
  1064. else
  1065. checkBind(cass_statement_bind_int64(stmtInfo->queryStatement(), checkNextParam(field), value), field);
  1066. }
  1067. else
  1068. {
  1069. if (collection)
  1070. checkBind(cass_collection_append_int32(*collection, value), field);
  1071. else
  1072. checkBind(cass_statement_bind_int32(stmtInfo->queryStatement(), checkNextParam(field), value), field);
  1073. }
  1074. }
  1075. virtual void processUInt(unsigned __int64 value, const RtlFieldInfo * field)
  1076. {
  1077. UNSUPPORTED("UNSIGNED columns");
  1078. }
  1079. virtual void processReal(double value, const RtlFieldInfo * field)
  1080. {
  1081. if (getFieldBaseType(field)->size(NULL,NULL)>4)
  1082. {
  1083. if (collection)
  1084. checkBind(cass_collection_append_double(*collection, value), field);
  1085. else
  1086. checkBind(cass_statement_bind_double(stmtInfo->queryStatement(), checkNextParam(field), value), field);
  1087. }
  1088. else
  1089. {
  1090. if (collection)
  1091. checkBind(cass_collection_append_float(*collection, (float) value), field);
  1092. else
  1093. checkBind(cass_statement_bind_float(stmtInfo->queryStatement(), checkNextParam(field), (float) value), field);
  1094. }
  1095. }
  1096. virtual void processDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo * field)
  1097. {
  1098. Decimal val;
  1099. size32_t bytes;
  1100. rtlDataAttr decText;
  1101. val.setDecimal(digits, precision, value);
  1102. val.getStringX(bytes, decText.refstr());
  1103. processUtf8(bytes, decText.getstr(), field);
  1104. }
  1105. virtual void processUDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo * field)
  1106. {
  1107. UNSUPPORTED("UNSIGNED decimals");
  1108. }
  1109. virtual void processUnicode(unsigned chars, const UChar *value, const RtlFieldInfo * field)
  1110. {
  1111. size32_t utf8chars;
  1112. rtlDataAttr utfText;
  1113. rtlUnicodeToUtf8X(utf8chars, utfText.refstr(), chars, value);
  1114. if (collection)
  1115. checkBind(cass_collection_append_string_n(*collection, utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())),
  1116. field);
  1117. else
  1118. checkBind(cass_statement_bind_string_n(stmtInfo->queryStatement(),
  1119. checkNextParam(field),
  1120. utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())),
  1121. field);
  1122. }
  1123. virtual void processQString(unsigned len, const char *value, const RtlFieldInfo * field)
  1124. {
  1125. size32_t charCount;
  1126. rtlDataAttr text;
  1127. rtlQStrToStrX(charCount, text.refstr(), len, value);
  1128. processUtf8(charCount, text.getstr(), field);
  1129. }
  1130. virtual void processUtf8(unsigned chars, const char *value, const RtlFieldInfo * field)
  1131. {
  1132. if (collection)
  1133. checkBind(cass_collection_append_string_n(*collection, value, rtlUtf8Size(chars, value)), field);
  1134. else
  1135. checkBind(cass_statement_bind_string_n(stmtInfo->queryStatement(), checkNextParam(field), value, rtlUtf8Size(chars, value)), field);
  1136. }
  1137. virtual bool processBeginSet(const RtlFieldInfo * field, unsigned numElements, bool isAll, const byte *data)
  1138. {
  1139. if (isAll)
  1140. UNSUPPORTED("SET(ALL)");
  1141. collection.setown(new CassandraCollection(cass_collection_new(CASS_COLLECTION_TYPE_SET, numElements)));
  1142. return true;
  1143. }
  1144. virtual bool processBeginDataset(const RtlFieldInfo * field, unsigned numRows)
  1145. {
  1146. // If there's a single field, assume we are mapping to a SET/LIST
  1147. // If there are two, assume it's a MAP
  1148. // Otherwise, fail
  1149. int numFields = getNumFields(field->type->queryChildType());
  1150. if (numFields < 1 || numFields > 2)
  1151. {
  1152. UNSUPPORTED("Nested datasets with > 2 fields");
  1153. }
  1154. collection.setown(new CassandraCollection(cass_collection_new(numFields==1 ? CASS_COLLECTION_TYPE_SET : CASS_COLLECTION_TYPE_MAP, numRows)));
  1155. return true;
  1156. }
  1157. virtual bool processBeginRow(const RtlFieldInfo * field)
  1158. {
  1159. return true;
  1160. }
  1161. virtual void processEndSet(const RtlFieldInfo * field)
  1162. {
  1163. checkBind(cass_statement_bind_collection(stmtInfo->queryStatement(), checkNextParam(field), *collection), field);
  1164. collection.clear();
  1165. }
  1166. virtual void processEndDataset(const RtlFieldInfo * field)
  1167. {
  1168. checkBind(cass_statement_bind_collection(stmtInfo->queryStatement(), checkNextParam(field), *collection), field);
  1169. collection.clear();
  1170. }
  1171. virtual void processEndRow(const RtlFieldInfo * field)
  1172. {
  1173. }
  1174. protected:
  1175. inline unsigned checkNextParam(const RtlFieldInfo * field)
  1176. {
  1177. if (logctx.queryTraceLevel() > 4)
  1178. logctx.CTXLOG("Binding %s to %d", field->name->str(), thisParam);
  1179. return thisParam++;
  1180. }
  1181. inline void checkBind(CassError rc, const RtlFieldInfo * field)
  1182. {
  1183. if (rc != CASS_OK)
  1184. {
  1185. failx("While binding parameter %s: %s", field->name->getAtomNamePtr(), cass_error_desc(rc));
  1186. }
  1187. }
  1188. const RtlTypeInfo *typeInfo;
  1189. const CassandraStatementInfo *stmtInfo;
  1190. Owned<CassandraCollection> collection;
  1191. const IContextLogger &logctx;
  1192. int firstParam;
  1193. RtlFieldStrInfo dummyField;
  1194. int thisParam;
  1195. };
  1196. //
  1197. class CassandraDatasetBinder : public CassandraRecordBinder
  1198. {
  1199. public:
  1200. CassandraDatasetBinder(const IContextLogger &_logctx, IRowStream * _input, const RtlTypeInfo *_typeInfo, const CassandraStatementInfo *_stmt, int _firstParam)
  1201. : input(_input), CassandraRecordBinder(_logctx, _typeInfo, _stmt, _firstParam)
  1202. {
  1203. }
  1204. bool bindNext()
  1205. {
  1206. roxiemem::OwnedConstRoxieRow nextRow = (const byte *) input->ungroupedNextRow();
  1207. if (!nextRow)
  1208. return false;
  1209. processRow((const byte *) nextRow.get()); // Bind the variables for the current row
  1210. return true;
  1211. }
  1212. void executeAll(CassandraStatementInfo *stmtInfo)
  1213. {
  1214. stmtInfo->startStream();
  1215. while (bindNext())
  1216. {
  1217. stmtInfo->execute();
  1218. }
  1219. stmtInfo->endStream();
  1220. }
  1221. protected:
  1222. Owned<IRowStream> input;
  1223. };
  1224. // A Cassandra function that returns a dataset will return a CassandraRowStream object that can be
  1225. // interrogated to return each row of the result in turn
  1226. class CassandraRowStream : public CInterfaceOf<IRowStream>
  1227. {
  1228. public:
  1229. CassandraRowStream(CassandraDatasetBinder *_inputStream, CassandraStatementInfo *_stmtInfo, IEngineRowAllocator *_resultAllocator)
  1230. : inputStream(_inputStream), stmtInfo(_stmtInfo), resultAllocator(_resultAllocator)
  1231. {
  1232. executePending = true;
  1233. eof = false;
  1234. }
  1235. virtual const void *nextRow()
  1236. {
  1237. // A little complex when streaming data in as well as out - want to execute for every input record
  1238. if (eof)
  1239. return NULL;
  1240. loop
  1241. {
  1242. if (executePending)
  1243. {
  1244. executePending = false;
  1245. if (inputStream && !inputStream->bindNext())
  1246. {
  1247. noteEOF();
  1248. return NULL;
  1249. }
  1250. stmtInfo->execute();
  1251. }
  1252. if (stmtInfo->next())
  1253. break;
  1254. if (inputStream)
  1255. executePending = true;
  1256. else
  1257. {
  1258. noteEOF();
  1259. return NULL;
  1260. }
  1261. }
  1262. RtlDynamicRowBuilder rowBuilder(resultAllocator);
  1263. CassandraRowBuilder cassandraRowBuilder(stmtInfo);
  1264. const RtlTypeInfo *typeInfo = resultAllocator->queryOutputMeta()->queryTypeInfo();
  1265. assertex(typeInfo);
  1266. RtlFieldStrInfo dummyField("<row>", NULL, typeInfo);
  1267. size32_t len = typeInfo->build(rowBuilder, 0, &dummyField, cassandraRowBuilder);
  1268. return rowBuilder.finalizeRowClear(len);
  1269. }
  1270. virtual void stop()
  1271. {
  1272. resultAllocator.clear();
  1273. stmtInfo->stop();
  1274. }
  1275. protected:
  1276. void noteEOF()
  1277. {
  1278. if (!eof)
  1279. {
  1280. eof = true;
  1281. stop();
  1282. }
  1283. }
  1284. Linked<CassandraDatasetBinder> inputStream;
  1285. Linked<CassandraStatementInfo> stmtInfo;
  1286. Linked<IEngineRowAllocator> resultAllocator;
  1287. bool executePending;
  1288. bool eof;
  1289. };
  1290. // Each call to a Cassandra function will use a new CassandraEmbedFunctionContext object
  1291. class CassandraEmbedFunctionContext : public CInterfaceOf<IEmbedFunctionContext>
  1292. {
  1293. public:
  1294. CassandraEmbedFunctionContext(const IContextLogger &_logctx, unsigned _flags, const char *options)
  1295. : logctx(_logctx), flags(_flags), nextParam(0), numParams(0)
  1296. {
  1297. StringArray opts;
  1298. opts.appendList(options, ",");
  1299. cluster.setown(new CassandraCluster(cass_cluster_new()));
  1300. cluster->setOptions(opts);
  1301. session.setown(new CassandraSession(cass_session_new()));
  1302. CassandraFuture future(cluster->keyspace.isEmpty() ? cass_session_connect(*session, *cluster) : cass_session_connect_keyspace(*session, *cluster, cluster->keyspace));
  1303. future.wait("connect");
  1304. }
  1305. virtual bool getBooleanResult()
  1306. {
  1307. bool ret = cassandraembed::getBooleanResult(NULL, getScalarResult());
  1308. checkSingleRow();
  1309. return ret;
  1310. }
  1311. virtual void getDataResult(size32_t &len, void * &result)
  1312. {
  1313. cassandraembed::getDataResult(NULL, getScalarResult(), len, result);
  1314. checkSingleRow();
  1315. }
  1316. virtual double getRealResult()
  1317. {
  1318. double ret = cassandraembed::getRealResult(NULL, getScalarResult());
  1319. checkSingleRow();
  1320. return ret;
  1321. }
  1322. virtual __int64 getSignedResult()
  1323. {
  1324. __int64 ret = cassandraembed::getSignedResult(NULL, getScalarResult());
  1325. checkSingleRow();
  1326. return ret;
  1327. }
  1328. virtual unsigned __int64 getUnsignedResult()
  1329. {
  1330. unsigned __int64 ret = cassandraembed::getUnsignedResult(NULL, getScalarResult());
  1331. checkSingleRow();
  1332. return ret;
  1333. }
  1334. virtual void getStringResult(size32_t &chars, char * &result)
  1335. {
  1336. cassandraembed::getStringResult(NULL, getScalarResult(), chars, result);
  1337. checkSingleRow();
  1338. }
  1339. virtual void getUTF8Result(size32_t &chars, char * &result)
  1340. {
  1341. cassandraembed::getUTF8Result(NULL, getScalarResult(), chars, result);
  1342. checkSingleRow();
  1343. }
  1344. virtual void getUnicodeResult(size32_t &chars, UChar * &result)
  1345. {
  1346. cassandraembed::getUnicodeResult(NULL, getScalarResult(), chars, result);
  1347. checkSingleRow();
  1348. }
  1349. virtual void getDecimalResult(Decimal &value)
  1350. {
  1351. cassandraembed::getDecimalResult(NULL, getScalarResult(), value);
  1352. checkSingleRow();
  1353. }
  1354. virtual void getSetResult(bool & __isAllResult, size32_t & __resultBytes, void * & __result, int elemType, size32_t elemSize)
  1355. {
  1356. CassandraIterator iterator(cass_iterator_from_collection(getScalarResult()));
  1357. rtlRowBuilder out;
  1358. byte *outData = NULL;
  1359. size32_t outBytes = 0;
  1360. while (cass_iterator_next(iterator))
  1361. {
  1362. const CassValue *value = cass_iterator_get_value(iterator);
  1363. assertex(value);
  1364. if (elemSize != UNKNOWN_LENGTH)
  1365. {
  1366. out.ensureAvailable(outBytes + elemSize);
  1367. outData = out.getbytes() + outBytes;
  1368. }
  1369. switch ((type_t) elemType)
  1370. {
  1371. case type_int:
  1372. rtlWriteInt(outData, cassandraembed::getSignedResult(NULL, value), elemSize);
  1373. break;
  1374. case type_unsigned:
  1375. rtlWriteInt(outData, cassandraembed::getUnsignedResult(NULL, value), elemSize);
  1376. break;
  1377. case type_real:
  1378. if (elemSize == sizeof(double))
  1379. * (double *) outData = cassandraembed::getRealResult(NULL, value);
  1380. else
  1381. {
  1382. assertex(elemSize == sizeof(float));
  1383. * (float *) outData = (float) cassandraembed::getRealResult(NULL, value);
  1384. }
  1385. break;
  1386. case type_boolean:
  1387. assertex(elemSize == sizeof(bool));
  1388. * (bool *) outData = cassandraembed::getBooleanResult(NULL, value);
  1389. break;
  1390. case type_string:
  1391. case type_varstring:
  1392. {
  1393. rtlDataAttr str;
  1394. size32_t lenBytes;
  1395. cassandraembed::getStringResult(NULL, value, lenBytes, str.refstr());
  1396. if (elemSize == UNKNOWN_LENGTH)
  1397. {
  1398. if (elemType == type_string)
  1399. {
  1400. out.ensureAvailable(outBytes + lenBytes + sizeof(size32_t));
  1401. outData = out.getbytes() + outBytes;
  1402. * (size32_t *) outData = lenBytes;
  1403. rtlStrToStr(lenBytes, outData+sizeof(size32_t), lenBytes, str.getstr());
  1404. outBytes += lenBytes + sizeof(size32_t);
  1405. }
  1406. else
  1407. {
  1408. out.ensureAvailable(outBytes + lenBytes + 1);
  1409. outData = out.getbytes() + outBytes;
  1410. rtlStrToVStr(0, outData, lenBytes, str.getstr());
  1411. outBytes += lenBytes + 1;
  1412. }
  1413. }
  1414. else
  1415. {
  1416. if (elemType == type_string)
  1417. rtlStrToStr(elemSize, outData, lenBytes, str.getstr());
  1418. else
  1419. rtlStrToVStr(elemSize, outData, lenBytes, str.getstr()); // Fixed size null terminated strings... weird.
  1420. }
  1421. break;
  1422. }
  1423. case type_unicode:
  1424. case type_utf8:
  1425. {
  1426. rtlDataAttr str;
  1427. size32_t lenChars;
  1428. cassandraembed::getUTF8Result(NULL, value, lenChars, str.refstr());
  1429. const char * text = str.getstr();
  1430. size32_t lenBytes = rtlUtf8Size(lenChars, text);
  1431. if (elemType == type_utf8)
  1432. {
  1433. assertex (elemSize == UNKNOWN_LENGTH);
  1434. out.ensureAvailable(outBytes + lenBytes + sizeof(size32_t));
  1435. outData = out.getbytes() + outBytes;
  1436. * (size32_t *) outData = lenChars;
  1437. rtlStrToStr(lenBytes, outData+sizeof(size32_t), lenBytes, text);
  1438. outBytes += lenBytes + sizeof(size32_t);
  1439. }
  1440. else
  1441. {
  1442. if (elemSize == UNKNOWN_LENGTH)
  1443. {
  1444. // You can't assume that number of chars in utf8 matches number in unicode16 ...
  1445. size32_t numchars16;
  1446. rtlDataAttr unicode16;
  1447. rtlUtf8ToUnicodeX(numchars16, unicode16.refustr(), lenChars, text);
  1448. out.ensureAvailable(outBytes + numchars16*sizeof(UChar) + sizeof(size32_t));
  1449. outData = out.getbytes() + outBytes;
  1450. * (size32_t *) outData = numchars16;
  1451. rtlUnicodeToUnicode(numchars16, (UChar *) (outData+sizeof(size32_t)), numchars16, unicode16.getustr());
  1452. outBytes += numchars16*sizeof(UChar) + sizeof(size32_t);
  1453. }
  1454. else
  1455. rtlUtf8ToUnicode(elemSize / sizeof(UChar), (UChar *) outData, lenChars, text);
  1456. }
  1457. break;
  1458. }
  1459. default:
  1460. fail("type mismatch - unsupported return type");
  1461. }
  1462. if (elemSize != UNKNOWN_LENGTH)
  1463. outBytes += elemSize;
  1464. }
  1465. __isAllResult = false;
  1466. __resultBytes = outBytes;
  1467. __result = out.detachdata();
  1468. }
  1469. virtual IRowStream *getDatasetResult(IEngineRowAllocator * _resultAllocator)
  1470. {
  1471. return new CassandraRowStream(inputStream, stmtInfo, _resultAllocator);
  1472. }
  1473. virtual byte * getRowResult(IEngineRowAllocator * _resultAllocator)
  1474. {
  1475. if (!stmtInfo->hasResult() || stmtInfo->rowCount() != 1)
  1476. typeError("row", NULL, NULL);
  1477. CassandraRowStream stream(NULL, stmtInfo, _resultAllocator);
  1478. roxiemem::OwnedConstRoxieRow ret = stream.nextRow();
  1479. stream.stop();
  1480. if (ret == NULL) // Check for exactly one returned row
  1481. typeError("row", NULL, NULL);
  1482. return (byte *) ret.getClear();
  1483. }
  1484. virtual size32_t getTransformResult(ARowBuilder & rowBuilder)
  1485. {
  1486. if (!stmtInfo->hasResult() || stmtInfo->rowCount() != 1)
  1487. typeError("row", NULL, NULL);
  1488. if (!stmtInfo->next())
  1489. fail("Failed to read row");
  1490. CassandraRowBuilder cassandraRowBuilder(stmtInfo);
  1491. const RtlTypeInfo *typeInfo = rowBuilder.queryAllocator()->queryOutputMeta()->queryTypeInfo();
  1492. assertex(typeInfo);
  1493. RtlFieldStrInfo dummyField("<row>", NULL, typeInfo);
  1494. return typeInfo->build(rowBuilder, 0, &dummyField, cassandraRowBuilder);
  1495. }
  1496. virtual void bindRowParam(const char *name, IOutputMetaData & metaVal, byte *val)
  1497. {
  1498. CassandraRecordBinder binder(logctx, metaVal.queryTypeInfo(), stmtInfo, nextParam);
  1499. binder.processRow(val);
  1500. nextParam += binder.numFields();
  1501. }
  1502. virtual void bindDatasetParam(const char *name, IOutputMetaData & metaVal, IRowStream * val)
  1503. {
  1504. // We only support a single dataset parameter...
  1505. // MORE - look into batch?
  1506. if (inputStream)
  1507. {
  1508. fail("At most one dataset parameter supported");
  1509. }
  1510. inputStream.setown(new CassandraDatasetBinder(logctx, LINK(val), metaVal.queryTypeInfo(), stmtInfo, nextParam));
  1511. nextParam += inputStream->numFields();
  1512. }
  1513. virtual void bindBooleanParam(const char *name, bool val)
  1514. {
  1515. checkBind(cass_statement_bind_bool(stmtInfo->queryStatement(), checkNextParam(name), val ? cass_true : cass_false), name);
  1516. }
  1517. virtual void bindDataParam(const char *name, size32_t len, const void *val)
  1518. {
  1519. checkBind(cass_statement_bind_bytes(stmtInfo->queryStatement(), checkNextParam(name), (const cass_byte_t*) val, len), name);
  1520. }
  1521. virtual void bindFloatParam(const char *name, float val)
  1522. {
  1523. checkBind(cass_statement_bind_float(stmtInfo->queryStatement(), checkNextParam(name), val), name);
  1524. }
  1525. virtual void bindRealParam(const char *name, double val)
  1526. {
  1527. checkBind(cass_statement_bind_double(stmtInfo->queryStatement(), checkNextParam(name), val), name);
  1528. }
  1529. virtual void bindSignedSizeParam(const char *name, int size, __int64 val)
  1530. {
  1531. if (size > 4)
  1532. checkBind(cass_statement_bind_int64(stmtInfo->queryStatement(), checkNextParam(name), val), name);
  1533. else
  1534. checkBind(cass_statement_bind_int32(stmtInfo->queryStatement(), checkNextParam(name), val), name);
  1535. }
  1536. virtual void bindSignedParam(const char *name, __int64 val)
  1537. {
  1538. bindSignedSizeParam(name, 8, val);
  1539. }
  1540. virtual void bindUnsignedSizeParam(const char *name, int size, unsigned __int64 val)
  1541. {
  1542. UNSUPPORTED("UNSIGNED columns");
  1543. }
  1544. virtual void bindUnsignedParam(const char *name, unsigned __int64 val)
  1545. {
  1546. UNSUPPORTED("UNSIGNED columns");
  1547. }
  1548. virtual void bindStringParam(const char *name, size32_t len, const char *val)
  1549. {
  1550. size32_t utf8chars;
  1551. rtlDataAttr utfText;
  1552. rtlStrToUtf8X(utf8chars, utfText.refstr(), len, val);
  1553. checkBind(cass_statement_bind_string_n(stmtInfo->queryStatement(),
  1554. checkNextParam(name),
  1555. utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())),
  1556. name);
  1557. }
  1558. virtual void bindVStringParam(const char *name, const char *val)
  1559. {
  1560. bindStringParam(name, strlen(val), val);
  1561. }
  1562. virtual void bindUTF8Param(const char *name, size32_t chars, const char *val)
  1563. {
  1564. checkBind(cass_statement_bind_string_n(stmtInfo->queryStatement(), checkNextParam(name), val, rtlUtf8Size(chars, val)), name);
  1565. }
  1566. virtual void bindUnicodeParam(const char *name, size32_t chars, const UChar *val)
  1567. {
  1568. size32_t utf8chars;
  1569. rtlDataAttr utfText;
  1570. rtlUnicodeToUtf8X(utf8chars, utfText.refstr(), chars, val);
  1571. checkBind(cass_statement_bind_string_n(stmtInfo->queryStatement(),
  1572. checkNextParam(name),
  1573. utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())),
  1574. name);
  1575. }
  1576. virtual void bindSetParam(const char *name, int elemType, size32_t elemSize, bool isAll, size32_t totalBytes, void *setData)
  1577. {
  1578. if (isAll)
  1579. UNSUPPORTED("SET(ALL)");
  1580. type_t typecode = (type_t) elemType;
  1581. const byte *inData = (const byte *) setData;
  1582. const byte *endData = inData + totalBytes;
  1583. int numElems;
  1584. if (elemSize == UNKNOWN_LENGTH)
  1585. {
  1586. numElems = 0;
  1587. // Will need 2 passes to work out how many elements there are in the set :(
  1588. while (inData < endData)
  1589. {
  1590. int thisSize;
  1591. switch (elemType)
  1592. {
  1593. case type_varstring:
  1594. thisSize = strlen((const char *) inData) + 1;
  1595. break;
  1596. case type_string:
  1597. thisSize = * (size32_t *) inData + sizeof(size32_t);
  1598. break;
  1599. case type_unicode:
  1600. thisSize = (* (size32_t *) inData) * sizeof(UChar) + sizeof(size32_t);
  1601. break;
  1602. case type_utf8:
  1603. thisSize = rtlUtf8Size(* (size32_t *) inData, inData + sizeof(size32_t)) + sizeof(size32_t);
  1604. break;
  1605. default:
  1606. fail("Unsupported parameter type");
  1607. break;
  1608. }
  1609. inData += thisSize;
  1610. numElems++;
  1611. }
  1612. inData = (const byte *) setData;
  1613. }
  1614. else
  1615. numElems = totalBytes / elemSize;
  1616. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_SET, numElems));
  1617. while (inData < endData)
  1618. {
  1619. size32_t thisSize = elemSize;
  1620. CassError rc;
  1621. switch (typecode)
  1622. {
  1623. case type_int:
  1624. if (elemSize > 4)
  1625. rc = cass_collection_append_int64(collection, rtlReadInt(inData, elemSize));
  1626. else
  1627. rc = cass_collection_append_int32(collection, rtlReadInt(inData, elemSize));
  1628. break;
  1629. case type_unsigned:
  1630. UNSUPPORTED("UNSIGNED columns");
  1631. break;
  1632. case type_varstring:
  1633. {
  1634. size32_t numChars = strlen((const char *) inData);
  1635. if (elemSize == UNKNOWN_LENGTH)
  1636. thisSize = numChars + 1;
  1637. size32_t utf8chars;
  1638. rtlDataAttr utfText;
  1639. rtlStrToUtf8X(utf8chars, utfText.refstr(), numChars, (const char *) inData);
  1640. rc = cass_collection_append_string_n(collection, utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()));
  1641. break;
  1642. }
  1643. case type_string:
  1644. {
  1645. if (elemSize == UNKNOWN_LENGTH)
  1646. {
  1647. thisSize = * (size32_t *) inData;
  1648. inData += sizeof(size32_t);
  1649. }
  1650. size32_t utf8chars;
  1651. rtlDataAttr utfText;
  1652. rtlStrToUtf8X(utf8chars, utfText.refstr(), thisSize, (const char *) inData);
  1653. rc = cass_collection_append_string_n(collection, utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()));
  1654. break;
  1655. }
  1656. case type_real:
  1657. if (elemSize == sizeof(double))
  1658. rc = cass_collection_append_double(collection, * (double *) inData);
  1659. else
  1660. rc = cass_collection_append_float(collection, * (float *) inData);
  1661. break;
  1662. case type_boolean:
  1663. assertex(elemSize == sizeof(bool));
  1664. rc = cass_collection_append_bool(collection, *(bool*)inData ? cass_true : cass_false);
  1665. break;
  1666. case type_unicode:
  1667. {
  1668. if (elemSize == UNKNOWN_LENGTH)
  1669. {
  1670. thisSize = (* (size32_t *) inData) * sizeof(UChar); // NOTE - it's in chars...
  1671. inData += sizeof(size32_t);
  1672. }
  1673. unsigned unicodeChars;
  1674. rtlDataAttr unicode;
  1675. rtlUnicodeToUtf8X(unicodeChars, unicode.refstr(), thisSize / sizeof(UChar), (const UChar *) inData);
  1676. size32_t sizeBytes = rtlUtf8Size(unicodeChars, unicode.getstr());
  1677. rc = cass_collection_append_string_n(collection, unicode.getstr(), sizeBytes);
  1678. break;
  1679. }
  1680. case type_utf8:
  1681. {
  1682. assertex (elemSize == UNKNOWN_LENGTH);
  1683. size32_t numChars = * (size32_t *) inData;
  1684. inData += sizeof(size32_t);
  1685. thisSize = rtlUtf8Size(numChars, inData);
  1686. rc = cass_collection_append_string_n(collection, (const char *) inData, thisSize);
  1687. break;
  1688. }
  1689. case type_data:
  1690. if (elemSize == UNKNOWN_LENGTH)
  1691. {
  1692. thisSize = * (size32_t *) inData;
  1693. inData += sizeof(size32_t);
  1694. }
  1695. rc = cass_collection_append_bytes(collection, (const cass_byte_t*) inData, thisSize);
  1696. break;
  1697. }
  1698. checkBind(rc, name);
  1699. inData += thisSize;
  1700. }
  1701. checkBind(cass_statement_bind_collection(stmtInfo->queryStatement(),
  1702. checkNextParam(name),
  1703. collection),
  1704. name);
  1705. }
  1706. virtual void importFunction(size32_t lenChars, const char *text)
  1707. {
  1708. throwUnexpected();
  1709. }
  1710. virtual void compileEmbeddedScript(size32_t chars, const char *_script)
  1711. {
  1712. // Incoming script is not necessarily null terminated. Note that the chars refers to utf8 characters and not bytes.
  1713. size32_t len = rtlUtf8Size(chars, _script);
  1714. queryString.set(_script, len);
  1715. const char *script = queryString.get(); // Now null terminated
  1716. if ((flags & (EFnoreturn|EFnoparams)) == (EFnoreturn|EFnoparams))
  1717. {
  1718. loop
  1719. {
  1720. const char *nextScript = findUnquoted(script, ';');
  1721. if (!nextScript)
  1722. {
  1723. // script should be pointing at only trailing whitespace, else it's a "missing ;" error
  1724. break;
  1725. }
  1726. CassandraStatement statement(cass_statement_new_n(script, nextScript-script, 0));
  1727. CassandraFuture future(cass_session_execute(*session, statement));
  1728. future.wait("execute statement");
  1729. script = nextScript;
  1730. }
  1731. }
  1732. else
  1733. {
  1734. // MORE - can cache this, perhaps, if script is same as last time?
  1735. CassandraFuture future(cass_session_prepare(*session, script));
  1736. future.wait("prepare statement");
  1737. Owned<CassandraPrepared> prepared = new CassandraPrepared(cass_future_get_prepared(future));
  1738. if ((flags & EFnoparams) == 0)
  1739. numParams = countBindings(script);
  1740. else
  1741. numParams = 0;
  1742. stmtInfo.setown(new CassandraStatementInfo(session, prepared, numParams, cluster->batchMode));
  1743. }
  1744. }
  1745. virtual void callFunction()
  1746. {
  1747. // Does not seem to be a way to check number of parameters expected...
  1748. // if (nextParam != cass_statement_bind_count(stmtInfo))
  1749. // fail("Not enough parameters");
  1750. try
  1751. {
  1752. if (stmtInfo && !stmtInfo->hasResult())
  1753. lazyExecute();
  1754. }
  1755. catch (IException *E)
  1756. {
  1757. StringBuffer msg;
  1758. E->errorMessage(msg);
  1759. msg.appendf(" (processing query %s)", queryString.get());
  1760. throw makeStringException(E->errorCode(), msg);
  1761. }
  1762. }
  1763. protected:
  1764. void lazyExecute()
  1765. {
  1766. if (inputStream)
  1767. inputStream->executeAll(stmtInfo);
  1768. else
  1769. stmtInfo->execute();
  1770. }
  1771. const CassValue *getScalarResult()
  1772. {
  1773. if (!stmtInfo->next())
  1774. typeError("scalar", NULL, NULL);
  1775. if (cass_row_get_column(stmtInfo->queryRow(), 1))
  1776. typeError("scalar", NULL, NULL);
  1777. const CassValue *result = cass_row_get_column(stmtInfo->queryRow(), 0);
  1778. if (!result)
  1779. typeError("scalar", NULL, NULL);
  1780. return result;
  1781. }
  1782. void checkSingleRow()
  1783. {
  1784. if (stmtInfo->rowCount() != 1)
  1785. typeError("scalar", NULL, NULL);
  1786. }
  1787. unsigned countBindings(const char *query)
  1788. {
  1789. unsigned queryCount = 0;
  1790. while ((query = findUnquoted(query, '?')) != NULL)
  1791. queryCount++;
  1792. return queryCount;
  1793. }
  1794. const char *findUnquoted(const char *query, char searchFor)
  1795. {
  1796. // Note - returns pointer to char AFTER the first occurrence of searchFor outside of quotes
  1797. char inStr = '\0';
  1798. char ch;
  1799. while ((ch = *query++) != 0)
  1800. {
  1801. if (ch == inStr)
  1802. inStr = false;
  1803. else switch (ch)
  1804. {
  1805. case '\'':
  1806. case '"':
  1807. inStr = ch;
  1808. break;
  1809. case '\\':
  1810. if (inStr && *query)
  1811. query++;
  1812. break;
  1813. case '/':
  1814. if (!inStr)
  1815. {
  1816. if (*query=='/')
  1817. {
  1818. while (*query && *query != '\n')
  1819. query++;
  1820. }
  1821. else if (*query=='*')
  1822. {
  1823. query++;
  1824. loop
  1825. {
  1826. if (!*query)
  1827. fail("Unterminated comment in query string");
  1828. if (*query=='*' && query[1]=='/')
  1829. {
  1830. query+= 2;
  1831. break;
  1832. }
  1833. query++;
  1834. }
  1835. }
  1836. }
  1837. break;
  1838. default:
  1839. if (!inStr && ch==searchFor)
  1840. return query;
  1841. break;
  1842. }
  1843. }
  1844. return NULL;
  1845. }
  1846. inline unsigned checkNextParam(const char *name)
  1847. {
  1848. if (nextParam == numParams)
  1849. failx("Too many parameters supplied: No matching ? for parameter %s", name);
  1850. return nextParam++;
  1851. }
  1852. inline void checkBind(CassError rc, const char *name)
  1853. {
  1854. if (rc != CASS_OK)
  1855. {
  1856. failx("While binding parameter %s: %s", name, cass_error_desc(rc));
  1857. }
  1858. }
// Cluster this context uses; its batchMode is passed to each CassandraStatementInfo
Owned<CassandraCluster> cluster;
// Session used to prepare and execute the embedded CQL
Owned<CassandraSession> session;
// Prepared statement (and its results) created by compileEmbeddedScript
Owned<CassandraStatementInfo> stmtInfo;
// When set, lazyExecute passes stmtInfo to its executeAll() rather than executing directly
Owned<CassandraDatasetBinder> inputStream;
// Logging context supplied by the creator (the ICodeContext's logger, or a dummy when there is none)
const IContextLogger &logctx;
// EF* flags; EFnoreturn together with EFnoparams selects immediate multi-statement execution
unsigned flags;
// Index of the next ? placeholder to bind (see checkNextParam)
unsigned nextParam;
// Number of ? placeholders in the prepared query (0 when EFnoparams)
unsigned numParams;
// Null-terminated copy of the embedded script; also quoted in error messages
StringAttr queryString;
  1868. };
  1869. class CassandraEmbedContext : public CInterfaceOf<IEmbedContext>
  1870. {
  1871. public:
  1872. virtual IEmbedFunctionContext *createFunctionContext(unsigned flags, const char *options)
  1873. {
  1874. return createFunctionContextEx(NULL, flags, options);
  1875. }
  1876. virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, unsigned flags, const char *options)
  1877. {
  1878. if (flags & EFimport)
  1879. UNSUPPORTED("IMPORT");
  1880. else
  1881. return new CassandraEmbedFunctionContext(ctx ? ctx->queryContextLogger() : queryDummyContextLogger(), flags, options);
  1882. }
  1883. };
  1884. extern IEmbedContext* getEmbedContext()
  1885. {
  1886. return new CassandraEmbedContext();
  1887. }
  1888. extern bool syntaxCheck(const char *script)
  1889. {
  1890. return true; // MORE
  1891. }
  1892. //--------------------------------------------
  1893. #define ATTRIBUTES_NAME "attributes"
  1894. void addElement(IPTree *parent, const char *name, const CassValue *value)
  1895. {
  1896. switch (cass_value_type(value))
  1897. {
  1898. case CASS_VALUE_TYPE_UNKNOWN:
  1899. // It's a NULL - ignore it (or we could add empty element...)
  1900. break;
  1901. case CASS_VALUE_TYPE_ASCII:
  1902. case CASS_VALUE_TYPE_TEXT:
  1903. case CASS_VALUE_TYPE_VARCHAR:
  1904. {
  1905. rtlDataAttr str;
  1906. unsigned chars;
  1907. getUTF8Result(NULL, value, chars, str.refstr());
  1908. StringAttr s(str.getstr(), rtlUtf8Size(chars, str.getstr()));
  1909. parent->addProp(name, s);
  1910. break;
  1911. }
  1912. case CASS_VALUE_TYPE_INT:
  1913. case CASS_VALUE_TYPE_BIGINT:
  1914. case CASS_VALUE_TYPE_VARINT:
  1915. parent->addPropInt64(name, getSignedResult(NULL, value));
  1916. break;
  1917. case CASS_VALUE_TYPE_BLOB:
  1918. {
  1919. rtlDataAttr data;
  1920. unsigned bytes;
  1921. getDataResult(NULL, value, bytes, data.refdata());
  1922. parent->addPropBin(name, bytes, data.getbytes());
  1923. break;
  1924. }
  1925. case CASS_VALUE_TYPE_BOOLEAN:
  1926. parent->addPropBool(name, getBooleanResult(NULL, value));
  1927. break;
  1928. case CASS_VALUE_TYPE_DOUBLE:
  1929. case CASS_VALUE_TYPE_FLOAT:
  1930. {
  1931. double v = getRealResult(NULL, value);
  1932. StringBuffer s;
  1933. s.append(v);
  1934. parent->addProp(name, s);
  1935. break;
  1936. }
  1937. case CASS_VALUE_TYPE_LIST:
  1938. case CASS_VALUE_TYPE_SET:
  1939. {
  1940. CassandraIterator elems(cass_iterator_from_collection(value));
  1941. Owned<IPTree> list = createPTree(name);
  1942. while (cass_iterator_next(elems))
  1943. addElement(list, "item", cass_iterator_get_value(elems));
  1944. parent->addPropTree(name, list.getClear());
  1945. break;
  1946. }
  1947. case CASS_VALUE_TYPE_MAP:
  1948. {
  1949. CassandraIterator elems(cass_iterator_from_map(value));
  1950. if (strcmp(name, ATTRIBUTES_NAME)==0 && isString(cass_value_primary_sub_type(value)))
  1951. {
  1952. while (cass_iterator_next(elems))
  1953. {
  1954. rtlDataAttr str;
  1955. unsigned chars;
  1956. getStringResult(NULL, cass_iterator_get_map_key(elems), chars, str.refstr());
  1957. StringBuffer s("@");
  1958. s.append(chars, str.getstr());
  1959. addElement(parent, s, cass_iterator_get_map_value(elems));
  1960. }
  1961. }
  1962. else
  1963. {
  1964. Owned<IPTree> map = createPTree(name);
  1965. while (cass_iterator_next(elems))
  1966. {
  1967. if (isString(cass_value_primary_sub_type(value)))
  1968. {
  1969. rtlDataAttr str;
  1970. unsigned chars;
  1971. getStringResult(NULL, cass_iterator_get_map_key(elems), chars, str.refstr());
  1972. StringAttr s(str.getstr(), chars);
  1973. addElement(map, s, cass_iterator_get_map_value(elems));
  1974. }
  1975. else
  1976. {
  1977. Owned<IPTree> mapping = createPTree("mapping");
  1978. addElement(mapping, "key", cass_iterator_get_map_key(elems));
  1979. addElement(mapping, "value", cass_iterator_get_map_value(elems));
  1980. map->addPropTree("mapping", mapping.getClear());
  1981. }
  1982. }
  1983. parent->addPropTree(name, map.getClear());
  1984. }
  1985. break;
  1986. }
  1987. default:
  1988. DBGLOG("Column type %d not supported", cass_value_type(value));
  1989. UNSUPPORTED("Column type");
  1990. }
  1991. }
  1992. void bindElement(CassStatement *statement, IPTree *parent, unsigned idx, const char *name, CassValueType type)
  1993. {
  1994. if (parent->hasProp(name) || strcmp(name, ATTRIBUTES_NAME)==0)
  1995. {
  1996. switch (type)
  1997. {
  1998. case CASS_VALUE_TYPE_ASCII:
  1999. case CASS_VALUE_TYPE_TEXT:
  2000. case CASS_VALUE_TYPE_VARCHAR:
  2001. {
  2002. const char *value = parent->queryProp(name);
  2003. if (value)
  2004. check(cass_statement_bind_string(statement, idx, value));
  2005. break;
  2006. }
  2007. case CASS_VALUE_TYPE_INT:
  2008. check(cass_statement_bind_int32(statement, idx, parent->getPropInt(name)));
  2009. break;
  2010. case CASS_VALUE_TYPE_BIGINT:
  2011. case CASS_VALUE_TYPE_VARINT:
  2012. check(cass_statement_bind_int64(statement, idx, parent->getPropInt64(name)));
  2013. break;
  2014. case CASS_VALUE_TYPE_BLOB:
  2015. {
  2016. MemoryBuffer buf;
  2017. parent->getPropBin(name, buf);
  2018. check(cass_statement_bind_bytes(statement, idx, (const cass_byte_t*)buf.toByteArray(), buf.length()));
  2019. break;
  2020. }
  2021. case CASS_VALUE_TYPE_BOOLEAN:
  2022. check(cass_statement_bind_bool(statement, idx, (cass_bool_t) parent->getPropBool(name)));
  2023. break;
  2024. case CASS_VALUE_TYPE_DOUBLE:
  2025. check(cass_statement_bind_double(statement, idx, atof(parent->queryProp(name))));
  2026. break;
  2027. case CASS_VALUE_TYPE_FLOAT:
  2028. check(cass_statement_bind_float(statement, idx, atof(parent->queryProp(name))));
  2029. break;
  2030. case CASS_VALUE_TYPE_LIST:
  2031. case CASS_VALUE_TYPE_SET:
  2032. {
  2033. Owned<IPTree> child = parent->getPropTree(name);
  2034. unsigned numItems = child->getCount("item");
  2035. if (numItems)
  2036. {
  2037. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_SET, numItems));
  2038. Owned<IPTreeIterator> items = child->getElements("item");
  2039. ForEach(*items)
  2040. {
  2041. // We don't know the subtypes - we can assert that we only support string, for most purposes, I suspect
  2042. if (strcmp(name, "list1")==0)
  2043. check(cass_collection_append_int32(collection, items->query().getPropInt(NULL)));
  2044. else
  2045. check(cass_collection_append_string(collection, items->query().queryProp(NULL)));
  2046. }
  2047. check(cass_statement_bind_collection(statement, idx, collection));
  2048. }
  2049. break;
  2050. }
  2051. case CASS_VALUE_TYPE_MAP:
  2052. {
  2053. // We don't know the subtypes - we can assert that we only support string, for most purposes, I suspect
  2054. if (strcmp(name, ATTRIBUTES_NAME)==0)
  2055. {
  2056. Owned<IAttributeIterator> attrs = parent->getAttributes();
  2057. unsigned numItems = attrs->count();
  2058. ForEach(*attrs)
  2059. {
  2060. numItems++;
  2061. }
  2062. if (numItems)
  2063. {
  2064. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
  2065. ForEach(*attrs)
  2066. {
  2067. const char *key = attrs->queryName();
  2068. const char *value = attrs->queryValue();
  2069. check(cass_collection_append_string(collection, key+1)); // skip the @
  2070. check(cass_collection_append_string(collection, value));
  2071. }
  2072. check(cass_statement_bind_collection(statement, idx, collection));
  2073. }
  2074. }
  2075. else
  2076. {
  2077. Owned<IPTree> child = parent->getPropTree(name);
  2078. unsigned numItems = child->numChildren();
  2079. // MORE - if the cassandra driver objects to there being fewer than numItems supplied, we may need to recode using a second pass.
  2080. if (numItems)
  2081. {
  2082. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
  2083. Owned<IPTreeIterator> items = child->getElements("*");
  2084. ForEach(*items)
  2085. {
  2086. IPTree &item = items->query();
  2087. const char *key = item.queryName();
  2088. const char *value = item.queryProp(NULL);
  2089. if (key && value)
  2090. {
  2091. check(cass_collection_append_string(collection, key));
  2092. check(cass_collection_append_string(collection, value));
  2093. }
  2094. }
  2095. check(cass_statement_bind_collection(statement, idx, collection));
  2096. }
  2097. }
  2098. break;
  2099. }
  2100. default:
  2101. DBGLOG("Column type %d not supported", type);
  2102. UNSUPPORTED("Column type");
  2103. }
  2104. }
  2105. }
  2106. extern void cassandraToGenericXML()
  2107. {
  2108. CassandraCluster cluster(cass_cluster_new());
  2109. cass_cluster_set_contact_points(cluster, "127.0.0.1");
  2110. CassandraSession session(cass_session_new());
  2111. CassandraFuture future(cass_session_connect_keyspace(session, cluster, "test"));
  2112. future.wait("connect");
  2113. CassandraStatement statement(cass_statement_new("select * from tbl1 where name = 'name1';", 0));
  2114. CassandraFuture future2(cass_session_execute(session, statement));
  2115. future2.wait("execute");
  2116. CassandraResult result(cass_future_get_result(future2));
  2117. StringArray names;
  2118. UnsignedArray types;
  2119. for (int i = 0; i < cass_result_column_count(result); i++)
  2120. {
  2121. const char *column;
  2122. size_t length;
  2123. cass_result_column_name(result, i, &column, &length);
  2124. StringBuffer name(length, column);
  2125. names.append(name);
  2126. types.append(cass_result_column_type(result, i));
  2127. }
  2128. // Now fetch the rows
  2129. Owned<IPTree> xml = createPTree("tbl1");
  2130. CassandraIterator rows(cass_iterator_from_result(result));
  2131. while (cass_iterator_next(rows))
  2132. {
  2133. CassandraIterator cols(cass_iterator_from_row(cass_iterator_get_row(rows)));
  2134. Owned<IPTree> row = createPTree("row");
  2135. unsigned colidx = 0;
  2136. while (cass_iterator_next(cols))
  2137. {
  2138. const CassValue *value = cass_iterator_get_column(cols);
  2139. const char *name = names.item(colidx);
  2140. addElement(row, name, value);
  2141. colidx++;
  2142. }
  2143. xml->addPropTree("row", row.getClear());
  2144. }
  2145. xml->setProp("row[1]/name", "newname");
  2146. StringBuffer buf;
  2147. toXML(xml, buf);
  2148. DBGLOG("%s", buf.str());
  2149. // Now try going the other way...
  2150. // For this we need to know the expected names (can fetch them from system table) and types (ditto, potentially, though a dummy select may be easier)
  2151. StringBuffer colNames;
  2152. StringBuffer values;
  2153. ForEachItemIn(idx, names)
  2154. {
  2155. colNames.append(",").append(names.item(idx));
  2156. values.append(",?");
  2157. }
  2158. VStringBuffer insertQuery("INSERT into tbl1 (%s) values (%s);", colNames.str()+1, values.str()+1);
  2159. Owned<IPTreeIterator> xmlRows = xml->getElements("row");
  2160. ForEach(*xmlRows)
  2161. {
  2162. IPropertyTree *xmlrow = &xmlRows->query();
  2163. CassandraStatement update(cass_statement_new(insertQuery.str(), names.length()));
  2164. ForEachItemIn(idx, names)
  2165. {
  2166. bindElement(update, xmlrow, idx, names.item(idx), (CassValueType) types.item(idx));
  2167. }
  2168. // MORE - use a batch
  2169. CassandraFuture future3(cass_session_execute(session, update));
  2170. future2.wait("insert");
  2171. }
  2172. }
  2173. //--------------------------------------------
  2174. #define CASS_WU_QUERY_EXPIRES (1000*60*5)
  2175. #define CASS_WORKUNIT_POSTSORT_LIMIT 10000
  2176. #define CASS_SEARCH_PREFIX_SIZE 2
  2177. #define NUM_PARTITIONS 2
  2178. static const CassValue *getSingleResult(const CassResult *result)
  2179. {
  2180. const CassRow *row = cass_result_first_row(result);
  2181. if (row)
  2182. return cass_row_get_column(row, 0);
  2183. else
  2184. return NULL;
  2185. }
  2186. static StringBuffer &getCassString(StringBuffer &str, const CassValue *value)
  2187. {
  2188. const char *output;
  2189. size_t length;
  2190. check(cass_value_get_string(value, &output, &length));
  2191. return str.append(length, output);
  2192. }
// Converts one Cassandra column to and from its representation in a property tree.
struct CassandraColumnMapper
{
virtual ~CassandraColumnMapper() {}
// Add the column value to row (normally under 'name') and return the tree that later columns of the
// same result row should be added to - usually row itself, but some mappers return a child tree.
virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) = 0;
// Return whether this column has a value for row. When statement is non-NULL, also bind that value as
// parameter idx. userVal carries mapper-specific extra data (e.g. a supplied value, an xpath, a default
// value, or a list of names to exclude - see the individual mappers).
virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) = 0;
};
  2199. static class StringColumnMapper : implements CassandraColumnMapper
  2200. {
  2201. public:
  2202. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2203. {
  2204. rtlDataAttr str;
  2205. unsigned chars;
  2206. getUTF8Result(NULL, value, chars, str.refstr());
  2207. StringAttr s(str.getstr(), rtlUtf8Size(chars, str.getstr()));
  2208. row->setProp(name, s);
  2209. return row;
  2210. }
  2211. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal)
  2212. {
  2213. const char *value = row->queryProp(name);
  2214. if (!value)
  2215. return false;
  2216. if (statement)
  2217. check(cass_statement_bind_string(statement, idx, value));
  2218. return true;
  2219. }
  2220. } stringColumnMapper;
  2221. static class RequiredStringColumnMapper : public StringColumnMapper
  2222. {
  2223. public:
  2224. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal)
  2225. {
  2226. const char *value = row->queryProp(name);
  2227. if (!value)
  2228. value = "";
  2229. if (statement)
  2230. check(cass_statement_bind_string(statement, idx, value));
  2231. return true;
  2232. }
  2233. } requiredStringColumnMapper;
  2234. static class SuppliedStringColumnMapper : public StringColumnMapper
  2235. {
  2236. public:
  2237. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *, const char *userVal)
  2238. {
  2239. if (statement)
  2240. check(cass_statement_bind_string(statement, idx, userVal));
  2241. return true;
  2242. }
  2243. } suppliedStringColumnMapper;
  2244. static class BlobColumnMapper : implements CassandraColumnMapper
  2245. {
  2246. public:
  2247. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2248. {
  2249. rtlDataAttr str;
  2250. unsigned chars;
  2251. getDataResult(NULL, value, chars, str.refdata());
  2252. row->setPropBin(name, chars, str.getbytes());
  2253. return row;
  2254. }
  2255. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, const char * userVal)
  2256. {
  2257. MemoryBuffer value;
  2258. row->getPropBin(name, value);
  2259. if (value.length())
  2260. {
  2261. if (statement)
  2262. check(cass_statement_bind_bytes(statement, idx, (const cass_byte_t *) value.toByteArray(), value.length()));
  2263. return true;
  2264. }
  2265. else
  2266. return false;
  2267. }
  2268. } blobColumnMapper;
  2269. static class TimeStampColumnMapper : implements CassandraColumnMapper
  2270. {
  2271. public:
  2272. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2273. {
  2274. // never fetched (that may change?)
  2275. return row;
  2276. }
  2277. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, const char * userVal)
  2278. {
  2279. // never bound, but does need to be included in the ?
  2280. return true;
  2281. }
  2282. } timestampColumnMapper;
  2283. static class HashRootNameColumnMapper : implements CassandraColumnMapper
  2284. {
  2285. public:
  2286. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2287. {
  2288. throwUnexpected(); // we never return the partition column
  2289. }
  2290. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, const char * userVal)
  2291. {
  2292. if (statement)
  2293. {
  2294. int hash = rtlHash32VStr(row->queryName(), 0) % NUM_PARTITIONS;
  2295. check(cass_statement_bind_int32(statement, idx, hash));
  2296. }
  2297. return true;
  2298. }
  2299. } hashRootNameColumnMapper;
  2300. static class RootNameColumnMapper : implements CassandraColumnMapper
  2301. {
  2302. public:
  2303. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2304. {
  2305. rtlDataAttr str;
  2306. unsigned chars;
  2307. getUTF8Result(NULL, value, chars, str.refstr());
  2308. StringAttr s(str.getstr(), rtlUtf8Size(chars, str.getstr()));
  2309. row->renameProp("/", s);
  2310. return row;
  2311. }
  2312. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, const char * userVal)
  2313. {
  2314. if (statement)
  2315. {
  2316. const char *value = row->queryName();
  2317. check(cass_statement_bind_string(statement, idx, value));
  2318. }
  2319. return true;
  2320. }
  2321. } rootNameColumnMapper;
// WuidColumnMapper is used for columns containing a wuid that is NOT in the resulting XML - it
// is an error to try to map such a column to/from the XML representation, so both directions
// throw. NOTE(review): presumably such columns are bound directly by the caller - confirm.
static class WuidColumnMapper : implements CassandraColumnMapper
{
public:
// Never expected to be called - the wuid has no place in the XML
virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
{
throwUnexpected();
}
// Never expected to be called - the wuid cannot be read from the XML
virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, const char * userVal)
{
throwUnexpected();
}
} wuidColumnMapper;
  2336. static class GraphIdColumnMapper : implements CassandraColumnMapper
  2337. {
  2338. public:
  2339. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2340. {
  2341. rtlDataAttr str;
  2342. unsigned chars;
  2343. getUTF8Result(NULL, value, chars, str.refstr());
  2344. StringAttr s(str.getstr(), rtlUtf8Size(chars, str.getstr()));
  2345. if (strcmp(s, "Running")==0) // The input XML structure is a little odd
  2346. return row;
  2347. else
  2348. {
  2349. if (!row->hasProp(s))
  2350. row->addPropTree(s, createPTree());
  2351. return row->queryPropTree(s);
  2352. }
  2353. }
  2354. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, const char * userVal)
  2355. {
  2356. const char *value = row->queryName();
  2357. if (!value)
  2358. return false;
  2359. if (statement)
  2360. check(cass_statement_bind_string(statement, idx, value));
  2361. return true;
  2362. }
  2363. } graphIdColumnMapper;
  2364. static class ProgressColumnMapper : implements CassandraColumnMapper
  2365. {
  2366. public:
  2367. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2368. {
  2369. rtlDataAttr str;
  2370. unsigned chars;
  2371. getDataResult(NULL, value, chars, str.refdata()); // Stored as a blob in case we want to compress
  2372. IPTree *child = createPTreeFromXMLString(chars, str.getstr()); // For now, assume we did not compress!
  2373. row->addPropTree(child->queryName(), child);
  2374. return child;
  2375. }
  2376. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, const char * userVal)
  2377. {
  2378. // MORE - may need to read, and probably should write, compressed.
  2379. StringBuffer value;
  2380. ::toXML(row, value, 0, 0);
  2381. if (value.length())
  2382. {
  2383. if (statement)
  2384. check(cass_statement_bind_bytes(statement, idx, (const cass_byte_t *) value.str(), value.length()));
  2385. return true;
  2386. }
  2387. else
  2388. return false;
  2389. }
  2390. } progressColumnMapper;
  2391. static class BoolColumnMapper : implements CassandraColumnMapper
  2392. {
  2393. public:
  2394. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2395. {
  2396. row->addPropBool(name, getBooleanResult(NULL, value));
  2397. return row;
  2398. }
  2399. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, const char * userVal)
  2400. {
  2401. if (row->hasProp(name))
  2402. {
  2403. if (statement)
  2404. {
  2405. bool value = row->getPropBool(name, false);
  2406. check(cass_statement_bind_bool(statement, idx, value ? cass_true : cass_false));
  2407. }
  2408. return true;
  2409. }
  2410. else
  2411. return false;
  2412. }
  2413. } boolColumnMapper;
  2414. static class PrefixSearchColumnMapper : implements CassandraColumnMapper
  2415. {
  2416. public:
  2417. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2418. {
  2419. return row;
  2420. }
  2421. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *, const char *userVal)
  2422. {
  2423. return _fromXML(statement, idx, row, userVal, CASS_SEARCH_PREFIX_SIZE, true);
  2424. }
  2425. protected:
  2426. static bool _fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *xpath, unsigned prefixLength, bool uc)
  2427. {
  2428. const char *columnVal = row->queryProp(xpath);
  2429. if (columnVal)
  2430. {
  2431. if (statement)
  2432. {
  2433. StringBuffer buf(columnVal);
  2434. if (uc)
  2435. buf.toUpperCase();
  2436. if (prefixLength && prefixLength < buf.length())
  2437. check(cass_statement_bind_string_n(statement, idx, buf, prefixLength));
  2438. else
  2439. check(cass_statement_bind_string(statement, idx, buf));
  2440. }
  2441. return true;
  2442. }
  2443. else
  2444. return false;
  2445. }
  2446. } prefixSearchColumnMapper;
  2447. static class SearchColumnMapper : public PrefixSearchColumnMapper
  2448. {
  2449. public:
  2450. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *, const char *userVal)
  2451. {
  2452. return _fromXML(statement, idx, row, userVal, 0, true);
  2453. }
  2454. } searchColumnMapper;
  2455. static class LCSearchColumnMapper : public PrefixSearchColumnMapper
  2456. {
  2457. public:
  2458. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *, const char *userVal)
  2459. {
  2460. return _fromXML(statement, idx, row, userVal, 0, false);
  2461. }
  2462. } lcSearchColumnMapper;
  2463. static class IntColumnMapper : implements CassandraColumnMapper
  2464. {
  2465. public:
  2466. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2467. {
  2468. if (name)
  2469. row->addPropInt(name, getSignedResult(NULL, value));
  2470. return row;
  2471. }
  2472. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal)
  2473. {
  2474. if (row->hasProp(name))
  2475. {
  2476. if (statement)
  2477. {
  2478. int value = row->getPropInt(name);
  2479. check(cass_statement_bind_int32(statement, idx, value));
  2480. }
  2481. return true;
  2482. }
  2483. else
  2484. return false;
  2485. }
  2486. } intColumnMapper;
  2487. static class DefaultedIntColumnMapper : public IntColumnMapper
  2488. {
  2489. public:
  2490. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, const char * defaultValue)
  2491. {
  2492. if (statement)
  2493. {
  2494. int value = row->getPropInt(name, atoi(defaultValue));
  2495. check(cass_statement_bind_int32(statement, idx, value));
  2496. }
  2497. return true;
  2498. }
  2499. } defaultedIntColumnMapper;
  2500. static class BigIntColumnMapper : implements CassandraColumnMapper
  2501. {
  2502. public:
  2503. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2504. {
  2505. row->addPropInt64(name, getSignedResult(NULL, value));
  2506. return row;
  2507. }
  2508. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal)
  2509. {
  2510. if (row->hasProp(name))
  2511. {
  2512. if (statement)
  2513. {
  2514. __int64 value = row->getPropInt64(name);
  2515. check(cass_statement_bind_int64(statement, idx, value));
  2516. }
  2517. return true;
  2518. }
  2519. else
  2520. return false;
  2521. }
  2522. } bigintColumnMapper;
  2523. static class SubgraphIdColumnMapper : implements CassandraColumnMapper
  2524. {
  2525. public:
  2526. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2527. {
  2528. __int64 id = getSignedResult(NULL, value);
  2529. if (id)
  2530. row->addPropInt64(name, id);
  2531. return row;
  2532. }
  2533. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal)
  2534. {
  2535. if (statement)
  2536. {
  2537. int value = row->getPropInt(name);
  2538. check(cass_statement_bind_int64(statement, idx, value));
  2539. }
  2540. return true;
  2541. }
  2542. } subgraphIdColumnMapper;
  2543. static class SimpleMapColumnMapper : implements CassandraColumnMapper
  2544. {
  2545. public:
  2546. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2547. {
  2548. Owned<IPTree> map = createPTree(name);
  2549. CassandraIterator elems(cass_iterator_from_map(value));
  2550. while (cass_iterator_next(elems))
  2551. {
  2552. rtlDataAttr str;
  2553. unsigned chars;
  2554. getStringResult(NULL, cass_iterator_get_map_key(elems), chars, str.refstr());
  2555. StringAttr s(str.getstr(), chars);
  2556. stringColumnMapper.toXML(map, s, cass_iterator_get_map_value(elems));
  2557. }
  2558. row->addPropTree(name, map.getClear());
  2559. return row;
  2560. }
  2561. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal)
  2562. {
  2563. Owned<IPTree> child = row->getPropTree(name);
  2564. if (child)
  2565. {
  2566. unsigned numItems = child->numChildren();
  2567. if (numItems)
  2568. {
  2569. if (statement)
  2570. {
  2571. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
  2572. Owned<IPTreeIterator> items = child->getElements("*");
  2573. ForEach(*items)
  2574. {
  2575. IPTree &item = items->query();
  2576. const char *key = item.queryName();
  2577. const char *value = item.queryProp(NULL);
  2578. if (key && value)
  2579. {
  2580. check(cass_collection_append_string(collection, key));
  2581. check(cass_collection_append_string(collection, value));
  2582. }
  2583. }
  2584. check(cass_statement_bind_collection(statement, idx, collection));
  2585. }
  2586. return true;
  2587. }
  2588. }
  2589. return false;
  2590. }
  2591. } simpleMapColumnMapper;
  2592. static class AttributeMapColumnMapper : implements CassandraColumnMapper
  2593. {
  2594. public:
  2595. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2596. {
  2597. CassandraIterator elems(cass_iterator_from_map(value));
  2598. while (cass_iterator_next(elems))
  2599. {
  2600. rtlDataAttr str;
  2601. unsigned chars;
  2602. getStringResult(NULL, cass_iterator_get_map_key(elems), chars, str.refstr());
  2603. StringBuffer s("@");
  2604. s.append(chars, str.getstr());
  2605. stringColumnMapper.toXML(row, s, cass_iterator_get_map_value(elems));
  2606. }
  2607. return row;
  2608. }
  2609. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal)
  2610. {
  2611. // NOTE - name here provides a list of attributes that we should NOT be mapping
  2612. Owned<IAttributeIterator> attrs = row->getAttributes();
  2613. unsigned numItems = 0;
  2614. ForEach(*attrs)
  2615. {
  2616. StringBuffer key = attrs->queryName();
  2617. key.append('@');
  2618. if (strstr(name, key) == NULL)
  2619. numItems++;
  2620. }
  2621. if (numItems)
  2622. {
  2623. if (statement)
  2624. {
  2625. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
  2626. ForEach(*attrs)
  2627. {
  2628. StringBuffer key = attrs->queryName();
  2629. key.append('@');
  2630. if (strstr(name, key) == NULL)
  2631. {
  2632. const char *value = attrs->queryValue();
  2633. check(cass_collection_append_string(collection, attrs->queryName()+1)); // skip the @
  2634. check(cass_collection_append_string(collection, value));
  2635. }
  2636. }
  2637. check(cass_statement_bind_collection(statement, idx, collection));
  2638. }
  2639. return true;
  2640. }
  2641. else
  2642. return false;
  2643. }
  2644. } attributeMapColumnMapper;
  2645. static class ElementMapColumnMapper : implements CassandraColumnMapper
  2646. {
  2647. public:
  2648. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2649. {
  2650. CassandraIterator elems(cass_iterator_from_map(value));
  2651. while (cass_iterator_next(elems))
  2652. {
  2653. rtlDataAttr str;
  2654. unsigned chars;
  2655. getStringResult(NULL, cass_iterator_get_map_key(elems), chars, str.refstr());
  2656. StringBuffer elemName(chars, str.getstr());
  2657. stringColumnMapper.toXML(row, elemName, cass_iterator_get_map_value(elems));
  2658. }
  2659. return row;
  2660. }
  2661. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal)
  2662. {
  2663. // NOTE - name here provides a list of elements that we should NOT be mapping
  2664. Owned<IPTreeIterator> elems = row->getElements("*");
  2665. unsigned numItems = 0;
  2666. ForEach(*elems)
  2667. {
  2668. IPTree &item = elems->query();
  2669. StringBuffer key('@');
  2670. key.append(item.queryName());
  2671. key.append('@');
  2672. if (strstr(name, key) == NULL)
  2673. {
  2674. const char *value = item.queryProp(".");
  2675. if (value)
  2676. numItems++;
  2677. }
  2678. }
  2679. if (numItems)
  2680. {
  2681. if (statement)
  2682. {
  2683. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
  2684. ForEach(*elems)
  2685. {
  2686. IPTree &item = elems->query();
  2687. StringBuffer key('@');
  2688. key.append(item.queryName());
  2689. key.append('@');
  2690. if (strstr(name, key) == NULL)
  2691. {
  2692. const char *value = item.queryProp(".");
  2693. if (value)
  2694. {
  2695. check(cass_collection_append_string(collection, item.queryName()));
  2696. check(cass_collection_append_string(collection, value));
  2697. }
  2698. }
  2699. }
  2700. check(cass_statement_bind_collection(statement, idx, collection));
  2701. }
  2702. return true;
  2703. }
  2704. else
  2705. return false;
  2706. }
  2707. } elementMapColumnMapper;
  2708. static class SubtreeMapColumnMapper : implements CassandraColumnMapper
  2709. {
  2710. public:
  2711. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2712. {
  2713. CassandraIterator elems(cass_iterator_from_map(value));
  2714. while (cass_iterator_next(elems))
  2715. {
  2716. rtlDataAttr str;
  2717. unsigned chars;
  2718. getStringResult(NULL, cass_iterator_get_map_key(elems), chars, str.refstr());
  2719. StringBuffer elemName(chars, str.getstr());
  2720. const CassValue *value = cass_iterator_get_map_value(elems);
  2721. StringBuffer valStr;
  2722. getCassString(valStr, value);
  2723. if (valStr.length() && valStr.charAt(0)== '<')
  2724. row->setPropTree(elemName, createPTreeFromXMLString(valStr));
  2725. }
  2726. return row;
  2727. }
  2728. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal)
  2729. {
  2730. // NOTE - name here provides a list of elements that we SHOULD be mapping
  2731. Owned<IPTreeIterator> elems = row->getElements("*");
  2732. unsigned numItems = 0;
  2733. ForEach(*elems)
  2734. {
  2735. IPTree &item = elems->query();
  2736. StringBuffer key("@");
  2737. key.append(item.queryName());
  2738. key.append('@');
  2739. if (strstr(name, key) != NULL)
  2740. {
  2741. if (item.numChildren())
  2742. numItems++;
  2743. }
  2744. }
  2745. if (numItems)
  2746. {
  2747. if (statement)
  2748. {
  2749. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
  2750. ForEach(*elems)
  2751. {
  2752. IPTree &item = elems->query();
  2753. StringBuffer key("@");
  2754. key.append(item.queryName());
  2755. key.append('@');
  2756. if (strstr(name, key) != NULL)
  2757. {
  2758. if (item.numChildren())
  2759. {
  2760. StringBuffer x;
  2761. ::toXML(&item, x);
  2762. check(cass_collection_append_string(collection, item.queryName()));
  2763. check(cass_collection_append_string(collection, x));
  2764. }
  2765. }
  2766. }
  2767. check(cass_statement_bind_collection(statement, idx, collection));
  2768. }
  2769. return true;
  2770. }
  2771. else
  2772. return false;
  2773. }
  2774. } subTreeMapColumnMapper;
  2775. static class QueryTextColumnMapper : public StringColumnMapper
  2776. {
  2777. public:
  2778. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2779. {
  2780. // Name is "Query/Text ...
  2781. IPTree *query = row->queryPropTree("Query");
  2782. if (!query)
  2783. {
  2784. query = createPTree("Query");
  2785. query = row->setPropTree("Query", query);
  2786. row->setProp("Query/@fetchEntire", "1"); // Compatibility...
  2787. }
  2788. return StringColumnMapper::toXML(query, "Text", value);
  2789. }
  2790. } queryTextColumnMapper;
  2791. static class GraphMapColumnMapper : implements CassandraColumnMapper
  2792. {
  2793. public:
  2794. GraphMapColumnMapper(const char *_elemName, const char *_nameAttr)
  2795. : elemName(_elemName), nameAttr(_nameAttr)
  2796. {
  2797. }
  2798. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2799. {
  2800. Owned<IPTree> map = createPTree(name);
  2801. CassandraIterator elems(cass_iterator_from_map(value));
  2802. while (cass_iterator_next(elems))
  2803. {
  2804. rtlDataAttr str;
  2805. unsigned chars;
  2806. getStringResult(NULL, cass_iterator_get_map_value(elems), chars, str.refstr());
  2807. Owned<IPTree> child = createPTreeFromXMLString(chars, str.getstr());
  2808. map->addPropTree(elemName, child.getClear());
  2809. }
  2810. row->addPropTree(name, map.getClear());
  2811. return row;
  2812. }
  2813. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal)
  2814. {
  2815. Owned<IPTree> child = row->getPropTree(name);
  2816. if (child)
  2817. {
  2818. unsigned numItems = child->numChildren();
  2819. if (numItems)
  2820. {
  2821. if (statement)
  2822. {
  2823. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
  2824. Owned<IPTreeIterator> items = child->getElements("*");
  2825. ForEach(*items)
  2826. {
  2827. IPTree &item = items->query();
  2828. const char *key = item.queryProp(nameAttr);
  2829. // MORE - may need to read, and probably should write, compressed. At least for graphs
  2830. StringBuffer value;
  2831. ::toXML(&item, value, 0, 0);
  2832. if (key && value.length())
  2833. {
  2834. check(cass_collection_append_string(collection, key));
  2835. check(cass_collection_append_string(collection, value));
  2836. }
  2837. }
  2838. check(cass_statement_bind_collection(statement, idx, collection));
  2839. }
  2840. return true;
  2841. }
  2842. }
  2843. return false;
  2844. }
  2845. private:
  2846. const char *elemName;
  2847. const char *nameAttr;
  2848. } graphMapColumnMapper("Graph", "@name"), workflowMapColumnMapper("Item", "@wfid");
  2849. static class AssociationsMapColumnMapper : public GraphMapColumnMapper
  2850. {
  2851. public:
  2852. AssociationsMapColumnMapper(const char *_elemName, const char *_nameAttr)
  2853. : GraphMapColumnMapper(_elemName, _nameAttr)
  2854. {
  2855. }
  2856. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2857. {
  2858. // Name is "Query/Associated ...
  2859. IPTree *query = row->queryPropTree("Query");
  2860. if (!query)
  2861. {
  2862. query = createPTree("Query");
  2863. row->setPropTree("Query", query);
  2864. row->setProp("Query/@fetchEntire", "1"); // Compatibility...
  2865. }
  2866. return GraphMapColumnMapper::toXML(query, "Associated", value);
  2867. }
  2868. } associationsMapColumnMapper("File", "@filename");
  2869. static class WarningsMapColumnMapper : implements CassandraColumnMapper
  2870. {
  2871. public:
  2872. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2873. {
  2874. CassandraIterator elems(cass_iterator_from_map(value));
  2875. while (cass_iterator_next(elems))
  2876. {
  2877. unsigned code = getUnsignedResult(NULL, cass_iterator_get_map_key(elems));
  2878. VStringBuffer xpath("OnWarnings/OnWarning[@code='%u']", code);
  2879. IPropertyTree * mapping = row->queryPropTree(xpath);
  2880. if (!mapping)
  2881. {
  2882. IPropertyTree * onWarnings = ensurePTree(row, "OnWarnings");
  2883. mapping = onWarnings->addPropTree("OnWarning", createPTree());
  2884. mapping->setPropInt("@code", code);
  2885. }
  2886. rtlDataAttr str;
  2887. unsigned chars;
  2888. getStringResult(NULL, cass_iterator_get_map_value(elems), chars, str.refstr());
  2889. StringBuffer s(chars, str.getstr());
  2890. mapping->setProp("@severity", s);
  2891. }
  2892. return row;
  2893. }
  2894. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal)
  2895. {
  2896. if (!row->hasProp("OnWarnings/OnWarning"))
  2897. return false;
  2898. else
  2899. {
  2900. if (statement)
  2901. {
  2902. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, 5));
  2903. Owned<IPTreeIterator> elems = row->getElements("OnWarnings/OnWarning");
  2904. ForEach(*elems)
  2905. {
  2906. IPTree &item = elems->query();
  2907. unsigned code = item.getPropInt("@code", 0);
  2908. const char *value = item.queryProp("@severity");
  2909. if (value)
  2910. {
  2911. check(cass_collection_append_int32(collection, code));
  2912. check(cass_collection_append_string(collection, value));
  2913. }
  2914. }
  2915. check(cass_statement_bind_collection(statement, idx, collection));
  2916. }
  2917. return true;
  2918. }
  2919. }
  2920. } warningsMapColumnMapper;
  2921. static class PluginListColumnMapper : implements CassandraColumnMapper
  2922. {
  2923. public:
  2924. PluginListColumnMapper(const char *_elemName, const char *_nameAttr)
  2925. : elemName(_elemName), nameAttr(_nameAttr)
  2926. {
  2927. }
  2928. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2929. {
  2930. Owned<IPTree> map = createPTree(name);
  2931. CassandraIterator elems(cass_iterator_from_collection(value));
  2932. while (cass_iterator_next(elems))
  2933. {
  2934. Owned<IPTree> child = createPTree(elemName);
  2935. stringColumnMapper.toXML(child, nameAttr, cass_iterator_get_value(elems));
  2936. map->addPropTree(elemName, child.getClear());
  2937. }
  2938. row->addPropTree(name, map.getClear());
  2939. return row;
  2940. }
  2941. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal)
  2942. {
  2943. Owned<IPTree> child = row->getPropTree(name);
  2944. if (child)
  2945. {
  2946. unsigned numItems = child->numChildren();
  2947. if (numItems)
  2948. {
  2949. if (statement)
  2950. {
  2951. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_LIST, numItems));
  2952. Owned<IPTreeIterator> items = child->getElements("*");
  2953. ForEach(*items)
  2954. {
  2955. IPTree &item = items->query();
  2956. const char *value = item.queryProp(nameAttr);
  2957. if (value)
  2958. check(cass_collection_append_string(collection, value));
  2959. }
  2960. check(cass_statement_bind_collection(statement, idx, collection));
  2961. }
  2962. return true;
  2963. }
  2964. }
  2965. return false;
  2966. }
  2967. private:
  2968. const char *elemName;
  2969. const char *nameAttr;
  2970. } pluginListColumnMapper("Plugin", "@dllname");
// Describes how one Cassandra column is filled from, and written back to, the workunit XML.
// Arrays of these end with an entry whose columnName is NULL; in that terminating entry,
// columnType holds the table name and xpath holds the primary key followed by optional
// '|'-separated table options.
struct CassandraXmlMapping
{
    const char *columnName;          // CQL column name (NULL in the terminating entry)
    const char *columnType;          // CQL type such as "text" or "map<text, text>" (table name in the terminator)
    const char *xpath;               // Passed to the mapper as its name: usually an xpath, but some mappers read it as an "@a@b@" name list
    CassandraColumnMapper &mapper;   // Converts between the column value and the XML
};
// NOTE(review): not referenced in the visible code, and the meaning of member x is unclear - confirm whether still needed
struct CassandraTableInfo
{
    const char *x;
    const CassandraXmlMapping *mappings;
};
// Column mappings for the main "workunits" table (one row per wuid within a hashed partition).
// Entries are {columnName, CQL type, xpath or name list, mapper}; the NULL-columnName terminator
// holds the table name and "primary key|table options".
static const CassandraXmlMapping workunitsMappings [] =
{
    {"partition", "int", NULL, hashRootNameColumnMapper},
    {"wuid", "text", NULL, rootNameColumnMapper},
    {"clustername", "text", "@clusterName", stringColumnMapper},
    {"jobname", "text", "@jobName", stringColumnMapper},
    {"priorityclass", "text", "@priorityClass", stringColumnMapper},
    {"wuScope", "text", "@scope", stringColumnMapper},
    {"submitID", "text", "@submitID", stringColumnMapper},
    {"state", "text", "@state", stringColumnMapper},
    {"action", "text", "Action", stringColumnMapper},
    {"protected", "boolean", "@protected", boolColumnMapper},
    {"scheduled", "text", "@timeScheduled", stringColumnMapper}, // Should store as a date?
    {"totalThorTime", "text", "@totalThorTime", stringColumnMapper}, // We store in the wu ptree as a collatable string. Need to force to one partition too
    {"appvalues", "map<text, text>", "@Application@", subTreeMapColumnMapper}, // MORE - change to a custom map to make searchable
    {"debug", "map<text, text>", "Debug", simpleMapColumnMapper},
    {"attributes", "map<text, text>", "@wuid@clusterName@jobName@priorityClass@protected@scope@submitID@state@timeScheduled@totalThorTime@", attributeMapColumnMapper}, // name is the suppression list, note trailing @
    {"graphs", "map<text, text>", "Graphs", graphMapColumnMapper}, // MORE - make me lazy...
    {"plugins", "list<text>", "Plugins", pluginListColumnMapper},
    {"query", "text", "Query/Text", queryTextColumnMapper}, // MORE - make me lazy...
    {"associations", "map<text, text>", "Query/Associated", associationsMapColumnMapper},
    {"workflow", "map<text, text>", "Workflow", workflowMapColumnMapper},
    {"onWarnings", "map<int, text>", "OnWarnings/OnWarning", warningsMapColumnMapper},
    // These are catchalls for anything not processed above or in a child table
    {"elements", "map<text, text>", "@Action@Application@Debug@Exceptions@Graphs@Results@Statistics@Plugins@Query@Variables@Temporaries@Workflow@", elementMapColumnMapper}, // name is the suppression list, note trailing @
    {"subtrees", "map<text, text>", "@Process@Tracing@", subTreeMapColumnMapper}, // name is the INCLUSION list, note trailing @
    { NULL, "workunits", "((partition), wuid)|CLUSTERING ORDER BY (wuid DESC)", stringColumnMapper}
};
// Same "workunits" table as workunitsMappings, but listing only the summary columns (up to appvalues).
static const CassandraXmlMapping workunitInfoMappings [] = // A cut down version of the workunit mappings - used when querying with no key
{
    {"partition", "int", NULL, hashRootNameColumnMapper},
    {"wuid", "text", NULL, rootNameColumnMapper},
    {"clustername", "text", "@clusterName", stringColumnMapper},
    {"jobname", "text", "@jobName", stringColumnMapper},
    {"priorityclass", "text", "@priorityClass", stringColumnMapper},
    {"wuScope", "text", "@scope", stringColumnMapper},
    {"submitID", "text", "@submitID", stringColumnMapper},
    {"state", "text", "@state", stringColumnMapper},
    {"action", "text", "Action", stringColumnMapper},
    {"protected", "boolean", "@protected", boolColumnMapper},
    {"scheduled", "text", "@timeScheduled", stringColumnMapper}, // Should store as a date?
    {"totalThorTime", "text", "@totalThorTime", stringColumnMapper}, // We store in the wu ptree as a collatable string. Need to force to one partition too
    {"appvalues", "map<text, text>", "@Application@", subTreeMapColumnMapper}, // MORE - change to a custom map to make searchable
    { NULL, "workunits", "((partition), wuid)|CLUSTERING ORDER BY (wuid DESC)", stringColumnMapper}
};
// The following describes the search table - this contains copies of the basic wu information but keyed by different fields
// Partition key is (xpath, fieldPrefix); within a partition rows are ordered by fieldValue ascending, then wuid descending.
// The xpath/fieldPrefix/fieldValue columns are produced by mappers not shown here - presumably from the userVal
// supplied when writing (one row per entry of searchPaths); confirm against those mappers.
static const CassandraXmlMapping searchMappings [] =
{
    {"xpath", "text", NULL, suppliedStringColumnMapper},
    {"fieldPrefix", "text", NULL, prefixSearchColumnMapper},
    {"fieldValue", "text", NULL, searchColumnMapper},
    {"wuid", "text", NULL, rootNameColumnMapper},
    {"clustername", "text", "@clusterName", stringColumnMapper},
    {"jobname", "text", "@jobName", stringColumnMapper},
    {"priorityclass", "text", "@priorityClass", stringColumnMapper},
    {"scope", "text", "@scope", stringColumnMapper},
    {"submitID", "text", "@submitID", stringColumnMapper},
    {"state", "text", "@state", stringColumnMapper},
    {"action", "text", "Action", stringColumnMapper},
    {"protected", "boolean", "@protected", boolColumnMapper},
    {"scheduled", "text", "@timeScheduled", stringColumnMapper}, // Should store as a date?
    {"totalThorTime", "text", "@totalThorTime", stringColumnMapper}, // We store in the wu ptree as a collatable string. Need to force to one partition too
    {"appvalues", "map<text, text>", "@Application@", subTreeMapColumnMapper}, // MORE - change to a custom map to make searchable
    { NULL, "workunitsSearch", "((xpath, fieldPrefix), fieldValue, wuid)|CLUSTERING ORDER BY (fieldValue ASC, wuid DESC)", stringColumnMapper}
};
// The fields we can search by. These are a subset of the fields in the basic workunit info that is returned from a search. A row is created in the search table for each of these, for each workunit.
// NULL-terminated.
const char * searchPaths[] = { "@submitID", "@clusterName", "@jobName", "@priorityClass", "@protected", "@scope", "@state", "@totalThorTime", NULL};
// Table of the distinct values seen for each searchable field, used to expand wildcard searches.
// Partition key is (xpath, fieldPrefix); rows are ordered by the upper-cased fieldValue.
static const CassandraXmlMapping uniqueSearchMappings [] =
{
    {"xpath", "text", NULL, suppliedStringColumnMapper},
    {"fieldPrefix", "text", NULL, prefixSearchColumnMapper}, // Leading N chars, upper-cased
    {"fieldValue", "text", NULL, searchColumnMapper}, // upper-cased
    {"origFieldValue", "text", NULL, lcSearchColumnMapper}, // original case
    { NULL, "uniqueSearchValues", "((xpath, fieldPrefix), fieldValue, origFieldValue)|CLUSTERING ORDER BY (fieldValue ASC)", stringColumnMapper}
};
// The fields we can wild search by. We store these in the uniqueSearchMappings table so we can translate wildcards into sets
// NULL-terminated.
const char * wildSearchPaths[] = { "@submitID", "@clusterName", "@jobName", NULL};
/*
 * Some thoughts on the secondary tables:
 * 1. To support (trailing) wildcards we will need to split the key into two - the leading N chars and the rest. The right value of N will depend on the installation size.
 *    Too large and users will complain, but too small would hinder partitioning of the values across Cassandra nodes. 1 or 2 may be enough.
 * 2. I could combine all the secondary tables into one, with a field indicating the type of the key. The key field would be repeated though... Would it help?
 *    I'm not sure it really changes a lot - it adds a bit of noise into the partitioner...
 *    Actually, it does mean that the updates and deletes can all be done with a single Cassandra query, though whether that has any advantages over multiple queries in a batch I don't know.
 *    It MAY well make it easier to make sure that searches are case-insensitive, since we'll generally need to separate out the search field from the display field to achieve that.
 * 3. Sort orders are tricky - I can use the secondary table to deliver results sorted by one field as long as it is the one I am filtering by (but if it is, I probably don't need them sorted!).
 *
 */
// The following describe child tables - all keyed by wuid
// Index of each child table; the childTables[] array must list the tables in this same order.
enum ChildTablesEnum { WuExceptionsChild, WuStatisticsChild, WuGraphProgressChild, WuResultsChild, WuVariablesChild, ChildTablesSize };
// Links a child table's column mappings to the workunit XML elements its rows correspond to.
struct ChildTableInfo
{
    const char *parentElement;             // Parent element name in the workunit XML (e.g. "Results")
    const char *childElement;              // Name of each row element within the parent (e.g. "Result")
    ChildTablesEnum index;                 // This table's position in ChildTablesEnum
    const CassandraXmlMapping *mappings;   // Column mappings, ending with a NULL-columnName terminator
};
// Child table of workunit exceptions: one row per Exception element, keyed within the workunit by @sequence.
// The empty ("") suppression list means every attribute of the element is also stored in the attributes map;
// the element's text goes into value.
static const CassandraXmlMapping wuExceptionsMappings [] =
{
    {"partition", "int", NULL, hashRootNameColumnMapper},
    {"wuid", "text", NULL, rootNameColumnMapper},
    {"sequence", "int", "@sequence", intColumnMapper},
    {"attributes", "map<text, text>", "", attributeMapColumnMapper},
    {"value", "text", ".", stringColumnMapper},
    { NULL, "wuExceptions", "((partition, wuid), sequence)", stringColumnMapper}
};
static const ChildTableInfo wuExceptionsTable =
{
    "Exceptions", "Exception",
    WuExceptionsChild,
    wuExceptionsMappings
};
// Child table of workunit statistics: rows keyed within the workunit by (ts, kind, creator, scope).
// Any other attributes of a Statistic element go into the attributes map.
static const CassandraXmlMapping wuStatisticsMappings [] =
{
    {"partition", "int", NULL, hashRootNameColumnMapper},
    {"wuid", "text", NULL, rootNameColumnMapper},
    {"ts", "bigint", "@ts", bigintColumnMapper}, // MORE - should change this to a timeuuid ?
    {"kind", "text", "@kind", stringColumnMapper},
    {"creator", "text", "@creator", stringColumnMapper},
    {"scope", "text", "@scope", stringColumnMapper},
    {"attributes", "map<text, text>", "@ts@kind@creator@scope@", attributeMapColumnMapper},
    { NULL, "wuStatistics", "((partition, wuid), ts, kind, creator, scope)", stringColumnMapper}
};
static const ChildTableInfo wuStatisticsTable =
{
    "Statistics", "Statistic",
    WuStatisticsChild,
    wuStatisticsMappings
};
// Child table of graph progress: rows keyed within the workunit by (graphID, subgraphID).
// The key spells these "graphid, subgraphid"; unquoted CQL identifiers are case-insensitive, so they match.
static const CassandraXmlMapping wuGraphProgressMappings [] =
{
    {"partition", "int", NULL, hashRootNameColumnMapper},
    {"wuid", "text", NULL, rootNameColumnMapper},
    {"graphID", "text", NULL, graphIdColumnMapper},
    {"progress", "blob", NULL, progressColumnMapper}, // NOTE - order of these is significant - this creates the subtree that ones below will modify
    {"subgraphID", "text", "@id", subgraphIdColumnMapper},
    {"state", "int", "@_state", intColumnMapper},
    { NULL, "wuGraphProgress", "((partition, wuid), graphid, subgraphid)", stringColumnMapper}
};
// NOTE(review): the element names here are placeholders - presumably this table is read/written by dedicated
// code rather than via parentElement/childElement; confirm before relying on them.
static const ChildTableInfo wuGraphProgressTable =
{
    "Bit of a", "Special case",
    WuGraphProgressChild,
    wuGraphProgressMappings
};
// Child table of workunit results: one row per Result element, keyed within the workunit by @sequence.
static const CassandraXmlMapping wuResultsMappings [] =
{
    {"partition", "int", NULL, hashRootNameColumnMapper},
    {"wuid", "text", NULL, rootNameColumnMapper},
    {"sequence", "int", "@sequence", intColumnMapper},
    {"name", "text", "@name", stringColumnMapper},
    {"format", "text", "@format", stringColumnMapper}, // xml, xmlset, csv, or null to mean raw. Could probably switch to int if we wanted
    {"status", "text", "@status", stringColumnMapper},
    {"attributes", "map<text, text>", "@sequence@name@format@status@", attributeMapColumnMapper}, // name is the suppression list. We could consider folding format/status into this?
    {"rowcount", "int", "rowCount", intColumnMapper}, // This is the number of rows in result (which may be stored in a file rather than in value)
    {"totalrowcount", "bigint", "totalRowCount", bigintColumnMapper},// This is the number of rows in value
    {"schemaRaw", "blob", "SchemaRaw", blobColumnMapper},
    {"logicalName", "text", "logicalName", stringColumnMapper}, // either this or value will be present once result status is "calculated"
    {"value", "blob", "Value", blobColumnMapper},
    { NULL, "wuResults", "((partition, wuid), sequence)", stringColumnMapper}
};
static const ChildTableInfo wuResultsTable =
{
    "Results", "Result",
    WuResultsChild,
    wuResultsMappings
};
// This looks very similar to the above, but the key is different...
// Rows are keyed within the workunit by (sequence, name). Unlike wuResultsMappings there is no attributes map,
// and sequence uses defaultedIntColumnMapper (presumably falling back to the userVal passed when writing - confirm).
static const CassandraXmlMapping wuVariablesMappings [] =
{
    {"partition", "int", NULL, hashRootNameColumnMapper},
    {"wuid", "text", NULL, rootNameColumnMapper},
    {"sequence", "int", "@sequence", defaultedIntColumnMapper}, // Note - should be either variable or temporary...
    {"name", "text", "@name", requiredStringColumnMapper},
    {"format", "text", "@format", stringColumnMapper}, // xml, xmlset, csv, or null to mean raw. Could probably switch to int if we wanted
    {"status", "text", "@status", stringColumnMapper},
    {"rowcount", "int", "rowCount", intColumnMapper}, // This is the number of rows in result (which may be stored in a file rather than in value)
    {"totalrowcount", "bigint", "totalRowCount", bigintColumnMapper},// This is the number of rows in value
    {"schemaRaw", "blob", "SchemaRaw", blobColumnMapper},
    {"logicalName", "text", "logicalName", stringColumnMapper}, // either this or value will be present once result status is "calculated"
    {"value", "blob", "Value", blobColumnMapper},
    { NULL, "wuVariables", "((partition, wuid), sequence, name)", stringColumnMapper}
};
static const ChildTableInfo wuVariablesTable =
{
    "Variables", "Variable", // Actually sometimes uses Variables, sometimes Temporaries.... MORE - think about how to fix that...
    WuVariablesChild,
    wuVariablesMappings
};
// Order should match the enum above
// NULL-terminated list of all child tables, so childTables[e] is the table for ChildTablesEnum value e.
static const ChildTableInfo * const childTables [] = { &wuExceptionsTable, &wuStatisticsTable, &wuGraphProgressTable, &wuResultsTable, &wuVariablesTable, NULL };
// Session-level services used by the table-mapping helpers below.
interface ICassandraSession : public IInterface // MORE - rename!
{
    // The underlying driver session
    virtual CassSession *querySession() const = 0;
    // Prepares a CQL statement; callers in this file hold the result in an Owned<>, i.e. they take a reference to it
    virtual CassandraPrepared *prepareStatement(const char *query) const = 0;
    // Presumably the configured tracing verbosity - confirm
    virtual unsigned queryTraceLevel() const = 0;
    // Presumably fetches the rows for wuid from the table described by mappings - confirm ownership of the result
    virtual const CassResult *fetchDataForWuid(const CassandraXmlMapping *mappings, const char *wuid) const = 0;
    // Presumably adds to batch a delete of wuid's rows from the child table described by mappings - confirm
    virtual void deleteChildByWuid(const CassandraXmlMapping *mappings, const char *wuid, CassBatch *batch) const = 0;
};
  3182. void getBoundFieldNames(const CassandraXmlMapping *mappings, StringBuffer &names, StringBuffer &bindings, IPTree *inXML, const char *userVal, StringBuffer &tableName)
  3183. {
  3184. while (mappings->columnName)
  3185. {
  3186. if (mappings->mapper.fromXML(NULL, 0, inXML, mappings->xpath, userVal))
  3187. {
  3188. names.appendf(",%s", mappings->columnName);
  3189. if (strcmp(mappings->columnType, "timeuuid")==0)
  3190. bindings.appendf(",now()");
  3191. else
  3192. bindings.appendf(",?");
  3193. }
  3194. mappings++;
  3195. }
  3196. tableName.append(mappings->columnType);
  3197. }
  3198. void getFieldNames(const CassandraXmlMapping *mappings, StringBuffer &names, StringBuffer &tableName)
  3199. {
  3200. while (mappings->columnName)
  3201. {
  3202. names.appendf(",%s", mappings->columnName);
  3203. mappings++;
  3204. }
  3205. tableName.append(mappings->columnType);
  3206. }
  3207. const char *queryTableName(const CassandraXmlMapping *mappings)
  3208. {
  3209. while (mappings->columnName)
  3210. mappings++;
  3211. return mappings->columnType;
  3212. }
  3213. StringBuffer & describeTable(const CassandraXmlMapping *mappings, StringBuffer &out)
  3214. {
  3215. StringBuffer fields;
  3216. while (mappings->columnName)
  3217. {
  3218. fields.appendf("%s %s,", mappings->columnName, mappings->columnType);
  3219. mappings++;
  3220. }
  3221. StringArray options;
  3222. options.appendList(mappings->xpath, "|");
  3223. assertex(options.length()); // Primary key at least should be present!
  3224. out.appendf("CREATE TABLE IF NOT EXISTS %s (%s PRIMARY KEY %s)", mappings->columnType, fields.str(), options.item(0));
  3225. unsigned idx = 1;
  3226. while (options.isItem(idx))
  3227. {
  3228. if (idx==1)
  3229. out.append(" WITH ");
  3230. else
  3231. out.append(", ");
  3232. out.append(options.item(idx));
  3233. idx++;
  3234. }
  3235. out.append(';');
  3236. return out;
  3237. }
  3238. const CassResult *executeQuery(CassSession *session, CassStatement *statement)
  3239. {
  3240. CassandraFuture future(cass_session_execute(session, statement));
  3241. future.wait("executeQuery");
  3242. return cass_future_get_result(future);
  3243. }
  3244. void deleteSecondaryByKey(const char * xpath, const char *key, const char *wuid, const ICassandraSession *sessionCache, CassBatch *batch)
  3245. {
  3246. if (key)
  3247. {
  3248. StringBuffer ucKey(key);
  3249. ucKey.toUpperCase();
  3250. StringBuffer names;
  3251. StringBuffer tableName;
  3252. getFieldNames(searchMappings, names, tableName);
  3253. VStringBuffer deleteQuery("DELETE from %s where xpath=? and fieldPrefix=? and fieldValue=? and wuid=?;", tableName.str());
  3254. Owned<CassandraPrepared> prepared = sessionCache->prepareStatement(deleteQuery);
  3255. CassandraStatement update(cass_prepared_bind(*prepared));
  3256. check(cass_statement_bind_string(update, 0, xpath));
  3257. if (ucKey.length() < CASS_SEARCH_PREFIX_SIZE)
  3258. check(cass_statement_bind_string(update, 1, ucKey));
  3259. else
  3260. check(cass_statement_bind_string_n(update, 1, ucKey, CASS_SEARCH_PREFIX_SIZE));
  3261. check(cass_statement_bind_string(update, 2, ucKey));
  3262. check(cass_statement_bind_string(update, 3, wuid));
  3263. check(cass_batch_add_statement(batch, update));
  3264. }
  3265. }
  3266. void executeSimpleCommand(CassSession *session, const char *command)
  3267. {
  3268. CassandraStatement statement(cass_statement_new(command, 0));
  3269. CassandraFuture future(cass_session_execute(session, statement));
  3270. future.wait("execute");
  3271. }
  3272. void ensureTable(CassSession *session, const CassandraXmlMapping *mappings)
  3273. {
  3274. StringBuffer schema;
  3275. executeSimpleCommand(session, describeTable(mappings, schema));
  3276. }
  3277. extern void simpleXMLtoCassandra(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, IPTree *inXML, const char *userVal = NULL)
  3278. {
  3279. StringBuffer names;
  3280. StringBuffer bindings;
  3281. StringBuffer tableName;
  3282. getBoundFieldNames(mappings, names, bindings, inXML, userVal, tableName);
  3283. VStringBuffer insertQuery("INSERT into %s (%s) values (%s);", tableName.str(), names.str()+1, bindings.str()+1);
  3284. Owned<CassandraPrepared> prepared = session->prepareStatement(insertQuery);
  3285. CassandraStatement update(cass_prepared_bind(*prepared));
  3286. unsigned bindidx = 0;
  3287. while (mappings->columnName)
  3288. {
  3289. if (mappings->mapper.fromXML(update, bindidx, inXML, mappings->xpath, userVal))
  3290. bindidx++;
  3291. mappings++;
  3292. }
  3293. check(cass_batch_add_statement(batch, update));
  3294. }
  3295. extern void childXMLRowtoCassandra(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, const char *wuid, IPTree &row, const char *userVal)
  3296. {
  3297. StringBuffer bindings;
  3298. StringBuffer names;
  3299. StringBuffer tableName;
  3300. getBoundFieldNames(mappings, names, bindings, &row, userVal, tableName);
  3301. VStringBuffer insertQuery("INSERT into %s (%s) values (%s);", tableName.str(), names.str()+1, bindings.str()+1);
  3302. Owned<CassandraPrepared> prepared = session->prepareStatement(insertQuery);
  3303. CassandraStatement update(cass_prepared_bind(*prepared));
  3304. check(cass_statement_bind_int32(update, 0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS));
  3305. check(cass_statement_bind_string(update, 1, wuid));
  3306. unsigned bindidx = 2; // We already bound wuid and partition
  3307. unsigned colidx = 2; // We already bound wuid and partition
  3308. while (mappings[colidx].columnName)
  3309. {
  3310. if (mappings[colidx].mapper.fromXML(update, bindidx, &row, mappings[colidx].xpath, userVal))
  3311. bindidx++;
  3312. colidx++;
  3313. }
  3314. check(cass_batch_add_statement(batch, update));
  3315. }
  3316. extern void childXMLtoCassandra(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, const char *wuid, IPTreeIterator *elements, const char *userVal)
  3317. {
  3318. if (elements->first())
  3319. {
  3320. do
  3321. {
  3322. childXMLRowtoCassandra(session, batch, mappings, wuid, elements->query(), userVal);
  3323. }
  3324. while (elements->next());
  3325. }
  3326. }
  3327. extern void childXMLtoCassandra(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, IPTree *inXML, const char *xpath, const char *defaultValue)
  3328. {
  3329. Owned<IPTreeIterator> elements = inXML->getElements(xpath);
  3330. childXMLtoCassandra(session, batch, mappings, inXML->queryName(), elements, defaultValue);
  3331. }
  3332. extern void wuResultsXMLtoCassandra(const ICassandraSession *session, CassBatch *batch, IPTree *inXML, const char *xpath)
  3333. {
  3334. childXMLtoCassandra(session, batch, wuResultsMappings, inXML, xpath, NULL);
  3335. }
  3336. extern void wuVariablesXMLtoCassandra(const ICassandraSession *session, CassBatch *batch, IPTree *inXML, const char *xpath, const char *defaultSequence)
  3337. {
  3338. childXMLtoCassandra(session, batch, wuVariablesMappings, inXML, xpath, defaultSequence);
  3339. }
  3340. /*
  3341. extern void cassandraToWuVariablesXML(CassSession *session, const char *wuid, IPTree *wuTree)
  3342. {
  3343. CassandraResult result(fetchDataForWuid(wuid, session, wuVariablesMappings));
  3344. Owned<IPTree> variables;
  3345. Owned<IPTree> temporaries;
  3346. CassandraIterator rows(cass_iterator_from_result(result));
  3347. while (cass_iterator_next(rows))
  3348. {
  3349. CassandraIterator cols(cass_iterator_from_row(cass_iterator_get_row(rows)));
  3350. if (!cass_iterator_next(cols))
  3351. fail("No column found reading wuvariables.sequence");
  3352. const CassValue *sequenceValue = cass_iterator_get_column(cols);
  3353. int sequence = getSignedResult(NULL, sequenceValue);
  3354. Owned<IPTree> child;
  3355. IPTree *parent;
  3356. switch (sequence)
  3357. {
  3358. case ResultSequenceStored:
  3359. if (!variables)
  3360. variables.setown(createPTree("Variables"));
  3361. child.setown(createPTree("Variable"));
  3362. parent = variables;
  3363. break;
  3364. case ResultSequenceInternal:
  3365. case ResultSequenceOnce:
  3366. if (!temporaries)
  3367. temporaries.setown(createPTree("Temporaries"));
  3368. child.setown(createPTree("Variable"));
  3369. parent = temporaries;
  3370. break;
  3371. default:
  3372. throwUnexpected();
  3373. break;
  3374. }
  3375. unsigned colidx = 2;
  3376. while (cass_iterator_next(cols))
  3377. {
  3378. assertex(wuVariablesMappings[colidx].columnName);
  3379. const CassValue *value = cass_iterator_get_column(cols);
  3380. if (value && !cass_value_is_null(value))
  3381. wuVariablesMappings[colidx].mapper.toXML(child, wuVariablesMappings[colidx].xpath, value);
  3382. colidx++;
  3383. }
  3384. const char *childName = child->queryName();
  3385. parent->addPropTree(childName, child.getClear());
  3386. }
  3387. if (variables)
  3388. wuTree->addPropTree("Variables", variables.getClear());
  3389. if (temporaries)
  3390. wuTree->addPropTree("Temporaries", temporaries.getClear());
  3391. }
  3392. */
  3393. /*
  3394. extern void graphProgressXMLtoCassandra(CassSession *session, IPTree *inXML)
  3395. {
  3396. StringBuffer names;
  3397. StringBuffer bindings;
  3398. StringBuffer tableName;
  3399. int numBound = getFieldNames(graphProgressMappings, names, bindings, tableName);
  3400. VStringBuffer insertQuery("INSERT into %s (%s) values (%s);", tableName.str(), names.str()+1, bindings.str()+1);
  3401. DBGLOG("%s", insertQuery.str());
  3402. CassandraBatch batch(cass_batch_new(CASS_BATCH_TYPE_UNLOGGED));
  3403. CassandraFuture futurePrep(cass_session_prepare(session, insertQuery));
  3404. futurePrep.wait("prepare statement");
  3405. CassandraPrepared prepared(cass_future_get_prepared(futurePrep));
  3406. Owned<IPTreeIterator> graphs = inXML->getElements("./graph*");
  3407. ForEach(*graphs)
  3408. {
  3409. IPTree &graph = graphs->query();
  3410. Owned<IPTreeIterator> subgraphs = graph.getElements("./node");
  3411. ForEach(*subgraphs)
  3412. {
  3413. IPTree &subgraph = subgraphs->query();
  3414. CassandraStatement update(cass_prepared_bind(prepared));
  3415. graphProgressMappings[0].mapper.fromXML(update, 0, inXML, graphProgressMappings[0].xpath);
  3416. graphProgressMappings[1].mapper.fromXML(update, 1, &graph, graphProgressMappings[1].xpath);
  3417. unsigned colidx = 2;
  3418. while (graphProgressMappings[colidx].columnName)
  3419. {
  3420. graphProgressMappings[colidx].mapper.fromXML(update, colidx, &subgraph, graphProgressMappings[colidx].xpath);
  3421. colidx++;
  3422. }
  3423. check(cass_batch_add_statement(batch, update));
  3424. }
  3425. // And one more with subgraphid = 0 for the graph status
  3426. CassandraStatement update(cass_statement_new(insertQuery.str(), bindings.length()/2));
  3427. graphProgressMappings[0].mapper.fromXML(update, 0, inXML, graphProgressMappings[0].xpath);
  3428. graphProgressMappings[1].mapper.fromXML(update, 1, &graph, graphProgressMappings[1].xpath);
  3429. check(cass_statement_bind_int64(update, 3, 0)); // subgraphId can't be null, as it's in the key
  3430. unsigned colidx = 4; // we skip progress and subgraphid
  3431. while (graphProgressMappings[colidx].columnName)
  3432. {
  3433. graphProgressMappings[colidx].mapper.fromXML(update, colidx, &graph, graphProgressMappings[colidx].xpath);
  3434. colidx++;
  3435. }
  3436. check(cass_batch_add_statement(batch, update));
  3437. }
  3438. if (inXML->hasProp("Running"))
  3439. {
  3440. IPTree *running = inXML->queryPropTree("Running");
  3441. CassandraStatement update(cass_statement_new(insertQuery.str(), bindings.length()/2));
  3442. graphProgressMappings[0].mapper.fromXML(update, 0, inXML, graphProgressMappings[0].xpath);
  3443. graphProgressMappings[1].mapper.fromXML(update, 1, running, graphProgressMappings[1].xpath);
  3444. graphProgressMappings[2].mapper.fromXML(update, 2, running, graphProgressMappings[2].xpath);
  3445. check(cass_statement_bind_int64(update, 3, 0)); // subgraphId can't be null, as it's in the key
  3446. check(cass_batch_add_statement(batch, update));
  3447. }
  3448. CassandraFuture futureBatch(cass_session_execute_batch(session, batch));
  3449. futureBatch.wait("execute");
  3450. }
  3451. extern void cassandraToGraphProgressXML(CassSession *session, const char *wuid)
  3452. {
  3453. CassandraResult result(fetchDataForWu(wuid, session, graphProgressMappings));
  3454. Owned<IPTree> progress = createPTree(wuid);
  3455. CassandraIterator rows(cass_iterator_from_result(result));
  3456. while (cass_iterator_next(rows))
  3457. {
  3458. CassandraIterator cols(cass_iterator_from_row(cass_iterator_get_row(rows)));
  3459. unsigned colidx = 1; // wuid is not returned
  3460. IPTree *ptree = progress;
  3461. while (cass_iterator_next(cols))
  3462. {
  3463. assertex(graphProgressMappings[colidx].columnName);
  3464. const CassValue *value = cass_iterator_get_column(cols);
  3465. // NOTE - this relies on the fact that progress is NULL when subgraphId=0, so that the status and id fields
  3466. // get set on the graph instead of on the child node in those cases.
  3467. if (value && !cass_value_is_null(value))
  3468. ptree = graphProgressMappings[colidx].mapper.toXML(ptree, graphProgressMappings[colidx].xpath, value);
  3469. colidx++;
  3470. }
  3471. }
  3472. StringBuffer out;
  3473. toXML(progress, out, 0, XML_SortTags|XML_Format);
  3474. printf("%s", out.str());
  3475. }
  3476. */
  3477. /*
  3478. extern void cassandraTestGraphProgressXML()
  3479. {
  3480. CassandraCluster cluster(cass_cluster_new());
  3481. cass_cluster_set_contact_points(cluster, "127.0.0.1");
  3482. CassandraSession session(cass_session_new());
  3483. CassandraFuture future(cass_session_connect_keyspace(session, cluster, "hpcc"));
  3484. future.wait("connect");
  3485. ensureTable(session, graphProgressMappings);
  3486. Owned<IPTree> inXML = createPTreeFromXMLFile("/data/rchapman/hpcc/testing/regress/ecl/a.xml");
  3487. graphProgressXMLtoCassandra(session, inXML);
  3488. const char *wuid = inXML->queryName();
  3489. cassandraToGraphProgressXML(session, wuid);
  3490. }
  3491. extern void cassandraTest()
  3492. {
  3493. cassandraTestWorkunitXML();
  3494. //cassandraTestGraphProgressXML();
  3495. }
  3496. */
  3497. static IPTree *rowToPTree(const char *xpath, const char *key, const CassandraXmlMapping *mappings, const CassRow *row)
  3498. {
  3499. CassandraIterator cols(cass_iterator_from_row(row));
  3500. Owned<IPTree> xml = createPTree("row"); // May be overwritten below if wuid field is processed
  3501. if (xpath && *xpath && key && *key)
  3502. xml->setProp(xpath, key);
  3503. while (cass_iterator_next(cols))
  3504. {
  3505. assertex(mappings->columnName);
  3506. const CassValue *value = cass_iterator_get_column(cols);
  3507. if (value && !cass_value_is_null(value))
  3508. mappings->mapper.toXML(xml, mappings->xpath, value);
  3509. mappings++;
  3510. }
  3511. return xml.getClear();
  3512. }
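/*
* queryFilterField maps a WUSortField to the field name that PostFilter records as its CQL field.
* PostFilter (below) represents a filter applied to the IPTree built from a workunit-info row, before the
* corresponding IConstWorkUnitInfo is returned from an iterator.
*/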
  3516. const char *queryFilterField(WUSortField field)
  3517. {
  3518. switch (field)
  3519. {
  3520. case WUSFuser: return "submitID";
  3521. case WUSFcluster: return "clustername";
  3522. case WUSFjob: return "jobname";
  3523. case WUSFstate: return "state";
  3524. case WUSFpriority: return "priorityClass";
  3525. case WUSFwuid: return "wuid";
  3526. case WUSFwuidhigh: return "wuid";
  3527. case WUSFfileread: UNIMPLEMENTED;
  3528. case WUSFprotected: return "protected";
  3529. case WUSFtotalthortime: return "totalThorTime";
  3530. case WUSFwildwuid: return "wuid";
  3531. case WUSFecl: UNIMPLEMENTED;
  3532. default:
  3533. throwUnexpected();
  3534. }
  3535. }
  3536. class PostFilter : public CInterface
  3537. {
  3538. public:
  3539. PostFilter(WUSortField _field, const char *_value, bool _wild)
  3540. : field(_field), cqlField(queryFilterField(_field)), xpath(queryFilterXPath(_field)), wild(_wild)
  3541. {
  3542. setValue(_value);
  3543. }
  3544. bool matches(IPTree &p) const
  3545. {
  3546. const char *val = p.queryProp(xpath);
  3547. if (val)
  3548. return wild ? WildMatch(val, pattern) : strieq(val, pattern);
  3549. else
  3550. return false;
  3551. }
  3552. const char *queryValue() const
  3553. {
  3554. return value.str();
  3555. }
  3556. void setValue(const char *_value)
  3557. {
  3558. if (wild)
  3559. {
  3560. VStringBuffer filter("*%s*", _value);
  3561. pattern.set(filter);
  3562. }
  3563. else
  3564. pattern.set(_value);
  3565. value.set(_value);
  3566. }
  3567. const char *queryXPath() const
  3568. {
  3569. return xpath;
  3570. }
  3571. WUSortField queryField() const
  3572. {
  3573. return field;
  3574. }
  3575. private:
  3576. const char *cqlField;
  3577. const char *xpath;
  3578. StringAttr pattern;
  3579. StringAttr value;
  3580. WUSortField field;
  3581. bool wild;
  3582. };
  3583. class CassSortableIterator : public CassandraIterator
  3584. {
  3585. public:
  3586. CassSortableIterator(CassIterator *_iterator, unsigned _compareColumn, bool _descending)
  3587. : CassandraIterator(_iterator), compareColumn(_compareColumn), descending(_descending)
  3588. {
  3589. }
  3590. const CassSortableIterator *nextRow()
  3591. {
  3592. if (iterator && cass_iterator_next(iterator))
  3593. {
  3594. const CassRow *row = cass_iterator_get_row(iterator);
  3595. getCassString(value.clear(), cass_row_get_column(row, compareColumn));
  3596. return this;
  3597. }
  3598. else
  3599. return NULL;
  3600. }
  3601. void stop()
  3602. {
  3603. value.clear();
  3604. set(NULL);
  3605. }
  3606. int compare(const CassSortableIterator *to) const
  3607. {
  3608. int ret = strcmp(value, to->value); // Note - empty StringBuffer always returns ""
  3609. return descending ? -ret : ret;
  3610. }
  3611. private:
  3612. StringBuffer value;
  3613. unsigned compareColumn;
  3614. bool descending;
  3615. };
// Extension of IConstWorkUnitIterator implemented by the Cassandra workunit iterators in this file
interface IConstWorkUnitIteratorEx : public IConstWorkUnitIterator
{
    // True when the iterator filters rows after reading them from Cassandra (CassMultiIterator: any post filters added)
    virtual bool hasPostFilters() const = 0;
    // True when the iterator merges more than one Cassandra result (CassMultiIterator: results.length() > 1)
    virtual bool isMerging() const = 0;
};
  3621. /*
  3622. *
  3623. * The cache entries serve two purposes:
  3624. *
  3625. * 1. They allow us to map row numbers to values for the end of each page returned, which can make forward paging efficient when not post-sorting
  3626. * 2. They allow us to preserve post-sort results in order to avoid having to re-retrieve them.
  3627. */
  3628. class CCassandraWuUQueryCacheEntry : public CInterfaceOf<IInterface>
  3629. {
  3630. public:
  3631. CCassandraWuUQueryCacheEntry()
  3632. {
  3633. hint = get_cycles_now(); // MORE - should do better perhaps?
  3634. lastAccess = msTick();
  3635. }
  3636. __int64 queryHint() const
  3637. {
  3638. return hint;
  3639. }
  3640. void noteWuid(const char *wuid, const char *fieldValue, unsigned row)
  3641. {
  3642. CriticalBlock b(crit);
  3643. ForEachItemInRev(idx, rows)
  3644. {
  3645. unsigned foundRow = rows.item(idx);
  3646. assertex(foundRow != row);
  3647. if (foundRow < row)
  3648. break;
  3649. }
  3650. rows.add(row, idx+1);
  3651. wuids.add(wuid, idx+1);
  3652. fieldValues.add(fieldValue, idx+1);
  3653. }
  3654. IConstWorkUnitIteratorEx *getResult() const
  3655. {
  3656. CriticalBlock b(crit);
  3657. return result.getLink();
  3658. }
  3659. void setResult(IConstWorkUnitIteratorEx *_result)
  3660. {
  3661. CriticalBlock b(crit);
  3662. result.set(_result);
  3663. }
  3664. unsigned lookupStartRow(StringBuffer &wuid, StringBuffer &fieldValue, unsigned startOffset)
  3665. {
  3666. // See if we can provide a base wuid to search above/below
  3667. CriticalBlock b(crit);
  3668. ForEachItemInRev(idx, rows)
  3669. {
  3670. unsigned foundRow = rows.item(idx);
  3671. if (foundRow <= startOffset)
  3672. {
  3673. wuid.set(wuids.item(idx));
  3674. fieldValue.set(fieldValues.item(idx));
  3675. return foundRow;
  3676. }
  3677. }
  3678. return 0;
  3679. }
  3680. void touch()
  3681. {
  3682. lastAccess = msTick();
  3683. }
  3684. inline unsigned queryLastAccess() const
  3685. {
  3686. return lastAccess;
  3687. }
  3688. private:
  3689. mutable CriticalSection crit; // It's POSSIBLE that we could get two queries in hitting the cache at the same time, I think...
  3690. UnsignedArray rows;
  3691. StringArray wuids;
  3692. StringArray fieldValues;
  3693. Owned<IConstWorkUnitIteratorEx> result;
  3694. __uint64 hint;
  3695. unsigned lastAccess;
  3696. };
  3697. class CassMultiIterator : public CInterface, implements IRowProvider, implements ICompare, implements IConstWorkUnitIteratorEx
  3698. {
  3699. public:
  3700. IMPLEMENT_IINTERFACE;
  3701. CassMultiIterator(CCassandraWuUQueryCacheEntry *_cache, unsigned _startRowNum, unsigned _compareColumn, bool _descending)
  3702. : cache(_cache)
  3703. {
  3704. compareColumn = _compareColumn;
  3705. descending = _descending;
  3706. startRowNum = _startRowNum;
  3707. }
  3708. void setStartOffset(unsigned start)
  3709. {
  3710. startRowNum = start; // we managed to do a seek forward via a filter
  3711. }
  3712. void addResult(CassandraResult &result)
  3713. {
  3714. results.append(result);
  3715. }
  3716. void addPostFilters(CIArrayOf<PostFilter> &filters, unsigned start)
  3717. {
  3718. unsigned len = filters.length();
  3719. while (start<len)
  3720. postFilters.append(OLINK(filters.item(start++)));
  3721. }
  3722. void addPostFilter(PostFilter &filter)
  3723. {
  3724. postFilters.append(filter);
  3725. }
  3726. virtual bool hasPostFilters() const
  3727. {
  3728. return postFilters.length() != 0;
  3729. }
  3730. virtual bool isMerging() const
  3731. {
  3732. return results.length() > 1;
  3733. }
  3734. virtual bool first()
  3735. {
  3736. inputs.kill();
  3737. ForEachItemIn(idx, results)
  3738. {
  3739. inputs.append(*new CassSortableIterator(cass_iterator_from_result(results.item(idx)), compareColumn, descending));
  3740. }
  3741. merger.setown(createRowStreamMerger(inputs.length(), *this, this, false));
  3742. rowNum = startRowNum;
  3743. return next();
  3744. }
  3745. virtual bool next()
  3746. {
  3747. Owned<IConstWorkUnitInfo> last = current.getClear();
  3748. loop
  3749. {
  3750. const CassandraIterator *nextSource = get_row();
  3751. if (!nextSource)
  3752. {
  3753. if (cache && last)
  3754. {
  3755. cache->noteWuid(last->queryWuid(), lastThorTime, rowNum);
  3756. }
  3757. return false;
  3758. }
  3759. Owned<IPTree> wuXML = rowToPTree(NULL, NULL, workunitInfoMappings+1, cass_iterator_get_row(*nextSource)); // NOTE - this is relying on search mappings and wuInfoMappings being the same
  3760. bool postFiltered = false;
  3761. ForEachItemIn(pfIdx, postFilters)
  3762. {
  3763. if (!postFilters.item(pfIdx).matches(*wuXML))
  3764. {
  3765. postFiltered = true;
  3766. break;
  3767. }
  3768. }
  3769. if (!postFiltered)
  3770. {
  3771. current.setown(createConstWorkUnitInfo(*wuXML));
  3772. lastThorTime.set(wuXML->queryProp("@totalThorTime"));
  3773. rowNum++;
  3774. return true;
  3775. }
  3776. }
  3777. }
  3778. virtual bool isValid()
  3779. {
  3780. return current != NULL;
  3781. }
  3782. virtual IConstWorkUnitInfo & query()
  3783. {
  3784. assertex(current);
  3785. return *current.get();
  3786. }
  3787. const CassandraIterator *get_row()
  3788. {
  3789. return (const CassSortableIterator *) merger->nextRow();
  3790. }
  3791. protected:
  3792. virtual void linkRow(const void *row) { throwUnexpected(); } // The 'rows' we pass around are CassSortableIterator objects - we CAN link them if we have to
  3793. virtual void releaseRow(const void *row) { throwUnexpected(); }
  3794. virtual const void *nextRow(unsigned idx)
  3795. {
  3796. CassSortableIterator &it = inputs.item(idx);
  3797. return it.nextRow(); // returns either a pointer to the iterator, or NULL
  3798. }
  3799. virtual void stop(unsigned idx)
  3800. {
  3801. inputs.item(idx).stop();
  3802. }
  3803. virtual int docompare(const void *a, const void *b) const
  3804. {
  3805. // a and b point to to CassSortableIterator objects
  3806. const CassSortableIterator *aa = (const CassSortableIterator *) a;
  3807. const CassSortableIterator *bb = (const CassSortableIterator *) b;
  3808. return aa->compare(bb);
  3809. }
  3810. private:
  3811. Owned<IRowStream> merger;
  3812. IArrayOf<CassandraResult> results;
  3813. IArrayOf<CassSortableIterator> inputs;
  3814. CIArrayOf<PostFilter> postFilters;
  3815. Owned<IConstWorkUnitInfo> current;
  3816. Linked<CCassandraWuUQueryCacheEntry> cache;
  3817. StringAttr lastThorTime;
  3818. unsigned compareColumn;
  3819. unsigned startRowNum;
  3820. unsigned rowNum;
  3821. bool descending;
  3822. };
  3823. class CassPostSortIterator : public CInterfaceOf<IConstWorkUnitIteratorEx>, implements ICompare
  3824. {
  3825. public:
  3826. CassPostSortIterator(IConstWorkUnitIterator * _input, unsigned _sortorder, unsigned _limit)
  3827. : input(_input), sortorder(_sortorder), limit(_limit)
  3828. {
  3829. idx = 0;
  3830. }
  3831. virtual bool first()
  3832. {
  3833. if (input)
  3834. {
  3835. readFirst();
  3836. input.clear();
  3837. }
  3838. idx = 0;
  3839. return sorted.isItem(idx);
  3840. }
  3841. virtual bool next()
  3842. {
  3843. idx++;
  3844. return sorted.isItem(idx);
  3845. }
  3846. virtual bool isValid()
  3847. {
  3848. return sorted.isItem(idx);
  3849. }
  3850. virtual IConstWorkUnitInfo & query()
  3851. {
  3852. return sorted.item(idx);
  3853. }
  3854. virtual bool hasPostFilters() const
  3855. {
  3856. return false; // they are done by my input. But we may want to rename this function to indicate "may return more than asked" in which case would be true
  3857. }
  3858. virtual bool isMerging() const
  3859. {
  3860. return false;
  3861. }
  3862. private:
  3863. void readFirst()
  3864. {
  3865. ForEach(*input)
  3866. {
  3867. sorted.append(OLINK(input->query()));
  3868. if (sorted.length()>=limit)
  3869. break;
  3870. }
  3871. qsortvec((void **)sorted.getArray(0), sorted.length(), *this);
  3872. }
  3873. virtual int docompare(const void *a, const void *b) const
  3874. {
  3875. // a and b point to to IConstWorkUnitInfo objects
  3876. const IConstWorkUnitInfo *aa = (const IConstWorkUnitInfo *) a;
  3877. const IConstWorkUnitInfo *bb = (const IConstWorkUnitInfo *) b;
  3878. int diff;
  3879. switch (sortorder & 0xff)
  3880. {
  3881. case WUSFuser:
  3882. diff = stricmp(aa->queryUser(), bb->queryUser());
  3883. break;
  3884. case WUSFcluster:
  3885. diff = stricmp(aa->queryClusterName(), bb->queryClusterName());
  3886. break;
  3887. case WUSFjob:
  3888. diff = stricmp(aa->queryJobName(), bb->queryJobName());
  3889. break;
  3890. case WUSFstate:
  3891. diff = stricmp(aa->queryStateDesc(), bb->queryStateDesc());
  3892. break;
  3893. case WUSFprotected:
  3894. diff = (int) bb->isProtected() - (int) aa->isProtected();
  3895. break;
  3896. case WUSFtotalthortime:
  3897. diff = (int) (bb->getTotalThorTime() - bb->getTotalThorTime());
  3898. break;
  3899. case WUSFwuid:
  3900. diff = stricmp(aa->queryWuid(), bb->queryWuid()); // Should never happen, since we always fetch with a wuid sort
  3901. break;
  3902. default:
  3903. throwUnexpected();
  3904. }
  3905. if (sortorder & WUSFreverse)
  3906. return -diff;
  3907. else
  3908. return diff;
  3909. }
  3910. Owned<IConstWorkUnitIterator> input;
  3911. IArrayOf<IConstWorkUnitInfo> sorted;
  3912. unsigned sortorder;
  3913. unsigned idx;
  3914. unsigned limit;
  3915. };
  3916. class SubPageIterator : public CInterfaceOf<IConstWorkUnitIteratorEx>
  3917. {
  3918. public:
  3919. SubPageIterator(IConstWorkUnitIteratorEx *_input, unsigned _startOffset, unsigned _pageSize)
  3920. : input(_input), startOffset(_startOffset), pageSize(_pageSize), idx(0)
  3921. {
  3922. }
  3923. virtual bool first()
  3924. {
  3925. idx = 0;
  3926. // MORE - put a seek into the Ex interface
  3927. if (input->first())
  3928. {
  3929. for (int i = 0; i < startOffset;i++)
  3930. {
  3931. if (!input->next())
  3932. return false;
  3933. }
  3934. return true;
  3935. }
  3936. else
  3937. return false;
  3938. }
  3939. virtual bool next()
  3940. {
  3941. if (idx >= pageSize)
  3942. return false;
  3943. idx++;
  3944. return input->next();
  3945. }
  3946. virtual bool isValid()
  3947. {
  3948. return idx < pageSize && input->isValid();
  3949. }
  3950. virtual IConstWorkUnitInfo & query()
  3951. {
  3952. return input->query();
  3953. }
  3954. virtual bool hasPostFilters() const
  3955. {
  3956. return false;
  3957. }
  3958. virtual bool isMerging() const
  3959. {
  3960. return false;
  3961. }
  3962. private:
  3963. Owned<IConstWorkUnitIteratorEx> input;
  3964. unsigned startOffset;
  3965. unsigned pageSize;
  3966. unsigned idx;
  3967. };
  3968. class CassJoinIterator : public CInterface, implements IConstWorkUnitIteratorEx
  3969. {
  3970. public:
  3971. IMPLEMENT_IINTERFACE;
  3972. CassJoinIterator(unsigned _compareColumn, bool _descending)
  3973. {
  3974. compareColumn = _compareColumn;
  3975. descending = _descending;
  3976. }
  3977. void addResult(CassandraResult &result)
  3978. {
  3979. results.append(result);
  3980. }
  3981. void addPostFilter(PostFilter &post)
  3982. {
  3983. postFilters.append(post);
  3984. }
  3985. virtual bool first()
  3986. {
  3987. if (!results.length())
  3988. return false;
  3989. inputs.kill();
  3990. ForEachItemIn(idx, results)
  3991. {
  3992. Owned <CassSortableIterator> input = new CassSortableIterator(cass_iterator_from_result(results.item(idx)), compareColumn, descending);
  3993. if (!input->nextRow())
  3994. return false;
  3995. inputs.append(*input.getClear());
  3996. }
  3997. return next();
  3998. }
  3999. virtual bool next()
  4000. {
  4001. current.clear();
  4002. loop
  4003. {
  4004. unsigned idx = 0;
  4005. unsigned target = 0;
  4006. unsigned matches = 1; // I always match myself!
  4007. unsigned sources = inputs.length();
  4008. if (!sources)
  4009. return false;
  4010. while (matches < sources)
  4011. {
  4012. idx++;
  4013. if (idx==sources)
  4014. idx = 0;
  4015. int diff;
  4016. loop
  4017. {
  4018. assert(idx != target);
  4019. diff = inputs.item(idx).compare(&inputs.item(target));
  4020. if (diff >= 0)
  4021. break;
  4022. if (!inputs.item(idx).nextRow())
  4023. {
  4024. inputs.kill(); // Once any reaches EOF, we are done
  4025. return false;
  4026. }
  4027. }
  4028. if (diff > 0)
  4029. {
  4030. target = idx;
  4031. matches = 1;
  4032. }
  4033. else
  4034. matches++;
  4035. }
  4036. Owned<IPTree> wuXML = rowToPTree(NULL, NULL, workunitInfoMappings+1, cass_iterator_get_row(inputs.item(0)));
  4037. bool postFiltered = false;
  4038. ForEachItemIn(pfIdx, postFilters)
  4039. {
  4040. if (!postFilters.item(pfIdx).matches(*wuXML))
  4041. {
  4042. postFiltered = true;
  4043. break;
  4044. }
  4045. }
  4046. if (!postFiltered)
  4047. {
  4048. current.setown(createConstWorkUnitInfo(*wuXML));
  4049. ForEachItemIn(idx2, inputs)
  4050. {
  4051. if (!inputs.item(idx2).nextRow())
  4052. {
  4053. inputs.clear(); // Make sure next() fails next time it is called
  4054. break;
  4055. }
  4056. }
  4057. return true;
  4058. }
  4059. }
  4060. }
  4061. virtual bool isValid()
  4062. {
  4063. return current != NULL;
  4064. }
  4065. virtual IConstWorkUnitInfo & query()
  4066. {
  4067. assertex(current);
  4068. return *current.get();
  4069. }
  4070. private:
  4071. IArrayOf<CassandraResult> results;
  4072. IArrayOf<CassSortableIterator> inputs;
  4073. CIArrayOf<PostFilter> postFilters;
  4074. Owned<IConstWorkUnitInfo> current;
  4075. unsigned compareColumn;
  4076. bool descending;
  4077. };
  4078. class CCassandraWorkUnit : public CLocalWorkUnit
  4079. {
  4080. public:
  4081. IMPLEMENT_IINTERFACE;
// Builds a workunit from wuXML (via CLocalWorkUnit::loadPTree), backed by the given Cassandra session cache.
// All child tables start flagged as not loaded (childLoaded zeroed).
// NOTE: the CLocalWorkUnit base is always constructed before sessionCache, whatever the initializer-list order.
CCassandraWorkUnit(ICassandraSession *_sessionCache, IPTree *wuXML, ISecManager *secmgr, ISecUser *secuser)
: sessionCache(_sessionCache), CLocalWorkUnit(secmgr, secuser)
{
    CLocalWorkUnit::loadPTree(wuXML);
    allDirty = false; // Debatable... depends where the XML came from! If we read it from Cassandra. it's not. Otherwise, it is...
    memset(childLoaded, 0, sizeof(childLoaded));
    abortDirty = true;
    abortState = false;
}
~CCassandraWorkUnit()
{
}
// Not supported for Cassandra-backed workunits.
// NOTE(review): if UNIMPLEMENTED throws (as its use as a terminal case elsewhere in this file suggests),
// the abortDirty assignment below is unreachable.
virtual void forceReload()
{
    printStackReport();
    UNIMPLEMENTED;
    abortDirty = true;
}
  4097. UNIMPLEMENTED;
  4098. abortDirty = true;
  4099. }
  4100. virtual void cleanupAndDelete(bool deldll, bool deleteOwned, const StringArray *deleteExclusions)
  4101. {
  4102. const char *wuid = queryWuid();
  4103. CLocalWorkUnit::cleanupAndDelete(deldll, deleteOwned, deleteExclusions);
  4104. if (!batch)
  4105. batch.setown(new CassandraBatch(cass_batch_new(CASS_BATCH_TYPE_UNLOGGED)));
  4106. deleteChildren(wuid);
  4107. deleteSecondaries(wuid);
  4108. Owned<CassandraPrepared> prepared = sessionCache->prepareStatement("DELETE from workunits where partition=? and wuid=?;");
  4109. CassandraStatement update(cass_prepared_bind(*prepared));
  4110. check(cass_statement_bind_int32(update, 0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS));
  4111. check(cass_statement_bind_string(update, 1, wuid));
  4112. check(cass_batch_add_statement(*batch, update));
  4113. CassandraFuture futureBatch(cass_session_execute_batch(sessionCache->querySession(), *batch));
  4114. futureBatch.wait("execute");
  4115. batch.clear();
  4116. }
  4117. virtual void commit()
  4118. {
  4119. CLocalWorkUnit::commit();
  4120. if (sessionCache->queryTraceLevel() >= 8)
  4121. {
  4122. StringBuffer s; toXML(p, s); DBGLOG("CCassandraWorkUnit::commit\n%s", s.str());
  4123. }
  4124. if (batch)
  4125. {
  4126. const char *wuid = queryWuid();
  4127. if (prev) // Holds the values of the "basic" info at the last commit
  4128. updateSecondaries(wuid);
  4129. simpleXMLtoCassandra(sessionCache, *batch, workunitsMappings, p); // This just does the parent row
  4130. if (allDirty)
  4131. {
  4132. // MORE - this delete is technically correct, but if we assert that the only place that copyWorkUnit is used is to populate an
  4133. // empty newly-created WU, it is unnecessary.
  4134. //deleteChildren(wuid);
  4135. wuResultsXMLtoCassandra(sessionCache, *batch, p, "Results/Result");
  4136. wuVariablesXMLtoCassandra(sessionCache, *batch, p, "Variables/Variable", "-1"); // ResultSequenceStored
  4137. wuVariablesXMLtoCassandra(sessionCache, *batch, p, "Temporaries/Variable", "-3"); // ResultSequenceInternal // NOTE - lookups may also request ResultSequenceOnce
  4138. childXMLtoCassandra(sessionCache, *batch, wuExceptionsMappings, p, "Exceptions/Exception", 0);
  4139. childXMLtoCassandra(sessionCache, *batch, wuStatisticsMappings, p, "Statistics/Statistic", 0);
  4140. }
  4141. else
  4142. {
  4143. HashIterator iter(dirtyPaths);
  4144. ForEach (iter)
  4145. {
  4146. const char *path = (const char *) iter.query().getKey();
  4147. const CassandraXmlMapping *table = *dirtyPaths.mapToValue(&iter.query());
  4148. if (sessionCache->queryTraceLevel()>2)
  4149. DBGLOG("Updating dirty path %s", path);
  4150. if (*path == '*')
  4151. {
  4152. sessionCache->deleteChildByWuid(table, wuid, *batch);
  4153. childXMLtoCassandra(sessionCache, *batch, table, p, path+1, 0);
  4154. }
  4155. else
  4156. {
  4157. IPTree *dirty = p->queryPropTree(path);
  4158. if (dirty)
  4159. childXMLRowtoCassandra(sessionCache, *batch, table, wuid, *dirty, 0);
  4160. else if (sessionCache->queryTraceLevel())
  4161. {
  4162. StringBuffer xml;
  4163. toXML(p, xml);
  4164. DBGLOG("Missing dirty element %s in %s", path, xml.str());
  4165. }
  4166. }
  4167. }
  4168. }
  4169. CassandraFuture futureBatch(cass_session_execute_batch(sessionCache->querySession(), *batch));
  4170. futureBatch.wait("execute");
  4171. batch.setown(new CassandraBatch(cass_batch_new(CASS_BATCH_TYPE_UNLOGGED))); // Commit leaves it locked...
  4172. prev.clear();
  4173. allDirty = false;
  4174. dirtyPaths.kill();
  4175. }
  4176. else
  4177. DBGLOG("No batch present??");
  4178. }
  4179. virtual void setUser(const char *user)
  4180. {
  4181. if (trackSecondaryChange(user, "@submitID"))
  4182. CLocalWorkUnit::setUser(user);
  4183. }
  4184. virtual void setClusterName(const char *cluster)
  4185. {
  4186. if (trackSecondaryChange(cluster, "@clusterName"))
  4187. CLocalWorkUnit::setClusterName(cluster);
  4188. }
  4189. virtual void setJobName(const char *jobname)
  4190. {
  4191. if (trackSecondaryChange(jobname, "@jobName"))
  4192. CLocalWorkUnit::setJobName(jobname);
  4193. }
  4194. virtual void setState(WUState state)
  4195. {
  4196. if (trackSecondaryChange(getWorkunitStateStr(state), "@state"))
  4197. CLocalWorkUnit::setState(state);
  4198. }
  4199. virtual void _lockRemote()
  4200. {
  4201. // Ignore locking for now!
  4202. // printStackReport();
  4203. // UNIMPLEMENTED;
  4204. batch.setown(new CassandraBatch(cass_batch_new(CASS_BATCH_TYPE_UNLOGGED)));
  4205. }
  4206. virtual void _unlockRemote()
  4207. {
  4208. // printStackReport();
  4209. // UNIMPLEMENTED;
  4210. commit();
  4211. batch.clear();
  4212. }
  4213. virtual void subscribe(WUSubscribeOptions options)
  4214. {
  4215. // printStackReport();
  4216. // UNIMPLEMENTED;
  4217. }
  4218. virtual void unsubscribe()
  4219. {
  4220. // printStackReport();
  4221. // UNIMPLEMENTED;
  4222. }
  4223. virtual bool aborting() const
  4224. {
  4225. return false;
  4226. // MORE - work out what to do about aborts in Cassandra
  4227. // printStackReport();
  4228. // UNIMPLEMENTED;
  4229. }
  4230. virtual IWUResult * updateResultByName(const char * name)
  4231. {
  4232. return noteDirty(CLocalWorkUnit::updateResultByName(name));
  4233. }
  4234. virtual IWUResult * updateResultBySequence(unsigned seq)
  4235. {
  4236. return noteDirty(CLocalWorkUnit::updateResultBySequence(seq));
  4237. }
  4238. virtual IWUResult * updateTemporaryByName(const char * name)
  4239. {
  4240. return noteDirty(CLocalWorkUnit::updateTemporaryByName(name));
  4241. }
  4242. virtual IWUResult * updateVariableByName(const char * name)
  4243. {
  4244. return noteDirty(CLocalWorkUnit::updateVariableByName(name));
  4245. }
  4246. virtual IWUException *createException()
  4247. {
  4248. IWUException *result = CLocalWorkUnit::createException();
  4249. VStringBuffer xpath("Exceptions/Exception[@sequence='%d']", result->getSequence());
  4250. noteDirty(xpath, wuExceptionsMappings);
  4251. return result;
  4252. }
  4253. virtual void copyWorkUnit(IConstWorkUnit *cached, bool all)
  4254. {
  4255. // Make sure that any required updates to the secondary files happen
  4256. IPropertyTree *fromP = queryExtendedWU(cached)->queryPTree();
  4257. for (const char * const *search = searchPaths; *search; search++)
  4258. trackSecondaryChange(fromP->queryProp(*search), *search);
  4259. for (const ChildTableInfo * const * table = childTables; *table != NULL; table++)
  4260. checkChildLoaded(**table);
  4261. CLocalWorkUnit::copyWorkUnit(cached, all);
  4262. memset(childLoaded, 1, sizeof(childLoaded));
  4263. allDirty = true;
  4264. }
  4265. virtual void _loadResults() const
  4266. {
  4267. checkChildLoaded(wuResultsTable); // Lazy populate the Results branch of p from Cassandra
  4268. CLocalWorkUnit::_loadResults();
  4269. }
  4270. virtual void _loadStatistics() const
  4271. {
  4272. checkChildLoaded(wuStatisticsTable); // Lazy populate the Statistics branch of p from Cassandra
  4273. CLocalWorkUnit::_loadStatistics();
  4274. }
  4275. virtual void _loadExceptions() const
  4276. {
  4277. checkChildLoaded(wuExceptionsTable); // Lazy populate the Exceptions branch of p from Cassandra
  4278. CLocalWorkUnit::_loadExceptions();
  4279. }
  4280. virtual void clearExceptions()
  4281. {
  4282. CriticalBlock b(crit);
  4283. noteDirty("*Exceptions/Exception", wuExceptionsMappings);
  4284. CLocalWorkUnit::clearExceptions();
  4285. }
  4286. virtual IPropertyTree *queryPTree() const
  4287. {
  4288. // If anyone wants the whole ptree, we'd better make sure we have fully loaded it...
  4289. CriticalBlock b(crit);
  4290. for (const ChildTableInfo * const * table = childTables; *table != NULL; table++)
  4291. checkChildLoaded(**table);
  4292. return p;
  4293. }
  4294. protected:
  4295. // Delete child table rows
  4296. void deleteChildren(const char *wuid)
  4297. {
  4298. for (const ChildTableInfo * const * table = childTables; *table != NULL; table++)
  4299. sessionCache->deleteChildByWuid(table[0]->mappings, wuid, *batch);
  4300. }
  4301. // Lazy-populate a portion of WU xml from a child table
  4302. void checkChildLoaded(const ChildTableInfo &childTable) const
  4303. {
  4304. // NOTE - should be called inside critsec
  4305. if (!childLoaded[childTable.index])
  4306. {
  4307. CassandraResult result(sessionCache->fetchDataForWuid(childTable.mappings, queryWuid()));
  4308. Owned<IPTree> results;
  4309. CassandraIterator rows(cass_iterator_from_result(result));
  4310. while (cass_iterator_next(rows))
  4311. {
  4312. CassandraIterator cols(cass_iterator_from_row(cass_iterator_get_row(rows)));
  4313. Owned<IPTree> child;
  4314. if (!results)
  4315. results.setown(createPTree(childTable.parentElement));
  4316. child.setown(createPTree(childTable.childElement));
  4317. unsigned colidx = 2; // We did not fetch wuid or partition
  4318. while (cass_iterator_next(cols))
  4319. {
  4320. assertex(childTable.mappings[colidx].columnName);
  4321. const CassValue *value = cass_iterator_get_column(cols);
  4322. if (value && !cass_value_is_null(value))
  4323. childTable.mappings[colidx].mapper.toXML(child, childTable.mappings[colidx].xpath, value);
  4324. colidx++;
  4325. }
  4326. const char *childName = child->queryName();
  4327. results->addPropTree(childName, child.getClear());
  4328. }
  4329. if (results)
  4330. p->addPropTree(childTable.parentElement, results.getClear());
  4331. childLoaded[childTable.index] = true;
  4332. }
  4333. }
  4334. // Update secondary tables (used to search wuids by owner, state, jobname etc)
  4335. void updateSecondaryTable(const char *xpath, const char *prevKey, const char *wuid)
  4336. {
  4337. deleteSecondaryByKey(xpath, prevKey, wuid, sessionCache, *batch);
  4338. if (p->hasProp(xpath))
  4339. simpleXMLtoCassandra(sessionCache, *batch, searchMappings, p, xpath);
  4340. }
  4341. void deleteSecondaries(const char *wuid)
  4342. {
  4343. for (const char * const *search = searchPaths; *search; search++)
  4344. deleteSecondaryByKey(*search, p->queryProp(*search), wuid, sessionCache, *batch);
  4345. }
  4346. void updateSecondaries(const char *wuid)
  4347. {
  4348. const char * const *search;
  4349. for (search = searchPaths; *search; search++)
  4350. updateSecondaryTable(*search, prev->queryProp(*search), wuid);
  4351. for (search = wildSearchPaths; *search; search++)
  4352. {
  4353. if (p->hasProp(*search))
  4354. simpleXMLtoCassandra(sessionCache, *batch, uniqueSearchMappings, p, *search);
  4355. }
  4356. }
  4357. // Keep track of previously committed values for fields that we have a secondary table for, so that we can update them appropriately when we commit
  4358. bool trackSecondaryChange(const char *newval, const char *xpath)
  4359. {
  4360. if (!newval)
  4361. newval = "";
  4362. const char *oldval = p->queryProp(xpath);
  4363. if (!oldval)
  4364. oldval = "";
  4365. if (streq(newval, oldval))
  4366. return false; // No change
  4367. if (!prev)
  4368. {
  4369. prev.setown(createPTree());
  4370. prev->setProp(xpath, oldval);
  4371. }
  4372. else if (!prev->hasProp(xpath))
  4373. prev->setProp(xpath, oldval);
  4374. return true;
  4375. }
  4376. IWUResult *noteDirty(IWUResult *result)
  4377. {
  4378. if (result)
  4379. {
  4380. VStringBuffer xpath("Results/Result[@sequence='%d']", result->getResultSequence());
  4381. noteDirty(xpath, wuResultsMappings);
  4382. }
  4383. return result;
  4384. }
  4385. void noteDirty(const char *xpath, const CassandraXmlMapping *table)
  4386. {
  4387. dirtyPaths.setValue(xpath, table);
  4388. }
  // Session/factory through which this workunit performs all its Cassandra access
  4389. Linked<const ICassandraSession> sessionCache;
  // NOTE(review): abortDirty and abortState are not referenced in the visible code - purpose to be confirmed
  4390. mutable bool abortDirty;
  4391. mutable bool abortState;
  // One flag per child table (indexed by ChildTableInfo::index), set once checkChildLoaded() has merged that table into p
  4392. mutable bool childLoaded[ChildTablesSize];
  // Set by copyWorkUnit(), reset at the end of commit() - presumably makes commit write every branch (TODO confirm)
  4393. bool allDirty;
  // Previously committed values of secondary-indexed fields, recorded by trackSecondaryChange(); cleared after commit
  4394. Owned<IPTree> prev;
  // Batch that pending writes are added to: created by _lockRemote(), recreated after each commit, cleared by _unlockRemote()
  4395. Owned<CassandraBatch> batch;
  // Changed child elements: xpath -> mapping used to write the element back (see noteDirty()); emptied after commit
  4396. MapStringTo<const CassandraXmlMapping *> dirtyPaths;
  4397. };
  4398. class CCasssandraWorkUnitFactory : public CWorkUnitFactory, implements ICassandraSession
  4399. {
  4400. IMPLEMENT_IINTERFACE;
  4401. public:
  4402. CCasssandraWorkUnitFactory(const IPropertyTree *props) : cluster(cass_cluster_new()), randomizeSuffix(0), randState((unsigned) get_cycles_now()), cacheRetirer(*this)
  4403. {
  4404. StringArray options;
  4405. Owned<IPTreeIterator> it = props->getElements("Option");
  4406. ForEach(*it)
  4407. {
  4408. IPTree &item = it->query();
  4409. const char *opt = item.queryProp("@name");
  4410. const char *val = item.queryProp("@value");
  4411. if (opt && val)
  4412. {
  4413. if (strieq(opt, "randomWuidSuffix"))
  4414. randomizeSuffix = atoi(val);
  4415. else if (strieq(opt, "traceLevel"))
  4416. traceLevel = atoi(val);
  4417. else
  4418. {
  4419. VStringBuffer optstr("%s=%s", opt, val);
  4420. options.append(optstr);
  4421. }
  4422. }
  4423. }
  4424. cluster.setOptions(options);
  4425. if (cluster.keyspace.isEmpty())
  4426. cluster.keyspace.set("hpcc");
  4427. connect();
  4428. cacheRetirer.start();
  4429. }
  4430. ~CCasssandraWorkUnitFactory()
  4431. {
  4432. cacheRetirer.stop();
  4433. cacheRetirer.join();
  4434. }
  4435. virtual CLocalWorkUnit* _createWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser)
  4436. {
  4437. unsigned suffix;
  4438. unsigned suffixLength;
  4439. if (randomizeSuffix) // May need to enable this option if you are expecting to create hundreds of workunits / second
  4440. {
  4441. suffix = rand_r(&randState);
  4442. suffixLength = randomizeSuffix;
  4443. }
  4444. else
  4445. {
  4446. suffix = 0;
  4447. suffixLength = 0;
  4448. }
  4449. Owned<CassandraPrepared> prepared = prepareStatement("INSERT INTO workunits (partition, wuid) VALUES (?,?) IF NOT EXISTS;");
  4450. loop
  4451. {
  4452. // Create a unique WUID by adding suffixes until we managed to add a new value
  4453. StringBuffer useWuid(wuid);
  4454. if (suffix)
  4455. {
  4456. useWuid.append("-");
  4457. for (unsigned i = 0; i < suffixLength; i++)
  4458. {
  4459. useWuid.appendf("%c", '0'+suffix%10);
  4460. suffix /= 10;
  4461. }
  4462. }
  4463. CassandraStatement statement(cass_prepared_bind(*prepared));
  4464. check(cass_statement_bind_int32(statement, 0, rtlHash32VStr(useWuid.str(), 0) % NUM_PARTITIONS));
  4465. check(cass_statement_bind_string(statement, 1, useWuid.str()));
  4466. if (traceLevel >= 2)
  4467. DBGLOG("Try creating %s", useWuid.str());
  4468. CassandraFuture future(cass_session_execute(session, statement));
  4469. future.wait("execute");
  4470. CassandraResult result(cass_future_get_result(future));
  4471. if (cass_result_column_count(result)==1)
  4472. {
  4473. // A single column result indicates success, - the single column should be called '[applied]' and have the value 'true'
  4474. // If there are multiple columns it will be '[applied]' (value false) and the fields of the existing row
  4475. Owned<IPTree> wuXML = createPTree(useWuid);
  4476. wuXML->setProp("@xmlns:xsi", "http://www.w3.org/1999/XMLSchema-instance");
  4477. Owned<CLocalWorkUnit> wu = new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser);
  4478. wu->lockRemote(true);
  4479. return wu.getClear();
  4480. }
  4481. suffix = rand_r(&randState);
  4482. if (suffixLength<9)
  4483. suffixLength++;
  4484. }
  4485. }
  4486. virtual CLocalWorkUnit* _openWorkUnit(const char *wuid, bool lock, ISecManager *secmgr, ISecUser *secuser)
  4487. {
  4488. // MORE - what to do about lock?
  4489. Owned<IPTree> wuXML = cassandraToWorkunitXML(wuid);
  4490. if (wuXML)
  4491. return new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser);
  4492. else
  4493. return NULL;
  4494. }
  4495. virtual CLocalWorkUnit* _updateWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser)
  4496. {
  4497. // Ignore locking for now
  4498. // Note - in Dali, this would lock for write, whereas _openWorkUnit would either lock for read (if lock set) or not lock at all
  4499. Owned<IPTree> wuXML = cassandraToWorkunitXML(wuid);
  4500. Owned<CLocalWorkUnit> wu = new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser);
  4501. wu->lockRemote(true);
  4502. return wu.getClear();
  4503. }
  4504. virtual IWorkUnit * getGlobalWorkUnit(ISecManager *secmgr = NULL, ISecUser *secuser = NULL) { UNIMPLEMENTED; }
  4505. virtual IConstWorkUnitIterator * getWorkUnitsByOwner(const char * owner, ISecManager *secmgr, ISecUser *secuser)
  4506. {
  4507. return getWorkUnitsByXXX("@submitID", owner, secmgr, secuser);
  4508. }
  4509. virtual IConstWorkUnitIterator * getScheduledWorkUnits(ISecManager *secmgr, ISecUser *secuser)
  4510. {
  4511. return getWorkUnitsByXXX("@state", getWorkunitStateStr(WUStateScheduled), secmgr, secuser); // MORE - there may be more efficient ways to do this?
  4512. }
  4513. virtual IConstWorkUnitIterator * getWorkUnitsSorted(WUSortField sortorder, WUSortField * filters, const void * filterbuf,
  4514. unsigned startOffset, unsigned pageSize, __int64 * cachehint, unsigned *total,
  4515. ISecManager *secmgr, ISecUser *secuser)
  4516. {
  4517. // Note that we only support a single sort order.
  4518. // Any sort order other than WUID ASC or WUID DESC will require post-sorting - if there are more than WUID_LOCALSORT_LIMIT we will refuse
  4519. // We need a single 'hard' filter, and do others by post-filtering. If the best we can do for a 'hard' filter is itself wild, we
  4520. // have to do two hits - one to find all the matching values, and the second to do a merge of all the results for each value.
  4521. // We should encourage the UI to present drop-lists of users for filtering.
  4522. // Any post-filter that Cassandra CAN do (via ALLOW FILTERING) it should? This seems to be an empty set unless we add secondary indexes AND give up on sorted results...
  4523. // Any that it can't (e.g. wild) we post-filter client-side.
  4524. // Wild can be translated into ranges but we then end up losing the sorting (well, we end up sorted by the filter field first, then wuid. This may actually be desirable in some situations.
  4525. // Alternatively we can transform into a set and merge multiple queries.
  4526. // Searching by files probably needs to be done differently - a separate table mapping filenames to wuids - this can be join-merged if other filters are present.
  4527. // Searching by application values are done as if each was a new (user-defined) attribute.
  4528. // At present we assume that the order in which filters are provided indicates the best order to apply them - this may not be smart
  4529. Owned<CCassandraWuUQueryCacheEntry> cached;
  4530. if (cachehint && *cachehint)
  4531. {
  4532. CriticalBlock b(cacheCrit);
  4533. cached.set(cacheIdMap.getValue(*cachehint));
  4534. }
  4535. if (cached)
  4536. cached->touch();
  4537. else
  4538. cached.setown(new CCassandraWuUQueryCacheEntry());
  4539. const WUSortField *thisFilter = filters;
  4540. CIArrayOf<PostFilter> goodFilters;
  4541. CIArrayOf<PostFilter> wuidFilters;
  4542. CIArrayOf<PostFilter> poorFilters;
  4543. CIArrayOf<PostFilter> remoteWildFilters;
  4544. Owned<IConstWorkUnitIteratorEx> result;
  4545. WUSortField baseSort = (WUSortField) (sortorder & 0xff);
  4546. StringBuffer thorTimeThreshold;
  4547. bool sortByThorTime = (baseSort == WUSFtotalthortime);
  4548. bool needsPostSort = (baseSort != WUSFwuid && baseSort != WUSFtotalthortime);
  4549. bool sortDescending = (sortorder & WUSFreverse) || needsPostSort;
  4550. if (!result)
  4551. {
  4552. Owned<CassMultiIterator> merger = new CassMultiIterator(needsPostSort ? NULL : cached, 0, 0, sortDescending); // We always merge by wuid // MORE - except when we merge by thor time....
  4553. if (startOffset)
  4554. {
  4555. StringBuffer startWuid;
  4556. unsigned found = cached->lookupStartRow(startWuid, thorTimeThreshold, startOffset);
  4557. if (found)
  4558. {
  4559. if (!sortByThorTime)
  4560. {
  4561. if (sortDescending)
  4562. startWuid.setCharAt(startWuid.length()-1, startWuid.charAt(startWuid.length()-1)-1); // we want to find the last wuid BEFORE
  4563. else
  4564. startWuid.append('\x21'); // we want to find the first wuid AFTER. This is printable but not going to be in any wuid
  4565. thorTimeThreshold.clear();
  4566. }
  4567. wuidFilters.append(*new PostFilter(sortorder==WUSFwuid ? WUSFwuid : WUSFwuidhigh, startWuid, true));
  4568. startOffset -= found;
  4569. merger->setStartOffset(found);
  4570. }
  4571. }
  4572. const char *fv = (const char *) filterbuf;
  4573. while (thisFilter && *thisFilter)
  4574. {
  4575. WUSortField field = (WUSortField) (*thisFilter & 0xff);
  4576. bool isWild = (*thisFilter & WUSFwild) != 0;
  4577. switch (field)
  4578. {
  4579. case WUSFuser:
  4580. case WUSFcluster:
  4581. case WUSFjob:
  4582. if (isWild)
  4583. {
  4584. StringBuffer s(fv);
  4585. if (s.charAt(s.length()-1)== '*')
  4586. s.remove(s.length()-1, 1);
  4587. if (s.length())
  4588. remoteWildFilters.append(*new PostFilter(field, s, true)); // Trailing-only wildcards can be done remotely
  4589. }
  4590. else
  4591. goodFilters.append(*new PostFilter(field, fv, false));
  4592. break;
  4593. case WUSFstate:
  4594. case WUSFpriority:
  4595. case WUSFprotected:
  4596. // These can't be wild, but are not very good filters
  4597. poorFilters.append(*new PostFilter(field, fv, false));
  4598. break;
  4599. case WUSFwuid: // Acts as wuidLo when specified as a filter
  4600. case WUSFwuidhigh:
  4601. // Wuid filters can be added to good and poor filters, and to remoteWild if they are done via merged sets rather than ranges...
  4602. if (sortByThorTime)
  4603. remoteWildFilters.append(*new PostFilter(field, fv, true));
  4604. else
  4605. mergeFilter(wuidFilters, field, fv);
  4606. break;
  4607. case WUSFfileread:
  4608. UNIMPLEMENTED;
  4609. case WUSFtotalthortime:
  4610. // This should be treated as a low value - i.e. return only wu's that took longer than the supplied value
  4611. if (thorTimeThreshold.isEmpty()) // If not a continuation
  4612. formatTimeCollatable(thorTimeThreshold, milliToNano(atoi(fv)), false);
  4613. break;
  4614. case WUSFwildwuid:
  4615. // Translate into a range - note that we only support trailing * wildcard.
  4616. if (fv && *fv)
  4617. {
  4618. StringBuffer s(fv);
  4619. if (s.charAt(s.length()-1)== '*')
  4620. s.remove(s.length()-1, 1);
  4621. if (s.length())
  4622. {
  4623. mergeFilter(wuidFilters, WUSFwuid, s);
  4624. s.append('\x7e'); // '~' - higher than anything that should occur in a wuid (but still printable)
  4625. mergeFilter(wuidFilters, WUSFwuidhigh, s);
  4626. }
  4627. }
  4628. break;
  4629. case WUSFcustom:
  4630. UNIMPLEMENTED;
  4631. case WUSFecl: // This is different...
  4632. if (isWild)
  4633. merger->addPostFilter(*new PostFilter(field, fv, true)); // Wildcards on ECL are trailing and leading - no way to do remotely
  4634. else
  4635. goodFilters.append(*new PostFilter(field, fv, false)); // A hard filter on exact ecl match is possible but very unlikely
  4636. }
  4637. thisFilter++;
  4638. fv = fv + strlen(fv)+1;
  4639. }
  4640. if (sortByThorTime)
  4641. {
  4642. merger->addPostFilters(goodFilters, 0);
  4643. merger->addPostFilters(poorFilters, 0);
  4644. merger->addPostFilters(remoteWildFilters, 0);
  4645. if (wuidFilters.length())
  4646. {
  4647. // We are doing a continuation of a prior search that is sorted by a searchField, which may not be unique
  4648. // We need two queries - one where searchField==startSearchField and wuid > startWuid,
  4649. // and one where searchField > startSearchField. We know that there are no other filters in play (as Cassandra would not support them)
  4650. // though there may be postfilters
  4651. assertex(wuidFilters.length()==1);
  4652. merger->addResult(*new CassandraResult(fetchMoreDataByThorTime(thorTimeThreshold, wuidFilters.item(0).queryValue(), sortDescending, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
  4653. merger->addResult(*new CassandraResult(fetchMoreDataByThorTime(thorTimeThreshold, NULL, sortDescending, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
  4654. }
  4655. else
  4656. merger->addResult(*new CassandraResult(fetchDataByThorTime(thorTimeThreshold, sortDescending, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
  4657. }
  4658. else if (goodFilters.length())
  4659. {
  4660. merger->addPostFilters(goodFilters, 1);
  4661. merger->addPostFilters(poorFilters, 0);
  4662. merger->addPostFilters(remoteWildFilters, 0);
  4663. merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(goodFilters.item(0), wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
  4664. }
  4665. else if (poorFilters.length())
  4666. {
  4667. merger->addPostFilters(poorFilters, 1);
  4668. merger->addPostFilters(remoteWildFilters, 0);
  4669. merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(poorFilters.item(0), wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
  4670. }
  4671. else if (remoteWildFilters.length())
  4672. {
  4673. merger->addPostFilters(remoteWildFilters, 1); // Any other filters have to be done locally
  4674. // Convert into a value IN [] which we do via a merge
  4675. // MORE - If we want sorted by filter (or don't care about sort order), we could do directly as a range - but the wuid range filters then don't work, and the merger would be invalid
  4676. StringArray fieldValues;
  4677. PostFilter &best= remoteWildFilters.item(0);
  4678. _getUniqueValues(best.queryXPath(), best.queryValue(), fieldValues);
  4679. ForEachItemIn(idx, fieldValues)
  4680. {
  4681. PostFilter p(best.queryField(), fieldValues.item(idx), false);
  4682. merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(p, wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
  4683. }
  4684. }
  4685. else
  4686. {
  4687. // If all we have is a wuid range (or nothing), search the wuid table and/or return everything
  4688. for (int i = 0; i < NUM_PARTITIONS; i++)
  4689. {
  4690. merger->addResult(*new CassandraResult(fetchDataByPartition(workunitInfoMappings, i, wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
  4691. }
  4692. }
  4693. // The result we have will be sorted by wuid (ascending or descending)
  4694. if (needsPostSort)
  4695. {
  4696. // A post-sort will be required.
  4697. // Result should be limited in (to CASS_WORKUNIT_POSTSORT_LIMIT * number of results being merged)
  4698. result.setown(new CassPostSortIterator(merger.getClear(), sortorder, pageSize > CASS_WORKUNIT_POSTSORT_LIMIT ? pageSize : CASS_WORKUNIT_POSTSORT_LIMIT));
  4699. cached->setResult(result);
  4700. }
  4701. else
  4702. result.setown(merger.getClear());
  4703. }
  4704. if (startOffset || needsPostSort || result->hasPostFilters() || result->isMerging()) // we need a subpage if we have fetched anything other than exactly the rows requested
  4705. result.setown(new SubPageIterator(result.getClear(), startOffset, pageSize));
  4706. if (cachehint)
  4707. {
  4708. *cachehint = cached->queryHint();
  4709. CriticalBlock b(cacheCrit);
  4710. cacheIdMap.setValue(*cachehint, cached.getClear());
  4711. }
  4712. if (total)
  4713. *total = 0; // We don't know
  4714. return result.getClear();
  4715. }
  4716. virtual StringArray &getUniqueValues(WUSortField field, const char *prefix, StringArray &result) const
  4717. {
  4718. return _getUniqueValues(queryFilterXPath(field), prefix, result);
  4719. }
  4720. virtual unsigned numWorkUnits()
  4721. {
  4722. Owned<CassandraPrepared> prepared = prepareStatement("SELECT COUNT(*) FROM workunits;");
  4723. CassandraStatement statement(cass_prepared_bind(*prepared));
  4724. CassandraFuture future(cass_session_execute(session, statement));
  4725. future.wait("select count(*)");
  4726. CassandraResult result(cass_future_get_result(future));
  4727. return getUnsignedResult(NULL, getSingleResult(result));
  4728. }
  4729. /*
  4730. virtual void descheduleAllWorkUnits(ISecManager *secmgr, ISecUser *secuser) { UNIMPLEMENTED; }
  4731. virtual IConstQuerySetQueryIterator * getQuerySetQueriesSorted(WUQuerySortField *sortorder, WUQuerySortField *filters, const void *filterbuf, unsigned startoffset, unsigned maxnum, __int64 *cachehint, unsigned *total, const MapStringTo<bool> *subset) { UNIMPLEMENTED; }
  4732. virtual bool isAborting(const char *wuid) const { UNIMPLEMENTED; }
  4733. virtual void clearAborting(const char *wuid) { UNIMPLEMENTED; }
  4734. */
  4735. virtual WUState waitForWorkUnit(const char * wuid, unsigned timeout, bool compiled, bool returnOnWaitState)
  4736. {
  4737. VStringBuffer select("select state from workunits where wuid = '%s';", wuid);
  4738. CassandraStatement statement(cass_statement_new(select.str(), 0));
  4739. unsigned start = msTick();
  4740. loop
  4741. {
  4742. CassandraFuture future(cass_session_execute(session, statement));
  4743. future.wait("Lookup wu state");
  4744. CassandraResult result(cass_future_get_result(future));
  4745. const CassValue *value = getSingleResult(result);
  4746. if (value == NULL)
  4747. return WUStateUnknown;
  4748. const char *output;
  4749. size_t length;
  4750. check(cass_value_get_string(value, &output, &length));
  4751. StringBuffer stateStr(length, output);
  4752. WUState state = getWorkUnitState(stateStr);
  4753. switch (state)
  4754. {
  4755. case WUStateCompiled:
  4756. case WUStateUploadingFiles:
  4757. if (compiled)
  4758. return state;
  4759. break;
  4760. case WUStateCompleted:
  4761. case WUStateFailed:
  4762. case WUStateAborted:
  4763. return state;
  4764. case WUStateWait:
  4765. if (returnOnWaitState)
  4766. return state;
  4767. break;
  4768. case WUStateCompiling:
  4769. case WUStateRunning:
  4770. case WUStateDebugPaused:
  4771. case WUStateDebugRunning:
  4772. case WUStateBlocked:
  4773. case WUStateAborting:
  4774. // MORE - can see if agent still running, and set to failed if it is not
  4775. break;
  4776. }
  4777. unsigned waited = msTick() - start;
  4778. if (timeout != -1 && waited > timeout)
  4779. {
  4780. return WUStateUnknown;
  4781. break;
  4782. }
  4783. Sleep(1000); // MORE - may want to back off as waited gets longer...
  4784. }
  4785. }
  4786. unsigned validateRepository(bool fix)
  4787. {
  4788. unsigned errCount = 0;
  4789. // MORE - if the batch gets too big you may need to flush it occasionally
  4790. CassandraBatch batch(fix ? cass_batch_new(CASS_BATCH_TYPE_LOGGED) : NULL);
  4791. // 1. Check that every entry in main wu table has matching entries in secondary tables
  4792. CassandraResult result(fetchData(workunitInfoMappings+1));
  4793. CassandraIterator rows(cass_iterator_from_result(result));
  4794. if (batch)
  4795. {
  4796. // Delete the unique values table - the validate process recreates it afresh
  4797. CassandraStatement truncate(cass_statement_new("TRUNCATE uniqueSearchValues", 0));
  4798. check(cass_batch_add_statement(batch, truncate));
  4799. }
  4800. while (cass_iterator_next(rows))
  4801. {
  4802. Owned<IPTree> wuXML = rowToPTree(NULL, NULL, workunitInfoMappings+1, cass_iterator_get_row(rows));
  4803. const char *wuid = wuXML->queryName();
  4804. // For each search entry, check that we get matching XML
  4805. for (const char * const *search = searchPaths; *search; search++)
  4806. errCount += validateSearch(*search, wuid, wuXML, batch);
  4807. }
  4808. // 2. Check that there are no orphaned entries in search or child tables
  4809. errCount += checkOrphans(searchMappings, 3, batch);
  4810. for (const ChildTableInfo * const * table = childTables; *table != NULL; table++)
  4811. errCount += checkOrphans(table[0]->mappings, 0, batch);
  4812. // 3. Commit fixes
  4813. if (batch)
  4814. {
  4815. CassandraFuture futureBatch(cass_session_execute_batch(session, batch));
  4816. futureBatch.wait("Fix_repository");
  4817. }
  4818. return errCount;
  4819. }
  4820. virtual void deleteRepository(bool recreate)
  4821. {
  4822. // USE WITH CARE!
  4823. session.set(cass_session_new());
  4824. CassandraFuture future(cass_session_connect(session, cluster));
  4825. future.wait("connect without keyspace to delete");
  4826. VStringBuffer deleteKeyspace("DROP KEYSPACE IF EXISTS %s;", cluster.keyspace.get());
  4827. executeSimpleCommand(session, deleteKeyspace);
  4828. if (recreate)
  4829. connect();
  4830. else
  4831. session.set(NULL);
  4832. }
  4833. virtual void createRepository()
  4834. {
  4835. session.set(cass_session_new());
  4836. CassandraFuture future(cass_session_connect(session, cluster));
  4837. future.wait("connect without keyspace");
  4838. VStringBuffer create("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '1' } ;", cluster.keyspace.get()); // MORE - options from props? Not 100% sure if they are appropriate.
  4839. executeSimpleCommand(session, create);
  4840. connect();
  4841. ensureTable(session, workunitsMappings);
  4842. ensureTable(session, searchMappings);
  4843. ensureTable(session, uniqueSearchMappings);
  4844. for (const ChildTableInfo * const * table = childTables; *table != NULL; table++)
  4845. ensureTable(session, table[0]->mappings);
  4846. }
  4847. virtual const char *queryStoreType() const
  4848. {
  4849. return "Cassandra";
  4850. }
  4851. // Interface ICassandraSession
  4852. virtual CassSession *querySession() const { return session; };
  4853. virtual unsigned queryTraceLevel() const { return traceLevel; };
  4854. virtual CassandraPrepared *prepareStatement(const char *query) const
  4855. {
  4856. assertex(session);
  4857. CriticalBlock b(cacheCrit);
  4858. Linked<CassandraPrepared> cached = preparedCache.getValue(query);
  4859. if (cached)
  4860. {
  4861. if (traceLevel >= 2)
  4862. DBGLOG("prepareStatement: Reusing %s", query);
  4863. return cached.getClear();
  4864. }
  4865. {
  4866. if (traceLevel >= 2)
  4867. DBGLOG("prepareStatement: Binding %s", query);
  4868. // We don't want to block cache lookups while we prepare a new bound statement
  4869. // Note - if multiple threads try to prepare the same (new) statement at the same time, it's not catastrophic
  4870. CriticalUnblock b(cacheCrit);
  4871. CassandraFuture futurePrep(cass_session_prepare(session, query));
  4872. futurePrep.wait("prepare statement");
  4873. cached.setown(new CassandraPrepared(cass_future_get_prepared(futurePrep)));
  4874. }
  4875. preparedCache.setValue(query, cached); // NOTE - this links parameter
  4876. return cached.getClear();
  4877. }
  4878. private:
  4879. void connect()
  4880. {
  4881. session.set(cass_session_new());
  4882. CassandraFuture future(cass_session_connect_keyspace(session, cluster, cluster.keyspace));
  4883. future.wait("connect with keyspace");
  4884. }
  4885. bool checkWuExists(const char *wuid)
  4886. {
  4887. Owned<CassandraPrepared> prepared = prepareStatement("SELECT COUNT(*) FROM workunits where partition=? and wuid=?;");
  4888. CassandraStatement statement(cass_prepared_bind(*prepared));
  4889. cass_statement_bind_int32(statement, 0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
  4890. cass_statement_bind_string(statement, 1, wuid);
  4891. CassandraFuture future(cass_session_execute(session, statement));
  4892. future.wait("select count(*)");
  4893. CassandraResult result(cass_future_get_result(future));
  4894. return getUnsignedResult(NULL, getSingleResult(result)) != 0; // Shouldn't be more than 1, either
  4895. }
  4896. void mergeFilter(CIArrayOf<PostFilter> &filters, WUSortField field, const char *value)
  4897. {
  4898. // Combine multiple filters on wuid - Cassandra doesn't like seeing more than one.
  4899. ForEachItemIn(idx, filters)
  4900. {
  4901. PostFilter &filter = filters.item(idx);
  4902. if (filter.queryField()==field)
  4903. {
  4904. const char *prevLimit = filter.queryValue();
  4905. int diff = strcmp(prevLimit, value);
  4906. if (diff && ((diff < 0) == (field==WUSFwuid)))
  4907. filter.setValue(value);
  4908. return;
  4909. }
  4910. }
  4911. // Not found - add new filter
  4912. filters.append(*new PostFilter(field, value, true));
  4913. }
  4914. IConstWorkUnitIterator * getWorkUnitsByXXX(const char *xpath, const char *key, ISecManager *secmgr, ISecUser *secuser)
  4915. {
  4916. Owned<CassMultiIterator> merger = new CassMultiIterator(NULL, 0, 0, true); // Merge by wuid
  4917. if (!key || !*key)
  4918. {
  4919. CIArrayOf<PostFilter> wuidFilters;
  4920. for (int i = 0; i < NUM_PARTITIONS; i++)
  4921. {
  4922. merger->addResult(*new CassandraResult(fetchDataByPartition(workunitInfoMappings, i, wuidFilters)));
  4923. }
  4924. }
  4925. else
  4926. merger->addResult(*new CassandraResult(fetchDataForKey(xpath, key)));
  4927. return createSecureConstWUIterator(merger.getClear(), secmgr, secuser);
  4928. }
  4929. StringArray &_getUniqueValues(const char *xpath, const char *prefix, StringArray &result) const
  4930. {
  4931. if (prefix && strlen(prefix) >= CASS_SEARCH_PREFIX_SIZE)
  4932. {
  4933. CassandraResult r(fetchDataForWildSearch(xpath, prefix, uniqueSearchMappings));
  4934. CassandraIterator rows(cass_iterator_from_result(r));
  4935. StringBuffer value;
  4936. while (cass_iterator_next(rows))
  4937. {
  4938. const CassRow *row = cass_iterator_get_row(rows);
  4939. getCassString(value.clear(), cass_row_get_column(row, 0));
  4940. result.append(value);
  4941. }
  4942. }
  4943. return result;
  4944. }
  4945. unsigned validateSearch(const char *xpath, const char *wuid, IPTree *wuXML, CassBatch *batch)
  4946. {
  4947. unsigned errCount = 0;
  4948. const char *childKey = wuXML->queryProp(xpath);
  4949. if (childKey && *childKey)
  4950. {
  4951. CassandraResult result(fetchDataForKeyAndWuid(xpath, childKey, wuid));
  4952. if (batch)
  4953. simpleXMLtoCassandra(this, batch, uniqueSearchMappings, wuXML, xpath);
  4954. switch (cass_result_row_count(result))
  4955. {
  4956. case 0:
  4957. DBGLOG("Missing search data for %s for wuid=%s key=%s", xpath, wuid, childKey);
  4958. if (batch)
  4959. simpleXMLtoCassandra(this, batch, searchMappings, wuXML, xpath);
  4960. errCount++;
  4961. break;
  4962. case 1:
  4963. {
  4964. Owned<IPTree> secXML = rowToPTree(xpath, childKey, searchMappings+4, cass_result_first_row(result)); // type, prefix, key, and wuid are not returned
  4965. secXML->renameProp("/", wuid);
  4966. if (!areMatchingPTrees(wuXML, secXML))
  4967. {
  4968. DBGLOG("Mismatched search data for %s for wuid %s", xpath, wuid);
  4969. if (batch)
  4970. simpleXMLtoCassandra(this, batch, searchMappings, wuXML, xpath);
  4971. errCount++;
  4972. }
  4973. break;
  4974. }
  4975. default:
  4976. DBGLOG("Multiple secondary data %d for %s for wuid %s", (int) cass_result_row_count(result), xpath, wuid); // This should be impossible!
  4977. if (batch)
  4978. {
  4979. deleteSecondaryByKey(xpath, childKey, wuid, this, batch);
  4980. simpleXMLtoCassandra(this, batch, searchMappings, wuXML, xpath);
  4981. }
  4982. break;
  4983. }
  4984. }
  4985. return errCount;
  4986. }
  4987. unsigned checkOrphans(const CassandraXmlMapping *mappings, unsigned wuidIndex, CassBatch *batch)
  4988. {
  4989. unsigned errCount = 0;
  4990. CassandraResult result(fetchData(mappings));
  4991. CassandraIterator rows(cass_iterator_from_result(result));
  4992. while (cass_iterator_next(rows))
  4993. {
  4994. const CassRow *row = cass_iterator_get_row(rows);
  4995. StringBuffer wuid;
  4996. getCassString(wuid, cass_row_get_column(row, wuidIndex));
  4997. if (!checkWuExists(wuid))
  4998. {
  4999. DBGLOG("Orphaned data in %s for wuid=%s", queryTableName(mappings), wuid.str());
  5000. if (batch)
  5001. {
  5002. if (wuidIndex)
  5003. {
  5004. StringBuffer xpath, fieldValue;
  5005. getCassString(xpath, cass_row_get_column(row, 0));
  5006. getCassString(fieldValue, cass_row_get_column(row, 2));
  5007. deleteSecondaryByKey(xpath, fieldValue, wuid, this, batch);
  5008. }
  5009. else
  5010. deleteChildByWuid(mappings, wuid, batch);
  5011. }
  5012. errCount++;
  5013. }
  5014. }
  5015. return errCount;
  5016. }
  5017. IPTree *cassandraToWorkunitXML(const char *wuid) const
  5018. {
  5019. CassandraResult result(fetchDataForWuid(workunitsMappings, wuid));
  5020. CassandraIterator rows(cass_iterator_from_result(result));
  5021. if (cass_iterator_next(rows)) // should just be one
  5022. {
  5023. Owned<IPTree> wuXML = createPTree(wuid);
  5024. wuXML->setProp("@xmlns:xsi", "http://www.w3.org/1999/XMLSchema-instance");
  5025. CassandraIterator cols(cass_iterator_from_row(cass_iterator_get_row(rows)));
  5026. unsigned colidx = 2; // wuid and partition are not returned
  5027. while (cass_iterator_next(cols))
  5028. {
  5029. assertex(workunitsMappings[colidx].columnName);
  5030. const CassValue *value = cass_iterator_get_column(cols);
  5031. if (value && !cass_value_is_null(value))
  5032. workunitsMappings[colidx].mapper.toXML(wuXML, workunitsMappings[colidx].xpath, value);
  5033. colidx++;
  5034. }
  5035. return wuXML.getClear();
  5036. }
  5037. else
  5038. return NULL;
  5039. }
  5040. // Fetch all rows from a table
  5041. const CassResult *fetchData(const CassandraXmlMapping *mappings) const
  5042. {
  5043. StringBuffer names;
  5044. StringBuffer tableName;
  5045. getFieldNames(mappings, names, tableName);
  5046. VStringBuffer selectQuery("select %s from %s;", names.str()+1, tableName.str());
  5047. selectQuery.append(';');
  5048. if (traceLevel >= 2)
  5049. DBGLOG("%s", selectQuery.str());
  5050. CassandraStatement statement(cass_statement_new(selectQuery.str(), 0));
  5051. return executeQuery(session, statement);
  5052. }
  5053. // Fetch all rows from a single partition of a table
  5054. const CassResult *fetchDataByPartition(const CassandraXmlMapping *mappings, int partition, const CIArrayOf<PostFilter> &wuidFilters, unsigned sortOrder=WUSFwuid|WUSFreverse, unsigned limit=0) const
  5055. {
  5056. StringBuffer names;
  5057. StringBuffer tableName;
  5058. getFieldNames(mappings+1, names, tableName); // Don't fetch partition column
  5059. VStringBuffer selectQuery("select %s from %s where partition=%d", names.str()+1, tableName.str(), partition);
  5060. ForEachItemIn(idx, wuidFilters)
  5061. {
  5062. const PostFilter &wuidFilter = wuidFilters.item(idx);
  5063. selectQuery.appendf(" and wuid %s '%s'", wuidFilter.queryField()==WUSFwuidhigh ? "<=" : ">=", wuidFilter.queryValue());
  5064. }
  5065. switch (sortOrder)
  5066. {
  5067. case WUSFwuid:
  5068. selectQuery.append(" ORDER BY WUID ASC");
  5069. break;
  5070. case WUSFwuid|WUSFreverse:
  5071. // If not wuid, descending, we will have to post-sort
  5072. selectQuery.append(" ORDER BY WUID DESC");
  5073. break;
  5074. default:
  5075. // If not wuid, descending, we will have to post-sort. We still need in wuid desc order for the merge though.
  5076. selectQuery.append(" ORDER BY WUID DESC");
  5077. if (!limit)
  5078. limit = CASS_WORKUNIT_POSTSORT_LIMIT;
  5079. break;
  5080. }
  5081. if (limit)
  5082. selectQuery.appendf(" LIMIT %u", limit);
  5083. selectQuery.append(';');
  5084. if (traceLevel >= 2)
  5085. DBGLOG("%s", selectQuery.str());
  5086. CassandraStatement statement(cass_statement_new(selectQuery.str(), 0));
  5087. return executeQuery(session, statement);
  5088. }
  5089. // Fetch matching rows from a child table, or the main wu table
  5090. const CassResult *fetchDataForWuid(const CassandraXmlMapping *mappings, const char *wuid) const
  5091. {
  5092. assertex(wuid && *wuid);
  5093. StringBuffer names;
  5094. StringBuffer tableName;
  5095. getFieldNames(mappings+2, names, tableName); // mappings+1 means we don't return the partition or wuid columns
  5096. VStringBuffer selectQuery("select %s from %s where partition=%d and wuid='%s';", names.str()+1, tableName.str(), rtlHash32VStr(wuid, 0) % NUM_PARTITIONS, wuid); // MORE - should consider using prepared/bind for this - is it faster?
  5097. if (traceLevel >= 2)
  5098. DBGLOG("%s", selectQuery.str());
  5099. CassandraStatement statement(cass_statement_new(selectQuery.str(), 0));
  5100. return executeQuery(session, statement);
  5101. }
  5102. // Fetch matching rows from the search table, for all wuids, sorted by wuid
  5103. const CassResult *fetchDataForKey(const char *xpath, const char *key) const
  5104. {
  5105. assertex(key);
  5106. StringBuffer names;
  5107. StringBuffer tableName;
  5108. StringBuffer ucKey(key);
  5109. ucKey.toUpperCase();
  5110. getFieldNames(searchMappings+3, names, tableName); // mappings+3 means we don't return the key columns (xpath, upper(keyPrefix), upper(key))
  5111. VStringBuffer selectQuery("select %s from %s where xpath='%s' and fieldPrefix='%.*s' and fieldValue ='%s'", names.str()+1, tableName.str(), xpath, CASS_SEARCH_PREFIX_SIZE, ucKey.str(), ucKey.str()); // MORE - should consider using prepared/bind for this - is it faster?
  5112. selectQuery.append(" ORDER BY fieldValue ASC, WUID desc;");
  5113. if (traceLevel >= 2)
  5114. DBGLOG("%s", selectQuery.str());
  5115. CassandraStatement statement(cass_statement_new(selectQuery.str(), 0));
  5116. return executeQuery(session, statement);
  5117. }
  5118. // Fetch matching rows from the search table, for all wuids, sorted by wuid
  5119. const CassResult *fetchDataForKeyWithFilter(const PostFilter &filter, const CIArrayOf<PostFilter> &wuidFilters, unsigned sortOrder, unsigned limit) const
  5120. {
  5121. const char *xpath = filter.queryXPath();
  5122. const char *key = filter.queryValue();
  5123. assertex(key);
  5124. StringBuffer names;
  5125. StringBuffer tableName;
  5126. StringBuffer ucKey(key);
  5127. ucKey.toUpperCase();
  5128. getFieldNames(searchMappings+3, names, tableName); // mappings+3 means we don't return the key columns (xpath, upper(keyPrefix), upper(key))
  5129. VStringBuffer selectQuery("select %s from %s where xpath='%s' and fieldPrefix='%.*s' and fieldValue ='%s'", names.str()+1, tableName.str(), xpath, CASS_SEARCH_PREFIX_SIZE, ucKey.str(), ucKey.str());
  5130. ForEachItemIn(idx, wuidFilters)
  5131. {
  5132. const PostFilter &wuidFilter = wuidFilters.item(idx);
  5133. selectQuery.appendf(" and wuid %s '%s'", wuidFilter.queryField()==WUSFwuidhigh ? "<=" : ">=", wuidFilter.queryValue());
  5134. }
  5135. switch (sortOrder)
  5136. {
  5137. case WUSFwuid:
  5138. selectQuery.append(" ORDER BY fieldValue DESC, WUID ASC");
  5139. break;
  5140. case WUSFwuid|WUSFreverse:
  5141. selectQuery.append(" ORDER BY fieldValue ASC, WUID DESC");
  5142. break;
  5143. default:
  5144. // If not wuid, descending, we will have to post-sort. We still need in wuid desc order for the merge though.
  5145. selectQuery.appendf(" ORDER BY fieldvalue ASC, WUID DESC");
  5146. limit = CASS_WORKUNIT_POSTSORT_LIMIT;
  5147. break;
  5148. }
  5149. if (limit)
  5150. selectQuery.appendf(" LIMIT %u", limit);
  5151. if (traceLevel >= 2)
  5152. DBGLOG("%s", selectQuery.str());
  5153. CassandraStatement statement(cass_statement_new(selectQuery.str(), 0));
  5154. return executeQuery(session, statement);
  5155. }
  5156. // Fetch matching rows from the search or uniqueSearch table, for a given prefix
  5157. const CassResult *fetchDataForWildSearch(const char *xpath, const char *prefix, const CassandraXmlMapping *mappings) const
  5158. {
  5159. assertex(prefix && *prefix);
  5160. StringBuffer names;
  5161. StringBuffer tableName;
  5162. StringBuffer ucKey(prefix);
  5163. ucKey.toUpperCase();
  5164. StringBuffer ucKeyEnd(ucKey);
  5165. size32_t len = ucKeyEnd.length();
  5166. assertex(len);
  5167. ucKeyEnd.setCharAt(len-1, ucKeyEnd.charAt(len-1)+1);
  5168. getFieldNames(mappings+3, names, tableName); // mappings+3 means we don't return the key columns (xpath, upper(keyPrefix), upper(key))
  5169. VStringBuffer selectQuery("select %s from %s where xpath='%s' and fieldPrefix='%.*s' and fieldValue >='%s' and fieldValue < '%s';", names.str()+1, tableName.str(), xpath, CASS_SEARCH_PREFIX_SIZE, ucKey.str(), ucKey.str(), ucKeyEnd.str()); // MORE - should consider using prepared/bind for this - is it faster?
  5170. if (traceLevel >= 2)
  5171. DBGLOG("%s", selectQuery.str());
  5172. CassandraStatement statement(cass_statement_new(selectQuery.str(), 0));
  5173. return executeQuery(session, statement);
  5174. }
  5175. // Fetch rows from the search table, by thorTime, above a threshold
  5176. const CassResult *fetchDataByThorTime(const char *threshold, bool descending, unsigned limit) const
  5177. {
  5178. StringBuffer names;
  5179. StringBuffer tableName;
  5180. getFieldNames(searchMappings+3, names, tableName); // mappings+3 means we don't return the key columns (xpath, upper(keyPrefix), upper(key))
  5181. VStringBuffer selectQuery("select %s from %s where xpath='@totalThorTime' and fieldPrefix='%*s'", names.str()+1, tableName.str(), CASS_SEARCH_PREFIX_SIZE, "");
  5182. if (threshold && *threshold)
  5183. selectQuery.appendf(" where fieldValue >= '%s'", threshold);
  5184. if (descending)
  5185. selectQuery.append(" ORDER BY fieldValue DESC, wuid ASC");
  5186. else
  5187. selectQuery.append(" ORDER BY fieldValue ASC, wuid DESC");
  5188. if (limit)
  5189. selectQuery.appendf(" LIMIT %u", limit);
  5190. selectQuery.append(';');
  5191. if (traceLevel >= 2)
  5192. DBGLOG("%s", selectQuery.str());
  5193. CassandraStatement statement(cass_statement_new(selectQuery.str(), 0));
  5194. return executeQuery(session, statement);
  5195. }
  5196. // Fetch rows from the search table, continuing a previous query that was sorted by thor time - part one
  5197. // This technique only works for thor time where we have forced to a single partition. Otherwise it gets even more complicated, and not worth it.
  5198. const CassResult *fetchMoreDataByThorTime(const char *threshold, const char *wuid, bool descending, unsigned limit) const
  5199. {
  5200. StringBuffer names;
  5201. StringBuffer tableName;
  5202. getFieldNames(searchMappings+3, names, tableName); // mappings+3 means we don't return the key columns (xpath, upper(keyPrefix), upper(key))
  5203. const char *wuidTest;
  5204. const char *fieldTest;
  5205. if (descending)
  5206. {
  5207. wuidTest = ">";
  5208. fieldTest = wuid ? "=" : "<";
  5209. }
  5210. else
  5211. {
  5212. wuidTest = "<";
  5213. fieldTest = wuid ? "=" : ">";
  5214. }
  5215. VStringBuffer selectQuery("select %s from %s where xpath='@totalThorTime' and fieldPrefix='%*s' and fieldValue %s '%s'", names.str()+1, tableName.str(), CASS_SEARCH_PREFIX_SIZE, "", fieldTest, threshold);
  5216. if (wuid)
  5217. selectQuery.appendf(" and wuid %s '%s'", wuidTest, wuid);
  5218. if (descending)
  5219. selectQuery.append(" ORDER BY fieldValue DESC, WUID ASC");
  5220. else
  5221. selectQuery.append(" ORDER BY fieldValue ASC, WUID DESC");
  5222. if (limit)
  5223. selectQuery.appendf(" LIMIT %u", limit);
  5224. selectQuery.append(';');
  5225. if (traceLevel >= 2)
  5226. DBGLOG("%s", selectQuery.str());
  5227. CassandraStatement statement(cass_statement_new(selectQuery.str(), 0));
  5228. return executeQuery(session, statement);
  5229. }
  5230. // Fetch matching rows from the search table, for a single wuid
  5231. const CassResult *fetchDataForKeyAndWuid(const char *xpath, const char *key, const char *wuid) const
  5232. {
  5233. assertex(key);
  5234. StringBuffer names;
  5235. StringBuffer tableName;
  5236. StringBuffer ucKey(key);
  5237. ucKey.toUpperCase();
  5238. getFieldNames(searchMappings+4, names, tableName); // mappings+4 means we don't return the key columns (xpath, upper(keyPrefix), upper(key), and wuid)
  5239. VStringBuffer selectQuery("select %s from %s where xpath='%s' and fieldPrefix='%.*s' and fieldValue ='%s' and wuid='%s';", names.str()+1, tableName.str(), xpath, CASS_SEARCH_PREFIX_SIZE, ucKey.str(), ucKey.str(), wuid); // MORE - should consider using prepared/bind for this - is it faster?
  5240. if (traceLevel >= 2)
  5241. DBGLOG("%s", selectQuery.str());
  5242. CassandraStatement statement(cass_statement_new(selectQuery.str(), 0));
  5243. return executeQuery(session, statement);
  5244. }
  5245. // Delete matching rows from a child table
  5246. virtual void deleteChildByWuid(const CassandraXmlMapping *mappings, const char *wuid, CassBatch *batch) const
  5247. {
  5248. StringBuffer names;
  5249. StringBuffer tableName;
  5250. getFieldNames(mappings, names, tableName);
  5251. VStringBuffer insertQuery("DELETE from %s where partition=? and wuid=?;", tableName.str());
  5252. Owned<CassandraPrepared> prepared = prepareStatement(insertQuery);
  5253. CassandraStatement update(cass_prepared_bind(*prepared));
  5254. check(cass_statement_bind_int32(update, 0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS));
  5255. check(cass_statement_bind_string(update, 1, wuid));
  5256. check(cass_batch_add_statement(batch, update));
  5257. }
  5258. unsigned retireCache()
  5259. {
  5260. CriticalBlock b(cacheCrit); // Is this too coarse-grained?
  5261. unsigned expires = CASS_WU_QUERY_EXPIRES;
  5262. unsigned now = msTick();
  5263. ICopyArrayOf<CCassandraWuUQueryCacheEntry> goers;
  5264. HashIterator iter(cacheIdMap);
  5265. ForEach(iter)
  5266. {
  5267. CCassandraWuUQueryCacheEntry *entry = cacheIdMap.mapToValue(&iter.query());
  5268. unsigned age = now - entry->queryLastAccess();
  5269. int ttl = CASS_WU_QUERY_EXPIRES-age;
  5270. if (ttl<= 0)
  5271. goers.append(*entry);
  5272. else if (ttl< expires)
  5273. expires = ttl;
  5274. }
  5275. ForEachItemIn(idx, goers)
  5276. {
  5277. DBGLOG("Expiring cache entry %p", &goers.item(idx));
  5278. cacheIdMap.remove(goers.item(idx).queryHint());
  5279. }
  5280. return expires;
  5281. }
  5282. class CacheRetirer : public Thread
  5283. {
  5284. public:
  5285. CacheRetirer(CCasssandraWorkUnitFactory &_parent) : Thread("WorkunitListCacheRetirer"), parent(_parent)
  5286. {
  5287. stopping = false;
  5288. }
  5289. virtual int run()
  5290. {
  5291. while (!stopping)
  5292. {
  5293. unsigned delay = parent.retireCache();
  5294. sem.wait(delay);
  5295. }
  5296. return 0;
  5297. }
  5298. void stop()
  5299. {
  5300. stopping = true;
  5301. sem.signal();
  5302. }
  5303. private:
  5304. Semaphore sem;
  5305. CCasssandraWorkUnitFactory &parent;
  5306. bool stopping;
  5307. } cacheRetirer;
  // NOTE(review): randomizeSuffix and randState are not referenced in this section of the file - see their uses elsewhere
  5308. unsigned randomizeSuffix;
  // The fetch* helpers log each generated query with DBGLOG when traceLevel >= 2
  5309. unsigned traceLevel;
  5310. unsigned randState;
  // Cluster configuration; connect() passes it, together with cluster.keyspace, to cass_session_connect_keyspace
  5311. CassandraCluster cluster;
  // Session created by connect(); the queries in this section execute against it
  5312. CassandraSession session;
  5313. mutable CriticalSection cacheCrit; // protects both of the caches below... we could separate
  // Prepared statements keyed by query string (setValue links the stored value)
  5314. mutable MapStringToMyClass<CassandraPrepared> preparedCache;
  // Workunit query cache entries keyed by a 64-bit hint; stale entries are removed by retireCache()
  5315. mutable MapXToMyClass<__uint64, __uint64, CCassandraWuUQueryCacheEntry> cacheIdMap;
  5316. };
  5317. } // namespace
  5318. extern "C" EXPORT IWorkUnitFactory *createWorkUnitFactory(const IPropertyTree *props)
  5319. {
  5320. return new cassandraembed::CCasssandraWorkUnitFactory(props);
  5321. }