cassandraembed.cpp 112 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2013 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "cassandra.h"
  15. #include "jexcept.hpp"
  16. #include "jthread.hpp"
  17. #include "hqlplugins.hpp"
  18. #include "deftype.hpp"
  19. #include "eclhelper.hpp"
  20. #include "eclrtl.hpp"
  21. #include "eclrtl_imp.hpp"
  22. #include "rtlds_imp.hpp"
  23. #include "rtlfield_imp.hpp"
  24. #include "rtlembed.hpp"
  25. #include "roxiemem.hpp"
  26. #include "nbcd.hpp"
  27. #include "jptree.hpp"
  28. #include "workunit.hpp"
  29. #include "workunit.ipp"
  30. #ifdef _WIN32
  31. #define EXPORT __declspec(dllexport)
  32. #else
  33. #define EXPORT
  34. #endif
  35. static void UNSUPPORTED(const char *feature) __attribute__((noreturn));
  36. static void UNSUPPORTED(const char *feature)
  37. {
  38. throw MakeStringException(-1, "UNSUPPORTED feature: %s not supported in Cassandra plugin", feature);
  39. }
  // Version string published through getECLPluginDefinition.
  static const char *version = "Cassandra Embed Helper 1.0.0";

  // NULL-terminated list of version strings this plugin declares itself compatible with.
  static const char * compatibleVersions[] =
  {
      "Cassandra Embed Helper 1.0.0",
      NULL
  };
  44. extern "C" EXPORT bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb)
  45. {
  46. if (pb->size == sizeof(ECLPluginDefinitionBlockEx))
  47. {
  48. ECLPluginDefinitionBlockEx * pbx = (ECLPluginDefinitionBlockEx *) pb;
  49. pbx->compatibleVersions = compatibleVersions;
  50. }
  51. else if (pb->size != sizeof(ECLPluginDefinitionBlock))
  52. return false;
  53. pb->magicVersion = PLUGIN_VERSION;
  54. pb->version = version;
  55. pb->moduleName = "cassandra";
  56. pb->ECL = NULL;
  57. pb->flags = PLUGIN_MULTIPLE_VERSIONS;
  58. pb->description = "Cassandra Embed Helper";
  59. return true;
  60. }
  61. namespace cassandraembed {
  62. static void failx(const char *msg, ...) __attribute__((noreturn)) __attribute__((format(printf, 1, 2)));
  63. static void fail(const char *msg) __attribute__((noreturn));
  64. static void failx(const char *message, ...)
  65. {
  66. va_list args;
  67. va_start(args,message);
  68. StringBuffer msg;
  69. msg.append("cassandra: ").valist_appendf(message,args);
  70. va_end(args);
  71. rtlFail(0, msg.str());
  72. }
  73. static void fail(const char *message)
  74. {
  75. StringBuffer msg;
  76. msg.append("cassandra: ").append(message);
  77. rtlFail(0, msg.str());
  78. }
  79. // Wrappers to Cassandra structures that require corresponding releases
  80. class CassandraCluster : public CInterface
  81. {
  82. public:
  83. CassandraCluster(CassCluster *_cluster) : cluster(_cluster)
  84. {
  85. }
  86. ~CassandraCluster()
  87. {
  88. if (cluster)
  89. cass_cluster_free(cluster);
  90. }
  91. inline operator CassCluster *() const
  92. {
  93. return cluster;
  94. }
  95. private:
  96. CassandraCluster(const CassandraCluster &);
  97. CassCluster *cluster;
  98. };
  99. class CassandraFuture : public CInterface
  100. {
  101. public:
  102. CassandraFuture(CassFuture *_future) : future(_future)
  103. {
  104. }
  105. ~CassandraFuture()
  106. {
  107. if (future)
  108. cass_future_free(future);
  109. }
  110. inline operator CassFuture *() const
  111. {
  112. return future;
  113. }
  114. void wait(const char *why)
  115. {
  116. cass_future_wait(future);
  117. CassError rc = cass_future_error_code(future);
  118. if(rc != CASS_OK)
  119. {
  120. CassString message = cass_future_error_message(future);
  121. VStringBuffer err("cassandra: failed to %s (%.*s)", why, (int)message.length, message.data);
  122. rtlFail(0, err.str());
  123. }
  124. }
  125. private:
  126. CassandraFuture(const CassandraFuture &);
  127. CassFuture *future;
  128. };
  129. class CassandraSession : public CInterface
  130. {
  131. public:
  132. CassandraSession() : session(NULL) {}
  133. CassandraSession(CassSession *_session) : session(_session)
  134. {
  135. }
  136. ~CassandraSession()
  137. {
  138. set(NULL);
  139. }
  140. void set(CassSession *_session)
  141. {
  142. if (session)
  143. {
  144. CassandraFuture close_future(cass_session_close(session));
  145. cass_future_wait(close_future);
  146. cass_session_free(session);
  147. }
  148. session = _session;
  149. }
  150. inline operator CassSession *() const
  151. {
  152. return session;
  153. }
  154. private:
  155. CassandraSession(const CassandraSession &);
  156. CassSession *session;
  157. };
  158. class CassandraBatch : public CInterface
  159. {
  160. public:
  161. CassandraBatch(CassBatch *_batch) : batch(_batch)
  162. {
  163. }
  164. ~CassandraBatch()
  165. {
  166. if (batch)
  167. cass_batch_free(batch);
  168. }
  169. inline operator CassBatch *() const
  170. {
  171. return batch;
  172. }
  173. private:
  174. CassandraBatch(const CassandraBatch &);
  175. CassBatch *batch;
  176. };
  177. class CassandraStatement : public CInterface
  178. {
  179. public:
  180. CassandraStatement(CassStatement *_statement) : statement(_statement)
  181. {
  182. }
  183. ~CassandraStatement()
  184. {
  185. if (statement)
  186. cass_statement_free(statement);
  187. }
  188. inline operator CassStatement *() const
  189. {
  190. return statement;
  191. }
  192. private:
  193. CassandraStatement(const CassandraStatement &);
  194. CassStatement *statement;
  195. };
  196. class CassandraPrepared : public CInterface
  197. {
  198. public:
  199. CassandraPrepared(const CassPrepared *_prepared) : prepared(_prepared)
  200. {
  201. }
  202. ~CassandraPrepared()
  203. {
  204. if (prepared)
  205. cass_prepared_free(prepared);
  206. }
  207. inline operator const CassPrepared *() const
  208. {
  209. return prepared;
  210. }
  211. private:
  212. CassandraPrepared(const CassandraPrepared &);
  213. const CassPrepared *prepared;
  214. };
  215. class CassandraResult : public CInterface
  216. {
  217. public:
  218. CassandraResult(const CassResult *_result) : result(_result)
  219. {
  220. }
  221. ~CassandraResult()
  222. {
  223. if (result)
  224. cass_result_free(result);
  225. }
  226. inline operator const CassResult *() const
  227. {
  228. return result;
  229. }
  230. private:
  231. CassandraResult(const CassandraResult &);
  232. const CassResult *result;
  233. };
  234. class CassandraIterator : public CInterface
  235. {
  236. public:
  237. CassandraIterator(CassIterator *_iterator) : iterator(_iterator)
  238. {
  239. }
  240. ~CassandraIterator()
  241. {
  242. if (iterator)
  243. cass_iterator_free(iterator);
  244. }
  245. inline operator CassIterator *() const
  246. {
  247. return iterator;
  248. }
  249. private:
  250. CassandraIterator(const CassandraIterator &);
  251. CassIterator *iterator;
  252. };
  253. class CassandraCollection : public CInterface
  254. {
  255. public:
  256. CassandraCollection(CassCollection *_collection) : collection(_collection)
  257. {
  258. }
  259. ~CassandraCollection()
  260. {
  261. if (collection)
  262. cass_collection_free(collection);
  263. }
  264. inline operator CassCollection *() const
  265. {
  266. return collection;
  267. }
  268. private:
  269. CassandraCollection(const CassandraCollection &);
  270. CassCollection *collection;
  271. };
  272. void check(CassError rc)
  273. {
  274. if (rc != CASS_OK)
  275. {
  276. fail(cass_error_desc(rc));
  277. }
  278. }
  279. class CassandraStatementInfo : public CInterface
  280. {
  281. public:
  282. IMPLEMENT_IINTERFACE;
  283. CassandraStatementInfo(CassandraSession *_session, CassandraPrepared *_prepared, unsigned _numBindings, CassBatchType _batchMode)
  284. : session(_session), prepared(_prepared), numBindings(_numBindings), batchMode(_batchMode)
  285. {
  286. assertex(prepared && *prepared);
  287. statement.setown(new CassandraStatement(cass_prepared_bind(*prepared)));
  288. }
  289. ~CassandraStatementInfo()
  290. {
  291. stop();
  292. }
  293. inline void stop()
  294. {
  295. iterator.clear();
  296. result.clear();
  297. prepared.clear();
  298. }
  299. bool next()
  300. {
  301. if (!iterator)
  302. return false;
  303. return cass_iterator_next(*iterator);
  304. }
  305. void startStream()
  306. {
  307. if (batchMode != (CassBatchType) -1)
  308. {
  309. batch.setown(new CassandraBatch(cass_batch_new(batchMode)));
  310. statement.setown(new CassandraStatement(cass_prepared_bind(*prepared)));
  311. }
  312. }
  313. void endStream()
  314. {
  315. if (batch)
  316. {
  317. CassandraFuture future(cass_session_execute_batch(*session, *batch));
  318. future.wait("execute");
  319. result.setown(new CassandraResult(cass_future_get_result(future)));
  320. assertex (rowCount() == 0);
  321. }
  322. }
  323. void execute()
  324. {
  325. assertex(statement && *statement);
  326. if (batch)
  327. {
  328. check(cass_batch_add_statement(*batch, *statement));
  329. statement.setown(new CassandraStatement(cass_prepared_bind(*prepared)));
  330. }
  331. else
  332. {
  333. CassandraFuture future(cass_session_execute(*session, *statement));
  334. future.wait("execute");
  335. result.setown(new CassandraResult(cass_future_get_result(future)));
  336. if (rowCount() > 0)
  337. iterator.setown(new CassandraIterator(cass_iterator_from_result(*result)));
  338. }
  339. }
  340. inline size_t rowCount() const
  341. {
  342. return cass_result_row_count(*result);
  343. }
  344. inline bool hasResult() const
  345. {
  346. return result != NULL;
  347. }
  348. inline const CassRow *queryRow() const
  349. {
  350. assertex(iterator && *iterator);
  351. return cass_iterator_get_row(*iterator);
  352. }
  353. inline CassStatement *queryStatement() const
  354. {
  355. assertex(statement && *statement);
  356. return *statement;
  357. }
  358. protected:
  359. Linked<CassandraSession> session;
  360. Linked<CassandraPrepared> prepared;
  361. Owned<CassandraBatch> batch;
  362. Owned<CassandraStatement> statement;
  363. Owned<CassandraResult> result;
  364. Owned<CassandraIterator> iterator;
  365. unsigned numBindings;
  366. CassBatchType(batchMode);
  367. };
  368. // Conversions from Cassandra values to ECL data
  369. static const char *getTypeName(CassValueType type)
  370. {
  371. switch (type)
  372. {
  373. case CASS_VALUE_TYPE_CUSTOM: return "CUSTOM";
  374. case CASS_VALUE_TYPE_ASCII: return "ASCII";
  375. case CASS_VALUE_TYPE_BIGINT: return "BIGINT";
  376. case CASS_VALUE_TYPE_BLOB: return "BLOB";
  377. case CASS_VALUE_TYPE_BOOLEAN: return "BOOLEAN";
  378. case CASS_VALUE_TYPE_COUNTER: return "COUNTER";
  379. case CASS_VALUE_TYPE_DECIMAL: return "DECIMAL";
  380. case CASS_VALUE_TYPE_DOUBLE: return "DOUBLE";
  381. case CASS_VALUE_TYPE_FLOAT: return "FLOAT";
  382. case CASS_VALUE_TYPE_INT: return "INT";
  383. case CASS_VALUE_TYPE_TEXT: return "TEXT";
  384. case CASS_VALUE_TYPE_TIMESTAMP: return "TIMESTAMP";
  385. case CASS_VALUE_TYPE_UUID: return "UUID";
  386. case CASS_VALUE_TYPE_VARCHAR: return "VARCHAR";
  387. case CASS_VALUE_TYPE_VARINT: return "VARINT";
  388. case CASS_VALUE_TYPE_TIMEUUID: return "TIMEUUID";
  389. case CASS_VALUE_TYPE_INET: return "INET";
  390. case CASS_VALUE_TYPE_LIST: return "LIST";
  391. case CASS_VALUE_TYPE_MAP: return "MAP";
  392. case CASS_VALUE_TYPE_SET: return "SET";
  393. default: return "UNKNOWN";
  394. }
  395. }
  396. static void typeError(const char *expected, const CassValue *value, const RtlFieldInfo *field) __attribute__((noreturn));
  397. static void typeError(const char *expected, const CassValue *value, const RtlFieldInfo *field)
  398. {
  399. VStringBuffer msg("cassandra: type mismatch - %s expected", expected);
  400. if (field)
  401. msg.appendf(" for field %s", field->name->str());
  402. if (value)
  403. msg.appendf(", received %s", getTypeName(cass_value_type(value)));
  404. rtlFail(0, msg.str());
  405. }
  406. static bool isInteger(const CassValueType t)
  407. {
  408. switch (t)
  409. {
  410. case CASS_VALUE_TYPE_TIMESTAMP:
  411. case CASS_VALUE_TYPE_INT:
  412. case CASS_VALUE_TYPE_BIGINT:
  413. case CASS_VALUE_TYPE_COUNTER:
  414. case CASS_VALUE_TYPE_VARINT:
  415. return true;
  416. default:
  417. return false;
  418. }
  419. }
  420. static bool isString(CassValueType t)
  421. {
  422. switch (t)
  423. {
  424. case CASS_VALUE_TYPE_VARCHAR:
  425. case CASS_VALUE_TYPE_TEXT:
  426. case CASS_VALUE_TYPE_ASCII:
  427. return true;
  428. default:
  429. return false;
  430. }
  431. }
  432. // when extracting elements of a set, field will point at the SET info- we want to get the typeInfo for the element type
  433. static const RtlTypeInfo *getFieldBaseType(const RtlFieldInfo *field)
  434. {
  435. const RtlTypeInfo *type = field->type;
  436. if ((type->fieldType & RFTMkind) == type_set)
  437. return type->queryChildType();
  438. else
  439. return type;
  440. }
  441. static int getNumFields(const RtlTypeInfo *record)
  442. {
  443. int count = 0;
  444. const RtlFieldInfo * const *fields = record->queryFields();
  445. assertex(fields);
  446. while (*fields++)
  447. count++;
  448. return count;
  449. }
  450. static bool getBooleanResult(const RtlFieldInfo *field, const CassValue *value)
  451. {
  452. if (cass_value_is_null(value))
  453. {
  454. NullFieldProcessor p(field);
  455. return p.boolResult;
  456. }
  457. if (cass_value_type(value) != CASS_VALUE_TYPE_BOOLEAN)
  458. typeError("boolean", value, field);
  459. cass_bool_t output;
  460. check(cass_value_get_bool(value, &output));
  461. return output != cass_false;
  462. }
  463. static void getDataResult(const RtlFieldInfo *field, const CassValue *value, size32_t &chars, void * &result)
  464. {
  465. if (cass_value_is_null(value))
  466. {
  467. NullFieldProcessor p(field);
  468. rtlStrToDataX(chars, result, p.resultChars, p.stringResult);
  469. return;
  470. }
  471. // We COULD require that the field being retrieved is a blob - but Cassandra seems happy to use any field here, and
  472. // it seems like it could be more useful to support anything
  473. // if (cass_value_type(value) != CASS_VALUE_TYPE_BLOB)
  474. // typeError("blob", value, field);
  475. CassBytes bytes;
  476. check(cass_value_get_bytes(value, &bytes));
  477. rtlStrToDataX(chars, result, bytes.size, bytes.data);
  478. }
  479. static __int64 getSignedResult(const RtlFieldInfo *field, const CassValue *value);
  480. static unsigned __int64 getUnsignedResult(const RtlFieldInfo *field, const CassValue *value);
  481. static double getRealResult(const RtlFieldInfo *field, const CassValue *value)
  482. {
  483. if (cass_value_is_null(value))
  484. {
  485. NullFieldProcessor p(field);
  486. return p.doubleResult;
  487. }
  488. else if (isInteger(cass_value_type(value)))
  489. return (double) getSignedResult(field, value);
  490. else switch (cass_value_type(value))
  491. {
  492. case CASS_VALUE_TYPE_FLOAT:
  493. {
  494. cass_float_t output_f;
  495. check(cass_value_get_float(value, &output_f));
  496. return output_f;
  497. }
  498. case CASS_VALUE_TYPE_DOUBLE:
  499. {
  500. cass_double_t output_d;
  501. check(cass_value_get_double(value, &output_d));
  502. return output_d;
  503. }
  504. default:
  505. typeError("double", value, field);
  506. }
  507. }
  508. static __int64 getSignedResult(const RtlFieldInfo *field, const CassValue *value)
  509. {
  510. if (cass_value_is_null(value))
  511. {
  512. NullFieldProcessor p(field);
  513. return p.intResult;
  514. }
  515. switch (cass_value_type(value))
  516. {
  517. case CASS_VALUE_TYPE_INT:
  518. {
  519. cass_int32_t output;
  520. check(cass_value_get_int32(value, &output));
  521. return output;
  522. }
  523. case CASS_VALUE_TYPE_TIMESTAMP:
  524. case CASS_VALUE_TYPE_BIGINT:
  525. case CASS_VALUE_TYPE_COUNTER:
  526. case CASS_VALUE_TYPE_VARINT:
  527. {
  528. cass_int64_t output;
  529. check(cass_value_get_int64(value, &output));
  530. return output;
  531. }
  532. default:
  533. typeError("integer", value, field);
  534. }
  535. }
  536. static unsigned __int64 getUnsignedResult(const RtlFieldInfo *field, const CassValue *value)
  537. {
  538. if (cass_value_is_null(value))
  539. {
  540. NullFieldProcessor p(field);
  541. return p.uintResult;
  542. }
  543. return (__uint64) getSignedResult(field, value);
  544. }
  545. static void getStringResult(const RtlFieldInfo *field, const CassValue *value, size32_t &chars, char * &result)
  546. {
  547. if (cass_value_is_null(value))
  548. {
  549. NullFieldProcessor p(field);
  550. rtlStrToStrX(chars, result, p.resultChars, p.stringResult);
  551. return;
  552. }
  553. switch (cass_value_type(value))
  554. {
  555. case CASS_VALUE_TYPE_ASCII:
  556. {
  557. CassString output;
  558. check(cass_value_get_string(value, &output));
  559. const char *text = output.data;
  560. unsigned long bytes = output.length;
  561. rtlStrToStrX(chars, result, bytes, text);
  562. break;
  563. }
  564. case CASS_VALUE_TYPE_VARCHAR:
  565. case CASS_VALUE_TYPE_TEXT:
  566. {
  567. CassString output;
  568. check(cass_value_get_string(value, &output));
  569. const char *text = output.data;
  570. unsigned long bytes = output.length;
  571. unsigned numchars = rtlUtf8Length(bytes, text);
  572. rtlUtf8ToStrX(chars, result, numchars, text);
  573. break;
  574. }
  575. default:
  576. typeError("string", value, field);
  577. }
  578. }
  579. static void getUTF8Result(const RtlFieldInfo *field, const CassValue *value, size32_t &chars, char * &result)
  580. {
  581. if (cass_value_is_null(value))
  582. {
  583. NullFieldProcessor p(field);
  584. rtlUtf8ToUtf8X(chars, result, p.resultChars, p.stringResult);
  585. return;
  586. }
  587. switch (cass_value_type(value))
  588. {
  589. case CASS_VALUE_TYPE_ASCII:
  590. {
  591. CassString output;
  592. check(cass_value_get_string(value, &output));
  593. const char *text = output.data;
  594. unsigned long bytes = output.length;
  595. rtlStrToUtf8X(chars, result, bytes, text);
  596. break;
  597. }
  598. case CASS_VALUE_TYPE_VARCHAR:
  599. case CASS_VALUE_TYPE_TEXT:
  600. {
  601. CassString output;
  602. check(cass_value_get_string(value, &output));
  603. const char *text = output.data;
  604. unsigned long bytes = output.length;
  605. unsigned numchars = rtlUtf8Length(bytes, text);
  606. rtlUtf8ToUtf8X(chars, result, numchars, text);
  607. break;
  608. }
  609. default:
  610. typeError("string", value, field);
  611. }
  612. }
  613. static void getUnicodeResult(const RtlFieldInfo *field, const CassValue *value, size32_t &chars, UChar * &result)
  614. {
  615. if (cass_value_is_null(value))
  616. {
  617. NullFieldProcessor p(field);
  618. rtlUnicodeToUnicodeX(chars, result, p.resultChars, p.unicodeResult);
  619. return;
  620. }
  621. switch (cass_value_type(value))
  622. {
  623. case CASS_VALUE_TYPE_ASCII:
  624. {
  625. CassString output;
  626. check(cass_value_get_string(value, &output));
  627. const char *text = output.data;
  628. unsigned long bytes = output.length;
  629. rtlStrToUnicodeX(chars, result, bytes, text);
  630. break;
  631. }
  632. case CASS_VALUE_TYPE_VARCHAR:
  633. case CASS_VALUE_TYPE_TEXT:
  634. {
  635. CassString output;
  636. check(cass_value_get_string(value, &output));
  637. const char *text = output.data;
  638. unsigned long bytes = output.length;
  639. unsigned numchars = rtlUtf8Length(bytes, text);
  640. rtlUtf8ToUnicodeX(chars, result, numchars, text);
  641. break;
  642. }
  643. default:
  644. typeError("string", value, field);
  645. }
  646. }
  647. static void getDecimalResult(const RtlFieldInfo *field, const CassValue *value, Decimal &result)
  648. {
  649. // Note - Cassandra has a decimal type, but it's not particularly similar to the ecl one. Map to string for now, as we do in MySQL
  650. if (cass_value_is_null(value))
  651. {
  652. NullFieldProcessor p(field);
  653. result.set(p.decimalResult);
  654. return;
  655. }
  656. size32_t chars;
  657. rtlDataAttr tempStr;
  658. cassandraembed::getStringResult(field, value, chars, tempStr.refstr());
  659. result.setString(chars, tempStr.getstr());
  660. if (field)
  661. {
  662. RtlDecimalTypeInfo *dtype = (RtlDecimalTypeInfo *) field->type;
  663. result.setPrecision(dtype->getDecimalDigits(), dtype->getDecimalPrecision());
  664. }
  665. }
  666. // A CassandraRowBuilder object is used to construct an ECL row from a Cassandra row
  667. class CassandraRowBuilder : public CInterfaceOf<IFieldSource>
  668. {
  669. public:
  670. CassandraRowBuilder(const CassandraStatementInfo *_stmtInfo)
  671. : stmtInfo(_stmtInfo), colIdx(0), numIteratorFields(0), nextIteratedField(0)
  672. {
  673. }
  674. virtual bool getBooleanResult(const RtlFieldInfo *field)
  675. {
  676. return cassandraembed::getBooleanResult(field, nextField(field));
  677. }
  678. virtual void getDataResult(const RtlFieldInfo *field, size32_t &len, void * &result)
  679. {
  680. cassandraembed::getDataResult(field, nextField(field), len, result);
  681. }
  682. virtual double getRealResult(const RtlFieldInfo *field)
  683. {
  684. return cassandraembed::getRealResult(field, nextField(field));
  685. }
  686. virtual __int64 getSignedResult(const RtlFieldInfo *field)
  687. {
  688. return cassandraembed::getSignedResult(field, nextField(field));
  689. }
  690. virtual unsigned __int64 getUnsignedResult(const RtlFieldInfo *field)
  691. {
  692. return cassandraembed::getUnsignedResult(field, nextField(field));
  693. }
  694. virtual void getStringResult(const RtlFieldInfo *field, size32_t &chars, char * &result)
  695. {
  696. cassandraembed::getStringResult(field, nextField(field), chars, result);
  697. }
  698. virtual void getUTF8Result(const RtlFieldInfo *field, size32_t &chars, char * &result)
  699. {
  700. cassandraembed::getUTF8Result(field, nextField(field), chars, result);
  701. }
  702. virtual void getUnicodeResult(const RtlFieldInfo *field, size32_t &chars, UChar * &result)
  703. {
  704. cassandraembed::getUnicodeResult(field, nextField(field), chars, result);
  705. }
  706. virtual void getDecimalResult(const RtlFieldInfo *field, Decimal &value)
  707. {
  708. cassandraembed::getDecimalResult(field, nextField(field), value);
  709. }
  710. virtual void processBeginSet(const RtlFieldInfo * field, bool &isAll)
  711. {
  712. isAll = false;
  713. iterator.setown(new CassandraIterator(cass_iterator_from_collection(nextField(field))));
  714. }
  715. virtual bool processNextSet(const RtlFieldInfo * field)
  716. {
  717. numIteratorFields = 1;
  718. return *iterator && cass_iterator_next(*iterator); // If field was NULL, we'll have a NULL iterator (representing an empty set/list)
  719. // Can't distinguish empty set from NULL field, so assume the former (rather than trying to deliver the default value for the set field)
  720. }
  721. virtual void processBeginDataset(const RtlFieldInfo * field)
  722. {
  723. numIteratorFields = getNumFields(field->type->queryChildType());
  724. switch (numIteratorFields)
  725. {
  726. case 1:
  727. iterator.setown(new CassandraIterator(cass_iterator_from_collection(nextField(field))));
  728. break;
  729. case 2:
  730. iterator.setown(new CassandraIterator(cass_iterator_from_map(nextField(field))));
  731. break;
  732. default:
  733. UNSUPPORTED("Nested datasets with > 2 fields");
  734. }
  735. }
  736. virtual void processBeginRow(const RtlFieldInfo * field)
  737. {
  738. }
  739. virtual bool processNextRow(const RtlFieldInfo * field)
  740. {
  741. nextIteratedField = 0;
  742. return *iterator && cass_iterator_next(*iterator); // If field was NULL, we'll have a NULL iterator (representing an empty set/list/map)
  743. // Can't distinguish empty set from NULL field, so assume the former (rather than trying to deliver the default value for the set field)
  744. }
  745. virtual void processEndSet(const RtlFieldInfo * field)
  746. {
  747. iterator.clear();
  748. numIteratorFields = 0;
  749. }
  750. virtual void processEndDataset(const RtlFieldInfo * field)
  751. {
  752. iterator.clear();
  753. numIteratorFields = 0;
  754. }
  755. virtual void processEndRow(const RtlFieldInfo * field)
  756. {
  757. }
  758. protected:
  759. const CassValue *nextField(const RtlFieldInfo * field)
  760. {
  761. const CassValue *ret;
  762. if (iterator)
  763. {
  764. switch (numIteratorFields)
  765. {
  766. case 1:
  767. ret = cass_iterator_get_value(*iterator);
  768. break;
  769. case 2:
  770. if (nextIteratedField==0)
  771. ret = cass_iterator_get_map_key(*iterator);
  772. else
  773. ret = cass_iterator_get_map_value(*iterator);
  774. nextIteratedField++;
  775. break;
  776. default:
  777. throwUnexpected();
  778. }
  779. }
  780. else
  781. ret = cass_row_get_column(stmtInfo->queryRow(), colIdx++);
  782. if (!ret)
  783. failx("Too many fields in ECL output row, reading field %s", field->name->getAtomNamePtr());
  784. return ret;
  785. }
  786. const CassandraStatementInfo *stmtInfo;
  787. Owned<CassandraIterator> iterator;
  788. int colIdx;
  789. int numIteratorFields;
  790. int nextIteratedField;
  791. };
  792. // Bind Cassandra columns from an ECL record
  793. class CassandraRecordBinder : public CInterfaceOf<IFieldProcessor>
  794. {
  795. public:
  796. CassandraRecordBinder(const IContextLogger &_logctx, const RtlTypeInfo *_typeInfo, const CassandraStatementInfo *_stmtInfo, int _firstParam)
  797. : logctx(_logctx), typeInfo(_typeInfo), stmtInfo(_stmtInfo), firstParam(_firstParam), dummyField("<row>", NULL, typeInfo), thisParam(_firstParam)
  798. {
  799. }
  800. int numFields()
  801. {
  802. int count = 0;
  803. const RtlFieldInfo * const *fields = typeInfo->queryFields();
  804. assertex(fields);
  805. while (*fields++)
  806. count++;
  807. return count;
  808. }
  809. void processRow(const byte *row)
  810. {
  811. thisParam = firstParam;
  812. typeInfo->process(row, row, &dummyField, *this); // Bind the variables for the current row
  813. }
  814. virtual void processString(unsigned len, const char *value, const RtlFieldInfo * field)
  815. {
  816. size32_t utf8chars;
  817. rtlDataAttr utfText;
  818. rtlStrToUtf8X(utf8chars, utfText.refstr(), len, value);
  819. if (collection)
  820. checkBind(cass_collection_append_string(*collection,
  821. cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()))),
  822. field);
  823. else
  824. checkBind(cass_statement_bind_string(stmtInfo->queryStatement(),
  825. checkNextParam(field),
  826. cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()))),
  827. field);
  828. }
  829. virtual void processBool(bool value, const RtlFieldInfo * field)
  830. {
  831. if (collection)
  832. checkBind(cass_collection_append_bool(*collection, value ? cass_true : cass_false), field);
  833. else
  834. checkBind(cass_statement_bind_bool(stmtInfo->queryStatement(), checkNextParam(field), value ? cass_true : cass_false), field);
  835. }
  836. virtual void processData(unsigned len, const void *value, const RtlFieldInfo * field)
  837. {
  838. if (collection)
  839. checkBind(cass_collection_append_bytes(*collection, cass_bytes_init((const cass_byte_t*) value, len)), field);
  840. else
  841. checkBind(cass_statement_bind_bytes(stmtInfo->queryStatement(), checkNextParam(field), cass_bytes_init((const cass_byte_t*) value, len)), field);
  842. }
  843. virtual void processInt(__int64 value, const RtlFieldInfo * field)
  844. {
  845. if (getFieldBaseType(field)->size(NULL,NULL)>4)
  846. {
  847. if (collection)
  848. checkBind(cass_collection_append_int64(*collection, value), field);
  849. else
  850. checkBind(cass_statement_bind_int64(stmtInfo->queryStatement(), checkNextParam(field), value), field);
  851. }
  852. else
  853. {
  854. if (collection)
  855. checkBind(cass_collection_append_int32(*collection, value), field);
  856. else
  857. checkBind(cass_statement_bind_int32(stmtInfo->queryStatement(), checkNextParam(field), value), field);
  858. }
  859. }
  860. virtual void processUInt(unsigned __int64 value, const RtlFieldInfo * field)
  861. {
  862. UNSUPPORTED("UNSIGNED columns");
  863. }
  864. virtual void processReal(double value, const RtlFieldInfo * field)
  865. {
  866. if (getFieldBaseType(field)->size(NULL,NULL)>4)
  867. {
  868. if (collection)
  869. checkBind(cass_collection_append_double(*collection, value), field);
  870. else
  871. checkBind(cass_statement_bind_double(stmtInfo->queryStatement(), checkNextParam(field), value), field);
  872. }
  873. else
  874. {
  875. if (collection)
  876. checkBind(cass_collection_append_float(*collection, (float) value), field);
  877. else
  878. checkBind(cass_statement_bind_float(stmtInfo->queryStatement(), checkNextParam(field), (float) value), field);
  879. }
  880. }
  881. virtual void processDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo * field)
  882. {
  883. Decimal val;
  884. size32_t bytes;
  885. rtlDataAttr decText;
  886. val.setDecimal(digits, precision, value);
  887. val.getStringX(bytes, decText.refstr());
  888. processUtf8(bytes, decText.getstr(), field);
  889. }
  890. virtual void processUDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo * field)
  891. {
  892. UNSUPPORTED("UNSIGNED decimals");
  893. }
  894. virtual void processUnicode(unsigned chars, const UChar *value, const RtlFieldInfo * field)
  895. {
  896. size32_t utf8chars;
  897. rtlDataAttr utfText;
  898. rtlUnicodeToUtf8X(utf8chars, utfText.refstr(), chars, value);
  899. if (collection)
  900. checkBind(cass_collection_append_string(*collection,
  901. cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()))),
  902. field);
  903. else
  904. checkBind(cass_statement_bind_string(stmtInfo->queryStatement(),
  905. checkNextParam(field),
  906. cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()))),
  907. field);
  908. }
  909. virtual void processQString(unsigned len, const char *value, const RtlFieldInfo * field)
  910. {
  911. size32_t charCount;
  912. rtlDataAttr text;
  913. rtlQStrToStrX(charCount, text.refstr(), len, value);
  914. processUtf8(charCount, text.getstr(), field);
  915. }
  916. virtual void processUtf8(unsigned chars, const char *value, const RtlFieldInfo * field)
  917. {
  918. if (collection)
  919. checkBind(cass_collection_append_string(*collection, cass_string_init2(value, rtlUtf8Size(chars, value))), field);
  920. else
  921. checkBind(cass_statement_bind_string(stmtInfo->queryStatement(), checkNextParam(field), cass_string_init2(value, rtlUtf8Size(chars, value))), field);
  922. }
  923. virtual bool processBeginSet(const RtlFieldInfo * field, unsigned numElements, bool isAll, const byte *data)
  924. {
  925. if (isAll)
  926. UNSUPPORTED("SET(ALL)");
  927. collection.setown(new CassandraCollection(cass_collection_new(CASS_COLLECTION_TYPE_SET, numElements)));
  928. return true;
  929. }
  930. virtual bool processBeginDataset(const RtlFieldInfo * field, unsigned numRows)
  931. {
  932. // If there's a single field, assume we are mapping to a SET/LIST
  933. // If there are two, assume it's a MAP
  934. // Otherwise, fail
  935. int numFields = getNumFields(field->type->queryChildType());
  936. if (numFields < 1 || numFields > 2)
  937. {
  938. UNSUPPORTED("Nested datasets with > 2 fields");
  939. }
  940. collection.setown(new CassandraCollection(cass_collection_new(numFields==1 ? CASS_COLLECTION_TYPE_SET : CASS_COLLECTION_TYPE_MAP, numRows)));
  941. return true;
  942. }
  943. virtual bool processBeginRow(const RtlFieldInfo * field)
  944. {
  945. return true;
  946. }
  947. virtual void processEndSet(const RtlFieldInfo * field)
  948. {
  949. checkBind(cass_statement_bind_collection(stmtInfo->queryStatement(), checkNextParam(field), *collection), field);
  950. collection.clear();
  951. }
  952. virtual void processEndDataset(const RtlFieldInfo * field)
  953. {
  954. checkBind(cass_statement_bind_collection(stmtInfo->queryStatement(), checkNextParam(field), *collection), field);
  955. collection.clear();
  956. }
  957. virtual void processEndRow(const RtlFieldInfo * field)
  958. {
  959. }
  960. protected:
  961. inline unsigned checkNextParam(const RtlFieldInfo * field)
  962. {
  963. if (logctx.queryTraceLevel() > 4)
  964. logctx.CTXLOG("Binding %s to %d", field->name->str(), thisParam);
  965. return thisParam++;
  966. }
  967. inline void checkBind(CassError rc, const RtlFieldInfo * field)
  968. {
  969. if (rc != CASS_OK)
  970. {
  971. failx("While binding parameter %s: %s", field->name->getAtomNamePtr(), cass_error_desc(rc));
  972. }
  973. }
  974. const RtlTypeInfo *typeInfo;
  975. const CassandraStatementInfo *stmtInfo;
  976. Owned<CassandraCollection> collection;
  977. const IContextLogger &logctx;
  978. int firstParam;
  979. RtlFieldStrInfo dummyField;
  980. int thisParam;
  981. };
  982. //
  983. class CassandraDatasetBinder : public CassandraRecordBinder
  984. {
  985. public:
  986. CassandraDatasetBinder(const IContextLogger &_logctx, IRowStream * _input, const RtlTypeInfo *_typeInfo, const CassandraStatementInfo *_stmt, int _firstParam)
  987. : input(_input), CassandraRecordBinder(_logctx, _typeInfo, _stmt, _firstParam)
  988. {
  989. }
  990. bool bindNext()
  991. {
  992. roxiemem::OwnedConstRoxieRow nextRow = (const byte *) input->ungroupedNextRow();
  993. if (!nextRow)
  994. return false;
  995. processRow((const byte *) nextRow.get()); // Bind the variables for the current row
  996. return true;
  997. }
  998. void executeAll(CassandraStatementInfo *stmtInfo)
  999. {
  1000. stmtInfo->startStream();
  1001. while (bindNext())
  1002. {
  1003. stmtInfo->execute();
  1004. }
  1005. stmtInfo->endStream();
  1006. }
  1007. protected:
  1008. Owned<IRowStream> input;
  1009. };
  1010. // A Cassandra function that returns a dataset will return a CassandraRowStream object that can be
  1011. // interrogated to return each row of the result in turn
  1012. class CassandraRowStream : public CInterfaceOf<IRowStream>
  1013. {
  1014. public:
  1015. CassandraRowStream(CassandraDatasetBinder *_inputStream, CassandraStatementInfo *_stmtInfo, IEngineRowAllocator *_resultAllocator)
  1016. : inputStream(_inputStream), stmtInfo(_stmtInfo), resultAllocator(_resultAllocator)
  1017. {
  1018. executePending = true;
  1019. eof = false;
  1020. }
  1021. virtual const void *nextRow()
  1022. {
  1023. // A little complex when streaming data in as well as out - want to execute for every input record
  1024. if (eof)
  1025. return NULL;
  1026. loop
  1027. {
  1028. if (executePending)
  1029. {
  1030. executePending = false;
  1031. if (inputStream && !inputStream->bindNext())
  1032. {
  1033. noteEOF();
  1034. return NULL;
  1035. }
  1036. stmtInfo->execute();
  1037. }
  1038. if (stmtInfo->next())
  1039. break;
  1040. if (inputStream)
  1041. executePending = true;
  1042. else
  1043. {
  1044. noteEOF();
  1045. return NULL;
  1046. }
  1047. }
  1048. RtlDynamicRowBuilder rowBuilder(resultAllocator);
  1049. CassandraRowBuilder cassandraRowBuilder(stmtInfo);
  1050. const RtlTypeInfo *typeInfo = resultAllocator->queryOutputMeta()->queryTypeInfo();
  1051. assertex(typeInfo);
  1052. RtlFieldStrInfo dummyField("<row>", NULL, typeInfo);
  1053. size32_t len = typeInfo->build(rowBuilder, 0, &dummyField, cassandraRowBuilder);
  1054. return rowBuilder.finalizeRowClear(len);
  1055. }
  1056. virtual void stop()
  1057. {
  1058. resultAllocator.clear();
  1059. stmtInfo->stop();
  1060. }
  1061. protected:
  1062. void noteEOF()
  1063. {
  1064. if (!eof)
  1065. {
  1066. eof = true;
  1067. stop();
  1068. }
  1069. }
  1070. Linked<CassandraDatasetBinder> inputStream;
  1071. Linked<CassandraStatementInfo> stmtInfo;
  1072. Linked<IEngineRowAllocator> resultAllocator;
  1073. bool executePending;
  1074. bool eof;
  1075. };
  1076. // Each call to a Cassandra function will use a new CassandraEmbedFunctionContext object
  1077. class CassandraEmbedFunctionContext : public CInterfaceOf<IEmbedFunctionContext>
  1078. {
  1079. public:
  1080. CassandraEmbedFunctionContext(const IContextLogger &_logctx, unsigned _flags, const char *options)
  1081. : logctx(_logctx), flags(_flags), nextParam(0), numParams(0), batchMode((CassBatchType) -1)
  1082. {
  1083. cluster.setown(new CassandraCluster(cass_cluster_new()));
  1084. const char *contact_points = "localhost";
  1085. const char *user = "";
  1086. const char *password = "";
  1087. const char *keyspace = "";
  1088. StringArray opts;
  1089. opts.appendList(options, ",");
  1090. ForEachItemIn(idx, opts)
  1091. {
  1092. const char *opt = opts.item(idx);
  1093. const char *val = strchr(opt, '=');
  1094. if (val)
  1095. {
  1096. StringBuffer optName(val-opt, opt);
  1097. val++;
  1098. if (stricmp(optName, "contact_points")==0 || stricmp(optName, "server")==0)
  1099. contact_points = val; // Note that lifetime of val is adequate for this to be safe
  1100. else if (stricmp(optName, "user")==0)
  1101. user = val;
  1102. else if (stricmp(optName, "password")==0)
  1103. password = val;
  1104. else if (stricmp(optName, "keyspace")==0)
  1105. keyspace = val;
  1106. else if (stricmp(optName, "batch")==0)
  1107. {
  1108. if (stricmp(val, "LOGGED")==0)
  1109. batchMode = CASS_BATCH_TYPE_LOGGED;
  1110. else if (stricmp(val, "UNLOGGED")==0)
  1111. batchMode = CASS_BATCH_TYPE_UNLOGGED;
  1112. else if (stricmp(val, "COUNTER")==0)
  1113. batchMode = CASS_BATCH_TYPE_COUNTER;
  1114. }
  1115. else if (stricmp(optName, "port")==0)
  1116. {
  1117. unsigned port = getUnsignedOption(val, "port");
  1118. checkSetOption(cass_cluster_set_port(*cluster, port), "port");
  1119. }
  1120. else if (stricmp(optName, "protocol_version")==0)
  1121. {
  1122. unsigned protocol_version = getUnsignedOption(val, "protocol_version");
  1123. checkSetOption(cass_cluster_set_protocol_version(*cluster, protocol_version), "protocol_version");
  1124. }
  1125. else if (stricmp(optName, "num_threads_io")==0)
  1126. {
  1127. unsigned num_threads_io = getUnsignedOption(val, "num_threads_io");
  1128. cass_cluster_set_num_threads_io(*cluster, num_threads_io); // No status return
  1129. }
  1130. else if (stricmp(optName, "queue_size_io")==0)
  1131. {
  1132. unsigned queue_size_io = getUnsignedOption(val, "queue_size_io");
  1133. checkSetOption(cass_cluster_set_queue_size_io(*cluster, queue_size_io), "queue_size_io");
  1134. }
  1135. else if (stricmp(optName, "core_connections_per_host")==0)
  1136. {
  1137. unsigned core_connections_per_host = getUnsignedOption(val, "core_connections_per_host");
  1138. checkSetOption(cass_cluster_set_core_connections_per_host(*cluster, core_connections_per_host), "core_connections_per_host");
  1139. }
  1140. else if (stricmp(optName, "max_connections_per_host")==0)
  1141. {
  1142. unsigned max_connections_per_host = getUnsignedOption(val, "max_connections_per_host");
  1143. checkSetOption(cass_cluster_set_max_connections_per_host(*cluster, max_connections_per_host), "max_connections_per_host");
  1144. }
  1145. else if (stricmp(optName, "max_concurrent_creation")==0)
  1146. {
  1147. unsigned max_concurrent_creation = getUnsignedOption(val, "max_concurrent_creation");
  1148. checkSetOption(cass_cluster_set_max_concurrent_creation(*cluster, max_concurrent_creation), "max_concurrent_creation");
  1149. }
  1150. else if (stricmp(optName, "pending_requests_high_water_mark")==0)
  1151. {
  1152. unsigned pending_requests_high_water_mark = getUnsignedOption(val, "pending_requests_high_water_mark");
  1153. checkSetOption(cass_cluster_set_pending_requests_high_water_mark(*cluster, pending_requests_high_water_mark), "pending_requests_high_water_mark");
  1154. }
  1155. else if (stricmp(optName, "pending_requests_low_water_mark")==0)
  1156. {
  1157. unsigned pending_requests_low_water_mark = getUnsignedOption(val, "pending_requests_low_water_mark");
  1158. checkSetOption(cass_cluster_set_pending_requests_low_water_mark(*cluster, pending_requests_low_water_mark), "pending_requests_low_water_mark");
  1159. }
  1160. else if (stricmp(optName, "max_concurrent_requests_threshold")==0)
  1161. {
  1162. unsigned max_concurrent_requests_threshold = getUnsignedOption(val, "max_concurrent_requests_threshold");
  1163. checkSetOption(cass_cluster_set_max_concurrent_requests_threshold(*cluster, max_concurrent_requests_threshold), "max_concurrent_requests_threshold");
  1164. }
  1165. else if (stricmp(optName, "connect_timeout")==0)
  1166. {
  1167. unsigned connect_timeout = getUnsignedOption(val, "connect_timeout");
  1168. cass_cluster_set_connect_timeout(*cluster, connect_timeout);
  1169. }
  1170. else if (stricmp(optName, "request_timeout")==0)
  1171. {
  1172. unsigned request_timeout = getUnsignedOption(val, "request_timeout");
  1173. cass_cluster_set_request_timeout(*cluster, request_timeout);
  1174. }
  1175. else
  1176. failx("Unrecognized option %s", optName.str());
  1177. }
  1178. }
  1179. cass_cluster_set_contact_points(*cluster, contact_points);
  1180. if (*user || *password)
  1181. cass_cluster_set_credentials(*cluster, user, password);
  1182. session.setown(new CassandraSession(cass_session_new()));
  1183. CassandraFuture future(keyspace ? cass_session_connect_keyspace(*session, *cluster, keyspace) : cass_session_connect(*session, *cluster));
  1184. future.wait("connect");
  1185. }
  1186. virtual bool getBooleanResult()
  1187. {
  1188. bool ret = cassandraembed::getBooleanResult(NULL, getScalarResult());
  1189. checkSingleRow();
  1190. return ret;
  1191. }
  1192. virtual void getDataResult(size32_t &len, void * &result)
  1193. {
  1194. cassandraembed::getDataResult(NULL, getScalarResult(), len, result);
  1195. checkSingleRow();
  1196. }
  1197. virtual double getRealResult()
  1198. {
  1199. double ret = cassandraembed::getRealResult(NULL, getScalarResult());
  1200. checkSingleRow();
  1201. return ret;
  1202. }
  1203. virtual __int64 getSignedResult()
  1204. {
  1205. __int64 ret = cassandraembed::getSignedResult(NULL, getScalarResult());
  1206. checkSingleRow();
  1207. return ret;
  1208. }
  1209. virtual unsigned __int64 getUnsignedResult()
  1210. {
  1211. unsigned __int64 ret = cassandraembed::getUnsignedResult(NULL, getScalarResult());
  1212. checkSingleRow();
  1213. return ret;
  1214. }
  1215. virtual void getStringResult(size32_t &chars, char * &result)
  1216. {
  1217. cassandraembed::getStringResult(NULL, getScalarResult(), chars, result);
  1218. checkSingleRow();
  1219. }
  1220. virtual void getUTF8Result(size32_t &chars, char * &result)
  1221. {
  1222. cassandraembed::getUTF8Result(NULL, getScalarResult(), chars, result);
  1223. checkSingleRow();
  1224. }
  1225. virtual void getUnicodeResult(size32_t &chars, UChar * &result)
  1226. {
  1227. cassandraembed::getUnicodeResult(NULL, getScalarResult(), chars, result);
  1228. checkSingleRow();
  1229. }
  1230. virtual void getDecimalResult(Decimal &value)
  1231. {
  1232. cassandraembed::getDecimalResult(NULL, getScalarResult(), value);
  1233. checkSingleRow();
  1234. }
  1235. virtual void getSetResult(bool & __isAllResult, size32_t & __resultBytes, void * & __result, int elemType, size32_t elemSize)
  1236. {
  1237. CassandraIterator iterator(cass_iterator_from_collection(getScalarResult()));
  1238. rtlRowBuilder out;
  1239. byte *outData = NULL;
  1240. size32_t outBytes = 0;
  1241. while (cass_iterator_next(iterator))
  1242. {
  1243. const CassValue *value = cass_iterator_get_value(iterator);
  1244. assertex(value);
  1245. if (elemSize != UNKNOWN_LENGTH)
  1246. {
  1247. out.ensureAvailable(outBytes + elemSize);
  1248. outData = out.getbytes() + outBytes;
  1249. }
  1250. switch ((type_t) elemType)
  1251. {
  1252. case type_int:
  1253. rtlWriteInt(outData, cassandraembed::getSignedResult(NULL, value), elemSize);
  1254. break;
  1255. case type_unsigned:
  1256. rtlWriteInt(outData, cassandraembed::getUnsignedResult(NULL, value), elemSize);
  1257. break;
  1258. case type_real:
  1259. if (elemSize == sizeof(double))
  1260. * (double *) outData = cassandraembed::getRealResult(NULL, value);
  1261. else
  1262. {
  1263. assertex(elemSize == sizeof(float));
  1264. * (float *) outData = (float) cassandraembed::getRealResult(NULL, value);
  1265. }
  1266. break;
  1267. case type_boolean:
  1268. assertex(elemSize == sizeof(bool));
  1269. * (bool *) outData = cassandraembed::getBooleanResult(NULL, value);
  1270. break;
  1271. case type_string:
  1272. case type_varstring:
  1273. {
  1274. rtlDataAttr str;
  1275. size32_t lenBytes;
  1276. cassandraembed::getStringResult(NULL, value, lenBytes, str.refstr());
  1277. if (elemSize == UNKNOWN_LENGTH)
  1278. {
  1279. if (elemType == type_string)
  1280. {
  1281. out.ensureAvailable(outBytes + lenBytes + sizeof(size32_t));
  1282. outData = out.getbytes() + outBytes;
  1283. * (size32_t *) outData = lenBytes;
  1284. rtlStrToStr(lenBytes, outData+sizeof(size32_t), lenBytes, str.getstr());
  1285. outBytes += lenBytes + sizeof(size32_t);
  1286. }
  1287. else
  1288. {
  1289. out.ensureAvailable(outBytes + lenBytes + 1);
  1290. outData = out.getbytes() + outBytes;
  1291. rtlStrToVStr(0, outData, lenBytes, str.getstr());
  1292. outBytes += lenBytes + 1;
  1293. }
  1294. }
  1295. else
  1296. {
  1297. if (elemType == type_string)
  1298. rtlStrToStr(elemSize, outData, lenBytes, str.getstr());
  1299. else
  1300. rtlStrToVStr(elemSize, outData, lenBytes, str.getstr()); // Fixed size null terminated strings... weird.
  1301. }
  1302. break;
  1303. }
  1304. case type_unicode:
  1305. case type_utf8:
  1306. {
  1307. rtlDataAttr str;
  1308. size32_t lenChars;
  1309. cassandraembed::getUTF8Result(NULL, value, lenChars, str.refstr());
  1310. const char * text = str.getstr();
  1311. size32_t lenBytes = rtlUtf8Size(lenChars, text);
  1312. if (elemType == type_utf8)
  1313. {
  1314. assertex (elemSize == UNKNOWN_LENGTH);
  1315. out.ensureAvailable(outBytes + lenBytes + sizeof(size32_t));
  1316. outData = out.getbytes() + outBytes;
  1317. * (size32_t *) outData = lenChars;
  1318. rtlStrToStr(lenBytes, outData+sizeof(size32_t), lenBytes, text);
  1319. outBytes += lenBytes + sizeof(size32_t);
  1320. }
  1321. else
  1322. {
  1323. if (elemSize == UNKNOWN_LENGTH)
  1324. {
  1325. // You can't assume that number of chars in utf8 matches number in unicode16 ...
  1326. size32_t numchars16;
  1327. rtlDataAttr unicode16;
  1328. rtlUtf8ToUnicodeX(numchars16, unicode16.refustr(), lenChars, text);
  1329. out.ensureAvailable(outBytes + numchars16*sizeof(UChar) + sizeof(size32_t));
  1330. outData = out.getbytes() + outBytes;
  1331. * (size32_t *) outData = numchars16;
  1332. rtlUnicodeToUnicode(numchars16, (UChar *) (outData+sizeof(size32_t)), numchars16, unicode16.getustr());
  1333. outBytes += numchars16*sizeof(UChar) + sizeof(size32_t);
  1334. }
  1335. else
  1336. rtlUtf8ToUnicode(elemSize / sizeof(UChar), (UChar *) outData, lenChars, text);
  1337. }
  1338. break;
  1339. }
  1340. default:
  1341. fail("type mismatch - unsupported return type");
  1342. }
  1343. if (elemSize != UNKNOWN_LENGTH)
  1344. outBytes += elemSize;
  1345. }
  1346. __isAllResult = false;
  1347. __resultBytes = outBytes;
  1348. __result = out.detachdata();
  1349. }
  1350. virtual IRowStream *getDatasetResult(IEngineRowAllocator * _resultAllocator)
  1351. {
  1352. return new CassandraRowStream(inputStream, stmtInfo, _resultAllocator);
  1353. }
  1354. virtual byte * getRowResult(IEngineRowAllocator * _resultAllocator)
  1355. {
  1356. if (!stmtInfo->hasResult() || stmtInfo->rowCount() != 1)
  1357. typeError("row", NULL, NULL);
  1358. CassandraRowStream stream(NULL, stmtInfo, _resultAllocator);
  1359. roxiemem::OwnedConstRoxieRow ret = stream.nextRow();
  1360. stream.stop();
  1361. if (ret == NULL) // Check for exactly one returned row
  1362. typeError("row", NULL, NULL);
  1363. return (byte *) ret.getClear();
  1364. }
  1365. virtual size32_t getTransformResult(ARowBuilder & rowBuilder)
  1366. {
  1367. if (!stmtInfo->hasResult() || stmtInfo->rowCount() != 1)
  1368. typeError("row", NULL, NULL);
  1369. if (!stmtInfo->next())
  1370. fail("Failed to read row");
  1371. CassandraRowBuilder cassandraRowBuilder(stmtInfo);
  1372. const RtlTypeInfo *typeInfo = rowBuilder.queryAllocator()->queryOutputMeta()->queryTypeInfo();
  1373. assertex(typeInfo);
  1374. RtlFieldStrInfo dummyField("<row>", NULL, typeInfo);
  1375. return typeInfo->build(rowBuilder, 0, &dummyField, cassandraRowBuilder);
  1376. }
  1377. virtual void bindRowParam(const char *name, IOutputMetaData & metaVal, byte *val)
  1378. {
  1379. CassandraRecordBinder binder(logctx, metaVal.queryTypeInfo(), stmtInfo, nextParam);
  1380. binder.processRow(val);
  1381. nextParam += binder.numFields();
  1382. }
  1383. virtual void bindDatasetParam(const char *name, IOutputMetaData & metaVal, IRowStream * val)
  1384. {
  1385. // We only support a single dataset parameter...
  1386. // MORE - look into batch?
  1387. if (inputStream)
  1388. {
  1389. fail("At most one dataset parameter supported");
  1390. }
  1391. inputStream.setown(new CassandraDatasetBinder(logctx, LINK(val), metaVal.queryTypeInfo(), stmtInfo, nextParam));
  1392. nextParam += inputStream->numFields();
  1393. }
  1394. virtual void bindBooleanParam(const char *name, bool val)
  1395. {
  1396. checkBind(cass_statement_bind_bool(stmtInfo->queryStatement(), checkNextParam(name), val ? cass_true : cass_false), name);
  1397. }
  1398. virtual void bindDataParam(const char *name, size32_t len, const void *val)
  1399. {
  1400. checkBind(cass_statement_bind_bytes(stmtInfo->queryStatement(), checkNextParam(name), cass_bytes_init((const cass_byte_t*) val, len)), name);
  1401. }
  1402. virtual void bindFloatParam(const char *name, float val)
  1403. {
  1404. checkBind(cass_statement_bind_float(stmtInfo->queryStatement(), checkNextParam(name), val), name);
  1405. }
  1406. virtual void bindRealParam(const char *name, double val)
  1407. {
  1408. checkBind(cass_statement_bind_double(stmtInfo->queryStatement(), checkNextParam(name), val), name);
  1409. }
  1410. virtual void bindSignedSizeParam(const char *name, int size, __int64 val)
  1411. {
  1412. if (size > 4)
  1413. checkBind(cass_statement_bind_int64(stmtInfo->queryStatement(), checkNextParam(name), val), name);
  1414. else
  1415. checkBind(cass_statement_bind_int32(stmtInfo->queryStatement(), checkNextParam(name), val), name);
  1416. }
  1417. virtual void bindSignedParam(const char *name, __int64 val)
  1418. {
  1419. bindSignedSizeParam(name, 8, val);
  1420. }
  1421. virtual void bindUnsignedSizeParam(const char *name, int size, unsigned __int64 val)
  1422. {
  1423. UNSUPPORTED("UNSIGNED columns");
  1424. }
  1425. virtual void bindUnsignedParam(const char *name, unsigned __int64 val)
  1426. {
  1427. UNSUPPORTED("UNSIGNED columns");
  1428. }
  1429. virtual void bindStringParam(const char *name, size32_t len, const char *val)
  1430. {
  1431. size32_t utf8chars;
  1432. rtlDataAttr utfText;
  1433. rtlStrToUtf8X(utf8chars, utfText.refstr(), len, val);
  1434. checkBind(cass_statement_bind_string(stmtInfo->queryStatement(),
  1435. checkNextParam(name),
  1436. cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()))),
  1437. name);
  1438. }
  1439. virtual void bindVStringParam(const char *name, const char *val)
  1440. {
  1441. bindStringParam(name, strlen(val), val);
  1442. }
  1443. virtual void bindUTF8Param(const char *name, size32_t chars, const char *val)
  1444. {
  1445. checkBind(cass_statement_bind_string(stmtInfo->queryStatement(), checkNextParam(name), cass_string_init2(val, rtlUtf8Size(chars, val))), name);
  1446. }
  1447. virtual void bindUnicodeParam(const char *name, size32_t chars, const UChar *val)
  1448. {
  1449. size32_t utf8chars;
  1450. rtlDataAttr utfText;
  1451. rtlUnicodeToUtf8X(utf8chars, utfText.refstr(), chars, val);
  1452. checkBind(cass_statement_bind_string(stmtInfo->queryStatement(),
  1453. checkNextParam(name),
  1454. cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()))),
  1455. name);
  1456. }
  1457. virtual void bindSetParam(const char *name, int elemType, size32_t elemSize, bool isAll, size32_t totalBytes, void *setData)
  1458. {
  1459. if (isAll)
  1460. UNSUPPORTED("SET(ALL)");
  1461. type_t typecode = (type_t) elemType;
  1462. const byte *inData = (const byte *) setData;
  1463. const byte *endData = inData + totalBytes;
  1464. int numElems;
  1465. if (elemSize == UNKNOWN_LENGTH)
  1466. {
  1467. numElems = 0;
  1468. // Will need 2 passes to work out how many elements there are in the set :(
  1469. while (inData < endData)
  1470. {
  1471. int thisSize;
  1472. switch (elemType)
  1473. {
  1474. case type_varstring:
  1475. thisSize = strlen((const char *) inData) + 1;
  1476. break;
  1477. case type_string:
  1478. thisSize = * (size32_t *) inData + sizeof(size32_t);
  1479. break;
  1480. case type_unicode:
  1481. thisSize = (* (size32_t *) inData) * sizeof(UChar) + sizeof(size32_t);
  1482. break;
  1483. case type_utf8:
  1484. thisSize = rtlUtf8Size(* (size32_t *) inData, inData + sizeof(size32_t)) + sizeof(size32_t);
  1485. break;
  1486. default:
  1487. fail("Unsupported parameter type");
  1488. break;
  1489. }
  1490. inData += thisSize;
  1491. numElems++;
  1492. }
  1493. inData = (const byte *) setData;
  1494. }
  1495. else
  1496. numElems = totalBytes / elemSize;
  1497. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_SET, numElems));
  1498. while (inData < endData)
  1499. {
  1500. size32_t thisSize = elemSize;
  1501. CassError rc;
  1502. switch (typecode)
  1503. {
  1504. case type_int:
  1505. if (elemSize > 4)
  1506. rc = cass_collection_append_int64(collection, rtlReadInt(inData, elemSize));
  1507. else
  1508. rc = cass_collection_append_int32(collection, rtlReadInt(inData, elemSize));
  1509. break;
  1510. case type_unsigned:
  1511. UNSUPPORTED("UNSIGNED columns");
  1512. break;
  1513. case type_varstring:
  1514. {
  1515. size32_t numChars = strlen((const char *) inData);
  1516. if (elemSize == UNKNOWN_LENGTH)
  1517. thisSize = numChars + 1;
  1518. size32_t utf8chars;
  1519. rtlDataAttr utfText;
  1520. rtlStrToUtf8X(utf8chars, utfText.refstr(), numChars, (const char *) inData);
  1521. rc = cass_collection_append_string(collection, cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())));
  1522. break;
  1523. }
  1524. case type_string:
  1525. {
  1526. if (elemSize == UNKNOWN_LENGTH)
  1527. {
  1528. thisSize = * (size32_t *) inData;
  1529. inData += sizeof(size32_t);
  1530. }
  1531. size32_t utf8chars;
  1532. rtlDataAttr utfText;
  1533. rtlStrToUtf8X(utf8chars, utfText.refstr(), thisSize, (const char *) inData);
  1534. rc = cass_collection_append_string(collection, cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())));
  1535. break;
  1536. }
  1537. case type_real:
  1538. if (elemSize == sizeof(double))
  1539. rc = cass_collection_append_double(collection, * (double *) inData);
  1540. else
  1541. rc = cass_collection_append_float(collection, * (float *) inData);
  1542. break;
  1543. case type_boolean:
  1544. assertex(elemSize == sizeof(bool));
  1545. rc = cass_collection_append_bool(collection, *(bool*)inData ? cass_true : cass_false);
  1546. break;
  1547. case type_unicode:
  1548. {
  1549. if (elemSize == UNKNOWN_LENGTH)
  1550. {
  1551. thisSize = (* (size32_t *) inData) * sizeof(UChar); // NOTE - it's in chars...
  1552. inData += sizeof(size32_t);
  1553. }
  1554. unsigned unicodeChars;
  1555. rtlDataAttr unicode;
  1556. rtlUnicodeToUtf8X(unicodeChars, unicode.refstr(), thisSize / sizeof(UChar), (const UChar *) inData);
  1557. size32_t sizeBytes = rtlUtf8Size(unicodeChars, unicode.getstr());
  1558. rc = cass_collection_append_string(collection, cass_string_init2(unicode.getstr(), sizeBytes));
  1559. break;
  1560. }
  1561. case type_utf8:
  1562. {
  1563. assertex (elemSize == UNKNOWN_LENGTH);
  1564. size32_t numChars = * (size32_t *) inData;
  1565. inData += sizeof(size32_t);
  1566. thisSize = rtlUtf8Size(numChars, inData);
  1567. rc = cass_collection_append_string(collection, cass_string_init2((const char *) inData, thisSize));
  1568. break;
  1569. }
  1570. case type_data:
  1571. if (elemSize == UNKNOWN_LENGTH)
  1572. {
  1573. thisSize = * (size32_t *) inData;
  1574. inData += sizeof(size32_t);
  1575. }
  1576. rc = cass_collection_append_bytes(collection, cass_bytes_init((const cass_byte_t*) inData, thisSize));
  1577. break;
  1578. }
  1579. checkBind(rc, name);
  1580. inData += thisSize;
  1581. }
  1582. checkBind(cass_statement_bind_collection(stmtInfo->queryStatement(),
  1583. checkNextParam(name),
  1584. collection),
  1585. name);
  1586. }
  1587. virtual void importFunction(size32_t lenChars, const char *text)
  1588. {
  1589. throwUnexpected();
  1590. }
  1591. virtual void compileEmbeddedScript(size32_t chars, const char *_script)
  1592. {
  1593. // Incoming script is not necessarily null terminated. Note that the chars refers to utf8 characters and not bytes.
  1594. size32_t len = rtlUtf8Size(chars, _script);
  1595. queryString.set(_script, len);
  1596. const char *script = queryString.get(); // Now null terminated
  1597. if ((flags & (EFnoreturn|EFnoparams)) == (EFnoreturn|EFnoparams))
  1598. {
  1599. loop
  1600. {
  1601. const char *nextScript = findUnquoted(script, ';');
  1602. if (!nextScript)
  1603. {
  1604. // script should be pointing at only trailing whitespace, else it's a "missing ;" error
  1605. break;
  1606. }
  1607. CassandraStatement statement(cass_statement_new(cass_string_init2(script, nextScript-script), 0));
  1608. CassandraFuture future(cass_session_execute(*session, statement));
  1609. future.wait("execute statement");
  1610. script = nextScript;
  1611. }
  1612. }
  1613. else
  1614. {
  1615. // MORE - can cache this, perhaps, if script is same as last time?
  1616. CassandraFuture future(cass_session_prepare(*session, cass_string_init(script)));
  1617. future.wait("prepare statement");
  1618. Owned<CassandraPrepared> prepared = new CassandraPrepared(cass_future_get_prepared(future));
  1619. if ((flags & EFnoparams) == 0)
  1620. numParams = countBindings(script);
  1621. else
  1622. numParams = 0;
  1623. stmtInfo.setown(new CassandraStatementInfo(session, prepared, numParams, batchMode));
  1624. }
  1625. }
  1626. virtual void callFunction()
  1627. {
  1628. // Does not seem to be a way to check number of parameters expected...
  1629. // if (nextParam != cass_statement_bind_count(stmtInfo))
  1630. // fail("Not enough parameters");
  1631. try
  1632. {
  1633. if (stmtInfo && !stmtInfo->hasResult())
  1634. lazyExecute();
  1635. }
  1636. catch (IException *E)
  1637. {
  1638. StringBuffer msg;
  1639. E->errorMessage(msg);
  1640. msg.appendf(" (processing query %s)", queryString.get());
  1641. throw makeStringException(E->errorCode(), msg);
  1642. }
  1643. }
  1644. protected:
  1645. void lazyExecute()
  1646. {
  1647. if (inputStream)
  1648. inputStream->executeAll(stmtInfo);
  1649. else
  1650. stmtInfo->execute();
  1651. }
  1652. const CassValue *getScalarResult()
  1653. {
  1654. if (!stmtInfo->next())
  1655. typeError("scalar", NULL, NULL);
  1656. if (cass_row_get_column(stmtInfo->queryRow(), 1))
  1657. typeError("scalar", NULL, NULL);
  1658. const CassValue *result = cass_row_get_column(stmtInfo->queryRow(), 0);
  1659. if (!result)
  1660. typeError("scalar", NULL, NULL);
  1661. return result;
  1662. }
  1663. void checkSingleRow()
  1664. {
  1665. if (stmtInfo->rowCount() != 1)
  1666. typeError("scalar", NULL, NULL);
  1667. }
  1668. unsigned countBindings(const char *query)
  1669. {
  1670. unsigned queryCount = 0;
  1671. while ((query = findUnquoted(query, '?')) != NULL)
  1672. queryCount++;
  1673. return queryCount;
  1674. }
  1675. const char *findUnquoted(const char *query, char searchFor)
  1676. {
  1677. // Note - returns pointer to char AFTER the first occurrence of searchFor outside of quotes
  1678. char inStr = '\0';
  1679. char ch;
  1680. while ((ch = *query++) != 0)
  1681. {
  1682. if (ch == inStr)
  1683. inStr = false;
  1684. else switch (ch)
  1685. {
  1686. case '\'':
  1687. case '"':
  1688. inStr = ch;
  1689. break;
  1690. case '\\':
  1691. if (inStr && *query)
  1692. query++;
  1693. break;
  1694. case '/':
  1695. if (!inStr)
  1696. {
  1697. if (*query=='/')
  1698. {
  1699. while (*query && *query != '\n')
  1700. query++;
  1701. }
  1702. else if (*query=='*')
  1703. {
  1704. query++;
  1705. loop
  1706. {
  1707. if (!*query)
  1708. fail("Unterminated comment in query string");
  1709. if (*query=='*' && query[1]=='/')
  1710. {
  1711. query+= 2;
  1712. break;
  1713. }
  1714. query++;
  1715. }
  1716. }
  1717. }
  1718. break;
  1719. default:
  1720. if (!inStr && ch==searchFor)
  1721. return query;
  1722. break;
  1723. }
  1724. }
  1725. return NULL;
  1726. }
  1727. inline unsigned checkNextParam(const char *name)
  1728. {
  1729. if (nextParam == numParams)
  1730. failx("Too many parameters supplied: No matching ? for parameter %s", name);
  1731. return nextParam++;
  1732. }
  1733. inline void checkBind(CassError rc, const char *name)
  1734. {
  1735. if (rc != CASS_OK)
  1736. {
  1737. failx("While binding parameter %s: %s", name, cass_error_desc(rc));
  1738. }
  1739. }
  1740. inline void checkSetOption(CassError rc, const char *name)
  1741. {
  1742. if (rc != CASS_OK)
  1743. {
  1744. failx("While setting option %s: %s", name, cass_error_desc(rc));
  1745. }
  1746. }
  1747. unsigned getUnsignedOption(const char *val, const char *option)
  1748. {
  1749. char *endp;
  1750. long value = strtoul(val, &endp, 0);
  1751. if (endp==val || *endp != '\0' || value > INT_MAX || value < INT_MIN)
  1752. failx("Invalid value '%s' for option %s", val, option);
  1753. return (int) value;
  1754. }
  // NOTE(review): cluster and logctx are not referenced by the methods visible in this part of
  // the file - presumably set up by the constructor; confirm there.
  Owned<CassandraCluster> cluster;
  // Session used to prepare and execute statements (see compileEmbeddedScript)
  Owned<CassandraSession> session;
  // Prepared statement wrapper created by compileEmbeddedScript; the bindXXXParam methods bind to its statement
  Owned<CassandraStatementInfo> stmtInfo;
  // When set, lazyExecute runs the statement via executeAll() on this binder instead of a single execute()
  Owned<CassandraDatasetBinder> inputStream;
  const IContextLogger &logctx;
  // EF* flags (e.g. EFnoreturn, EFnoparams) tested by compileEmbeddedScript
  unsigned flags;
  // Index handed out by the next call to checkNextParam
  unsigned nextParam;
  // Number of '?' placeholders counted in the query (0 when EFnoparams is set)
  unsigned numParams;
  // Passed through to CassandraStatementInfo when the statement is prepared
  CassBatchType batchMode;
  // Null terminated copy of the script text; also quoted in callFunction's error message
  StringAttr queryString;
  1765. };
  1766. class CassandraEmbedContext : public CInterfaceOf<IEmbedContext>
  1767. {
  1768. public:
  1769. virtual IEmbedFunctionContext *createFunctionContext(unsigned flags, const char *options)
  1770. {
  1771. return createFunctionContextEx(NULL, flags, options);
  1772. }
  1773. virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, unsigned flags, const char *options)
  1774. {
  1775. if (flags & EFimport)
  1776. UNSUPPORTED("IMPORT");
  1777. else
  1778. return new CassandraEmbedFunctionContext(ctx ? ctx->queryContextLogger() : queryDummyContextLogger(), flags, options);
  1779. }
  1780. };
  1781. extern IEmbedContext* getEmbedContext()
  1782. {
  1783. return new CassandraEmbedContext();
  1784. }
  // Exported entry point for checking a script's syntax. No checking is implemented yet
  // (MORE), so every script is accepted.
  extern bool syntaxCheck(const char *script)
  {
      const bool accepted = true; // MORE
      return accepted;
  }
  1789. //--------------------------------------------
  // Name of the map column that addElement/bindElement treat specially: its entries are
  // mapped to and from "@"-prefixed attributes of the row rather than a child element.
  #define ATTRIBUTES_NAME "attributes"
  1791. void addElement(IPTree *parent, const char *name, const CassValue *value)
  1792. {
  1793. switch (cass_value_type(value))
  1794. {
  1795. case CASS_VALUE_TYPE_UNKNOWN:
  1796. // It's a NULL - ignore it (or we could add empty element...)
  1797. break;
  1798. case CASS_VALUE_TYPE_ASCII:
  1799. case CASS_VALUE_TYPE_TEXT:
  1800. case CASS_VALUE_TYPE_VARCHAR:
  1801. {
  1802. rtlDataAttr str;
  1803. unsigned chars;
  1804. getUTF8Result(NULL, value, chars, str.refstr());
  1805. StringAttr s(str.getstr(), rtlUtf8Size(chars, str.getstr()));
  1806. parent->addProp(name, s);
  1807. break;
  1808. }
  1809. case CASS_VALUE_TYPE_INT:
  1810. case CASS_VALUE_TYPE_BIGINT:
  1811. case CASS_VALUE_TYPE_VARINT:
  1812. parent->addPropInt64(name, getSignedResult(NULL, value));
  1813. break;
  1814. case CASS_VALUE_TYPE_BLOB:
  1815. {
  1816. rtlDataAttr data;
  1817. unsigned bytes;
  1818. getDataResult(NULL, value, bytes, data.refdata());
  1819. parent->addPropBin(name, bytes, data.getbytes());
  1820. break;
  1821. }
  1822. case CASS_VALUE_TYPE_BOOLEAN:
  1823. parent->addPropBool(name, getBooleanResult(NULL, value));
  1824. break;
  1825. case CASS_VALUE_TYPE_DOUBLE:
  1826. case CASS_VALUE_TYPE_FLOAT:
  1827. {
  1828. double v = getRealResult(NULL, value);
  1829. StringBuffer s;
  1830. s.append(v);
  1831. parent->addProp(name, s);
  1832. break;
  1833. }
  1834. case CASS_VALUE_TYPE_LIST:
  1835. case CASS_VALUE_TYPE_SET:
  1836. {
  1837. CassandraIterator elems(cass_iterator_from_collection(value));
  1838. Owned<IPTree> list = createPTree(name);
  1839. while (cass_iterator_next(elems))
  1840. addElement(list, "item", cass_iterator_get_value(elems));
  1841. parent->addPropTree(name, list.getClear());
  1842. break;
  1843. }
  1844. case CASS_VALUE_TYPE_MAP:
  1845. {
  1846. CassandraIterator elems(cass_iterator_from_map(value));
  1847. if (strcmp(name, ATTRIBUTES_NAME)==0 && isString(cass_value_primary_sub_type(value)))
  1848. {
  1849. while (cass_iterator_next(elems))
  1850. {
  1851. rtlDataAttr str;
  1852. unsigned chars;
  1853. getStringResult(NULL, cass_iterator_get_map_key(elems), chars, str.refstr());
  1854. StringBuffer s("@");
  1855. s.append(chars, str.getstr());
  1856. addElement(parent, s, cass_iterator_get_map_value(elems));
  1857. }
  1858. }
  1859. else
  1860. {
  1861. Owned<IPTree> map = createPTree(name);
  1862. while (cass_iterator_next(elems))
  1863. {
  1864. if (isString(cass_value_primary_sub_type(value)))
  1865. {
  1866. rtlDataAttr str;
  1867. unsigned chars;
  1868. getStringResult(NULL, cass_iterator_get_map_key(elems), chars, str.refstr());
  1869. StringAttr s(str.getstr(), chars);
  1870. addElement(map, s, cass_iterator_get_map_value(elems));
  1871. }
  1872. else
  1873. {
  1874. Owned<IPTree> mapping = createPTree("mapping");
  1875. addElement(mapping, "key", cass_iterator_get_map_key(elems));
  1876. addElement(mapping, "value", cass_iterator_get_map_value(elems));
  1877. map->addPropTree("mapping", mapping.getClear());
  1878. }
  1879. }
  1880. parent->addPropTree(name, map.getClear());
  1881. }
  1882. break;
  1883. }
  1884. default:
  1885. DBGLOG("Column type %d not supported", cass_value_type(value));
  1886. UNSUPPORTED("Column type");
  1887. }
  1888. }
  1889. void bindElement(CassStatement *statement, IPTree *parent, unsigned idx, const char *name, CassValueType type)
  1890. {
  1891. if (parent->hasProp(name) || strcmp(name, ATTRIBUTES_NAME)==0)
  1892. {
  1893. switch (type)
  1894. {
  1895. case CASS_VALUE_TYPE_ASCII:
  1896. case CASS_VALUE_TYPE_TEXT:
  1897. case CASS_VALUE_TYPE_VARCHAR:
  1898. {
  1899. const char *value = parent->queryProp(name);
  1900. if (value)
  1901. check(cass_statement_bind_string(statement, idx, cass_string_init(value)));
  1902. break;
  1903. }
  1904. case CASS_VALUE_TYPE_INT:
  1905. check(cass_statement_bind_int32(statement, idx, parent->getPropInt(name)));
  1906. break;
  1907. case CASS_VALUE_TYPE_BIGINT:
  1908. case CASS_VALUE_TYPE_VARINT:
  1909. check(cass_statement_bind_int64(statement, idx, parent->getPropInt64(name)));
  1910. break;
  1911. case CASS_VALUE_TYPE_BLOB:
  1912. {
  1913. MemoryBuffer buf;
  1914. parent->getPropBin(name, buf);
  1915. check(cass_statement_bind_bytes(statement, idx, cass_bytes_init((const cass_byte_t*)buf.toByteArray(), buf.length())));
  1916. break;
  1917. }
  1918. case CASS_VALUE_TYPE_BOOLEAN:
  1919. check(cass_statement_bind_bool(statement, idx, (cass_bool_t) parent->getPropBool(name)));
  1920. break;
  1921. case CASS_VALUE_TYPE_DOUBLE:
  1922. check(cass_statement_bind_double(statement, idx, atof(parent->queryProp(name))));
  1923. break;
  1924. case CASS_VALUE_TYPE_FLOAT:
  1925. check(cass_statement_bind_float(statement, idx, atof(parent->queryProp(name))));
  1926. break;
  1927. case CASS_VALUE_TYPE_LIST:
  1928. case CASS_VALUE_TYPE_SET:
  1929. {
  1930. Owned<IPTree> child = parent->getPropTree(name);
  1931. unsigned numItems = child->getCount("item");
  1932. if (numItems)
  1933. {
  1934. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_SET, numItems));
  1935. Owned<IPTreeIterator> items = child->getElements("item");
  1936. ForEach(*items)
  1937. {
  1938. // We don't know the subtypes - we can assert that we only support string, for most purposes, I suspect
  1939. if (strcmp(name, "list1")==0)
  1940. check(cass_collection_append_int32(collection, items->query().getPropInt(NULL)));
  1941. else
  1942. check(cass_collection_append_string(collection, cass_string_init(items->query().queryProp(NULL))));
  1943. }
  1944. check(cass_statement_bind_collection(statement, idx, collection));
  1945. }
  1946. break;
  1947. }
  1948. case CASS_VALUE_TYPE_MAP:
  1949. {
  1950. // We don't know the subtypes - we can assert that we only support string, for most purposes, I suspect
  1951. if (strcmp(name, ATTRIBUTES_NAME)==0)
  1952. {
  1953. Owned<IAttributeIterator> attrs = parent->getAttributes();
  1954. unsigned numItems = attrs->count();
  1955. ForEach(*attrs)
  1956. {
  1957. numItems++;
  1958. }
  1959. if (numItems)
  1960. {
  1961. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
  1962. ForEach(*attrs)
  1963. {
  1964. const char *key = attrs->queryName();
  1965. const char *value = attrs->queryValue();
  1966. check(cass_collection_append_string(collection, cass_string_init(key+1))); // skip the @
  1967. check(cass_collection_append_string(collection, cass_string_init(value)));
  1968. }
  1969. check(cass_statement_bind_collection(statement, idx, collection));
  1970. }
  1971. }
  1972. else
  1973. {
  1974. Owned<IPTree> child = parent->getPropTree(name);
  1975. unsigned numItems = child->numChildren();
  1976. // MORE - if the cassandra driver objects to there being fewer than numItems supplied, we may need to recode using a second pass.
  1977. if (numItems)
  1978. {
  1979. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
  1980. Owned<IPTreeIterator> items = child->getElements("*");
  1981. ForEach(*items)
  1982. {
  1983. IPTree &item = items->query();
  1984. const char *key = item.queryName();
  1985. const char *value = item.queryProp(NULL);
  1986. if (key && value)
  1987. {
  1988. check(cass_collection_append_string(collection, cass_string_init(key)));
  1989. check(cass_collection_append_string(collection, cass_string_init(value)));
  1990. }
  1991. }
  1992. check(cass_statement_bind_collection(statement, idx, collection));
  1993. }
  1994. }
  1995. break;
  1996. }
  1997. default:
  1998. DBGLOG("Column type %d not supported", type);
  1999. UNSUPPORTED("Column type");
  2000. }
  2001. }
  2002. }
  2003. extern void cassandraToGenericXML()
  2004. {
  2005. CassandraCluster cluster(cass_cluster_new());
  2006. cass_cluster_set_contact_points(cluster, "127.0.0.1");
  2007. CassandraSession session(cass_session_new());
  2008. CassandraFuture future(cass_session_connect_keyspace(session, cluster, "test"));
  2009. future.wait("connect");
  2010. CassandraStatement statement(cass_statement_new(cass_string_init("select * from tbl1 where name = 'name1';"), 0));
  2011. CassandraFuture future2(cass_session_execute(session, statement));
  2012. future2.wait("execute");
  2013. CassandraResult result(cass_future_get_result(future2));
  2014. StringArray names;
  2015. UnsignedArray types;
  2016. for (int i = 0; i < cass_result_column_count(result); i++)
  2017. {
  2018. CassString column = cass_result_column_name(result, i);
  2019. StringBuffer name(column.length, column.data);
  2020. names.append(name);
  2021. types.append(cass_result_column_type(result, i));
  2022. }
  2023. // Now fetch the rows
  2024. Owned<IPTree> xml = createPTree("tbl1");
  2025. CassandraIterator rows(cass_iterator_from_result(result));
  2026. while (cass_iterator_next(rows))
  2027. {
  2028. CassandraIterator cols(cass_iterator_from_row(cass_iterator_get_row(rows)));
  2029. Owned<IPTree> row = createPTree("row");
  2030. unsigned colidx = 0;
  2031. while (cass_iterator_next(cols))
  2032. {
  2033. const CassValue *value = cass_iterator_get_column(cols);
  2034. const char *name = names.item(colidx);
  2035. addElement(row, name, value);
  2036. colidx++;
  2037. }
  2038. xml->addPropTree("row", row.getClear());
  2039. }
  2040. xml->setProp("row[1]/name", "newname");
  2041. StringBuffer buf;
  2042. toXML(xml, buf);
  2043. DBGLOG("%s", buf.str());
  2044. // Now try going the other way...
  2045. // For this we need to know the expected names (can fetch them from system table) and types (ditto, potentially, though a dummy select may be easier)
  2046. StringBuffer colNames;
  2047. StringBuffer values;
  2048. ForEachItemIn(idx, names)
  2049. {
  2050. colNames.append(",").append(names.item(idx));
  2051. values.append(",?");
  2052. }
  2053. VStringBuffer insertQuery("INSERT into tbl1 (%s) values (%s);", colNames.str()+1, values.str()+1);
  2054. Owned<IPTreeIterator> xmlRows = xml->getElements("row");
  2055. ForEach(*xmlRows)
  2056. {
  2057. IPropertyTree *xmlrow = &xmlRows->query();
  2058. CassandraStatement update(cass_statement_new(cass_string_init(insertQuery.str()), names.length()));
  2059. ForEachItemIn(idx, names)
  2060. {
  2061. bindElement(update, xmlrow, idx, names.item(idx), (CassValueType) types.item(idx));
  2062. }
  2063. // MORE - use a batch
  2064. CassandraFuture future3(cass_session_execute(session, update));
  2065. future2.wait("insert");
  2066. }
  2067. }
  2068. //--------------------------------------------
  2069. struct CassandraColumnMapper
  2070. {
  2071. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) = 0;
  2072. virtual void fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, int userVal = 0) = 0;
  2073. };
  2074. class StringColumnMapper : implements CassandraColumnMapper
  2075. {
  2076. public:
  2077. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2078. {
  2079. rtlDataAttr str;
  2080. unsigned chars;
  2081. getUTF8Result(NULL, value, chars, str.refstr());
  2082. StringAttr s(str.getstr(), rtlUtf8Size(chars, str.getstr()));
  2083. row->setProp(name, s);
  2084. return row;
  2085. }
  2086. virtual void fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, int userVal)
  2087. {
  2088. const char *value = row->queryProp(name);
  2089. if (value)
  2090. check(cass_statement_bind_string(statement, idx, cass_string_init(value)));
  2091. }
  2092. } stringColumnMapper;
  2093. class BlobColumnMapper : implements CassandraColumnMapper
  2094. {
  2095. public:
  2096. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2097. {
  2098. rtlDataAttr str;
  2099. unsigned chars;
  2100. getDataResult(NULL, value, chars, str.refdata());
  2101. row->setPropBin(name, chars, str.getbytes());
  2102. return row;
  2103. }
  2104. virtual void fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, int userVal)
  2105. {
  2106. MemoryBuffer value;
  2107. row->getPropBin(name, value);
  2108. if (value.length())
  2109. {
  2110. check(cass_statement_bind_bytes(statement, idx, cass_bytes_init((const cass_byte_t *) value.toByteArray(), value.length())));
  2111. }
  2112. }
  2113. } blobColumnMapper;
  2114. class TimeStampColumnMapper : implements CassandraColumnMapper
  2115. {
  2116. public:
  2117. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2118. {
  2119. // never fetched (that may change?)
  2120. return row;
  2121. }
  2122. virtual void fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, int userVal)
  2123. {
  2124. // never bound
  2125. }
  2126. } timestampColumnMapper;
  2127. class RootNameColumnMapper : implements CassandraColumnMapper
  2128. {
  2129. public:
  2130. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2131. {
  2132. rtlDataAttr str;
  2133. unsigned chars;
  2134. getUTF8Result(NULL, value, chars, str.refstr());
  2135. StringAttr s(str.getstr(), rtlUtf8Size(chars, str.getstr()));
  2136. row->renameProp("/", s);
  2137. return row;
  2138. }
  2139. virtual void fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, int userVal)
  2140. {
  2141. const char *value = row->queryName();
  2142. if (value)
  2143. check(cass_statement_bind_string(statement, idx, cass_string_init(value)));
  2144. }
  2145. } rootNameColumnMapper;
  2146. class GraphIdColumnMapper : implements CassandraColumnMapper
  2147. {
  2148. public:
  2149. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2150. {
  2151. rtlDataAttr str;
  2152. unsigned chars;
  2153. getUTF8Result(NULL, value, chars, str.refstr());
  2154. StringAttr s(str.getstr(), rtlUtf8Size(chars, str.getstr()));
  2155. if (strcmp(s, "Running")==0) // The input XML structure is a little odd
  2156. return row;
  2157. else
  2158. {
  2159. if (!row->hasProp(s))
  2160. row->addPropTree(s, createPTree());
  2161. return row->queryPropTree(s);
  2162. }
  2163. }
  2164. virtual void fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, int userVal)
  2165. {
  2166. const char *value = row->queryName();
  2167. if (value)
  2168. check(cass_statement_bind_string(statement, idx, cass_string_init(value)));
  2169. }
  2170. } graphIdColumnMapper;
  2171. class ProgressColumnMapper : implements CassandraColumnMapper
  2172. {
  2173. public:
  2174. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2175. {
  2176. rtlDataAttr str;
  2177. unsigned chars;
  2178. getDataResult(NULL, value, chars, str.refdata()); // Stored as a blob in case we want to compress
  2179. IPTree *child = createPTreeFromXMLString(chars, str.getstr()); // For now, assume we did not compress!
  2180. row->addPropTree(child->queryName(), child);
  2181. return child;
  2182. }
  2183. virtual void fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, int userVal)
  2184. {
  2185. // MORE - may need to read, and probably should write, compressed.
  2186. StringBuffer value;
  2187. ::toXML(row, value, 0, 0);
  2188. if (value.length())
  2189. {
  2190. check(cass_statement_bind_bytes(statement, idx, cass_bytes_init((const cass_byte_t *) value.str(), value.length())));
  2191. }
  2192. }
  2193. } progressColumnMapper;
  2194. class BoolColumnMapper : implements CassandraColumnMapper
  2195. {
  2196. public:
  2197. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2198. {
  2199. row->addPropBool(name, getBooleanResult(NULL, value));
  2200. return row;
  2201. }
  2202. virtual void fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, int userVal)
  2203. {
  2204. if (row->hasProp(name))
  2205. {
  2206. bool value = row->getPropBool(name, false);
  2207. check(cass_statement_bind_bool(statement, idx, value ? cass_true : cass_false));
  2208. }
  2209. }
  2210. } boolColumnMapper;
  2211. class IntColumnMapper : implements CassandraColumnMapper
  2212. {
  2213. public:
  2214. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2215. {
  2216. row->addPropInt(name, getSignedResult(NULL, value));
  2217. return row;
  2218. }
  2219. virtual void fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, int userVal)
  2220. {
  2221. if (row->hasProp(name))
  2222. {
  2223. int value = row->getPropInt(name);
  2224. check(cass_statement_bind_int32(statement, idx, value));
  2225. }
  2226. }
  2227. } intColumnMapper;
  2228. class DefaultedIntColumnMapper : public IntColumnMapper
  2229. {
  2230. public:
  2231. virtual void fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, int defaultValue)
  2232. {
  2233. int value = row->getPropInt(name, defaultValue);
  2234. check(cass_statement_bind_int32(statement, idx, value));
  2235. }
  2236. } defaultedIntColumnMapper;
  2237. class BigIntColumnMapper : implements CassandraColumnMapper
  2238. {
  2239. public:
  2240. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2241. {
  2242. row->addPropInt64(name, getSignedResult(NULL, value));
  2243. return row;
  2244. }
  2245. virtual void fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, int userVal)
  2246. {
  2247. if (row->hasProp(name))
  2248. {
  2249. __int64 value = row->getPropInt64(name);
  2250. check(cass_statement_bind_int64(statement, idx, value));
  2251. }
  2252. }
  2253. } bigintColumnMapper;
  2254. class SubgraphIdColumnMapper : implements CassandraColumnMapper
  2255. {
  2256. public:
  2257. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2258. {
  2259. __int64 id = getSignedResult(NULL, value);
  2260. if (id)
  2261. row->addPropInt64(name, id);
  2262. return row;
  2263. }
  2264. virtual void fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, int userVal)
  2265. {
  2266. int value = row->getPropInt(name);
  2267. check(cass_statement_bind_int64(statement, idx, value));
  2268. }
  2269. } subgraphIdColumnMapper;
  2270. class SimpleMapColumnMapper : implements CassandraColumnMapper
  2271. {
  2272. public:
  2273. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2274. {
  2275. Owned<IPTree> map = createPTree(name);
  2276. CassandraIterator elems(cass_iterator_from_map(value));
  2277. while (cass_iterator_next(elems))
  2278. {
  2279. rtlDataAttr str;
  2280. unsigned chars;
  2281. getStringResult(NULL, cass_iterator_get_map_key(elems), chars, str.refstr());
  2282. StringAttr s(str.getstr(), chars);
  2283. stringColumnMapper.toXML(map, s, cass_iterator_get_map_value(elems));
  2284. }
  2285. row->addPropTree(name, map.getClear());
  2286. return row;
  2287. }
  2288. virtual void fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, int userVal)
  2289. {
  2290. Owned<IPTree> child = row->getPropTree(name);
  2291. unsigned numItems = child->numChildren();
  2292. if (numItems)
  2293. {
  2294. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
  2295. Owned<IPTreeIterator> items = child->getElements("*");
  2296. ForEach(*items)
  2297. {
  2298. IPTree &item = items->query();
  2299. const char *key = item.queryName();
  2300. const char *value = item.queryProp(NULL);
  2301. if (key && value)
  2302. {
  2303. check(cass_collection_append_string(collection, cass_string_init(key)));
  2304. check(cass_collection_append_string(collection, cass_string_init(value)));
  2305. }
  2306. }
  2307. check(cass_statement_bind_collection(statement, idx, collection));
  2308. }
  2309. }
  2310. } simpleMapColumnMapper;
  2311. class AttributeMapColumnMapper : implements CassandraColumnMapper
  2312. {
  2313. public:
  2314. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2315. {
  2316. CassandraIterator elems(cass_iterator_from_map(value));
  2317. while (cass_iterator_next(elems))
  2318. {
  2319. rtlDataAttr str;
  2320. unsigned chars;
  2321. getStringResult(NULL, cass_iterator_get_map_key(elems), chars, str.refstr());
  2322. StringBuffer s("@");
  2323. s.append(chars, str.getstr());
  2324. stringColumnMapper.toXML(row, s, cass_iterator_get_map_value(elems));
  2325. }
  2326. return row;
  2327. }
  2328. virtual void fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, int userVal)
  2329. {
  2330. // NOTE - name here provides a list of attributes that we should NOT be mapping
  2331. Owned<IAttributeIterator> attrs = row->getAttributes();
  2332. unsigned numItems = attrs->count();
  2333. ForEach(*attrs)
  2334. {
  2335. const char *key = attrs->queryName();
  2336. if (strstr(name, key) == NULL) // MORE - should really check that the following char is a @
  2337. numItems++;
  2338. }
  2339. if (numItems)
  2340. {
  2341. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
  2342. ForEach(*attrs)
  2343. {
  2344. const char *key = attrs->queryName();
  2345. if (strstr(name, key) == NULL) // MORE - should really check that the following char is a @
  2346. {
  2347. const char *value = attrs->queryValue();
  2348. check(cass_collection_append_string(collection, cass_string_init(key+1))); // skip the @
  2349. check(cass_collection_append_string(collection, cass_string_init(value)));
  2350. }
  2351. }
  2352. check(cass_statement_bind_collection(statement, idx, collection));
  2353. }
  2354. }
  2355. } attributeMapColumnMapper;
  2356. class GraphMapColumnMapper : implements CassandraColumnMapper
  2357. {
  2358. public:
  2359. GraphMapColumnMapper(const char *_elemName, const char *_nameAttr)
  2360. : elemName(_elemName), nameAttr(_nameAttr)
  2361. {
  2362. }
  2363. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2364. {
  2365. Owned<IPTree> map = createPTree(name);
  2366. CassandraIterator elems(cass_iterator_from_map(value));
  2367. while (cass_iterator_next(elems))
  2368. {
  2369. rtlDataAttr str;
  2370. unsigned chars;
  2371. getStringResult(NULL, cass_iterator_get_map_value(elems), chars, str.refstr());
  2372. Owned<IPTree> child = createPTreeFromXMLString(chars, str.getstr());
  2373. map->addPropTree(elemName, child.getClear());
  2374. }
  2375. row->addPropTree(name, map.getClear());
  2376. return row;
  2377. }
  2378. virtual void fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, int userVal)
  2379. {
  2380. Owned<IPTree> child = row->getPropTree(name);
  2381. unsigned numItems = child->numChildren();
  2382. if (numItems)
  2383. {
  2384. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
  2385. Owned<IPTreeIterator> items = child->getElements("*");
  2386. ForEach(*items)
  2387. {
  2388. IPTree &item = items->query();
  2389. const char *key = item.queryProp(nameAttr);
  2390. // MORE - may need to read, and probably should write, compressed. At least for graphs
  2391. StringBuffer value;
  2392. ::toXML(&item, value, 0, 0);
  2393. if (key && value.length())
  2394. {
  2395. check(cass_collection_append_string(collection, cass_string_init(key)));
  2396. check(cass_collection_append_string(collection, cass_string_init(value)));
  2397. }
  2398. }
  2399. check(cass_statement_bind_collection(statement, idx, collection));
  2400. }
  2401. }
  2402. private:
  2403. const char *elemName;
  2404. const char *nameAttr;
  2405. } graphMapColumnMapper("Graph", "@name"), workflowMapColumnMapper("Item", "@wfid");
  2406. class AssociationsMapColumnMapper : public GraphMapColumnMapper
  2407. {
  2408. public:
  2409. AssociationsMapColumnMapper(const char *_elemName, const char *_nameAttr)
  2410. : GraphMapColumnMapper(_elemName, _nameAttr)
  2411. {
  2412. }
  2413. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2414. {
  2415. // Name is "Query/Associated ...
  2416. IPTree *query = row->queryPropTree("Query");
  2417. if (!query)
  2418. {
  2419. query = createPTree("Query");
  2420. row->setPropTree("Query", query);
  2421. }
  2422. return GraphMapColumnMapper::toXML(query, "Associated", value);
  2423. }
  2424. } associationsMapColumnMapper("File", "@filename");
  2425. class PluginListColumnMapper : implements CassandraColumnMapper
  2426. {
  2427. public:
  2428. PluginListColumnMapper(const char *_elemName, const char *_nameAttr)
  2429. : elemName(_elemName), nameAttr(_nameAttr)
  2430. {
  2431. }
  2432. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  2433. {
  2434. Owned<IPTree> map = createPTree(name);
  2435. CassandraIterator elems(cass_iterator_from_collection(value));
  2436. while (cass_iterator_next(elems))
  2437. {
  2438. Owned<IPTree> child = createPTree(elemName);
  2439. stringColumnMapper.toXML(child, nameAttr, cass_iterator_get_value(elems));
  2440. map->addPropTree(elemName, child.getClear());
  2441. }
  2442. row->addPropTree(name, map.getClear());
  2443. return row;
  2444. }
  2445. virtual void fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, int userVal)
  2446. {
  2447. Owned<IPTree> child = row->getPropTree(name);
  2448. unsigned numItems = child->numChildren();
  2449. if (numItems)
  2450. {
  2451. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_LIST, numItems));
  2452. Owned<IPTreeIterator> items = child->getElements("*");
  2453. ForEach(*items)
  2454. {
  2455. IPTree &item = items->query();
  2456. const char *value = item.queryProp(nameAttr);
  2457. if (value)
  2458. check(cass_collection_append_string(collection, cass_string_init(value)));
  2459. }
  2460. check(cass_statement_bind_collection(statement, idx, collection));
  2461. }
  2462. }
  2463. private:
  2464. const char *elemName;
  2465. const char *nameAttr;
  2466. } pluginListColumnMapper("Plugin", "@dllname");
// One entry of a table describing how a Cassandra column corresponds to part of an XML tree.
// Tables are arrays of these, terminated by an entry whose columnName is NULL; that final entry
// reuses columnType for the table name and xpath for the PRIMARY KEY clause.
// Field order matters: the tables below initialise entries positionally.
struct CassandraXmlMapping
{
    const char *columnName;         // Cassandra column name, or NULL in the terminating entry
    const char *columnType;         // CQL type of the column (table name in the terminating entry)
    const char *xpath;              // passed to the mapper as 'name' (primary key in the terminating entry)
    CassandraColumnMapper &mapper;  // converts between the column value and XML
};
// Column mappings for the wuExceptions table (one row per exception of a workunit).
// For the attributes column the xpath field is the attribute suppression list - empty here.
const CassandraXmlMapping wuExceptionsMappings [] =
{
    {"wuid", "text", NULL, rootNameColumnMapper},
    {"attributes", "map<text, text>", "", attributeMapColumnMapper},
    {"value", "text", ".", stringColumnMapper},
    {"ts", "timeuuid", NULL, timestampColumnMapper}, // must be last since we don't bind it, so it would throw out the colidx values of following fields
    { NULL, "wuExceptions", "((wuid), ts)", stringColumnMapper} // terminator: table name and primary key
};
// Column mappings for the wuStatistics table (one row per statistic of a workunit).
const CassandraXmlMapping wuStatisticsMappings [] =
{
    {"wuid", "text", NULL, rootNameColumnMapper},
    {"name", "text", "@name", stringColumnMapper},
    {"attributes", "map<text, text>", "@name", attributeMapColumnMapper}, // xpath is the suppression list: @name has its own column
    { NULL, "wuStatistics", "((wuid), name)", stringColumnMapper} // terminator: table name and primary key
};
// Column mappings for the workunits table (one row per workunit, keyed by wuid).
// Attributes with a dedicated column are listed in the suppression list of the attributes column
// so that they are not stored a second time in the generic attribute map.
const CassandraXmlMapping workunitsMappings [] =
{
    {"wuid", "text", NULL, rootNameColumnMapper},
    {"clustername", "text", "@clusterName", stringColumnMapper},
    {"jobname", "text", "@jobName", stringColumnMapper},
    {"priorityclass", "int", "@priorityClass", intColumnMapper},
    {"protected", "boolean", "@protected", boolColumnMapper},
    {"scope", "text", "@scope", stringColumnMapper},
    {"submitID", "text", "@submitID", stringColumnMapper},
    {"state", "text", "@state", stringColumnMapper},
    {"debug", "map<text, text>", "Debug", simpleMapColumnMapper},
    {"attributes", "map<text, text>", "@wuid@clusterName@jobName@priorityClass@protected@scope@submitID@state", attributeMapColumnMapper}, // name is the suppression list
    {"graphs", "map<text, text>", "Graphs", graphMapColumnMapper},
    {"plugins", "list<text>", "Plugins", pluginListColumnMapper},
    {"query", "text", "Query/Text", stringColumnMapper},
    {"associations", "map<text, text>", "Query/Associated", associationsMapColumnMapper},
    {"workflow", "map<text, text>", "Workflow", workflowMapColumnMapper},
    { NULL, "workunits", "((wuid))", stringColumnMapper} // terminator: table name and primary key
};
// Column mappings for the graphprogress table (one row per subgraph, plus rows with subgraph id 0
// written by graphProgressXMLtoCassandra). Code in this file addresses entries of this table by
// position (0 = wuid, 1 = graphID, 2 = progress, 3 = subgraphID, 4 = state), so do not reorder.
// NOTE(review): subgraphID is declared as CQL "text" but is bound with cass_statement_bind_int64
// elsewhere in this file - confirm the intended column type.
const CassandraXmlMapping graphProgressMappings [] =
{
    {"wuid", "text", NULL, rootNameColumnMapper},
    {"graphID", "text", NULL, graphIdColumnMapper},
    {"progress", "blob", NULL, progressColumnMapper}, // NOTE - order of these is significant - this creates the subtree that ones below will modify
    {"subgraphID", "text", "@id", subgraphIdColumnMapper},
    {"state", "int", "@_state", intColumnMapper},
    { NULL, "graphprogress", "((wuid), graphid, subgraphid)", stringColumnMapper} // terminator: table name and primary key
};
  2517. const CassandraXmlMapping wuResultsMappings [] =
  2518. {
  2519. {"wuid", "text", NULL, rootNameColumnMapper},
  2520. {"sequence", "int", "@sequence", defaultedIntColumnMapper}, // Note - special sequences indicate Variable or Temporary...
  2521. {"name", "text", "@name", stringColumnMapper},
  2522. {"format", "text", "@format", stringColumnMapper}, // xml, xmlset, csv, or null to mean raw. Could probably switch to int if we wanted
  2523. {"status", "text", "@status", stringColumnMapper},
  2524. {"rowcount", "int", "rowCount", bigintColumnMapper},// This is the number of rows in result (which may be stored in a file rather than in value)
  2525. {"totalrowcount", "bigint", "totalRowCount", bigintColumnMapper},// This is the number of rows in value
  2526. {"schemaRaw", "blob", "SchemaRaw", blobColumnMapper},
  2527. {"logicalName", "text", "logicalName", stringColumnMapper}, // either this or value will be present once result status is "calculated"
  2528. {"value", "blob", "Value", blobColumnMapper},
  2529. { NULL, "wuResults", "((wuid), sequence, name)", stringColumnMapper}
  2530. };
  2531. int getFieldNames(const CassandraXmlMapping *mappings, StringBuffer &names, StringBuffer &bindings, StringBuffer &tableName)
  2532. {
  2533. int numFields = 0;
  2534. while (mappings->columnName)
  2535. {
  2536. names.appendf(",%s", mappings->columnName);
  2537. if (strcmp(mappings->columnType, "timeuuid")==0)
  2538. bindings.appendf(",now()");
  2539. else
  2540. {
  2541. bindings.appendf(",?");
  2542. numFields++;
  2543. }
  2544. mappings++;
  2545. }
  2546. tableName.append(mappings->columnType);
  2547. return numFields;
  2548. }
  2549. StringBuffer & describeTable(const CassandraXmlMapping *mappings, StringBuffer &out)
  2550. {
  2551. StringBuffer fields;
  2552. while (mappings->columnName)
  2553. {
  2554. fields.appendf("%s %s,", mappings->columnName, mappings->columnType);
  2555. mappings++;
  2556. }
  2557. return out.appendf("CREATE TABLE IF NOT EXISTS HPCC.%s (%s PRIMARY KEY %s);", mappings->columnType, fields.str(), mappings->xpath);
  2558. }
  2559. const CassResult *fetchDataForWu(const char *wuid, CassSession *session, const CassandraXmlMapping *mappings)
  2560. {
  2561. StringBuffer names;
  2562. StringBuffer bindings;
  2563. StringBuffer tableName;
  2564. getFieldNames(mappings+1, names, bindings, tableName); // mappings+1 means we don't return the wuid column
  2565. VStringBuffer selectQuery("select %s from HPCC.%s where wuid='%s';", names.str()+1, tableName.str(), wuid);
  2566. CassandraStatement statement(cass_statement_new(cass_string_init(selectQuery.str()), 0));
  2567. CassandraFuture future(cass_session_execute(session, statement));
  2568. future.wait("execute");
  2569. return cass_future_get_result(future);
  2570. }
  2571. void executeSimpleCommand(CassSession *session, const char *command)
  2572. {
  2573. CassandraStatement statement(cass_statement_new(cass_string_init(command), 0));
  2574. CassandraFuture future(cass_session_execute(session, statement));
  2575. future.wait("execute");
  2576. }
  2577. void ensureTable(CassSession *session, const CassandraXmlMapping *mappings)
  2578. {
  2579. StringBuffer schema;
  2580. executeSimpleCommand(session, describeTable(mappings, schema));
  2581. }
  2582. extern void simpleXMLtoCassandra(CassSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, const char *wuid, IPTree *inXML)
  2583. {
  2584. StringBuffer names;
  2585. StringBuffer bindings;
  2586. StringBuffer tableName;
  2587. int numBound = getFieldNames(mappings, names, bindings, tableName);
  2588. VStringBuffer insertQuery("INSERT into HPCC.%s (%s) values (%s);", tableName.str(), names.str()+1, bindings.str()+1);
  2589. CassandraFuture futurePrep(cass_session_prepare(session, cass_string_init(insertQuery)));
  2590. futurePrep.wait("prepare statement");
  2591. CassandraPrepared prepared(cass_future_get_prepared(futurePrep));
  2592. CassandraStatement update(cass_prepared_bind(prepared));
  2593. check(cass_statement_bind_string(update, 0, cass_string_init(wuid)));
  2594. unsigned colidx = 1;
  2595. while (mappings[colidx].columnName)
  2596. {
  2597. mappings[colidx].mapper.fromXML(update, colidx, inXML, mappings[colidx].xpath);
  2598. colidx++;
  2599. }
  2600. check(cass_batch_add_statement(batch, update));
  2601. }
  2602. extern void childXMLtoCassandra(CassSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, IPTree *inXML, const char *xpath, int defaultValue)
  2603. {
  2604. if (inXML->hasProp(xpath))
  2605. {
  2606. StringBuffer names;
  2607. StringBuffer bindings;
  2608. StringBuffer tableName;
  2609. int numBound = getFieldNames(mappings, names, bindings, tableName);
  2610. VStringBuffer insertQuery("INSERT into HPCC.%s (%s) values (%s);", tableName.str(), names.str()+1, bindings.str()+1);
  2611. CassandraFuture futurePrep(cass_session_prepare(session, cass_string_init(insertQuery)));
  2612. futurePrep.wait("prepare statement");
  2613. CassandraPrepared prepared(cass_future_get_prepared(futurePrep));
  2614. Owned<IPTreeIterator> results = inXML->getElements(xpath);
  2615. ForEach(*results)
  2616. {
  2617. IPTree &result = results->query();
  2618. CassandraStatement update(cass_prepared_bind(prepared));
  2619. mappings[0].mapper.fromXML(update, 0, inXML, mappings[0].xpath);
  2620. unsigned colidx = 1;
  2621. while (mappings[colidx].columnName)
  2622. {
  2623. mappings[colidx].mapper.fromXML(update, colidx, &result, mappings[colidx].xpath, defaultValue);
  2624. colidx++;
  2625. }
  2626. check(cass_batch_add_statement(batch, update));
  2627. }
  2628. }
  2629. }
  2630. extern void cassandraToChildXML(CassSession *session, const CassandraXmlMapping *mappings, const char *wuid, IPTree *wuTree, const char *parentName, const char *childName)
  2631. {
  2632. CassandraResult result(fetchDataForWu(wuid, session, mappings));
  2633. Owned<IPTree> parent = createPTree(parentName);
  2634. CassandraIterator rows(cass_iterator_from_result(result));
  2635. while (cass_iterator_next(rows))
  2636. {
  2637. CassandraIterator cols(cass_iterator_from_row(cass_iterator_get_row(rows)));
  2638. Owned<IPTree> child = createPTree(childName);
  2639. unsigned colidx = 1;
  2640. while (cass_iterator_next(cols))
  2641. {
  2642. assertex(mappings[colidx].columnName);
  2643. const CassValue *value = cass_iterator_get_column(cols);
  2644. if (value && !cass_value_is_null(value))
  2645. mappings[colidx].mapper.toXML(child, mappings[colidx].xpath, value);
  2646. colidx++;
  2647. }
  2648. parent->addPropTree(childName, child.getClear());
  2649. }
  2650. wuTree->addPropTree(parentName, parent.getClear());
  2651. }
// Adds to 'batch' one wuResults insert for each element of 'inXML' matching 'xpath'.
// 'defaultSequence' is forwarded to childXMLtoCassandra as the value passed to each column mapper.
extern void wuResultsXMLtoCassandra(CassSession *session, CassBatch *batch, IPTree *inXML, const char *xpath, int defaultSequence)
{
    childXMLtoCassandra(session, batch, wuResultsMappings, inXML, xpath, defaultSequence);
}
  2656. extern void cassandraToWuResultsXML(CassSession *session, const char *wuid, IPTree *wuTree)
  2657. {
  2658. CassandraResult result(fetchDataForWu(wuid, session, wuResultsMappings));
  2659. Owned<IPTree> results;
  2660. Owned<IPTree> variables;
  2661. Owned<IPTree> temporaries;
  2662. CassandraIterator rows(cass_iterator_from_result(result));
  2663. while (cass_iterator_next(rows))
  2664. {
  2665. CassandraIterator cols(cass_iterator_from_row(cass_iterator_get_row(rows)));
  2666. if (!cass_iterator_next(cols))
  2667. fail("No column found reading wuresults.sequence");
  2668. const CassValue *sequenceValue = cass_iterator_get_column(cols);
  2669. int sequence = getSignedResult(NULL, sequenceValue);
  2670. Owned<IPTree> child;
  2671. IPTree *parent;
  2672. switch (sequence)
  2673. {
  2674. case ResultSequenceStored:
  2675. if (!variables)
  2676. variables.setown(createPTree("Variables"));
  2677. child.setown(createPTree("Variable"));
  2678. parent = variables;
  2679. break;
  2680. case ResultSequenceInternal:
  2681. case ResultSequenceOnce:
  2682. if (!temporaries)
  2683. temporaries.setown(createPTree("Temporaries"));
  2684. child.setown(createPTree("Variable"));
  2685. parent = temporaries;
  2686. break;
  2687. default:
  2688. if (!results)
  2689. results.setown(createPTree("Results"));
  2690. child.setown(createPTree("Result"));
  2691. parent = results;
  2692. break;
  2693. }
  2694. unsigned colidx = 2;
  2695. while (cass_iterator_next(cols))
  2696. {
  2697. assertex(wuResultsMappings[colidx].columnName);
  2698. const CassValue *value = cass_iterator_get_column(cols);
  2699. if (value && !cass_value_is_null(value))
  2700. wuResultsMappings[colidx].mapper.toXML(child, wuResultsMappings[colidx].xpath, value);
  2701. colidx++;
  2702. }
  2703. const char *childName = child->queryName();
  2704. parent->addPropTree(childName, child.getClear());
  2705. }
  2706. if (results)
  2707. wuTree->addPropTree("Results", results.getClear());
  2708. if (variables)
  2709. wuTree->addPropTree("Variables", variables.getClear());
  2710. if (temporaries)
  2711. wuTree->addPropTree("Temporaries", temporaries.getClear());
  2712. }
  2713. extern void graphProgressXMLtoCassandra(CassSession *session, IPTree *inXML)
  2714. {
  2715. StringBuffer names;
  2716. StringBuffer bindings;
  2717. StringBuffer tableName;
  2718. int numBound = getFieldNames(graphProgressMappings, names, bindings, tableName);
  2719. VStringBuffer insertQuery("INSERT into HPCC.%s (%s) values (%s);", tableName.str(), names.str()+1, bindings.str()+1);
  2720. CassandraBatch batch(cass_batch_new(CASS_BATCH_TYPE_UNLOGGED));
  2721. CassandraFuture futurePrep(cass_session_prepare(session, cass_string_init(insertQuery)));
  2722. futurePrep.wait("prepare statement");
  2723. CassandraPrepared prepared(cass_future_get_prepared(futurePrep));
  2724. Owned<IPTreeIterator> graphs = inXML->getElements("./graph*");
  2725. ForEach(*graphs)
  2726. {
  2727. IPTree &graph = graphs->query();
  2728. Owned<IPTreeIterator> subgraphs = graph.getElements("./node");
  2729. ForEach(*subgraphs)
  2730. {
  2731. IPTree &subgraph = subgraphs->query();
  2732. CassandraStatement update(cass_prepared_bind(prepared));
  2733. graphProgressMappings[0].mapper.fromXML(update, 0, inXML, graphProgressMappings[0].xpath);
  2734. graphProgressMappings[1].mapper.fromXML(update, 1, &graph, graphProgressMappings[1].xpath);
  2735. unsigned colidx = 2;
  2736. while (graphProgressMappings[colidx].columnName)
  2737. {
  2738. graphProgressMappings[colidx].mapper.fromXML(update, colidx, &subgraph, graphProgressMappings[colidx].xpath);
  2739. colidx++;
  2740. }
  2741. check(cass_batch_add_statement(batch, update));
  2742. }
  2743. // And one more with subgraphid = 0 for the graph status
  2744. CassandraStatement update(cass_statement_new(cass_string_init(insertQuery.str()), bindings.length()/2));
  2745. graphProgressMappings[0].mapper.fromXML(update, 0, inXML, graphProgressMappings[0].xpath);
  2746. graphProgressMappings[1].mapper.fromXML(update, 1, &graph, graphProgressMappings[1].xpath);
  2747. check(cass_statement_bind_int64(update, 3, 0)); // subgraphId can't be null, as it's in the key
  2748. unsigned colidx = 4; // we skip progress and subgraphid
  2749. while (graphProgressMappings[colidx].columnName)
  2750. {
  2751. graphProgressMappings[colidx].mapper.fromXML(update, colidx, &graph, graphProgressMappings[colidx].xpath);
  2752. colidx++;
  2753. }
  2754. check(cass_batch_add_statement(batch, update));
  2755. }
  2756. if (inXML->hasProp("Running"))
  2757. {
  2758. IPTree *running = inXML->queryPropTree("Running");
  2759. CassandraStatement update(cass_statement_new(cass_string_init(insertQuery.str()), bindings.length()/2));
  2760. graphProgressMappings[0].mapper.fromXML(update, 0, inXML, graphProgressMappings[0].xpath);
  2761. graphProgressMappings[1].mapper.fromXML(update, 1, running, graphProgressMappings[1].xpath);
  2762. graphProgressMappings[2].mapper.fromXML(update, 2, running, graphProgressMappings[2].xpath);
  2763. check(cass_statement_bind_int64(update, 3, 0)); // subgraphId can't be null, as it's in the key
  2764. check(cass_batch_add_statement(batch, update));
  2765. }
  2766. CassandraFuture futureBatch(cass_session_execute_batch(session, batch));
  2767. futureBatch.wait("execute");
  2768. }
  2769. extern void cassandraToGraphProgressXML(CassSession *session, const char *wuid)
  2770. {
  2771. CassandraResult result(fetchDataForWu(wuid, session, graphProgressMappings));
  2772. Owned<IPTree> progress = createPTree(wuid);
  2773. CassandraIterator rows(cass_iterator_from_result(result));
  2774. while (cass_iterator_next(rows))
  2775. {
  2776. CassandraIterator cols(cass_iterator_from_row(cass_iterator_get_row(rows)));
  2777. unsigned colidx = 1; // wuid is not returned
  2778. IPTree *ptree = progress;
  2779. while (cass_iterator_next(cols))
  2780. {
  2781. assertex(graphProgressMappings[colidx].columnName);
  2782. const CassValue *value = cass_iterator_get_column(cols);
  2783. // NOTE - this relies on the fact that progress is NULL when subgraphId=0, so that the status and id fields
  2784. // get set on the graph instead of on the child node in those cases.
  2785. if (value && !cass_value_is_null(value))
  2786. ptree = graphProgressMappings[colidx].mapper.toXML(ptree, graphProgressMappings[colidx].xpath, value);
  2787. colidx++;
  2788. }
  2789. }
  2790. StringBuffer out;
  2791. toXML(progress, out, 0, XML_SortTags|XML_Format);
  2792. printf("%s", out.str());
  2793. }
  2794. extern void workunitXMLtoCassandra(CassSession *session, IPTree *inXML)
  2795. {
  2796. const char *wuid = inXML->queryName();
  2797. CassandraBatch batch(cass_batch_new(CASS_BATCH_TYPE_UNLOGGED));
  2798. simpleXMLtoCassandra(session, batch, workunitsMappings, wuid, inXML);
  2799. wuResultsXMLtoCassandra(session, batch, inXML, "Results/Result", 0);
  2800. wuResultsXMLtoCassandra(session, batch, inXML, "Variables/Variable", ResultSequenceStored);
  2801. wuResultsXMLtoCassandra(session, batch, inXML, "Temporaries/Variable", ResultSequenceInternal); // NOTE - lookups may also request ResultSequenceOnce
  2802. childXMLtoCassandra(session, batch, wuExceptionsMappings, inXML, "Exceptions/Exception", 0);
  2803. childXMLtoCassandra(session, batch, wuStatisticsMappings, inXML, "Statistics/Statistic", 0);
  2804. CassandraFuture futureBatch(cass_session_execute_batch(session, batch));
  2805. futureBatch.wait("execute");
  2806. }
  2807. extern IPTree *cassandraToWorkunitXML(CassSession *session, const char *wuid)
  2808. {
  2809. CassandraResult result(fetchDataForWu(wuid, session, workunitsMappings));
  2810. CassandraIterator rows(cass_iterator_from_result(result));
  2811. if (cass_iterator_next(rows)) // should just be one
  2812. {
  2813. Owned<IPTree> wuXML = createPTree(wuid);
  2814. CassandraIterator cols(cass_iterator_from_row(cass_iterator_get_row(rows)));
  2815. wuXML->setPropTree("Query", createPTree("Query"));
  2816. wuXML->setProp("Query/@fetchEntire", "1");
  2817. unsigned colidx = 1; // wuid is not returned
  2818. while (cass_iterator_next(cols))
  2819. {
  2820. assertex(workunitsMappings[colidx].columnName);
  2821. const CassValue *value = cass_iterator_get_column(cols);
  2822. if (value && !cass_value_is_null(value))
  2823. workunitsMappings[colidx].mapper.toXML(wuXML, workunitsMappings[colidx].xpath, value);
  2824. colidx++;
  2825. }
  2826. return wuXML.getClear();
  2827. }
  2828. else
  2829. return NULL;
  2830. }
  2831. extern void cassandraTestWorkunitXML()
  2832. {
  2833. CassandraCluster cluster(cass_cluster_new());
  2834. cass_cluster_set_contact_points(cluster, "127.0.0.1");
  2835. CassandraSession session(cass_session_new());
  2836. CassandraFuture future(cass_session_connect_keyspace(session, cluster, "hpcc"));
  2837. future.wait("connect");
  2838. ensureTable(session, workunitsMappings);
  2839. ensureTable(session, wuResultsMappings);
  2840. ensureTable(session, wuExceptionsMappings);
  2841. ensureTable(session, wuStatisticsMappings);
  2842. Owned<IPTree> inXML = createPTreeFromXMLFile("/data/rchapman/hpcc/e.xml");
  2843. workunitXMLtoCassandra(session, inXML);
  2844. // Now the other way
  2845. const char *wuid = inXML->queryName();
  2846. Owned<IPTree> wuXML = cassandraToWorkunitXML(session, wuid);
  2847. cassandraToWuResultsXML(session, wuid, wuXML);
  2848. cassandraToChildXML(session, wuExceptionsMappings, wuid, wuXML, "Exceptions", "Exception");
  2849. cassandraToChildXML(session, wuStatisticsMappings, wuid, wuXML, "Statistics", "Statistic");
  2850. StringBuffer out;
  2851. toXML(wuXML, out, 0, XML_SortTags|XML_Format);
  2852. printf("%s", out.str());
  2853. }
  2854. extern void cassandraTestGraphProgressXML()
  2855. {
  2856. CassandraCluster cluster(cass_cluster_new());
  2857. cass_cluster_set_contact_points(cluster, "127.0.0.1");
  2858. CassandraSession session(cass_session_new());
  2859. CassandraFuture future(cass_session_connect_keyspace(session, cluster, "hpcc"));
  2860. future.wait("connect");
  2861. ensureTable(session, graphProgressMappings);
  2862. Owned<IPTree> inXML = createPTreeFromXMLFile("/data/rchapman/hpcc/testing/regress/ecl/a.xml");
  2863. graphProgressXMLtoCassandra(session, inXML);
  2864. const char *wuid = inXML->queryName();
  2865. cassandraToGraphProgressXML(session, wuid);
  2866. }
// Entry point for the manual tests in this file. Currently runs only the workunit round-trip
// test; the graph progress test is commented out.
extern void cassandraTest()
{
    cassandraTestWorkunitXML();
    //cassandraTestGraphProgressXML();
}
// Workunit object whose content is loaded from a property tree (as built by
// cassandraToWorkunitXML) rather than created empty.
class CConstCassandraWorkUnit : public CLocalWorkUnit
{
public:
    // wuXML is handed to CLocalWorkUnit::loadPTree; the caller in this file passes a tree it has
    // released with getClear(). NOTE(review): presumably loadPTree takes ownership - confirm.
    CConstCassandraWorkUnit(IPTree *wuXML, ISecManager *secmgr, ISecUser *secuser)
    : CLocalWorkUnit(secmgr, secuser)
    {
        CLocalWorkUnit::loadPTree(wuXML);
    }
};
// IWorkUnitFactory implementation backed by Cassandra. Only openWorkUnit is implemented so far;
// every other interface method is a placeholder that invokes UNIMPLEMENTED.
class CCasssandraWorkUnitFactory : implements CInterfaceOf<IWorkUnitFactory>
{
public:
    // Connects to the Cassandra node at 127.0.0.1 using keyspace "hpcc" and waits for the
    // connection to complete.
    CCasssandraWorkUnitFactory() : cluster(cass_cluster_new())
    {
        cass_cluster_set_contact_points(cluster, "127.0.0.1");
        session.set(cass_session_new());
        CassandraFuture future(cass_session_connect_keyspace(session, cluster, "hpcc"));
        future.wait("connect");
    }
    ~CCasssandraWorkUnitFactory()
    {
    }
    virtual IWorkUnit * createWorkUnit(const char * app, const char * user) { UNIMPLEMENTED; }
    virtual bool deleteWorkUnit(const char * wuid) { UNIMPLEMENTED; }
    // Loads the workunit row for 'wuid' and wraps it in a CConstCassandraWorkUnit created without
    // a security manager or user. Returns NULL if the workunit is not found. 'lock' is ignored.
    virtual IConstWorkUnit * openWorkUnit(const char * wuid, bool lock)
    {
        // MORE - what to do about lock?
        Owned<IPTree> wuXML = cassandraToWorkunitXML(session, wuid);
        if (wuXML)
            return new CConstCassandraWorkUnit(wuXML.getClear(), NULL, NULL);
        else
            return NULL;
    }
    virtual IConstWorkUnitIterator * getWorkUnitsByOwner(const char * owner) { UNIMPLEMENTED; }
    virtual IWorkUnit * updateWorkUnit(const char * wuid) { UNIMPLEMENTED; }
    virtual int setTracingLevel(int newlevel) { UNIMPLEMENTED; }
    virtual IWorkUnit * createNamedWorkUnit(const char * wuid, const char * app, const char * user) { UNIMPLEMENTED; }
    virtual IConstWorkUnitIterator * getWorkUnitsByState(WUState state) { UNIMPLEMENTED; }
    virtual IConstWorkUnitIterator * getWorkUnitsByECL(const char * ecl) { UNIMPLEMENTED; }
    virtual IConstWorkUnitIterator * getWorkUnitsByCluster(const char * cluster) { UNIMPLEMENTED; }
    virtual IConstWorkUnitIterator * getWorkUnitsByXPath(const char * xpath) { UNIMPLEMENTED; }
    virtual IConstWorkUnitIterator * getWorkUnitsSorted(WUSortField * sortorder, WUSortField * filters, const void * filterbuf, unsigned startoffset, unsigned maxnum, const char * queryowner, __int64 * cachehint, unsigned *total) { UNIMPLEMENTED; }
    virtual unsigned numWorkUnits() { UNIMPLEMENTED; }
    virtual unsigned numWorkUnitsFiltered(WUSortField * filters, const void * filterbuf) { UNIMPLEMENTED; }
    virtual void descheduleAllWorkUnits() { UNIMPLEMENTED; }
    virtual IConstQuerySetQueryIterator * getQuerySetQueriesSorted(WUQuerySortField *sortorder, WUQuerySortField *filters, const void *filterbuf, unsigned startoffset, unsigned maxnum, __int64 *cachehint, unsigned *total, const MapStringTo<bool> *subset) { UNIMPLEMENTED; }
private:
    // Declared in this order so that cluster is constructed before session
    CassandraCluster cluster;
    CassandraSession session;
};
  2922. } // namespace