cassandraembed.cpp 72 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "cassandra.h"
  15. #include "jexcept.hpp"
  16. #include "jthread.hpp"
  17. #include "hqlplugins.hpp"
  18. #include "deftype.hpp"
  19. #include "eclhelper.hpp"
  20. #include "eclrtl.hpp"
  21. #include "eclrtl_imp.hpp"
  22. #include "rtlds_imp.hpp"
  23. #include "rtlfield.hpp"
  24. #include "rtlembed.hpp"
  25. #include "roxiemem.hpp"
  26. #include "nbcd.hpp"
  27. #include "cassandraembed.hpp"
  28. static const char * compatibleVersions[] = {
  29. "Cassandra Embed Helper 1.0.0",
  30. NULL };
  31. static const char *version = "Cassandra Embed Helper 1.0.0";
  32. extern "C" DECL_EXPORT bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb)
  33. {
  34. if (pb->size == sizeof(ECLPluginDefinitionBlockEx))
  35. {
  36. ECLPluginDefinitionBlockEx * pbx = (ECLPluginDefinitionBlockEx *) pb;
  37. pbx->compatibleVersions = compatibleVersions;
  38. }
  39. else if (pb->size != sizeof(ECLPluginDefinitionBlock))
  40. return false;
  41. pb->magicVersion = PLUGIN_VERSION;
  42. pb->version = version;
  43. pb->moduleName = "cassandra";
  44. pb->ECL = NULL;
  45. pb->flags = PLUGIN_MULTIPLE_VERSIONS;
  46. pb->description = "Cassandra Embed Helper";
  47. return true;
  48. }
  49. namespace cassandraembed {
  50. extern void UNSUPPORTED(const char *feature)
  51. {
  52. throw MakeStringException(-1, "UNSUPPORTED feature: %s not supported in Cassandra plugin", feature);
  53. }
  54. static void logCallBack(const CassLogMessage *message, void *data)
  55. {
  56. DBGLOG("cassandra: %s - %s", cass_log_level_string(message->severity), message->message);
  57. }
  58. extern void failx(const char *message, ...)
  59. {
  60. va_list args;
  61. va_start(args,message);
  62. StringBuffer msg;
  63. msg.append("cassandra: ").valist_appendf(message,args);
  64. va_end(args);
  65. rtlFail(0, msg.str());
  66. }
  67. extern void fail(const char *message)
  68. {
  69. StringBuffer msg;
  70. msg.append("cassandra: ").append(message);
  71. rtlFail(0, msg.str());
  72. }
  73. void check(CassError rc)
  74. {
  75. if (rc != CASS_OK)
  76. {
  77. fail(cass_error_desc(rc));
  78. }
  79. }
  80. //use cassandra.h mapping macros so we stay in sync
  81. #define CASS_CONSISTENCY_HPCC_MAP_ENTRY(value, desc) \
  82. if (strieq(desc, name)) \
  83. return value;
  84. CassConsistency cass_consistency_from_string(const char *name)
  85. {
  86. CASS_CONSISTENCY_MAP(CASS_CONSISTENCY_HPCC_MAP_ENTRY)
  87. return CASS_CONSISTENCY_UNKNOWN;
  88. }
  89. // Wrappers to Cassandra structures that require corresponding releases
  90. void CassandraClusterSession::setOptions(const StringArray &options)
  91. {
  92. const char *contact_points = "localhost";
  93. const char *user = "";
  94. const char *password = "";
  95. StringBuffer epText;
  96. ForEachItemIn(idx, options)
  97. {
  98. const char *opt = options.item(idx);
  99. const char *val = strchr(opt, '=');
  100. if (val)
  101. {
  102. StringBuffer optName(val-opt, opt);
  103. val++;
  104. if (stricmp(optName, "contact_points")==0 || stricmp(optName, "server")==0)
  105. {
  106. contact_points = val; // Note that lifetime of val is adequate for this to be safe
  107. if (contact_points[0]=='.')
  108. {
  109. SocketEndpoint ep(contact_points);
  110. ep.getIpText(epText.clear());
  111. contact_points = epText.str();
  112. }
  113. }
  114. else if (stricmp(optName, "user")==0)
  115. user = val;
  116. else if (stricmp(optName, "password")==0)
  117. password = val;
  118. else if (stricmp(optName, "keyspace")==0)
  119. keyspace.set(val);
  120. else if (stricmp(optName, "maxFutures")==0)
  121. {
  122. if (!semaphore)
  123. {
  124. maxFutures=getUnsignedOption(val, "maxFutures");
  125. if (maxFutures)
  126. semaphore = new Semaphore(maxFutures);
  127. }
  128. }
  129. else if (stricmp(optName, "maxRetries")==0)
  130. maxRetries=getUnsignedOption(val, "maxRetries");
  131. else if (stricmp(optName, "port")==0)
  132. {
  133. unsigned port = getUnsignedOption(val, "port");
  134. checkSetOption(cass_cluster_set_port(cluster, port), "port");
  135. }
  136. else if (stricmp(optName, "protocol_version")==0)
  137. {
  138. unsigned protocol_version = getUnsignedOption(val, "protocol_version");
  139. checkSetOption(cass_cluster_set_protocol_version(cluster, protocol_version), "protocol_version");
  140. }
  141. else if (stricmp(optName, "num_threads_io")==0)
  142. {
  143. unsigned num_threads_io = getUnsignedOption(val, "num_threads_io");
  144. cass_cluster_set_num_threads_io(cluster, num_threads_io); // No status return
  145. }
  146. else if (stricmp(optName, "queue_size_io")==0)
  147. {
  148. unsigned queue_size_io = getUnsignedOption(val, "queue_size_io");
  149. checkSetOption(cass_cluster_set_queue_size_io(cluster, queue_size_io), "queue_size_io");
  150. }
  151. else if (stricmp(optName, "core_connections_per_host")==0)
  152. {
  153. unsigned core_connections_per_host = getUnsignedOption(val, "core_connections_per_host");
  154. checkSetOption(cass_cluster_set_core_connections_per_host(cluster, core_connections_per_host), "core_connections_per_host");
  155. }
  156. else if (stricmp(optName, "max_connections_per_host")==0)
  157. {
  158. unsigned max_connections_per_host = getUnsignedOption(val, "max_connections_per_host");
  159. checkSetOption(cass_cluster_set_max_connections_per_host(cluster, max_connections_per_host), "max_connections_per_host");
  160. }
  161. else if (stricmp(optName, "max_concurrent_creation")==0)
  162. {
  163. unsigned max_concurrent_creation = getUnsignedOption(val, "max_concurrent_creation");
  164. checkSetOption(cass_cluster_set_max_concurrent_creation(cluster, max_concurrent_creation), "max_concurrent_creation");
  165. }
  166. else if (stricmp(optName, "write_bytes_high_water_mark")==0)
  167. {
  168. unsigned write_bytes_high_water_mark = getUnsignedOption(val, "write_bytes_high_water_mark");
  169. checkSetOption(cass_cluster_set_write_bytes_high_water_mark(cluster, write_bytes_high_water_mark), "write_bytes_high_water_mark");
  170. }
  171. else if (stricmp(optName, "write_bytes_low_water_mark")==0)
  172. {
  173. unsigned write_bytes_low_water_mark = getUnsignedOption(val, "write_bytes_low_water_mark");
  174. checkSetOption(cass_cluster_set_write_bytes_low_water_mark(cluster, write_bytes_low_water_mark), "write_bytes_low_water_mark");
  175. }
  176. else if (stricmp(optName, "pending_requests_high_water_mark")==0)
  177. {
  178. unsigned pending_requests_high_water_mark = getUnsignedOption(val, "pending_requests_high_water_mark");
  179. checkSetOption(cass_cluster_set_pending_requests_high_water_mark(cluster, pending_requests_high_water_mark), "pending_requests_high_water_mark");
  180. }
  181. else if (stricmp(optName, "pending_requests_low_water_mark")==0)
  182. {
  183. unsigned pending_requests_low_water_mark = getUnsignedOption(val, "pending_requests_low_water_mark");
  184. checkSetOption(cass_cluster_set_pending_requests_low_water_mark(cluster, pending_requests_low_water_mark), "pending_requests_low_water_mark");
  185. }
  186. else if (stricmp(optName, "max_concurrent_requests_threshold")==0)
  187. {
  188. unsigned max_concurrent_requests_threshold = getUnsignedOption(val, "max_concurrent_requests_threshold");
  189. checkSetOption(cass_cluster_set_max_concurrent_requests_threshold(cluster, max_concurrent_requests_threshold), "max_concurrent_requests_threshold");
  190. }
  191. else if (stricmp(optName, "connect_timeout")==0)
  192. {
  193. unsigned connect_timeout = getUnsignedOption(val, "connect_timeout");
  194. cass_cluster_set_connect_timeout(cluster, connect_timeout);
  195. }
  196. else if (stricmp(optName, "request_timeout")==0)
  197. {
  198. unsigned request_timeout = getUnsignedOption(val, "request_timeout");
  199. cass_cluster_set_request_timeout(cluster, request_timeout);
  200. }
  201. else if (stricmp(optName, "load_balance_round_robin")==0)
  202. {
  203. cass_bool_t enable = getBoolOption(val, "load_balance_round_robin");
  204. if (enable==cass_true)
  205. cass_cluster_set_load_balance_round_robin(cluster);
  206. }
  207. else if (stricmp(optName, "load_balance_dc_aware")==0)
  208. {
  209. StringArray lbargs;
  210. lbargs.appendList(val, "|");
  211. if (lbargs.length() != 3)
  212. failx("Invalid value '%s' for option %s - expected 3 subvalues (separate with |)", val, optName.str());
  213. unsigned usedPerRemote = getUnsignedOption(lbargs.item(2), "load_balance_dc_aware");
  214. cass_bool_t allowRemote = getBoolOption(lbargs.item(2), "load_balance_dc_aware");
  215. checkSetOption(cass_cluster_set_load_balance_dc_aware(cluster, lbargs.item(0), usedPerRemote, allowRemote), "load_balance_dc_aware");
  216. }
  217. else if (stricmp(optName, "token_aware_routing")==0)
  218. {
  219. cass_bool_t enable = getBoolOption(val, "token_aware_routing");
  220. cass_cluster_set_token_aware_routing(cluster, enable);
  221. }
  222. else if (stricmp(optName, "latency_aware_routing")==0)
  223. {
  224. cass_bool_t enable = getBoolOption(val, "latency_aware_routing");
  225. cass_cluster_set_latency_aware_routing(cluster, enable);
  226. }
  227. else if (stricmp(optName, "latency_aware_routing_settings")==0)
  228. {
  229. StringArray subargs;
  230. subargs.appendList(val, "|");
  231. if (subargs.length() != 5)
  232. failx("Invalid value '%s' for option %s - expected 5 subvalues (separate with |)", val, optName.str());
  233. cass_double_t exclusion_threshold = getDoubleOption(subargs.item(0), "exclusion_threshold");
  234. cass_uint64_t scale_ms = getUnsigned64Option(subargs.item(1), "scale_ms");
  235. cass_uint64_t retry_period_ms = getUnsigned64Option(subargs.item(2), "retry_period_ms");
  236. cass_uint64_t update_rate_ms = getUnsigned64Option(subargs.item(3), "update_rate_ms");
  237. cass_uint64_t min_measured = getUnsigned64Option(subargs.item(4), "min_measured");
  238. cass_cluster_set_latency_aware_routing_settings(cluster, exclusion_threshold, scale_ms, retry_period_ms, update_rate_ms, min_measured);
  239. }
  240. else if (stricmp(optName, "tcp_nodelay")==0)
  241. {
  242. cass_bool_t enable = getBoolOption(val, "tcp_nodelay");
  243. cass_cluster_set_tcp_nodelay(cluster, enable);
  244. }
  245. else if (stricmp(optName, "tcp_keepalive")==0)
  246. {
  247. StringArray subargs;
  248. subargs.appendList(val, "|");
  249. if (subargs.length() != 2)
  250. failx("Invalid value '%s' for option %s - expected 2 subvalues (separate with |)", val, optName.str());
  251. cass_bool_t enabled = getBoolOption(subargs.item(0), "enabled");
  252. unsigned delay_secs = getUnsignedOption(subargs.item(0), "delay_secs");
  253. cass_cluster_set_tcp_keepalive(cluster, enabled, delay_secs);
  254. }
  255. else if (strieq(optName, "consistency"))
  256. {
  257. CassConsistency optConsistency = cass_consistency_from_string(val);
  258. if (optConsistency == CASS_CONSISTENCY_UNKNOWN)
  259. failx("Unrecognized cassandra consistency value '%s'", val);
  260. checkSetOption(cass_cluster_set_consistency(cluster, optConsistency), "consistency");
  261. }
  262. else
  263. failx("Unrecognized option %s", optName.str());
  264. }
  265. }
  266. cass_cluster_set_contact_points(cluster, contact_points);
  267. if (*user || *password)
  268. cass_cluster_set_credentials(cluster, user, password);
  269. }
  270. void CassandraClusterSession::checkSetOption(CassError rc, const char *name)
  271. {
  272. if (rc != CASS_OK)
  273. {
  274. failx("While setting option %s: %s", name, cass_error_desc(rc));
  275. }
  276. }
  277. cass_bool_t CassandraClusterSession::getBoolOption(const char *val, const char *option)
  278. {
  279. return strToBool(val) ? cass_true : cass_false;
  280. }
  281. unsigned CassandraClusterSession::getUnsignedOption(const char *val, const char *option)
  282. {
  283. char *endp;
  284. long value = strtoul(val, &endp, 0);
  285. if (endp==val || *endp != '\0' || value > UINT_MAX || value < 0)
  286. failx("Invalid value '%s' for option %s", val, option);
  287. return (unsigned) value;
  288. }
  289. unsigned CassandraClusterSession::getDoubleOption(const char *val, const char *option)
  290. {
  291. char *endp;
  292. double value = strtod(val, &endp);
  293. if (endp==val || *endp != '\0')
  294. failx("Invalid value '%s' for option %s", val, option);
  295. return value;
  296. }
  297. __uint64 CassandraClusterSession::getUnsigned64Option(const char *val, const char *option)
  298. {
  299. // MORE - could check it's all digits (with optional leading spaces...), if we cared.
  300. return rtlVStrToUInt8(val);
  301. }
  302. void CassandraClusterSession::connect()
  303. {
  304. assertex(cluster && !session);
  305. session.setown(new CassandraSession(cass_session_new()));
  306. CassandraFuture future(keyspace.isEmpty() ? cass_session_connect(*session, cluster) : cass_session_connect_keyspace(*session, cluster, keyspace));
  307. future.wait("connect");
  308. }
  309. void CassandraClusterSession::disconnect()
  310. {
  311. session.clear();
  312. }
  313. CassandraPrepared *CassandraClusterSession::prepareStatement(const char *query, bool trace) const
  314. {
  315. assertex(session);
  316. CriticalBlock b(cacheCrit);
  317. Linked<CassandraPrepared> cached = preparedCache.getValue(query);
  318. if (cached)
  319. return cached.getClear();
  320. {
  321. // We don't want to block cache lookups while we prepare a new bound statement
  322. // Note - if multiple threads try to prepare the same (new) statement at the same time, it's not catastrophic
  323. CriticalUnblock b(cacheCrit);
  324. CassandraFuture futurePrep(cass_session_prepare(*session, query));
  325. futurePrep.wait("prepare statement");
  326. cached.setown(new CassandraPrepared(cass_future_get_prepared(futurePrep), trace ? query : NULL));
  327. }
  328. preparedCache.setValue(query, cached); // NOTE - this links parameter
  329. return cached.getClear();
  330. }
  331. CassandraStatementInfo *CassandraClusterSession::createStatementInfo(const char *script, unsigned numParams, CassBatchType batchMode, unsigned pageSize) const
  332. {
  333. Owned<CassandraPrepared> prepared = prepareStatement(script, false); // We could make tracing selectable
  334. return new CassandraStatementInfo(session, prepared, numParams, batchMode, pageSize, semaphore, maxRetries);
  335. }
  336. void CassandraClusterSession::executeAsync(CIArrayOf<CassandraStatement> &batch, const char *what) const
  337. {
  338. CIArrayOf<CassandraRetryingFuture> futures;
  339. ForEachItemIn(idx, batch)
  340. futures.append(*new CassandraRetryingFuture(*session, batch.item(idx).getClear(), semaphore, maxRetries));
  341. ForEachItemIn(idx2, futures)
  342. futures.item(idx2).wait(what);
  343. }
  344. typedef CassandraClusterSession *CassandraClusterSessionPtr;
  345. typedef MapBetween<hash64_t, hash64_t, CassandraClusterSessionPtr, CassandraClusterSessionPtr> ClusterSessionMap;
  346. static CriticalSection clusterCacheCrit;
  347. static ClusterSessionMap cachedSessions;
  348. CassandraClusterSession *lookupCachedSession(hash64_t hash, const StringArray &opts)
  349. {
  350. Owned<CassandraClusterSession> cluster;
  351. CassandraClusterSessionPtr *found = cachedSessions.getValue(hash);
  352. if (found)
  353. cluster.set(*found);
  354. if (!cluster)
  355. {
  356. cluster.setown(new CassandraClusterSession(cass_cluster_new()));
  357. cluster->setOptions(opts);
  358. cluster->connect();
  359. cachedSessions.setValue(hash, cluster.getLink());
  360. }
  361. return cluster.getClear();
  362. }
  363. MODULE_INIT(INIT_PRIORITY_STANDARD)
  364. {
  365. cass_log_set_callback(logCallBack, NULL);
  366. cass_log_set_level(CASS_LOG_WARN);
  367. return true;
  368. }
  369. MODULE_EXIT()
  370. {
  371. HashIterator i(cachedSessions);
  372. ForEach(i)
  373. {
  374. CassandraClusterSession *session = *cachedSessions.mapToValue(&i.query());
  375. ::Release(session);
  376. }
  377. }
  378. //------------------
  379. void CassandraFuture::wait(const char *why) const
  380. {
  381. cass_future_wait(future);
  382. CassError rc = cass_future_error_code(future);
  383. if(rc != CASS_OK)
  384. {
  385. const char *message;
  386. size_t length;
  387. cass_future_error_message(future, &message, &length);
  388. VStringBuffer err("cassandra: failed to %s (%.*s)", why, (int) length, message);
  389. #ifdef _DEBUG
  390. DBGLOG("%s", err.str());
  391. #endif
  392. rtlFail(0, err.str());
  393. }
  394. }
  395. void CassandraSession::set(CassSession *_session)
  396. {
  397. if (session)
  398. {
  399. CassandraFuture close_future(cass_session_close(session));
  400. cass_future_wait(close_future);
  401. cass_session_free(session);
  402. }
  403. session = _session;
  404. }
  405. //----------------------
  406. CassandraRetryingFuture::CassandraRetryingFuture(CassSession *_session, CassStatement *_statement, Semaphore *_limiter, unsigned _retries)
  407. : session(_session), statement(_statement), retries(_retries), limiter(_limiter), future(NULL)
  408. {
  409. execute();
  410. }
  411. CassandraRetryingFuture::~CassandraRetryingFuture()
  412. {
  413. if (future)
  414. cass_future_free(future);
  415. }
  416. void CassandraRetryingFuture::wait(const char *why)
  417. {
  418. cass_future_wait(future);
  419. CassError rc = cass_future_error_code(future);
  420. if(rc != CASS_OK)
  421. {
  422. switch (rc)
  423. {
  424. case CASS_ERROR_LIB_NO_HOSTS_AVAILABLE: // MORE - are there others we should retry?
  425. if (retry(why))
  426. break;
  427. // fall into
  428. default:
  429. const char *message;
  430. size_t length;
  431. cass_future_error_message(future, &message, &length);
  432. VStringBuffer err("cassandra: failed to %s (%.*s)", why, (int) length, message);
  433. rtlFail(0, err.str());
  434. }
  435. }
  436. }
  437. bool CassandraRetryingFuture::retry(const char *why)
  438. {
  439. for (int i = 0; i < retries; i++)
  440. {
  441. execute();
  442. cass_future_wait(future);
  443. CassError rc = cass_future_error_code(future);
  444. if (rc == CASS_OK)
  445. return true;
  446. Sleep(10);
  447. }
  448. return false;
  449. }
  450. void CassandraRetryingFuture::execute()
  451. {
  452. if (limiter)
  453. limiter->wait();
  454. future = cass_session_execute(session, statement);
  455. if (limiter)
  456. cass_future_set_callback(future, signaller, limiter); // Note - this will call the callback if the future has already completed
  457. }
  458. void CassandraRetryingFuture::signaller(CassFuture *future, void *data)
  459. {
  460. Semaphore *sem = (Semaphore *) data;
  461. sem->signal();
  462. }
  463. //----------------------
  464. CassandraStatementInfo::CassandraStatementInfo(CassandraSession *_session, CassandraPrepared *_prepared, unsigned _numBindings, CassBatchType _batchMode, unsigned pageSize, Semaphore *_semaphore, unsigned _maxRetries)
  465. : session(_session), prepared(_prepared), numBindings(_numBindings), batchMode(_batchMode), semaphore(_semaphore), maxRetries(_maxRetries)
  466. {
  467. assertex(prepared && *prepared);
  468. statement.setown(new CassandraStatement(cass_prepared_bind(*prepared)));
  469. if (pageSize)
  470. cass_statement_set_paging_size(*statement, pageSize);
  471. inBatch = false;
  472. }
  473. CassandraStatementInfo::~CassandraStatementInfo()
  474. {
  475. stop();
  476. futures.kill();
  477. }
  478. void CassandraStatementInfo::stop()
  479. {
  480. iterator.clear();
  481. result.clear();
  482. prepared.clear();
  483. }
  484. bool CassandraStatementInfo::next()
  485. {
  486. for (;;)
  487. {
  488. if (!iterator)
  489. {
  490. if (result)
  491. iterator.setown(new CassandraIterator(cass_iterator_from_result(*result)));
  492. else
  493. return false;
  494. }
  495. if (cass_iterator_next(*iterator))
  496. return true;
  497. iterator.clear();
  498. if (!cass_result_has_more_pages(*result))
  499. {
  500. result.clear();
  501. break;
  502. }
  503. cass_statement_set_paging_state(*statement, *result);
  504. result.setown(new CassandraFutureResult(cass_session_execute(*session, *statement)));
  505. }
  506. return false;
  507. }
  508. void CassandraStatementInfo::startStream()
  509. {
  510. if (batchMode != (CassBatchType) -1)
  511. batch.setown(new CassandraBatch(batchMode));
  512. statement.setown(new CassandraStatement(cass_prepared_bind(*prepared)));
  513. inBatch = true;
  514. }
  515. void CassandraStatementInfo::endStream()
  516. {
  517. if (batch)
  518. {
  519. result.setown(new CassandraFutureResult (cass_session_execute_batch(*session, *batch)));
  520. assertex (rowCount() == 0);
  521. }
  522. else
  523. {
  524. ForEachItemIn(idx, futures)
  525. {
  526. futures.item(idx).wait("endStream");
  527. }
  528. }
  529. }
  530. void CassandraStatementInfo::execute()
  531. {
  532. assertex(statement && *statement);
  533. if (batch)
  534. {
  535. check(cass_batch_add_statement(*batch, *statement));
  536. statement.setown(new CassandraStatement(cass_prepared_bind(*prepared)));
  537. }
  538. else if (inBatch)
  539. {
  540. futures.append(*new CassandraRetryingFuture(*session, statement->getClear(), semaphore, maxRetries));
  541. statement.setown(new CassandraStatement(cass_prepared_bind(*prepared)));
  542. }
  543. else
  544. {
  545. result.setown(new CassandraFutureResult(cass_session_execute(*session, *statement)));
  546. }
  547. }
  548. // Conversions from Cassandra values to ECL data
  549. static const char *getTypeName(CassValueType type)
  550. {
  551. switch (type)
  552. {
  553. case CASS_VALUE_TYPE_CUSTOM: return "CUSTOM";
  554. case CASS_VALUE_TYPE_ASCII: return "ASCII";
  555. case CASS_VALUE_TYPE_BIGINT: return "BIGINT";
  556. case CASS_VALUE_TYPE_BLOB: return "BLOB";
  557. case CASS_VALUE_TYPE_BOOLEAN: return "BOOLEAN";
  558. case CASS_VALUE_TYPE_COUNTER: return "COUNTER";
  559. case CASS_VALUE_TYPE_DECIMAL: return "DECIMAL";
  560. case CASS_VALUE_TYPE_DOUBLE: return "DOUBLE";
  561. case CASS_VALUE_TYPE_FLOAT: return "FLOAT";
  562. case CASS_VALUE_TYPE_INT: return "INT";
  563. case CASS_VALUE_TYPE_TEXT: return "TEXT";
  564. case CASS_VALUE_TYPE_TIMESTAMP: return "TIMESTAMP";
  565. case CASS_VALUE_TYPE_UUID: return "UUID";
  566. case CASS_VALUE_TYPE_VARCHAR: return "VARCHAR";
  567. case CASS_VALUE_TYPE_VARINT: return "VARINT";
  568. case CASS_VALUE_TYPE_TIMEUUID: return "TIMEUUID";
  569. case CASS_VALUE_TYPE_INET: return "INET";
  570. case CASS_VALUE_TYPE_LIST: return "LIST";
  571. case CASS_VALUE_TYPE_MAP: return "MAP";
  572. case CASS_VALUE_TYPE_SET: return "SET";
  573. default: return "UNKNOWN";
  574. }
  575. }
  576. __declspec(noreturn) static void typeError(const char *expected, const CassValue *value, const RtlFieldInfo *field) __attribute__((noreturn));
  577. static void typeError(const char *expected, const CassValue *value, const RtlFieldInfo *field)
  578. {
  579. VStringBuffer msg("cassandra: type mismatch - %s expected", expected);
  580. if (field)
  581. msg.appendf(" for field %s", field->name);
  582. if (value)
  583. msg.appendf(", received %s", getTypeName(cass_value_type(value)));
  584. rtlFail(0, msg.str());
  585. }
  586. extern bool isInteger(const CassValueType t)
  587. {
  588. switch (t)
  589. {
  590. case CASS_VALUE_TYPE_TIMESTAMP:
  591. case CASS_VALUE_TYPE_INT:
  592. case CASS_VALUE_TYPE_BIGINT:
  593. case CASS_VALUE_TYPE_COUNTER:
  594. case CASS_VALUE_TYPE_VARINT:
  595. return true;
  596. default:
  597. return false;
  598. }
  599. }
  600. extern bool isString(CassValueType t)
  601. {
  602. switch (t)
  603. {
  604. case CASS_VALUE_TYPE_VARCHAR:
  605. case CASS_VALUE_TYPE_TEXT:
  606. case CASS_VALUE_TYPE_ASCII:
  607. return true;
  608. default:
  609. return false;
  610. }
  611. }
  612. // when extracting elements of a set, field will point at the SET info- we want to get the typeInfo for the element type
  613. static const RtlTypeInfo *getFieldBaseType(const RtlFieldInfo *field)
  614. {
  615. const RtlTypeInfo *type = field->type;
  616. if ((type->fieldType & RFTMkind) == type_set)
  617. return type->queryChildType();
  618. else
  619. return type;
  620. }
  621. static int getNumFields(const RtlTypeInfo *record)
  622. {
  623. int count = 0;
  624. const RtlFieldInfo * const *fields = record->queryFields();
  625. assertex(fields);
  626. while (*fields++)
  627. count++;
  628. return count;
  629. }
  630. extern bool getBooleanResult(const RtlFieldInfo *field, const CassValue *value)
  631. {
  632. if (cass_value_is_null(value))
  633. {
  634. NullFieldProcessor p(field);
  635. return p.boolResult;
  636. }
  637. if (cass_value_type(value) != CASS_VALUE_TYPE_BOOLEAN)
  638. typeError("boolean", value, field);
  639. cass_bool_t output;
  640. check(cass_value_get_bool(value, &output));
  641. return output != cass_false;
  642. }
  643. extern void getDataResult(const RtlFieldInfo *field, const CassValue *value, size32_t &chars, void * &result)
  644. {
  645. if (cass_value_is_null(value))
  646. {
  647. NullFieldProcessor p(field);
  648. rtlStrToDataX(chars, result, p.resultChars, p.stringResult);
  649. return;
  650. }
  651. // We COULD require that the field being retrieved is a blob - but Cassandra seems happy to use any field here, and
  652. // it seems like it could be more useful to support anything
  653. // if (cass_value_type(value) != CASS_VALUE_TYPE_BLOB)
  654. // typeError("blob", value, field);
  655. const cass_byte_t *bytes;
  656. size_t size;
  657. check(cass_value_get_bytes(value, &bytes, &size));
  658. rtlStrToDataX(chars, result, size, bytes);
  659. }
  660. extern double getRealResult(const RtlFieldInfo *field, const CassValue *value)
  661. {
  662. if (cass_value_is_null(value))
  663. {
  664. NullFieldProcessor p(field);
  665. return p.doubleResult;
  666. }
  667. else if (isInteger(cass_value_type(value)))
  668. return (double) getSignedResult(field, value);
  669. else switch (cass_value_type(value))
  670. {
  671. case CASS_VALUE_TYPE_FLOAT:
  672. {
  673. cass_float_t output_f;
  674. check(cass_value_get_float(value, &output_f));
  675. return output_f;
  676. }
  677. case CASS_VALUE_TYPE_DOUBLE:
  678. {
  679. cass_double_t output_d;
  680. check(cass_value_get_double(value, &output_d));
  681. return output_d;
  682. }
  683. default:
  684. typeError("double", value, field);
  685. }
  686. }
  687. extern __int64 getSignedResult(const RtlFieldInfo *field, const CassValue *value)
  688. {
  689. if (cass_value_is_null(value))
  690. {
  691. NullFieldProcessor p(field);
  692. return p.intResult;
  693. }
  694. switch (cass_value_type(value))
  695. {
  696. case CASS_VALUE_TYPE_INT:
  697. {
  698. cass_int32_t output;
  699. check(cass_value_get_int32(value, &output));
  700. return output;
  701. }
  702. case CASS_VALUE_TYPE_TIMESTAMP:
  703. case CASS_VALUE_TYPE_BIGINT:
  704. case CASS_VALUE_TYPE_COUNTER:
  705. case CASS_VALUE_TYPE_VARINT:
  706. {
  707. cass_int64_t output;
  708. check(cass_value_get_int64(value, &output));
  709. return output;
  710. }
  711. default:
  712. typeError("integer", value, field);
  713. }
  714. }
  715. extern unsigned __int64 getUnsignedResult(const RtlFieldInfo *field, const CassValue *value)
  716. {
  717. if (cass_value_is_null(value))
  718. {
  719. NullFieldProcessor p(field);
  720. return p.uintResult;
  721. }
  722. return (__uint64) getSignedResult(field, value);
  723. }
  724. extern void getStringResult(const RtlFieldInfo *field, const CassValue *value, size32_t &chars, char * &result)
  725. {
  726. if (cass_value_is_null(value))
  727. {
  728. NullFieldProcessor p(field);
  729. rtlStrToStrX(chars, result, p.resultChars, p.stringResult);
  730. return;
  731. }
  732. switch (cass_value_type(value))
  733. {
  734. case CASS_VALUE_TYPE_ASCII:
  735. {
  736. const char *output;
  737. size_t length;
  738. check(cass_value_get_string(value, &output, &length));
  739. rtlStrToStrX(chars, result, length, output);
  740. break;
  741. }
  742. case CASS_VALUE_TYPE_VARCHAR:
  743. case CASS_VALUE_TYPE_TEXT:
  744. {
  745. const char *output;
  746. size_t length;
  747. check(cass_value_get_string(value, &output, &length));
  748. unsigned numchars = rtlUtf8Length(length, output);
  749. rtlUtf8ToStrX(chars, result, numchars, output);
  750. break;
  751. }
  752. default:
  753. typeError("string", value, field);
  754. }
  755. }
  756. extern void getUTF8Result(const RtlFieldInfo *field, const CassValue *value, size32_t &chars, char * &result)
  757. {
  758. if (cass_value_is_null(value))
  759. {
  760. NullFieldProcessor p(field);
  761. rtlUtf8ToUtf8X(chars, result, p.resultChars, p.stringResult);
  762. return;
  763. }
  764. switch (cass_value_type(value))
  765. {
  766. case CASS_VALUE_TYPE_ASCII:
  767. {
  768. const char *output;
  769. size_t length;
  770. check(cass_value_get_string(value, &output, &length));
  771. rtlStrToUtf8X(chars, result, length, output);
  772. break;
  773. }
  774. case CASS_VALUE_TYPE_VARCHAR:
  775. case CASS_VALUE_TYPE_TEXT:
  776. {
  777. const char * output;
  778. size_t length;
  779. check(cass_value_get_string(value, &output, &length));
  780. unsigned numchars = rtlUtf8Length(length, output);
  781. rtlUtf8ToUtf8X(chars, result, numchars, output);
  782. break;
  783. }
  784. default:
  785. typeError("string", value, field);
  786. }
  787. }
  788. extern void getUnicodeResult(const RtlFieldInfo *field, const CassValue *value, size32_t &chars, UChar * &result)
  789. {
  790. if (cass_value_is_null(value))
  791. {
  792. NullFieldProcessor p(field);
  793. rtlUnicodeToUnicodeX(chars, result, p.resultChars, p.unicodeResult);
  794. return;
  795. }
  796. switch (cass_value_type(value))
  797. {
  798. case CASS_VALUE_TYPE_ASCII:
  799. {
  800. const char * output;
  801. size_t length;
  802. check(cass_value_get_string(value, &output, &length));
  803. rtlStrToUnicodeX(chars, result, length, output);
  804. break;
  805. }
  806. case CASS_VALUE_TYPE_VARCHAR:
  807. case CASS_VALUE_TYPE_TEXT:
  808. {
  809. const char * output;
  810. size_t length;
  811. check(cass_value_get_string(value, &output, &length));
  812. unsigned numchars = rtlUtf8Length(length, output);
  813. rtlUtf8ToUnicodeX(chars, result, numchars, output);
  814. break;
  815. }
  816. default:
  817. typeError("string", value, field);
  818. }
  819. }
  820. extern void getDecimalResult(const RtlFieldInfo *field, const CassValue *value, Decimal &result)
  821. {
  822. // Note - Cassandra has a decimal type, but it's not particularly similar to the ecl one. Map to string for now, as we do in MySQL
  823. if (cass_value_is_null(value))
  824. {
  825. NullFieldProcessor p(field);
  826. result.set(p.decimalResult);
  827. return;
  828. }
  829. size32_t chars;
  830. rtlDataAttr tempStr;
  831. cassandraembed::getStringResult(field, value, chars, tempStr.refstr());
  832. result.setString(chars, tempStr.getstr());
  833. if (field)
  834. {
  835. RtlDecimalTypeInfo *dtype = (RtlDecimalTypeInfo *) field->type;
  836. result.setPrecision(dtype->getDecimalDigits(), dtype->getDecimalPrecision());
  837. }
  838. }
  839. // A CassandraRowBuilder object is used to construct an ECL row from a Cassandra row
  840. class CassandraRowBuilder : public CInterfaceOf<IFieldSource>
  841. {
  842. public:
  843. CassandraRowBuilder(const CassandraStatementInfo *_stmtInfo)
  844. : stmtInfo(_stmtInfo), colIdx(0), numIteratorFields(0), nextIteratedField(0)
  845. {
  846. }
  847. virtual bool getBooleanResult(const RtlFieldInfo *field)
  848. {
  849. return cassandraembed::getBooleanResult(field, nextField(field));
  850. }
  851. virtual void getDataResult(const RtlFieldInfo *field, size32_t &len, void * &result)
  852. {
  853. cassandraembed::getDataResult(field, nextField(field), len, result);
  854. }
  855. virtual double getRealResult(const RtlFieldInfo *field)
  856. {
  857. return cassandraembed::getRealResult(field, nextField(field));
  858. }
  859. virtual __int64 getSignedResult(const RtlFieldInfo *field)
  860. {
  861. return cassandraembed::getSignedResult(field, nextField(field));
  862. }
  863. virtual unsigned __int64 getUnsignedResult(const RtlFieldInfo *field)
  864. {
  865. return cassandraembed::getUnsignedResult(field, nextField(field));
  866. }
  867. virtual void getStringResult(const RtlFieldInfo *field, size32_t &chars, char * &result)
  868. {
  869. cassandraembed::getStringResult(field, nextField(field), chars, result);
  870. }
  871. virtual void getUTF8Result(const RtlFieldInfo *field, size32_t &chars, char * &result)
  872. {
  873. cassandraembed::getUTF8Result(field, nextField(field), chars, result);
  874. }
  875. virtual void getUnicodeResult(const RtlFieldInfo *field, size32_t &chars, UChar * &result)
  876. {
  877. cassandraembed::getUnicodeResult(field, nextField(field), chars, result);
  878. }
  879. virtual void getDecimalResult(const RtlFieldInfo *field, Decimal &value)
  880. {
  881. cassandraembed::getDecimalResult(field, nextField(field), value);
  882. }
  883. virtual void processBeginSet(const RtlFieldInfo * field, bool &isAll)
  884. {
  885. isAll = false;
  886. iterator.setown(new CassandraIterator(cass_iterator_from_collection(nextField(field))));
  887. }
  888. virtual bool processNextSet(const RtlFieldInfo * field)
  889. {
  890. numIteratorFields = 1;
  891. return *iterator && cass_iterator_next(*iterator); // If field was NULL, we'll have a NULL iterator (representing an empty set/list)
  892. // Can't distinguish empty set from NULL field, so assume the former (rather than trying to deliver the default value for the set field)
  893. }
  894. virtual void processBeginDataset(const RtlFieldInfo * field)
  895. {
  896. numIteratorFields = getNumFields(field->type->queryChildType());
  897. switch (numIteratorFields)
  898. {
  899. case 1:
  900. iterator.setown(new CassandraIterator(cass_iterator_from_collection(nextField(field))));
  901. break;
  902. case 2:
  903. iterator.setown(new CassandraIterator(cass_iterator_from_map(nextField(field))));
  904. break;
  905. default:
  906. UNSUPPORTED("Nested datasets with > 2 fields");
  907. }
  908. }
  909. virtual void processBeginRow(const RtlFieldInfo * field)
  910. {
  911. }
  912. virtual bool processNextRow(const RtlFieldInfo * field)
  913. {
  914. nextIteratedField = 0;
  915. return *iterator && cass_iterator_next(*iterator); // If field was NULL, we'll have a NULL iterator (representing an empty set/list/map)
  916. // Can't distinguish empty set from NULL field, so assume the former (rather than trying to deliver the default value for the set field)
  917. }
  918. virtual void processEndSet(const RtlFieldInfo * field)
  919. {
  920. iterator.clear();
  921. numIteratorFields = 0;
  922. }
  923. virtual void processEndDataset(const RtlFieldInfo * field)
  924. {
  925. iterator.clear();
  926. numIteratorFields = 0;
  927. }
  928. virtual void processEndRow(const RtlFieldInfo * field)
  929. {
  930. }
  931. protected:
  932. const CassValue *nextField(const RtlFieldInfo * field)
  933. {
  934. const CassValue *ret;
  935. if (iterator)
  936. {
  937. switch (numIteratorFields)
  938. {
  939. case 1:
  940. ret = cass_iterator_get_value(*iterator);
  941. break;
  942. case 2:
  943. if (nextIteratedField==0)
  944. ret = cass_iterator_get_map_key(*iterator);
  945. else
  946. ret = cass_iterator_get_map_value(*iterator);
  947. nextIteratedField++;
  948. break;
  949. default:
  950. throwUnexpected();
  951. }
  952. }
  953. else
  954. ret = cass_row_get_column(stmtInfo->queryRow(), colIdx++);
  955. if (!ret)
  956. failx("Too many fields in ECL output row, reading field %s", field->name);
  957. return ret;
  958. }
  959. const CassandraStatementInfo *stmtInfo;
  960. Owned<CassandraIterator> iterator;
  961. int colIdx;
  962. int numIteratorFields;
  963. int nextIteratedField;
  964. };
  965. // Bind Cassandra columns from an ECL record
  966. class CassandraRecordBinder : public CInterfaceOf<IFieldProcessor>
  967. {
  968. public:
  969. CassandraRecordBinder(const IContextLogger &_logctx, const RtlTypeInfo *_typeInfo, const CassandraStatementInfo *_stmtInfo, int _firstParam)
  970. : logctx(_logctx), typeInfo(_typeInfo), stmtInfo(_stmtInfo), firstParam(_firstParam), dummyField("<row>", NULL, typeInfo), thisParam(_firstParam)
  971. {
  972. }
  973. int numFields()
  974. {
  975. int count = 0;
  976. const RtlFieldInfo * const *fields = typeInfo->queryFields();
  977. assertex(fields);
  978. while (*fields++)
  979. count++;
  980. return count;
  981. }
  982. void processRow(const byte *row)
  983. {
  984. thisParam = firstParam;
  985. typeInfo->process(row, row, &dummyField, *this); // Bind the variables for the current row
  986. }
  987. virtual void processString(unsigned len, const char *value, const RtlFieldInfo * field)
  988. {
  989. size32_t utf8chars;
  990. rtlDataAttr utfText;
  991. rtlStrToUtf8X(utf8chars, utfText.refstr(), len, value);
  992. if (collection)
  993. checkBind(cass_collection_append_string_n(*collection, utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())),
  994. field);
  995. else
  996. checkBind(cass_statement_bind_string_n(stmtInfo->queryStatement(),
  997. checkNextParam(field),
  998. utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())),
  999. field);
  1000. }
  1001. virtual void processBool(bool value, const RtlFieldInfo * field)
  1002. {
  1003. if (collection)
  1004. checkBind(cass_collection_append_bool(*collection, value ? cass_true : cass_false), field);
  1005. else
  1006. checkBind(cass_statement_bind_bool(stmtInfo->queryStatement(), checkNextParam(field), value ? cass_true : cass_false), field);
  1007. }
  1008. virtual void processData(unsigned len, const void *value, const RtlFieldInfo * field)
  1009. {
  1010. if (collection)
  1011. checkBind(cass_collection_append_bytes(*collection, (const cass_byte_t*) value, len), field);
  1012. else
  1013. checkBind(cass_statement_bind_bytes(stmtInfo->queryStatement(), checkNextParam(field), (const cass_byte_t*) value, len), field);
  1014. }
  1015. virtual void processInt(__int64 value, const RtlFieldInfo * field)
  1016. {
  1017. if (getFieldBaseType(field)->size(NULL,NULL)>4)
  1018. {
  1019. if (collection)
  1020. checkBind(cass_collection_append_int64(*collection, value), field);
  1021. else
  1022. checkBind(cass_statement_bind_int64(stmtInfo->queryStatement(), checkNextParam(field), value), field);
  1023. }
  1024. else
  1025. {
  1026. if (collection)
  1027. checkBind(cass_collection_append_int32(*collection, value), field);
  1028. else
  1029. checkBind(cass_statement_bind_int32(stmtInfo->queryStatement(), checkNextParam(field), value), field);
  1030. }
  1031. }
  1032. virtual void processUInt(unsigned __int64 value, const RtlFieldInfo * field)
  1033. {
  1034. UNSUPPORTED("UNSIGNED columns");
  1035. }
  1036. virtual void processReal(double value, const RtlFieldInfo * field)
  1037. {
  1038. if (getFieldBaseType(field)->size(NULL,NULL)>4)
  1039. {
  1040. if (collection)
  1041. checkBind(cass_collection_append_double(*collection, value), field);
  1042. else
  1043. checkBind(cass_statement_bind_double(stmtInfo->queryStatement(), checkNextParam(field), value), field);
  1044. }
  1045. else
  1046. {
  1047. if (collection)
  1048. checkBind(cass_collection_append_float(*collection, (float) value), field);
  1049. else
  1050. checkBind(cass_statement_bind_float(stmtInfo->queryStatement(), checkNextParam(field), (float) value), field);
  1051. }
  1052. }
  1053. virtual void processDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo * field)
  1054. {
  1055. Decimal val;
  1056. size32_t bytes;
  1057. rtlDataAttr decText;
  1058. val.setDecimal(digits, precision, value);
  1059. val.getStringX(bytes, decText.refstr());
  1060. processUtf8(bytes, decText.getstr(), field);
  1061. }
  1062. virtual void processUDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo * field)
  1063. {
  1064. UNSUPPORTED("UNSIGNED decimals");
  1065. }
  1066. virtual void processUnicode(unsigned chars, const UChar *value, const RtlFieldInfo * field)
  1067. {
  1068. size32_t utf8chars;
  1069. rtlDataAttr utfText;
  1070. rtlUnicodeToUtf8X(utf8chars, utfText.refstr(), chars, value);
  1071. if (collection)
  1072. checkBind(cass_collection_append_string_n(*collection, utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())),
  1073. field);
  1074. else
  1075. checkBind(cass_statement_bind_string_n(stmtInfo->queryStatement(),
  1076. checkNextParam(field),
  1077. utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())),
  1078. field);
  1079. }
  1080. virtual void processQString(unsigned len, const char *value, const RtlFieldInfo * field)
  1081. {
  1082. size32_t charCount;
  1083. rtlDataAttr text;
  1084. rtlQStrToStrX(charCount, text.refstr(), len, value);
  1085. processUtf8(charCount, text.getstr(), field);
  1086. }
  1087. virtual void processUtf8(unsigned chars, const char *value, const RtlFieldInfo * field)
  1088. {
  1089. if (collection)
  1090. checkBind(cass_collection_append_string_n(*collection, value, rtlUtf8Size(chars, value)), field);
  1091. else
  1092. checkBind(cass_statement_bind_string_n(stmtInfo->queryStatement(), checkNextParam(field), value, rtlUtf8Size(chars, value)), field);
  1093. }
  1094. virtual bool processBeginSet(const RtlFieldInfo * field, unsigned numElements, bool isAll, const byte *data)
  1095. {
  1096. if (isAll)
  1097. UNSUPPORTED("SET(ALL)");
  1098. // We don't know whether the corresponding field in Cassandra is a list or a set. Try binding a dummy list to tell which.
  1099. CassandraCollection temp(cass_collection_new(CASS_COLLECTION_TYPE_LIST, 0));
  1100. if (cass_statement_bind_collection(stmtInfo->queryStatement(), thisParam, temp) == CASS_OK)
  1101. collection.setown(new CassandraCollection(cass_collection_new(CASS_COLLECTION_TYPE_LIST, numElements)));
  1102. else
  1103. collection.setown(new CassandraCollection(cass_collection_new(CASS_COLLECTION_TYPE_SET, numElements)));
  1104. return true;
  1105. }
  1106. virtual bool processBeginDataset(const RtlFieldInfo * field, unsigned numRows)
  1107. {
  1108. // If there's a single field, assume we are mapping to a SET/LIST
  1109. // If there are two, assume it's a MAP
  1110. // Otherwise, fail
  1111. int numFields = getNumFields(field->type->queryChildType());
  1112. if (numFields < 1 || numFields > 2)
  1113. {
  1114. UNSUPPORTED("Nested datasets with > 2 fields");
  1115. }
  1116. collection.setown(new CassandraCollection(cass_collection_new(numFields==1 ? CASS_COLLECTION_TYPE_SET : CASS_COLLECTION_TYPE_MAP, numRows)));
  1117. return true;
  1118. }
  1119. virtual bool processBeginRow(const RtlFieldInfo * field)
  1120. {
  1121. return true;
  1122. }
  1123. virtual void processEndSet(const RtlFieldInfo * field)
  1124. {
  1125. checkBind(cass_statement_bind_collection(stmtInfo->queryStatement(), checkNextParam(field), *collection), field);
  1126. collection.clear();
  1127. }
  1128. virtual void processEndDataset(const RtlFieldInfo * field)
  1129. {
  1130. checkBind(cass_statement_bind_collection(stmtInfo->queryStatement(), checkNextParam(field), *collection), field);
  1131. collection.clear();
  1132. }
  1133. virtual void processEndRow(const RtlFieldInfo * field)
  1134. {
  1135. }
  1136. protected:
  1137. inline unsigned checkNextParam(const RtlFieldInfo * field)
  1138. {
  1139. if (logctx.queryTraceLevel() > 4)
  1140. logctx.CTXLOG("Binding %s to %d", field->name, thisParam);
  1141. return thisParam++;
  1142. }
  1143. inline void checkBind(CassError rc, const RtlFieldInfo * field)
  1144. {
  1145. if (rc != CASS_OK)
  1146. {
  1147. failx("While binding parameter %s: %s", field->name, cass_error_desc(rc));
  1148. }
  1149. }
  1150. const RtlTypeInfo *typeInfo;
  1151. const CassandraStatementInfo *stmtInfo;
  1152. Owned<CassandraCollection> collection;
  1153. const IContextLogger &logctx;
  1154. int firstParam;
  1155. RtlFieldStrInfo dummyField;
  1156. int thisParam;
  1157. };
  1158. //
  1159. class CassandraDatasetBinder : public CassandraRecordBinder
  1160. {
  1161. public:
  1162. CassandraDatasetBinder(const IContextLogger &_logctx, IRowStream * _input, const RtlTypeInfo *_typeInfo, const CassandraStatementInfo *_stmt, int _firstParam)
  1163. : input(_input), CassandraRecordBinder(_logctx, _typeInfo, _stmt, _firstParam)
  1164. {
  1165. }
  1166. bool bindNext()
  1167. {
  1168. roxiemem::OwnedConstRoxieRow nextRow = (const byte *) input->ungroupedNextRow();
  1169. if (!nextRow)
  1170. return false;
  1171. processRow((const byte *) nextRow.get()); // Bind the variables for the current row
  1172. return true;
  1173. }
  1174. void executeAll(CassandraStatementInfo *stmtInfo)
  1175. {
  1176. stmtInfo->startStream();
  1177. while (bindNext())
  1178. {
  1179. stmtInfo->execute();
  1180. }
  1181. stmtInfo->endStream();
  1182. }
  1183. protected:
  1184. Owned<IRowStream> input;
  1185. };
  1186. // A Cassandra function that returns a dataset will return a CassandraRowStream object that can be
  1187. // interrogated to return each row of the result in turn
  1188. class CassandraRowStream : public CInterfaceOf<IRowStream>
  1189. {
  1190. public:
  1191. CassandraRowStream(CassandraDatasetBinder *_inputStream, CassandraStatementInfo *_stmtInfo, IEngineRowAllocator *_resultAllocator)
  1192. : inputStream(_inputStream), stmtInfo(_stmtInfo), resultAllocator(_resultAllocator)
  1193. {
  1194. executePending = true;
  1195. eof = false;
  1196. }
  1197. virtual const void *nextRow()
  1198. {
  1199. // A little complex when streaming data in as well as out - want to execute for every input record
  1200. if (eof)
  1201. return NULL;
  1202. for (;;)
  1203. {
  1204. if (executePending)
  1205. {
  1206. executePending = false;
  1207. if (inputStream && !inputStream->bindNext())
  1208. {
  1209. noteEOF();
  1210. return NULL;
  1211. }
  1212. stmtInfo->execute();
  1213. }
  1214. if (stmtInfo->next())
  1215. break;
  1216. if (inputStream)
  1217. executePending = true;
  1218. else
  1219. {
  1220. noteEOF();
  1221. return NULL;
  1222. }
  1223. }
  1224. RtlDynamicRowBuilder rowBuilder(resultAllocator);
  1225. CassandraRowBuilder cassandraRowBuilder(stmtInfo);
  1226. const RtlTypeInfo *typeInfo = resultAllocator->queryOutputMeta()->queryTypeInfo();
  1227. assertex(typeInfo);
  1228. RtlFieldStrInfo dummyField("<row>", NULL, typeInfo);
  1229. size32_t len = typeInfo->build(rowBuilder, 0, &dummyField, cassandraRowBuilder);
  1230. return rowBuilder.finalizeRowClear(len);
  1231. }
  1232. virtual void stop()
  1233. {
  1234. resultAllocator.clear();
  1235. stmtInfo->stop();
  1236. }
  1237. protected:
  1238. void noteEOF()
  1239. {
  1240. if (!eof)
  1241. {
  1242. eof = true;
  1243. stop();
  1244. }
  1245. }
  1246. Linked<CassandraDatasetBinder> inputStream;
  1247. Linked<CassandraStatementInfo> stmtInfo;
  1248. Linked<IEngineRowAllocator> resultAllocator;
  1249. bool executePending;
  1250. bool eof;
  1251. };
  1252. // Each call to a Cassandra function will use a new CassandraEmbedFunctionContext object
  1253. class CassandraEmbedFunctionContext : public CInterfaceOf<IEmbedFunctionContext>
  1254. {
  1255. public:
  1256. CassandraEmbedFunctionContext(const IContextLogger &_logctx, const IThorActivityContext *_activityCtx, unsigned _flags, const char *options)
  1257. : logctx(_logctx), activityCtx(_activityCtx), flags(_flags), nextParam(0), numParams(0), batchMode((CassBatchType) -1), pageSize(0)
  1258. {
  1259. StringArray opts;
  1260. opts.appendList(options, ",");
  1261. hash64_t hash = 0;
  1262. ForEachItemInRev(idx, opts)
  1263. {
  1264. const char *opt = opts.item(idx);
  1265. if (strnicmp(opt, "batch=", 6)==0)
  1266. {
  1267. const char *val=opt+6;
  1268. if (stricmp(val, "LOGGED")==0)
  1269. batchMode = CASS_BATCH_TYPE_LOGGED;
  1270. else if (stricmp(val, "UNLOGGED")==0)
  1271. batchMode = CASS_BATCH_TYPE_UNLOGGED;
  1272. else if (stricmp(val, "COUNTER")==0)
  1273. batchMode = CASS_BATCH_TYPE_COUNTER;
  1274. opts.remove(idx);
  1275. }
  1276. else if (strnicmp(opt, "pagesize=", 9)==0)
  1277. {
  1278. pageSize = atoi(opt+9);
  1279. opts.remove(idx);
  1280. }
  1281. else
  1282. hash = rtlHash64VStr(opt, hash);
  1283. }
  1284. cluster.setown(lookupCachedSession(hash, opts));
  1285. }
  1286. virtual bool getBooleanResult()
  1287. {
  1288. bool ret = cassandraembed::getBooleanResult(NULL, getScalarResult());
  1289. checkSingleRow();
  1290. return ret;
  1291. }
  1292. virtual void getDataResult(size32_t &len, void * &result)
  1293. {
  1294. cassandraembed::getDataResult(NULL, getScalarResult(), len, result);
  1295. checkSingleRow();
  1296. }
  1297. virtual double getRealResult()
  1298. {
  1299. double ret = cassandraembed::getRealResult(NULL, getScalarResult());
  1300. checkSingleRow();
  1301. return ret;
  1302. }
  1303. virtual __int64 getSignedResult()
  1304. {
  1305. __int64 ret = cassandraembed::getSignedResult(NULL, getScalarResult());
  1306. checkSingleRow();
  1307. return ret;
  1308. }
  1309. virtual unsigned __int64 getUnsignedResult()
  1310. {
  1311. unsigned __int64 ret = cassandraembed::getUnsignedResult(NULL, getScalarResult());
  1312. checkSingleRow();
  1313. return ret;
  1314. }
  1315. virtual void getStringResult(size32_t &chars, char * &result)
  1316. {
  1317. cassandraembed::getStringResult(NULL, getScalarResult(), chars, result);
  1318. checkSingleRow();
  1319. }
  1320. virtual void getUTF8Result(size32_t &chars, char * &result)
  1321. {
  1322. cassandraembed::getUTF8Result(NULL, getScalarResult(), chars, result);
  1323. checkSingleRow();
  1324. }
  1325. virtual void getUnicodeResult(size32_t &chars, UChar * &result)
  1326. {
  1327. cassandraembed::getUnicodeResult(NULL, getScalarResult(), chars, result);
  1328. checkSingleRow();
  1329. }
  1330. virtual void getDecimalResult(Decimal &value)
  1331. {
  1332. cassandraembed::getDecimalResult(NULL, getScalarResult(), value);
  1333. checkSingleRow();
  1334. }
  1335. virtual void getSetResult(bool & __isAllResult, size32_t & __resultBytes, void * & __result, int elemType, size32_t elemSize)
  1336. {
  1337. CassandraIterator iterator(cass_iterator_from_collection(getScalarResult()));
  1338. rtlRowBuilder out;
  1339. byte *outData = NULL;
  1340. size32_t outBytes = 0;
  1341. while (cass_iterator_next(iterator))
  1342. {
  1343. const CassValue *value = cass_iterator_get_value(iterator);
  1344. assertex(value);
  1345. if (elemSize != UNKNOWN_LENGTH)
  1346. {
  1347. out.ensureAvailable(outBytes + elemSize);
  1348. outData = out.getbytes() + outBytes;
  1349. }
  1350. switch ((type_t) elemType)
  1351. {
  1352. case type_int:
  1353. rtlWriteInt(outData, cassandraembed::getSignedResult(NULL, value), elemSize);
  1354. break;
  1355. case type_unsigned:
  1356. rtlWriteInt(outData, cassandraembed::getUnsignedResult(NULL, value), elemSize);
  1357. break;
  1358. case type_real:
  1359. if (elemSize == sizeof(double))
  1360. * (double *) outData = cassandraembed::getRealResult(NULL, value);
  1361. else
  1362. {
  1363. assertex(elemSize == sizeof(float));
  1364. * (float *) outData = (float) cassandraembed::getRealResult(NULL, value);
  1365. }
  1366. break;
  1367. case type_boolean:
  1368. assertex(elemSize == sizeof(bool));
  1369. * (bool *) outData = cassandraembed::getBooleanResult(NULL, value);
  1370. break;
  1371. case type_string:
  1372. case type_varstring:
  1373. {
  1374. rtlDataAttr str;
  1375. size32_t lenBytes;
  1376. cassandraembed::getStringResult(NULL, value, lenBytes, str.refstr());
  1377. if (elemSize == UNKNOWN_LENGTH)
  1378. {
  1379. if (elemType == type_string)
  1380. {
  1381. out.ensureAvailable(outBytes + lenBytes + sizeof(size32_t));
  1382. outData = out.getbytes() + outBytes;
  1383. * (size32_t *) outData = lenBytes;
  1384. rtlStrToStr(lenBytes, outData+sizeof(size32_t), lenBytes, str.getstr());
  1385. outBytes += lenBytes + sizeof(size32_t);
  1386. }
  1387. else
  1388. {
  1389. out.ensureAvailable(outBytes + lenBytes + 1);
  1390. outData = out.getbytes() + outBytes;
  1391. rtlStrToVStr(0, outData, lenBytes, str.getstr());
  1392. outBytes += lenBytes + 1;
  1393. }
  1394. }
  1395. else
  1396. {
  1397. if (elemType == type_string)
  1398. rtlStrToStr(elemSize, outData, lenBytes, str.getstr());
  1399. else
  1400. rtlStrToVStr(elemSize, outData, lenBytes, str.getstr()); // Fixed size null terminated strings... weird.
  1401. }
  1402. break;
  1403. }
  1404. case type_unicode:
  1405. case type_utf8:
  1406. {
  1407. rtlDataAttr str;
  1408. size32_t lenChars;
  1409. cassandraembed::getUTF8Result(NULL, value, lenChars, str.refstr());
  1410. const char * text = str.getstr();
  1411. size32_t lenBytes = rtlUtf8Size(lenChars, text);
  1412. if (elemType == type_utf8)
  1413. {
  1414. assertex (elemSize == UNKNOWN_LENGTH);
  1415. out.ensureAvailable(outBytes + lenBytes + sizeof(size32_t));
  1416. outData = out.getbytes() + outBytes;
  1417. * (size32_t *) outData = lenChars;
  1418. rtlStrToStr(lenBytes, outData+sizeof(size32_t), lenBytes, text);
  1419. outBytes += lenBytes + sizeof(size32_t);
  1420. }
  1421. else
  1422. {
  1423. if (elemSize == UNKNOWN_LENGTH)
  1424. {
  1425. // You can't assume that number of chars in utf8 matches number in unicode16 ...
  1426. size32_t numchars16;
  1427. rtlDataAttr unicode16;
  1428. rtlUtf8ToUnicodeX(numchars16, unicode16.refustr(), lenChars, text);
  1429. out.ensureAvailable(outBytes + numchars16*sizeof(UChar) + sizeof(size32_t));
  1430. outData = out.getbytes() + outBytes;
  1431. * (size32_t *) outData = numchars16;
  1432. rtlUnicodeToUnicode(numchars16, (UChar *) (outData+sizeof(size32_t)), numchars16, unicode16.getustr());
  1433. outBytes += numchars16*sizeof(UChar) + sizeof(size32_t);
  1434. }
  1435. else
  1436. rtlUtf8ToUnicode(elemSize / sizeof(UChar), (UChar *) outData, lenChars, text);
  1437. }
  1438. break;
  1439. }
  1440. default:
  1441. fail("type mismatch - unsupported return type");
  1442. }
  1443. if (elemSize != UNKNOWN_LENGTH)
  1444. outBytes += elemSize;
  1445. }
  1446. __isAllResult = false;
  1447. __resultBytes = outBytes;
  1448. __result = out.detachdata();
  1449. }
  1450. virtual IRowStream *getDatasetResult(IEngineRowAllocator * _resultAllocator)
  1451. {
  1452. return new CassandraRowStream(inputStream, stmtInfo, _resultAllocator);
  1453. }
  1454. virtual byte * getRowResult(IEngineRowAllocator * _resultAllocator)
  1455. {
  1456. if (!stmtInfo->hasResult() || stmtInfo->rowCount() != 1)
  1457. typeError("row", NULL, NULL);
  1458. CassandraRowStream stream(NULL, stmtInfo, _resultAllocator);
  1459. roxiemem::OwnedConstRoxieRow ret = stream.nextRow();
  1460. stream.stop();
  1461. if (ret == NULL) // Check for exactly one returned row
  1462. typeError("row", NULL, NULL);
  1463. return (byte *) ret.getClear();
  1464. }
  1465. virtual size32_t getTransformResult(ARowBuilder & rowBuilder)
  1466. {
  1467. if (!stmtInfo->hasResult() || stmtInfo->rowCount() != 1)
  1468. typeError("row", NULL, NULL);
  1469. if (!stmtInfo->next())
  1470. fail("Failed to read row");
  1471. CassandraRowBuilder cassandraRowBuilder(stmtInfo);
  1472. const RtlTypeInfo *typeInfo = rowBuilder.queryAllocator()->queryOutputMeta()->queryTypeInfo();
  1473. assertex(typeInfo);
  1474. RtlFieldStrInfo dummyField("<row>", NULL, typeInfo);
  1475. return typeInfo->build(rowBuilder, 0, &dummyField, cassandraRowBuilder);
  1476. }
  1477. virtual void bindRowParam(const char *name, IOutputMetaData & metaVal, const byte *val) override
  1478. {
  1479. CassandraRecordBinder binder(logctx, metaVal.queryTypeInfo(), stmtInfo, nextParam);
  1480. binder.processRow(val);
  1481. nextParam += binder.numFields();
  1482. }
  1483. virtual void bindDatasetParam(const char *name, IOutputMetaData & metaVal, IRowStream * val)
  1484. {
  1485. // We only support a single dataset parameter...
  1486. // MORE - look into batch?
  1487. if (inputStream)
  1488. {
  1489. fail("At most one dataset parameter supported");
  1490. }
  1491. inputStream.setown(new CassandraDatasetBinder(logctx, LINK(val), metaVal.queryTypeInfo(), stmtInfo, nextParam));
  1492. nextParam += inputStream->numFields();
  1493. }
  1494. virtual void bindBooleanParam(const char *name, bool val)
  1495. {
  1496. checkBind(cass_statement_bind_bool(stmtInfo->queryStatement(), checkNextParam(name), val ? cass_true : cass_false), name);
  1497. }
  1498. virtual void bindDataParam(const char *name, size32_t len, const void *val)
  1499. {
  1500. checkBind(cass_statement_bind_bytes(stmtInfo->queryStatement(), checkNextParam(name), (const cass_byte_t*) val, len), name);
  1501. }
  1502. virtual void bindFloatParam(const char *name, float val)
  1503. {
  1504. checkBind(cass_statement_bind_float(stmtInfo->queryStatement(), checkNextParam(name), val), name);
  1505. }
  1506. virtual void bindRealParam(const char *name, double val)
  1507. {
  1508. checkBind(cass_statement_bind_double(stmtInfo->queryStatement(), checkNextParam(name), val), name);
  1509. }
  1510. virtual void bindSignedSizeParam(const char *name, int size, __int64 val)
  1511. {
  1512. if (size > 4)
  1513. checkBind(cass_statement_bind_int64(stmtInfo->queryStatement(), checkNextParam(name), val), name);
  1514. else
  1515. checkBind(cass_statement_bind_int32(stmtInfo->queryStatement(), checkNextParam(name), val), name);
  1516. }
  1517. virtual void bindSignedParam(const char *name, __int64 val)
  1518. {
  1519. bindSignedSizeParam(name, 8, val);
  1520. }
  1521. virtual void bindUnsignedSizeParam(const char *name, int size, unsigned __int64 val)
  1522. {
  1523. UNSUPPORTED("UNSIGNED columns");
  1524. }
  1525. virtual void bindUnsignedParam(const char *name, unsigned __int64 val)
  1526. {
  1527. UNSUPPORTED("UNSIGNED columns");
  1528. }
  1529. virtual void bindStringParam(const char *name, size32_t len, const char *val)
  1530. {
  1531. size32_t utf8chars;
  1532. rtlDataAttr utfText;
  1533. rtlStrToUtf8X(utf8chars, utfText.refstr(), len, val);
  1534. checkBind(cass_statement_bind_string_n(stmtInfo->queryStatement(),
  1535. checkNextParam(name),
  1536. utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())),
  1537. name);
  1538. }
  1539. virtual void bindVStringParam(const char *name, const char *val)
  1540. {
  1541. bindStringParam(name, strlen(val), val);
  1542. }
  1543. virtual void bindUTF8Param(const char *name, size32_t chars, const char *val)
  1544. {
  1545. checkBind(cass_statement_bind_string_n(stmtInfo->queryStatement(), checkNextParam(name), val, rtlUtf8Size(chars, val)), name);
  1546. }
  1547. virtual void bindUnicodeParam(const char *name, size32_t chars, const UChar *val)
  1548. {
  1549. size32_t utf8chars;
  1550. rtlDataAttr utfText;
  1551. rtlUnicodeToUtf8X(utf8chars, utfText.refstr(), chars, val);
  1552. checkBind(cass_statement_bind_string_n(stmtInfo->queryStatement(),
  1553. checkNextParam(name),
  1554. utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())),
  1555. name);
  1556. }
  1557. virtual void bindSetParam(const char *name, int elemType, size32_t elemSize, bool isAll, size32_t totalBytes, const void *setData)
  1558. {
  1559. if (isAll)
  1560. UNSUPPORTED("SET(ALL)");
  1561. type_t typecode = (type_t) elemType;
  1562. const byte *inData = (const byte *) setData;
  1563. const byte *endData = inData + totalBytes;
  1564. int numElems;
  1565. if (elemSize == UNKNOWN_LENGTH)
  1566. {
  1567. numElems = 0;
  1568. // Will need 2 passes to work out how many elements there are in the set :(
  1569. while (inData < endData)
  1570. {
  1571. int thisSize;
  1572. switch (elemType)
  1573. {
  1574. case type_varstring:
  1575. thisSize = strlen((const char *) inData) + 1;
  1576. break;
  1577. case type_string:
  1578. thisSize = * (size32_t *) inData + sizeof(size32_t);
  1579. break;
  1580. case type_unicode:
  1581. thisSize = (* (size32_t *) inData) * sizeof(UChar) + sizeof(size32_t);
  1582. break;
  1583. case type_utf8:
  1584. thisSize = rtlUtf8Size(* (size32_t *) inData, inData + sizeof(size32_t)) + sizeof(size32_t);
  1585. break;
  1586. default:
  1587. fail("Unsupported parameter type");
  1588. break;
  1589. }
  1590. inData += thisSize;
  1591. numElems++;
  1592. }
  1593. inData = (const byte *) setData;
  1594. }
  1595. else
  1596. numElems = totalBytes / elemSize;
  1597. // We don't know whether the corresponding field in Cassandra is a list or a set. Try binding a dummy list to tell which.
  1598. Owned<CassandraCollection> collection;
  1599. CassandraCollection temp(cass_collection_new(CASS_COLLECTION_TYPE_LIST, 0));
  1600. if (cass_statement_bind_collection(stmtInfo->queryStatement(), nextParam, temp) == CASS_OK)
  1601. collection.setown(new CassandraCollection(cass_collection_new(CASS_COLLECTION_TYPE_LIST, numElems)));
  1602. else
  1603. collection.setown(new CassandraCollection(cass_collection_new(CASS_COLLECTION_TYPE_SET, numElems)));
  1604. while (inData < endData)
  1605. {
  1606. size32_t thisSize = elemSize;
  1607. CassError rc;
  1608. switch (typecode)
  1609. {
  1610. case type_int:
  1611. if (elemSize > 4)
  1612. rc = cass_collection_append_int64(*collection, rtlReadInt(inData, elemSize));
  1613. else
  1614. rc = cass_collection_append_int32(*collection, rtlReadInt(inData, elemSize));
  1615. break;
  1616. case type_unsigned:
  1617. UNSUPPORTED("UNSIGNED columns");
  1618. break;
  1619. case type_varstring:
  1620. {
  1621. size32_t numChars = strlen((const char *) inData);
  1622. if (elemSize == UNKNOWN_LENGTH)
  1623. thisSize = numChars + 1;
  1624. size32_t utf8chars;
  1625. rtlDataAttr utfText;
  1626. rtlStrToUtf8X(utf8chars, utfText.refstr(), numChars, (const char *) inData);
  1627. rc = cass_collection_append_string_n(*collection, utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()));
  1628. break;
  1629. }
  1630. case type_string:
  1631. {
  1632. if (elemSize == UNKNOWN_LENGTH)
  1633. {
  1634. thisSize = * (size32_t *) inData;
  1635. inData += sizeof(size32_t);
  1636. }
  1637. size32_t utf8chars;
  1638. rtlDataAttr utfText;
  1639. rtlStrToUtf8X(utf8chars, utfText.refstr(), thisSize, (const char *) inData);
  1640. rc = cass_collection_append_string_n(*collection, utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()));
  1641. break;
  1642. }
  1643. case type_real:
  1644. if (elemSize == sizeof(double))
  1645. rc = cass_collection_append_double(*collection, * (double *) inData);
  1646. else
  1647. rc = cass_collection_append_float(*collection, * (float *) inData);
  1648. break;
  1649. case type_boolean:
  1650. assertex(elemSize == sizeof(bool));
  1651. rc = cass_collection_append_bool(*collection, *(bool*)inData ? cass_true : cass_false);
  1652. break;
  1653. case type_unicode:
  1654. {
  1655. if (elemSize == UNKNOWN_LENGTH)
  1656. {
  1657. thisSize = (* (size32_t *) inData) * sizeof(UChar); // NOTE - it's in chars...
  1658. inData += sizeof(size32_t);
  1659. }
  1660. unsigned unicodeChars;
  1661. rtlDataAttr unicode;
  1662. rtlUnicodeToUtf8X(unicodeChars, unicode.refstr(), thisSize / sizeof(UChar), (const UChar *) inData);
  1663. size32_t sizeBytes = rtlUtf8Size(unicodeChars, unicode.getstr());
  1664. rc = cass_collection_append_string_n(*collection, unicode.getstr(), sizeBytes);
  1665. break;
  1666. }
  1667. case type_utf8:
  1668. {
  1669. assertex (elemSize == UNKNOWN_LENGTH);
  1670. size32_t numChars = * (size32_t *) inData;
  1671. inData += sizeof(size32_t);
  1672. thisSize = rtlUtf8Size(numChars, inData);
  1673. rc = cass_collection_append_string_n(*collection, (const char *) inData, thisSize);
  1674. break;
  1675. }
  1676. case type_data:
  1677. if (elemSize == UNKNOWN_LENGTH)
  1678. {
  1679. thisSize = * (size32_t *) inData;
  1680. inData += sizeof(size32_t);
  1681. }
  1682. rc = cass_collection_append_bytes(*collection, (const cass_byte_t*) inData, thisSize);
  1683. break;
  1684. default:
  1685. fail("Unsupported parameter type");
  1686. }
  1687. checkBind(rc, name);
  1688. inData += thisSize;
  1689. }
  1690. checkBind(cass_statement_bind_collection(stmtInfo->queryStatement(),
  1691. checkNextParam(name),
  1692. *collection),
  1693. name);
  1694. }
  1695. virtual IInterface *bindParamWriter(IInterface *esdl, const char *esdlservice, const char *esdltype, const char *name)
  1696. {
  1697. return NULL;
  1698. }
  1699. virtual void paramWriterCommit(IInterface *writer)
  1700. {
  1701. }
  1702. virtual void writeResult(IInterface *esdl, const char *esdlservice, const char *esdltype, IInterface *writer)
  1703. {
  1704. }
  1705. virtual void importFunction(size32_t lenChars, const char *text)
  1706. {
  1707. throwUnexpected();
  1708. }
  1709. virtual void compileEmbeddedScript(size32_t chars, const char *_script)
  1710. {
  1711. // Incoming script is not necessarily null terminated. Note that the chars refers to utf8 characters and not bytes.
  1712. rtlSubstituteActivityContext(queryString, activityCtx, chars, _script);
  1713. const char *script = queryString.str(); // Now null terminated
  1714. if ((flags & (EFnoreturn|EFnoparams)) == (EFnoreturn|EFnoparams))
  1715. {
  1716. for (;;)
  1717. {
  1718. const char *nextScript = findUnquoted(script, ';');
  1719. if (!nextScript)
  1720. {
  1721. // script should be pointing at only trailing whitespace, else it's a "missing ;" error
  1722. break;
  1723. }
  1724. CassandraStatement statement(cass_statement_new_n(script, nextScript-script, 0));
  1725. CassandraFuture future(cass_session_execute(cluster->querySession(), statement));
  1726. future.wait("execute statement");
  1727. script = nextScript;
  1728. }
  1729. }
  1730. else
  1731. {
  1732. if ((flags & EFnoparams) == 0)
  1733. numParams = countBindings(script);
  1734. else
  1735. numParams = 0;
  1736. stmtInfo.setown(cluster->createStatementInfo(script, numParams, batchMode, pageSize));
  1737. }
  1738. }
  1739. virtual void callFunction()
  1740. {
  1741. // Does not seem to be a way to check number of parameters expected...
  1742. // if (nextParam != cass_statement_bind_count(stmtInfo))
  1743. // fail("Not enough parameters");
  1744. try
  1745. {
  1746. if (stmtInfo && !stmtInfo->hasResult())
  1747. lazyExecute();
  1748. }
  1749. catch (IException *E)
  1750. {
  1751. StringBuffer msg;
  1752. E->errorMessage(msg);
  1753. msg.appendf(" (processing query %s)", queryString.str());
  1754. throw makeStringException(E->errorCode(), msg);
  1755. }
  1756. }
  1757. virtual void loadCompiledScript(size32_t chars, const void *_script) override
  1758. {
  1759. throwUnexpected();
  1760. }
  1761. virtual void enter() override {}
  1762. virtual void reenter(ICodeContext *codeCtx) override {}
  1763. virtual void exit() override {}
  1764. protected:
  1765. void lazyExecute()
  1766. {
  1767. if (inputStream)
  1768. inputStream->executeAll(stmtInfo);
  1769. else
  1770. stmtInfo->execute();
  1771. }
  1772. const CassValue *getScalarResult()
  1773. {
  1774. if (!stmtInfo->next())
  1775. typeError("scalar", NULL, NULL);
  1776. if (cass_row_get_column(stmtInfo->queryRow(), 1))
  1777. typeError("scalar", NULL, NULL);
  1778. const CassValue *result = cass_row_get_column(stmtInfo->queryRow(), 0);
  1779. if (!result)
  1780. typeError("scalar", NULL, NULL);
  1781. return result;
  1782. }
  1783. void checkSingleRow()
  1784. {
  1785. if (stmtInfo->rowCount() != 1)
  1786. typeError("scalar", NULL, NULL);
  1787. }
  1788. unsigned countBindings(const char *query)
  1789. {
  1790. unsigned queryCount = 0;
  1791. while ((query = findUnquoted(query, '?')) != NULL)
  1792. queryCount++;
  1793. return queryCount;
  1794. }
  1795. const char *findUnquoted(const char *query, char searchFor)
  1796. {
  1797. // Note - returns pointer to char AFTER the first occurrence of searchFor outside of quotes
  1798. char inStr = '\0';
  1799. char ch;
  1800. while ((ch = *query++) != 0)
  1801. {
  1802. if (ch == inStr)
  1803. inStr = false;
  1804. else switch (ch)
  1805. {
  1806. case '\'':
  1807. case '"':
  1808. inStr = ch;
  1809. break;
  1810. case '\\':
  1811. if (inStr && *query)
  1812. query++;
  1813. break;
  1814. case '/':
  1815. if (!inStr)
  1816. {
  1817. if (*query=='/')
  1818. {
  1819. while (*query && *query != '\n')
  1820. query++;
  1821. }
  1822. else if (*query=='*')
  1823. {
  1824. query++;
  1825. for (;;)
  1826. {
  1827. if (!*query)
  1828. fail("Unterminated comment in query string");
  1829. if (*query=='*' && query[1]=='/')
  1830. {
  1831. query+= 2;
  1832. break;
  1833. }
  1834. query++;
  1835. }
  1836. }
  1837. }
  1838. break;
  1839. default:
  1840. if (!inStr && ch==searchFor)
  1841. return query;
  1842. break;
  1843. }
  1844. }
  1845. return NULL;
  1846. }
  1847. inline unsigned checkNextParam(const char *name)
  1848. {
  1849. if (nextParam == numParams)
  1850. failx("Too many parameters supplied: No matching ? for parameter %s", name);
  1851. return nextParam++;
  1852. }
  1853. inline void checkBind(CassError rc, const char *name)
  1854. {
  1855. if (rc != CASS_OK)
  1856. {
  1857. failx("While binding parameter %s: %s", name, cass_error_desc(rc));
  1858. }
  1859. }
  // Per-call state of the embedded Cassandra function context.
  1860. Owned<CassandraClusterSession> cluster; // Supplies the session used to execute statements and creates stmtInfo
  1861. Owned<CassandraStatementInfo> stmtInfo; // Statement compiled from the script (not created for EFnoreturn|EFnoparams scripts); parameters are bound into it and results read from it
  1862. Owned<CassandraDatasetBinder> inputStream; // Binder for the (at most one) dataset parameter, if any
  1863. const IContextLogger &logctx; // Logger handed to the record/dataset binders
  1864. const IThorActivityContext *activityCtx; // Activity context substituted into the query text
  1865. unsigned flags; // EF* embed flags; EFnoreturn/EFnoparams select how the script is compiled
  1866. unsigned nextParam; // Index of the next '?' placeholder to bind
  1867. unsigned numParams; // Number of '?' placeholders counted in the query (0 when EFnoparams)
  1868. StringBuffer queryString; // Query text after activity-context substitution; also quoted in error messages
  1869. CassBatchType batchMode; // Passed to createStatementInfo; presumably derived from the embed options - TODO confirm in constructor
  1870. unsigned pageSize; // Passed to createStatementInfo; presumably derived from the embed options - TODO confirm in constructor
  1871. };
  1872. class CassandraEmbedContext : public CInterfaceOf<IEmbedContext>
  1873. {
  1874. public:
  1875. virtual IEmbedFunctionContext *createFunctionContext(unsigned flags, const char *options) override
  1876. {
  1877. return createFunctionContextEx(nullptr, nullptr, flags, options);
  1878. }
  1879. virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, const IThorActivityContext *activityCtx, unsigned flags, const char *options) override
  1880. {
  1881. if (flags & EFimport)
  1882. UNSUPPORTED("IMPORT");
  1883. else
  1884. return new CassandraEmbedFunctionContext(ctx ? ctx->queryContextLogger() : queryDummyContextLogger(), activityCtx, flags, options);
  1885. }
  1886. virtual IEmbedServiceContext *createServiceContext(const char *service, unsigned flags, const char *options) override
  1887. {
  1888. throwUnexpected();
  1889. }
  1890. };
  1891. extern DECL_EXPORT IEmbedContext* getEmbedContext()
  1892. {
  1893. return new CassandraEmbedContext();
  1894. }
  1895. extern DECL_EXPORT bool syntaxCheck(const char *script)
  1896. {
  1897. return true; // MORE
  1898. }
  1899. } // namespace