cassandraembed.cpp 69 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "cassandra.h"
  15. #include "jexcept.hpp"
  16. #include "jthread.hpp"
  17. #include "hqlplugins.hpp"
  18. #include "deftype.hpp"
  19. #include "eclhelper.hpp"
  20. #include "eclrtl.hpp"
  21. #include "eclrtl_imp.hpp"
  22. #include "rtlds_imp.hpp"
  23. #include "rtlfield_imp.hpp"
  24. #include "rtlembed.hpp"
  25. #include "roxiemem.hpp"
  26. #include "nbcd.hpp"
  27. #include "cassandraembed.hpp"
  28. #ifdef _WIN32
  29. #define EXPORT __declspec(dllexport)
  30. #else
  31. #define EXPORT
  32. #endif
  33. static const char * compatibleVersions[] = {
  34. "Cassandra Embed Helper 1.0.0",
  35. NULL };
  36. static const char *version = "Cassandra Embed Helper 1.0.0";
  37. extern "C" EXPORT bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb)
  38. {
  39. if (pb->size == sizeof(ECLPluginDefinitionBlockEx))
  40. {
  41. ECLPluginDefinitionBlockEx * pbx = (ECLPluginDefinitionBlockEx *) pb;
  42. pbx->compatibleVersions = compatibleVersions;
  43. }
  44. else if (pb->size != sizeof(ECLPluginDefinitionBlock))
  45. return false;
  46. pb->magicVersion = PLUGIN_VERSION;
  47. pb->version = version;
  48. pb->moduleName = "cassandra";
  49. pb->ECL = NULL;
  50. pb->flags = PLUGIN_MULTIPLE_VERSIONS;
  51. pb->description = "Cassandra Embed Helper";
  52. return true;
  53. }
  54. namespace cassandraembed {
  55. extern void UNSUPPORTED(const char *feature)
  56. {
  57. throw MakeStringException(-1, "UNSUPPORTED feature: %s not supported in Cassandra plugin", feature);
  58. }
  59. static void logCallBack(const CassLogMessage *message, void *data)
  60. {
  61. DBGLOG("cassandra: %s - %s", cass_log_level_string(message->severity), message->message);
  62. }
  63. extern void failx(const char *message, ...)
  64. {
  65. va_list args;
  66. va_start(args,message);
  67. StringBuffer msg;
  68. msg.append("cassandra: ").valist_appendf(message,args);
  69. va_end(args);
  70. rtlFail(0, msg.str());
  71. }
  72. extern void fail(const char *message)
  73. {
  74. StringBuffer msg;
  75. msg.append("cassandra: ").append(message);
  76. rtlFail(0, msg.str());
  77. }
  78. void check(CassError rc)
  79. {
  80. if (rc != CASS_OK)
  81. {
  82. fail(cass_error_desc(rc));
  83. }
  84. }
  85. // Wrappers to Cassandra structures that require corresponding releases
  86. void CassandraClusterSession::setOptions(const StringArray &options)
  87. {
  88. const char *contact_points = "localhost";
  89. const char *user = "";
  90. const char *password = "";
  91. ForEachItemIn(idx, options)
  92. {
  93. const char *opt = options.item(idx);
  94. const char *val = strchr(opt, '=');
  95. if (val)
  96. {
  97. StringBuffer optName(val-opt, opt);
  98. val++;
  99. if (stricmp(optName, "contact_points")==0 || stricmp(optName, "server")==0)
  100. contact_points = val; // Note that lifetime of val is adequate for this to be safe
  101. else if (stricmp(optName, "user")==0)
  102. user = val;
  103. else if (stricmp(optName, "password")==0)
  104. password = val;
  105. else if (stricmp(optName, "keyspace")==0)
  106. keyspace.set(val);
  107. else if (stricmp(optName, "maxFutures")==0)
  108. {
  109. if (!semaphore)
  110. {
  111. maxFutures=getUnsignedOption(val, "maxFutures");
  112. if (maxFutures)
  113. semaphore = new Semaphore(maxFutures);
  114. }
  115. }
  116. else if (stricmp(optName, "maxRetries")==0)
  117. maxRetries=getUnsignedOption(val, "maxRetries");
  118. else if (stricmp(optName, "port")==0)
  119. {
  120. unsigned port = getUnsignedOption(val, "port");
  121. checkSetOption(cass_cluster_set_port(cluster, port), "port");
  122. }
  123. else if (stricmp(optName, "protocol_version")==0)
  124. {
  125. unsigned protocol_version = getUnsignedOption(val, "protocol_version");
  126. checkSetOption(cass_cluster_set_protocol_version(cluster, protocol_version), "protocol_version");
  127. }
  128. else if (stricmp(optName, "num_threads_io")==0)
  129. {
  130. unsigned num_threads_io = getUnsignedOption(val, "num_threads_io");
  131. cass_cluster_set_num_threads_io(cluster, num_threads_io); // No status return
  132. }
  133. else if (stricmp(optName, "queue_size_io")==0)
  134. {
  135. unsigned queue_size_io = getUnsignedOption(val, "queue_size_io");
  136. checkSetOption(cass_cluster_set_queue_size_io(cluster, queue_size_io), "queue_size_io");
  137. }
  138. else if (stricmp(optName, "core_connections_per_host")==0)
  139. {
  140. unsigned core_connections_per_host = getUnsignedOption(val, "core_connections_per_host");
  141. checkSetOption(cass_cluster_set_core_connections_per_host(cluster, core_connections_per_host), "core_connections_per_host");
  142. }
  143. else if (stricmp(optName, "max_connections_per_host")==0)
  144. {
  145. unsigned max_connections_per_host = getUnsignedOption(val, "max_connections_per_host");
  146. checkSetOption(cass_cluster_set_max_connections_per_host(cluster, max_connections_per_host), "max_connections_per_host");
  147. }
  148. else if (stricmp(optName, "max_concurrent_creation")==0)
  149. {
  150. unsigned max_concurrent_creation = getUnsignedOption(val, "max_concurrent_creation");
  151. checkSetOption(cass_cluster_set_max_concurrent_creation(cluster, max_concurrent_creation), "max_concurrent_creation");
  152. }
  153. else if (stricmp(optName, "pending_requests_high_water_mark")==0)
  154. {
  155. unsigned pending_requests_high_water_mark = getUnsignedOption(val, "pending_requests_high_water_mark");
  156. checkSetOption(cass_cluster_set_pending_requests_high_water_mark(cluster, pending_requests_high_water_mark), "pending_requests_high_water_mark");
  157. }
  158. else if (stricmp(optName, "pending_requests_low_water_mark")==0)
  159. {
  160. unsigned pending_requests_low_water_mark = getUnsignedOption(val, "pending_requests_low_water_mark");
  161. checkSetOption(cass_cluster_set_pending_requests_low_water_mark(cluster, pending_requests_low_water_mark), "pending_requests_low_water_mark");
  162. }
  163. else if (stricmp(optName, "max_concurrent_requests_threshold")==0)
  164. {
  165. unsigned max_concurrent_requests_threshold = getUnsignedOption(val, "max_concurrent_requests_threshold");
  166. checkSetOption(cass_cluster_set_max_concurrent_requests_threshold(cluster, max_concurrent_requests_threshold), "max_concurrent_requests_threshold");
  167. }
  168. else if (stricmp(optName, "connect_timeout")==0)
  169. {
  170. unsigned connect_timeout = getUnsignedOption(val, "connect_timeout");
  171. cass_cluster_set_connect_timeout(cluster, connect_timeout);
  172. }
  173. else if (stricmp(optName, "request_timeout")==0)
  174. {
  175. unsigned request_timeout = getUnsignedOption(val, "request_timeout");
  176. cass_cluster_set_request_timeout(cluster, request_timeout);
  177. }
  178. else if (stricmp(optName, "load_balance_round_robin")==0)
  179. {
  180. cass_bool_t enable = getBoolOption(val, "load_balance_round_robin");
  181. if (enable==cass_true)
  182. cass_cluster_set_load_balance_round_robin(cluster);
  183. }
  184. else if (stricmp(optName, "load_balance_dc_aware")==0)
  185. {
  186. StringArray lbargs;
  187. lbargs.appendList(val, "|");
  188. if (lbargs.length() != 3)
  189. failx("Invalid value '%s' for option %s - expected 3 subvalues (separate with |)", val, optName.str());
  190. unsigned usedPerRemote = getUnsignedOption(lbargs.item(2), "load_balance_dc_aware");
  191. cass_bool_t allowRemote = getBoolOption(lbargs.item(2), "load_balance_dc_aware");
  192. checkSetOption(cass_cluster_set_load_balance_dc_aware(cluster, lbargs.item(0), usedPerRemote, allowRemote), "load_balance_dc_aware");
  193. }
  194. else if (stricmp(optName, "token_aware_routing")==0)
  195. {
  196. cass_bool_t enable = getBoolOption(val, "token_aware_routing");
  197. cass_cluster_set_token_aware_routing(cluster, enable);
  198. }
  199. else if (stricmp(optName, "latency_aware_routing")==0)
  200. {
  201. cass_bool_t enable = getBoolOption(val, "latency_aware_routing");
  202. cass_cluster_set_latency_aware_routing(cluster, enable);
  203. }
  204. else if (stricmp(optName, "latency_aware_routing_settings")==0)
  205. {
  206. StringArray subargs;
  207. subargs.appendList(val, "|");
  208. if (subargs.length() != 5)
  209. failx("Invalid value '%s' for option %s - expected 5 subvalues (separate with |)", val, optName.str());
  210. cass_double_t exclusion_threshold = getDoubleOption(subargs.item(0), "exclusion_threshold");
  211. cass_uint64_t scale_ms = getUnsigned64Option(subargs.item(1), "scale_ms");
  212. cass_uint64_t retry_period_ms = getUnsigned64Option(subargs.item(2), "retry_period_ms");
  213. cass_uint64_t update_rate_ms = getUnsigned64Option(subargs.item(3), "update_rate_ms");
  214. cass_uint64_t min_measured = getUnsigned64Option(subargs.item(4), "min_measured");
  215. cass_cluster_set_latency_aware_routing_settings(cluster, exclusion_threshold, scale_ms, retry_period_ms, update_rate_ms, min_measured);
  216. }
  217. else if (stricmp(optName, "tcp_nodelay")==0)
  218. {
  219. cass_bool_t enable = getBoolOption(val, "tcp_nodelay");
  220. cass_cluster_set_tcp_nodelay(cluster, enable);
  221. }
  222. else if (stricmp(optName, "tcp_keepalive")==0)
  223. {
  224. StringArray subargs;
  225. subargs.appendList(val, "|");
  226. if (subargs.length() != 2)
  227. failx("Invalid value '%s' for option %s - expected 2 subvalues (separate with |)", val, optName.str());
  228. cass_bool_t enabled = getBoolOption(subargs.item(0), "enabled");
  229. unsigned delay_secs = getUnsignedOption(subargs.item(0), "delay_secs");
  230. cass_cluster_set_tcp_keepalive(cluster, enabled, delay_secs);
  231. }
  232. else
  233. failx("Unrecognized option %s", optName.str());
  234. }
  235. }
  236. cass_cluster_set_contact_points(cluster, contact_points);
  237. if (*user || *password)
  238. cass_cluster_set_credentials(cluster, user, password);
  239. }
  240. void CassandraClusterSession::checkSetOption(CassError rc, const char *name)
  241. {
  242. if (rc != CASS_OK)
  243. {
  244. failx("While setting option %s: %s", name, cass_error_desc(rc));
  245. }
  246. }
  247. cass_bool_t CassandraClusterSession::getBoolOption(const char *val, const char *option)
  248. {
  249. return strToBool(val) ? cass_true : cass_false;
  250. }
  251. unsigned CassandraClusterSession::getUnsignedOption(const char *val, const char *option)
  252. {
  253. char *endp;
  254. long value = strtoul(val, &endp, 0);
  255. if (endp==val || *endp != '\0' || value > UINT_MAX || value < 0)
  256. failx("Invalid value '%s' for option %s", val, option);
  257. return (unsigned) value;
  258. }
  259. unsigned CassandraClusterSession::getDoubleOption(const char *val, const char *option)
  260. {
  261. char *endp;
  262. double value = strtod(val, &endp);
  263. if (endp==val || *endp != '\0')
  264. failx("Invalid value '%s' for option %s", val, option);
  265. return value;
  266. }
  267. __uint64 CassandraClusterSession::getUnsigned64Option(const char *val, const char *option)
  268. {
  269. // MORE - could check it's all digits (with optional leading spaces...), if we cared.
  270. return rtlVStrToUInt8(val);
  271. }
  272. void CassandraClusterSession::connect()
  273. {
  274. assertex(cluster && !session);
  275. session.setown(new CassandraSession(cass_session_new()));
  276. CassandraFuture future(keyspace.isEmpty() ? cass_session_connect(*session, cluster) : cass_session_connect_keyspace(*session, cluster, keyspace));
  277. future.wait("connect");
  278. }
  279. void CassandraClusterSession::disconnect()
  280. {
  281. session.clear();
  282. }
  283. CassandraPrepared *CassandraClusterSession::prepareStatement(const char *query, bool trace) const
  284. {
  285. assertex(session);
  286. CriticalBlock b(cacheCrit);
  287. Linked<CassandraPrepared> cached = preparedCache.getValue(query);
  288. if (cached)
  289. return cached.getClear();
  290. {
  291. // We don't want to block cache lookups while we prepare a new bound statement
  292. // Note - if multiple threads try to prepare the same (new) statement at the same time, it's not catastrophic
  293. CriticalUnblock b(cacheCrit);
  294. CassandraFuture futurePrep(cass_session_prepare(*session, query));
  295. futurePrep.wait("prepare statement");
  296. cached.setown(new CassandraPrepared(cass_future_get_prepared(futurePrep), trace ? query : NULL));
  297. }
  298. preparedCache.setValue(query, cached); // NOTE - this links parameter
  299. return cached.getClear();
  300. }
  301. CassandraStatementInfo *CassandraClusterSession::createStatementInfo(const char *script, unsigned numParams, CassBatchType batchMode, unsigned pageSize) const
  302. {
  303. Owned<CassandraPrepared> prepared = prepareStatement(script, false); // We could make tracing selectable
  304. return new CassandraStatementInfo(session, prepared, numParams, batchMode, pageSize, semaphore, maxRetries);
  305. }
  306. typedef CassandraClusterSession *CassandraClusterSessionPtr;
  307. typedef MapBetween<hash64_t, hash64_t, CassandraClusterSessionPtr, CassandraClusterSessionPtr> ClusterSessionMap;
  308. static CriticalSection clusterCacheCrit;
  309. static ClusterSessionMap cachedSessions;
  310. CassandraClusterSession *lookupCachedSession(hash64_t hash, const StringArray &opts)
  311. {
  312. Owned<CassandraClusterSession> cluster;
  313. CassandraClusterSessionPtr *found = cachedSessions.getValue(hash);
  314. if (found)
  315. cluster.set(*found);
  316. if (!cluster)
  317. {
  318. cluster.setown(new CassandraClusterSession(cass_cluster_new()));
  319. cluster->setOptions(opts);
  320. cluster->connect();
  321. cachedSessions.setValue(hash, cluster.getLink());
  322. }
  323. return cluster.getClear();
  324. }
  325. MODULE_INIT(INIT_PRIORITY_STANDARD)
  326. {
  327. cass_log_set_callback(logCallBack, NULL);
  328. cass_log_set_level(CASS_LOG_WARN);
  329. return true;
  330. }
  331. MODULE_EXIT()
  332. {
  333. HashIterator i(cachedSessions);
  334. ForEach(i)
  335. {
  336. CassandraClusterSession *session = *cachedSessions.mapToValue(&i.query());
  337. ::Release(session);
  338. }
  339. }
  340. //------------------
  341. void CassandraFuture::wait(const char *why) const
  342. {
  343. cass_future_wait(future);
  344. CassError rc = cass_future_error_code(future);
  345. if(rc != CASS_OK)
  346. {
  347. const char *message;
  348. size_t length;
  349. cass_future_error_message(future, &message, &length);
  350. VStringBuffer err("cassandra: failed to %s (%.*s)", why, (int) length, message);
  351. #ifdef _DEBUG
  352. DBGLOG("%s", err.str());
  353. #endif
  354. rtlFail(0, err.str());
  355. }
  356. }
  357. void CassandraSession::set(CassSession *_session)
  358. {
  359. if (session)
  360. {
  361. CassandraFuture close_future(cass_session_close(session));
  362. cass_future_wait(close_future);
  363. cass_session_free(session);
  364. }
  365. session = _session;
  366. }
  367. //----------------------
  368. CassandraRetryingFuture::CassandraRetryingFuture(CassSession *_session, CassStatement *_statement, Semaphore *_limiter, unsigned _retries)
  369. : session(_session), statement(_statement), retries(_retries), limiter(_limiter), future(NULL)
  370. {
  371. execute();
  372. }
  373. CassandraRetryingFuture::~CassandraRetryingFuture()
  374. {
  375. if (future)
  376. cass_future_free(future);
  377. }
  378. void CassandraRetryingFuture::wait(const char *why)
  379. {
  380. cass_future_wait(future);
  381. CassError rc = cass_future_error_code(future);
  382. if(rc != CASS_OK)
  383. {
  384. switch (rc)
  385. {
  386. case CASS_ERROR_LIB_NO_HOSTS_AVAILABLE: // MORE - are there others we should retry?
  387. if (retry(why))
  388. break;
  389. // fall into
  390. default:
  391. const char *message;
  392. size_t length;
  393. cass_future_error_message(future, &message, &length);
  394. VStringBuffer err("cassandra: failed to %s (%.*s)", why, (int) length, message);
  395. rtlFail(0, err.str());
  396. }
  397. }
  398. }
  399. bool CassandraRetryingFuture::retry(const char *why)
  400. {
  401. for (int i = 0; i < retries; i++)
  402. {
  403. execute();
  404. cass_future_wait(future);
  405. CassError rc = cass_future_error_code(future);
  406. if (rc == CASS_OK)
  407. return true;
  408. Sleep(10);
  409. }
  410. return false;
  411. }
  412. void CassandraRetryingFuture::execute()
  413. {
  414. if (limiter)
  415. limiter->wait();
  416. future = cass_session_execute(session, statement);
  417. if (limiter)
  418. cass_future_set_callback(future, signaller, limiter); // Note - this will call the callback if the future has already completed
  419. }
  420. void CassandraRetryingFuture::signaller(CassFuture *future, void *data)
  421. {
  422. Semaphore *sem = (Semaphore *) data;
  423. sem->signal();
  424. }
  425. //----------------------
  426. CassandraStatementInfo::CassandraStatementInfo(CassandraSession *_session, CassandraPrepared *_prepared, unsigned _numBindings, CassBatchType _batchMode, unsigned pageSize, Semaphore *_semaphore, unsigned _maxRetries)
  427. : session(_session), prepared(_prepared), numBindings(_numBindings), batchMode(_batchMode), semaphore(_semaphore), maxRetries(_maxRetries)
  428. {
  429. assertex(prepared && *prepared);
  430. statement.setown(new CassandraStatement(cass_prepared_bind(*prepared)));
  431. if (pageSize)
  432. cass_statement_set_paging_size(*statement, pageSize);
  433. inBatch = false;
  434. }
  435. CassandraStatementInfo::~CassandraStatementInfo()
  436. {
  437. stop();
  438. futures.kill();
  439. }
  440. void CassandraStatementInfo::stop()
  441. {
  442. iterator.clear();
  443. result.clear();
  444. prepared.clear();
  445. }
  446. bool CassandraStatementInfo::next()
  447. {
  448. loop
  449. {
  450. if (!iterator)
  451. {
  452. if (result)
  453. iterator.setown(new CassandraIterator(cass_iterator_from_result(*result)));
  454. else
  455. return false;
  456. }
  457. if (cass_iterator_next(*iterator))
  458. return true;
  459. iterator.clear();
  460. if (!cass_result_has_more_pages(*result))
  461. {
  462. result.clear();
  463. break;
  464. }
  465. cass_statement_set_paging_state(*statement, *result);
  466. result.setown(new CassandraFutureResult(cass_session_execute(*session, *statement)));
  467. }
  468. return false;
  469. }
  470. void CassandraStatementInfo::startStream()
  471. {
  472. if (batchMode != (CassBatchType) -1)
  473. batch.setown(new CassandraBatch(cass_batch_new(batchMode)));
  474. statement.setown(new CassandraStatement(cass_prepared_bind(*prepared)));
  475. inBatch = true;
  476. }
  477. void CassandraStatementInfo::endStream()
  478. {
  479. if (batch)
  480. {
  481. result.setown(new CassandraFutureResult (cass_session_execute_batch(*session, *batch)));
  482. assertex (rowCount() == 0);
  483. }
  484. else
  485. {
  486. ForEachItemIn(idx, futures)
  487. {
  488. futures.item(idx).wait("endStream");
  489. }
  490. }
  491. }
  492. void CassandraStatementInfo::execute()
  493. {
  494. assertex(statement && *statement);
  495. if (batch)
  496. {
  497. check(cass_batch_add_statement(*batch, *statement));
  498. statement.setown(new CassandraStatement(cass_prepared_bind(*prepared)));
  499. }
  500. else if (inBatch)
  501. {
  502. futures.append(*new CassandraRetryingFuture(*session, statement->getClear(), semaphore, maxRetries));
  503. statement.setown(new CassandraStatement(cass_prepared_bind(*prepared)));
  504. }
  505. else
  506. {
  507. result.setown(new CassandraFutureResult(cass_session_execute(*session, *statement)));
  508. }
  509. }
  510. // Conversions from Cassandra values to ECL data
  511. static const char *getTypeName(CassValueType type)
  512. {
  513. switch (type)
  514. {
  515. case CASS_VALUE_TYPE_CUSTOM: return "CUSTOM";
  516. case CASS_VALUE_TYPE_ASCII: return "ASCII";
  517. case CASS_VALUE_TYPE_BIGINT: return "BIGINT";
  518. case CASS_VALUE_TYPE_BLOB: return "BLOB";
  519. case CASS_VALUE_TYPE_BOOLEAN: return "BOOLEAN";
  520. case CASS_VALUE_TYPE_COUNTER: return "COUNTER";
  521. case CASS_VALUE_TYPE_DECIMAL: return "DECIMAL";
  522. case CASS_VALUE_TYPE_DOUBLE: return "DOUBLE";
  523. case CASS_VALUE_TYPE_FLOAT: return "FLOAT";
  524. case CASS_VALUE_TYPE_INT: return "INT";
  525. case CASS_VALUE_TYPE_TEXT: return "TEXT";
  526. case CASS_VALUE_TYPE_TIMESTAMP: return "TIMESTAMP";
  527. case CASS_VALUE_TYPE_UUID: return "UUID";
  528. case CASS_VALUE_TYPE_VARCHAR: return "VARCHAR";
  529. case CASS_VALUE_TYPE_VARINT: return "VARINT";
  530. case CASS_VALUE_TYPE_TIMEUUID: return "TIMEUUID";
  531. case CASS_VALUE_TYPE_INET: return "INET";
  532. case CASS_VALUE_TYPE_LIST: return "LIST";
  533. case CASS_VALUE_TYPE_MAP: return "MAP";
  534. case CASS_VALUE_TYPE_SET: return "SET";
  535. default: return "UNKNOWN";
  536. }
  537. }
  538. static void typeError(const char *expected, const CassValue *value, const RtlFieldInfo *field) __attribute__((noreturn));
  539. static void typeError(const char *expected, const CassValue *value, const RtlFieldInfo *field)
  540. {
  541. VStringBuffer msg("cassandra: type mismatch - %s expected", expected);
  542. if (field)
  543. msg.appendf(" for field %s", str(field->name));
  544. if (value)
  545. msg.appendf(", received %s", getTypeName(cass_value_type(value)));
  546. rtlFail(0, msg.str());
  547. }
  548. extern bool isInteger(const CassValueType t)
  549. {
  550. switch (t)
  551. {
  552. case CASS_VALUE_TYPE_TIMESTAMP:
  553. case CASS_VALUE_TYPE_INT:
  554. case CASS_VALUE_TYPE_BIGINT:
  555. case CASS_VALUE_TYPE_COUNTER:
  556. case CASS_VALUE_TYPE_VARINT:
  557. return true;
  558. default:
  559. return false;
  560. }
  561. }
  562. extern bool isString(CassValueType t)
  563. {
  564. switch (t)
  565. {
  566. case CASS_VALUE_TYPE_VARCHAR:
  567. case CASS_VALUE_TYPE_TEXT:
  568. case CASS_VALUE_TYPE_ASCII:
  569. return true;
  570. default:
  571. return false;
  572. }
  573. }
  574. // when extracting elements of a set, field will point at the SET info- we want to get the typeInfo for the element type
  575. static const RtlTypeInfo *getFieldBaseType(const RtlFieldInfo *field)
  576. {
  577. const RtlTypeInfo *type = field->type;
  578. if ((type->fieldType & RFTMkind) == type_set)
  579. return type->queryChildType();
  580. else
  581. return type;
  582. }
  583. static int getNumFields(const RtlTypeInfo *record)
  584. {
  585. int count = 0;
  586. const RtlFieldInfo * const *fields = record->queryFields();
  587. assertex(fields);
  588. while (*fields++)
  589. count++;
  590. return count;
  591. }
  592. extern bool getBooleanResult(const RtlFieldInfo *field, const CassValue *value)
  593. {
  594. if (cass_value_is_null(value))
  595. {
  596. NullFieldProcessor p(field);
  597. return p.boolResult;
  598. }
  599. if (cass_value_type(value) != CASS_VALUE_TYPE_BOOLEAN)
  600. typeError("boolean", value, field);
  601. cass_bool_t output;
  602. check(cass_value_get_bool(value, &output));
  603. return output != cass_false;
  604. }
  605. extern void getDataResult(const RtlFieldInfo *field, const CassValue *value, size32_t &chars, void * &result)
  606. {
  607. if (cass_value_is_null(value))
  608. {
  609. NullFieldProcessor p(field);
  610. rtlStrToDataX(chars, result, p.resultChars, p.stringResult);
  611. return;
  612. }
  613. // We COULD require that the field being retrieved is a blob - but Cassandra seems happy to use any field here, and
  614. // it seems like it could be more useful to support anything
  615. // if (cass_value_type(value) != CASS_VALUE_TYPE_BLOB)
  616. // typeError("blob", value, field);
  617. const cass_byte_t *bytes;
  618. size_t size;
  619. check(cass_value_get_bytes(value, &bytes, &size));
  620. rtlStrToDataX(chars, result, size, bytes);
  621. }
  622. extern double getRealResult(const RtlFieldInfo *field, const CassValue *value)
  623. {
  624. if (cass_value_is_null(value))
  625. {
  626. NullFieldProcessor p(field);
  627. return p.doubleResult;
  628. }
  629. else if (isInteger(cass_value_type(value)))
  630. return (double) getSignedResult(field, value);
  631. else switch (cass_value_type(value))
  632. {
  633. case CASS_VALUE_TYPE_FLOAT:
  634. {
  635. cass_float_t output_f;
  636. check(cass_value_get_float(value, &output_f));
  637. return output_f;
  638. }
  639. case CASS_VALUE_TYPE_DOUBLE:
  640. {
  641. cass_double_t output_d;
  642. check(cass_value_get_double(value, &output_d));
  643. return output_d;
  644. }
  645. default:
  646. typeError("double", value, field);
  647. }
  648. }
  649. extern __int64 getSignedResult(const RtlFieldInfo *field, const CassValue *value)
  650. {
  651. if (cass_value_is_null(value))
  652. {
  653. NullFieldProcessor p(field);
  654. return p.intResult;
  655. }
  656. switch (cass_value_type(value))
  657. {
  658. case CASS_VALUE_TYPE_INT:
  659. {
  660. cass_int32_t output;
  661. check(cass_value_get_int32(value, &output));
  662. return output;
  663. }
  664. case CASS_VALUE_TYPE_TIMESTAMP:
  665. case CASS_VALUE_TYPE_BIGINT:
  666. case CASS_VALUE_TYPE_COUNTER:
  667. case CASS_VALUE_TYPE_VARINT:
  668. {
  669. cass_int64_t output;
  670. check(cass_value_get_int64(value, &output));
  671. return output;
  672. }
  673. default:
  674. typeError("integer", value, field);
  675. }
  676. }
  677. extern unsigned __int64 getUnsignedResult(const RtlFieldInfo *field, const CassValue *value)
  678. {
  679. if (cass_value_is_null(value))
  680. {
  681. NullFieldProcessor p(field);
  682. return p.uintResult;
  683. }
  684. return (__uint64) getSignedResult(field, value);
  685. }
  686. extern void getStringResult(const RtlFieldInfo *field, const CassValue *value, size32_t &chars, char * &result)
  687. {
  688. if (cass_value_is_null(value))
  689. {
  690. NullFieldProcessor p(field);
  691. rtlStrToStrX(chars, result, p.resultChars, p.stringResult);
  692. return;
  693. }
  694. switch (cass_value_type(value))
  695. {
  696. case CASS_VALUE_TYPE_ASCII:
  697. {
  698. const char *output;
  699. size_t length;
  700. check(cass_value_get_string(value, &output, &length));
  701. rtlStrToStrX(chars, result, length, output);
  702. break;
  703. }
  704. case CASS_VALUE_TYPE_VARCHAR:
  705. case CASS_VALUE_TYPE_TEXT:
  706. {
  707. const char *output;
  708. size_t length;
  709. check(cass_value_get_string(value, &output, &length));
  710. unsigned numchars = rtlUtf8Length(length, output);
  711. rtlUtf8ToStrX(chars, result, numchars, output);
  712. break;
  713. }
  714. default:
  715. typeError("string", value, field);
  716. }
  717. }
  718. extern void getUTF8Result(const RtlFieldInfo *field, const CassValue *value, size32_t &chars, char * &result)
  719. {
  720. if (cass_value_is_null(value))
  721. {
  722. NullFieldProcessor p(field);
  723. rtlUtf8ToUtf8X(chars, result, p.resultChars, p.stringResult);
  724. return;
  725. }
  726. switch (cass_value_type(value))
  727. {
  728. case CASS_VALUE_TYPE_ASCII:
  729. {
  730. const char *output;
  731. size_t length;
  732. check(cass_value_get_string(value, &output, &length));
  733. rtlStrToUtf8X(chars, result, length, output);
  734. break;
  735. }
  736. case CASS_VALUE_TYPE_VARCHAR:
  737. case CASS_VALUE_TYPE_TEXT:
  738. {
  739. const char * output;
  740. size_t length;
  741. check(cass_value_get_string(value, &output, &length));
  742. unsigned numchars = rtlUtf8Length(length, output);
  743. rtlUtf8ToUtf8X(chars, result, numchars, output);
  744. break;
  745. }
  746. default:
  747. typeError("string", value, field);
  748. }
  749. }
  750. extern void getUnicodeResult(const RtlFieldInfo *field, const CassValue *value, size32_t &chars, UChar * &result)
  751. {
  752. if (cass_value_is_null(value))
  753. {
  754. NullFieldProcessor p(field);
  755. rtlUnicodeToUnicodeX(chars, result, p.resultChars, p.unicodeResult);
  756. return;
  757. }
  758. switch (cass_value_type(value))
  759. {
  760. case CASS_VALUE_TYPE_ASCII:
  761. {
  762. const char * output;
  763. size_t length;
  764. check(cass_value_get_string(value, &output, &length));
  765. rtlStrToUnicodeX(chars, result, length, output);
  766. break;
  767. }
  768. case CASS_VALUE_TYPE_VARCHAR:
  769. case CASS_VALUE_TYPE_TEXT:
  770. {
  771. const char * output;
  772. size_t length;
  773. check(cass_value_get_string(value, &output, &length));
  774. unsigned numchars = rtlUtf8Length(length, output);
  775. rtlUtf8ToUnicodeX(chars, result, numchars, output);
  776. break;
  777. }
  778. default:
  779. typeError("string", value, field);
  780. }
  781. }
  782. extern void getDecimalResult(const RtlFieldInfo *field, const CassValue *value, Decimal &result)
  783. {
  784. // Note - Cassandra has a decimal type, but it's not particularly similar to the ecl one. Map to string for now, as we do in MySQL
  785. if (cass_value_is_null(value))
  786. {
  787. NullFieldProcessor p(field);
  788. result.set(p.decimalResult);
  789. return;
  790. }
  791. size32_t chars;
  792. rtlDataAttr tempStr;
  793. cassandraembed::getStringResult(field, value, chars, tempStr.refstr());
  794. result.setString(chars, tempStr.getstr());
  795. if (field)
  796. {
  797. RtlDecimalTypeInfo *dtype = (RtlDecimalTypeInfo *) field->type;
  798. result.setPrecision(dtype->getDecimalDigits(), dtype->getDecimalPrecision());
  799. }
  800. }
  801. // A CassandraRowBuilder object is used to construct an ECL row from a Cassandra row
  802. class CassandraRowBuilder : public CInterfaceOf<IFieldSource>
  803. {
  804. public:
  805. CassandraRowBuilder(const CassandraStatementInfo *_stmtInfo)
  806. : stmtInfo(_stmtInfo), colIdx(0), numIteratorFields(0), nextIteratedField(0)
  807. {
  808. }
  809. virtual bool getBooleanResult(const RtlFieldInfo *field)
  810. {
  811. return cassandraembed::getBooleanResult(field, nextField(field));
  812. }
  813. virtual void getDataResult(const RtlFieldInfo *field, size32_t &len, void * &result)
  814. {
  815. cassandraembed::getDataResult(field, nextField(field), len, result);
  816. }
  817. virtual double getRealResult(const RtlFieldInfo *field)
  818. {
  819. return cassandraembed::getRealResult(field, nextField(field));
  820. }
  821. virtual __int64 getSignedResult(const RtlFieldInfo *field)
  822. {
  823. return cassandraembed::getSignedResult(field, nextField(field));
  824. }
  825. virtual unsigned __int64 getUnsignedResult(const RtlFieldInfo *field)
  826. {
  827. return cassandraembed::getUnsignedResult(field, nextField(field));
  828. }
  829. virtual void getStringResult(const RtlFieldInfo *field, size32_t &chars, char * &result)
  830. {
  831. cassandraembed::getStringResult(field, nextField(field), chars, result);
  832. }
  833. virtual void getUTF8Result(const RtlFieldInfo *field, size32_t &chars, char * &result)
  834. {
  835. cassandraembed::getUTF8Result(field, nextField(field), chars, result);
  836. }
  837. virtual void getUnicodeResult(const RtlFieldInfo *field, size32_t &chars, UChar * &result)
  838. {
  839. cassandraembed::getUnicodeResult(field, nextField(field), chars, result);
  840. }
  841. virtual void getDecimalResult(const RtlFieldInfo *field, Decimal &value)
  842. {
  843. cassandraembed::getDecimalResult(field, nextField(field), value);
  844. }
  845. virtual void processBeginSet(const RtlFieldInfo * field, bool &isAll)
  846. {
  847. isAll = false;
  848. iterator.setown(new CassandraIterator(cass_iterator_from_collection(nextField(field))));
  849. }
  850. virtual bool processNextSet(const RtlFieldInfo * field)
  851. {
  852. numIteratorFields = 1;
  853. return *iterator && cass_iterator_next(*iterator); // If field was NULL, we'll have a NULL iterator (representing an empty set/list)
  854. // Can't distinguish empty set from NULL field, so assume the former (rather than trying to deliver the default value for the set field)
  855. }
  856. virtual void processBeginDataset(const RtlFieldInfo * field)
  857. {
  858. numIteratorFields = getNumFields(field->type->queryChildType());
  859. switch (numIteratorFields)
  860. {
  861. case 1:
  862. iterator.setown(new CassandraIterator(cass_iterator_from_collection(nextField(field))));
  863. break;
  864. case 2:
  865. iterator.setown(new CassandraIterator(cass_iterator_from_map(nextField(field))));
  866. break;
  867. default:
  868. UNSUPPORTED("Nested datasets with > 2 fields");
  869. }
  870. }
  871. virtual void processBeginRow(const RtlFieldInfo * field)
  872. {
  873. }
  874. virtual bool processNextRow(const RtlFieldInfo * field)
  875. {
  876. nextIteratedField = 0;
  877. return *iterator && cass_iterator_next(*iterator); // If field was NULL, we'll have a NULL iterator (representing an empty set/list/map)
  878. // Can't distinguish empty set from NULL field, so assume the former (rather than trying to deliver the default value for the set field)
  879. }
  880. virtual void processEndSet(const RtlFieldInfo * field)
  881. {
  882. iterator.clear();
  883. numIteratorFields = 0;
  884. }
  885. virtual void processEndDataset(const RtlFieldInfo * field)
  886. {
  887. iterator.clear();
  888. numIteratorFields = 0;
  889. }
  890. virtual void processEndRow(const RtlFieldInfo * field)
  891. {
  892. }
  893. protected:
  894. const CassValue *nextField(const RtlFieldInfo * field)
  895. {
  896. const CassValue *ret;
  897. if (iterator)
  898. {
  899. switch (numIteratorFields)
  900. {
  901. case 1:
  902. ret = cass_iterator_get_value(*iterator);
  903. break;
  904. case 2:
  905. if (nextIteratedField==0)
  906. ret = cass_iterator_get_map_key(*iterator);
  907. else
  908. ret = cass_iterator_get_map_value(*iterator);
  909. nextIteratedField++;
  910. break;
  911. default:
  912. throwUnexpected();
  913. }
  914. }
  915. else
  916. ret = cass_row_get_column(stmtInfo->queryRow(), colIdx++);
  917. if (!ret)
  918. failx("Too many fields in ECL output row, reading field %s", str(field->name));
  919. return ret;
  920. }
  921. const CassandraStatementInfo *stmtInfo;
  922. Owned<CassandraIterator> iterator;
  923. int colIdx;
  924. int numIteratorFields;
  925. int nextIteratedField;
  926. };
  927. // Bind Cassandra columns from an ECL record
  928. class CassandraRecordBinder : public CInterfaceOf<IFieldProcessor>
  929. {
  930. public:
  931. CassandraRecordBinder(const IContextLogger &_logctx, const RtlTypeInfo *_typeInfo, const CassandraStatementInfo *_stmtInfo, int _firstParam)
  932. : logctx(_logctx), typeInfo(_typeInfo), stmtInfo(_stmtInfo), firstParam(_firstParam), dummyField("<row>", NULL, typeInfo), thisParam(_firstParam)
  933. {
  934. }
  935. int numFields()
  936. {
  937. int count = 0;
  938. const RtlFieldInfo * const *fields = typeInfo->queryFields();
  939. assertex(fields);
  940. while (*fields++)
  941. count++;
  942. return count;
  943. }
  944. void processRow(const byte *row)
  945. {
  946. thisParam = firstParam;
  947. typeInfo->process(row, row, &dummyField, *this); // Bind the variables for the current row
  948. }
  949. virtual void processString(unsigned len, const char *value, const RtlFieldInfo * field)
  950. {
  951. size32_t utf8chars;
  952. rtlDataAttr utfText;
  953. rtlStrToUtf8X(utf8chars, utfText.refstr(), len, value);
  954. if (collection)
  955. checkBind(cass_collection_append_string_n(*collection, utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())),
  956. field);
  957. else
  958. checkBind(cass_statement_bind_string_n(stmtInfo->queryStatement(),
  959. checkNextParam(field),
  960. utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())),
  961. field);
  962. }
  963. virtual void processBool(bool value, const RtlFieldInfo * field)
  964. {
  965. if (collection)
  966. checkBind(cass_collection_append_bool(*collection, value ? cass_true : cass_false), field);
  967. else
  968. checkBind(cass_statement_bind_bool(stmtInfo->queryStatement(), checkNextParam(field), value ? cass_true : cass_false), field);
  969. }
  970. virtual void processData(unsigned len, const void *value, const RtlFieldInfo * field)
  971. {
  972. if (collection)
  973. checkBind(cass_collection_append_bytes(*collection, (const cass_byte_t*) value, len), field);
  974. else
  975. checkBind(cass_statement_bind_bytes(stmtInfo->queryStatement(), checkNextParam(field), (const cass_byte_t*) value, len), field);
  976. }
  977. virtual void processInt(__int64 value, const RtlFieldInfo * field)
  978. {
  979. if (getFieldBaseType(field)->size(NULL,NULL)>4)
  980. {
  981. if (collection)
  982. checkBind(cass_collection_append_int64(*collection, value), field);
  983. else
  984. checkBind(cass_statement_bind_int64(stmtInfo->queryStatement(), checkNextParam(field), value), field);
  985. }
  986. else
  987. {
  988. if (collection)
  989. checkBind(cass_collection_append_int32(*collection, value), field);
  990. else
  991. checkBind(cass_statement_bind_int32(stmtInfo->queryStatement(), checkNextParam(field), value), field);
  992. }
  993. }
  994. virtual void processUInt(unsigned __int64 value, const RtlFieldInfo * field)
  995. {
  996. UNSUPPORTED("UNSIGNED columns");
  997. }
  998. virtual void processReal(double value, const RtlFieldInfo * field)
  999. {
  1000. if (getFieldBaseType(field)->size(NULL,NULL)>4)
  1001. {
  1002. if (collection)
  1003. checkBind(cass_collection_append_double(*collection, value), field);
  1004. else
  1005. checkBind(cass_statement_bind_double(stmtInfo->queryStatement(), checkNextParam(field), value), field);
  1006. }
  1007. else
  1008. {
  1009. if (collection)
  1010. checkBind(cass_collection_append_float(*collection, (float) value), field);
  1011. else
  1012. checkBind(cass_statement_bind_float(stmtInfo->queryStatement(), checkNextParam(field), (float) value), field);
  1013. }
  1014. }
  1015. virtual void processDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo * field)
  1016. {
  1017. Decimal val;
  1018. size32_t bytes;
  1019. rtlDataAttr decText;
  1020. val.setDecimal(digits, precision, value);
  1021. val.getStringX(bytes, decText.refstr());
  1022. processUtf8(bytes, decText.getstr(), field);
  1023. }
  1024. virtual void processUDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo * field)
  1025. {
  1026. UNSUPPORTED("UNSIGNED decimals");
  1027. }
  1028. virtual void processUnicode(unsigned chars, const UChar *value, const RtlFieldInfo * field)
  1029. {
  1030. size32_t utf8chars;
  1031. rtlDataAttr utfText;
  1032. rtlUnicodeToUtf8X(utf8chars, utfText.refstr(), chars, value);
  1033. if (collection)
  1034. checkBind(cass_collection_append_string_n(*collection, utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())),
  1035. field);
  1036. else
  1037. checkBind(cass_statement_bind_string_n(stmtInfo->queryStatement(),
  1038. checkNextParam(field),
  1039. utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())),
  1040. field);
  1041. }
  1042. virtual void processQString(unsigned len, const char *value, const RtlFieldInfo * field)
  1043. {
  1044. size32_t charCount;
  1045. rtlDataAttr text;
  1046. rtlQStrToStrX(charCount, text.refstr(), len, value);
  1047. processUtf8(charCount, text.getstr(), field);
  1048. }
  1049. virtual void processUtf8(unsigned chars, const char *value, const RtlFieldInfo * field)
  1050. {
  1051. if (collection)
  1052. checkBind(cass_collection_append_string_n(*collection, value, rtlUtf8Size(chars, value)), field);
  1053. else
  1054. checkBind(cass_statement_bind_string_n(stmtInfo->queryStatement(), checkNextParam(field), value, rtlUtf8Size(chars, value)), field);
  1055. }
  1056. virtual bool processBeginSet(const RtlFieldInfo * field, unsigned numElements, bool isAll, const byte *data)
  1057. {
  1058. if (isAll)
  1059. UNSUPPORTED("SET(ALL)");
  1060. // We don't know whether the corresponding field in Cassandra is a list or a set. Try binding a dummy list to tell which.
  1061. CassandraCollection temp(cass_collection_new(CASS_COLLECTION_TYPE_LIST, 0));
  1062. if (cass_statement_bind_collection(stmtInfo->queryStatement(), thisParam, temp) == CASS_OK)
  1063. collection.setown(new CassandraCollection(cass_collection_new(CASS_COLLECTION_TYPE_LIST, numElements)));
  1064. else
  1065. collection.setown(new CassandraCollection(cass_collection_new(CASS_COLLECTION_TYPE_SET, numElements)));
  1066. return true;
  1067. }
  1068. virtual bool processBeginDataset(const RtlFieldInfo * field, unsigned numRows)
  1069. {
  1070. // If there's a single field, assume we are mapping to a SET/LIST
  1071. // If there are two, assume it's a MAP
  1072. // Otherwise, fail
  1073. int numFields = getNumFields(field->type->queryChildType());
  1074. if (numFields < 1 || numFields > 2)
  1075. {
  1076. UNSUPPORTED("Nested datasets with > 2 fields");
  1077. }
  1078. collection.setown(new CassandraCollection(cass_collection_new(numFields==1 ? CASS_COLLECTION_TYPE_SET : CASS_COLLECTION_TYPE_MAP, numRows)));
  1079. return true;
  1080. }
  1081. virtual bool processBeginRow(const RtlFieldInfo * field)
  1082. {
  1083. return true;
  1084. }
  1085. virtual void processEndSet(const RtlFieldInfo * field)
  1086. {
  1087. checkBind(cass_statement_bind_collection(stmtInfo->queryStatement(), checkNextParam(field), *collection), field);
  1088. collection.clear();
  1089. }
  1090. virtual void processEndDataset(const RtlFieldInfo * field)
  1091. {
  1092. checkBind(cass_statement_bind_collection(stmtInfo->queryStatement(), checkNextParam(field), *collection), field);
  1093. collection.clear();
  1094. }
  1095. virtual void processEndRow(const RtlFieldInfo * field)
  1096. {
  1097. }
  1098. protected:
  1099. inline unsigned checkNextParam(const RtlFieldInfo * field)
  1100. {
  1101. if (logctx.queryTraceLevel() > 4)
  1102. logctx.CTXLOG("Binding %s to %d", str(field->name), thisParam);
  1103. return thisParam++;
  1104. }
  1105. inline void checkBind(CassError rc, const RtlFieldInfo * field)
  1106. {
  1107. if (rc != CASS_OK)
  1108. {
  1109. failx("While binding parameter %s: %s", str(field->name), cass_error_desc(rc));
  1110. }
  1111. }
  1112. const RtlTypeInfo *typeInfo;
  1113. const CassandraStatementInfo *stmtInfo;
  1114. Owned<CassandraCollection> collection;
  1115. const IContextLogger &logctx;
  1116. int firstParam;
  1117. RtlFieldStrInfo dummyField;
  1118. int thisParam;
  1119. };
  1120. //
  1121. class CassandraDatasetBinder : public CassandraRecordBinder
  1122. {
  1123. public:
  1124. CassandraDatasetBinder(const IContextLogger &_logctx, IRowStream * _input, const RtlTypeInfo *_typeInfo, const CassandraStatementInfo *_stmt, int _firstParam)
  1125. : input(_input), CassandraRecordBinder(_logctx, _typeInfo, _stmt, _firstParam)
  1126. {
  1127. }
  1128. bool bindNext()
  1129. {
  1130. roxiemem::OwnedConstRoxieRow nextRow = (const byte *) input->ungroupedNextRow();
  1131. if (!nextRow)
  1132. return false;
  1133. processRow((const byte *) nextRow.get()); // Bind the variables for the current row
  1134. return true;
  1135. }
  1136. void executeAll(CassandraStatementInfo *stmtInfo)
  1137. {
  1138. stmtInfo->startStream();
  1139. while (bindNext())
  1140. {
  1141. stmtInfo->execute();
  1142. }
  1143. stmtInfo->endStream();
  1144. }
  1145. protected:
  1146. Owned<IRowStream> input;
  1147. };
  1148. // A Cassandra function that returns a dataset will return a CassandraRowStream object that can be
  1149. // interrogated to return each row of the result in turn
  1150. class CassandraRowStream : public CInterfaceOf<IRowStream>
  1151. {
  1152. public:
  1153. CassandraRowStream(CassandraDatasetBinder *_inputStream, CassandraStatementInfo *_stmtInfo, IEngineRowAllocator *_resultAllocator)
  1154. : inputStream(_inputStream), stmtInfo(_stmtInfo), resultAllocator(_resultAllocator)
  1155. {
  1156. executePending = true;
  1157. eof = false;
  1158. }
  1159. virtual const void *nextRow()
  1160. {
  1161. // A little complex when streaming data in as well as out - want to execute for every input record
  1162. if (eof)
  1163. return NULL;
  1164. loop
  1165. {
  1166. if (executePending)
  1167. {
  1168. executePending = false;
  1169. if (inputStream && !inputStream->bindNext())
  1170. {
  1171. noteEOF();
  1172. return NULL;
  1173. }
  1174. stmtInfo->execute();
  1175. }
  1176. if (stmtInfo->next())
  1177. break;
  1178. if (inputStream)
  1179. executePending = true;
  1180. else
  1181. {
  1182. noteEOF();
  1183. return NULL;
  1184. }
  1185. }
  1186. RtlDynamicRowBuilder rowBuilder(resultAllocator);
  1187. CassandraRowBuilder cassandraRowBuilder(stmtInfo);
  1188. const RtlTypeInfo *typeInfo = resultAllocator->queryOutputMeta()->queryTypeInfo();
  1189. assertex(typeInfo);
  1190. RtlFieldStrInfo dummyField("<row>", NULL, typeInfo);
  1191. size32_t len = typeInfo->build(rowBuilder, 0, &dummyField, cassandraRowBuilder);
  1192. return rowBuilder.finalizeRowClear(len);
  1193. }
  1194. virtual void stop()
  1195. {
  1196. resultAllocator.clear();
  1197. stmtInfo->stop();
  1198. }
  1199. protected:
  1200. void noteEOF()
  1201. {
  1202. if (!eof)
  1203. {
  1204. eof = true;
  1205. stop();
  1206. }
  1207. }
  1208. Linked<CassandraDatasetBinder> inputStream;
  1209. Linked<CassandraStatementInfo> stmtInfo;
  1210. Linked<IEngineRowAllocator> resultAllocator;
  1211. bool executePending;
  1212. bool eof;
  1213. };
  1214. // Each call to a Cassandra function will use a new CassandraEmbedFunctionContext object
  1215. class CassandraEmbedFunctionContext : public CInterfaceOf<IEmbedFunctionContext>
  1216. {
  1217. public:
  1218. CassandraEmbedFunctionContext(const IContextLogger &_logctx, unsigned _flags, const char *options)
  1219. : logctx(_logctx), flags(_flags), nextParam(0), numParams(0), batchMode((CassBatchType) -1), pageSize(0)
  1220. {
  1221. StringArray opts;
  1222. opts.appendList(options, ",");
  1223. hash64_t hash = 0;
  1224. ForEachItemInRev(idx, opts)
  1225. {
  1226. const char *opt = opts.item(idx);
  1227. if (strnicmp(opt, "batch=", 6)==0)
  1228. {
  1229. const char *val=opt+6;
  1230. if (stricmp(val, "LOGGED")==0)
  1231. batchMode = CASS_BATCH_TYPE_LOGGED;
  1232. else if (stricmp(val, "UNLOGGED")==0)
  1233. batchMode = CASS_BATCH_TYPE_UNLOGGED;
  1234. else if (stricmp(val, "COUNTER")==0)
  1235. batchMode = CASS_BATCH_TYPE_COUNTER;
  1236. opts.remove(idx);
  1237. }
  1238. else if (strnicmp(opt, "pagesize=", 9)==0)
  1239. {
  1240. pageSize = atoi(opt+9);
  1241. opts.remove(idx);
  1242. }
  1243. else
  1244. hash = rtlHash64VStr(opt, hash);
  1245. }
  1246. cluster.setown(lookupCachedSession(hash, opts));
  1247. }
  1248. virtual bool getBooleanResult()
  1249. {
  1250. bool ret = cassandraembed::getBooleanResult(NULL, getScalarResult());
  1251. checkSingleRow();
  1252. return ret;
  1253. }
  1254. virtual void getDataResult(size32_t &len, void * &result)
  1255. {
  1256. cassandraembed::getDataResult(NULL, getScalarResult(), len, result);
  1257. checkSingleRow();
  1258. }
  1259. virtual double getRealResult()
  1260. {
  1261. double ret = cassandraembed::getRealResult(NULL, getScalarResult());
  1262. checkSingleRow();
  1263. return ret;
  1264. }
  1265. virtual __int64 getSignedResult()
  1266. {
  1267. __int64 ret = cassandraembed::getSignedResult(NULL, getScalarResult());
  1268. checkSingleRow();
  1269. return ret;
  1270. }
  1271. virtual unsigned __int64 getUnsignedResult()
  1272. {
  1273. unsigned __int64 ret = cassandraembed::getUnsignedResult(NULL, getScalarResult());
  1274. checkSingleRow();
  1275. return ret;
  1276. }
  1277. virtual void getStringResult(size32_t &chars, char * &result)
  1278. {
  1279. cassandraembed::getStringResult(NULL, getScalarResult(), chars, result);
  1280. checkSingleRow();
  1281. }
  1282. virtual void getUTF8Result(size32_t &chars, char * &result)
  1283. {
  1284. cassandraembed::getUTF8Result(NULL, getScalarResult(), chars, result);
  1285. checkSingleRow();
  1286. }
  1287. virtual void getUnicodeResult(size32_t &chars, UChar * &result)
  1288. {
  1289. cassandraembed::getUnicodeResult(NULL, getScalarResult(), chars, result);
  1290. checkSingleRow();
  1291. }
  1292. virtual void getDecimalResult(Decimal &value)
  1293. {
  1294. cassandraembed::getDecimalResult(NULL, getScalarResult(), value);
  1295. checkSingleRow();
  1296. }
  1297. virtual void getSetResult(bool & __isAllResult, size32_t & __resultBytes, void * & __result, int elemType, size32_t elemSize)
  1298. {
  1299. CassandraIterator iterator(cass_iterator_from_collection(getScalarResult()));
  1300. rtlRowBuilder out;
  1301. byte *outData = NULL;
  1302. size32_t outBytes = 0;
  1303. while (cass_iterator_next(iterator))
  1304. {
  1305. const CassValue *value = cass_iterator_get_value(iterator);
  1306. assertex(value);
  1307. if (elemSize != UNKNOWN_LENGTH)
  1308. {
  1309. out.ensureAvailable(outBytes + elemSize);
  1310. outData = out.getbytes() + outBytes;
  1311. }
  1312. switch ((type_t) elemType)
  1313. {
  1314. case type_int:
  1315. rtlWriteInt(outData, cassandraembed::getSignedResult(NULL, value), elemSize);
  1316. break;
  1317. case type_unsigned:
  1318. rtlWriteInt(outData, cassandraembed::getUnsignedResult(NULL, value), elemSize);
  1319. break;
  1320. case type_real:
  1321. if (elemSize == sizeof(double))
  1322. * (double *) outData = cassandraembed::getRealResult(NULL, value);
  1323. else
  1324. {
  1325. assertex(elemSize == sizeof(float));
  1326. * (float *) outData = (float) cassandraembed::getRealResult(NULL, value);
  1327. }
  1328. break;
  1329. case type_boolean:
  1330. assertex(elemSize == sizeof(bool));
  1331. * (bool *) outData = cassandraembed::getBooleanResult(NULL, value);
  1332. break;
  1333. case type_string:
  1334. case type_varstring:
  1335. {
  1336. rtlDataAttr str;
  1337. size32_t lenBytes;
  1338. cassandraembed::getStringResult(NULL, value, lenBytes, str.refstr());
  1339. if (elemSize == UNKNOWN_LENGTH)
  1340. {
  1341. if (elemType == type_string)
  1342. {
  1343. out.ensureAvailable(outBytes + lenBytes + sizeof(size32_t));
  1344. outData = out.getbytes() + outBytes;
  1345. * (size32_t *) outData = lenBytes;
  1346. rtlStrToStr(lenBytes, outData+sizeof(size32_t), lenBytes, str.getstr());
  1347. outBytes += lenBytes + sizeof(size32_t);
  1348. }
  1349. else
  1350. {
  1351. out.ensureAvailable(outBytes + lenBytes + 1);
  1352. outData = out.getbytes() + outBytes;
  1353. rtlStrToVStr(0, outData, lenBytes, str.getstr());
  1354. outBytes += lenBytes + 1;
  1355. }
  1356. }
  1357. else
  1358. {
  1359. if (elemType == type_string)
  1360. rtlStrToStr(elemSize, outData, lenBytes, str.getstr());
  1361. else
  1362. rtlStrToVStr(elemSize, outData, lenBytes, str.getstr()); // Fixed size null terminated strings... weird.
  1363. }
  1364. break;
  1365. }
  1366. case type_unicode:
  1367. case type_utf8:
  1368. {
  1369. rtlDataAttr str;
  1370. size32_t lenChars;
  1371. cassandraembed::getUTF8Result(NULL, value, lenChars, str.refstr());
  1372. const char * text = str.getstr();
  1373. size32_t lenBytes = rtlUtf8Size(lenChars, text);
  1374. if (elemType == type_utf8)
  1375. {
  1376. assertex (elemSize == UNKNOWN_LENGTH);
  1377. out.ensureAvailable(outBytes + lenBytes + sizeof(size32_t));
  1378. outData = out.getbytes() + outBytes;
  1379. * (size32_t *) outData = lenChars;
  1380. rtlStrToStr(lenBytes, outData+sizeof(size32_t), lenBytes, text);
  1381. outBytes += lenBytes + sizeof(size32_t);
  1382. }
  1383. else
  1384. {
  1385. if (elemSize == UNKNOWN_LENGTH)
  1386. {
  1387. // You can't assume that number of chars in utf8 matches number in unicode16 ...
  1388. size32_t numchars16;
  1389. rtlDataAttr unicode16;
  1390. rtlUtf8ToUnicodeX(numchars16, unicode16.refustr(), lenChars, text);
  1391. out.ensureAvailable(outBytes + numchars16*sizeof(UChar) + sizeof(size32_t));
  1392. outData = out.getbytes() + outBytes;
  1393. * (size32_t *) outData = numchars16;
  1394. rtlUnicodeToUnicode(numchars16, (UChar *) (outData+sizeof(size32_t)), numchars16, unicode16.getustr());
  1395. outBytes += numchars16*sizeof(UChar) + sizeof(size32_t);
  1396. }
  1397. else
  1398. rtlUtf8ToUnicode(elemSize / sizeof(UChar), (UChar *) outData, lenChars, text);
  1399. }
  1400. break;
  1401. }
  1402. default:
  1403. fail("type mismatch - unsupported return type");
  1404. }
  1405. if (elemSize != UNKNOWN_LENGTH)
  1406. outBytes += elemSize;
  1407. }
  1408. __isAllResult = false;
  1409. __resultBytes = outBytes;
  1410. __result = out.detachdata();
  1411. }
  1412. virtual IRowStream *getDatasetResult(IEngineRowAllocator * _resultAllocator)
  1413. {
  1414. return new CassandraRowStream(inputStream, stmtInfo, _resultAllocator);
  1415. }
  1416. virtual byte * getRowResult(IEngineRowAllocator * _resultAllocator)
  1417. {
  1418. if (!stmtInfo->hasResult() || stmtInfo->rowCount() != 1)
  1419. typeError("row", NULL, NULL);
  1420. CassandraRowStream stream(NULL, stmtInfo, _resultAllocator);
  1421. roxiemem::OwnedConstRoxieRow ret = stream.nextRow();
  1422. stream.stop();
  1423. if (ret == NULL) // Check for exactly one returned row
  1424. typeError("row", NULL, NULL);
  1425. return (byte *) ret.getClear();
  1426. }
  1427. virtual size32_t getTransformResult(ARowBuilder & rowBuilder)
  1428. {
  1429. if (!stmtInfo->hasResult() || stmtInfo->rowCount() != 1)
  1430. typeError("row", NULL, NULL);
  1431. if (!stmtInfo->next())
  1432. fail("Failed to read row");
  1433. CassandraRowBuilder cassandraRowBuilder(stmtInfo);
  1434. const RtlTypeInfo *typeInfo = rowBuilder.queryAllocator()->queryOutputMeta()->queryTypeInfo();
  1435. assertex(typeInfo);
  1436. RtlFieldStrInfo dummyField("<row>", NULL, typeInfo);
  1437. return typeInfo->build(rowBuilder, 0, &dummyField, cassandraRowBuilder);
  1438. }
  1439. virtual void bindRowParam(const char *name, IOutputMetaData & metaVal, byte *val)
  1440. {
  1441. CassandraRecordBinder binder(logctx, metaVal.queryTypeInfo(), stmtInfo, nextParam);
  1442. binder.processRow(val);
  1443. nextParam += binder.numFields();
  1444. }
  1445. virtual void bindDatasetParam(const char *name, IOutputMetaData & metaVal, IRowStream * val)
  1446. {
  1447. // We only support a single dataset parameter...
  1448. // MORE - look into batch?
  1449. if (inputStream)
  1450. {
  1451. fail("At most one dataset parameter supported");
  1452. }
  1453. inputStream.setown(new CassandraDatasetBinder(logctx, LINK(val), metaVal.queryTypeInfo(), stmtInfo, nextParam));
  1454. nextParam += inputStream->numFields();
  1455. }
  1456. virtual void bindBooleanParam(const char *name, bool val)
  1457. {
  1458. checkBind(cass_statement_bind_bool(stmtInfo->queryStatement(), checkNextParam(name), val ? cass_true : cass_false), name);
  1459. }
  1460. virtual void bindDataParam(const char *name, size32_t len, const void *val)
  1461. {
  1462. checkBind(cass_statement_bind_bytes(stmtInfo->queryStatement(), checkNextParam(name), (const cass_byte_t*) val, len), name);
  1463. }
  1464. virtual void bindFloatParam(const char *name, float val)
  1465. {
  1466. checkBind(cass_statement_bind_float(stmtInfo->queryStatement(), checkNextParam(name), val), name);
  1467. }
  1468. virtual void bindRealParam(const char *name, double val)
  1469. {
  1470. checkBind(cass_statement_bind_double(stmtInfo->queryStatement(), checkNextParam(name), val), name);
  1471. }
  1472. virtual void bindSignedSizeParam(const char *name, int size, __int64 val)
  1473. {
  1474. if (size > 4)
  1475. checkBind(cass_statement_bind_int64(stmtInfo->queryStatement(), checkNextParam(name), val), name);
  1476. else
  1477. checkBind(cass_statement_bind_int32(stmtInfo->queryStatement(), checkNextParam(name), val), name);
  1478. }
  1479. virtual void bindSignedParam(const char *name, __int64 val)
  1480. {
  1481. bindSignedSizeParam(name, 8, val);
  1482. }
  1483. virtual void bindUnsignedSizeParam(const char *name, int size, unsigned __int64 val)
  1484. {
  1485. UNSUPPORTED("UNSIGNED columns");
  1486. }
  1487. virtual void bindUnsignedParam(const char *name, unsigned __int64 val)
  1488. {
  1489. UNSUPPORTED("UNSIGNED columns");
  1490. }
  1491. virtual void bindStringParam(const char *name, size32_t len, const char *val)
  1492. {
  1493. size32_t utf8chars;
  1494. rtlDataAttr utfText;
  1495. rtlStrToUtf8X(utf8chars, utfText.refstr(), len, val);
  1496. checkBind(cass_statement_bind_string_n(stmtInfo->queryStatement(),
  1497. checkNextParam(name),
  1498. utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())),
  1499. name);
  1500. }
  1501. virtual void bindVStringParam(const char *name, const char *val)
  1502. {
  1503. bindStringParam(name, strlen(val), val);
  1504. }
  1505. virtual void bindUTF8Param(const char *name, size32_t chars, const char *val)
  1506. {
  1507. checkBind(cass_statement_bind_string_n(stmtInfo->queryStatement(), checkNextParam(name), val, rtlUtf8Size(chars, val)), name);
  1508. }
  1509. virtual void bindUnicodeParam(const char *name, size32_t chars, const UChar *val)
  1510. {
  1511. size32_t utf8chars;
  1512. rtlDataAttr utfText;
  1513. rtlUnicodeToUtf8X(utf8chars, utfText.refstr(), chars, val);
  1514. checkBind(cass_statement_bind_string_n(stmtInfo->queryStatement(),
  1515. checkNextParam(name),
  1516. utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())),
  1517. name);
  1518. }
  1519. virtual void bindSetParam(const char *name, int elemType, size32_t elemSize, bool isAll, size32_t totalBytes, void *setData)
  1520. {
  1521. if (isAll)
  1522. UNSUPPORTED("SET(ALL)");
  1523. type_t typecode = (type_t) elemType;
  1524. const byte *inData = (const byte *) setData;
  1525. const byte *endData = inData + totalBytes;
  1526. int numElems;
  1527. if (elemSize == UNKNOWN_LENGTH)
  1528. {
  1529. numElems = 0;
  1530. // Will need 2 passes to work out how many elements there are in the set :(
  1531. while (inData < endData)
  1532. {
  1533. int thisSize;
  1534. switch (elemType)
  1535. {
  1536. case type_varstring:
  1537. thisSize = strlen((const char *) inData) + 1;
  1538. break;
  1539. case type_string:
  1540. thisSize = * (size32_t *) inData + sizeof(size32_t);
  1541. break;
  1542. case type_unicode:
  1543. thisSize = (* (size32_t *) inData) * sizeof(UChar) + sizeof(size32_t);
  1544. break;
  1545. case type_utf8:
  1546. thisSize = rtlUtf8Size(* (size32_t *) inData, inData + sizeof(size32_t)) + sizeof(size32_t);
  1547. break;
  1548. default:
  1549. fail("Unsupported parameter type");
  1550. break;
  1551. }
  1552. inData += thisSize;
  1553. numElems++;
  1554. }
  1555. inData = (const byte *) setData;
  1556. }
  1557. else
  1558. numElems = totalBytes / elemSize;
  1559. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_SET, numElems));
  1560. while (inData < endData)
  1561. {
  1562. size32_t thisSize = elemSize;
  1563. CassError rc;
  1564. switch (typecode)
  1565. {
  1566. case type_int:
  1567. if (elemSize > 4)
  1568. rc = cass_collection_append_int64(collection, rtlReadInt(inData, elemSize));
  1569. else
  1570. rc = cass_collection_append_int32(collection, rtlReadInt(inData, elemSize));
  1571. break;
  1572. case type_unsigned:
  1573. UNSUPPORTED("UNSIGNED columns");
  1574. break;
  1575. case type_varstring:
  1576. {
  1577. size32_t numChars = strlen((const char *) inData);
  1578. if (elemSize == UNKNOWN_LENGTH)
  1579. thisSize = numChars + 1;
  1580. size32_t utf8chars;
  1581. rtlDataAttr utfText;
  1582. rtlStrToUtf8X(utf8chars, utfText.refstr(), numChars, (const char *) inData);
  1583. rc = cass_collection_append_string_n(collection, utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()));
  1584. break;
  1585. }
  1586. case type_string:
  1587. {
  1588. if (elemSize == UNKNOWN_LENGTH)
  1589. {
  1590. thisSize = * (size32_t *) inData;
  1591. inData += sizeof(size32_t);
  1592. }
  1593. size32_t utf8chars;
  1594. rtlDataAttr utfText;
  1595. rtlStrToUtf8X(utf8chars, utfText.refstr(), thisSize, (const char *) inData);
  1596. rc = cass_collection_append_string_n(collection, utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()));
  1597. break;
  1598. }
  1599. case type_real:
  1600. if (elemSize == sizeof(double))
  1601. rc = cass_collection_append_double(collection, * (double *) inData);
  1602. else
  1603. rc = cass_collection_append_float(collection, * (float *) inData);
  1604. break;
  1605. case type_boolean:
  1606. assertex(elemSize == sizeof(bool));
  1607. rc = cass_collection_append_bool(collection, *(bool*)inData ? cass_true : cass_false);
  1608. break;
  1609. case type_unicode:
  1610. {
  1611. if (elemSize == UNKNOWN_LENGTH)
  1612. {
  1613. thisSize = (* (size32_t *) inData) * sizeof(UChar); // NOTE - it's in chars...
  1614. inData += sizeof(size32_t);
  1615. }
  1616. unsigned unicodeChars;
  1617. rtlDataAttr unicode;
  1618. rtlUnicodeToUtf8X(unicodeChars, unicode.refstr(), thisSize / sizeof(UChar), (const UChar *) inData);
  1619. size32_t sizeBytes = rtlUtf8Size(unicodeChars, unicode.getstr());
  1620. rc = cass_collection_append_string_n(collection, unicode.getstr(), sizeBytes);
  1621. break;
  1622. }
  1623. case type_utf8:
  1624. {
  1625. assertex (elemSize == UNKNOWN_LENGTH);
  1626. size32_t numChars = * (size32_t *) inData;
  1627. inData += sizeof(size32_t);
  1628. thisSize = rtlUtf8Size(numChars, inData);
  1629. rc = cass_collection_append_string_n(collection, (const char *) inData, thisSize);
  1630. break;
  1631. }
  1632. case type_data:
  1633. if (elemSize == UNKNOWN_LENGTH)
  1634. {
  1635. thisSize = * (size32_t *) inData;
  1636. inData += sizeof(size32_t);
  1637. }
  1638. rc = cass_collection_append_bytes(collection, (const cass_byte_t*) inData, thisSize);
  1639. break;
  1640. }
  1641. checkBind(rc, name);
  1642. inData += thisSize;
  1643. }
  1644. checkBind(cass_statement_bind_collection(stmtInfo->queryStatement(),
  1645. checkNextParam(name),
  1646. collection),
  1647. name);
  1648. }
  1649. virtual void importFunction(size32_t lenChars, const char *text)
  1650. {
  1651. throwUnexpected();
  1652. }
  1653. virtual void compileEmbeddedScript(size32_t chars, const char *_script)
  1654. {
  1655. // Incoming script is not necessarily null terminated. Note that the chars refers to utf8 characters and not bytes.
  1656. size32_t len = rtlUtf8Size(chars, _script);
  1657. queryString.set(_script, len);
  1658. const char *script = queryString.get(); // Now null terminated
  1659. if ((flags & (EFnoreturn|EFnoparams)) == (EFnoreturn|EFnoparams))
  1660. {
  1661. loop
  1662. {
  1663. const char *nextScript = findUnquoted(script, ';');
  1664. if (!nextScript)
  1665. {
  1666. // script should be pointing at only trailing whitespace, else it's a "missing ;" error
  1667. break;
  1668. }
  1669. CassandraStatement statement(cass_statement_new_n(script, nextScript-script, 0));
  1670. CassandraFuture future(cass_session_execute(cluster->querySession(), statement));
  1671. future.wait("execute statement");
  1672. script = nextScript;
  1673. }
  1674. }
  1675. else
  1676. {
  1677. if ((flags & EFnoparams) == 0)
  1678. numParams = countBindings(script);
  1679. else
  1680. numParams = 0;
  1681. stmtInfo.setown(cluster->createStatementInfo(script, numParams, batchMode, pageSize));
  1682. }
  1683. }
  1684. virtual void callFunction()
  1685. {
  1686. // Does not seem to be a way to check number of parameters expected...
  1687. // if (nextParam != cass_statement_bind_count(stmtInfo))
  1688. // fail("Not enough parameters");
  1689. try
  1690. {
  1691. if (stmtInfo && !stmtInfo->hasResult())
  1692. lazyExecute();
  1693. }
  1694. catch (IException *E)
  1695. {
  1696. StringBuffer msg;
  1697. E->errorMessage(msg);
  1698. msg.appendf(" (processing query %s)", queryString.get());
  1699. throw makeStringException(E->errorCode(), msg);
  1700. }
  1701. }
  1702. protected:
  1703. void lazyExecute()
  1704. {
  1705. if (inputStream)
  1706. inputStream->executeAll(stmtInfo);
  1707. else
  1708. stmtInfo->execute();
  1709. }
  1710. const CassValue *getScalarResult()
  1711. {
  1712. if (!stmtInfo->next())
  1713. typeError("scalar", NULL, NULL);
  1714. if (cass_row_get_column(stmtInfo->queryRow(), 1))
  1715. typeError("scalar", NULL, NULL);
  1716. const CassValue *result = cass_row_get_column(stmtInfo->queryRow(), 0);
  1717. if (!result)
  1718. typeError("scalar", NULL, NULL);
  1719. return result;
  1720. }
  1721. void checkSingleRow()
  1722. {
  1723. if (stmtInfo->rowCount() != 1)
  1724. typeError("scalar", NULL, NULL);
  1725. }
  1726. unsigned countBindings(const char *query)
  1727. {
  1728. unsigned queryCount = 0;
  1729. while ((query = findUnquoted(query, '?')) != NULL)
  1730. queryCount++;
  1731. return queryCount;
  1732. }
  1733. const char *findUnquoted(const char *query, char searchFor)
  1734. {
  1735. // Note - returns pointer to char AFTER the first occurrence of searchFor outside of quotes
  1736. char inStr = '\0';
  1737. char ch;
  1738. while ((ch = *query++) != 0)
  1739. {
  1740. if (ch == inStr)
  1741. inStr = false;
  1742. else switch (ch)
  1743. {
  1744. case '\'':
  1745. case '"':
  1746. inStr = ch;
  1747. break;
  1748. case '\\':
  1749. if (inStr && *query)
  1750. query++;
  1751. break;
  1752. case '/':
  1753. if (!inStr)
  1754. {
  1755. if (*query=='/')
  1756. {
  1757. while (*query && *query != '\n')
  1758. query++;
  1759. }
  1760. else if (*query=='*')
  1761. {
  1762. query++;
  1763. loop
  1764. {
  1765. if (!*query)
  1766. fail("Unterminated comment in query string");
  1767. if (*query=='*' && query[1]=='/')
  1768. {
  1769. query+= 2;
  1770. break;
  1771. }
  1772. query++;
  1773. }
  1774. }
  1775. }
  1776. break;
  1777. default:
  1778. if (!inStr && ch==searchFor)
  1779. return query;
  1780. break;
  1781. }
  1782. }
  1783. return NULL;
  1784. }
  1785. inline unsigned checkNextParam(const char *name)
  1786. {
  1787. if (nextParam == numParams)
  1788. failx("Too many parameters supplied: No matching ? for parameter %s", name);
  1789. return nextParam++;
  1790. }
  1791. inline void checkBind(CassError rc, const char *name)
  1792. {
  1793. if (rc != CASS_OK)
  1794. {
  1795. failx("While binding parameter %s: %s", name, cass_error_desc(rc));
  1796. }
  1797. }
  1798. Owned<CassandraClusterSession> cluster;
  1799. Owned<CassandraStatementInfo> stmtInfo;
  1800. Owned<CassandraDatasetBinder> inputStream;
  1801. const IContextLogger &logctx;
  1802. unsigned flags;
  1803. unsigned nextParam;
  1804. unsigned numParams;
  1805. StringAttr queryString;
  1806. CassBatchType batchMode;
  1807. unsigned pageSize;
  1808. };
  1809. class CassandraEmbedContext : public CInterfaceOf<IEmbedContext>
  1810. {
  1811. public:
  1812. virtual IEmbedFunctionContext *createFunctionContext(unsigned flags, const char *options)
  1813. {
  1814. return createFunctionContextEx(NULL, flags, options);
  1815. }
  1816. virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, unsigned flags, const char *options)
  1817. {
  1818. if (flags & EFimport)
  1819. UNSUPPORTED("IMPORT");
  1820. else
  1821. return new CassandraEmbedFunctionContext(ctx ? ctx->queryContextLogger() : queryDummyContextLogger(), flags, options);
  1822. }
  1823. };
  1824. extern IEmbedContext* getEmbedContext()
  1825. {
  1826. return new CassandraEmbedContext();
  1827. }
  1828. extern bool syntaxCheck(const char *script)
  1829. {
  1830. return true; // MORE
  1831. }
  1832. } // namespace