mysqlembed.cpp 48 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2013 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "mysql.h"
  15. #include "jexcept.hpp"
  16. #include "jthread.hpp"
  17. #include "hqlplugins.hpp"
  18. #include "deftype.hpp"
  19. #include "eclhelper.hpp"
  20. #include "eclrtl.hpp"
  21. #include "eclrtl_imp.hpp"
  22. #include "rtlds_imp.hpp"
  23. #include "rtlfield_imp.hpp"
  24. #include "rtlembed.hpp"
  25. #include "roxiemem.hpp"
  26. #include "nbcd.hpp"
  27. #ifdef _WIN32
  28. #define EXPORT __declspec(dllexport)
  29. #else
  30. #define EXPORT
  31. #endif
  32. __declspec(noreturn) static void UNSUPPORTED(const char *feature) __attribute__((noreturn));
  33. static void UNSUPPORTED(const char *feature)
  34. {
  35. throw MakeStringException(-1, "UNSUPPORTED feature: %s not supported in mysql plugin", feature);
  36. }
  37. static const char * compatibleVersions[] = {
  38. "MySQL Embed Helper 1.0.0",
  39. NULL };
  40. static const char *version = "MySQL Embed Helper 1.0.0";
  41. extern "C" EXPORT bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb)
  42. {
  43. if (pb->size == sizeof(ECLPluginDefinitionBlockEx))
  44. {
  45. ECLPluginDefinitionBlockEx * pbx = (ECLPluginDefinitionBlockEx *) pb;
  46. pbx->compatibleVersions = compatibleVersions;
  47. }
  48. else if (pb->size != sizeof(ECLPluginDefinitionBlock))
  49. return false;
  50. pb->magicVersion = PLUGIN_VERSION;
  51. pb->version = version;
  52. pb->moduleName = "mysql";
  53. pb->ECL = NULL;
  54. pb->flags = PLUGIN_MULTIPLE_VERSIONS;
  55. pb->description = "MySQL Embed Helper";
  56. return true;
  57. }
  58. namespace mysqlembed {
  59. __declspec(noreturn) static void failx(const char *msg, ...) __attribute__((format(printf, 1, 2), noreturn));
  60. __declspec(noreturn) static void fail(const char *msg) __attribute__((noreturn));
  61. static void failx(const char *message, ...)
  62. {
  63. va_list args;
  64. va_start(args,message);
  65. StringBuffer msg;
  66. msg.append("mysql: ").valist_appendf(message,args);
  67. va_end(args);
  68. rtlFail(0, msg.str());
  69. }
  70. static void fail(const char *message)
  71. {
  72. StringBuffer msg;
  73. msg.append("mysql: ").append(message);
  74. rtlFail(0, msg.str());
  75. }
  76. // Wrappers to MySQL structures that require corresponding releases
  77. class MySQLConnection : public CInterface
  78. {
  79. public:
  80. IMPLEMENT_IINTERFACE;
  81. MySQLConnection(MYSQL *_conn) : conn(_conn)
  82. {
  83. }
  84. ~MySQLConnection()
  85. {
  86. if (conn)
  87. mysql_close(conn);
  88. }
  89. inline operator MYSQL *() const
  90. {
  91. return conn;
  92. }
  93. private:
  94. MySQLConnection(const MySQLConnection &);
  95. MYSQL *conn;
  96. };
  97. class MySQLResult : public CInterface
  98. {
  99. public:
  100. IMPLEMENT_IINTERFACE;
  101. MySQLResult(MYSQL_RES *_res) : res(_res)
  102. {
  103. }
  104. ~MySQLResult()
  105. {
  106. if (res)
  107. mysql_free_result(res);
  108. }
  109. inline operator MYSQL_RES *() const
  110. {
  111. return res;
  112. }
  113. private:
  114. MySQLResult(const MySQLResult &);
  115. MYSQL_RES *res;
  116. };
  117. class MySQLStatement : public CInterface
  118. {
  119. public:
  120. IMPLEMENT_IINTERFACE;
  121. MySQLStatement(MYSQL_STMT *_stmt) : stmt(_stmt)
  122. {
  123. }
  124. ~MySQLStatement()
  125. {
  126. if (stmt)
  127. mysql_stmt_close(stmt);
  128. }
  129. inline operator MYSQL_STMT *() const
  130. {
  131. return stmt;
  132. }
  133. private:
  134. MySQLStatement(const MySQLStatement &);
  135. MYSQL_STMT *stmt;
  136. };
  137. class MySQLBindingArray
  138. {
  139. public:
  140. MySQLBindingArray()
  141. {
  142. columns = 0;
  143. bindinfo = NULL;
  144. is_null = NULL;;
  145. error = NULL;
  146. lengths = NULL;
  147. }
  148. void init(unsigned count)
  149. {
  150. columns = count;
  151. if (columns)
  152. {
  153. bindinfo = new MYSQL_BIND [columns];
  154. is_null = new my_bool [columns];
  155. error = new my_bool [columns];
  156. lengths = new unsigned long [columns];
  157. memset(bindinfo, 0, columns * sizeof(bindinfo[0]));
  158. memset(is_null, 0, columns * sizeof(is_null[0]));
  159. memset(error, 0, columns * sizeof(error[0]));
  160. memset(lengths, 0, columns * sizeof(lengths[0]));
  161. for (int i = 0; i < columns; i++)
  162. {
  163. bindinfo[i].is_null = &is_null[i];
  164. bindinfo[i].length = &lengths[i];
  165. bindinfo[i].error = &error[i];
  166. }
  167. }
  168. }
  169. void bindResults(MYSQL_RES *res)
  170. {
  171. init(mysql_num_fields(res));
  172. for (int i = 0; i < columns; i++)
  173. {
  174. MYSQL_FIELD *col = mysql_fetch_field_direct(res, i);
  175. switch (col->type)
  176. {
  177. case MYSQL_TYPE_DECIMAL:
  178. case MYSQL_TYPE_NEWDECIMAL:
  179. bindinfo[i].buffer_type = MYSQL_TYPE_STRING;
  180. bindinfo[i].buffer_length = 100; // MORE - is there a better guess?
  181. break;
  182. case MYSQL_TYPE_TIMESTAMP:
  183. case MYSQL_TYPE_DATETIME:
  184. case MYSQL_TYPE_TIME:
  185. case MYSQL_TYPE_DATE:
  186. bindinfo[i].buffer_type = col->type;
  187. bindinfo[i].buffer_length = sizeof(MYSQL_TIME);
  188. break;
  189. default:
  190. bindinfo[i].buffer_type = col->type;
  191. bindinfo[i].buffer_length = col->length;
  192. break;
  193. }
  194. bindinfo[i].buffer = rtlMalloc(bindinfo[i].buffer_length);
  195. }
  196. }
  197. ~MySQLBindingArray()
  198. {
  199. for (int i = 0; i < columns; i++)
  200. {
  201. rtlFree(bindinfo[i].buffer);
  202. }
  203. delete [] bindinfo;
  204. delete [] is_null;
  205. delete [] error;
  206. delete [] lengths;
  207. }
  208. inline int numColumns() const
  209. {
  210. return columns;
  211. }
  212. inline MYSQL_BIND &queryColumn(int colIdx, const char *name) const
  213. {
  214. if (colIdx >= columns)
  215. {
  216. VStringBuffer error("No matching bound column for parameter %d", colIdx);
  217. if (name)
  218. error.appendf(" (%s)", name);
  219. fail(error);
  220. }
  221. return bindinfo[colIdx];
  222. }
  223. inline MYSQL_BIND *queryBindings() const
  224. {
  225. return bindinfo;
  226. }
  227. private:
  228. MYSQL_BIND *bindinfo;
  229. my_bool *is_null;
  230. my_bool *error;
  231. unsigned long *lengths;
  232. int columns;
  233. };
  234. class MySQLPreparedStatement : public CInterface
  235. {
  236. public:
  237. IMPLEMENT_IINTERFACE;
  238. MySQLPreparedStatement(MySQLConnection *_conn, MySQLStatement *_stmt)
  239. : conn(_conn), stmt(_stmt)
  240. {
  241. // Create bindings for input parameters
  242. inputBindings.init(mysql_stmt_param_count(*stmt));
  243. // And for results
  244. res.setown(new MySQLResult(mysql_stmt_result_metadata(*stmt)));
  245. if (*res)
  246. {
  247. resultBindings.bindResults(*res);
  248. /* Bind the result buffers */
  249. if (mysql_stmt_bind_result(*stmt, resultBindings.queryBindings()))
  250. fail(mysql_stmt_error(*stmt));
  251. }
  252. else if (mysql_stmt_errno(*stmt)) // SQL actions don't return results...
  253. fail(mysql_stmt_error(*stmt));
  254. }
  255. ~MySQLPreparedStatement()
  256. {
  257. stop();
  258. }
  259. inline void stop()
  260. {
  261. res.clear();
  262. stmt.clear();
  263. }
  264. bool next()
  265. {
  266. if (!stmt)
  267. return false;
  268. int rc = mysql_stmt_fetch(*stmt);
  269. if (rc == MYSQL_NO_DATA)
  270. return false;
  271. else if (rc)
  272. fail(mysql_stmt_error(*stmt));
  273. else
  274. return true;
  275. }
  276. void execute()
  277. {
  278. assertex(stmt && *stmt);
  279. if (inputBindings.numColumns() && mysql_stmt_bind_param(*stmt, inputBindings.queryBindings()))
  280. fail(mysql_stmt_error(*stmt));
  281. if (mysql_stmt_execute(*stmt))
  282. fail(mysql_stmt_error(*stmt));
  283. }
  284. inline const MySQLBindingArray &queryResultBindings() const
  285. {
  286. return resultBindings;
  287. }
  288. inline const MySQLBindingArray &queryInputBindings() const
  289. {
  290. return inputBindings;
  291. }
  292. inline bool hasResult() const
  293. {
  294. return *res != NULL;
  295. }
  296. protected:
  297. Linked<MySQLConnection> conn;
  298. Linked<MySQLStatement> stmt;
  299. Owned<MySQLResult> res;
  300. MySQLBindingArray inputBindings;
  301. MySQLBindingArray resultBindings;
  302. };
  303. // Conversions from MySQL values to ECL data
  304. __declspec(noreturn) static void typeError(const char *expected, const RtlFieldInfo *field) __attribute__((noreturn));
  305. static void typeError(const char *expected, const RtlFieldInfo *field)
  306. {
  307. VStringBuffer msg("mysql: type mismatch - %s expected", expected);
  308. if (field)
  309. msg.appendf(" for field %s", field->name->queryStr());
  310. rtlFail(0, msg.str());
  311. }
  312. static bool isInteger(enum_field_types type)
  313. {
  314. switch (type)
  315. {
  316. case MYSQL_TYPE_TINY:
  317. case MYSQL_TYPE_SHORT:
  318. case MYSQL_TYPE_LONG:
  319. case MYSQL_TYPE_LONGLONG:
  320. case MYSQL_TYPE_INT24:
  321. return true;
  322. default:
  323. return false;
  324. }
  325. }
  326. static bool isDateTime(enum_field_types type)
  327. {
  328. switch (type)
  329. {
  330. case MYSQL_TYPE_TIMESTAMP:
  331. case MYSQL_TYPE_DATETIME:
  332. case MYSQL_TYPE_DATE:
  333. case MYSQL_TYPE_TIME:
  334. return true;
  335. default:
  336. return false;
  337. }
  338. }
  339. static bool isString(enum_field_types type)
  340. {
  341. switch (type)
  342. {
  343. case MYSQL_TYPE_TINY_BLOB:
  344. case MYSQL_TYPE_MEDIUM_BLOB:
  345. case MYSQL_TYPE_LONG_BLOB:
  346. case MYSQL_TYPE_BLOB:
  347. case MYSQL_TYPE_STRING:
  348. case MYSQL_TYPE_VAR_STRING:
  349. return true;
  350. default:
  351. return false;
  352. }
  353. }
  354. static unsigned __int64 getDateTimeValue(const MYSQL_BIND &bound)
  355. {
  356. const MYSQL_TIME * time = (const MYSQL_TIME *) bound.buffer;
  357. switch (bound.buffer_type)
  358. {
  359. case MYSQL_TYPE_TIMESTAMP:
  360. case MYSQL_TYPE_DATETIME:
  361. //What format should this be? Possibly a timestamp_t
  362. return (unsigned __int64)((time->year * 10000) + (time->month * 100) + (time->day)) * 1000000 +
  363. (time->hour * 10000) + (time->minute * 100) + (time->second);
  364. case MYSQL_TYPE_DATE:
  365. return (time->year * 10000) + (time->month * 100) + (time->day);
  366. case MYSQL_TYPE_TIME:
  367. return (time->hour * 10000) + (time->minute * 100) + (time->second);
  368. default:
  369. throwUnexpected();
  370. }
  371. }
  372. static void getDateTimeText(const MYSQL_BIND &bound, size32_t &chars, char * &result)
  373. {
  374. const MYSQL_TIME * time = (const MYSQL_TIME *) bound.buffer;
  375. char temp[20];
  376. switch (bound.buffer_type)
  377. {
  378. case MYSQL_TYPE_TIMESTAMP:
  379. case MYSQL_TYPE_DATETIME:
  380. _snprintf(temp, sizeof(temp), "%4u-%02u-%02u %02u:%02u:%02u", time->year, time->month, time->day, time->hour, time->minute, time->second);
  381. break;
  382. case MYSQL_TYPE_DATE:
  383. _snprintf(temp, sizeof(temp), "%4u-%02u-%02u", time->year, time->month, time->day);
  384. break;
  385. case MYSQL_TYPE_TIME:
  386. _snprintf(temp, sizeof(temp), "%02u:%02u:%02u", time->hour, time->minute, time->second);
  387. break;
  388. default:
  389. throwUnexpected();
  390. }
  391. rtlStrToStrX(chars, result, strlen(temp), temp);
  392. }
  393. static bool getBooleanResult(const RtlFieldInfo *field, const MYSQL_BIND &bound)
  394. {
  395. if (*bound.is_null)
  396. {
  397. NullFieldProcessor p(field);
  398. return p.boolResult;
  399. }
  400. if (!isInteger(bound.buffer_type))
  401. typeError("boolean", field);
  402. return rtlReadUInt(bound.buffer, *bound.length) != 0;
  403. }
  404. static void getDataResult(const RtlFieldInfo *field, const MYSQL_BIND &bound, size32_t &chars, void * &result)
  405. {
  406. if (*bound.is_null)
  407. {
  408. NullFieldProcessor p(field);
  409. rtlStrToDataX(chars, result, p.resultChars, p.stringResult);
  410. return;
  411. }
  412. if (isString(bound.buffer_type))
  413. rtlStrToDataX(chars, result, *bound.length, bound.buffer); // This feels like it may not work to me - will preallocate rather larger than we want
  414. else
  415. typeError("blob", field);
  416. }
  417. static __int64 getSignedResult(const RtlFieldInfo *field, const MYSQL_BIND &bound);
  418. static unsigned __int64 getUnsignedResult(const RtlFieldInfo *field, const MYSQL_BIND &bound);
  419. static double getRealResult(const RtlFieldInfo *field, const MYSQL_BIND &bound)
  420. {
  421. if (*bound.is_null)
  422. {
  423. NullFieldProcessor p(field);
  424. return p.doubleResult;
  425. }
  426. if (isInteger(bound.buffer_type))
  427. {
  428. if (bound.is_unsigned)
  429. return (double) getUnsignedResult(field, bound);
  430. else
  431. return (double) getSignedResult(field, bound);
  432. }
  433. else if (bound.buffer_type == MYSQL_TYPE_FLOAT)
  434. return * (float *) bound.buffer;
  435. else if (bound.buffer_type == MYSQL_TYPE_DOUBLE)
  436. return * (double *) bound.buffer;
  437. else
  438. typeError("double", field);
  439. }
  440. static __int64 getSignedResult(const RtlFieldInfo *field, const MYSQL_BIND &bound)
  441. {
  442. if (*bound.is_null)
  443. {
  444. NullFieldProcessor p(field);
  445. return p.intResult;
  446. }
  447. if (isDateTime(bound.buffer_type))
  448. return getDateTimeValue(bound);
  449. if (isInteger(bound.buffer_type))
  450. {
  451. if (bound.is_unsigned)
  452. return (__int64) rtlReadUInt(bound.buffer, *bound.length);
  453. else
  454. return rtlReadInt(bound.buffer, *bound.length);
  455. }
  456. else
  457. typeError("integer", field);
  458. }
  459. static unsigned __int64 getUnsignedResult(const RtlFieldInfo *field, const MYSQL_BIND &bound)
  460. {
  461. if (*bound.is_null)
  462. {
  463. NullFieldProcessor p(field);
  464. return p.uintResult;
  465. }
  466. if (isDateTime(bound.buffer_type))
  467. return getDateTimeValue(bound);
  468. if (!isInteger(bound.buffer_type))
  469. typeError("integer", field);
  470. if (bound.is_unsigned)
  471. return rtlReadUInt(bound.buffer, *bound.length);
  472. else
  473. return (unsigned __int64) rtlReadInt(bound.buffer, *bound.length);
  474. }
  475. static void getStringResult(const RtlFieldInfo *field, const MYSQL_BIND &bound, size32_t &chars, char * &result)
  476. {
  477. if (*bound.is_null)
  478. {
  479. NullFieldProcessor p(field);
  480. rtlStrToStrX(chars, result, p.resultChars, p.stringResult);
  481. return;
  482. }
  483. if (isDateTime(bound.buffer_type))
  484. {
  485. getDateTimeText(bound, chars, result);
  486. return;
  487. }
  488. if (!isString(bound.buffer_type))
  489. typeError("string", field);
  490. const char *text = (const char *) bound.buffer;
  491. unsigned long bytes = *bound.length;
  492. unsigned numchars = rtlUtf8Length(bytes, text); // MORE - is it a good assumption that it is utf8 ? Depends how the database is configured I think
  493. rtlUtf8ToStrX(chars, result, numchars, text);
  494. }
  495. static void getUTF8Result(const RtlFieldInfo *field, const MYSQL_BIND &bound, size32_t &chars, char * &result)
  496. {
  497. if (*bound.is_null)
  498. {
  499. NullFieldProcessor p(field);
  500. rtlUtf8ToUtf8X(chars, result, p.resultChars, p.stringResult);
  501. return;
  502. }
  503. if (isDateTime(bound.buffer_type))
  504. {
  505. getDateTimeText(bound, chars, result);
  506. return;
  507. }
  508. if (!isString(bound.buffer_type))
  509. typeError("string", field);
  510. const char *text = (const char *) bound.buffer;
  511. unsigned long bytes = *bound.length;
  512. unsigned numchars = rtlUtf8Length(bytes, text); // MORE - is it a good assumption that it is utf8 ? Depends how the database is configured I think
  513. rtlUtf8ToUtf8X(chars, result, numchars, text);
  514. }
  515. static void getUnicodeResult(const RtlFieldInfo *field, const MYSQL_BIND &bound, size32_t &chars, UChar * &result)
  516. {
  517. if (*bound.is_null)
  518. {
  519. NullFieldProcessor p(field);
  520. rtlUnicodeToUnicodeX(chars, result, p.resultChars, p.unicodeResult);
  521. return;
  522. }
  523. if (bound.buffer_type != MYSQL_TYPE_STRING && bound.buffer_type != MYSQL_TYPE_VAR_STRING)
  524. typeError("string", field);
  525. const char *text = (const char *) bound.buffer;
  526. unsigned long bytes = *bound.length;
  527. unsigned numchars = rtlUtf8Length(bytes, text); // MORE - is it a good assumption that it is utf8 ? Depends how the database is configured I think
  528. rtlUtf8ToUnicodeX(chars, result, numchars, text);
  529. }
  530. static void getDecimalResult(const RtlFieldInfo *field, const MYSQL_BIND &bound, Decimal &value)
  531. {
  532. if (*bound.is_null)
  533. {
  534. NullFieldProcessor p(field);
  535. value.set(p.decimalResult);
  536. return;
  537. }
  538. size32_t chars;
  539. rtlDataAttr result;
  540. mysqlembed::getStringResult(field, bound, chars, result.refstr());
  541. value.setString(chars, result.getstr());
  542. if (field)
  543. {
  544. RtlDecimalTypeInfo *dtype = (RtlDecimalTypeInfo *) field->type;
  545. value.setPrecision(dtype->getDecimalDigits(), dtype->getDecimalPrecision());
  546. }
  547. }
  548. static void createBindBuffer(MYSQL_BIND & bindInfo, enum_field_types sqlType, unsigned size)
  549. {
  550. if (size)
  551. {
  552. if (!bindInfo.buffer)
  553. {
  554. bindInfo.buffer_type = sqlType;
  555. bindInfo.buffer = rtlMalloc(size);
  556. }
  557. else
  558. assertex(bindInfo.buffer_type == sqlType);
  559. }
  560. else
  561. {
  562. // Buffer is reallocated each time - caller is responsible for it.
  563. bindInfo.buffer_type = sqlType;
  564. rtlFree(bindInfo.buffer);
  565. bindInfo.buffer = NULL;
  566. }
  567. }
  568. // A MySQLRowBuilder object is used to construct an ECL row from a MySQL row
  569. class MySQLRowBuilder : public CInterfaceOf<IFieldSource>
  570. {
  571. public:
  572. MySQLRowBuilder(const MySQLBindingArray &_resultInfo)
  573. : resultInfo(_resultInfo), colIdx(-1)
  574. {
  575. }
  576. virtual bool getBooleanResult(const RtlFieldInfo *field)
  577. {
  578. return mysqlembed::getBooleanResult(field, nextField(field));
  579. }
  580. virtual void getDataResult(const RtlFieldInfo *field, size32_t &len, void * &result)
  581. {
  582. mysqlembed::getDataResult(field, nextField(field), len, result);
  583. }
  584. virtual double getRealResult(const RtlFieldInfo *field)
  585. {
  586. return mysqlembed::getRealResult(field, nextField(field));
  587. }
  588. virtual __int64 getSignedResult(const RtlFieldInfo *field)
  589. {
  590. return mysqlembed::getSignedResult(field, nextField(field));
  591. }
  592. virtual unsigned __int64 getUnsignedResult(const RtlFieldInfo *field)
  593. {
  594. return mysqlembed::getUnsignedResult(field, nextField(field));
  595. }
  596. virtual void getStringResult(const RtlFieldInfo *field, size32_t &chars, char * &result)
  597. {
  598. mysqlembed::getStringResult(field, nextField(field), chars, result);
  599. }
  600. virtual void getUTF8Result(const RtlFieldInfo *field, size32_t &chars, char * &result)
  601. {
  602. mysqlembed::getUTF8Result(field, nextField(field), chars, result);
  603. }
  604. virtual void getUnicodeResult(const RtlFieldInfo *field, size32_t &chars, UChar * &result)
  605. {
  606. mysqlembed::getUnicodeResult(field, nextField(field), chars, result);
  607. }
  608. virtual void getDecimalResult(const RtlFieldInfo *field, Decimal &value)
  609. {
  610. mysqlembed::getDecimalResult(field, nextField(field), value);
  611. }
  612. virtual void processBeginSet(const RtlFieldInfo * field, bool &isAll)
  613. {
  614. UNSUPPORTED("SET fields");
  615. }
  616. virtual bool processNextSet(const RtlFieldInfo * field)
  617. {
  618. throwUnexpected();
  619. }
  620. virtual void processBeginDataset(const RtlFieldInfo * field)
  621. {
  622. UNSUPPORTED("Nested datasets");
  623. }
  624. virtual void processBeginRow(const RtlFieldInfo * field)
  625. {
  626. }
  627. virtual bool processNextRow(const RtlFieldInfo * field)
  628. {
  629. throwUnexpected();
  630. }
  631. virtual void processEndSet(const RtlFieldInfo * field)
  632. {
  633. throwUnexpected();
  634. }
  635. virtual void processEndDataset(const RtlFieldInfo * field)
  636. {
  637. throwUnexpected();
  638. }
  639. virtual void processEndRow(const RtlFieldInfo * field)
  640. {
  641. }
  642. protected:
  643. const MYSQL_BIND &nextField(const RtlFieldInfo * field)
  644. {
  645. if (colIdx < resultInfo.numColumns())
  646. colIdx++;
  647. else
  648. fail("Too many fields in ECL output row");
  649. const MYSQL_BIND &column = resultInfo.queryColumn(colIdx,field->name->queryStr());
  650. if (*column.error)
  651. failx("Error fetching column %s", field->name->queryStr());
  652. return column;
  653. }
  654. const MySQLBindingArray &resultInfo;
  655. int colIdx;
  656. };
  657. // Bind MySQL variables from an ECL record
  658. class MySQLRecordBinder : public CInterfaceOf<IFieldProcessor>
  659. {
  660. public:
  661. MySQLRecordBinder(const RtlTypeInfo *_typeInfo, const MySQLBindingArray &_bindings, int _firstParam)
  662. : typeInfo(_typeInfo), bindings(_bindings), firstParam(_firstParam), dummyField("<row>", NULL, typeInfo), thisParam(_firstParam)
  663. {
  664. }
  665. int numFields()
  666. {
  667. int count = 0;
  668. const RtlFieldInfo * const *fields = typeInfo->queryFields();
  669. assertex(fields);
  670. while (*fields++)
  671. count++;
  672. return count;
  673. }
  674. void processRow(const byte *row)
  675. {
  676. thisParam = firstParam;
  677. typeInfo->process(row, row, &dummyField, *this); // Bind the variables for the current row
  678. }
  679. virtual void processString(unsigned len, const char *value, const RtlFieldInfo * field)
  680. {
  681. size32_t utf8chars;
  682. char *utf8;
  683. rtlStrToUtf8X(utf8chars, utf8, len, value);
  684. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_STRING, 0);
  685. bindInfo.buffer = utf8;
  686. bindInfo.buffer_length = rtlUtf8Size(utf8chars, utf8);
  687. bindInfo.length = &bindInfo.buffer_length;
  688. }
  689. virtual void processBool(bool value, const RtlFieldInfo * field)
  690. {
  691. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_TINY, sizeof(value));
  692. * (bool *) bindInfo.buffer = value;
  693. bindInfo.is_unsigned = true;
  694. }
  695. virtual void processData(unsigned len, const void *value, const RtlFieldInfo * field)
  696. {
  697. size32_t bytes;
  698. void *data;
  699. rtlStrToDataX(bytes, data, len, value);
  700. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_BLOB, 0);
  701. bindInfo.buffer = data;
  702. bindInfo.buffer_length = bytes;
  703. bindInfo.length = &bindInfo.buffer_length;
  704. }
  705. virtual void processInt(__int64 value, const RtlFieldInfo * field)
  706. {
  707. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_LONGLONG, sizeof(value));
  708. * (__int64 *) bindInfo.buffer = value;
  709. bindInfo.is_unsigned = false;
  710. }
  711. virtual void processUInt(unsigned __int64 value, const RtlFieldInfo * field)
  712. {
  713. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_LONGLONG, sizeof(value));
  714. * (unsigned __int64 *) bindInfo.buffer = value;
  715. bindInfo.is_unsigned = true;
  716. }
  717. virtual void processReal(double value, const RtlFieldInfo * field)
  718. {
  719. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_DOUBLE, sizeof(value));
  720. * (double *) bindInfo.buffer = value;
  721. }
  722. virtual void processDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo * field)
  723. {
  724. Decimal val;
  725. size32_t bytes;
  726. char *data;
  727. val.setDecimal(digits, precision, value);
  728. val.getStringX(bytes, data);
  729. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_STRING, 0);
  730. bindInfo.buffer = data;
  731. bindInfo.buffer_length = bytes;
  732. bindInfo.length = &bindInfo.buffer_length;
  733. }
  734. virtual void processUDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo * field)
  735. {
  736. Decimal val;
  737. size32_t bytes;
  738. char *data;
  739. val.setUDecimal(digits, precision, value);
  740. val.getStringX(bytes, data);
  741. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_STRING, 0);
  742. bindInfo.buffer = data;
  743. bindInfo.buffer_length = bytes;
  744. bindInfo.length = &bindInfo.buffer_length;
  745. }
  746. virtual void processUnicode(unsigned len, const UChar *value, const RtlFieldInfo * field)
  747. {
  748. size32_t utf8chars;
  749. char *utf8;
  750. rtlUnicodeToUtf8X(utf8chars, utf8, len, value);
  751. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_STRING, 0);
  752. bindInfo.buffer = utf8;
  753. bindInfo.buffer_length = rtlUtf8Size(utf8chars, utf8);
  754. bindInfo.length = &bindInfo.buffer_length;
  755. }
  756. virtual void processQString(unsigned len, const char *value, const RtlFieldInfo * field)
  757. {
  758. size32_t charCount;
  759. rtlDataAttr text;
  760. rtlQStrToStrX(charCount, text.refstr(), len, value);
  761. processString(charCount, text.getstr(), field);
  762. }
  763. virtual void processUtf8(unsigned len, const char *value, const RtlFieldInfo * field)
  764. {
  765. size32_t utf8chars;
  766. char *utf8;
  767. rtlUtf8ToUtf8X(utf8chars, utf8, len, value);
  768. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_STRING, 0);
  769. bindInfo.buffer = utf8;
  770. bindInfo.buffer_length = rtlUtf8Size(utf8chars, utf8);
  771. bindInfo.length = &bindInfo.buffer_length;
  772. }
  773. virtual bool processBeginSet(const RtlFieldInfo * field, unsigned numElements, bool isAll, const byte *data)
  774. {
  775. UNSUPPORTED("SET fields");
  776. return false;
  777. }
  778. virtual bool processBeginDataset(const RtlFieldInfo * field, unsigned numRows)
  779. {
  780. UNSUPPORTED("Nested datasets");
  781. return false;
  782. }
  783. virtual bool processBeginRow(const RtlFieldInfo * field)
  784. {
  785. return true;
  786. }
  787. virtual void processEndSet(const RtlFieldInfo * field)
  788. {
  789. throwUnexpected();
  790. }
  791. virtual void processEndDataset(const RtlFieldInfo * field)
  792. {
  793. throwUnexpected();
  794. }
  795. virtual void processEndRow(const RtlFieldInfo * field)
  796. {
  797. }
  798. protected:
  799. MYSQL_BIND &createBindBuffer(enum_field_types sqlType, unsigned size)
  800. {
  801. MYSQL_BIND &bindInfo = bindings.queryColumn(thisParam++, NULL);
  802. mysqlembed::createBindBuffer(bindInfo, sqlType, size);
  803. return bindInfo;
  804. }
  805. const RtlTypeInfo *typeInfo;
  806. const MySQLBindingArray &bindings;
  807. int firstParam;
  808. RtlFieldStrInfo dummyField;
  809. int thisParam;
  810. };
  811. //
  812. class MySQLDatasetBinder : public MySQLRecordBinder
  813. {
  814. public:
  815. MySQLDatasetBinder(IRowStream * _input, const RtlTypeInfo *_typeInfo, const MySQLBindingArray &_bindings, int _firstParam)
  816. : input(_input), MySQLRecordBinder(_typeInfo, _bindings, _firstParam)
  817. {
  818. }
  819. bool bindNext()
  820. {
  821. roxiemem::OwnedConstRoxieRow nextRow = (const byte *) input->ungroupedNextRow();
  822. if (!nextRow)
  823. return false;
  824. processRow((const byte *) nextRow.get()); // Bind the variables for the current row
  825. return true;
  826. }
  827. void executeAll(MySQLPreparedStatement *stmtInfo)
  828. {
  829. while (bindNext())
  830. {
  831. stmtInfo->execute();
  832. }
  833. }
  834. protected:
  835. Owned<IRowStream> input;
  836. };
  837. // A MySQL function that returns a dataset will return a MySQLRowStream object that can be
  838. // interrogated to return each row of the result in turn
  839. class MySQLRowStream : public CInterfaceOf<IRowStream>
  840. {
  841. public:
  842. MySQLRowStream(MySQLDatasetBinder *_inputStream, MySQLPreparedStatement *_stmtInfo, IEngineRowAllocator *_resultAllocator)
  843. : inputStream(_inputStream), stmtInfo(_stmtInfo), resultAllocator(_resultAllocator)
  844. {
  845. executePending = true;
  846. eof = false;
  847. }
  848. virtual const void *nextRow()
  849. {
  850. // A little complex when streaming data in as well as out - want to execute for every input record
  851. if (eof)
  852. return NULL;
  853. loop
  854. {
  855. if (executePending)
  856. {
  857. executePending = false;
  858. if (inputStream && !inputStream->bindNext())
  859. {
  860. noteEOF();
  861. return NULL;
  862. }
  863. stmtInfo->execute();
  864. }
  865. if (stmtInfo->next())
  866. break;
  867. if (inputStream)
  868. executePending = true;
  869. else
  870. {
  871. noteEOF();
  872. return NULL;
  873. }
  874. }
  875. RtlDynamicRowBuilder rowBuilder(resultAllocator);
  876. MySQLRowBuilder mysqlRowBuilder(stmtInfo->queryResultBindings());
  877. const RtlTypeInfo *typeInfo = resultAllocator->queryOutputMeta()->queryTypeInfo();
  878. assertex(typeInfo);
  879. RtlFieldStrInfo dummyField("<row>", NULL, typeInfo);
  880. size32_t len = typeInfo->build(rowBuilder, 0, &dummyField, mysqlRowBuilder);
  881. return rowBuilder.finalizeRowClear(len);
  882. }
  883. virtual void stop()
  884. {
  885. resultAllocator.clear();
  886. stmtInfo->stop();
  887. }
  888. protected:
  889. void noteEOF()
  890. {
  891. if (!eof)
  892. {
  893. eof = true;
  894. stop();
  895. }
  896. }
  897. Linked<MySQLDatasetBinder> inputStream;
  898. Linked<MySQLPreparedStatement> stmtInfo;
  899. Linked<IEngineRowAllocator> resultAllocator;
  900. bool executePending;
  901. bool eof;
  902. };
  903. // Each call to a MySQL function will use a new MySQLEmbedFunctionContext object
  904. static __thread ThreadTermFunc threadHookChain;
  905. static __thread MySQLConnection *cachedConnection = NULL;
  906. static __thread const char *cachedOptions = NULL;
  907. static bool cachedConnectionMatches(const char *options)
  908. {
  909. return streq(options, cachedOptions);
  910. }
  911. static void clearCache()
  912. {
  913. ::Release(cachedConnection);
  914. cachedConnection = NULL;
  915. free((void *) cachedOptions);
  916. cachedOptions = NULL;
  917. }
  918. static bool mysqlInitialized = false;
  919. static __thread bool mysqlThreadInitialized = false;
  920. static CriticalSection initCrit;
  921. static void terminateMySqlThread()
  922. {
  923. clearCache();
  924. mysql_thread_end();
  925. mysqlThreadInitialized = false; // In case it was a threadpool thread...
  926. if (threadHookChain)
  927. {
  928. (*threadHookChain)();
  929. threadHookChain = NULL;
  930. }
  931. }
  932. static void initializeMySqlThread()
  933. {
  934. if (!mysqlThreadInitialized)
  935. {
  936. {
  937. CriticalBlock b(initCrit);
  938. if (!mysqlInitialized)
  939. {
  940. mysqlInitialized = true;
  941. mysql_library_init(0, NULL, NULL);
  942. }
  943. }
  944. mysql_thread_init();
  945. threadHookChain = addThreadTermFunc(terminateMySqlThread);
  946. mysqlThreadInitialized = true;
  947. }
  948. }
  949. static void cacheConnection(MySQLConnection *connection, const char *options)
  950. {
  951. clearCache();
  952. cachedOptions = strdup(options);
  953. cachedConnection = LINK(connection);
  954. }
  955. enum MySQLOptionParamType
  956. {
  957. ParamTypeNone,
  958. ParamTypeString,
  959. ParamTypeUInt,
  960. ParamTypeULong,
  961. ParamTypeBool
  962. };
  963. struct MySQLOptionDefinition
  964. {
  965. const char *name;
  966. enum mysql_option option;
  967. MySQLOptionParamType paramType;
  968. };
  969. #define addoption(a,b) { #a, a, b }
  970. MySQLOptionDefinition options[] =
  971. {
  972. addoption(MYSQL_OPT_COMPRESS, ParamTypeNone),
  973. addoption(MYSQL_OPT_CONNECT_TIMEOUT, ParamTypeUInt),
  974. addoption(MYSQL_OPT_GUESS_CONNECTION, ParamTypeNone),
  975. addoption(MYSQL_OPT_LOCAL_INFILE, ParamTypeUInt),
  976. addoption(MYSQL_OPT_NAMED_PIPE, ParamTypeNone),
  977. addoption(MYSQL_OPT_PROTOCOL, ParamTypeUInt),
  978. addoption(MYSQL_OPT_READ_TIMEOUT, ParamTypeUInt),
  979. addoption(MYSQL_OPT_RECONNECT, ParamTypeBool),
  980. addoption(MYSQL_OPT_SSL_VERIFY_SERVER_CERT, ParamTypeBool),
  981. addoption(MYSQL_OPT_USE_EMBEDDED_CONNECTION, ParamTypeNone),
  982. addoption(MYSQL_OPT_USE_REMOTE_CONNECTION, ParamTypeNone),
  983. addoption(MYSQL_OPT_USE_RESULT, ParamTypeNone),
  984. addoption(MYSQL_OPT_WRITE_TIMEOUT, ParamTypeUInt),
  985. addoption(MYSQL_READ_DEFAULT_FILE, ParamTypeString),
  986. addoption(MYSQL_READ_DEFAULT_GROUP, ParamTypeString),
  987. addoption(MYSQL_REPORT_DATA_TRUNCATION, ParamTypeBool),
  988. addoption(MYSQL_SECURE_AUTH, ParamTypeBool),
  989. addoption(MYSQL_SET_CHARSET_DIR, ParamTypeString),
  990. addoption(MYSQL_SET_CHARSET_NAME, ParamTypeString),
  991. addoption(MYSQL_SET_CLIENT_IP, ParamTypeString),
  992. addoption(MYSQL_SHARED_MEMORY_BASE_NAME, ParamTypeString),
  993. #if MYSQL_VERSION_ID >= 50507
  994. addoption(MYSQL_DEFAULT_AUTH, ParamTypeString),
  995. addoption(MYSQL_PLUGIN_DIR, ParamTypeString),
  996. #endif
  997. #if (MYSQL_VERSION_ID >= 50601)
  998. addoption(MYSQL_OPT_BIND, ParamTypeString),
  999. #endif
  1000. #if (MYSQL_VERSION_ID >= 50603)
  1001. addoption(MYSQL_OPT_SSL_CA, ParamTypeString),
  1002. addoption(MYSQL_OPT_SSL_CAPATH, ParamTypeString),
  1003. addoption(MYSQL_OPT_SSL_CERT, ParamTypeString),
  1004. addoption(MYSQL_OPT_SSL_CIPHER, ParamTypeString),
  1005. addoption(MYSQL_OPT_SSL_CRL, ParamTypeString),
  1006. addoption(MYSQL_OPT_SSL_CRLPATH, ParamTypeString),
  1007. addoption(MYSQL_OPT_SSL_KEY, ParamTypeString),
  1008. #endif
  1009. #if (MYSQL_VERSION_ID >= 50606)
  1010. addoption(MYSQL_SERVER_PUBLIC_KEY, ParamTypeString),
  1011. #endif
  1012. #if (MYSQL_VERSION_ID >= 50527 && MYSQL_VERSION_ID < 50600) || MYSQL_VERSION_ID >= 50607
  1013. addoption(MYSQL_ENABLE_CLEARTEXT_PLUGIN, ParamTypeBool),
  1014. #endif
  1015. addoption(MYSQL_INIT_COMMAND, ParamTypeString),
  1016. #if (MYSQL_VERSION_ID >= 50610)
  1017. addoption(MYSQL_OPT_CAN_HANDLE_EXPIRED_PASSWORDS, ParamTypeBool),
  1018. #endif
  1019. #if (MYSQL_VERSION_ID >= 50703)
  1020. addoption(MYSQL_OPT_SSL_ENFORCE, ParamTypeBool),
  1021. #endif
  1022. #if (MYSQL_VERSION_ID >= 50709)
  1023. addoption(MYSQL_OPT_MAX_ALLOWED_PACKET, ParamTypeULong),
  1024. addoption(MYSQL_OPT_NET_BUFFER_LENGTH, ParamTypeULong),
  1025. #endif
  1026. #if (MYSQL_VERSION_ID >= 50710)
  1027. addoption(MYSQL_OPT_TLS_VERSION, ParamTypeString),
  1028. #endif
  1029. #if (MYSQL_VERSION_ID >= 50711)
  1030. addoption(MYSQL_OPT_SSL_MODE, ParamTypeUInt),
  1031. #endif
  1032. { nullptr, (enum mysql_option) 0, ParamTypeNone }
  1033. };
  1034. static MySQLOptionDefinition &lookupOption(const char *optName)
  1035. {
  1036. for (MySQLOptionDefinition *optDef = options; optDef->name != nullptr; optDef++)
  1037. {
  1038. if (stricmp(optName, optDef->name)==0)
  1039. return *optDef;
  1040. }
  1041. failx("Unknown option %s", optName);
  1042. }
  1043. class MySQLEmbedFunctionContext : public CInterfaceOf<IEmbedFunctionContext>
  1044. {
  1045. public:
  1046. MySQLEmbedFunctionContext(const char *options)
  1047. : nextParam(0)
  1048. {
  1049. const char *server = "localhost";
  1050. const char *user = "";
  1051. const char *password = "";
  1052. const char *database = "";
  1053. bool hasMySQLOpt = false;
  1054. bool caching = true;
  1055. unsigned port = 0;
  1056. StringArray opts;
  1057. opts.appendList(options, ",");
  1058. ForEachItemIn(idx, opts)
  1059. {
  1060. const char *opt = opts.item(idx);
  1061. const char *val = strchr(opt, '=');
  1062. if (val)
  1063. {
  1064. StringBuffer optName(val-opt, opt);
  1065. val++;
  1066. if (stricmp(optName, "server")==0)
  1067. server = val; // Note that lifetime of val is adequate for this to be safe
  1068. else if (stricmp(optName, "port")==0)
  1069. port = atoi(val);
  1070. else if (stricmp(optName, "user")==0)
  1071. user = val;
  1072. else if (stricmp(optName, "password")==0)
  1073. password = val;
  1074. else if (stricmp(optName, "database")==0)
  1075. database = val;
  1076. else if (stricmp(optName, "cache")==0)
  1077. caching = clipStrToBool(val);
  1078. else if (strnicmp(optName, "MYSQL_", 6)==0)
  1079. hasMySQLOpt = true;
  1080. else
  1081. failx("Unknown option %s", optName.str());
  1082. }
  1083. }
  1084. initializeMySqlThread();
  1085. if (caching && cachedConnection && cachedConnectionMatches(options))
  1086. {
  1087. conn.set(cachedConnection);
  1088. }
  1089. else
  1090. {
  1091. if (cachedConnection)
  1092. {
  1093. ::Release(cachedConnection);
  1094. cachedConnection = NULL;
  1095. }
  1096. conn.setown(new MySQLConnection(mysql_init(NULL)));
  1097. if (hasMySQLOpt)
  1098. {
  1099. ForEachItemIn(idx, opts)
  1100. {
  1101. const char *opt = opts.item(idx);
  1102. if (strnicmp(opt, "MYSQL_", 6)==0)
  1103. {
  1104. const char *val = strchr(opt, '=');
  1105. StringBuffer optName(opt);
  1106. if (val)
  1107. {
  1108. optName.setLength(val-opt);
  1109. val++;
  1110. }
  1111. MySQLOptionDefinition &optDef = lookupOption(optName);
  1112. int rc;
  1113. if (optDef.paramType == ParamTypeNone)
  1114. {
  1115. if (val)
  1116. failx("Option %s does not take a value", optName.str());
  1117. rc = mysql_options(*conn, optDef.option, nullptr);
  1118. }
  1119. else
  1120. {
  1121. if (!val)
  1122. failx("Option %s requires a value", optName.str());
  1123. switch (optDef.paramType)
  1124. {
  1125. case ParamTypeString:
  1126. rc = mysql_options(*conn, optDef.option, val);
  1127. break;
  1128. case ParamTypeUInt:
  1129. {
  1130. unsigned int oval = strtoul(val, nullptr, 10);
  1131. rc = mysql_options(*conn, optDef.option, (const char *) &oval);
  1132. break;
  1133. }
  1134. case ParamTypeULong:
  1135. {
  1136. unsigned long oval = strtoul(val, nullptr, 10);
  1137. rc = mysql_options(*conn, optDef.option, (const char *) &oval);
  1138. break;
  1139. }
  1140. case ParamTypeBool:
  1141. {
  1142. my_bool oval = clipStrToBool(val);
  1143. rc = mysql_options(*conn, optDef.option, (const char *) &oval);
  1144. break;
  1145. }
  1146. }
  1147. }
  1148. if (rc)
  1149. failx("Failed to set option %s (%s)", optName.str(), mysql_error(*conn));
  1150. }
  1151. }
  1152. }
  1153. if (!mysql_real_connect(*conn, server, user, password, database, port, NULL, 0))
  1154. failx("Failed to connect (%s)", mysql_error(*conn));
  1155. if (caching)
  1156. {
  1157. cacheConnection(conn, options);
  1158. }
  1159. }
  1160. }
  1161. virtual bool getBooleanResult()
  1162. {
  1163. bool ret = mysqlembed::getBooleanResult(NULL, getScalarResult());
  1164. checkSingleRow();
  1165. return ret;
  1166. }
  1167. virtual void getDataResult(size32_t &len, void * &result)
  1168. {
  1169. mysqlembed::getDataResult(NULL, getScalarResult(), len, result);
  1170. checkSingleRow();
  1171. }
  1172. virtual double getRealResult()
  1173. {
  1174. double ret = mysqlembed::getRealResult(NULL, getScalarResult());
  1175. checkSingleRow();
  1176. return ret;
  1177. }
  1178. virtual __int64 getSignedResult()
  1179. {
  1180. __int64 ret = mysqlembed::getSignedResult(NULL, getScalarResult());
  1181. checkSingleRow();
  1182. return ret;
  1183. }
  1184. virtual unsigned __int64 getUnsignedResult()
  1185. {
  1186. unsigned __int64 ret = mysqlembed::getUnsignedResult(NULL, getScalarResult());
  1187. checkSingleRow();
  1188. return ret;
  1189. }
  1190. virtual void getStringResult(size32_t &chars, char * &result)
  1191. {
  1192. mysqlembed::getStringResult(NULL, getScalarResult(), chars, result);
  1193. checkSingleRow();
  1194. }
  1195. virtual void getUTF8Result(size32_t &chars, char * &result)
  1196. {
  1197. mysqlembed::getUTF8Result(NULL, getScalarResult(), chars, result);
  1198. checkSingleRow();
  1199. }
  1200. virtual void getUnicodeResult(size32_t &chars, UChar * &result)
  1201. {
  1202. mysqlembed::getUnicodeResult(NULL, getScalarResult(), chars, result);
  1203. checkSingleRow();
  1204. }
  1205. virtual void getDecimalResult(Decimal &value)
  1206. {
  1207. mysqlembed::getDecimalResult(NULL, getScalarResult(), value);
  1208. checkSingleRow();
  1209. }
  1210. virtual void getSetResult(bool & __isAllResult, size32_t & __resultBytes, void * & __result, int elemType, size32_t elemSize)
  1211. {
  1212. UNSUPPORTED("SET results");
  1213. }
  1214. virtual IRowStream *getDatasetResult(IEngineRowAllocator * _resultAllocator)
  1215. {
  1216. return new MySQLRowStream(inputStream, stmtInfo, _resultAllocator);
  1217. }
  1218. virtual byte * getRowResult(IEngineRowAllocator * _resultAllocator)
  1219. {
  1220. if (!stmtInfo->hasResult())
  1221. typeError("row", NULL);
  1222. lazyExecute();
  1223. MySQLRowStream stream(NULL, stmtInfo, _resultAllocator);
  1224. roxiemem::OwnedConstRoxieRow ret = stream.nextRow();
  1225. roxiemem::OwnedConstRoxieRow ret2 = stream.nextRow();
  1226. stream.stop();
  1227. if (ret == NULL || ret2 != NULL) // Check for exactly one returned row
  1228. typeError("row", NULL);
  1229. return (byte *) ret.getClear();
  1230. }
  1231. virtual size32_t getTransformResult(ARowBuilder & rowBuilder)
  1232. {
  1233. lazyExecute();
  1234. if (!stmtInfo->next())
  1235. typeError("row", NULL);
  1236. MySQLRowBuilder mysqlRowBuilder(stmtInfo->queryResultBindings());
  1237. const RtlTypeInfo *typeInfo = rowBuilder.queryAllocator()->queryOutputMeta()->queryTypeInfo();
  1238. assertex(typeInfo);
  1239. RtlFieldStrInfo dummyField("<row>", NULL, typeInfo);
  1240. size32_t ret = typeInfo->build(rowBuilder, 0, &dummyField, mysqlRowBuilder);
  1241. if (stmtInfo->next())
  1242. typeError("row", NULL); // Check that a single row was returned
  1243. return ret;
  1244. }
  1245. virtual void bindRowParam(const char *name, IOutputMetaData & metaVal, byte *val)
  1246. {
  1247. MySQLRecordBinder binder(metaVal.queryTypeInfo(), stmtInfo->queryInputBindings(), nextParam);
  1248. binder.processRow(val);
  1249. nextParam += binder.numFields();
  1250. }
  1251. virtual void bindDatasetParam(const char *name, IOutputMetaData & metaVal, IRowStream * val)
  1252. {
  1253. // We only support a single dataset parameter...
  1254. if (inputStream)
  1255. {
  1256. fail("At most one dataset parameter supported");
  1257. }
  1258. inputStream.setown(new MySQLDatasetBinder(LINK(val), metaVal.queryTypeInfo(), stmtInfo->queryInputBindings(), nextParam));
  1259. nextParam += inputStream->numFields();
  1260. }
  1261. virtual void bindBooleanParam(const char *name, bool val)
  1262. {
  1263. MYSQL_BIND &bindInfo = findParameter(name, MYSQL_TYPE_TINY, sizeof(val));
  1264. * (bool *) bindInfo.buffer = val;
  1265. bindInfo.is_unsigned = true;
  1266. }
  1267. virtual void bindDataParam(const char *name, size32_t len, const void *val)
  1268. {
  1269. size32_t bytes;
  1270. void *data;
  1271. rtlStrToDataX(bytes, data, len, val);
  1272. MYSQL_BIND &bindInfo = findParameter(name, MYSQL_TYPE_BLOB, 0);
  1273. bindInfo.buffer = data;
  1274. bindInfo.buffer_length = bytes;
  1275. bindInfo.length = &bindInfo.buffer_length;
  1276. }
  1277. virtual void bindFloatParam(const char *name, float val)
  1278. {
  1279. MYSQL_BIND &bindInfo = findParameter(name, MYSQL_TYPE_FLOAT, sizeof(val));
  1280. * (float *) bindInfo.buffer = val;
  1281. }
  1282. virtual void bindRealParam(const char *name, double val)
  1283. {
  1284. MYSQL_BIND &bindInfo = findParameter(name, MYSQL_TYPE_DOUBLE, sizeof(val));
  1285. * (double *) bindInfo.buffer = val;
  1286. }
  1287. virtual void bindSignedSizeParam(const char *name, int size, __int64 val)
  1288. {
  1289. bindSignedParam(name, val);
  1290. }
  1291. virtual void bindSignedParam(const char *name, __int64 val)
  1292. {
  1293. MYSQL_BIND &bindInfo = findParameter(name, MYSQL_TYPE_LONGLONG, sizeof(val));
  1294. * (__int64 *) bindInfo.buffer = val;
  1295. bindInfo.is_unsigned = false;
  1296. }
  1297. virtual void bindUnsignedSizeParam(const char *name, int size, unsigned __int64 val)
  1298. {
  1299. bindUnsignedParam(name, val);
  1300. }
  1301. virtual void bindUnsignedParam(const char *name, unsigned __int64 val)
  1302. {
  1303. MYSQL_BIND &bindInfo = findParameter(name, MYSQL_TYPE_LONGLONG, sizeof(val));
  1304. * (unsigned __int64 *) bindInfo.buffer = val;
  1305. bindInfo.is_unsigned = true;
  1306. }
  1307. virtual void bindStringParam(const char *name, size32_t len, const char *val)
  1308. {
  1309. size32_t utf8chars;
  1310. char *utf8;
  1311. rtlStrToUtf8X(utf8chars, utf8, len, val);
  1312. MYSQL_BIND &bindInfo = findParameter(name, MYSQL_TYPE_STRING, 0);
  1313. bindInfo.buffer = utf8;
  1314. bindInfo.buffer_length = rtlUtf8Size(utf8chars, utf8);
  1315. bindInfo.length = &bindInfo.buffer_length;
  1316. }
  1317. virtual void bindVStringParam(const char *name, const char *val)
  1318. {
  1319. bindStringParam(name, strlen(val), val);
  1320. }
  1321. virtual void bindUTF8Param(const char *name, size32_t chars, const char *val)
  1322. {
  1323. size32_t utf8chars;
  1324. char *utf8;
  1325. rtlUtf8ToUtf8X(utf8chars, utf8, chars, val);
  1326. MYSQL_BIND &bindInfo = findParameter(name, MYSQL_TYPE_STRING, 0);
  1327. bindInfo.buffer = utf8;
  1328. bindInfo.buffer_length = rtlUtf8Size(utf8chars, utf8);
  1329. bindInfo.length = &bindInfo.buffer_length;
  1330. }
  1331. virtual void bindUnicodeParam(const char *name, size32_t chars, const UChar *val)
  1332. {
  1333. size32_t utf8chars;
  1334. char *utf8;
  1335. rtlUnicodeToUtf8X(utf8chars, utf8, chars, val);
  1336. MYSQL_BIND &bindInfo = findParameter(name, MYSQL_TYPE_STRING, 0);
  1337. bindInfo.buffer = utf8;
  1338. bindInfo.buffer_length = rtlUtf8Size(utf8chars, utf8);
  1339. bindInfo.length = &bindInfo.buffer_length;
  1340. }
  1341. virtual void bindSetParam(const char *name, int elemType, size32_t elemSize, bool isAll, size32_t totalBytes, const void *setData)
  1342. {
  1343. UNSUPPORTED("SET parameters"); // MySQL does support sets, so MIGHT be possible...
  1344. }
  1345. virtual IInterface *bindParamWriter(IInterface *esdl, const char *esdlservice, const char *esdltype, const char *name)
  1346. {
  1347. return NULL;
  1348. }
  1349. virtual void paramWriterCommit(IInterface *writer)
  1350. {
  1351. }
  1352. virtual void writeResult(IInterface *esdl, const char *esdlservice, const char *esdltype, IInterface *writer)
  1353. {
  1354. }
  1355. virtual void importFunction(size32_t lenChars, const char *text)
  1356. {
  1357. throwUnexpected();
  1358. }
  1359. virtual void compileEmbeddedScript(size32_t chars, const char *script)
  1360. {
  1361. size32_t len = rtlUtf8Size(chars, script);
  1362. Owned<MySQLStatement> stmt = new MySQLStatement(mysql_stmt_init(*conn));
  1363. if (!*stmt)
  1364. fail("failed to create statement");
  1365. if (mysql_stmt_prepare(*stmt, script, len))
  1366. fail(mysql_stmt_error(*stmt));
  1367. stmtInfo.setown(new MySQLPreparedStatement(conn, stmt));
  1368. }
  1369. virtual void callFunction()
  1370. {
  1371. if (nextParam != stmtInfo->queryInputBindings().numColumns())
  1372. failx("Not enough parameters supplied (%d parameters supplied, but statement has %d bound columns)", nextParam, stmtInfo->queryInputBindings().numColumns());
  1373. // We actually do the execute later, when the result is fetched
  1374. }
  1375. protected:
  1376. void lazyExecute()
  1377. {
  1378. if (inputStream)
  1379. inputStream->executeAll(stmtInfo);
  1380. else
  1381. stmtInfo->execute();
  1382. }
  1383. const MYSQL_BIND &getScalarResult()
  1384. {
  1385. if (!stmtInfo->hasResult() || stmtInfo->queryResultBindings().numColumns() != 1)
  1386. typeError("scalar", NULL);
  1387. lazyExecute();
  1388. if (!stmtInfo->next())
  1389. typeError("scalar", NULL);
  1390. return stmtInfo->queryResultBindings().queryColumn(0, NULL);
  1391. }
  1392. void checkSingleRow()
  1393. {
  1394. if (stmtInfo->next())
  1395. typeError("scalar", NULL);
  1396. }
  1397. inline MYSQL_BIND &findParameter(const char *name, enum_field_types sqlType, unsigned size)
  1398. {
  1399. // Everything is positional in MySQL
  1400. MYSQL_BIND &bindInfo = stmtInfo->queryInputBindings().queryColumn(nextParam++, name);
  1401. createBindBuffer(bindInfo, sqlType, size);
  1402. return bindInfo;
  1403. }
  1404. Owned<MySQLConnection> conn;
  1405. Owned<MySQLPreparedStatement> stmtInfo;
  1406. Owned<MySQLDatasetBinder> inputStream;
  1407. int nextParam;
  1408. };
  1409. class MySQLEmbedContext : public CInterfaceOf<IEmbedContext>
  1410. {
  1411. public:
  1412. virtual IEmbedFunctionContext *createFunctionContext(unsigned flags, const char *options)
  1413. {
  1414. return createFunctionContextEx(NULL, flags, options);
  1415. }
  1416. virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, unsigned flags, const char *options)
  1417. {
  1418. if (flags & EFimport)
  1419. UNSUPPORTED("IMPORT");
  1420. else
  1421. return new MySQLEmbedFunctionContext(options);
  1422. }
  1423. virtual IEmbedServiceContext *createServiceContext(const char *service, unsigned flags, const char *options)
  1424. {
  1425. throwUnexpected();
  1426. }
  1427. };
  1428. extern IEmbedContext* getEmbedContext()
  1429. {
  1430. return new MySQLEmbedContext();
  1431. }
  1432. extern bool syntaxCheck(const char *script)
  1433. {
  1434. return true; // MORE
  1435. }
  1436. } // namespace