cassandraembed.cpp 144 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2013 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "cassandra.h"
  15. #include "jexcept.hpp"
  16. #include "jthread.hpp"
  17. #include "hqlplugins.hpp"
  18. #include "deftype.hpp"
  19. #include "eclhelper.hpp"
  20. #include "eclrtl.hpp"
  21. #include "eclrtl_imp.hpp"
  22. #include "rtlds_imp.hpp"
  23. #include "rtlfield_imp.hpp"
  24. #include "rtlembed.hpp"
  25. #include "roxiemem.hpp"
  26. #include "nbcd.hpp"
  27. #include "jptree.hpp"
  28. #include "workunit.hpp"
  29. #include "workunit.ipp"
  30. #ifdef _WIN32
  31. #define EXPORT __declspec(dllexport)
  32. #else
  33. #define EXPORT
  34. #endif
  35. static void UNSUPPORTED(const char *feature) __attribute__((noreturn));
  36. static void UNSUPPORTED(const char *feature)
  37. {
  38. throw MakeStringException(-1, "UNSUPPORTED feature: %s not supported in Cassandra plugin", feature);
  39. }
  // Version string reported through the plugin definition block.
  static const char *version = "Cassandra Embed Helper 1.0.0";

  // NULL-terminated list of version strings handed to extended plugin definition blocks.
  static const char * compatibleVersions[] = {
      "Cassandra Embed Helper 1.0.0",
      NULL
  };
  44. extern "C" EXPORT bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb)
  45. {
  46. if (pb->size == sizeof(ECLPluginDefinitionBlockEx))
  47. {
  48. ECLPluginDefinitionBlockEx * pbx = (ECLPluginDefinitionBlockEx *) pb;
  49. pbx->compatibleVersions = compatibleVersions;
  50. }
  51. else if (pb->size != sizeof(ECLPluginDefinitionBlock))
  52. return false;
  53. pb->magicVersion = PLUGIN_VERSION;
  54. pb->version = version;
  55. pb->moduleName = "cassandra";
  56. pb->ECL = NULL;
  57. pb->flags = PLUGIN_MULTIPLE_VERSIONS;
  58. pb->description = "Cassandra Embed Helper";
  59. return true;
  60. }
  61. namespace cassandraembed {
  62. static void failx(const char *msg, ...) __attribute__((noreturn)) __attribute__((format(printf, 1, 2)));
  63. static void fail(const char *msg) __attribute__((noreturn));
  64. static void failx(const char *message, ...)
  65. {
  66. va_list args;
  67. va_start(args,message);
  68. StringBuffer msg;
  69. msg.append("cassandra: ").valist_appendf(message,args);
  70. va_end(args);
  71. rtlFail(0, msg.str());
  72. }
  73. static void fail(const char *message)
  74. {
  75. StringBuffer msg;
  76. msg.append("cassandra: ").append(message);
  77. rtlFail(0, msg.str());
  78. }
  79. // Wrappers to Cassandra structures that require corresponding releases
  80. class CassandraCluster : public CInterface
  81. {
  82. public:
  83. CassandraCluster(CassCluster *_cluster) : cluster(_cluster), batchMode((CassBatchType) -1)
  84. {
  85. }
  86. void setOptions(const StringArray &options)
  87. {
  88. const char *contact_points = "localhost";
  89. const char *user = "";
  90. const char *password = "";
  91. ForEachItemIn(idx, options)
  92. {
  93. const char *opt = options.item(idx);
  94. const char *val = strchr(opt, '=');
  95. if (val)
  96. {
  97. StringBuffer optName(val-opt, opt);
  98. val++;
  99. if (stricmp(optName, "contact_points")==0 || stricmp(optName, "server")==0)
  100. contact_points = val; // Note that lifetime of val is adequate for this to be safe
  101. else if (stricmp(optName, "user")==0)
  102. user = val;
  103. else if (stricmp(optName, "password")==0)
  104. password = val;
  105. else if (stricmp(optName, "keyspace")==0)
  106. keyspace.set(val);
  107. else if (stricmp(optName, "batch")==0)
  108. {
  109. if (stricmp(val, "LOGGED")==0)
  110. batchMode = CASS_BATCH_TYPE_LOGGED;
  111. else if (stricmp(val, "UNLOGGED")==0)
  112. batchMode = CASS_BATCH_TYPE_UNLOGGED;
  113. else if (stricmp(val, "COUNTER")==0)
  114. batchMode = CASS_BATCH_TYPE_COUNTER;
  115. }
  116. else if (stricmp(optName, "port")==0)
  117. {
  118. unsigned port = getUnsignedOption(val, "port");
  119. checkSetOption(cass_cluster_set_port(cluster, port), "port");
  120. }
  121. else if (stricmp(optName, "protocol_version")==0)
  122. {
  123. unsigned protocol_version = getUnsignedOption(val, "protocol_version");
  124. checkSetOption(cass_cluster_set_protocol_version(cluster, protocol_version), "protocol_version");
  125. }
  126. else if (stricmp(optName, "num_threads_io")==0)
  127. {
  128. unsigned num_threads_io = getUnsignedOption(val, "num_threads_io");
  129. cass_cluster_set_num_threads_io(cluster, num_threads_io); // No status return
  130. }
  131. else if (stricmp(optName, "queue_size_io")==0)
  132. {
  133. unsigned queue_size_io = getUnsignedOption(val, "queue_size_io");
  134. checkSetOption(cass_cluster_set_queue_size_io(cluster, queue_size_io), "queue_size_io");
  135. }
  136. else if (stricmp(optName, "core_connections_per_host")==0)
  137. {
  138. unsigned core_connections_per_host = getUnsignedOption(val, "core_connections_per_host");
  139. checkSetOption(cass_cluster_set_core_connections_per_host(cluster, core_connections_per_host), "core_connections_per_host");
  140. }
  141. else if (stricmp(optName, "max_connections_per_host")==0)
  142. {
  143. unsigned max_connections_per_host = getUnsignedOption(val, "max_connections_per_host");
  144. checkSetOption(cass_cluster_set_max_connections_per_host(cluster, max_connections_per_host), "max_connections_per_host");
  145. }
  146. else if (stricmp(optName, "max_concurrent_creation")==0)
  147. {
  148. unsigned max_concurrent_creation = getUnsignedOption(val, "max_concurrent_creation");
  149. checkSetOption(cass_cluster_set_max_concurrent_creation(cluster, max_concurrent_creation), "max_concurrent_creation");
  150. }
  151. else if (stricmp(optName, "pending_requests_high_water_mark")==0)
  152. {
  153. unsigned pending_requests_high_water_mark = getUnsignedOption(val, "pending_requests_high_water_mark");
  154. checkSetOption(cass_cluster_set_pending_requests_high_water_mark(cluster, pending_requests_high_water_mark), "pending_requests_high_water_mark");
  155. }
  156. else if (stricmp(optName, "pending_requests_low_water_mark")==0)
  157. {
  158. unsigned pending_requests_low_water_mark = getUnsignedOption(val, "pending_requests_low_water_mark");
  159. checkSetOption(cass_cluster_set_pending_requests_low_water_mark(cluster, pending_requests_low_water_mark), "pending_requests_low_water_mark");
  160. }
  161. else if (stricmp(optName, "max_concurrent_requests_threshold")==0)
  162. {
  163. unsigned max_concurrent_requests_threshold = getUnsignedOption(val, "max_concurrent_requests_threshold");
  164. checkSetOption(cass_cluster_set_max_concurrent_requests_threshold(cluster, max_concurrent_requests_threshold), "max_concurrent_requests_threshold");
  165. }
  166. else if (stricmp(optName, "connect_timeout")==0)
  167. {
  168. unsigned connect_timeout = getUnsignedOption(val, "connect_timeout");
  169. cass_cluster_set_connect_timeout(cluster, connect_timeout);
  170. }
  171. else if (stricmp(optName, "request_timeout")==0)
  172. {
  173. unsigned request_timeout = getUnsignedOption(val, "request_timeout");
  174. cass_cluster_set_request_timeout(cluster, request_timeout);
  175. }
  176. else
  177. failx("Unrecognized option %s", optName.str());
  178. }
  179. }
  180. cass_cluster_set_contact_points(cluster, contact_points);
  181. if (*user || *password)
  182. cass_cluster_set_credentials(cluster, user, password);
  183. }
  184. ~CassandraCluster()
  185. {
  186. if (cluster)
  187. cass_cluster_free(cluster);
  188. }
  189. inline operator CassCluster *() const
  190. {
  191. return cluster;
  192. }
  193. private:
  194. void checkSetOption(CassError rc, const char *name)
  195. {
  196. if (rc != CASS_OK)
  197. {
  198. failx("While setting option %s: %s", name, cass_error_desc(rc));
  199. }
  200. }
  201. unsigned getUnsignedOption(const char *val, const char *option)
  202. {
  203. char *endp;
  204. long value = strtoul(val, &endp, 0);
  205. if (endp==val || *endp != '\0' || value > INT_MAX || value < INT_MIN)
  206. failx("Invalid value '%s' for option %s", val, option);
  207. return (int) value;
  208. }
  209. CassandraCluster(const CassandraCluster &);
  210. CassCluster *cluster;
  211. public:
  212. // These are here as convenient to set from same options string. They are really properties of the session
  213. // rather than the cluster, but we have one session per cluster so we get away with it.
  214. CassBatchType batchMode;
  215. StringAttr keyspace;
  216. };
  217. class CassandraFuture : public CInterface
  218. {
  219. public:
  220. CassandraFuture(CassFuture *_future) : future(_future)
  221. {
  222. }
  223. ~CassandraFuture()
  224. {
  225. if (future)
  226. cass_future_free(future);
  227. }
  228. inline operator CassFuture *() const
  229. {
  230. return future;
  231. }
  232. void wait(const char *why)
  233. {
  234. cass_future_wait(future);
  235. CassError rc = cass_future_error_code(future);
  236. if(rc != CASS_OK)
  237. {
  238. CassString message = cass_future_error_message(future);
  239. VStringBuffer err("cassandra: failed to %s (%.*s)", why, (int)message.length, message.data);
  240. rtlFail(0, err.str());
  241. }
  242. }
  243. private:
  244. CassandraFuture(const CassandraFuture &);
  245. CassFuture *future;
  246. };
  247. class CassandraSession : public CInterface
  248. {
  249. public:
  250. CassandraSession() : session(NULL) {}
  251. CassandraSession(CassSession *_session) : session(_session)
  252. {
  253. }
  254. ~CassandraSession()
  255. {
  256. set(NULL);
  257. }
  258. void set(CassSession *_session)
  259. {
  260. if (session)
  261. {
  262. CassandraFuture close_future(cass_session_close(session));
  263. cass_future_wait(close_future);
  264. cass_session_free(session);
  265. }
  266. session = _session;
  267. }
  268. inline operator CassSession *() const
  269. {
  270. return session;
  271. }
  272. private:
  273. CassandraSession(const CassandraSession &);
  274. CassSession *session;
  275. };
  276. class CassandraBatch : public CInterface
  277. {
  278. public:
  279. CassandraBatch(CassBatch *_batch) : batch(_batch)
  280. {
  281. }
  282. ~CassandraBatch()
  283. {
  284. if (batch)
  285. cass_batch_free(batch);
  286. }
  287. inline operator CassBatch *() const
  288. {
  289. return batch;
  290. }
  291. private:
  292. CassandraBatch(const CassandraBatch &);
  293. CassBatch *batch;
  294. };
  295. class CassandraStatement : public CInterface
  296. {
  297. public:
  298. CassandraStatement(CassStatement *_statement) : statement(_statement)
  299. {
  300. }
  301. CassandraStatement(const char *simple) : statement(cass_statement_new(cass_string_init(simple), 0))
  302. {
  303. }
  304. ~CassandraStatement()
  305. {
  306. if (statement)
  307. cass_statement_free(statement);
  308. }
  309. inline operator CassStatement *() const
  310. {
  311. return statement;
  312. }
  313. private:
  314. CassandraStatement(const CassandraStatement &);
  315. CassStatement *statement;
  316. };
  317. class CassandraPrepared : public CInterfaceOf<IInterface>
  318. {
  319. public:
  320. CassandraPrepared(const CassPrepared *_prepared) : prepared(_prepared)
  321. {
  322. }
  323. ~CassandraPrepared()
  324. {
  325. if (prepared)
  326. cass_prepared_free(prepared);
  327. }
  328. inline operator const CassPrepared *() const
  329. {
  330. return prepared;
  331. }
  332. private:
  333. CassandraPrepared(const CassandraPrepared &);
  334. const CassPrepared *prepared;
  335. };
  336. class CassandraResult : public CInterface
  337. {
  338. public:
  339. CassandraResult(const CassResult *_result) : result(_result)
  340. {
  341. }
  342. ~CassandraResult()
  343. {
  344. if (result)
  345. cass_result_free(result);
  346. }
  347. inline operator const CassResult *() const
  348. {
  349. return result;
  350. }
  351. private:
  352. CassandraResult(const CassandraResult &);
  353. const CassResult *result;
  354. };
  355. class CassandraIterator : public CInterface
  356. {
  357. public:
  358. CassandraIterator(CassIterator *_iterator) : iterator(_iterator)
  359. {
  360. }
  361. ~CassandraIterator()
  362. {
  363. if (iterator)
  364. cass_iterator_free(iterator);
  365. }
  366. inline operator CassIterator *() const
  367. {
  368. return iterator;
  369. }
  370. private:
  371. CassandraIterator(const CassandraIterator &);
  372. CassIterator *iterator;
  373. };
  374. class CassandraCollection : public CInterface
  375. {
  376. public:
  377. CassandraCollection(CassCollection *_collection) : collection(_collection)
  378. {
  379. }
  380. ~CassandraCollection()
  381. {
  382. if (collection)
  383. cass_collection_free(collection);
  384. }
  385. inline operator CassCollection *() const
  386. {
  387. return collection;
  388. }
  389. private:
  390. CassandraCollection(const CassandraCollection &);
  391. CassCollection *collection;
  392. };
  393. void check(CassError rc)
  394. {
  395. if (rc != CASS_OK)
  396. {
  397. fail(cass_error_desc(rc));
  398. }
  399. }
  400. class CassandraStatementInfo : public CInterface
  401. {
  402. public:
  403. IMPLEMENT_IINTERFACE;
  404. CassandraStatementInfo(CassandraSession *_session, CassandraPrepared *_prepared, unsigned _numBindings, CassBatchType _batchMode)
  405. : session(_session), prepared(_prepared), numBindings(_numBindings), batchMode(_batchMode)
  406. {
  407. assertex(prepared && *prepared);
  408. statement.setown(new CassandraStatement(cass_prepared_bind(*prepared)));
  409. }
  410. ~CassandraStatementInfo()
  411. {
  412. stop();
  413. }
  414. inline void stop()
  415. {
  416. iterator.clear();
  417. result.clear();
  418. prepared.clear();
  419. }
  420. bool next()
  421. {
  422. if (!iterator)
  423. return false;
  424. return cass_iterator_next(*iterator);
  425. }
  426. void startStream()
  427. {
  428. if (batchMode != (CassBatchType) -1)
  429. {
  430. batch.setown(new CassandraBatch(cass_batch_new(batchMode)));
  431. statement.setown(new CassandraStatement(cass_prepared_bind(*prepared)));
  432. }
  433. }
  434. void endStream()
  435. {
  436. if (batch)
  437. {
  438. CassandraFuture future(cass_session_execute_batch(*session, *batch));
  439. future.wait("execute");
  440. result.setown(new CassandraResult(cass_future_get_result(future)));
  441. assertex (rowCount() == 0);
  442. }
  443. }
  444. void execute()
  445. {
  446. assertex(statement && *statement);
  447. if (batch)
  448. {
  449. check(cass_batch_add_statement(*batch, *statement));
  450. statement.setown(new CassandraStatement(cass_prepared_bind(*prepared)));
  451. }
  452. else
  453. {
  454. CassandraFuture future(cass_session_execute(*session, *statement));
  455. future.wait("execute");
  456. result.setown(new CassandraResult(cass_future_get_result(future)));
  457. if (rowCount() > 0)
  458. iterator.setown(new CassandraIterator(cass_iterator_from_result(*result)));
  459. }
  460. }
  461. inline size_t rowCount() const
  462. {
  463. return cass_result_row_count(*result);
  464. }
  465. inline bool hasResult() const
  466. {
  467. return result != NULL;
  468. }
  469. inline const CassRow *queryRow() const
  470. {
  471. assertex(iterator && *iterator);
  472. return cass_iterator_get_row(*iterator);
  473. }
  474. inline CassStatement *queryStatement() const
  475. {
  476. assertex(statement && *statement);
  477. return *statement;
  478. }
  479. protected:
  480. Linked<CassandraSession> session;
  481. Linked<CassandraPrepared> prepared;
  482. Owned<CassandraBatch> batch;
  483. Owned<CassandraStatement> statement;
  484. Owned<CassandraResult> result;
  485. Owned<CassandraIterator> iterator;
  486. unsigned numBindings;
  487. CassBatchType batchMode;
  488. };
  489. // Conversions from Cassandra values to ECL data
  490. static const char *getTypeName(CassValueType type)
  491. {
  492. switch (type)
  493. {
  494. case CASS_VALUE_TYPE_CUSTOM: return "CUSTOM";
  495. case CASS_VALUE_TYPE_ASCII: return "ASCII";
  496. case CASS_VALUE_TYPE_BIGINT: return "BIGINT";
  497. case CASS_VALUE_TYPE_BLOB: return "BLOB";
  498. case CASS_VALUE_TYPE_BOOLEAN: return "BOOLEAN";
  499. case CASS_VALUE_TYPE_COUNTER: return "COUNTER";
  500. case CASS_VALUE_TYPE_DECIMAL: return "DECIMAL";
  501. case CASS_VALUE_TYPE_DOUBLE: return "DOUBLE";
  502. case CASS_VALUE_TYPE_FLOAT: return "FLOAT";
  503. case CASS_VALUE_TYPE_INT: return "INT";
  504. case CASS_VALUE_TYPE_TEXT: return "TEXT";
  505. case CASS_VALUE_TYPE_TIMESTAMP: return "TIMESTAMP";
  506. case CASS_VALUE_TYPE_UUID: return "UUID";
  507. case CASS_VALUE_TYPE_VARCHAR: return "VARCHAR";
  508. case CASS_VALUE_TYPE_VARINT: return "VARINT";
  509. case CASS_VALUE_TYPE_TIMEUUID: return "TIMEUUID";
  510. case CASS_VALUE_TYPE_INET: return "INET";
  511. case CASS_VALUE_TYPE_LIST: return "LIST";
  512. case CASS_VALUE_TYPE_MAP: return "MAP";
  513. case CASS_VALUE_TYPE_SET: return "SET";
  514. default: return "UNKNOWN";
  515. }
  516. }
  517. static void typeError(const char *expected, const CassValue *value, const RtlFieldInfo *field) __attribute__((noreturn));
  518. static void typeError(const char *expected, const CassValue *value, const RtlFieldInfo *field)
  519. {
  520. VStringBuffer msg("cassandra: type mismatch - %s expected", expected);
  521. if (field)
  522. msg.appendf(" for field %s", field->name->str());
  523. if (value)
  524. msg.appendf(", received %s", getTypeName(cass_value_type(value)));
  525. rtlFail(0, msg.str());
  526. }
  527. static bool isInteger(const CassValueType t)
  528. {
  529. switch (t)
  530. {
  531. case CASS_VALUE_TYPE_TIMESTAMP:
  532. case CASS_VALUE_TYPE_INT:
  533. case CASS_VALUE_TYPE_BIGINT:
  534. case CASS_VALUE_TYPE_COUNTER:
  535. case CASS_VALUE_TYPE_VARINT:
  536. return true;
  537. default:
  538. return false;
  539. }
  540. }
  541. static bool isString(CassValueType t)
  542. {
  543. switch (t)
  544. {
  545. case CASS_VALUE_TYPE_VARCHAR:
  546. case CASS_VALUE_TYPE_TEXT:
  547. case CASS_VALUE_TYPE_ASCII:
  548. return true;
  549. default:
  550. return false;
  551. }
  552. }
  553. // when extracting elements of a set, field will point at the SET info- we want to get the typeInfo for the element type
  554. static const RtlTypeInfo *getFieldBaseType(const RtlFieldInfo *field)
  555. {
  556. const RtlTypeInfo *type = field->type;
  557. if ((type->fieldType & RFTMkind) == type_set)
  558. return type->queryChildType();
  559. else
  560. return type;
  561. }
  562. static int getNumFields(const RtlTypeInfo *record)
  563. {
  564. int count = 0;
  565. const RtlFieldInfo * const *fields = record->queryFields();
  566. assertex(fields);
  567. while (*fields++)
  568. count++;
  569. return count;
  570. }
  571. static bool getBooleanResult(const RtlFieldInfo *field, const CassValue *value)
  572. {
  573. if (cass_value_is_null(value))
  574. {
  575. NullFieldProcessor p(field);
  576. return p.boolResult;
  577. }
  578. if (cass_value_type(value) != CASS_VALUE_TYPE_BOOLEAN)
  579. typeError("boolean", value, field);
  580. cass_bool_t output;
  581. check(cass_value_get_bool(value, &output));
  582. return output != cass_false;
  583. }
  584. static void getDataResult(const RtlFieldInfo *field, const CassValue *value, size32_t &chars, void * &result)
  585. {
  586. if (cass_value_is_null(value))
  587. {
  588. NullFieldProcessor p(field);
  589. rtlStrToDataX(chars, result, p.resultChars, p.stringResult);
  590. return;
  591. }
  592. // We COULD require that the field being retrieved is a blob - but Cassandra seems happy to use any field here, and
  593. // it seems like it could be more useful to support anything
  594. // if (cass_value_type(value) != CASS_VALUE_TYPE_BLOB)
  595. // typeError("blob", value, field);
  596. CassBytes bytes;
  597. check(cass_value_get_bytes(value, &bytes));
  598. rtlStrToDataX(chars, result, bytes.size, bytes.data);
  599. }
  600. static __int64 getSignedResult(const RtlFieldInfo *field, const CassValue *value);
  601. static unsigned __int64 getUnsignedResult(const RtlFieldInfo *field, const CassValue *value);
  602. static double getRealResult(const RtlFieldInfo *field, const CassValue *value)
  603. {
  604. if (cass_value_is_null(value))
  605. {
  606. NullFieldProcessor p(field);
  607. return p.doubleResult;
  608. }
  609. else if (isInteger(cass_value_type(value)))
  610. return (double) getSignedResult(field, value);
  611. else switch (cass_value_type(value))
  612. {
  613. case CASS_VALUE_TYPE_FLOAT:
  614. {
  615. cass_float_t output_f;
  616. check(cass_value_get_float(value, &output_f));
  617. return output_f;
  618. }
  619. case CASS_VALUE_TYPE_DOUBLE:
  620. {
  621. cass_double_t output_d;
  622. check(cass_value_get_double(value, &output_d));
  623. return output_d;
  624. }
  625. default:
  626. typeError("double", value, field);
  627. }
  628. }
  629. static __int64 getSignedResult(const RtlFieldInfo *field, const CassValue *value)
  630. {
  631. if (cass_value_is_null(value))
  632. {
  633. NullFieldProcessor p(field);
  634. return p.intResult;
  635. }
  636. switch (cass_value_type(value))
  637. {
  638. case CASS_VALUE_TYPE_INT:
  639. {
  640. cass_int32_t output;
  641. check(cass_value_get_int32(value, &output));
  642. return output;
  643. }
  644. case CASS_VALUE_TYPE_TIMESTAMP:
  645. case CASS_VALUE_TYPE_BIGINT:
  646. case CASS_VALUE_TYPE_COUNTER:
  647. case CASS_VALUE_TYPE_VARINT:
  648. {
  649. cass_int64_t output;
  650. check(cass_value_get_int64(value, &output));
  651. return output;
  652. }
  653. default:
  654. typeError("integer", value, field);
  655. }
  656. }
  657. static unsigned __int64 getUnsignedResult(const RtlFieldInfo *field, const CassValue *value)
  658. {
  659. if (cass_value_is_null(value))
  660. {
  661. NullFieldProcessor p(field);
  662. return p.uintResult;
  663. }
  664. return (__uint64) getSignedResult(field, value);
  665. }
  666. static void getStringResult(const RtlFieldInfo *field, const CassValue *value, size32_t &chars, char * &result)
  667. {
  668. if (cass_value_is_null(value))
  669. {
  670. NullFieldProcessor p(field);
  671. rtlStrToStrX(chars, result, p.resultChars, p.stringResult);
  672. return;
  673. }
  674. switch (cass_value_type(value))
  675. {
  676. case CASS_VALUE_TYPE_ASCII:
  677. {
  678. CassString output;
  679. check(cass_value_get_string(value, &output));
  680. const char *text = output.data;
  681. unsigned long bytes = output.length;
  682. rtlStrToStrX(chars, result, bytes, text);
  683. break;
  684. }
  685. case CASS_VALUE_TYPE_VARCHAR:
  686. case CASS_VALUE_TYPE_TEXT:
  687. {
  688. CassString output;
  689. check(cass_value_get_string(value, &output));
  690. const char *text = output.data;
  691. unsigned long bytes = output.length;
  692. unsigned numchars = rtlUtf8Length(bytes, text);
  693. rtlUtf8ToStrX(chars, result, numchars, text);
  694. break;
  695. }
  696. default:
  697. typeError("string", value, field);
  698. }
  699. }
  700. static void getUTF8Result(const RtlFieldInfo *field, const CassValue *value, size32_t &chars, char * &result)
  701. {
  702. if (cass_value_is_null(value))
  703. {
  704. NullFieldProcessor p(field);
  705. rtlUtf8ToUtf8X(chars, result, p.resultChars, p.stringResult);
  706. return;
  707. }
  708. switch (cass_value_type(value))
  709. {
  710. case CASS_VALUE_TYPE_ASCII:
  711. {
  712. CassString output;
  713. check(cass_value_get_string(value, &output));
  714. const char *text = output.data;
  715. unsigned long bytes = output.length;
  716. rtlStrToUtf8X(chars, result, bytes, text);
  717. break;
  718. }
  719. case CASS_VALUE_TYPE_VARCHAR:
  720. case CASS_VALUE_TYPE_TEXT:
  721. {
  722. CassString output;
  723. check(cass_value_get_string(value, &output));
  724. const char *text = output.data;
  725. unsigned long bytes = output.length;
  726. unsigned numchars = rtlUtf8Length(bytes, text);
  727. rtlUtf8ToUtf8X(chars, result, numchars, text);
  728. break;
  729. }
  730. default:
  731. typeError("string", value, field);
  732. }
  733. }
  734. static void getUnicodeResult(const RtlFieldInfo *field, const CassValue *value, size32_t &chars, UChar * &result)
  735. {
  736. if (cass_value_is_null(value))
  737. {
  738. NullFieldProcessor p(field);
  739. rtlUnicodeToUnicodeX(chars, result, p.resultChars, p.unicodeResult);
  740. return;
  741. }
  742. switch (cass_value_type(value))
  743. {
  744. case CASS_VALUE_TYPE_ASCII:
  745. {
  746. CassString output;
  747. check(cass_value_get_string(value, &output));
  748. const char *text = output.data;
  749. unsigned long bytes = output.length;
  750. rtlStrToUnicodeX(chars, result, bytes, text);
  751. break;
  752. }
  753. case CASS_VALUE_TYPE_VARCHAR:
  754. case CASS_VALUE_TYPE_TEXT:
  755. {
  756. CassString output;
  757. check(cass_value_get_string(value, &output));
  758. const char *text = output.data;
  759. unsigned long bytes = output.length;
  760. unsigned numchars = rtlUtf8Length(bytes, text);
  761. rtlUtf8ToUnicodeX(chars, result, numchars, text);
  762. break;
  763. }
  764. default:
  765. typeError("string", value, field);
  766. }
  767. }
  768. static void getDecimalResult(const RtlFieldInfo *field, const CassValue *value, Decimal &result)
  769. {
  770. // Note - Cassandra has a decimal type, but it's not particularly similar to the ecl one. Map to string for now, as we do in MySQL
  771. if (cass_value_is_null(value))
  772. {
  773. NullFieldProcessor p(field);
  774. result.set(p.decimalResult);
  775. return;
  776. }
  777. size32_t chars;
  778. rtlDataAttr tempStr;
  779. cassandraembed::getStringResult(field, value, chars, tempStr.refstr());
  780. result.setString(chars, tempStr.getstr());
  781. if (field)
  782. {
  783. RtlDecimalTypeInfo *dtype = (RtlDecimalTypeInfo *) field->type;
  784. result.setPrecision(dtype->getDecimalDigits(), dtype->getDecimalPrecision());
  785. }
  786. }
  787. // A CassandraRowBuilder object is used to construct an ECL row from a Cassandra row
  788. class CassandraRowBuilder : public CInterfaceOf<IFieldSource>
  789. {
  790. public:
  791. CassandraRowBuilder(const CassandraStatementInfo *_stmtInfo)
  792. : stmtInfo(_stmtInfo), colIdx(0), numIteratorFields(0), nextIteratedField(0)
  793. {
  794. }
  795. virtual bool getBooleanResult(const RtlFieldInfo *field)
  796. {
  797. return cassandraembed::getBooleanResult(field, nextField(field));
  798. }
  799. virtual void getDataResult(const RtlFieldInfo *field, size32_t &len, void * &result)
  800. {
  801. cassandraembed::getDataResult(field, nextField(field), len, result);
  802. }
  803. virtual double getRealResult(const RtlFieldInfo *field)
  804. {
  805. return cassandraembed::getRealResult(field, nextField(field));
  806. }
  807. virtual __int64 getSignedResult(const RtlFieldInfo *field)
  808. {
  809. return cassandraembed::getSignedResult(field, nextField(field));
  810. }
  811. virtual unsigned __int64 getUnsignedResult(const RtlFieldInfo *field)
  812. {
  813. return cassandraembed::getUnsignedResult(field, nextField(field));
  814. }
  815. virtual void getStringResult(const RtlFieldInfo *field, size32_t &chars, char * &result)
  816. {
  817. cassandraembed::getStringResult(field, nextField(field), chars, result);
  818. }
  819. virtual void getUTF8Result(const RtlFieldInfo *field, size32_t &chars, char * &result)
  820. {
  821. cassandraembed::getUTF8Result(field, nextField(field), chars, result);
  822. }
  823. virtual void getUnicodeResult(const RtlFieldInfo *field, size32_t &chars, UChar * &result)
  824. {
  825. cassandraembed::getUnicodeResult(field, nextField(field), chars, result);
  826. }
  827. virtual void getDecimalResult(const RtlFieldInfo *field, Decimal &value)
  828. {
  829. cassandraembed::getDecimalResult(field, nextField(field), value);
  830. }
  831. virtual void processBeginSet(const RtlFieldInfo * field, bool &isAll)
  832. {
  833. isAll = false;
  834. iterator.setown(new CassandraIterator(cass_iterator_from_collection(nextField(field))));
  835. }
  836. virtual bool processNextSet(const RtlFieldInfo * field)
  837. {
  838. numIteratorFields = 1;
  839. return *iterator && cass_iterator_next(*iterator); // If field was NULL, we'll have a NULL iterator (representing an empty set/list)
  840. // Can't distinguish empty set from NULL field, so assume the former (rather than trying to deliver the default value for the set field)
  841. }
  842. virtual void processBeginDataset(const RtlFieldInfo * field)
  843. {
  844. numIteratorFields = getNumFields(field->type->queryChildType());
  845. switch (numIteratorFields)
  846. {
  847. case 1:
  848. iterator.setown(new CassandraIterator(cass_iterator_from_collection(nextField(field))));
  849. break;
  850. case 2:
  851. iterator.setown(new CassandraIterator(cass_iterator_from_map(nextField(field))));
  852. break;
  853. default:
  854. UNSUPPORTED("Nested datasets with > 2 fields");
  855. }
  856. }
  857. virtual void processBeginRow(const RtlFieldInfo * field)
  858. {
  859. }
  860. virtual bool processNextRow(const RtlFieldInfo * field)
  861. {
  862. nextIteratedField = 0;
  863. return *iterator && cass_iterator_next(*iterator); // If field was NULL, we'll have a NULL iterator (representing an empty set/list/map)
  864. // Can't distinguish empty set from NULL field, so assume the former (rather than trying to deliver the default value for the set field)
  865. }
  866. virtual void processEndSet(const RtlFieldInfo * field)
  867. {
  868. iterator.clear();
  869. numIteratorFields = 0;
  870. }
  871. virtual void processEndDataset(const RtlFieldInfo * field)
  872. {
  873. iterator.clear();
  874. numIteratorFields = 0;
  875. }
  876. virtual void processEndRow(const RtlFieldInfo * field)
  877. {
  878. }
  879. protected:
  880. const CassValue *nextField(const RtlFieldInfo * field)
  881. {
  882. const CassValue *ret;
  883. if (iterator)
  884. {
  885. switch (numIteratorFields)
  886. {
  887. case 1:
  888. ret = cass_iterator_get_value(*iterator);
  889. break;
  890. case 2:
  891. if (nextIteratedField==0)
  892. ret = cass_iterator_get_map_key(*iterator);
  893. else
  894. ret = cass_iterator_get_map_value(*iterator);
  895. nextIteratedField++;
  896. break;
  897. default:
  898. throwUnexpected();
  899. }
  900. }
  901. else
  902. ret = cass_row_get_column(stmtInfo->queryRow(), colIdx++);
  903. if (!ret)
  904. failx("Too many fields in ECL output row, reading field %s", field->name->getAtomNamePtr());
  905. return ret;
  906. }
  907. const CassandraStatementInfo *stmtInfo;
  908. Owned<CassandraIterator> iterator;
  909. int colIdx;
  910. int numIteratorFields;
  911. int nextIteratedField;
  912. };
  913. // Bind Cassandra columns from an ECL record
  914. class CassandraRecordBinder : public CInterfaceOf<IFieldProcessor>
  915. {
  916. public:
  917. CassandraRecordBinder(const IContextLogger &_logctx, const RtlTypeInfo *_typeInfo, const CassandraStatementInfo *_stmtInfo, int _firstParam)
  918. : logctx(_logctx), typeInfo(_typeInfo), stmtInfo(_stmtInfo), firstParam(_firstParam), dummyField("<row>", NULL, typeInfo), thisParam(_firstParam)
  919. {
  920. }
  921. int numFields()
  922. {
  923. int count = 0;
  924. const RtlFieldInfo * const *fields = typeInfo->queryFields();
  925. assertex(fields);
  926. while (*fields++)
  927. count++;
  928. return count;
  929. }
  930. void processRow(const byte *row)
  931. {
  932. thisParam = firstParam;
  933. typeInfo->process(row, row, &dummyField, *this); // Bind the variables for the current row
  934. }
  935. virtual void processString(unsigned len, const char *value, const RtlFieldInfo * field)
  936. {
  937. size32_t utf8chars;
  938. rtlDataAttr utfText;
  939. rtlStrToUtf8X(utf8chars, utfText.refstr(), len, value);
  940. if (collection)
  941. checkBind(cass_collection_append_string(*collection,
  942. cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()))),
  943. field);
  944. else
  945. checkBind(cass_statement_bind_string(stmtInfo->queryStatement(),
  946. checkNextParam(field),
  947. cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()))),
  948. field);
  949. }
  950. virtual void processBool(bool value, const RtlFieldInfo * field)
  951. {
  952. if (collection)
  953. checkBind(cass_collection_append_bool(*collection, value ? cass_true : cass_false), field);
  954. else
  955. checkBind(cass_statement_bind_bool(stmtInfo->queryStatement(), checkNextParam(field), value ? cass_true : cass_false), field);
  956. }
  957. virtual void processData(unsigned len, const void *value, const RtlFieldInfo * field)
  958. {
  959. if (collection)
  960. checkBind(cass_collection_append_bytes(*collection, cass_bytes_init((const cass_byte_t*) value, len)), field);
  961. else
  962. checkBind(cass_statement_bind_bytes(stmtInfo->queryStatement(), checkNextParam(field), cass_bytes_init((const cass_byte_t*) value, len)), field);
  963. }
  964. virtual void processInt(__int64 value, const RtlFieldInfo * field)
  965. {
  966. if (getFieldBaseType(field)->size(NULL,NULL)>4)
  967. {
  968. if (collection)
  969. checkBind(cass_collection_append_int64(*collection, value), field);
  970. else
  971. checkBind(cass_statement_bind_int64(stmtInfo->queryStatement(), checkNextParam(field), value), field);
  972. }
  973. else
  974. {
  975. if (collection)
  976. checkBind(cass_collection_append_int32(*collection, value), field);
  977. else
  978. checkBind(cass_statement_bind_int32(stmtInfo->queryStatement(), checkNextParam(field), value), field);
  979. }
  980. }
  981. virtual void processUInt(unsigned __int64 value, const RtlFieldInfo * field)
  982. {
  983. UNSUPPORTED("UNSIGNED columns");
  984. }
  985. virtual void processReal(double value, const RtlFieldInfo * field)
  986. {
  987. if (getFieldBaseType(field)->size(NULL,NULL)>4)
  988. {
  989. if (collection)
  990. checkBind(cass_collection_append_double(*collection, value), field);
  991. else
  992. checkBind(cass_statement_bind_double(stmtInfo->queryStatement(), checkNextParam(field), value), field);
  993. }
  994. else
  995. {
  996. if (collection)
  997. checkBind(cass_collection_append_float(*collection, (float) value), field);
  998. else
  999. checkBind(cass_statement_bind_float(stmtInfo->queryStatement(), checkNextParam(field), (float) value), field);
  1000. }
  1001. }
  1002. virtual void processDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo * field)
  1003. {
  1004. Decimal val;
  1005. size32_t bytes;
  1006. rtlDataAttr decText;
  1007. val.setDecimal(digits, precision, value);
  1008. val.getStringX(bytes, decText.refstr());
  1009. processUtf8(bytes, decText.getstr(), field);
  1010. }
  1011. virtual void processUDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo * field)
  1012. {
  1013. UNSUPPORTED("UNSIGNED decimals");
  1014. }
  1015. virtual void processUnicode(unsigned chars, const UChar *value, const RtlFieldInfo * field)
  1016. {
  1017. size32_t utf8chars;
  1018. rtlDataAttr utfText;
  1019. rtlUnicodeToUtf8X(utf8chars, utfText.refstr(), chars, value);
  1020. if (collection)
  1021. checkBind(cass_collection_append_string(*collection,
  1022. cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()))),
  1023. field);
  1024. else
  1025. checkBind(cass_statement_bind_string(stmtInfo->queryStatement(),
  1026. checkNextParam(field),
  1027. cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()))),
  1028. field);
  1029. }
  1030. virtual void processQString(unsigned len, const char *value, const RtlFieldInfo * field)
  1031. {
  1032. size32_t charCount;
  1033. rtlDataAttr text;
  1034. rtlQStrToStrX(charCount, text.refstr(), len, value);
  1035. processUtf8(charCount, text.getstr(), field);
  1036. }
  1037. virtual void processUtf8(unsigned chars, const char *value, const RtlFieldInfo * field)
  1038. {
  1039. if (collection)
  1040. checkBind(cass_collection_append_string(*collection, cass_string_init2(value, rtlUtf8Size(chars, value))), field);
  1041. else
  1042. checkBind(cass_statement_bind_string(stmtInfo->queryStatement(), checkNextParam(field), cass_string_init2(value, rtlUtf8Size(chars, value))), field);
  1043. }
  1044. virtual bool processBeginSet(const RtlFieldInfo * field, unsigned numElements, bool isAll, const byte *data)
  1045. {
  1046. if (isAll)
  1047. UNSUPPORTED("SET(ALL)");
  1048. collection.setown(new CassandraCollection(cass_collection_new(CASS_COLLECTION_TYPE_SET, numElements)));
  1049. return true;
  1050. }
  1051. virtual bool processBeginDataset(const RtlFieldInfo * field, unsigned numRows)
  1052. {
  1053. // If there's a single field, assume we are mapping to a SET/LIST
  1054. // If there are two, assume it's a MAP
  1055. // Otherwise, fail
  1056. int numFields = getNumFields(field->type->queryChildType());
  1057. if (numFields < 1 || numFields > 2)
  1058. {
  1059. UNSUPPORTED("Nested datasets with > 2 fields");
  1060. }
  1061. collection.setown(new CassandraCollection(cass_collection_new(numFields==1 ? CASS_COLLECTION_TYPE_SET : CASS_COLLECTION_TYPE_MAP, numRows)));
  1062. return true;
  1063. }
  1064. virtual bool processBeginRow(const RtlFieldInfo * field)
  1065. {
  1066. return true;
  1067. }
  1068. virtual void processEndSet(const RtlFieldInfo * field)
  1069. {
  1070. checkBind(cass_statement_bind_collection(stmtInfo->queryStatement(), checkNextParam(field), *collection), field);
  1071. collection.clear();
  1072. }
  1073. virtual void processEndDataset(const RtlFieldInfo * field)
  1074. {
  1075. checkBind(cass_statement_bind_collection(stmtInfo->queryStatement(), checkNextParam(field), *collection), field);
  1076. collection.clear();
  1077. }
  1078. virtual void processEndRow(const RtlFieldInfo * field)
  1079. {
  1080. }
  1081. protected:
  1082. inline unsigned checkNextParam(const RtlFieldInfo * field)
  1083. {
  1084. if (logctx.queryTraceLevel() > 4)
  1085. logctx.CTXLOG("Binding %s to %d", field->name->str(), thisParam);
  1086. return thisParam++;
  1087. }
  1088. inline void checkBind(CassError rc, const RtlFieldInfo * field)
  1089. {
  1090. if (rc != CASS_OK)
  1091. {
  1092. failx("While binding parameter %s: %s", field->name->getAtomNamePtr(), cass_error_desc(rc));
  1093. }
  1094. }
  1095. const RtlTypeInfo *typeInfo;
  1096. const CassandraStatementInfo *stmtInfo;
  1097. Owned<CassandraCollection> collection;
  1098. const IContextLogger &logctx;
  1099. int firstParam;
  1100. RtlFieldStrInfo dummyField;
  1101. int thisParam;
  1102. };
  1103. //
  1104. class CassandraDatasetBinder : public CassandraRecordBinder
  1105. {
  1106. public:
  1107. CassandraDatasetBinder(const IContextLogger &_logctx, IRowStream * _input, const RtlTypeInfo *_typeInfo, const CassandraStatementInfo *_stmt, int _firstParam)
  1108. : input(_input), CassandraRecordBinder(_logctx, _typeInfo, _stmt, _firstParam)
  1109. {
  1110. }
  1111. bool bindNext()
  1112. {
  1113. roxiemem::OwnedConstRoxieRow nextRow = (const byte *) input->ungroupedNextRow();
  1114. if (!nextRow)
  1115. return false;
  1116. processRow((const byte *) nextRow.get()); // Bind the variables for the current row
  1117. return true;
  1118. }
  1119. void executeAll(CassandraStatementInfo *stmtInfo)
  1120. {
  1121. stmtInfo->startStream();
  1122. while (bindNext())
  1123. {
  1124. stmtInfo->execute();
  1125. }
  1126. stmtInfo->endStream();
  1127. }
  1128. protected:
  1129. Owned<IRowStream> input;
  1130. };
  1131. // A Cassandra function that returns a dataset will return a CassandraRowStream object that can be
  1132. // interrogated to return each row of the result in turn
  1133. class CassandraRowStream : public CInterfaceOf<IRowStream>
  1134. {
  1135. public:
  1136. CassandraRowStream(CassandraDatasetBinder *_inputStream, CassandraStatementInfo *_stmtInfo, IEngineRowAllocator *_resultAllocator)
  1137. : inputStream(_inputStream), stmtInfo(_stmtInfo), resultAllocator(_resultAllocator)
  1138. {
  1139. executePending = true;
  1140. eof = false;
  1141. }
  1142. virtual const void *nextRow()
  1143. {
  1144. // A little complex when streaming data in as well as out - want to execute for every input record
  1145. if (eof)
  1146. return NULL;
  1147. loop
  1148. {
  1149. if (executePending)
  1150. {
  1151. executePending = false;
  1152. if (inputStream && !inputStream->bindNext())
  1153. {
  1154. noteEOF();
  1155. return NULL;
  1156. }
  1157. stmtInfo->execute();
  1158. }
  1159. if (stmtInfo->next())
  1160. break;
  1161. if (inputStream)
  1162. executePending = true;
  1163. else
  1164. {
  1165. noteEOF();
  1166. return NULL;
  1167. }
  1168. }
  1169. RtlDynamicRowBuilder rowBuilder(resultAllocator);
  1170. CassandraRowBuilder cassandraRowBuilder(stmtInfo);
  1171. const RtlTypeInfo *typeInfo = resultAllocator->queryOutputMeta()->queryTypeInfo();
  1172. assertex(typeInfo);
  1173. RtlFieldStrInfo dummyField("<row>", NULL, typeInfo);
  1174. size32_t len = typeInfo->build(rowBuilder, 0, &dummyField, cassandraRowBuilder);
  1175. return rowBuilder.finalizeRowClear(len);
  1176. }
  1177. virtual void stop()
  1178. {
  1179. resultAllocator.clear();
  1180. stmtInfo->stop();
  1181. }
  1182. protected:
  1183. void noteEOF()
  1184. {
  1185. if (!eof)
  1186. {
  1187. eof = true;
  1188. stop();
  1189. }
  1190. }
  1191. Linked<CassandraDatasetBinder> inputStream;
  1192. Linked<CassandraStatementInfo> stmtInfo;
  1193. Linked<IEngineRowAllocator> resultAllocator;
  1194. bool executePending;
  1195. bool eof;
  1196. };
  1197. // Each call to a Cassandra function will use a new CassandraEmbedFunctionContext object
  1198. class CassandraEmbedFunctionContext : public CInterfaceOf<IEmbedFunctionContext>
  1199. {
  1200. public:
  1201. CassandraEmbedFunctionContext(const IContextLogger &_logctx, unsigned _flags, const char *options)
  1202. : logctx(_logctx), flags(_flags), nextParam(0), numParams(0)
  1203. {
  1204. StringArray opts;
  1205. opts.appendList(options, ",");
  1206. cluster.setown(new CassandraCluster(cass_cluster_new()));
  1207. cluster->setOptions(opts);
  1208. session.setown(new CassandraSession(cass_session_new()));
  1209. CassandraFuture future(cluster->keyspace.isEmpty() ? cass_session_connect(*session, *cluster) : cass_session_connect_keyspace(*session, *cluster, cluster->keyspace));
  1210. future.wait("connect");
  1211. }
  1212. virtual bool getBooleanResult()
  1213. {
  1214. bool ret = cassandraembed::getBooleanResult(NULL, getScalarResult());
  1215. checkSingleRow();
  1216. return ret;
  1217. }
  1218. virtual void getDataResult(size32_t &len, void * &result)
  1219. {
  1220. cassandraembed::getDataResult(NULL, getScalarResult(), len, result);
  1221. checkSingleRow();
  1222. }
  1223. virtual double getRealResult()
  1224. {
  1225. double ret = cassandraembed::getRealResult(NULL, getScalarResult());
  1226. checkSingleRow();
  1227. return ret;
  1228. }
  1229. virtual __int64 getSignedResult()
  1230. {
  1231. __int64 ret = cassandraembed::getSignedResult(NULL, getScalarResult());
  1232. checkSingleRow();
  1233. return ret;
  1234. }
  1235. virtual unsigned __int64 getUnsignedResult()
  1236. {
  1237. unsigned __int64 ret = cassandraembed::getUnsignedResult(NULL, getScalarResult());
  1238. checkSingleRow();
  1239. return ret;
  1240. }
  1241. virtual void getStringResult(size32_t &chars, char * &result)
  1242. {
  1243. cassandraembed::getStringResult(NULL, getScalarResult(), chars, result);
  1244. checkSingleRow();
  1245. }
  1246. virtual void getUTF8Result(size32_t &chars, char * &result)
  1247. {
  1248. cassandraembed::getUTF8Result(NULL, getScalarResult(), chars, result);
  1249. checkSingleRow();
  1250. }
  1251. virtual void getUnicodeResult(size32_t &chars, UChar * &result)
  1252. {
  1253. cassandraembed::getUnicodeResult(NULL, getScalarResult(), chars, result);
  1254. checkSingleRow();
  1255. }
  1256. virtual void getDecimalResult(Decimal &value)
  1257. {
  1258. cassandraembed::getDecimalResult(NULL, getScalarResult(), value);
  1259. checkSingleRow();
  1260. }
  1261. virtual void getSetResult(bool & __isAllResult, size32_t & __resultBytes, void * & __result, int elemType, size32_t elemSize)
  1262. {
  1263. CassandraIterator iterator(cass_iterator_from_collection(getScalarResult()));
  1264. rtlRowBuilder out;
  1265. byte *outData = NULL;
  1266. size32_t outBytes = 0;
  1267. while (cass_iterator_next(iterator))
  1268. {
  1269. const CassValue *value = cass_iterator_get_value(iterator);
  1270. assertex(value);
  1271. if (elemSize != UNKNOWN_LENGTH)
  1272. {
  1273. out.ensureAvailable(outBytes + elemSize);
  1274. outData = out.getbytes() + outBytes;
  1275. }
  1276. switch ((type_t) elemType)
  1277. {
  1278. case type_int:
  1279. rtlWriteInt(outData, cassandraembed::getSignedResult(NULL, value), elemSize);
  1280. break;
  1281. case type_unsigned:
  1282. rtlWriteInt(outData, cassandraembed::getUnsignedResult(NULL, value), elemSize);
  1283. break;
  1284. case type_real:
  1285. if (elemSize == sizeof(double))
  1286. * (double *) outData = cassandraembed::getRealResult(NULL, value);
  1287. else
  1288. {
  1289. assertex(elemSize == sizeof(float));
  1290. * (float *) outData = (float) cassandraembed::getRealResult(NULL, value);
  1291. }
  1292. break;
  1293. case type_boolean:
  1294. assertex(elemSize == sizeof(bool));
  1295. * (bool *) outData = cassandraembed::getBooleanResult(NULL, value);
  1296. break;
  1297. case type_string:
  1298. case type_varstring:
  1299. {
  1300. rtlDataAttr str;
  1301. size32_t lenBytes;
  1302. cassandraembed::getStringResult(NULL, value, lenBytes, str.refstr());
  1303. if (elemSize == UNKNOWN_LENGTH)
  1304. {
  1305. if (elemType == type_string)
  1306. {
  1307. out.ensureAvailable(outBytes + lenBytes + sizeof(size32_t));
  1308. outData = out.getbytes() + outBytes;
  1309. * (size32_t *) outData = lenBytes;
  1310. rtlStrToStr(lenBytes, outData+sizeof(size32_t), lenBytes, str.getstr());
  1311. outBytes += lenBytes + sizeof(size32_t);
  1312. }
  1313. else
  1314. {
  1315. out.ensureAvailable(outBytes + lenBytes + 1);
  1316. outData = out.getbytes() + outBytes;
  1317. rtlStrToVStr(0, outData, lenBytes, str.getstr());
  1318. outBytes += lenBytes + 1;
  1319. }
  1320. }
  1321. else
  1322. {
  1323. if (elemType == type_string)
  1324. rtlStrToStr(elemSize, outData, lenBytes, str.getstr());
  1325. else
  1326. rtlStrToVStr(elemSize, outData, lenBytes, str.getstr()); // Fixed size null terminated strings... weird.
  1327. }
  1328. break;
  1329. }
  1330. case type_unicode:
  1331. case type_utf8:
  1332. {
  1333. rtlDataAttr str;
  1334. size32_t lenChars;
  1335. cassandraembed::getUTF8Result(NULL, value, lenChars, str.refstr());
  1336. const char * text = str.getstr();
  1337. size32_t lenBytes = rtlUtf8Size(lenChars, text);
  1338. if (elemType == type_utf8)
  1339. {
  1340. assertex (elemSize == UNKNOWN_LENGTH);
  1341. out.ensureAvailable(outBytes + lenBytes + sizeof(size32_t));
  1342. outData = out.getbytes() + outBytes;
  1343. * (size32_t *) outData = lenChars;
  1344. rtlStrToStr(lenBytes, outData+sizeof(size32_t), lenBytes, text);
  1345. outBytes += lenBytes + sizeof(size32_t);
  1346. }
  1347. else
  1348. {
  1349. if (elemSize == UNKNOWN_LENGTH)
  1350. {
  1351. // You can't assume that number of chars in utf8 matches number in unicode16 ...
  1352. size32_t numchars16;
  1353. rtlDataAttr unicode16;
  1354. rtlUtf8ToUnicodeX(numchars16, unicode16.refustr(), lenChars, text);
  1355. out.ensureAvailable(outBytes + numchars16*sizeof(UChar) + sizeof(size32_t));
  1356. outData = out.getbytes() + outBytes;
  1357. * (size32_t *) outData = numchars16;
  1358. rtlUnicodeToUnicode(numchars16, (UChar *) (outData+sizeof(size32_t)), numchars16, unicode16.getustr());
  1359. outBytes += numchars16*sizeof(UChar) + sizeof(size32_t);
  1360. }
  1361. else
  1362. rtlUtf8ToUnicode(elemSize / sizeof(UChar), (UChar *) outData, lenChars, text);
  1363. }
  1364. break;
  1365. }
  1366. default:
  1367. fail("type mismatch - unsupported return type");
  1368. }
  1369. if (elemSize != UNKNOWN_LENGTH)
  1370. outBytes += elemSize;
  1371. }
  1372. __isAllResult = false;
  1373. __resultBytes = outBytes;
  1374. __result = out.detachdata();
  1375. }
  1376. virtual IRowStream *getDatasetResult(IEngineRowAllocator * _resultAllocator)
  1377. {
  1378. return new CassandraRowStream(inputStream, stmtInfo, _resultAllocator);
  1379. }
  1380. virtual byte * getRowResult(IEngineRowAllocator * _resultAllocator)
  1381. {
  1382. if (!stmtInfo->hasResult() || stmtInfo->rowCount() != 1)
  1383. typeError("row", NULL, NULL);
  1384. CassandraRowStream stream(NULL, stmtInfo, _resultAllocator);
  1385. roxiemem::OwnedConstRoxieRow ret = stream.nextRow();
  1386. stream.stop();
  1387. if (ret == NULL) // Check for exactly one returned row
  1388. typeError("row", NULL, NULL);
  1389. return (byte *) ret.getClear();
  1390. }
  1391. virtual size32_t getTransformResult(ARowBuilder & rowBuilder)
  1392. {
  1393. if (!stmtInfo->hasResult() || stmtInfo->rowCount() != 1)
  1394. typeError("row", NULL, NULL);
  1395. if (!stmtInfo->next())
  1396. fail("Failed to read row");
  1397. CassandraRowBuilder cassandraRowBuilder(stmtInfo);
  1398. const RtlTypeInfo *typeInfo = rowBuilder.queryAllocator()->queryOutputMeta()->queryTypeInfo();
  1399. assertex(typeInfo);
  1400. RtlFieldStrInfo dummyField("<row>", NULL, typeInfo);
  1401. return typeInfo->build(rowBuilder, 0, &dummyField, cassandraRowBuilder);
  1402. }
  1403. virtual void bindRowParam(const char *name, IOutputMetaData & metaVal, byte *val)
  1404. {
  1405. CassandraRecordBinder binder(logctx, metaVal.queryTypeInfo(), stmtInfo, nextParam);
  1406. binder.processRow(val);
  1407. nextParam += binder.numFields();
  1408. }
  1409. virtual void bindDatasetParam(const char *name, IOutputMetaData & metaVal, IRowStream * val)
  1410. {
  1411. // We only support a single dataset parameter...
  1412. // MORE - look into batch?
  1413. if (inputStream)
  1414. {
  1415. fail("At most one dataset parameter supported");
  1416. }
  1417. inputStream.setown(new CassandraDatasetBinder(logctx, LINK(val), metaVal.queryTypeInfo(), stmtInfo, nextParam));
  1418. nextParam += inputStream->numFields();
  1419. }
  1420. virtual void bindBooleanParam(const char *name, bool val)
  1421. {
  1422. checkBind(cass_statement_bind_bool(stmtInfo->queryStatement(), checkNextParam(name), val ? cass_true : cass_false), name);
  1423. }
  1424. virtual void bindDataParam(const char *name, size32_t len, const void *val)
  1425. {
  1426. checkBind(cass_statement_bind_bytes(stmtInfo->queryStatement(), checkNextParam(name), cass_bytes_init((const cass_byte_t*) val, len)), name);
  1427. }
  1428. virtual void bindFloatParam(const char *name, float val)
  1429. {
  1430. checkBind(cass_statement_bind_float(stmtInfo->queryStatement(), checkNextParam(name), val), name);
  1431. }
  1432. virtual void bindRealParam(const char *name, double val)
  1433. {
  1434. checkBind(cass_statement_bind_double(stmtInfo->queryStatement(), checkNextParam(name), val), name);
  1435. }
  1436. virtual void bindSignedSizeParam(const char *name, int size, __int64 val)
  1437. {
  1438. if (size > 4)
  1439. checkBind(cass_statement_bind_int64(stmtInfo->queryStatement(), checkNextParam(name), val), name);
  1440. else
  1441. checkBind(cass_statement_bind_int32(stmtInfo->queryStatement(), checkNextParam(name), val), name);
  1442. }
  1443. virtual void bindSignedParam(const char *name, __int64 val)
  1444. {
  1445. bindSignedSizeParam(name, 8, val);
  1446. }
  1447. virtual void bindUnsignedSizeParam(const char *name, int size, unsigned __int64 val)
  1448. {
  1449. UNSUPPORTED("UNSIGNED columns");
  1450. }
  1451. virtual void bindUnsignedParam(const char *name, unsigned __int64 val)
  1452. {
  1453. UNSUPPORTED("UNSIGNED columns");
  1454. }
  1455. virtual void bindStringParam(const char *name, size32_t len, const char *val)
  1456. {
  1457. size32_t utf8chars;
  1458. rtlDataAttr utfText;
  1459. rtlStrToUtf8X(utf8chars, utfText.refstr(), len, val);
  1460. checkBind(cass_statement_bind_string(stmtInfo->queryStatement(),
  1461. checkNextParam(name),
  1462. cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()))),
  1463. name);
  1464. }
  1465. virtual void bindVStringParam(const char *name, const char *val)
  1466. {
  1467. bindStringParam(name, strlen(val), val);
  1468. }
  1469. virtual void bindUTF8Param(const char *name, size32_t chars, const char *val)
  1470. {
  1471. checkBind(cass_statement_bind_string(stmtInfo->queryStatement(), checkNextParam(name), cass_string_init2(val, rtlUtf8Size(chars, val))), name);
  1472. }
  1473. virtual void bindUnicodeParam(const char *name, size32_t chars, const UChar *val)
  1474. {
  1475. size32_t utf8chars;
  1476. rtlDataAttr utfText;
  1477. rtlUnicodeToUtf8X(utf8chars, utfText.refstr(), chars, val);
  1478. checkBind(cass_statement_bind_string(stmtInfo->queryStatement(),
  1479. checkNextParam(name),
  1480. cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()))),
  1481. name);
  1482. }
  1483. virtual void bindSetParam(const char *name, int elemType, size32_t elemSize, bool isAll, size32_t totalBytes, void *setData)
  1484. {
  1485. if (isAll)
  1486. UNSUPPORTED("SET(ALL)");
  1487. type_t typecode = (type_t) elemType;
  1488. const byte *inData = (const byte *) setData;
  1489. const byte *endData = inData + totalBytes;
  1490. int numElems;
  1491. if (elemSize == UNKNOWN_LENGTH)
  1492. {
  1493. numElems = 0;
  1494. // Will need 2 passes to work out how many elements there are in the set :(
  1495. while (inData < endData)
  1496. {
  1497. int thisSize;
  1498. switch (elemType)
  1499. {
  1500. case type_varstring:
  1501. thisSize = strlen((const char *) inData) + 1;
  1502. break;
  1503. case type_string:
  1504. thisSize = * (size32_t *) inData + sizeof(size32_t);
  1505. break;
  1506. case type_unicode:
  1507. thisSize = (* (size32_t *) inData) * sizeof(UChar) + sizeof(size32_t);
  1508. break;
  1509. case type_utf8:
  1510. thisSize = rtlUtf8Size(* (size32_t *) inData, inData + sizeof(size32_t)) + sizeof(size32_t);
  1511. break;
  1512. default:
  1513. fail("Unsupported parameter type");
  1514. break;
  1515. }
  1516. inData += thisSize;
  1517. numElems++;
  1518. }
  1519. inData = (const byte *) setData;
  1520. }
  1521. else
  1522. numElems = totalBytes / elemSize;
  1523. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_SET, numElems));
  1524. while (inData < endData)
  1525. {
  1526. size32_t thisSize = elemSize;
  1527. CassError rc;
  1528. switch (typecode)
  1529. {
  1530. case type_int:
  1531. if (elemSize > 4)
  1532. rc = cass_collection_append_int64(collection, rtlReadInt(inData, elemSize));
  1533. else
  1534. rc = cass_collection_append_int32(collection, rtlReadInt(inData, elemSize));
  1535. break;
  1536. case type_unsigned:
  1537. UNSUPPORTED("UNSIGNED columns");
  1538. break;
  1539. case type_varstring:
  1540. {
  1541. size32_t numChars = strlen((const char *) inData);
  1542. if (elemSize == UNKNOWN_LENGTH)
  1543. thisSize = numChars + 1;
  1544. size32_t utf8chars;
  1545. rtlDataAttr utfText;
  1546. rtlStrToUtf8X(utf8chars, utfText.refstr(), numChars, (const char *) inData);
  1547. rc = cass_collection_append_string(collection, cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())));
  1548. break;
  1549. }
  1550. case type_string:
  1551. {
  1552. if (elemSize == UNKNOWN_LENGTH)
  1553. {
  1554. thisSize = * (size32_t *) inData;
  1555. inData += sizeof(size32_t);
  1556. }
  1557. size32_t utf8chars;
  1558. rtlDataAttr utfText;
  1559. rtlStrToUtf8X(utf8chars, utfText.refstr(), thisSize, (const char *) inData);
  1560. rc = cass_collection_append_string(collection, cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())));
  1561. break;
  1562. }
  1563. case type_real:
  1564. if (elemSize == sizeof(double))
  1565. rc = cass_collection_append_double(collection, * (double *) inData);
  1566. else
  1567. rc = cass_collection_append_float(collection, * (float *) inData);
  1568. break;
  1569. case type_boolean:
  1570. assertex(elemSize == sizeof(bool));
  1571. rc = cass_collection_append_bool(collection, *(bool*)inData ? cass_true : cass_false);
  1572. break;
  1573. case type_unicode:
  1574. {
  1575. if (elemSize == UNKNOWN_LENGTH)
  1576. {
  1577. thisSize = (* (size32_t *) inData) * sizeof(UChar); // NOTE - it's in chars...
  1578. inData += sizeof(size32_t);
  1579. }
  1580. unsigned unicodeChars;
  1581. rtlDataAttr unicode;
  1582. rtlUnicodeToUtf8X(unicodeChars, unicode.refstr(), thisSize / sizeof(UChar), (const UChar *) inData);
  1583. size32_t sizeBytes = rtlUtf8Size(unicodeChars, unicode.getstr());
  1584. rc = cass_collection_append_string(collection, cass_string_init2(unicode.getstr(), sizeBytes));
  1585. break;
  1586. }
  1587. case type_utf8:
  1588. {
  1589. assertex (elemSize == UNKNOWN_LENGTH);
  1590. size32_t numChars = * (size32_t *) inData;
  1591. inData += sizeof(size32_t);
  1592. thisSize = rtlUtf8Size(numChars, inData);
  1593. rc = cass_collection_append_string(collection, cass_string_init2((const char *) inData, thisSize));
  1594. break;
  1595. }
  1596. case type_data:
  1597. if (elemSize == UNKNOWN_LENGTH)
  1598. {
  1599. thisSize = * (size32_t *) inData;
  1600. inData += sizeof(size32_t);
  1601. }
  1602. rc = cass_collection_append_bytes(collection, cass_bytes_init((const cass_byte_t*) inData, thisSize));
  1603. break;
  1604. }
  1605. checkBind(rc, name);
  1606. inData += thisSize;
  1607. }
  1608. checkBind(cass_statement_bind_collection(stmtInfo->queryStatement(),
  1609. checkNextParam(name),
  1610. collection),
  1611. name);
  1612. }
  1613. virtual void importFunction(size32_t lenChars, const char *text)
  1614. {
  1615. throwUnexpected();
  1616. }
  1617. virtual void compileEmbeddedScript(size32_t chars, const char *_script)
  1618. {
  1619. // Incoming script is not necessarily null terminated. Note that the chars refers to utf8 characters and not bytes.
  1620. size32_t len = rtlUtf8Size(chars, _script);
  1621. queryString.set(_script, len);
  1622. const char *script = queryString.get(); // Now null terminated
  1623. if ((flags & (EFnoreturn|EFnoparams)) == (EFnoreturn|EFnoparams))
  1624. {
  1625. loop
  1626. {
  1627. const char *nextScript = findUnquoted(script, ';');
  1628. if (!nextScript)
  1629. {
  1630. // script should be pointing at only trailing whitespace, else it's a "missing ;" error
  1631. break;
  1632. }
  1633. CassandraStatement statement(cass_statement_new(cass_string_init2(script, nextScript-script), 0));
  1634. CassandraFuture future(cass_session_execute(*session, statement));
  1635. future.wait("execute statement");
  1636. script = nextScript;
  1637. }
  1638. }
  1639. else
  1640. {
  1641. // MORE - can cache this, perhaps, if script is same as last time?
  1642. CassandraFuture future(cass_session_prepare(*session, cass_string_init(script)));
  1643. future.wait("prepare statement");
  1644. Owned<CassandraPrepared> prepared = new CassandraPrepared(cass_future_get_prepared(future));
  1645. if ((flags & EFnoparams) == 0)
  1646. numParams = countBindings(script);
  1647. else
  1648. numParams = 0;
  1649. stmtInfo.setown(new CassandraStatementInfo(session, prepared, numParams, cluster->batchMode));
  1650. }
  1651. }
  1652. virtual void callFunction()
  1653. {
  1654. // Does not seem to be a way to check number of parameters expected...
  1655. // if (nextParam != cass_statement_bind_count(stmtInfo))
  1656. // fail("Not enough parameters");
  1657. try
  1658. {
  1659. if (stmtInfo && !stmtInfo->hasResult())
  1660. lazyExecute();
  1661. }
  1662. catch (IException *E)
  1663. {
  1664. StringBuffer msg;
  1665. E->errorMessage(msg);
  1666. msg.appendf(" (processing query %s)", queryString.get());
  1667. throw makeStringException(E->errorCode(), msg);
  1668. }
  1669. }
  1670. protected:
  1671. void lazyExecute()
  1672. {
  1673. if (inputStream)
  1674. inputStream->executeAll(stmtInfo);
  1675. else
  1676. stmtInfo->execute();
  1677. }
  1678. const CassValue *getScalarResult()
  1679. {
  1680. if (!stmtInfo->next())
  1681. typeError("scalar", NULL, NULL);
  1682. if (cass_row_get_column(stmtInfo->queryRow(), 1))
  1683. typeError("scalar", NULL, NULL);
  1684. const CassValue *result = cass_row_get_column(stmtInfo->queryRow(), 0);
  1685. if (!result)
  1686. typeError("scalar", NULL, NULL);
  1687. return result;
  1688. }
  1689. void checkSingleRow()
  1690. {
  1691. if (stmtInfo->rowCount() != 1)
  1692. typeError("scalar", NULL, NULL);
  1693. }
  1694. unsigned countBindings(const char *query)
  1695. {
  1696. unsigned queryCount = 0;
  1697. while ((query = findUnquoted(query, '?')) != NULL)
  1698. queryCount++;
  1699. return queryCount;
  1700. }
  1701. const char *findUnquoted(const char *query, char searchFor)
  1702. {
  1703. // Note - returns pointer to char AFTER the first occurrence of searchFor outside of quotes
  1704. char inStr = '\0';
  1705. char ch;
  1706. while ((ch = *query++) != 0)
  1707. {
  1708. if (ch == inStr)
  1709. inStr = false;
  1710. else switch (ch)
  1711. {
  1712. case '\'':
  1713. case '"':
  1714. inStr = ch;
  1715. break;
  1716. case '\\':
  1717. if (inStr && *query)
  1718. query++;
  1719. break;
  1720. case '/':
  1721. if (!inStr)
  1722. {
  1723. if (*query=='/')
  1724. {
  1725. while (*query && *query != '\n')
  1726. query++;
  1727. }
  1728. else if (*query=='*')
  1729. {
  1730. query++;
  1731. loop
  1732. {
  1733. if (!*query)
  1734. fail("Unterminated comment in query string");
  1735. if (*query=='*' && query[1]=='/')
  1736. {
  1737. query+= 2;
  1738. break;
  1739. }
  1740. query++;
  1741. }
  1742. }
  1743. }
  1744. break;
  1745. default:
  1746. if (!inStr && ch==searchFor)
  1747. return query;
  1748. break;
  1749. }
  1750. }
  1751. return NULL;
  1752. }
  1753. inline unsigned checkNextParam(const char *name)
  1754. {
  1755. if (nextParam == numParams)
  1756. failx("Too many parameters supplied: No matching ? for parameter %s", name);
  1757. return nextParam++;
  1758. }
  1759. inline void checkBind(CassError rc, const char *name)
  1760. {
  1761. if (rc != CASS_OK)
  1762. {
  1763. failx("While binding parameter %s: %s", name, cass_error_desc(rc));
  1764. }
  1765. }
  1766. Owned<CassandraCluster> cluster;
  1767. Owned<CassandraSession> session;
  1768. Owned<CassandraStatementInfo> stmtInfo;
  1769. Owned<CassandraDatasetBinder> inputStream;
  1770. const IContextLogger &logctx;
  1771. unsigned flags;
  1772. unsigned nextParam;
  1773. unsigned numParams;
  1774. StringAttr queryString;
  1775. };
  1776. class CassandraEmbedContext : public CInterfaceOf<IEmbedContext>
  1777. {
  1778. public:
  1779. virtual IEmbedFunctionContext *createFunctionContext(unsigned flags, const char *options)
  1780. {
  1781. return createFunctionContextEx(NULL, flags, options);
  1782. }
  1783. virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, unsigned flags, const char *options)
  1784. {
  1785. if (flags & EFimport)
  1786. UNSUPPORTED("IMPORT");
  1787. else
  1788. return new CassandraEmbedFunctionContext(ctx ? ctx->queryContextLogger() : queryDummyContextLogger(), flags, options);
  1789. }
  1790. };
  1791. extern IEmbedContext* getEmbedContext()
  1792. {
  1793. return new CassandraEmbedContext();
  1794. }
  // No checking is performed yet: every script, whatever its content, is reported as valid.
  extern bool syntaxCheck(const char *script)
  {
      const bool accepted = true; // MORE
      return accepted;
  }
  1799. //--------------------------------------------
  1800. #define ATTRIBUTES_NAME "attributes"
  1801. void addElement(IPTree *parent, const char *name, const CassValue *value)
  1802. {
  1803. switch (cass_value_type(value))
  1804. {
  1805. case CASS_VALUE_TYPE_UNKNOWN:
  1806. // It's a NULL - ignore it (or we could add empty element...)
  1807. break;
  1808. case CASS_VALUE_TYPE_ASCII:
  1809. case CASS_VALUE_TYPE_TEXT:
  1810. case CASS_VALUE_TYPE_VARCHAR:
  1811. {
  1812. rtlDataAttr str;
  1813. unsigned chars;
  1814. getUTF8Result(NULL, value, chars, str.refstr());
  1815. StringAttr s(str.getstr(), rtlUtf8Size(chars, str.getstr()));
  1816. parent->addProp(name, s);
  1817. break;
  1818. }
  1819. case CASS_VALUE_TYPE_INT:
  1820. case CASS_VALUE_TYPE_BIGINT:
  1821. case CASS_VALUE_TYPE_VARINT:
  1822. parent->addPropInt64(name, getSignedResult(NULL, value));
  1823. break;
  1824. case CASS_VALUE_TYPE_BLOB:
  1825. {
  1826. rtlDataAttr data;
  1827. unsigned bytes;
  1828. getDataResult(NULL, value, bytes, data.refdata());
  1829. parent->addPropBin(name, bytes, data.getbytes());
  1830. break;
  1831. }
  1832. case CASS_VALUE_TYPE_BOOLEAN:
  1833. parent->addPropBool(name, getBooleanResult(NULL, value));
  1834. break;
  1835. case CASS_VALUE_TYPE_DOUBLE:
  1836. case CASS_VALUE_TYPE_FLOAT:
  1837. {
  1838. double v = getRealResult(NULL, value);
  1839. StringBuffer s;
  1840. s.append(v);
  1841. parent->addProp(name, s);
  1842. break;
  1843. }
  1844. case CASS_VALUE_TYPE_LIST:
  1845. case CASS_VALUE_TYPE_SET:
  1846. {
  1847. CassandraIterator elems(cass_iterator_from_collection(value));
  1848. Owned<IPTree> list = createPTree(name);
  1849. while (cass_iterator_next(elems))
  1850. addElement(list, "item", cass_iterator_get_value(elems));
  1851. parent->addPropTree(name, list.getClear());
  1852. break;
  1853. }
  1854. case CASS_VALUE_TYPE_MAP:
  1855. {
  1856. CassandraIterator elems(cass_iterator_from_map(value));
  1857. if (strcmp(name, ATTRIBUTES_NAME)==0 && isString(cass_value_primary_sub_type(value)))
  1858. {
  1859. while (cass_iterator_next(elems))
  1860. {
  1861. rtlDataAttr str;
  1862. unsigned chars;
  1863. getStringResult(NULL, cass_iterator_get_map_key(elems), chars, str.refstr());
  1864. StringBuffer s("@");
  1865. s.append(chars, str.getstr());
  1866. addElement(parent, s, cass_iterator_get_map_value(elems));
  1867. }
  1868. }
  1869. else
  1870. {
  1871. Owned<IPTree> map = createPTree(name);
  1872. while (cass_iterator_next(elems))
  1873. {
  1874. if (isString(cass_value_primary_sub_type(value)))
  1875. {
  1876. rtlDataAttr str;
  1877. unsigned chars;
  1878. getStringResult(NULL, cass_iterator_get_map_key(elems), chars, str.refstr());
  1879. StringAttr s(str.getstr(), chars);
  1880. addElement(map, s, cass_iterator_get_map_value(elems));
  1881. }
  1882. else
  1883. {
  1884. Owned<IPTree> mapping = createPTree("mapping");
  1885. addElement(mapping, "key", cass_iterator_get_map_key(elems));
  1886. addElement(mapping, "value", cass_iterator_get_map_value(elems));
  1887. map->addPropTree("mapping", mapping.getClear());
  1888. }
  1889. }
  1890. parent->addPropTree(name, map.getClear());
  1891. }
  1892. break;
  1893. }
  1894. default:
  1895. DBGLOG("Column type %d not supported", cass_value_type(value));
  1896. UNSUPPORTED("Column type");
  1897. }
  1898. }
  1899. void bindElement(CassStatement *statement, IPTree *parent, unsigned idx, const char *name, CassValueType type)
  1900. {
  1901. if (parent->hasProp(name) || strcmp(name, ATTRIBUTES_NAME)==0)
  1902. {
  1903. switch (type)
  1904. {
  1905. case CASS_VALUE_TYPE_ASCII:
  1906. case CASS_VALUE_TYPE_TEXT:
  1907. case CASS_VALUE_TYPE_VARCHAR:
  1908. {
  1909. const char *value = parent->queryProp(name);
  1910. if (value)
  1911. check(cass_statement_bind_string(statement, idx, cass_string_init(value)));
  1912. break;
  1913. }
  1914. case CASS_VALUE_TYPE_INT:
  1915. check(cass_statement_bind_int32(statement, idx, parent->getPropInt(name)));
  1916. break;
  1917. case CASS_VALUE_TYPE_BIGINT:
  1918. case CASS_VALUE_TYPE_VARINT:
  1919. check(cass_statement_bind_int64(statement, idx, parent->getPropInt64(name)));
  1920. break;
  1921. case CASS_VALUE_TYPE_BLOB:
  1922. {
  1923. MemoryBuffer buf;
  1924. parent->getPropBin(name, buf);
  1925. check(cass_statement_bind_bytes(statement, idx, cass_bytes_init((const cass_byte_t*)buf.toByteArray(), buf.length())));
  1926. break;
  1927. }
  1928. case CASS_VALUE_TYPE_BOOLEAN:
  1929. check(cass_statement_bind_bool(statement, idx, (cass_bool_t) parent->getPropBool(name)));
  1930. break;
  1931. case CASS_VALUE_TYPE_DOUBLE:
  1932. check(cass_statement_bind_double(statement, idx, atof(parent->queryProp(name))));
  1933. break;
  1934. case CASS_VALUE_TYPE_FLOAT:
  1935. check(cass_statement_bind_float(statement, idx, atof(parent->queryProp(name))));
  1936. break;
  1937. case CASS_VALUE_TYPE_LIST:
  1938. case CASS_VALUE_TYPE_SET:
  1939. {
  1940. Owned<IPTree> child = parent->getPropTree(name);
  1941. unsigned numItems = child->getCount("item");
  1942. if (numItems)
  1943. {
  1944. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_SET, numItems));
  1945. Owned<IPTreeIterator> items = child->getElements("item");
  1946. ForEach(*items)
  1947. {
  1948. // We don't know the subtypes - we can assert that we only support string, for most purposes, I suspect
  1949. if (strcmp(name, "list1")==0)
  1950. check(cass_collection_append_int32(collection, items->query().getPropInt(NULL)));
  1951. else
  1952. check(cass_collection_append_string(collection, cass_string_init(items->query().queryProp(NULL))));
  1953. }
  1954. check(cass_statement_bind_collection(statement, idx, collection));
  1955. }
  1956. break;
  1957. }
  1958. case CASS_VALUE_TYPE_MAP:
  1959. {
  1960. // We don't know the subtypes - we can assert that we only support string, for most purposes, I suspect
  1961. if (strcmp(name, ATTRIBUTES_NAME)==0)
  1962. {
  1963. Owned<IAttributeIterator> attrs = parent->getAttributes();
  1964. unsigned numItems = attrs->count();
  1965. ForEach(*attrs)
  1966. {
  1967. numItems++;
  1968. }
  1969. if (numItems)
  1970. {
  1971. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
  1972. ForEach(*attrs)
  1973. {
  1974. const char *key = attrs->queryName();
  1975. const char *value = attrs->queryValue();
  1976. check(cass_collection_append_string(collection, cass_string_init(key+1))); // skip the @
  1977. check(cass_collection_append_string(collection, cass_string_init(value)));
  1978. }
  1979. check(cass_statement_bind_collection(statement, idx, collection));
  1980. }
  1981. }
  1982. else
  1983. {
  1984. Owned<IPTree> child = parent->getPropTree(name);
  1985. unsigned numItems = child->numChildren();
  1986. // MORE - if the cassandra driver objects to there being fewer than numItems supplied, we may need to recode using a second pass.
  1987. if (numItems)
  1988. {
  1989. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
  1990. Owned<IPTreeIterator> items = child->getElements("*");
  1991. ForEach(*items)
  1992. {
  1993. IPTree &item = items->query();
  1994. const char *key = item.queryName();
  1995. const char *value = item.queryProp(NULL);
  1996. if (key && value)
  1997. {
  1998. check(cass_collection_append_string(collection, cass_string_init(key)));
  1999. check(cass_collection_append_string(collection, cass_string_init(value)));
  2000. }
  2001. }
  2002. check(cass_statement_bind_collection(statement, idx, collection));
  2003. }
  2004. }
  2005. break;
  2006. }
  2007. default:
  2008. DBGLOG("Column type %d not supported", type);
  2009. UNSUPPORTED("Column type");
  2010. }
  2011. }
  2012. }
  2013. extern void cassandraToGenericXML()
  2014. {
  2015. CassandraCluster cluster(cass_cluster_new());
  2016. cass_cluster_set_contact_points(cluster, "127.0.0.1");
  2017. CassandraSession session(cass_session_new());
  2018. CassandraFuture future(cass_session_connect_keyspace(session, cluster, "test"));
  2019. future.wait("connect");
  2020. CassandraStatement statement(cass_statement_new(cass_string_init("select * from tbl1 where name = 'name1';"), 0));
  2021. CassandraFuture future2(cass_session_execute(session, statement));
  2022. future2.wait("execute");
  2023. CassandraResult result(cass_future_get_result(future2));
  2024. StringArray names;
  2025. UnsignedArray types;
  2026. for (int i = 0; i < cass_result_column_count(result); i++)
  2027. {
  2028. CassString column = cass_result_column_name(result, i);
  2029. StringBuffer name(column.length, column.data);
  2030. names.append(name);
  2031. types.append(cass_result_column_type(result, i));
  2032. }
  2033. // Now fetch the rows
  2034. Owned<IPTree> xml = createPTree("tbl1");
  2035. CassandraIterator rows(cass_iterator_from_result(result));
  2036. while (cass_iterator_next(rows))
  2037. {
  2038. CassandraIterator cols(cass_iterator_from_row(cass_iterator_get_row(rows)));
  2039. Owned<IPTree> row = createPTree("row");
  2040. unsigned colidx = 0;
  2041. while (cass_iterator_next(cols))
  2042. {
  2043. const CassValue *value = cass_iterator_get_column(cols);
  2044. const char *name = names.item(colidx);
  2045. addElement(row, name, value);
  2046. colidx++;
  2047. }
  2048. xml->addPropTree("row", row.getClear());
  2049. }
  2050. xml->setProp("row[1]/name", "newname");
  2051. StringBuffer buf;
  2052. toXML(xml, buf);
  2053. DBGLOG("%s", buf.str());
  2054. // Now try going the other way...
  2055. // For this we need to know the expected names (can fetch them from system table) and types (ditto, potentially, though a dummy select may be easier)
  2056. StringBuffer colNames;
  2057. StringBuffer values;
  2058. ForEachItemIn(idx, names)
  2059. {
  2060. colNames.append(",").append(names.item(idx));
  2061. values.append(",?");
  2062. }
  2063. VStringBuffer insertQuery("INSERT into tbl1 (%s) values (%s);", colNames.str()+1, values.str()+1);
  2064. Owned<IPTreeIterator> xmlRows = xml->getElements("row");
  2065. ForEach(*xmlRows)
  2066. {
  2067. IPropertyTree *xmlrow = &xmlRows->query();
  2068. CassandraStatement update(cass_statement_new(cass_string_init(insertQuery.str()), names.length()));
  2069. ForEachItemIn(idx, names)
  2070. {
  2071. bindElement(update, xmlrow, idx, names.item(idx), (CassValueType) types.item(idx));
  2072. }
  2073. // MORE - use a batch
  2074. CassandraFuture future3(cass_session_execute(session, update));
  2075. future2.wait("insert");
  2076. }
  2077. }
  2078. //--------------------------------------------
  2079. interface ICassandraSession
  2080. {
  2081. virtual CassSession *querySession() const = 0;
  2082. virtual CassandraPrepared *prepareStatement(const char *query) const = 0;
  2083. virtual unsigned queryTraceLevel() const = 0;
  2084. };
  2085. struct CassandraColumnMapper
  2086. {
  2087. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) = 0;
  2088. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, int userVal = 0) = 0;
  2089. };
  2090. class StringColumnMapper : implements CassandraColumnMapper
  2091. {
  2092. public:
  2093. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2094. {
  2095. rtlDataAttr str;
  2096. unsigned chars;
  2097. getUTF8Result(NULL, value, chars, str.refstr());
  2098. StringAttr s(str.getstr(), rtlUtf8Size(chars, str.getstr()));
  2099. row->setProp(name, s);
  2100. return row;
  2101. }
  2102. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, int userVal)
  2103. {
  2104. const char *value = row->queryProp(name);
  2105. if (!value)
  2106. return false;
  2107. if (statement)
  2108. check(cass_statement_bind_string(statement, idx, cass_string_init(value)));
  2109. return true;
  2110. }
  2111. } stringColumnMapper;
  2112. class RequiredStringColumnMapper : public StringColumnMapper
  2113. {
  2114. public:
  2115. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, int userVal)
  2116. {
  2117. const char *value = row->queryProp(name);
  2118. if (!value)
  2119. value = "";
  2120. if (statement)
  2121. check(cass_statement_bind_string(statement, idx, cass_string_init(value)));
  2122. return true;
  2123. }
  2124. } requiredStringColumnMapper;
  2125. class BlobColumnMapper : implements CassandraColumnMapper
  2126. {
  2127. public:
  2128. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2129. {
  2130. rtlDataAttr str;
  2131. unsigned chars;
  2132. getDataResult(NULL, value, chars, str.refdata());
  2133. row->setPropBin(name, chars, str.getbytes());
  2134. return row;
  2135. }
  2136. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, int userVal)
  2137. {
  2138. MemoryBuffer value;
  2139. row->getPropBin(name, value);
  2140. if (value.length())
  2141. {
  2142. if (statement)
  2143. check(cass_statement_bind_bytes(statement, idx, cass_bytes_init((const cass_byte_t *) value.toByteArray(), value.length())));
  2144. return true;
  2145. }
  2146. else
  2147. return false;
  2148. }
  2149. } blobColumnMapper;
  2150. class TimeStampColumnMapper : implements CassandraColumnMapper
  2151. {
  2152. public:
  2153. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2154. {
  2155. // never fetched (that may change?)
  2156. return row;
  2157. }
  2158. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, int userVal)
  2159. {
  2160. // never bound
  2161. return false;
  2162. }
  2163. } timestampColumnMapper;
  2164. class RootNameColumnMapper : implements CassandraColumnMapper
  2165. {
  2166. public:
  2167. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2168. {
  2169. rtlDataAttr str;
  2170. unsigned chars;
  2171. getUTF8Result(NULL, value, chars, str.refstr());
  2172. StringAttr s(str.getstr(), rtlUtf8Size(chars, str.getstr()));
  2173. row->renameProp("/", s);
  2174. return row;
  2175. }
  2176. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, int userVal)
  2177. {
  2178. if (statement)
  2179. {
  2180. const char *value = row->queryName();
  2181. check(cass_statement_bind_string(statement, idx, cass_string_init(value)));
  2182. }
  2183. return true;
  2184. }
  2185. } rootNameColumnMapper;
  2186. // WuidColumnMapper is used for columns containing a wuid that is NOT in the resulting XML - it
  2187. // is an error to try to map such a column to/from the XML representation
  2188. class WuidColumnMapper : implements CassandraColumnMapper
  2189. {
  2190. public:
  2191. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2192. {
  2193. throwUnexpected();
  2194. }
  2195. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, int userVal)
  2196. {
  2197. throwUnexpected();
  2198. }
  2199. } wuidColumnMapper;
  2200. class GraphIdColumnMapper : implements CassandraColumnMapper
  2201. {
  2202. public:
  2203. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2204. {
  2205. rtlDataAttr str;
  2206. unsigned chars;
  2207. getUTF8Result(NULL, value, chars, str.refstr());
  2208. StringAttr s(str.getstr(), rtlUtf8Size(chars, str.getstr()));
  2209. if (strcmp(s, "Running")==0) // The input XML structure is a little odd
  2210. return row;
  2211. else
  2212. {
  2213. if (!row->hasProp(s))
  2214. row->addPropTree(s, createPTree());
  2215. return row->queryPropTree(s);
  2216. }
  2217. }
  2218. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, int userVal)
  2219. {
  2220. const char *value = row->queryName();
  2221. if (!value)
  2222. return false;
  2223. if (statement)
  2224. check(cass_statement_bind_string(statement, idx, cass_string_init(value)));
  2225. return true;
  2226. }
  2227. } graphIdColumnMapper;
  2228. class ProgressColumnMapper : implements CassandraColumnMapper
  2229. {
  2230. public:
  2231. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2232. {
  2233. rtlDataAttr str;
  2234. unsigned chars;
  2235. getDataResult(NULL, value, chars, str.refdata()); // Stored as a blob in case we want to compress
  2236. IPTree *child = createPTreeFromXMLString(chars, str.getstr()); // For now, assume we did not compress!
  2237. row->addPropTree(child->queryName(), child);
  2238. return child;
  2239. }
  2240. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, int userVal)
  2241. {
  2242. // MORE - may need to read, and probably should write, compressed.
  2243. StringBuffer value;
  2244. ::toXML(row, value, 0, 0);
  2245. if (value.length())
  2246. {
  2247. if (statement)
  2248. check(cass_statement_bind_bytes(statement, idx, cass_bytes_init((const cass_byte_t *) value.str(), value.length())));
  2249. return true;
  2250. }
  2251. else
  2252. return false;
  2253. }
  2254. } progressColumnMapper;
  2255. class BoolColumnMapper : implements CassandraColumnMapper
  2256. {
  2257. public:
  2258. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2259. {
  2260. row->addPropBool(name, getBooleanResult(NULL, value));
  2261. return row;
  2262. }
  2263. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, int userVal)
  2264. {
  2265. if (row->hasProp(name))
  2266. {
  2267. if (statement)
  2268. {
  2269. bool value = row->getPropBool(name, false);
  2270. check(cass_statement_bind_bool(statement, idx, value ? cass_true : cass_false));
  2271. }
  2272. return true;
  2273. }
  2274. else
  2275. return false;
  2276. }
  2277. } boolColumnMapper;
  2278. class IntColumnMapper : implements CassandraColumnMapper
  2279. {
  2280. public:
  2281. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2282. {
  2283. row->addPropInt(name, getSignedResult(NULL, value));
  2284. return row;
  2285. }
  2286. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, int userVal)
  2287. {
  2288. if (row->hasProp(name))
  2289. {
  2290. if (statement)
  2291. {
  2292. int value = row->getPropInt(name);
  2293. check(cass_statement_bind_int32(statement, idx, value));
  2294. }
  2295. return true;
  2296. }
  2297. else
  2298. return false;
  2299. }
  2300. } intColumnMapper;
  2301. class DefaultedIntColumnMapper : public IntColumnMapper
  2302. {
  2303. public:
  2304. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, int defaultValue)
  2305. {
  2306. if (statement)
  2307. {
  2308. int value = row->getPropInt(name, defaultValue);
  2309. check(cass_statement_bind_int32(statement, idx, value));
  2310. }
  2311. return true;
  2312. }
  2313. } defaultedIntColumnMapper;
  2314. class BigIntColumnMapper : implements CassandraColumnMapper
  2315. {
  2316. public:
  2317. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2318. {
  2319. row->addPropInt64(name, getSignedResult(NULL, value));
  2320. return row;
  2321. }
  2322. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, int userVal)
  2323. {
  2324. if (row->hasProp(name))
  2325. {
  2326. if (statement)
  2327. {
  2328. __int64 value = row->getPropInt64(name);
  2329. check(cass_statement_bind_int64(statement, idx, value));
  2330. }
  2331. return true;
  2332. }
  2333. else
  2334. return false;
  2335. }
  2336. } bigintColumnMapper;
  2337. class SubgraphIdColumnMapper : implements CassandraColumnMapper
  2338. {
  2339. public:
  2340. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2341. {
  2342. __int64 id = getSignedResult(NULL, value);
  2343. if (id)
  2344. row->addPropInt64(name, id);
  2345. return row;
  2346. }
  2347. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, int userVal)
  2348. {
  2349. if (statement)
  2350. {
  2351. int value = row->getPropInt(name);
  2352. check(cass_statement_bind_int64(statement, idx, value));
  2353. }
  2354. return true;
  2355. }
  2356. } subgraphIdColumnMapper;
  2357. class SimpleMapColumnMapper : implements CassandraColumnMapper
  2358. {
  2359. public:
  2360. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2361. {
  2362. Owned<IPTree> map = createPTree(name);
  2363. CassandraIterator elems(cass_iterator_from_map(value));
  2364. while (cass_iterator_next(elems))
  2365. {
  2366. rtlDataAttr str;
  2367. unsigned chars;
  2368. getStringResult(NULL, cass_iterator_get_map_key(elems), chars, str.refstr());
  2369. StringAttr s(str.getstr(), chars);
  2370. stringColumnMapper.toXML(map, s, cass_iterator_get_map_value(elems));
  2371. }
  2372. row->addPropTree(name, map.getClear());
  2373. return row;
  2374. }
  2375. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, int userVal)
  2376. {
  2377. Owned<IPTree> child = row->getPropTree(name);
  2378. if (child)
  2379. {
  2380. unsigned numItems = child->numChildren();
  2381. if (numItems)
  2382. {
  2383. if (statement)
  2384. {
  2385. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
  2386. Owned<IPTreeIterator> items = child->getElements("*");
  2387. ForEach(*items)
  2388. {
  2389. IPTree &item = items->query();
  2390. const char *key = item.queryName();
  2391. const char *value = item.queryProp(NULL);
  2392. if (key && value)
  2393. {
  2394. check(cass_collection_append_string(collection, cass_string_init(key)));
  2395. check(cass_collection_append_string(collection, cass_string_init(value)));
  2396. }
  2397. }
  2398. check(cass_statement_bind_collection(statement, idx, collection));
  2399. }
  2400. return true;
  2401. }
  2402. }
  2403. return false;
  2404. }
  2405. } simpleMapColumnMapper;
  2406. class AttributeMapColumnMapper : implements CassandraColumnMapper
  2407. {
  2408. public:
  2409. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2410. {
  2411. CassandraIterator elems(cass_iterator_from_map(value));
  2412. while (cass_iterator_next(elems))
  2413. {
  2414. rtlDataAttr str;
  2415. unsigned chars;
  2416. getStringResult(NULL, cass_iterator_get_map_key(elems), chars, str.refstr());
  2417. StringBuffer s("@");
  2418. s.append(chars, str.getstr());
  2419. stringColumnMapper.toXML(row, s, cass_iterator_get_map_value(elems));
  2420. }
  2421. return row;
  2422. }
  2423. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, int userVal)
  2424. {
  2425. // NOTE - name here provides a list of attributes that we should NOT be mapping
  2426. Owned<IAttributeIterator> attrs = row->getAttributes();
  2427. unsigned numItems = attrs->count();
  2428. ForEach(*attrs)
  2429. {
  2430. const char *key = attrs->queryName();
  2431. if (strstr(name, key) == NULL) // MORE - should really check that the following char is a @
  2432. numItems++;
  2433. }
  2434. if (numItems)
  2435. {
  2436. if (statement)
  2437. {
  2438. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
  2439. ForEach(*attrs)
  2440. {
  2441. const char *key = attrs->queryName();
  2442. if (strstr(name, key) == NULL) // MORE - should really check that the following char is a @
  2443. {
  2444. const char *value = attrs->queryValue();
  2445. check(cass_collection_append_string(collection, cass_string_init(key+1))); // skip the @
  2446. check(cass_collection_append_string(collection, cass_string_init(value)));
  2447. }
  2448. }
  2449. check(cass_statement_bind_collection(statement, idx, collection));
  2450. }
  2451. return true;
  2452. }
  2453. else
  2454. return false;
  2455. }
  2456. } attributeMapColumnMapper;
  2457. class GraphMapColumnMapper : implements CassandraColumnMapper
  2458. {
  2459. public:
  2460. GraphMapColumnMapper(const char *_elemName, const char *_nameAttr)
  2461. : elemName(_elemName), nameAttr(_nameAttr)
  2462. {
  2463. }
  2464. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2465. {
  2466. Owned<IPTree> map = createPTree(name);
  2467. CassandraIterator elems(cass_iterator_from_map(value));
  2468. while (cass_iterator_next(elems))
  2469. {
  2470. rtlDataAttr str;
  2471. unsigned chars;
  2472. getStringResult(NULL, cass_iterator_get_map_value(elems), chars, str.refstr());
  2473. Owned<IPTree> child = createPTreeFromXMLString(chars, str.getstr());
  2474. map->addPropTree(elemName, child.getClear());
  2475. }
  2476. row->addPropTree(name, map.getClear());
  2477. return row;
  2478. }
  2479. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, int userVal)
  2480. {
  2481. Owned<IPTree> child = row->getPropTree(name);
  2482. if (child)
  2483. {
  2484. unsigned numItems = child->numChildren();
  2485. if (numItems)
  2486. {
  2487. if (statement)
  2488. {
  2489. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
  2490. Owned<IPTreeIterator> items = child->getElements("*");
  2491. ForEach(*items)
  2492. {
  2493. IPTree &item = items->query();
  2494. const char *key = item.queryProp(nameAttr);
  2495. // MORE - may need to read, and probably should write, compressed. At least for graphs
  2496. StringBuffer value;
  2497. ::toXML(&item, value, 0, 0);
  2498. if (key && value.length())
  2499. {
  2500. check(cass_collection_append_string(collection, cass_string_init(key)));
  2501. check(cass_collection_append_string(collection, cass_string_init(value)));
  2502. }
  2503. }
  2504. check(cass_statement_bind_collection(statement, idx, collection));
  2505. }
  2506. return true;
  2507. }
  2508. }
  2509. return false;
  2510. }
  2511. private:
  2512. const char *elemName;
  2513. const char *nameAttr;
  2514. } graphMapColumnMapper("Graph", "@name"), workflowMapColumnMapper("Item", "@wfid");
  2515. class AssociationsMapColumnMapper : public GraphMapColumnMapper
  2516. {
  2517. public:
  2518. AssociationsMapColumnMapper(const char *_elemName, const char *_nameAttr)
  2519. : GraphMapColumnMapper(_elemName, _nameAttr)
  2520. {
  2521. }
  2522. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2523. {
  2524. // Name is "Query/Associated ...
  2525. IPTree *query = row->queryPropTree("Query");
  2526. if (!query)
  2527. {
  2528. query = createPTree("Query");
  2529. row->setPropTree("Query", query);
  2530. }
  2531. return GraphMapColumnMapper::toXML(query, "Associated", value);
  2532. }
  2533. } associationsMapColumnMapper("File", "@filename");
  2534. class PluginListColumnMapper : implements CassandraColumnMapper
  2535. {
  2536. public:
  2537. PluginListColumnMapper(const char *_elemName, const char *_nameAttr)
  2538. : elemName(_elemName), nameAttr(_nameAttr)
  2539. {
  2540. }
  2541. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2542. {
  2543. Owned<IPTree> map = createPTree(name);
  2544. CassandraIterator elems(cass_iterator_from_collection(value));
  2545. while (cass_iterator_next(elems))
  2546. {
  2547. Owned<IPTree> child = createPTree(elemName);
  2548. stringColumnMapper.toXML(child, nameAttr, cass_iterator_get_value(elems));
  2549. map->addPropTree(elemName, child.getClear());
  2550. }
  2551. row->addPropTree(name, map.getClear());
  2552. return row;
  2553. }
  2554. virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, int userVal)
  2555. {
  2556. Owned<IPTree> child = row->getPropTree(name);
  2557. if (child)
  2558. {
  2559. unsigned numItems = child->numChildren();
  2560. if (numItems)
  2561. {
  2562. if (statement)
  2563. {
  2564. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_LIST, numItems));
  2565. Owned<IPTreeIterator> items = child->getElements("*");
  2566. ForEach(*items)
  2567. {
  2568. IPTree &item = items->query();
  2569. const char *value = item.queryProp(nameAttr);
  2570. if (value)
  2571. check(cass_collection_append_string(collection, cass_string_init(value)));
  2572. }
  2573. check(cass_statement_bind_collection(statement, idx, collection));
  2574. }
  2575. return true;
  2576. }
  2577. }
  2578. return false;
  2579. }
  2580. private:
  2581. const char *elemName;
  2582. const char *nameAttr;
  2583. } pluginListColumnMapper("Plugin", "@dllname");
  2584. struct CassandraXmlMapping
  2585. {
  2586. const char *columnName;
  2587. const char *columnType;
  2588. const char *xpath;
  2589. CassandraColumnMapper &mapper;
  2590. };
  2591. const CassandraXmlMapping wuExceptionsMappings [] =
  2592. {
  2593. {"wuid", "text", NULL, wuidColumnMapper},
  2594. {"attributes", "map<text, text>", "", attributeMapColumnMapper},
  2595. {"value", "text", ".", stringColumnMapper},
  2596. {"ts", "timeuuid", NULL, timestampColumnMapper}, // must be last since we don't bind it, so it would throw out the colidx values of following fields
  2597. { NULL, "wuExceptions", "((wuid), ts)", stringColumnMapper}
  2598. };
  2599. const CassandraXmlMapping wuStatisticsMappings [] =
  2600. {
  2601. {"wuid", "text", NULL, wuidColumnMapper},
  2602. {"kind", "text", "@kind", stringColumnMapper},
  2603. {"attributes", "map<text, text>", "@kind", attributeMapColumnMapper},
  2604. { NULL, "wuStatistics", "((wuid), kind)", stringColumnMapper}
  2605. };
// Column mappings for the main workunits table (one row per workunit, keyed by wuid).
// Rows are {columnName, columnType, xpath, mapper}; the NULL-named last row terminates the table
// and carries the table name (columnType slot) and PRIMARY KEY clause (xpath slot).
// Readers such as cassandraToWorkunitXML() index this table by column position, so the row order
// must match the order in which columns are selected.
const CassandraXmlMapping workunitsMappings [] =
{
    {"wuid", "text", NULL, rootNameColumnMapper},
    {"clustername", "text", "@clusterName", stringColumnMapper},
    {"jobname", "text", "@jobName", stringColumnMapper},
    {"priorityclass", "int", "@priorityClass", intColumnMapper},
    {"protected", "boolean", "@protected", boolColumnMapper},
    {"scope", "text", "@scope", stringColumnMapper},
    {"submitID", "text", "@submitID", stringColumnMapper},
    {"state", "text", "@state", stringColumnMapper},
    {"debug", "map<text, text>", "Debug", simpleMapColumnMapper},
    {"attributes", "map<text, text>", "@wuid@clusterName@jobName@priorityClass@protected@scope@submitID@state", attributeMapColumnMapper}, // name is the suppression list
    {"graphs", "map<text, text>", "Graphs", graphMapColumnMapper},
    {"plugins", "list<text>", "Plugins", pluginListColumnMapper},
    {"query", "text", "Query/Text", stringColumnMapper}, // This probably wants to be a child just because of size? Or at least needs to be lazy...
    {"associations", "map<text, text>", "Query/Associated", associationsMapColumnMapper},
    {"workflow", "map<text, text>", "Workflow", workflowMapColumnMapper},
    { NULL, "workunits", "((wuid))", stringColumnMapper}   // terminator: table name and primary key
};
// Column mappings for the workunitsByOwner secondary table, keyed by (submitID) and clustered by wuid.
// The key column must stay first: deleteSecondaryByKey() uses mappings[0].columnName as the key
// column, and fetchDataForKeyAndWuid() skips the first two rows (key column and wuid) when selecting.
// The NULL-named last row terminates the table and carries the table name (columnType slot) and
// PRIMARY KEY clause (xpath slot).
const CassandraXmlMapping ownerMappings [] =
{
    {"submitID", "text", "@submitID", stringColumnMapper},
    {"wuid", "text", NULL, rootNameColumnMapper},
    {"clustername", "text", "@clusterName", stringColumnMapper},
    {"jobname", "text", "@jobName", stringColumnMapper},
    {"priorityclass", "int", "@priorityClass", intColumnMapper},
    {"protected", "boolean", "@protected", boolColumnMapper},
    {"scope", "text", "@scope", stringColumnMapper},
    {"state", "text", "@state", stringColumnMapper},
    { NULL, "workunitsByOwner", "((submitID), wuid)", stringColumnMapper}   // terminator: table name and primary key
};
// A cut down version of the workunit mappings - used when querying with no key.
// It names the same "workunits" table as workunitsMappings but lists only the first eight
// (basic information) columns. The NULL-named last row terminates the table and carries the
// table name (columnType slot) and PRIMARY KEY clause (xpath slot).
const CassandraXmlMapping workunitInfoMappings [] =
{
    {"wuid", "text", NULL, rootNameColumnMapper},
    {"clustername", "text", "@clusterName", stringColumnMapper},
    {"jobname", "text", "@jobName", stringColumnMapper},
    {"priorityclass", "int", "@priorityClass", intColumnMapper},
    {"protected", "boolean", "@protected", boolColumnMapper},
    {"scope", "text", "@scope", stringColumnMapper},
    {"submitID", "text", "@submitID", stringColumnMapper},
    {"state", "text", "@state", stringColumnMapper},
    { NULL, "workunits", "((wuid))", stringColumnMapper}   // terminator: table name and primary key
};
// Column mappings for the graphprogress table, keyed by (wuid) and clustered by graphid, subgraphid.
// Rows are {columnName, columnType, xpath, mapper}; the NULL-named last row terminates the table
// and carries the table name (columnType slot) and PRIMARY KEY clause (xpath slot).
// NOTE(review): within this part of the file the table is referenced only from commented-out code.
const CassandraXmlMapping graphProgressMappings [] =
{
    {"wuid", "text", NULL, wuidColumnMapper},
    {"graphID", "text", NULL, graphIdColumnMapper},
    {"progress", "blob", NULL, progressColumnMapper}, // NOTE - order of these is significant - this creates the subtree that ones below will modify
    {"subgraphID", "text", "@id", subgraphIdColumnMapper},
    {"state", "int", "@_state", intColumnMapper},
    { NULL, "graphprogress", "((wuid), graphid, subgraphid)", stringColumnMapper}   // terminator: table name and primary key
};
// Column mappings for the wuResults child table, keyed by (wuid) and clustered by sequence.
// Rows are {columnName, columnType, xpath, mapper}; the NULL-named last row terminates the table
// and carries the table name (columnType slot) and PRIMARY KEY clause (xpath slot).
// cassandraToWuResultsXML() walks this table by column position starting at index 1, so row order
// must match the order in which columns are selected.
const CassandraXmlMapping wuResultsMappings [] =
{
    {"wuid", "text", NULL, wuidColumnMapper},
    {"sequence", "int", "@sequence", intColumnMapper},
    {"name", "text", "@name", stringColumnMapper},
    {"format", "text", "@format", stringColumnMapper}, // xml, xmlset, csv, or null to mean raw. Could probably switch to int if we wanted
    {"status", "text", "@status", stringColumnMapper},
    {"rowcount", "int", "rowCount", intColumnMapper}, // This is the number of rows in result (which may be stored in a file rather than in value)
    {"totalrowcount", "bigint", "totalRowCount", bigintColumnMapper},// This is the number of rows in value
    {"schemaRaw", "blob", "SchemaRaw", blobColumnMapper},
    {"logicalName", "text", "logicalName", stringColumnMapper}, // either this or value will be present once result status is "calculated"
    {"value", "blob", "Value", blobColumnMapper},
    { NULL, "wuResults", "((wuid), sequence)", stringColumnMapper}   // terminator: table name and primary key
};
  2672. // This looks very similar to the above, but the key is different...
// Column mappings for the wuVariables child table, keyed by (wuid) and clustered by sequence, name.
// The columns match wuResultsMappings, but sequence and name use the defaulted/required mappers
// because both are part of the primary key here.
// Rows are {columnName, columnType, xpath, mapper}; the NULL-named last row terminates the table
// and carries the table name (columnType slot) and PRIMARY KEY clause (xpath slot).
// cassandraToWuVariablesXML() reads the sequence column first and then walks the remaining rows
// by position starting at index 2, so row order must match the order in which columns are selected.
const CassandraXmlMapping wuVariablesMappings [] =
{
    {"wuid", "text", NULL, wuidColumnMapper},
    {"sequence", "int", "@sequence", defaultedIntColumnMapper}, // Note - should be either variable or temporary...
    {"name", "text", "@name", requiredStringColumnMapper},
    {"format", "text", "@format", stringColumnMapper}, // xml, xmlset, csv, or null to mean raw. Could probably switch to int if we wanted
    {"status", "text", "@status", stringColumnMapper},
    {"rowcount", "int", "rowCount", intColumnMapper}, // This is the number of rows in result (which may be stored in a file rather than in value)
    {"totalrowcount", "bigint", "totalRowCount", bigintColumnMapper},// This is the number of rows in value
    {"schemaRaw", "blob", "SchemaRaw", blobColumnMapper},
    {"logicalName", "text", "logicalName", stringColumnMapper}, // either this or value will be present once result status is "calculated"
    {"value", "blob", "Value", blobColumnMapper},
    { NULL, "wuVariables", "((wuid), sequence, name)", stringColumnMapper}   // terminator: table name and primary key
};
  2687. void getBoundFieldNames(const CassandraXmlMapping *mappings, StringBuffer &names, StringBuffer &bindings, IPTree *inXML, StringBuffer &tableName)
  2688. {
  2689. while (mappings->columnName)
  2690. {
  2691. if (mappings->mapper.fromXML(NULL, 0, inXML, mappings->xpath))
  2692. {
  2693. names.appendf(",%s", mappings->columnName);
  2694. if (strcmp(mappings->columnType, "timeuuid")==0)
  2695. bindings.appendf(",now()");
  2696. else
  2697. bindings.appendf(",?");
  2698. }
  2699. mappings++;
  2700. }
  2701. tableName.append(mappings->columnType);
  2702. }
  2703. void getFieldNames(const CassandraXmlMapping *mappings, StringBuffer &names, StringBuffer &tableName)
  2704. {
  2705. while (mappings->columnName)
  2706. {
  2707. names.appendf(",%s", mappings->columnName);
  2708. mappings++;
  2709. }
  2710. tableName.append(mappings->columnType);
  2711. }
  2712. const char *queryTableName(const CassandraXmlMapping *mappings)
  2713. {
  2714. while (mappings->columnName)
  2715. mappings++;
  2716. return mappings->columnType;
  2717. }
  2718. StringBuffer & describeTable(const CassandraXmlMapping *mappings, StringBuffer &out)
  2719. {
  2720. StringBuffer fields;
  2721. while (mappings->columnName)
  2722. {
  2723. fields.appendf("%s %s,", mappings->columnName, mappings->columnType);
  2724. mappings++;
  2725. }
  2726. return out.appendf("CREATE TABLE IF NOT EXISTS HPCC.%s (%s PRIMARY KEY %s);", mappings->columnType, fields.str(), mappings->xpath);
  2727. }
  2728. const CassResult *executeQuery(CassSession *session, CassStatement *statement)
  2729. {
  2730. CassandraFuture future(cass_session_execute(session, statement));
  2731. future.wait("executeQuery");
  2732. return cass_future_get_result(future);
  2733. }
  2734. const CassResult *fetchDataForKey(const char *key, CassSession *session, const CassandraXmlMapping *mappings)
  2735. {
  2736. StringBuffer names;
  2737. StringBuffer tableName;
  2738. getFieldNames(mappings+(key?1:0), names, tableName); // mappings+1 means we don't return the key column
  2739. VStringBuffer selectQuery("select %s from HPCC.%s", names.str()+1, tableName.str());
  2740. if (key)
  2741. selectQuery.appendf(" where %s='%s'", mappings->columnName, key); // MORE - should consider using prepared for this - is it faster?
  2742. selectQuery.append(';');
  2743. //if (traceLevel >= 2)
  2744. // DBGLOG("%s", selectQuery.str());
  2745. CassandraStatement statement(cass_statement_new(cass_string_init(selectQuery.str()), 0));
  2746. return executeQuery(session, statement);
  2747. }
  2748. const CassResult *fetchDataForKeyAndWuid(const char *key, const char *wuid, CassSession *session, const CassandraXmlMapping *mappings)
  2749. {
  2750. StringBuffer names;
  2751. StringBuffer tableName;
  2752. getFieldNames(mappings+2, names, tableName); // mappings+1 means we don't return the key column
  2753. VStringBuffer selectQuery("select %s from HPCC.%s where %s='%s' and wuid='%s'", names.str()+1, tableName.str(), mappings->columnName, key, wuid); // MORE - should consider using prepared/bind for this - is it faster?
  2754. selectQuery.append(';');
  2755. //if (traceLevel >= 2)
  2756. // DBGLOG("%s", selectQuery.str());
  2757. CassandraStatement statement(cass_statement_new(cass_string_init(selectQuery.str()), 0));
  2758. return executeQuery(session, statement);
  2759. }
  2760. void deleteSecondaryByKey(const CassandraXmlMapping *mappings, const char *wuid, const char *key, const ICassandraSession *sessionCache, CassBatch *batch)
  2761. {
  2762. if (key && *key)
  2763. {
  2764. StringBuffer names;
  2765. StringBuffer tableName;
  2766. getFieldNames(mappings, names, tableName);
  2767. VStringBuffer insertQuery("DELETE from HPCC.%s where %s=? and wuid=?;", tableName.str(), mappings[0].columnName);
  2768. Owned<CassandraPrepared> prepared = sessionCache->prepareStatement(insertQuery);
  2769. CassandraStatement update(cass_prepared_bind(*prepared));
  2770. check(cass_statement_bind_string(update, 0, cass_string_init(key)));
  2771. check(cass_statement_bind_string(update, 1, cass_string_init(wuid)));
  2772. check(cass_batch_add_statement(batch, update));
  2773. }
  2774. }
  2775. void deleteChildByWuid(const CassandraXmlMapping *mappings, const char *wuid, const ICassandraSession *sessionCache, CassBatch *batch)
  2776. {
  2777. StringBuffer names;
  2778. StringBuffer tableName;
  2779. getFieldNames(mappings, names, tableName);
  2780. VStringBuffer insertQuery("DELETE from HPCC.%s where wuid=?;", tableName.str());
  2781. Owned<CassandraPrepared> prepared = sessionCache->prepareStatement(insertQuery);
  2782. CassandraStatement update(cass_prepared_bind(*prepared));
  2783. check(cass_statement_bind_string(update, 0, cass_string_init(wuid)));
  2784. check(cass_batch_add_statement(batch, update));
  2785. }
  2786. void executeSimpleCommand(CassSession *session, const char *command)
  2787. {
  2788. CassandraStatement statement(cass_statement_new(cass_string_init(command), 0));
  2789. CassandraFuture future(cass_session_execute(session, statement));
  2790. future.wait("execute");
  2791. }
  2792. void ensureTable(CassSession *session, const CassandraXmlMapping *mappings)
  2793. {
  2794. StringBuffer schema;
  2795. executeSimpleCommand(session, describeTable(mappings, schema));
  2796. }
  2797. static void getBindingsString(const CassandraXmlMapping *mappings, StringBuffer &bindings, IPTree *inXML)
  2798. {
  2799. bindings.append(",*"); // For wuid
  2800. unsigned colidx = 1;
  2801. while (mappings[colidx].columnName)
  2802. {
  2803. if (mappings[colidx].mapper.fromXML(NULL, 0, inXML, mappings[colidx].xpath))
  2804. bindings.append(",*");
  2805. colidx++;
  2806. }
  2807. }
  2808. static void bindFromXML(const CassandraXmlMapping *mappings, CassStatement *statement, IPTree *inXML, int defaultValue)
  2809. {
  2810. unsigned colidx = 0;
  2811. unsigned bindidx = 0;
  2812. while (mappings[colidx].columnName)
  2813. {
  2814. if (mappings[colidx].mapper.fromXML(statement, bindidx, inXML, mappings[colidx].xpath, defaultValue))
  2815. bindidx++;
  2816. colidx++;
  2817. }
  2818. }
  2819. extern void simpleXMLtoCassandra(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, IPTree *inXML)
  2820. {
  2821. StringBuffer names;
  2822. StringBuffer bindings;
  2823. StringBuffer tableName;
  2824. getBoundFieldNames(mappings, names, bindings, inXML, tableName);
  2825. VStringBuffer insertQuery("INSERT into HPCC.%s (%s) values (%s);", tableName.str(), names.str()+1, bindings.str()+1);
  2826. Owned<CassandraPrepared> prepared = session->prepareStatement(insertQuery);
  2827. CassandraStatement update(cass_prepared_bind(*prepared));
  2828. bindFromXML(mappings, update, inXML, 0);
  2829. check(cass_batch_add_statement(batch, update));
  2830. }
  2831. extern void childXMLtoCassandra(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, const char *wuid, IPTreeIterator *elements, int defaultValue)
  2832. {
  2833. if (elements->first())
  2834. {
  2835. do
  2836. {
  2837. IPTree &result = elements->query();
  2838. StringBuffer bindings;
  2839. StringBuffer names;
  2840. StringBuffer tableName;
  2841. getBoundFieldNames(mappings, names, bindings, &result, tableName);
  2842. VStringBuffer insertQuery("INSERT into HPCC.%s (%s) values (%s);", tableName.str(), names.str()+1, bindings.str()+1);
  2843. Owned<CassandraPrepared> prepared = session->prepareStatement(insertQuery);
  2844. CassandraStatement update(cass_prepared_bind(*prepared));
  2845. check(cass_statement_bind_string(update, 0, cass_string_init(wuid)));
  2846. bindFromXML(mappings, update, &result, defaultValue);
  2847. check(cass_batch_add_statement(batch, update));
  2848. }
  2849. while (elements->next());
  2850. }
  2851. }
  2852. extern void childXMLtoCassandra(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, IPTree *inXML, const char *xpath, int defaultValue)
  2853. {
  2854. Owned<IPTreeIterator> elements = inXML->getElements(xpath);
  2855. childXMLtoCassandra(session, batch, mappings, inXML->queryName(), elements, defaultValue);
  2856. }
  2857. extern void wuResultsXMLtoCassandra(const ICassandraSession *session, CassBatch *batch, IPTree *inXML, const char *xpath)
  2858. {
  2859. childXMLtoCassandra(session, batch, wuResultsMappings, inXML, xpath, 0);
  2860. }
  2861. extern void wuVariablesXMLtoCassandra(const ICassandraSession *session, CassBatch *batch, IPTree *inXML, const char *xpath, int defaultSequence)
  2862. {
  2863. childXMLtoCassandra(session, batch, wuVariablesMappings, inXML, xpath, defaultSequence);
  2864. }
  2865. extern void cassandraToWuResultsXML(CassSession *session, const char *wuid, IPTree *wuTree)
  2866. {
  2867. CassandraResult result(fetchDataForKey(wuid, session, wuResultsMappings));
  2868. Owned<IPTree> results;
  2869. CassandraIterator rows(cass_iterator_from_result(result));
  2870. while (cass_iterator_next(rows))
  2871. {
  2872. CassandraIterator cols(cass_iterator_from_row(cass_iterator_get_row(rows)));
  2873. Owned<IPTree> child;
  2874. if (!results)
  2875. results.setown(createPTree("Results"));
  2876. child.setown(createPTree("Result"));
  2877. unsigned colidx = 1;
  2878. while (cass_iterator_next(cols))
  2879. {
  2880. assertex(wuResultsMappings[colidx].columnName);
  2881. const CassValue *value = cass_iterator_get_column(cols);
  2882. if (value && !cass_value_is_null(value))
  2883. wuResultsMappings[colidx].mapper.toXML(child, wuResultsMappings[colidx].xpath, value);
  2884. colidx++;
  2885. }
  2886. const char *childName = child->queryName();
  2887. results->addPropTree(childName, child.getClear());
  2888. }
  2889. if (results)
  2890. wuTree->addPropTree("Results", results.getClear());
  2891. }
  2892. extern void cassandraToWuVariablesXML(CassSession *session, const char *wuid, IPTree *wuTree)
  2893. {
  2894. CassandraResult result(fetchDataForKey(wuid, session, wuVariablesMappings));
  2895. Owned<IPTree> variables;
  2896. Owned<IPTree> temporaries;
  2897. CassandraIterator rows(cass_iterator_from_result(result));
  2898. while (cass_iterator_next(rows))
  2899. {
  2900. CassandraIterator cols(cass_iterator_from_row(cass_iterator_get_row(rows)));
  2901. if (!cass_iterator_next(cols))
  2902. fail("No column found reading wuvariables.sequence");
  2903. const CassValue *sequenceValue = cass_iterator_get_column(cols);
  2904. int sequence = getSignedResult(NULL, sequenceValue);
  2905. Owned<IPTree> child;
  2906. IPTree *parent;
  2907. switch (sequence)
  2908. {
  2909. case ResultSequenceStored:
  2910. if (!variables)
  2911. variables.setown(createPTree("Variables"));
  2912. child.setown(createPTree("Variable"));
  2913. parent = variables;
  2914. break;
  2915. case ResultSequenceInternal:
  2916. case ResultSequenceOnce:
  2917. if (!temporaries)
  2918. temporaries.setown(createPTree("Temporaries"));
  2919. child.setown(createPTree("Variable"));
  2920. parent = temporaries;
  2921. break;
  2922. default:
  2923. throwUnexpected();
  2924. break;
  2925. }
  2926. unsigned colidx = 2;
  2927. while (cass_iterator_next(cols))
  2928. {
  2929. assertex(wuVariablesMappings[colidx].columnName);
  2930. const CassValue *value = cass_iterator_get_column(cols);
  2931. if (value && !cass_value_is_null(value))
  2932. wuVariablesMappings[colidx].mapper.toXML(child, wuVariablesMappings[colidx].xpath, value);
  2933. colidx++;
  2934. }
  2935. const char *childName = child->queryName();
  2936. parent->addPropTree(childName, child.getClear());
  2937. }
  2938. if (variables)
  2939. wuTree->addPropTree("Variables", variables.getClear());
  2940. if (temporaries)
  2941. wuTree->addPropTree("Temporaries", temporaries.getClear());
  2942. }
  2943. /*
  2944. extern void graphProgressXMLtoCassandra(CassSession *session, IPTree *inXML)
  2945. {
  2946. StringBuffer names;
  2947. StringBuffer bindings;
  2948. StringBuffer tableName;
  2949. int numBound = getFieldNames(graphProgressMappings, names, bindings, tableName);
  2950. VStringBuffer insertQuery("INSERT into HPCC.%s (%s) values (%s);", tableName.str(), names.str()+1, bindings.str()+1);
  2951. DBGLOG("%s", insertQuery.str());
  2952. CassandraBatch batch(cass_batch_new(CASS_BATCH_TYPE_UNLOGGED));
  2953. CassandraFuture futurePrep(cass_session_prepare(session, cass_string_init(insertQuery)));
  2954. futurePrep.wait("prepare statement");
  2955. CassandraPrepared prepared(cass_future_get_prepared(futurePrep));
  2956. Owned<IPTreeIterator> graphs = inXML->getElements("./graph*");
  2957. ForEach(*graphs)
  2958. {
  2959. IPTree &graph = graphs->query();
  2960. Owned<IPTreeIterator> subgraphs = graph.getElements("./node");
  2961. ForEach(*subgraphs)
  2962. {
  2963. IPTree &subgraph = subgraphs->query();
  2964. CassandraStatement update(cass_prepared_bind(prepared));
  2965. graphProgressMappings[0].mapper.fromXML(update, 0, inXML, graphProgressMappings[0].xpath);
  2966. graphProgressMappings[1].mapper.fromXML(update, 1, &graph, graphProgressMappings[1].xpath);
  2967. unsigned colidx = 2;
  2968. while (graphProgressMappings[colidx].columnName)
  2969. {
  2970. graphProgressMappings[colidx].mapper.fromXML(update, colidx, &subgraph, graphProgressMappings[colidx].xpath);
  2971. colidx++;
  2972. }
  2973. check(cass_batch_add_statement(batch, update));
  2974. }
  2975. // And one more with subgraphid = 0 for the graph status
  2976. CassandraStatement update(cass_statement_new(cass_string_init(insertQuery.str()), bindings.length()/2));
  2977. graphProgressMappings[0].mapper.fromXML(update, 0, inXML, graphProgressMappings[0].xpath);
  2978. graphProgressMappings[1].mapper.fromXML(update, 1, &graph, graphProgressMappings[1].xpath);
  2979. check(cass_statement_bind_int64(update, 3, 0)); // subgraphId can't be null, as it's in the key
  2980. unsigned colidx = 4; // we skip progress and subgraphid
  2981. while (graphProgressMappings[colidx].columnName)
  2982. {
  2983. graphProgressMappings[colidx].mapper.fromXML(update, colidx, &graph, graphProgressMappings[colidx].xpath);
  2984. colidx++;
  2985. }
  2986. check(cass_batch_add_statement(batch, update));
  2987. }
  2988. if (inXML->hasProp("Running"))
  2989. {
  2990. IPTree *running = inXML->queryPropTree("Running");
  2991. CassandraStatement update(cass_statement_new(cass_string_init(insertQuery.str()), bindings.length()/2));
  2992. graphProgressMappings[0].mapper.fromXML(update, 0, inXML, graphProgressMappings[0].xpath);
  2993. graphProgressMappings[1].mapper.fromXML(update, 1, running, graphProgressMappings[1].xpath);
  2994. graphProgressMappings[2].mapper.fromXML(update, 2, running, graphProgressMappings[2].xpath);
  2995. check(cass_statement_bind_int64(update, 3, 0)); // subgraphId can't be null, as it's in the key
  2996. check(cass_batch_add_statement(batch, update));
  2997. }
  2998. CassandraFuture futureBatch(cass_session_execute_batch(session, batch));
  2999. futureBatch.wait("execute");
  3000. }
  3001. extern void cassandraToGraphProgressXML(CassSession *session, const char *wuid)
  3002. {
  3003. CassandraResult result(fetchDataForWu(wuid, session, graphProgressMappings));
  3004. Owned<IPTree> progress = createPTree(wuid);
  3005. CassandraIterator rows(cass_iterator_from_result(result));
  3006. while (cass_iterator_next(rows))
  3007. {
  3008. CassandraIterator cols(cass_iterator_from_row(cass_iterator_get_row(rows)));
  3009. unsigned colidx = 1; // wuid is not returned
  3010. IPTree *ptree = progress;
  3011. while (cass_iterator_next(cols))
  3012. {
  3013. assertex(graphProgressMappings[colidx].columnName);
  3014. const CassValue *value = cass_iterator_get_column(cols);
  3015. // NOTE - this relies on the fact that progress is NULL when subgraphId=0, so that the status and id fields
  3016. // get set on the graph instead of on the child node in those cases.
  3017. if (value && !cass_value_is_null(value))
  3018. ptree = graphProgressMappings[colidx].mapper.toXML(ptree, graphProgressMappings[colidx].xpath, value);
  3019. colidx++;
  3020. }
  3021. }
  3022. StringBuffer out;
  3023. toXML(progress, out, 0, XML_SortTags|XML_Format);
  3024. printf("%s", out.str());
  3025. }
  3026. */
  3027. extern IPTree *cassandraToWorkunitXML(CassSession *session, const char *wuid)
  3028. {
  3029. CassandraResult result(fetchDataForKey(wuid, session, workunitsMappings));
  3030. CassandraIterator rows(cass_iterator_from_result(result));
  3031. if (cass_iterator_next(rows)) // should just be one
  3032. {
  3033. Owned<IPTree> wuXML = createPTree(wuid);
  3034. CassandraIterator cols(cass_iterator_from_row(cass_iterator_get_row(rows)));
  3035. wuXML->setPropTree("Query", createPTree("Query"));
  3036. wuXML->setProp("Query/@fetchEntire", "1");
  3037. unsigned colidx = 1; // wuid is not returned
  3038. while (cass_iterator_next(cols))
  3039. {
  3040. assertex(workunitsMappings[colidx].columnName);
  3041. const CassValue *value = cass_iterator_get_column(cols);
  3042. if (value && !cass_value_is_null(value))
  3043. workunitsMappings[colidx].mapper.toXML(wuXML, workunitsMappings[colidx].xpath, value);
  3044. colidx++;
  3045. }
  3046. return wuXML.getClear();
  3047. }
  3048. else
  3049. return NULL;
  3050. }
  3051. static const CassValue *getSingleResult(const CassResult *result)
  3052. {
  3053. const CassRow *row = cass_result_first_row(result);
  3054. if (row)
  3055. return cass_row_get_column(row, 0);
  3056. else
  3057. return NULL;
  3058. }
  3059. static StringBuffer &getCassString(StringBuffer &str, const CassValue *value)
  3060. {
  3061. CassString output;
  3062. check(cass_value_get_string(value, &output));
  3063. return str.append(output.length, output.data);
  3064. }
  3065. /*
  3066. extern void cassandraTestGraphProgressXML()
  3067. {
  3068. CassandraCluster cluster(cass_cluster_new());
  3069. cass_cluster_set_contact_points(cluster, "127.0.0.1");
  3070. CassandraSession session(cass_session_new());
  3071. CassandraFuture future(cass_session_connect_keyspace(session, cluster, "hpcc"));
  3072. future.wait("connect");
  3073. ensureTable(session, graphProgressMappings);
  3074. Owned<IPTree> inXML = createPTreeFromXMLFile("/data/rchapman/hpcc/testing/regress/ecl/a.xml");
  3075. graphProgressXMLtoCassandra(session, inXML);
  3076. const char *wuid = inXML->queryName();
  3077. cassandraToGraphProgressXML(session, wuid);
  3078. }
  3079. extern void cassandraTest()
  3080. {
  3081. cassandraTestWorkunitXML();
  3082. //cassandraTestGraphProgressXML();
  3083. }
  3084. */
  3085. class CCassandraWorkUnit : public CLocalWorkUnit
  3086. {
  3087. public:
  3088. IMPLEMENT_IINTERFACE;
  3089. CCassandraWorkUnit(ICassandraSession *_sessionCache, IPTree *wuXML, ISecManager *secmgr, ISecUser *secuser)
  3090. : sessionCache(_sessionCache), CLocalWorkUnit(secmgr, secuser)
  3091. {
  3092. CLocalWorkUnit::loadPTree(wuXML);
  3093. allDirty = false; // Debatable... depends where the XML came from! If we read it from Cassandra. it's not. Otherwise, it is...
  3094. basicDirty = false;
  3095. abortDirty = true;
  3096. abortState = false;
  3097. }
  3098. ~CCassandraWorkUnit()
  3099. {
  3100. }
  3101. virtual void forceReload()
  3102. {
  3103. printStackReport();
  3104. UNIMPLEMENTED;
  3105. abortDirty = true;
  3106. }
  3107. virtual void cleanupAndDelete(bool deldll, bool deleteOwned, const StringArray *deleteExclusions)
  3108. {
  3109. const char *wuid = queryWuid();
  3110. CLocalWorkUnit::cleanupAndDelete(deldll, deleteOwned, deleteExclusions);
  3111. if (!batch)
  3112. batch.setown(new CassandraBatch(cass_batch_new(CASS_BATCH_TYPE_UNLOGGED)));
  3113. deleteChildren(wuid);
  3114. deleteSecondaries(wuid);
  3115. Owned<CassandraPrepared> prepared = sessionCache->prepareStatement("DELETE from HPCC.workunits where wuid=?;");
  3116. CassandraStatement update(cass_prepared_bind(*prepared));
  3117. check(cass_statement_bind_string(update, 0, cass_string_init(wuid)));
  3118. check(cass_batch_add_statement(*batch, update));
  3119. CassandraFuture futureBatch(cass_session_execute_batch(sessionCache->querySession(), *batch));
  3120. futureBatch.wait("execute");
  3121. batch.clear();
  3122. }
  3123. virtual void commit()
  3124. {
  3125. CLocalWorkUnit::commit();
  3126. if (sessionCache->queryTraceLevel() >= 8)
  3127. {
  3128. StringBuffer s; toXML(p, s); DBGLOG("CCassandraWorkUnit::commit\n%s", s.str());
  3129. }
  3130. if (batch)
  3131. {
  3132. const char *wuid = queryWuid();
  3133. if (basicDirty)
  3134. {
  3135. updateSecondaries(wuid);
  3136. }
  3137. simpleXMLtoCassandra(sessionCache, *batch, workunitsMappings, p); // This just does the parent row
  3138. if (allDirty)
  3139. {
  3140. // MORE - these deletes are technically correct, but if we assert that the only place that copyWorkUnit is used is to populate an
  3141. // empty WU from XML text, they are unnecessary.
  3142. deleteChildren(wuid);
  3143. wuResultsXMLtoCassandra(sessionCache, *batch, p, "Results/Result");
  3144. wuVariablesXMLtoCassandra(sessionCache, *batch, p, "Variables/Variable", ResultSequenceStored);
  3145. wuVariablesXMLtoCassandra(sessionCache, *batch, p, "Temporaries/Variable", ResultSequenceInternal); // NOTE - lookups may also request ResultSequenceOnce
  3146. childXMLtoCassandra(sessionCache, *batch, wuExceptionsMappings, p, "Exceptions/Exception", 0);
  3147. childXMLtoCassandra(sessionCache, *batch, wuStatisticsMappings, p, "Statistics/Statistic", 0);
  3148. }
  3149. else
  3150. {
  3151. ResultPTreeIterator dirtyResultsIterator(dirtyResults);
  3152. childXMLtoCassandra(sessionCache, *batch, wuResultsMappings, wuid, &dirtyResultsIterator, 0); // MORE - all the other dirty subtrees... TBD
  3153. }
  3154. CassandraFuture futureBatch(cass_session_execute_batch(sessionCache->querySession(), *batch));
  3155. futureBatch.wait("execute");
  3156. batch.setown(new CassandraBatch(cass_batch_new(CASS_BATCH_TYPE_UNLOGGED))); // Commit leaves it locked...
  3157. prevOwner.clear();
  3158. allDirty = false;
  3159. }
  3160. else
  3161. DBGLOG("No batch present??");
  3162. }
  3163. virtual void setUser(const char *user)
  3164. {
  3165. if (trackSecondaryChange(user, queryUser(), prevOwner))
  3166. CLocalWorkUnit::setUser(user);
  3167. }
  3168. virtual void _lockRemote()
  3169. {
  3170. // Ignore locking for now!
  3171. // printStackReport();
  3172. // UNIMPLEMENTED;
  3173. batch.setown(new CassandraBatch(cass_batch_new(CASS_BATCH_TYPE_UNLOGGED)));
  3174. }
  3175. virtual void _unlockRemote()
  3176. {
  3177. // printStackReport();
  3178. // UNIMPLEMENTED;
  3179. commit();
  3180. batch.clear();
  3181. }
  3182. virtual void subscribe(WUSubscribeOptions options)
  3183. {
  3184. // printStackReport();
  3185. // UNIMPLEMENTED;
  3186. }
  3187. virtual void unsubscribe()
  3188. {
  3189. // printStackReport();
  3190. // UNIMPLEMENTED;
  3191. }
  3192. virtual bool aborting() const
  3193. {
  3194. return false;
  3195. // MORE - work out what to do about aborts in Cassandra
  3196. // printStackReport();
  3197. // UNIMPLEMENTED;
  3198. }
  3199. virtual IWUResult * updateResultByName(const char * name)
  3200. {
  3201. return noteDirty(CLocalWorkUnit::updateResultByName(name));
  3202. }
  3203. virtual IWUResult * updateResultBySequence(unsigned seq)
  3204. {
  3205. return noteDirty(CLocalWorkUnit::updateResultBySequence(seq));
  3206. }
  3207. virtual IWUResult * updateTemporaryByName(const char * name)
  3208. {
  3209. return noteDirty(CLocalWorkUnit::updateTemporaryByName(name));
  3210. }
  3211. virtual IWUResult * updateVariableByName(const char * name)
  3212. {
  3213. return noteDirty(CLocalWorkUnit::updateVariableByName(name));
  3214. }
  3215. virtual void copyWorkUnit(IConstWorkUnit *cached, bool all)
  3216. {
  3217. // Make sure that any required updates to the secondary files happen
  3218. trackSecondaryChange(cached->queryUser(), queryUser(), prevOwner);
  3219. // This populates entire XML tree - so we need to note that we did so to ensure everything is flushed by commit
  3220. CLocalWorkUnit::copyWorkUnit(cached, all);
  3221. allDirty = true;
  3222. }
  3223. virtual void _loadResults() const
  3224. {
  3225. // Lazy populate the Results branch of p from Cassandra
  3226. cassandraToWuResultsXML(sessionCache->querySession(), queryWuid(), p);
  3227. // MORE - we should really also override CLocalWUResult so that value and schema can be lazy-fetched
  3228. CLocalWorkUnit::_loadResults();
  3229. StringBuffer buf; toXML(p, buf); DBGLOG("After _loadResults:\n%s", buf.str());
  3230. }
  3231. protected:
  3232. // Delete child table rows
  3233. void deleteChildren(const char *wuid)
  3234. {
  3235. deleteChildByWuid(wuResultsMappings, wuid, sessionCache, *batch);
  3236. deleteChildByWuid(wuVariablesMappings, wuid, sessionCache, *batch);
  3237. deleteChildByWuid(wuExceptionsMappings, wuid, sessionCache, *batch);
  3238. deleteChildByWuid(wuStatisticsMappings, wuid, sessionCache, *batch);
  3239. }
  3240. // Update secondary tables (used to search wuids by orner, state, jobname etc)
  3241. void updateSecondaryTable(const CassandraXmlMapping *mappings, const char *wuid, const char *prevKey)
  3242. {
  3243. deleteSecondaryByKey(mappings, wuid, prevKey, sessionCache, *batch);
  3244. if (p->hasProp(mappings[0].xpath))
  3245. simpleXMLtoCassandra(sessionCache, *batch, mappings, p);
  3246. }
  3247. void deleteSecondaries(const char *wuid)
  3248. {
  3249. deleteSecondaryByKey(ownerMappings, wuid, queryUser(), sessionCache, *batch);
  3250. }
  3251. void updateSecondaries(const char *wuid)
  3252. {
  3253. updateSecondaryTable(ownerMappings, wuid, prevOwner);
  3254. }
  3255. // Keep track of previously committed values for fields that we have a secondary table for, so that we can update them appropriately when we commit
  3256. bool trackSecondaryChange(const char *newval, const char *oldval, StringAttr &tracker)
  3257. {
  3258. if (!newval)
  3259. newval = "";
  3260. if (streq(newval, oldval))
  3261. return false; // No change
  3262. if (!tracker) // We need to record the last _committed_ value so we can update the secondary tables appropriately
  3263. tracker.set(oldval);
  3264. basicDirty = true;
  3265. return true;
  3266. }
  3267. // Allows us to iterate over an array of IPTrees - MORE this could be in jptree? Should save the trees not the results I suspect.
  3268. class ResultPTreeIterator : implements CInterfaceOf<IPTreeIterator>
  3269. {
  3270. public:
  3271. ResultPTreeIterator(IArrayOf<IWUResult> &_results) : r(_results), idx(0), p(NULL) {}
  3272. virtual bool first() { idx = 0; return isValid(); }
  3273. virtual bool next() { idx++; return isValid(); }
  3274. virtual bool isValid()
  3275. {
  3276. if (r.isItem(idx))
  3277. {
  3278. p = r.item(idx).queryPTree();
  3279. return true;
  3280. }
  3281. else
  3282. {
  3283. p = NULL;
  3284. return false;
  3285. }
  3286. }
  3287. virtual IPropertyTree & query() { return *p; }
  3288. protected:
  3289. IArrayOf<IWUResult> &r;
  3290. IPropertyTree *p;
  3291. unsigned idx;
  3292. };
  3293. IWUResult *noteDirty(IWUResult *result)
  3294. {
  3295. if (result)
  3296. dirtyResults.append(*LINK(result));
  3297. return result;
  3298. }
  3299. const ICassandraSession *sessionCache;
  3300. mutable bool abortDirty;
  3301. mutable bool abortState;
  3302. bool allDirty;
  3303. bool basicDirty;
  3304. StringAttr prevOwner;
  3305. Owned<CassandraBatch> batch;
  3306. IArrayOf<IWUResult> dirtyResults;
  3307. };
  3308. class CCasssandraWorkUnitFactory : public CWorkUnitFactory, implements ICassandraSession
  3309. {
  3310. public:
  3311. CCasssandraWorkUnitFactory(const IPropertyTree *props) : cluster(cass_cluster_new()), randomizeSuffix(0)
  3312. {
  3313. StringArray options;
  3314. Owned<IPTreeIterator> it = props->getElements("Option");
  3315. ForEach(*it)
  3316. {
  3317. IPTree &item = it->query();
  3318. const char *opt = item.queryProp("@name");
  3319. const char *val = item.queryProp("@value");
  3320. if (opt && val)
  3321. {
  3322. if (strieq(opt, "randomWuidSuffix"))
  3323. randomizeSuffix = atoi(val);
  3324. else if (strieq(opt, "traceLevel"))
  3325. traceLevel = atoi(val);
  3326. else
  3327. {
  3328. VStringBuffer optstr("%s=%s", opt, val);
  3329. options.append(optstr);
  3330. }
  3331. }
  3332. }
  3333. cluster.setOptions(options);
  3334. session.set(cass_session_new());
  3335. // CassandraFuture future(cass_session_connect_keyspace(session, cluster, "hpcc"));
  3336. CassandraFuture future(cass_session_connect(session, cluster)); // Since we don't know if HPCC keyspace exists, easier not to connect to it but rather specify it explicitly in each query
  3337. future.wait("connect");
  3338. VStringBuffer create("CREATE KEYSPACE IF NOT EXISTS hpcc WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1' } ;"); // MORE - options from props!
  3339. executeSimpleCommand(session, create);
  3340. ensureTable(session, workunitsMappings);
  3341. ensureTable(session, ownerMappings);
  3342. ensureTable(session, wuResultsMappings);
  3343. ensureTable(session, wuVariablesMappings);
  3344. ensureTable(session, wuExceptionsMappings);
  3345. ensureTable(session, wuStatisticsMappings);
  3346. }
  3347. ~CCasssandraWorkUnitFactory()
  3348. {
  3349. }
  3350. /*
  3351. virtual IWorkUnit * createWorkUnit(const char * app, const char * user, ISecManager *secmgr, ISecUser *secuser) { printStackReport(); UNIMPLEMENTED; }
  3352. virtual bool deleteWorkUnit(const char * wuid, ISecManager *secmgr, ISecUser *secuser) { UNIMPLEMENTED; }
  3353. virtual IConstWorkUnit * openWorkUnit(const char * wuid, bool lock, ISecManager *secmgr, ISecUser *secuser)
  3354. {
  3355. // MORE - what to do about lock?
  3356. Owned<IPTree> wuXML = cassandraToWorkunitXML(session, wuid);
  3357. if (wuXML)
  3358. return new CConstCassandraWorkUnit(wuXML.getClear(), NULL, NULL);
  3359. else
  3360. return NULL;
  3361. }
  3362. virtual IWorkUnit * updateWorkUnit(const char * wuid, ISecManager *secmgr, ISecUser *secuser) { UNIMPLEMENTED; }
  3363. virtual int setTracingLevel(int newlevel) { UNIMPLEMENTED; }
  3364. virtual IWorkUnit * createNamedWorkUnit(const char * wuid, const char * app, const char *user, ISecManager *secmgr, ISecUser *secuser) { UNIMPLEMENTED; }
  3365. */
  3366. virtual CLocalWorkUnit* _createWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser)
  3367. {
  3368. unsigned suffix;
  3369. unsigned suffixLength;
  3370. if (randomizeSuffix) // May need to enable this option if you are expecting to create hundreds of workunits / second
  3371. {
  3372. suffix = rand();
  3373. suffixLength = randomizeSuffix;
  3374. }
  3375. else
  3376. {
  3377. suffix = 0;
  3378. suffixLength = 0;
  3379. }
  3380. Owned<CassandraPrepared> prepared = prepareStatement("INSERT INTO HPCC.workunits (wuid) VALUES (?) IF NOT EXISTS;");
  3381. loop
  3382. {
  3383. // Create a unique WUID by adding suffixes until we managed to add a new value
  3384. StringBuffer useWuid(wuid);
  3385. if (suffix)
  3386. {
  3387. useWuid.append("-");
  3388. for (unsigned i = 0; i < suffixLength; i++)
  3389. {
  3390. useWuid.appendf("%c", '0'+suffix%10);
  3391. suffix /= 10;
  3392. }
  3393. }
  3394. CassandraStatement statement(cass_prepared_bind(*prepared));
  3395. check(cass_statement_bind_string(statement, 0, cass_string_init(useWuid.str())));
  3396. if (traceLevel >= 2)
  3397. DBGLOG("Try creating %s", useWuid.str());
  3398. CassandraFuture future(cass_session_execute(session, statement));
  3399. future.wait("execute");
  3400. CassandraResult result(cass_future_get_result(future));
  3401. CassString columnName = cass_result_column_name(result, 0);
  3402. if (cass_result_column_count(result)==1)
  3403. {
  3404. // A single column result indicates success, - the single column should be called '[applied]' and have the value 'true'
  3405. // If there are multiple columns it will be '[applied]' (value false) and the fields of the existing row
  3406. Owned<CLocalWorkUnit> wu = new CCassandraWorkUnit(this, createPTree(useWuid), secmgr, secuser);
  3407. wu->lockRemote(true);
  3408. return wu.getClear();
  3409. }
  3410. suffix = rand();
  3411. if (suffixLength<9)
  3412. suffixLength++;
  3413. }
  3414. }
  3415. virtual CLocalWorkUnit* _openWorkUnit(const char *wuid, bool lock, ISecManager *secmgr, ISecUser *secuser)
  3416. {
  3417. // MORE - what to do about lock?
  3418. Owned<IPTree> wuXML = cassandraToWorkunitXML(session, wuid);
  3419. if (wuXML)
  3420. return new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser);
  3421. else
  3422. return NULL;
  3423. }
  3424. virtual CLocalWorkUnit* _updateWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser)
  3425. {
  3426. // Ignore locking for now
  3427. // Note - in Dali, this would lock for write, whereas _openWorkUnit would either lock for read (if lock set) or not lock at all
  3428. Owned<IPTree> wuXML = cassandraToWorkunitXML(session, wuid);
  3429. Owned<CLocalWorkUnit> wu = new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser);
  3430. wu->lockRemote(true);
  3431. return wu.getClear();
  3432. }
  3433. virtual IWorkUnit * getGlobalWorkUnit(ISecManager *secmgr = NULL, ISecUser *secuser = NULL) { UNIMPLEMENTED; }
  3434. virtual IConstWorkUnitIterator * getWorkUnitsByOwner(const char * owner, ISecManager *secmgr, ISecUser *secuser)
  3435. {
  3436. return getWorkUnitsByXXX(ownerMappings, owner, secmgr, secuser);
  3437. }
  3438. virtual IConstWorkUnitIterator * getWorkUnitsByState(WUState state, ISecManager *secmgr, ISecUser *secuser) { UNIMPLEMENTED; }
  3439. virtual IConstWorkUnitIterator * getWorkUnitsByECL(const char * ecl, ISecManager *secmgr, ISecUser *secuser) { UNIMPLEMENTED; }
  3440. virtual IConstWorkUnitIterator * getWorkUnitsByCluster(const char * cluster, ISecManager *secmgr, ISecUser *secuser) { UNIMPLEMENTED; }
  3441. virtual IConstWorkUnitIterator * getWorkUnitsByXPath(const char * xpath, ISecManager *secmgr, ISecUser *secuser) { UNIMPLEMENTED; }
  3442. virtual IConstWorkUnitIterator * getWorkUnitsSorted(WUSortField * sortorder, WUSortField * filters, const void * filterbuf,
  3443. unsigned startoffset, unsigned maxnum, const char * queryowner, __int64 * cachehint, unsigned *total,
  3444. ISecManager *secmgr, ISecUser *secuser) { UNIMPLEMENTED; }
  3445. virtual unsigned numWorkUnits()
  3446. {
  3447. Owned<CassandraPrepared> prepared = prepareStatement("SELECT COUNT(*) FROM HPCC.workunits;");
  3448. CassandraStatement statement(cass_prepared_bind(*prepared));
  3449. CassandraFuture future(cass_session_execute(session, statement));
  3450. future.wait("select count(*)");
  3451. CassandraResult result(cass_future_get_result(future));
  3452. return getUnsignedResult(NULL, getSingleResult(result));
  3453. }
  3454. /*
  3455. virtual void descheduleAllWorkUnits(ISecManager *secmgr, ISecUser *secuser) { UNIMPLEMENTED; }
  3456. virtual IConstQuerySetQueryIterator * getQuerySetQueriesSorted(WUQuerySortField *sortorder, WUQuerySortField *filters, const void *filterbuf, unsigned startoffset, unsigned maxnum, __int64 *cachehint, unsigned *total, const MapStringTo<bool> *subset) { UNIMPLEMENTED; }
  3457. virtual bool isAborting(const char *wuid) const { UNIMPLEMENTED; }
  3458. virtual void clearAborting(const char *wuid) { UNIMPLEMENTED; }
  3459. */
  3460. virtual CassSession *querySession() const { return session; };
  3461. virtual unsigned queryTraceLevel() const { return traceLevel; };
  3462. virtual CassandraPrepared *prepareStatement(const char *query) const
  3463. {
  3464. CriticalBlock b(cacheCrit);
  3465. Linked<CassandraPrepared> cached = preparedCache.getValue(query);
  3466. if (cached)
  3467. {
  3468. if (traceLevel >= 2)
  3469. DBGLOG("prepareStatement: Reusing %s", query);
  3470. return cached.getClear();
  3471. }
  3472. {
  3473. if (traceLevel >= 2)
  3474. DBGLOG("prepareStatement: Binding %s", query);
  3475. // We don't want to block cache lookups while we prepare a new bound statement
  3476. // Note - if multiple threads try to prepare the same (new) statement at the same time, it's not catastrophic
  3477. CriticalUnblock b(cacheCrit);
  3478. CassandraFuture futurePrep(cass_session_prepare(session, cass_string_init(query)));
  3479. futurePrep.wait("prepare statement");
  3480. cached.setown(new CassandraPrepared(cass_future_get_prepared(futurePrep)));
  3481. }
  3482. preparedCache.setValue(query, cached); // NOTE - this links parameter
  3483. return cached.getClear();
  3484. }
  3485. virtual WUState waitForWorkUnit(const char * wuid, unsigned timeout, bool compiled, bool returnOnWaitState)
  3486. {
  3487. VStringBuffer select("select state from hpcc.workunits where wuid = '%s';", wuid);
  3488. CassandraStatement statement(cass_statement_new(cass_string_init(select.str()), 0));
  3489. unsigned start = msTick();
  3490. loop
  3491. {
  3492. CassandraFuture future(cass_session_execute(session, statement));
  3493. future.wait("Lookup wu state");
  3494. CassandraResult result(cass_future_get_result(future));
  3495. const CassValue *value = getSingleResult(result);
  3496. if (value == NULL)
  3497. return WUStateUnknown;
  3498. CassString output;
  3499. check(cass_value_get_string(value, &output));
  3500. StringBuffer stateStr(output.length, output.data);
  3501. WUState state = getWorkUnitState(stateStr);
  3502. switch (state)
  3503. {
  3504. case WUStateCompiled:
  3505. case WUStateUploadingFiles:
  3506. if (compiled)
  3507. return state;
  3508. break;
  3509. case WUStateCompleted:
  3510. case WUStateFailed:
  3511. case WUStateAborted:
  3512. return state;
  3513. case WUStateWait:
  3514. if (returnOnWaitState)
  3515. return state;
  3516. break;
  3517. case WUStateCompiling:
  3518. case WUStateRunning:
  3519. case WUStateDebugPaused:
  3520. case WUStateDebugRunning:
  3521. case WUStateBlocked:
  3522. case WUStateAborting:
  3523. // MORE - can see if agent still running, and set to failed if it is not
  3524. break;
  3525. }
  3526. unsigned waited = msTick() - start;
  3527. if (timeout != -1 && waited > timeout)
  3528. {
  3529. return WUStateUnknown;
  3530. break;
  3531. }
  3532. Sleep(1000); // MORE - may want to back off as waited gets longer...
  3533. }
  3534. }
  3535. unsigned validateRepository(bool fix)
  3536. {
  3537. unsigned errCount = 0;
  3538. // MORE - if the batch gets too big you may need to flush it occasionally
  3539. CassandraBatch batch(fix ? cass_batch_new(CASS_BATCH_TYPE_LOGGED) : NULL);
  3540. // 1. Check that every entry in main wu table has matching entries in secondary tables
  3541. CassandraResult result(fetchDataForKey(NULL, session, workunitInfoMappings));
  3542. CassandraIterator rows(cass_iterator_from_result(result));
  3543. while (cass_iterator_next(rows))
  3544. {
  3545. Owned<IPTree> wuXML = rowToPTree(NULL, workunitInfoMappings, cass_iterator_get_row(rows));
  3546. const char *wuid = wuXML->queryName();
  3547. // For each secondary file, check that we get matching XML
  3548. errCount += validateSecondary(ownerMappings, wuid, wuXML, batch);
  3549. }
  3550. // 2. Check that there are no orphaned entries in secondary or child tables
  3551. errCount += checkOrphans(ownerMappings, 1, batch);
  3552. errCount += checkOrphans(wuResultsMappings, 0, batch);
  3553. errCount += checkOrphans(wuVariablesMappings, 0, batch);
  3554. errCount += checkOrphans(wuExceptionsMappings, 0, batch);
  3555. errCount += checkOrphans(wuStatisticsMappings, 0, batch);
  3556. // 3. Commit fixes
  3557. if (batch)
  3558. {
  3559. CassandraFuture futureBatch(cass_session_execute_batch(session, batch));
  3560. futureBatch.wait("Fix_repository");
  3561. }
  3562. return errCount;
  3563. }
  3564. private:
  3565. bool checkWuExists(const char *wuid)
  3566. {
  3567. Owned<CassandraPrepared> prepared = prepareStatement("SELECT COUNT(*) FROM HPCC.workunits where wuid=?;");
  3568. CassandraStatement statement(cass_prepared_bind(*prepared));
  3569. cass_statement_bind_string(statement, 0, cass_string_init(wuid));
  3570. CassandraFuture future(cass_session_execute(session, statement));
  3571. future.wait("select count(*)");
  3572. CassandraResult result(cass_future_get_result(future));
  3573. return getUnsignedResult(NULL, getSingleResult(result)) != 0; // Shouldn't be more than 1, either
  3574. }
  3575. IConstWorkUnitIterator * getWorkUnitsByXXX(const CassandraXmlMapping *mappings, const char *key, ISecManager *secmgr, ISecUser *secuser)
  3576. {
  3577. if (!key || !*key)
  3578. mappings=workunitInfoMappings; // Historically, providing no value on a call to getWorkUnitsByOwner (for example) filter meant unfiltered...
  3579. CassandraResult result(fetchDataForKey(key, session, mappings));
  3580. Owned<IPTree> parent = createPTree("WorkUnits");
  3581. CassandraIterator rows(cass_iterator_from_result(result));
  3582. while (cass_iterator_next(rows))
  3583. {
  3584. Owned<IPTree> wuXML = rowToPTree(key, mappings, cass_iterator_get_row(rows));
  3585. const char *wuid = wuXML->queryName();
  3586. parent->addPropTree(wuid, wuXML.getClear());
  3587. }
  3588. Owned<IPropertyTreeIterator> iter = parent->getElements("*");
  3589. return createConstWUIterator(iter, secmgr, secuser);
  3590. }
  3591. unsigned validateSecondary(const CassandraXmlMapping *mappings, const char *wuid, IPTree *wuXML, CassBatch *batch)
  3592. {
  3593. unsigned errCount = 0;
  3594. const char *childKey = wuXML->queryProp(mappings->xpath);
  3595. if (childKey && *childKey)
  3596. {
  3597. CassandraResult result(fetchDataForKeyAndWuid(childKey, wuid, session, mappings));
  3598. switch (cass_result_row_count(result))
  3599. {
  3600. case 0:
  3601. DBGLOG("Missing secondary data in %s for wuid=%s %s=%s", queryTableName(mappings), wuid, mappings->columnName, childKey);
  3602. if (batch)
  3603. simpleXMLtoCassandra(this, batch, mappings, wuXML);
  3604. errCount++;
  3605. break;
  3606. case 1:
  3607. {
  3608. Owned<IPTree> secXML = rowToPTree(NULL, mappings+2, cass_result_first_row(result)); // wuid and key not returned
  3609. secXML->setProp(mappings->xpath, childKey);
  3610. secXML->renameProp("/", wuid);
  3611. if (!areMatchingPTrees(wuXML, secXML))
  3612. {
  3613. DBGLOG("Mismatched data in %s for wuid %s", queryTableName(mappings), wuid);
  3614. if (batch)
  3615. simpleXMLtoCassandra(this, batch, mappings, wuXML);
  3616. errCount++;
  3617. }
  3618. break;
  3619. }
  3620. default:
  3621. DBGLOG("Multiple secondary data %d in %s for wuid %s", (int) cass_result_row_count(result), queryTableName(mappings), wuid); // This should be impossible!
  3622. if (batch)
  3623. {
  3624. deleteSecondaryByKey(mappings, wuid, childKey, this, batch);
  3625. simpleXMLtoCassandra(this, batch, mappings, wuXML);
  3626. }
  3627. break;
  3628. }
  3629. }
  3630. return errCount;
  3631. }
  3632. unsigned checkOrphans(const CassandraXmlMapping *mappings, unsigned wuidIndex, CassBatch *batch)
  3633. {
  3634. unsigned errCount = 0;
  3635. CassandraResult result(fetchDataForKey(NULL, session, mappings));
  3636. CassandraIterator rows(cass_iterator_from_result(result));
  3637. while (cass_iterator_next(rows))
  3638. {
  3639. const CassRow *row = cass_iterator_get_row(rows);
  3640. StringBuffer wuid;
  3641. getCassString(wuid, cass_row_get_column(row, wuidIndex));
  3642. if (!checkWuExists(wuid))
  3643. {
  3644. DBGLOG("Orphaned data in %s for wuid=%s", queryTableName(mappings), wuid.str());
  3645. if (batch)
  3646. {
  3647. if (wuidIndex)
  3648. {
  3649. StringBuffer key;
  3650. getCassString(key, cass_row_get_column(row, 0));
  3651. deleteSecondaryByKey(mappings, wuid, key, this, batch);
  3652. }
  3653. else
  3654. deleteChildByWuid(mappings, wuid, this, batch);
  3655. }
  3656. errCount++;
  3657. }
  3658. }
  3659. return errCount;
  3660. }
  3661. IPTree *rowToPTree(const char *key, const CassandraXmlMapping *mappings, const CassRow *row)
  3662. {
  3663. CassandraIterator cols(cass_iterator_from_row(row));
  3664. Owned<IPTree> xml = createPTree("row"); // May be overwritten below if wuid field is processed
  3665. if (key && *key)
  3666. {
  3667. xml->setProp(mappings->xpath, key);
  3668. mappings++;
  3669. }
  3670. while (cass_iterator_next(cols))
  3671. {
  3672. assertex(mappings->columnName);
  3673. const CassValue *value = cass_iterator_get_column(cols);
  3674. if (value && !cass_value_is_null(value))
  3675. mappings->mapper.toXML(xml, mappings->xpath, value);
  3676. mappings++;
  3677. }
  3678. return xml.getClear();
  3679. }
  3680. unsigned randomizeSuffix;
  3681. unsigned traceLevel;
  3682. CassandraCluster cluster;
  3683. CassandraSession session;
  3684. mutable CriticalSection cacheCrit;
  3685. mutable MapStringToMyClass<CassandraPrepared> preparedCache;
  3686. };
  3687. } // namespace
  3688. extern "C" EXPORT IWorkUnitFactory *createWorkUnitFactory(const IPropertyTree *props)
  3689. {
  3690. return new cassandraembed::CCasssandraWorkUnitFactory(props);
  3691. }