cassandraembed.cpp 64 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2013 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "cassandra.h"
  15. #include "jexcept.hpp"
  16. #include "jthread.hpp"
  17. #include "hqlplugins.hpp"
  18. #include "deftype.hpp"
  19. #include "eclhelper.hpp"
  20. #include "eclrtl.hpp"
  21. #include "eclrtl_imp.hpp"
  22. #include "rtlds_imp.hpp"
  23. #include "rtlfield_imp.hpp"
  24. #include "rtlembed.hpp"
  25. #include "roxiemem.hpp"
  26. #include "nbcd.hpp"
  // Decoration applied to the plugin's exported entry point: on Windows the symbol must be
  // exported from the DLL explicitly; on other platforms no decoration is required.
  #if defined(_WIN32)
  #  define EXPORT __declspec(dllexport)
  #else
  #  define EXPORT
  #endif
  32. static void UNSUPPORTED(const char *feature) __attribute__((noreturn));
  33. static void UNSUPPORTED(const char *feature)
  34. {
  35. throw MakeStringException(-1, "UNSUPPORTED feature: %s not supported in Cassandra plugin", feature);
  36. }
  // The version string reported by this plugin. Both the reported version and the first
  // entry of the compatibility list refer to this single definition, so they cannot drift
  // apart when the version is bumped.
  static const char cassandraEmbedVersion[] = "Cassandra Embed Helper 1.0.0";
  // NULL-terminated list of plugin versions this build is compatible with.
  static const char * compatibleVersions[] = {
      cassandraEmbedVersion,
      NULL };
  static const char *version = cassandraEmbedVersion;
  41. extern "C" EXPORT bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb)
  42. {
  43. if (pb->size == sizeof(ECLPluginDefinitionBlockEx))
  44. {
  45. ECLPluginDefinitionBlockEx * pbx = (ECLPluginDefinitionBlockEx *) pb;
  46. pbx->compatibleVersions = compatibleVersions;
  47. }
  48. else if (pb->size != sizeof(ECLPluginDefinitionBlock))
  49. return false;
  50. pb->magicVersion = PLUGIN_VERSION;
  51. pb->version = version;
  52. pb->moduleName = "cassandra";
  53. pb->ECL = NULL;
  54. pb->flags = PLUGIN_MULTIPLE_VERSIONS;
  55. pb->description = "Cassandra Embed Helper";
  56. return true;
  57. }
  58. namespace cassandraembed {
  59. static void failx(const char *msg, ...) __attribute__((noreturn)) __attribute__((format(printf, 1, 2)));
  60. static void fail(const char *msg) __attribute__((noreturn));
  61. static void failx(const char *message, ...)
  62. {
  63. va_list args;
  64. va_start(args,message);
  65. StringBuffer msg;
  66. msg.append("cassandra: ").valist_appendf(message,args);
  67. va_end(args);
  68. rtlFail(0, msg.str());
  69. }
  70. static void fail(const char *message)
  71. {
  72. StringBuffer msg;
  73. msg.append("cassandra: ").append(message);
  74. rtlFail(0, msg.str());
  75. }
  76. // Wrappers to Cassandra structures that require corresponding releases
  77. class CassandraCluster : public CInterface
  78. {
  79. public:
  80. CassandraCluster(CassCluster *_cluster) : cluster(_cluster)
  81. {
  82. }
  83. ~CassandraCluster()
  84. {
  85. if (cluster)
  86. cass_cluster_free(cluster);
  87. }
  88. inline operator CassCluster *() const
  89. {
  90. return cluster;
  91. }
  92. private:
  93. CassandraCluster(const CassandraCluster &);
  94. CassCluster *cluster;
  95. };
  96. class CassandraFuture : public CInterface
  97. {
  98. public:
  99. CassandraFuture(CassFuture *_future) : future(_future)
  100. {
  101. }
  102. ~CassandraFuture()
  103. {
  104. if (future)
  105. cass_future_free(future);
  106. }
  107. inline operator CassFuture *() const
  108. {
  109. return future;
  110. }
  111. void wait(const char *why)
  112. {
  113. cass_future_wait(future);
  114. CassError rc = cass_future_error_code(future);
  115. if(rc != CASS_OK)
  116. {
  117. CassString message = cass_future_error_message(future);
  118. VStringBuffer err("cassandra: failed to %s (%.*s)", why, (int)message.length, message.data);
  119. rtlFail(0, err.str());
  120. }
  121. }
  122. private:
  123. CassandraFuture(const CassandraFuture &);
  124. CassFuture *future;
  125. };
  126. class CassandraSession : public CInterface
  127. {
  128. public:
  129. CassandraSession(CassSession *_session) : session(_session)
  130. {
  131. }
  132. ~CassandraSession()
  133. {
  134. if (session)
  135. {
  136. CassandraFuture close_future(cass_session_close(session));
  137. cass_future_wait(close_future);
  138. cass_session_free(session);
  139. }
  140. }
  141. inline operator CassSession *() const
  142. {
  143. return session;
  144. }
  145. private:
  146. CassandraSession(const CassandraSession &);
  147. CassSession *session;
  148. };
  149. class CassandraBatch : public CInterface
  150. {
  151. public:
  152. CassandraBatch(CassBatch *_batch) : batch(_batch)
  153. {
  154. }
  155. ~CassandraBatch()
  156. {
  157. if (batch)
  158. cass_batch_free(batch);
  159. }
  160. inline operator CassBatch *() const
  161. {
  162. return batch;
  163. }
  164. private:
  165. CassandraBatch(const CassandraBatch &);
  166. CassBatch *batch;
  167. };
  168. class CassandraStatement : public CInterface
  169. {
  170. public:
  171. CassandraStatement(CassStatement *_statement) : statement(_statement)
  172. {
  173. }
  174. ~CassandraStatement()
  175. {
  176. if (statement)
  177. cass_statement_free(statement);
  178. }
  179. inline operator CassStatement *() const
  180. {
  181. return statement;
  182. }
  183. private:
  184. CassandraStatement(const CassandraStatement &);
  185. CassStatement *statement;
  186. };
  187. class CassandraPrepared : public CInterface
  188. {
  189. public:
  190. CassandraPrepared(const CassPrepared *_prepared) : prepared(_prepared)
  191. {
  192. }
  193. ~CassandraPrepared()
  194. {
  195. if (prepared)
  196. cass_prepared_free(prepared);
  197. }
  198. inline operator const CassPrepared *() const
  199. {
  200. return prepared;
  201. }
  202. private:
  203. CassandraPrepared(const CassandraPrepared &);
  204. const CassPrepared *prepared;
  205. };
  206. class CassandraResult : public CInterface
  207. {
  208. public:
  209. CassandraResult(const CassResult *_result) : result(_result)
  210. {
  211. }
  212. ~CassandraResult()
  213. {
  214. if (result)
  215. cass_result_free(result);
  216. }
  217. inline operator const CassResult *() const
  218. {
  219. return result;
  220. }
  221. private:
  222. CassandraResult(const CassandraResult &);
  223. const CassResult *result;
  224. };
  225. class CassandraIterator : public CInterface
  226. {
  227. public:
  228. CassandraIterator(CassIterator *_iterator) : iterator(_iterator)
  229. {
  230. }
  231. ~CassandraIterator()
  232. {
  233. if (iterator)
  234. cass_iterator_free(iterator);
  235. }
  236. inline operator CassIterator *() const
  237. {
  238. return iterator;
  239. }
  240. private:
  241. CassandraIterator(const CassandraIterator &);
  242. CassIterator *iterator;
  243. };
  244. class CassandraCollection : public CInterface
  245. {
  246. public:
  247. CassandraCollection(CassCollection *_collection) : collection(_collection)
  248. {
  249. }
  250. ~CassandraCollection()
  251. {
  252. if (collection)
  253. cass_collection_free(collection);
  254. }
  255. inline operator CassCollection *() const
  256. {
  257. return collection;
  258. }
  259. private:
  260. CassandraCollection(const CassandraCollection &);
  261. CassCollection *collection;
  262. };
  263. void check(CassError rc)
  264. {
  265. if (rc != CASS_OK)
  266. {
  267. fail(cass_error_desc(rc));
  268. }
  269. }
  270. class CassandraStatementInfo : public CInterface
  271. {
  272. public:
  273. IMPLEMENT_IINTERFACE;
  274. CassandraStatementInfo(CassandraSession *_session, CassandraPrepared *_prepared, unsigned _numBindings, CassBatchType _batchMode)
  275. : session(_session), prepared(_prepared), numBindings(_numBindings), batchMode(_batchMode)
  276. {
  277. assertex(prepared && *prepared);
  278. statement.setown(new CassandraStatement(cass_prepared_bind(*prepared)));
  279. }
  280. ~CassandraStatementInfo()
  281. {
  282. stop();
  283. }
  284. inline void stop()
  285. {
  286. iterator.clear();
  287. result.clear();
  288. prepared.clear();
  289. }
  290. bool next()
  291. {
  292. if (!iterator)
  293. return false;
  294. return cass_iterator_next(*iterator);
  295. }
  296. void startStream()
  297. {
  298. if (batchMode != (CassBatchType) -1)
  299. {
  300. batch.setown(new CassandraBatch(cass_batch_new(batchMode)));
  301. statement.setown(new CassandraStatement(cass_prepared_bind(*prepared)));
  302. }
  303. }
  304. void endStream()
  305. {
  306. if (batch)
  307. {
  308. CassandraFuture future(cass_session_execute_batch(*session, *batch));
  309. future.wait("execute");
  310. result.setown(new CassandraResult(cass_future_get_result(future)));
  311. assertex (rowCount() == 0);
  312. }
  313. }
  314. void execute()
  315. {
  316. assertex(statement && *statement);
  317. if (batch)
  318. {
  319. check(cass_batch_add_statement(*batch, *statement));
  320. statement.setown(new CassandraStatement(cass_prepared_bind(*prepared)));
  321. }
  322. else
  323. {
  324. CassandraFuture future(cass_session_execute(*session, *statement));
  325. future.wait("execute");
  326. result.setown(new CassandraResult(cass_future_get_result(future)));
  327. if (rowCount() > 0)
  328. iterator.setown(new CassandraIterator(cass_iterator_from_result(*result)));
  329. }
  330. }
  331. inline size_t rowCount() const
  332. {
  333. return cass_result_row_count(*result);
  334. }
  335. inline bool hasResult() const
  336. {
  337. return result != NULL;
  338. }
  339. inline const CassRow *queryRow() const
  340. {
  341. assertex(iterator && *iterator);
  342. return cass_iterator_get_row(*iterator);
  343. }
  344. inline CassStatement *queryStatement() const
  345. {
  346. assertex(statement && *statement);
  347. return *statement;
  348. }
  349. protected:
  350. Linked<CassandraSession> session;
  351. Linked<CassandraPrepared> prepared;
  352. Owned<CassandraBatch> batch;
  353. Owned<CassandraStatement> statement;
  354. Owned<CassandraResult> result;
  355. Owned<CassandraIterator> iterator;
  356. unsigned numBindings;
  357. CassBatchType(batchMode);
  358. };
  359. // Conversions from Cassandra values to ECL data
  360. static const char *getTypeName(CassValueType type)
  361. {
  362. switch (type)
  363. {
  364. case CASS_VALUE_TYPE_CUSTOM: return "CUSTOM";
  365. case CASS_VALUE_TYPE_ASCII: return "ASCII";
  366. case CASS_VALUE_TYPE_BIGINT: return "BIGINT";
  367. case CASS_VALUE_TYPE_BLOB: return "BLOB";
  368. case CASS_VALUE_TYPE_BOOLEAN: return "BOOLEAN";
  369. case CASS_VALUE_TYPE_COUNTER: return "COUNTER";
  370. case CASS_VALUE_TYPE_DECIMAL: return "DECIMAL";
  371. case CASS_VALUE_TYPE_DOUBLE: return "DOUBLE";
  372. case CASS_VALUE_TYPE_FLOAT: return "FLOAT";
  373. case CASS_VALUE_TYPE_INT: return "INT";
  374. case CASS_VALUE_TYPE_TEXT: return "TEXT";
  375. case CASS_VALUE_TYPE_TIMESTAMP: return "TIMESTAMP";
  376. case CASS_VALUE_TYPE_UUID: return "UUID";
  377. case CASS_VALUE_TYPE_VARCHAR: return "VARCHAR";
  378. case CASS_VALUE_TYPE_VARINT: return "VARINT";
  379. case CASS_VALUE_TYPE_TIMEUUID: return "TIMEUUID";
  380. case CASS_VALUE_TYPE_INET: return "INET";
  381. case CASS_VALUE_TYPE_LIST: return "LIST";
  382. case CASS_VALUE_TYPE_MAP: return "MAP";
  383. case CASS_VALUE_TYPE_SET: return "SET";
  384. default: return "UNKNOWN";
  385. }
  386. }
  387. static void typeError(const char *expected, const CassValue *value, const RtlFieldInfo *field) __attribute__((noreturn));
  388. static void typeError(const char *expected, const CassValue *value, const RtlFieldInfo *field)
  389. {
  390. VStringBuffer msg("cassandra: type mismatch - %s expected", expected);
  391. if (field)
  392. msg.appendf(" for field %s", field->name->str());
  393. if (value)
  394. msg.appendf(", received %s", getTypeName(cass_value_type(value)));
  395. rtlFail(0, msg.str());
  396. }
  397. static bool isInteger(const CassValue *value)
  398. {
  399. switch (cass_value_type(value))
  400. {
  401. case CASS_VALUE_TYPE_TIMESTAMP:
  402. case CASS_VALUE_TYPE_INT:
  403. case CASS_VALUE_TYPE_BIGINT:
  404. case CASS_VALUE_TYPE_COUNTER:
  405. case CASS_VALUE_TYPE_VARINT:
  406. return true;
  407. default:
  408. return false;
  409. }
  410. }
  411. // when extracting elements of a set, field will point at the SET info- we want to get the typeInfo for the element type
  412. static const RtlTypeInfo *getFieldBaseType(const RtlFieldInfo *field)
  413. {
  414. const RtlTypeInfo *type = field->type;
  415. if ((type->fieldType & RFTMkind) == type_set)
  416. return type->queryChildType();
  417. else
  418. return type;
  419. }
  420. static int getNumFields(const RtlTypeInfo *record)
  421. {
  422. int count = 0;
  423. const RtlFieldInfo * const *fields = record->queryFields();
  424. assertex(fields);
  425. while (*fields++)
  426. count++;
  427. return count;
  428. }
  429. static bool getBooleanResult(const RtlFieldInfo *field, const CassValue *value)
  430. {
  431. if (cass_value_is_null(value))
  432. {
  433. NullFieldProcessor p(field);
  434. return p.boolResult;
  435. }
  436. if (cass_value_type(value) != CASS_VALUE_TYPE_BOOLEAN)
  437. typeError("boolean", value, field);
  438. cass_bool_t output;
  439. check(cass_value_get_bool(value, &output));
  440. return output != cass_false;
  441. }
  442. static void getDataResult(const RtlFieldInfo *field, const CassValue *value, size32_t &chars, void * &result)
  443. {
  444. if (cass_value_is_null(value))
  445. {
  446. NullFieldProcessor p(field);
  447. rtlStrToDataX(chars, result, p.resultChars, p.stringResult);
  448. return;
  449. }
  450. // We COULD require that the field being retrieved is a blob - but Cassandra seems happy to use any field here, and
  451. // it seems like it could be more useful to support anything
  452. // if (cass_value_type(value) != CASS_VALUE_TYPE_BLOB)
  453. // typeError("blob", value, field);
  454. CassBytes bytes;
  455. check(cass_value_get_bytes(value, &bytes));
  456. rtlStrToDataX(chars, result, bytes.size, bytes.data);
  457. }
  458. static __int64 getSignedResult(const RtlFieldInfo *field, const CassValue *value);
  459. static unsigned __int64 getUnsignedResult(const RtlFieldInfo *field, const CassValue *value);
  460. static double getRealResult(const RtlFieldInfo *field, const CassValue *value)
  461. {
  462. if (cass_value_is_null(value))
  463. {
  464. NullFieldProcessor p(field);
  465. return p.doubleResult;
  466. }
  467. else if (isInteger(value))
  468. return (double) getSignedResult(field, value);
  469. else switch (cass_value_type(value))
  470. {
  471. case CASS_VALUE_TYPE_FLOAT:
  472. {
  473. cass_float_t output_f;
  474. check(cass_value_get_float(value, &output_f));
  475. return output_f;
  476. }
  477. case CASS_VALUE_TYPE_DOUBLE:
  478. {
  479. cass_double_t output_d;
  480. check(cass_value_get_double(value, &output_d));
  481. return output_d;
  482. }
  483. default:
  484. typeError("double", value, field);
  485. }
  486. }
  487. static __int64 getSignedResult(const RtlFieldInfo *field, const CassValue *value)
  488. {
  489. if (cass_value_is_null(value))
  490. {
  491. NullFieldProcessor p(field);
  492. return p.intResult;
  493. }
  494. switch (cass_value_type(value))
  495. {
  496. case CASS_VALUE_TYPE_INT:
  497. {
  498. cass_int32_t output;
  499. check(cass_value_get_int32(value, &output));
  500. return output;
  501. }
  502. case CASS_VALUE_TYPE_TIMESTAMP:
  503. case CASS_VALUE_TYPE_BIGINT:
  504. case CASS_VALUE_TYPE_COUNTER:
  505. case CASS_VALUE_TYPE_VARINT:
  506. {
  507. cass_int64_t output;
  508. check(cass_value_get_int64(value, &output));
  509. return output;
  510. }
  511. default:
  512. typeError("integer", value, field);
  513. }
  514. }
  515. static unsigned __int64 getUnsignedResult(const RtlFieldInfo *field, const CassValue *value)
  516. {
  517. if (cass_value_is_null(value))
  518. {
  519. NullFieldProcessor p(field);
  520. return p.uintResult;
  521. }
  522. return (__uint64) getSignedResult(field, value);
  523. }
  524. static void getStringResult(const RtlFieldInfo *field, const CassValue *value, size32_t &chars, char * &result)
  525. {
  526. if (cass_value_is_null(value))
  527. {
  528. NullFieldProcessor p(field);
  529. rtlStrToStrX(chars, result, p.resultChars, p.stringResult);
  530. return;
  531. }
  532. switch (cass_value_type(value))
  533. {
  534. case CASS_VALUE_TYPE_ASCII:
  535. {
  536. CassString output;
  537. check(cass_value_get_string(value, &output));
  538. const char *text = output.data;
  539. unsigned long bytes = output.length;
  540. rtlStrToStrX(chars, result, bytes, text);
  541. break;
  542. }
  543. case CASS_VALUE_TYPE_VARCHAR:
  544. case CASS_VALUE_TYPE_TEXT:
  545. {
  546. CassString output;
  547. check(cass_value_get_string(value, &output));
  548. const char *text = output.data;
  549. unsigned long bytes = output.length;
  550. unsigned numchars = rtlUtf8Length(bytes, text);
  551. rtlUtf8ToStrX(chars, result, numchars, text);
  552. break;
  553. }
  554. default:
  555. typeError("string", value, field);
  556. }
  557. }
  558. static void getUTF8Result(const RtlFieldInfo *field, const CassValue *value, size32_t &chars, char * &result)
  559. {
  560. if (cass_value_is_null(value))
  561. {
  562. NullFieldProcessor p(field);
  563. rtlUtf8ToUtf8X(chars, result, p.resultChars, p.stringResult);
  564. return;
  565. }
  566. switch (cass_value_type(value))
  567. {
  568. case CASS_VALUE_TYPE_ASCII:
  569. {
  570. CassString output;
  571. check(cass_value_get_string(value, &output));
  572. const char *text = output.data;
  573. unsigned long bytes = output.length;
  574. rtlStrToUtf8X(chars, result, bytes, text);
  575. break;
  576. }
  577. case CASS_VALUE_TYPE_VARCHAR:
  578. case CASS_VALUE_TYPE_TEXT:
  579. {
  580. CassString output;
  581. check(cass_value_get_string(value, &output));
  582. const char *text = output.data;
  583. unsigned long bytes = output.length;
  584. unsigned numchars = rtlUtf8Length(bytes, text);
  585. rtlUtf8ToUtf8X(chars, result, numchars, text);
  586. break;
  587. }
  588. default:
  589. typeError("string", value, field);
  590. }
  591. }
  592. static void getUnicodeResult(const RtlFieldInfo *field, const CassValue *value, size32_t &chars, UChar * &result)
  593. {
  594. if (cass_value_is_null(value))
  595. {
  596. NullFieldProcessor p(field);
  597. rtlUnicodeToUnicodeX(chars, result, p.resultChars, p.unicodeResult);
  598. return;
  599. }
  600. switch (cass_value_type(value))
  601. {
  602. case CASS_VALUE_TYPE_ASCII:
  603. {
  604. CassString output;
  605. check(cass_value_get_string(value, &output));
  606. const char *text = output.data;
  607. unsigned long bytes = output.length;
  608. rtlStrToUnicodeX(chars, result, bytes, text);
  609. break;
  610. }
  611. case CASS_VALUE_TYPE_VARCHAR:
  612. case CASS_VALUE_TYPE_TEXT:
  613. {
  614. CassString output;
  615. check(cass_value_get_string(value, &output));
  616. const char *text = output.data;
  617. unsigned long bytes = output.length;
  618. unsigned numchars = rtlUtf8Length(bytes, text);
  619. rtlUtf8ToUnicodeX(chars, result, numchars, text);
  620. break;
  621. }
  622. default:
  623. typeError("string", value, field);
  624. }
  625. }
  626. static void getDecimalResult(const RtlFieldInfo *field, const CassValue *value, Decimal &result)
  627. {
  628. // Note - Cassandra has a decimal type, but it's not particularly similar to the ecl one. Map to string for now, as we do in MySQL
  629. if (cass_value_is_null(value))
  630. {
  631. NullFieldProcessor p(field);
  632. result.set(p.decimalResult);
  633. return;
  634. }
  635. size32_t chars;
  636. rtlDataAttr tempStr;
  637. cassandraembed::getStringResult(field, value, chars, tempStr.refstr());
  638. result.setString(chars, tempStr.getstr());
  639. if (field)
  640. {
  641. RtlDecimalTypeInfo *dtype = (RtlDecimalTypeInfo *) field->type;
  642. result.setPrecision(dtype->getDecimalDigits(), dtype->getDecimalPrecision());
  643. }
  644. }
  645. // A CassandraRowBuilder object is used to construct an ECL row from a Cassandra row
  646. class CassandraRowBuilder : public CInterfaceOf<IFieldSource>
  647. {
  648. public:
  649. CassandraRowBuilder(const CassandraStatementInfo *_stmtInfo)
  650. : stmtInfo(_stmtInfo), colIdx(0), numIteratorFields(0), nextIteratedField(0)
  651. {
  652. }
  653. virtual bool getBooleanResult(const RtlFieldInfo *field)
  654. {
  655. return cassandraembed::getBooleanResult(field, nextField(field));
  656. }
  657. virtual void getDataResult(const RtlFieldInfo *field, size32_t &len, void * &result)
  658. {
  659. cassandraembed::getDataResult(field, nextField(field), len, result);
  660. }
  661. virtual double getRealResult(const RtlFieldInfo *field)
  662. {
  663. return cassandraembed::getRealResult(field, nextField(field));
  664. }
  665. virtual __int64 getSignedResult(const RtlFieldInfo *field)
  666. {
  667. return cassandraembed::getSignedResult(field, nextField(field));
  668. }
  669. virtual unsigned __int64 getUnsignedResult(const RtlFieldInfo *field)
  670. {
  671. return cassandraembed::getUnsignedResult(field, nextField(field));
  672. }
  673. virtual void getStringResult(const RtlFieldInfo *field, size32_t &chars, char * &result)
  674. {
  675. cassandraembed::getStringResult(field, nextField(field), chars, result);
  676. }
  677. virtual void getUTF8Result(const RtlFieldInfo *field, size32_t &chars, char * &result)
  678. {
  679. cassandraembed::getUTF8Result(field, nextField(field), chars, result);
  680. }
  681. virtual void getUnicodeResult(const RtlFieldInfo *field, size32_t &chars, UChar * &result)
  682. {
  683. cassandraembed::getUnicodeResult(field, nextField(field), chars, result);
  684. }
  685. virtual void getDecimalResult(const RtlFieldInfo *field, Decimal &value)
  686. {
  687. cassandraembed::getDecimalResult(field, nextField(field), value);
  688. }
  689. virtual void processBeginSet(const RtlFieldInfo * field, bool &isAll)
  690. {
  691. isAll = false;
  692. iterator.setown(new CassandraIterator(cass_iterator_from_collection(nextField(field))));
  693. }
  694. virtual bool processNextSet(const RtlFieldInfo * field)
  695. {
  696. numIteratorFields = 1;
  697. return *iterator && cass_iterator_next(*iterator); // If field was NULL, we'll have a NULL iterator (representing an empty set/list)
  698. // Can't distinguish empty set from NULL field, so assume the former (rather than trying to deliver the default value for the set field)
  699. }
  700. virtual void processBeginDataset(const RtlFieldInfo * field)
  701. {
  702. numIteratorFields = getNumFields(field->type->queryChildType());
  703. switch (numIteratorFields)
  704. {
  705. case 1:
  706. iterator.setown(new CassandraIterator(cass_iterator_from_collection(nextField(field))));
  707. break;
  708. case 2:
  709. iterator.setown(new CassandraIterator(cass_iterator_from_map(nextField(field))));
  710. break;
  711. default:
  712. UNSUPPORTED("Nested datasets with > 2 fields");
  713. }
  714. }
  715. virtual void processBeginRow(const RtlFieldInfo * field)
  716. {
  717. }
  718. virtual bool processNextRow(const RtlFieldInfo * field)
  719. {
  720. nextIteratedField = 0;
  721. return *iterator && cass_iterator_next(*iterator); // If field was NULL, we'll have a NULL iterator (representing an empty set/list/map)
  722. // Can't distinguish empty set from NULL field, so assume the former (rather than trying to deliver the default value for the set field)
  723. }
  724. virtual void processEndSet(const RtlFieldInfo * field)
  725. {
  726. iterator.clear();
  727. numIteratorFields = 0;
  728. }
  729. virtual void processEndDataset(const RtlFieldInfo * field)
  730. {
  731. iterator.clear();
  732. numIteratorFields = 0;
  733. }
  734. virtual void processEndRow(const RtlFieldInfo * field)
  735. {
  736. }
  737. protected:
  738. const CassValue *nextField(const RtlFieldInfo * field)
  739. {
  740. const CassValue *ret;
  741. if (iterator)
  742. {
  743. switch (numIteratorFields)
  744. {
  745. case 1:
  746. ret = cass_iterator_get_value(*iterator);
  747. break;
  748. case 2:
  749. if (nextIteratedField==0)
  750. ret = cass_iterator_get_map_key(*iterator);
  751. else
  752. ret = cass_iterator_get_map_value(*iterator);
  753. nextIteratedField++;
  754. break;
  755. default:
  756. throwUnexpected();
  757. }
  758. }
  759. else
  760. ret = cass_row_get_column(stmtInfo->queryRow(), colIdx++);
  761. if (!ret)
  762. failx("Too many fields in ECL output row, reading field %s", field->name->getAtomNamePtr());
  763. return ret;
  764. }
  765. const CassandraStatementInfo *stmtInfo;
  766. Owned<CassandraIterator> iterator;
  767. int colIdx;
  768. int numIteratorFields;
  769. int nextIteratedField;
  770. };
  771. // Bind Cassandra columns from an ECL record
  772. class CassandraRecordBinder : public CInterfaceOf<IFieldProcessor>
  773. {
  774. public:
  775. CassandraRecordBinder(const IContextLogger &_logctx, const RtlTypeInfo *_typeInfo, const CassandraStatementInfo *_stmtInfo, int _firstParam)
  776. : logctx(_logctx), typeInfo(_typeInfo), stmtInfo(_stmtInfo), firstParam(_firstParam), dummyField("<row>", NULL, typeInfo), thisParam(_firstParam)
  777. {
  778. }
  779. int numFields()
  780. {
  781. int count = 0;
  782. const RtlFieldInfo * const *fields = typeInfo->queryFields();
  783. assertex(fields);
  784. while (*fields++)
  785. count++;
  786. return count;
  787. }
  788. void processRow(const byte *row)
  789. {
  790. thisParam = firstParam;
  791. typeInfo->process(row, row, &dummyField, *this); // Bind the variables for the current row
  792. }
  793. virtual void processString(unsigned len, const char *value, const RtlFieldInfo * field)
  794. {
  795. size32_t utf8chars;
  796. rtlDataAttr utfText;
  797. rtlStrToUtf8X(utf8chars, utfText.refstr(), len, value);
  798. if (collection)
  799. checkBind(cass_collection_append_string(*collection,
  800. cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()))),
  801. field);
  802. else
  803. checkBind(cass_statement_bind_string(stmtInfo->queryStatement(),
  804. checkNextParam(field),
  805. cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()))),
  806. field);
  807. }
  808. virtual void processBool(bool value, const RtlFieldInfo * field)
  809. {
  810. if (collection)
  811. checkBind(cass_collection_append_bool(*collection, value ? cass_true : cass_false), field);
  812. else
  813. checkBind(cass_statement_bind_bool(stmtInfo->queryStatement(), checkNextParam(field), value ? cass_true : cass_false), field);
  814. }
  815. virtual void processData(unsigned len, const void *value, const RtlFieldInfo * field)
  816. {
  817. if (collection)
  818. checkBind(cass_collection_append_bytes(*collection, cass_bytes_init((const cass_byte_t*) value, len)), field);
  819. else
  820. checkBind(cass_statement_bind_bytes(stmtInfo->queryStatement(), checkNextParam(field), cass_bytes_init((const cass_byte_t*) value, len)), field);
  821. }
  822. virtual void processInt(__int64 value, const RtlFieldInfo * field)
  823. {
  824. if (getFieldBaseType(field)->size(NULL,NULL)>4)
  825. {
  826. if (collection)
  827. checkBind(cass_collection_append_int64(*collection, value), field);
  828. else
  829. checkBind(cass_statement_bind_int64(stmtInfo->queryStatement(), checkNextParam(field), value), field);
  830. }
  831. else
  832. {
  833. if (collection)
  834. checkBind(cass_collection_append_int32(*collection, value), field);
  835. else
  836. checkBind(cass_statement_bind_int32(stmtInfo->queryStatement(), checkNextParam(field), value), field);
  837. }
  838. }
  839. virtual void processUInt(unsigned __int64 value, const RtlFieldInfo * field)
  840. {
  841. UNSUPPORTED("UNSIGNED columns");
  842. }
  843. virtual void processReal(double value, const RtlFieldInfo * field)
  844. {
  845. if (getFieldBaseType(field)->size(NULL,NULL)>4)
  846. {
  847. if (collection)
  848. checkBind(cass_collection_append_double(*collection, value), field);
  849. else
  850. checkBind(cass_statement_bind_double(stmtInfo->queryStatement(), checkNextParam(field), value), field);
  851. }
  852. else
  853. {
  854. if (collection)
  855. checkBind(cass_collection_append_float(*collection, (float) value), field);
  856. else
  857. checkBind(cass_statement_bind_float(stmtInfo->queryStatement(), checkNextParam(field), (float) value), field);
  858. }
  859. }
  860. virtual void processDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo * field)
  861. {
  862. Decimal val;
  863. size32_t bytes;
  864. rtlDataAttr decText;
  865. val.setDecimal(digits, precision, value);
  866. val.getStringX(bytes, decText.refstr());
  867. processUtf8(bytes, decText.getstr(), field);
  868. }
  869. virtual void processUDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo * field)
  870. {
  871. UNSUPPORTED("UNSIGNED decimals");
  872. }
  873. virtual void processUnicode(unsigned chars, const UChar *value, const RtlFieldInfo * field)
  874. {
  875. size32_t utf8chars;
  876. rtlDataAttr utfText;
  877. rtlUnicodeToUtf8X(utf8chars, utfText.refstr(), chars, value);
  878. if (collection)
  879. checkBind(cass_collection_append_string(*collection,
  880. cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()))),
  881. field);
  882. else
  883. checkBind(cass_statement_bind_string(stmtInfo->queryStatement(),
  884. checkNextParam(field),
  885. cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()))),
  886. field);
  887. }
  888. virtual void processQString(unsigned len, const char *value, const RtlFieldInfo * field)
  889. {
  890. size32_t charCount;
  891. rtlDataAttr text;
  892. rtlQStrToStrX(charCount, text.refstr(), len, value);
  893. processUtf8(charCount, text.getstr(), field);
  894. }
  895. virtual void processUtf8(unsigned chars, const char *value, const RtlFieldInfo * field)
  896. {
  897. if (collection)
  898. checkBind(cass_collection_append_string(*collection, cass_string_init2(value, rtlUtf8Size(chars, value))), field);
  899. else
  900. checkBind(cass_statement_bind_string(stmtInfo->queryStatement(), checkNextParam(field), cass_string_init2(value, rtlUtf8Size(chars, value))), field);
  901. }
  902. virtual bool processBeginSet(const RtlFieldInfo * field, unsigned numElements, bool isAll, const byte *data)
  903. {
  904. if (isAll)
  905. UNSUPPORTED("SET(ALL)");
  906. collection.setown(new CassandraCollection(cass_collection_new(CASS_COLLECTION_TYPE_SET, numElements)));
  907. return true;
  908. }
  909. virtual bool processBeginDataset(const RtlFieldInfo * field, unsigned numRows)
  910. {
  911. // If there's a single field, assume we are mapping to a SET/LIST
  912. // If there are two, assume it's a MAP
  913. // Otherwise, fail
  914. int numFields = getNumFields(field->type->queryChildType());
  915. if (numFields < 1 || numFields > 2)
  916. {
  917. UNSUPPORTED("Nested datasets with > 2 fields");
  918. }
  919. collection.setown(new CassandraCollection(cass_collection_new(numFields==1 ? CASS_COLLECTION_TYPE_SET : CASS_COLLECTION_TYPE_MAP, numRows)));
  920. return true;
  921. }
    // Start of a row. Nothing to set up here: each field is bound (or appended to the
    // pending collection) as it is processed. Always returns true - presumably telling the
    // caller to go on and process the row's fields; confirm against the field-processor interface.
    virtual bool processBeginRow(const RtlFieldInfo * field)
    {
        return true;
    }
  926. virtual void processEndSet(const RtlFieldInfo * field)
  927. {
  928. checkBind(cass_statement_bind_collection(stmtInfo->queryStatement(), checkNextParam(field), *collection), field);
  929. collection.clear();
  930. }
  931. virtual void processEndDataset(const RtlFieldInfo * field)
  932. {
  933. checkBind(cass_statement_bind_collection(stmtInfo->queryStatement(), checkNextParam(field), *collection), field);
  934. collection.clear();
  935. }
    // End of a row. Nothing to do: all of the row's fields were already bound or appended
    // as they were processed.
    virtual void processEndRow(const RtlFieldInfo * field)
    {
    }
  939. protected:
  940. inline unsigned checkNextParam(const RtlFieldInfo * field)
  941. {
  942. if (logctx.queryTraceLevel() > 4)
  943. logctx.CTXLOG("Binding %s to %d", field->name->str(), thisParam);
  944. return thisParam++;
  945. }
  946. inline void checkBind(CassError rc, const RtlFieldInfo * field)
  947. {
  948. if (rc != CASS_OK)
  949. {
  950. failx("While binding parameter %s: %s", field->name->getAtomNamePtr(), cass_error_desc(rc));
  951. }
  952. }
    const RtlTypeInfo *typeInfo;              // type of the record being bound (NOTE(review): set by the constructor, not visible here)
    const CassandraStatementInfo *stmtInfo;   // statement whose parameters are bound via queryStatement()
    Owned<CassandraCollection> collection;    // non-NULL while a SET or child DATASET is being accumulated; values are appended to it instead of being bound directly
    const IContextLogger &logctx;             // used for trace logging in checkNextParam()
    int firstParam;                           // NOTE(review): presumably the index of the first parameter this binder fills - confirm against the constructor
    RtlFieldStrInfo dummyField;               // NOTE(review): not used in the visible methods - purpose to be confirmed
    int thisParam;                            // index of the next parameter to bind; post-incremented by checkNextParam()
  960. };
  961. //
  962. class CassandraDatasetBinder : public CassandraRecordBinder
  963. {
  964. public:
  965. CassandraDatasetBinder(const IContextLogger &_logctx, IRowStream * _input, const RtlTypeInfo *_typeInfo, const CassandraStatementInfo *_stmt, int _firstParam)
  966. : input(_input), CassandraRecordBinder(_logctx, _typeInfo, _stmt, _firstParam)
  967. {
  968. }
  969. bool bindNext()
  970. {
  971. roxiemem::OwnedConstRoxieRow nextRow = (const byte *) input->ungroupedNextRow();
  972. if (!nextRow)
  973. return false;
  974. processRow((const byte *) nextRow.get()); // Bind the variables for the current row
  975. return true;
  976. }
  977. void executeAll(CassandraStatementInfo *stmtInfo)
  978. {
  979. stmtInfo->startStream();
  980. while (bindNext())
  981. {
  982. stmtInfo->execute();
  983. }
  984. stmtInfo->endStream();
  985. }
  986. protected:
  987. Owned<IRowStream> input;
  988. };
  989. // A Cassandra function that returns a dataset will return a CassandraRowStream object that can be
  990. // interrogated to return each row of the result in turn
  991. class CassandraRowStream : public CInterfaceOf<IRowStream>
  992. {
  993. public:
  994. CassandraRowStream(CassandraDatasetBinder *_inputStream, CassandraStatementInfo *_stmtInfo, IEngineRowAllocator *_resultAllocator)
  995. : inputStream(_inputStream), stmtInfo(_stmtInfo), resultAllocator(_resultAllocator)
  996. {
  997. executePending = true;
  998. eof = false;
  999. }
  1000. virtual const void *nextRow()
  1001. {
  1002. // A little complex when streaming data in as well as out - want to execute for every input record
  1003. if (eof)
  1004. return NULL;
  1005. loop
  1006. {
  1007. if (executePending)
  1008. {
  1009. executePending = false;
  1010. if (inputStream && !inputStream->bindNext())
  1011. {
  1012. noteEOF();
  1013. return NULL;
  1014. }
  1015. stmtInfo->execute();
  1016. }
  1017. if (stmtInfo->next())
  1018. break;
  1019. if (inputStream)
  1020. executePending = true;
  1021. else
  1022. {
  1023. noteEOF();
  1024. return NULL;
  1025. }
  1026. }
  1027. RtlDynamicRowBuilder rowBuilder(resultAllocator);
  1028. CassandraRowBuilder cassandraRowBuilder(stmtInfo);
  1029. const RtlTypeInfo *typeInfo = resultAllocator->queryOutputMeta()->queryTypeInfo();
  1030. assertex(typeInfo);
  1031. RtlFieldStrInfo dummyField("<row>", NULL, typeInfo);
  1032. size32_t len = typeInfo->build(rowBuilder, 0, &dummyField, cassandraRowBuilder);
  1033. return rowBuilder.finalizeRowClear(len);
  1034. }
  1035. virtual void stop()
  1036. {
  1037. resultAllocator.clear();
  1038. stmtInfo->stop();
  1039. }
  1040. protected:
  1041. void noteEOF()
  1042. {
  1043. if (!eof)
  1044. {
  1045. eof = true;
  1046. stop();
  1047. }
  1048. }
  1049. Linked<CassandraDatasetBinder> inputStream;
  1050. Linked<CassandraStatementInfo> stmtInfo;
  1051. Linked<IEngineRowAllocator> resultAllocator;
  1052. bool executePending;
  1053. bool eof;
  1054. };
  1055. // Each call to a Cassandra function will use a new CassandraEmbedFunctionContext object
  1056. class CassandraEmbedFunctionContext : public CInterfaceOf<IEmbedFunctionContext>
  1057. {
  1058. public:
  1059. CassandraEmbedFunctionContext(const IContextLogger &_logctx, unsigned _flags, const char *options)
  1060. : logctx(_logctx), flags(_flags), nextParam(0), numParams(0), batchMode((CassBatchType) -1)
  1061. {
  1062. cluster.setown(new CassandraCluster(cass_cluster_new()));
  1063. const char *contact_points = "localhost";
  1064. const char *user = "";
  1065. const char *password = "";
  1066. const char *keyspace = "";
  1067. StringArray opts;
  1068. opts.appendList(options, ",");
  1069. ForEachItemIn(idx, opts)
  1070. {
  1071. const char *opt = opts.item(idx);
  1072. const char *val = strchr(opt, '=');
  1073. if (val)
  1074. {
  1075. StringBuffer optName(val-opt, opt);
  1076. val++;
  1077. if (stricmp(optName, "contact_points")==0 || stricmp(optName, "server")==0)
  1078. contact_points = val; // Note that lifetime of val is adequate for this to be safe
  1079. else if (stricmp(optName, "user")==0)
  1080. user = val;
  1081. else if (stricmp(optName, "password")==0)
  1082. password = val;
  1083. else if (stricmp(optName, "keyspace")==0)
  1084. keyspace = val;
  1085. else if (stricmp(optName, "batch")==0)
  1086. {
  1087. if (stricmp(val, "LOGGED")==0)
  1088. batchMode = CASS_BATCH_TYPE_LOGGED;
  1089. else if (stricmp(val, "UNLOGGED")==0)
  1090. batchMode = CASS_BATCH_TYPE_UNLOGGED;
  1091. else if (stricmp(val, "COUNTER")==0)
  1092. batchMode = CASS_BATCH_TYPE_COUNTER;
  1093. }
  1094. else if (stricmp(optName, "port")==0)
  1095. {
  1096. unsigned port = getUnsignedOption(val, "port");
  1097. checkSetOption(cass_cluster_set_port(*cluster, port), "port");
  1098. }
  1099. else if (stricmp(optName, "protocol_version")==0)
  1100. {
  1101. unsigned protocol_version = getUnsignedOption(val, "protocol_version");
  1102. checkSetOption(cass_cluster_set_protocol_version(*cluster, protocol_version), "protocol_version");
  1103. }
  1104. else if (stricmp(optName, "num_threads_io")==0)
  1105. {
  1106. unsigned num_threads_io = getUnsignedOption(val, "num_threads_io");
  1107. cass_cluster_set_num_threads_io(*cluster, num_threads_io); // No status return
  1108. }
  1109. else if (stricmp(optName, "queue_size_io")==0)
  1110. {
  1111. unsigned queue_size_io = getUnsignedOption(val, "queue_size_io");
  1112. checkSetOption(cass_cluster_set_queue_size_io(*cluster, queue_size_io), "queue_size_io");
  1113. }
  1114. else if (stricmp(optName, "core_connections_per_host")==0)
  1115. {
  1116. unsigned core_connections_per_host = getUnsignedOption(val, "core_connections_per_host");
  1117. checkSetOption(cass_cluster_set_core_connections_per_host(*cluster, core_connections_per_host), "core_connections_per_host");
  1118. }
  1119. else if (stricmp(optName, "max_connections_per_host")==0)
  1120. {
  1121. unsigned max_connections_per_host = getUnsignedOption(val, "max_connections_per_host");
  1122. checkSetOption(cass_cluster_set_max_connections_per_host(*cluster, max_connections_per_host), "max_connections_per_host");
  1123. }
  1124. else if (stricmp(optName, "max_concurrent_creation")==0)
  1125. {
  1126. unsigned max_concurrent_creation = getUnsignedOption(val, "max_concurrent_creation");
  1127. checkSetOption(cass_cluster_set_max_concurrent_creation(*cluster, max_concurrent_creation), "max_concurrent_creation");
  1128. }
  1129. else if (stricmp(optName, "pending_requests_high_water_mark")==0)
  1130. {
  1131. unsigned pending_requests_high_water_mark = getUnsignedOption(val, "pending_requests_high_water_mark");
  1132. checkSetOption(cass_cluster_set_pending_requests_high_water_mark(*cluster, pending_requests_high_water_mark), "pending_requests_high_water_mark");
  1133. }
  1134. else if (stricmp(optName, "pending_requests_low_water_mark")==0)
  1135. {
  1136. unsigned pending_requests_low_water_mark = getUnsignedOption(val, "pending_requests_low_water_mark");
  1137. checkSetOption(cass_cluster_set_pending_requests_low_water_mark(*cluster, pending_requests_low_water_mark), "pending_requests_low_water_mark");
  1138. }
  1139. else if (stricmp(optName, "max_concurrent_requests_threshold")==0)
  1140. {
  1141. unsigned max_concurrent_requests_threshold = getUnsignedOption(val, "max_concurrent_requests_threshold");
  1142. checkSetOption(cass_cluster_set_max_concurrent_requests_threshold(*cluster, max_concurrent_requests_threshold), "max_concurrent_requests_threshold");
  1143. }
  1144. else if (stricmp(optName, "connect_timeout")==0)
  1145. {
  1146. unsigned connect_timeout = getUnsignedOption(val, "connect_timeout");
  1147. cass_cluster_set_connect_timeout(*cluster, connect_timeout);
  1148. }
  1149. else if (stricmp(optName, "request_timeout")==0)
  1150. {
  1151. unsigned request_timeout = getUnsignedOption(val, "request_timeout");
  1152. cass_cluster_set_request_timeout(*cluster, request_timeout);
  1153. }
  1154. else
  1155. failx("Unrecognized option %s", optName.str());
  1156. }
  1157. }
  1158. cass_cluster_set_contact_points(*cluster, contact_points);
  1159. if (*user || *password)
  1160. cass_cluster_set_credentials(*cluster, user, password);
  1161. session.setown(new CassandraSession(cass_session_new()));
  1162. CassandraFuture future(keyspace ? cass_session_connect_keyspace(*session, *cluster, keyspace) : cass_session_connect(*session, *cluster));
  1163. future.wait("connect");
  1164. }
  1165. virtual bool getBooleanResult()
  1166. {
  1167. bool ret = cassandraembed::getBooleanResult(NULL, getScalarResult());
  1168. checkSingleRow();
  1169. return ret;
  1170. }
  1171. virtual void getDataResult(size32_t &len, void * &result)
  1172. {
  1173. cassandraembed::getDataResult(NULL, getScalarResult(), len, result);
  1174. checkSingleRow();
  1175. }
  1176. virtual double getRealResult()
  1177. {
  1178. double ret = cassandraembed::getRealResult(NULL, getScalarResult());
  1179. checkSingleRow();
  1180. return ret;
  1181. }
  1182. virtual __int64 getSignedResult()
  1183. {
  1184. __int64 ret = cassandraembed::getSignedResult(NULL, getScalarResult());
  1185. checkSingleRow();
  1186. return ret;
  1187. }
  1188. virtual unsigned __int64 getUnsignedResult()
  1189. {
  1190. unsigned __int64 ret = cassandraembed::getUnsignedResult(NULL, getScalarResult());
  1191. checkSingleRow();
  1192. return ret;
  1193. }
  1194. virtual void getStringResult(size32_t &chars, char * &result)
  1195. {
  1196. cassandraembed::getStringResult(NULL, getScalarResult(), chars, result);
  1197. checkSingleRow();
  1198. }
  1199. virtual void getUTF8Result(size32_t &chars, char * &result)
  1200. {
  1201. cassandraembed::getUTF8Result(NULL, getScalarResult(), chars, result);
  1202. checkSingleRow();
  1203. }
  1204. virtual void getUnicodeResult(size32_t &chars, UChar * &result)
  1205. {
  1206. cassandraembed::getUnicodeResult(NULL, getScalarResult(), chars, result);
  1207. checkSingleRow();
  1208. }
  1209. virtual void getDecimalResult(Decimal &value)
  1210. {
  1211. cassandraembed::getDecimalResult(NULL, getScalarResult(), value);
  1212. checkSingleRow();
  1213. }
  1214. virtual void getSetResult(bool & __isAllResult, size32_t & __resultBytes, void * & __result, int elemType, size32_t elemSize)
  1215. {
  1216. CassandraIterator iterator(cass_iterator_from_collection(getScalarResult()));
  1217. rtlRowBuilder out;
  1218. byte *outData = NULL;
  1219. size32_t outBytes = 0;
  1220. while (cass_iterator_next(iterator))
  1221. {
  1222. const CassValue *value = cass_iterator_get_value(iterator);
  1223. assertex(value);
  1224. if (elemSize != UNKNOWN_LENGTH)
  1225. {
  1226. out.ensureAvailable(outBytes + elemSize);
  1227. outData = out.getbytes() + outBytes;
  1228. }
  1229. switch ((type_t) elemType)
  1230. {
  1231. case type_int:
  1232. rtlWriteInt(outData, cassandraembed::getSignedResult(NULL, value), elemSize);
  1233. break;
  1234. case type_unsigned:
  1235. rtlWriteInt(outData, cassandraembed::getUnsignedResult(NULL, value), elemSize);
  1236. break;
  1237. case type_real:
  1238. if (elemSize == sizeof(double))
  1239. * (double *) outData = cassandraembed::getRealResult(NULL, value);
  1240. else
  1241. {
  1242. assertex(elemSize == sizeof(float));
  1243. * (float *) outData = (float) cassandraembed::getRealResult(NULL, value);
  1244. }
  1245. break;
  1246. case type_boolean:
  1247. assertex(elemSize == sizeof(bool));
  1248. * (bool *) outData = cassandraembed::getBooleanResult(NULL, value);
  1249. break;
  1250. case type_string:
  1251. case type_varstring:
  1252. {
  1253. rtlDataAttr str;
  1254. size32_t lenBytes;
  1255. cassandraembed::getStringResult(NULL, value, lenBytes, str.refstr());
  1256. if (elemSize == UNKNOWN_LENGTH)
  1257. {
  1258. if (elemType == type_string)
  1259. {
  1260. out.ensureAvailable(outBytes + lenBytes + sizeof(size32_t));
  1261. outData = out.getbytes() + outBytes;
  1262. * (size32_t *) outData = lenBytes;
  1263. rtlStrToStr(lenBytes, outData+sizeof(size32_t), lenBytes, str.getstr());
  1264. outBytes += lenBytes + sizeof(size32_t);
  1265. }
  1266. else
  1267. {
  1268. out.ensureAvailable(outBytes + lenBytes + 1);
  1269. outData = out.getbytes() + outBytes;
  1270. rtlStrToVStr(0, outData, lenBytes, str.getstr());
  1271. outBytes += lenBytes + 1;
  1272. }
  1273. }
  1274. else
  1275. {
  1276. if (elemType == type_string)
  1277. rtlStrToStr(elemSize, outData, lenBytes, str.getstr());
  1278. else
  1279. rtlStrToVStr(elemSize, outData, lenBytes, str.getstr()); // Fixed size null terminated strings... weird.
  1280. }
  1281. break;
  1282. }
  1283. case type_unicode:
  1284. case type_utf8:
  1285. {
  1286. rtlDataAttr str;
  1287. size32_t lenChars;
  1288. cassandraembed::getUTF8Result(NULL, value, lenChars, str.refstr());
  1289. const char * text = str.getstr();
  1290. size32_t lenBytes = rtlUtf8Size(lenChars, text);
  1291. if (elemType == type_utf8)
  1292. {
  1293. assertex (elemSize == UNKNOWN_LENGTH);
  1294. out.ensureAvailable(outBytes + lenBytes + sizeof(size32_t));
  1295. outData = out.getbytes() + outBytes;
  1296. * (size32_t *) outData = lenChars;
  1297. rtlStrToStr(lenBytes, outData+sizeof(size32_t), lenBytes, text);
  1298. outBytes += lenBytes + sizeof(size32_t);
  1299. }
  1300. else
  1301. {
  1302. if (elemSize == UNKNOWN_LENGTH)
  1303. {
  1304. // You can't assume that number of chars in utf8 matches number in unicode16 ...
  1305. size32_t numchars16;
  1306. rtlDataAttr unicode16;
  1307. rtlUtf8ToUnicodeX(numchars16, unicode16.refustr(), lenChars, text);
  1308. out.ensureAvailable(outBytes + numchars16*sizeof(UChar) + sizeof(size32_t));
  1309. outData = out.getbytes() + outBytes;
  1310. * (size32_t *) outData = numchars16;
  1311. rtlUnicodeToUnicode(numchars16, (UChar *) (outData+sizeof(size32_t)), numchars16, unicode16.getustr());
  1312. outBytes += numchars16*sizeof(UChar) + sizeof(size32_t);
  1313. }
  1314. else
  1315. rtlUtf8ToUnicode(elemSize / sizeof(UChar), (UChar *) outData, lenChars, text);
  1316. }
  1317. break;
  1318. }
  1319. default:
  1320. fail("type mismatch - unsupported return type");
  1321. }
  1322. if (elemSize != UNKNOWN_LENGTH)
  1323. outBytes += elemSize;
  1324. }
  1325. __isAllResult = false;
  1326. __resultBytes = outBytes;
  1327. __result = out.detachdata();
  1328. }
  1329. virtual IRowStream *getDatasetResult(IEngineRowAllocator * _resultAllocator)
  1330. {
  1331. return new CassandraRowStream(inputStream, stmtInfo, _resultAllocator);
  1332. }
  1333. virtual byte * getRowResult(IEngineRowAllocator * _resultAllocator)
  1334. {
  1335. if (!stmtInfo->hasResult() || stmtInfo->rowCount() != 1)
  1336. typeError("row", NULL, NULL);
  1337. CassandraRowStream stream(NULL, stmtInfo, _resultAllocator);
  1338. roxiemem::OwnedConstRoxieRow ret = stream.nextRow();
  1339. stream.stop();
  1340. if (ret == NULL) // Check for exactly one returned row
  1341. typeError("row", NULL, NULL);
  1342. return (byte *) ret.getClear();
  1343. }
  1344. virtual size32_t getTransformResult(ARowBuilder & rowBuilder)
  1345. {
  1346. if (!stmtInfo->hasResult() || stmtInfo->rowCount() != 1)
  1347. typeError("row", NULL, NULL);
  1348. if (!stmtInfo->next())
  1349. fail("Failed to read row");
  1350. CassandraRowBuilder cassandraRowBuilder(stmtInfo);
  1351. const RtlTypeInfo *typeInfo = rowBuilder.queryAllocator()->queryOutputMeta()->queryTypeInfo();
  1352. assertex(typeInfo);
  1353. RtlFieldStrInfo dummyField("<row>", NULL, typeInfo);
  1354. return typeInfo->build(rowBuilder, 0, &dummyField, cassandraRowBuilder);
  1355. }
  1356. virtual void bindRowParam(const char *name, IOutputMetaData & metaVal, byte *val)
  1357. {
  1358. CassandraRecordBinder binder(logctx, metaVal.queryTypeInfo(), stmtInfo, nextParam);
  1359. binder.processRow(val);
  1360. nextParam += binder.numFields();
  1361. }
  1362. virtual void bindDatasetParam(const char *name, IOutputMetaData & metaVal, IRowStream * val)
  1363. {
  1364. // We only support a single dataset parameter...
  1365. // MORE - look into batch?
  1366. if (inputStream)
  1367. {
  1368. fail("At most one dataset parameter supported");
  1369. }
  1370. inputStream.setown(new CassandraDatasetBinder(logctx, LINK(val), metaVal.queryTypeInfo(), stmtInfo, nextParam));
  1371. nextParam += inputStream->numFields();
  1372. }
  1373. virtual void bindBooleanParam(const char *name, bool val)
  1374. {
  1375. checkBind(cass_statement_bind_bool(stmtInfo->queryStatement(), checkNextParam(name), val ? cass_true : cass_false), name);
  1376. }
  1377. virtual void bindDataParam(const char *name, size32_t len, const void *val)
  1378. {
  1379. checkBind(cass_statement_bind_bytes(stmtInfo->queryStatement(), checkNextParam(name), cass_bytes_init((const cass_byte_t*) val, len)), name);
  1380. }
  1381. virtual void bindFloatParam(const char *name, float val)
  1382. {
  1383. checkBind(cass_statement_bind_float(stmtInfo->queryStatement(), checkNextParam(name), val), name);
  1384. }
  1385. virtual void bindRealParam(const char *name, double val)
  1386. {
  1387. checkBind(cass_statement_bind_double(stmtInfo->queryStatement(), checkNextParam(name), val), name);
  1388. }
  1389. virtual void bindSignedSizeParam(const char *name, int size, __int64 val)
  1390. {
  1391. if (size > 4)
  1392. checkBind(cass_statement_bind_int64(stmtInfo->queryStatement(), checkNextParam(name), val), name);
  1393. else
  1394. checkBind(cass_statement_bind_int32(stmtInfo->queryStatement(), checkNextParam(name), val), name);
  1395. }
  1396. virtual void bindSignedParam(const char *name, __int64 val)
  1397. {
  1398. bindSignedSizeParam(name, 8, val);
  1399. }
  1400. virtual void bindUnsignedSizeParam(const char *name, int size, unsigned __int64 val)
  1401. {
  1402. UNSUPPORTED("UNSIGNED columns");
  1403. }
  1404. virtual void bindUnsignedParam(const char *name, unsigned __int64 val)
  1405. {
  1406. UNSUPPORTED("UNSIGNED columns");
  1407. }
  1408. virtual void bindStringParam(const char *name, size32_t len, const char *val)
  1409. {
  1410. size32_t utf8chars;
  1411. rtlDataAttr utfText;
  1412. rtlStrToUtf8X(utf8chars, utfText.refstr(), len, val);
  1413. checkBind(cass_statement_bind_string(stmtInfo->queryStatement(),
  1414. checkNextParam(name),
  1415. cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()))),
  1416. name);
  1417. }
  1418. virtual void bindVStringParam(const char *name, const char *val)
  1419. {
  1420. bindStringParam(name, strlen(val), val);
  1421. }
  1422. virtual void bindUTF8Param(const char *name, size32_t chars, const char *val)
  1423. {
  1424. checkBind(cass_statement_bind_string(stmtInfo->queryStatement(), checkNextParam(name), cass_string_init2(val, rtlUtf8Size(chars, val))), name);
  1425. }
  1426. virtual void bindUnicodeParam(const char *name, size32_t chars, const UChar *val)
  1427. {
  1428. size32_t utf8chars;
  1429. rtlDataAttr utfText;
  1430. rtlUnicodeToUtf8X(utf8chars, utfText.refstr(), chars, val);
  1431. checkBind(cass_statement_bind_string(stmtInfo->queryStatement(),
  1432. checkNextParam(name),
  1433. cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()))),
  1434. name);
  1435. }
  1436. virtual void bindSetParam(const char *name, int elemType, size32_t elemSize, bool isAll, size32_t totalBytes, void *setData)
  1437. {
  1438. if (isAll)
  1439. UNSUPPORTED("SET(ALL)");
  1440. type_t typecode = (type_t) elemType;
  1441. const byte *inData = (const byte *) setData;
  1442. const byte *endData = inData + totalBytes;
  1443. int numElems;
  1444. if (elemSize == UNKNOWN_LENGTH)
  1445. {
  1446. numElems = 0;
  1447. // Will need 2 passes to work out how many elements there are in the set :(
  1448. while (inData < endData)
  1449. {
  1450. int thisSize;
  1451. switch (elemType)
  1452. {
  1453. case type_varstring:
  1454. thisSize = strlen((const char *) inData) + 1;
  1455. break;
  1456. case type_string:
  1457. thisSize = * (size32_t *) inData + sizeof(size32_t);
  1458. break;
  1459. case type_unicode:
  1460. thisSize = (* (size32_t *) inData) * sizeof(UChar) + sizeof(size32_t);
  1461. break;
  1462. case type_utf8:
  1463. thisSize = rtlUtf8Size(* (size32_t *) inData, inData + sizeof(size32_t)) + sizeof(size32_t);
  1464. break;
  1465. default:
  1466. fail("Unsupported parameter type");
  1467. break;
  1468. }
  1469. inData += thisSize;
  1470. numElems++;
  1471. }
  1472. inData = (const byte *) setData;
  1473. }
  1474. else
  1475. numElems = totalBytes / elemSize;
  1476. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_SET, numElems));
  1477. while (inData < endData)
  1478. {
  1479. size32_t thisSize = elemSize;
  1480. CassError rc;
  1481. switch (typecode)
  1482. {
  1483. case type_int:
  1484. if (elemSize > 4)
  1485. rc = cass_collection_append_int64(collection, rtlReadInt(inData, elemSize));
  1486. else
  1487. rc = cass_collection_append_int32(collection, rtlReadInt(inData, elemSize));
  1488. break;
  1489. case type_unsigned:
  1490. UNSUPPORTED("UNSIGNED columns");
  1491. break;
  1492. case type_varstring:
  1493. {
  1494. size32_t numChars = strlen((const char *) inData);
  1495. if (elemSize == UNKNOWN_LENGTH)
  1496. thisSize = numChars + 1;
  1497. size32_t utf8chars;
  1498. rtlDataAttr utfText;
  1499. rtlStrToUtf8X(utf8chars, utfText.refstr(), numChars, (const char *) inData);
  1500. rc = cass_collection_append_string(collection, cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())));
  1501. break;
  1502. }
  1503. case type_string:
  1504. {
  1505. if (elemSize == UNKNOWN_LENGTH)
  1506. {
  1507. thisSize = * (size32_t *) inData;
  1508. inData += sizeof(size32_t);
  1509. }
  1510. size32_t utf8chars;
  1511. rtlDataAttr utfText;
  1512. rtlStrToUtf8X(utf8chars, utfText.refstr(), thisSize, (const char *) inData);
  1513. rc = cass_collection_append_string(collection, cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())));
  1514. break;
  1515. }
  1516. case type_real:
  1517. if (elemSize == sizeof(double))
  1518. rc = cass_collection_append_double(collection, * (double *) inData);
  1519. else
  1520. rc = cass_collection_append_float(collection, * (float *) inData);
  1521. break;
  1522. case type_boolean:
  1523. assertex(elemSize == sizeof(bool));
  1524. rc = cass_collection_append_bool(collection, *(bool*)inData ? cass_true : cass_false);
  1525. break;
  1526. case type_unicode:
  1527. {
  1528. if (elemSize == UNKNOWN_LENGTH)
  1529. {
  1530. thisSize = (* (size32_t *) inData) * sizeof(UChar); // NOTE - it's in chars...
  1531. inData += sizeof(size32_t);
  1532. }
  1533. unsigned unicodeChars;
  1534. rtlDataAttr unicode;
  1535. rtlUnicodeToUtf8X(unicodeChars, unicode.refstr(), thisSize / sizeof(UChar), (const UChar *) inData);
  1536. size32_t sizeBytes = rtlUtf8Size(unicodeChars, unicode.getstr());
  1537. rc = cass_collection_append_string(collection, cass_string_init2(unicode.getstr(), sizeBytes));
  1538. break;
  1539. }
  1540. case type_utf8:
  1541. {
  1542. assertex (elemSize == UNKNOWN_LENGTH);
  1543. size32_t numChars = * (size32_t *) inData;
  1544. inData += sizeof(size32_t);
  1545. thisSize = rtlUtf8Size(numChars, inData);
  1546. rc = cass_collection_append_string(collection, cass_string_init2((const char *) inData, thisSize));
  1547. break;
  1548. }
  1549. case type_data:
  1550. if (elemSize == UNKNOWN_LENGTH)
  1551. {
  1552. thisSize = * (size32_t *) inData;
  1553. inData += sizeof(size32_t);
  1554. }
  1555. rc = cass_collection_append_bytes(collection, cass_bytes_init((const cass_byte_t*) inData, thisSize));
  1556. break;
  1557. }
  1558. checkBind(rc, name);
  1559. inData += thisSize;
  1560. }
  1561. checkBind(cass_statement_bind_collection(stmtInfo->queryStatement(),
  1562. checkNextParam(name),
  1563. collection),
  1564. name);
  1565. }
  1566. virtual void importFunction(size32_t lenChars, const char *text)
  1567. {
  1568. throwUnexpected();
  1569. }
  1570. virtual void compileEmbeddedScript(size32_t chars, const char *_script)
  1571. {
  1572. // Incoming script is not necessarily null terminated. Note that the chars refers to utf8 characters and not bytes.
  1573. size32_t len = rtlUtf8Size(chars, _script);
  1574. queryString.set(_script, len);
  1575. const char *script = queryString.get(); // Now null terminated
  1576. if ((flags & (EFnoreturn|EFnoparams)) == (EFnoreturn|EFnoparams))
  1577. {
  1578. loop
  1579. {
  1580. const char *nextScript = findUnquoted(script, ';');
  1581. if (!nextScript)
  1582. {
  1583. // script should be pointing at only trailing whitespace, else it's a "missing ;" error
  1584. break;
  1585. }
  1586. CassandraStatement statement(cass_statement_new(cass_string_init2(script, nextScript-script), 0));
  1587. CassandraFuture future(cass_session_execute(*session, statement));
  1588. future.wait("execute statement");
  1589. script = nextScript;
  1590. }
  1591. }
  1592. else
  1593. {
  1594. // MORE - can cache this, perhaps, if script is same as last time?
  1595. CassandraFuture future(cass_session_prepare(*session, cass_string_init(script)));
  1596. future.wait("prepare statement");
  1597. Owned<CassandraPrepared> prepared = new CassandraPrepared(cass_future_get_prepared(future));
  1598. if ((flags & EFnoparams) == 0)
  1599. numParams = countBindings(script);
  1600. else
  1601. numParams = 0;
  1602. stmtInfo.setown(new CassandraStatementInfo(session, prepared, numParams, batchMode));
  1603. }
  1604. }
  1605. virtual void callFunction()
  1606. {
  1607. // Does not seem to be a way to check number of parameters expected...
  1608. // if (nextParam != cass_statement_bind_count(stmtInfo))
  1609. // fail("Not enough parameters");
  1610. try
  1611. {
  1612. if (stmtInfo && !stmtInfo->hasResult())
  1613. lazyExecute();
  1614. }
  1615. catch (IException *E)
  1616. {
  1617. StringBuffer msg;
  1618. E->errorMessage(msg);
  1619. msg.appendf(" (processing query %s)", queryString.get());
  1620. throw makeStringException(E->errorCode(), msg);
  1621. }
  1622. }
  1623. protected:
  1624. void lazyExecute()
  1625. {
  1626. if (inputStream)
  1627. inputStream->executeAll(stmtInfo);
  1628. else
  1629. stmtInfo->execute();
  1630. }
  1631. const CassValue *getScalarResult()
  1632. {
  1633. if (!stmtInfo->next())
  1634. typeError("scalar", NULL, NULL);
  1635. if (cass_row_get_column(stmtInfo->queryRow(), 1))
  1636. typeError("scalar", NULL, NULL);
  1637. const CassValue *result = cass_row_get_column(stmtInfo->queryRow(), 0);
  1638. if (!result)
  1639. typeError("scalar", NULL, NULL);
  1640. return result;
  1641. }
  1642. void checkSingleRow()
  1643. {
  1644. if (stmtInfo->rowCount() != 1)
  1645. typeError("scalar", NULL, NULL);
  1646. }
  1647. unsigned countBindings(const char *query)
  1648. {
  1649. unsigned queryCount = 0;
  1650. while ((query = findUnquoted(query, '?')) != NULL)
  1651. queryCount++;
  1652. return queryCount;
  1653. }
  1654. const char *findUnquoted(const char *query, char searchFor)
  1655. {
  1656. // Note - returns pointer to char AFTER the first occurrence of searchFor outside of quotes
  1657. char inStr = '\0';
  1658. char ch;
  1659. while ((ch = *query++) != 0)
  1660. {
  1661. if (ch == inStr)
  1662. inStr = false;
  1663. else switch (ch)
  1664. {
  1665. case '\'':
  1666. case '"':
  1667. inStr = ch;
  1668. break;
  1669. case '\\':
  1670. if (inStr && *query)
  1671. query++;
  1672. break;
  1673. case '/':
  1674. if (!inStr)
  1675. {
  1676. if (*query=='/')
  1677. {
  1678. while (*query && *query != '\n')
  1679. query++;
  1680. }
  1681. else if (*query=='*')
  1682. {
  1683. query++;
  1684. loop
  1685. {
  1686. if (!*query)
  1687. fail("Unterminated comment in query string");
  1688. if (*query=='*' && query[1]=='/')
  1689. {
  1690. query+= 2;
  1691. break;
  1692. }
  1693. query++;
  1694. }
  1695. }
  1696. }
  1697. break;
  1698. default:
  1699. if (!inStr && ch==searchFor)
  1700. return query;
  1701. break;
  1702. }
  1703. }
  1704. return NULL;
  1705. }
  1706. inline unsigned checkNextParam(const char *name)
  1707. {
  1708. if (nextParam == numParams)
  1709. failx("Too many parameters supplied: No matching ? for parameter %s", name);
  1710. return nextParam++;
  1711. }
  1712. inline void checkBind(CassError rc, const char *name)
  1713. {
  1714. if (rc != CASS_OK)
  1715. {
  1716. failx("While binding parameter %s: %s", name, cass_error_desc(rc));
  1717. }
  1718. }
  1719. inline void checkSetOption(CassError rc, const char *name)
  1720. {
  1721. if (rc != CASS_OK)
  1722. {
  1723. failx("While setting option %s: %s", name, cass_error_desc(rc));
  1724. }
  1725. }
  1726. unsigned getUnsignedOption(const char *val, const char *option)
  1727. {
  1728. char *endp;
  1729. long value = strtoul(val, &endp, 0);
  1730. if (endp==val || *endp != '\0' || value > INT_MAX || value < INT_MIN)
  1731. failx("Invalid value '%s' for option %s", val, option);
  1732. return (int) value;
  1733. }
  1734. Owned<CassandraCluster> cluster;
  1735. Owned<CassandraSession> session;
  1736. Owned<CassandraStatementInfo> stmtInfo;
  1737. Owned<CassandraDatasetBinder> inputStream;
  1738. const IContextLogger &logctx;
  1739. unsigned flags;
  1740. unsigned nextParam;
  1741. unsigned numParams;
  1742. CassBatchType batchMode;
  1743. StringAttr queryString;
  1744. };
  1745. class CassandraEmbedContext : public CInterfaceOf<IEmbedContext>
  1746. {
  1747. public:
  1748. virtual IEmbedFunctionContext *createFunctionContext(unsigned flags, const char *options)
  1749. {
  1750. return createFunctionContextEx(NULL, flags, options);
  1751. }
  1752. virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, unsigned flags, const char *options)
  1753. {
  1754. if (flags & EFimport)
  1755. UNSUPPORTED("IMPORT");
  1756. else
  1757. return new CassandraEmbedFunctionContext(ctx ? ctx->queryContextLogger() : queryDummyContextLogger(), flags, options);
  1758. }
  1759. };
  1760. extern IEmbedContext* getEmbedContext()
  1761. {
  1762. return new CassandraEmbedContext();
  1763. }
  1764. extern bool syntaxCheck(const char *script)
  1765. {
  1766. return true; // MORE
  1767. }
  1768. } // namespace