mysqlembed.cpp 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2013 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "mysql.h"
  15. #include "jexcept.hpp"
  16. #include "jthread.hpp"
  17. #include "hqlplugins.hpp"
  18. #include "deftype.hpp"
  19. #include "eclhelper.hpp"
  20. #include "eclrtl.hpp"
  21. #include "eclrtl_imp.hpp"
  22. #include "rtlds_imp.hpp"
  23. #include "rtlfield_imp.hpp"
  24. #include "rtlembed.hpp"
  25. #include "roxiemem.hpp"
  26. #include "nbcd.hpp"
  27. #ifdef _WIN32
  28. #define EXPORT __declspec(dllexport)
  29. #else
  30. #define EXPORT
  31. #endif
  32. static void UNSUPPORTED(const char *feature) __attribute__((noreturn));
  33. static void UNSUPPORTED(const char *feature)
  34. {
  35. throw MakeStringException(-1, "UNSUPPORTED feature: %s not supported in mysql plugin", feature);
  36. }
  // Version string published through getECLPluginDefinition().
  static const char *version = "MySQL Embed Helper 1.0.0";

  // NULL-terminated list of version strings this plugin declares itself compatible with.
  static const char * compatibleVersions[] = {
      "MySQL Embed Helper 1.0.0",
      NULL
  };
  41. extern "C" EXPORT bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb)
  42. {
  43. if (pb->size == sizeof(ECLPluginDefinitionBlockEx))
  44. {
  45. ECLPluginDefinitionBlockEx * pbx = (ECLPluginDefinitionBlockEx *) pb;
  46. pbx->compatibleVersions = compatibleVersions;
  47. }
  48. else if (pb->size != sizeof(ECLPluginDefinitionBlock))
  49. return false;
  50. pb->magicVersion = PLUGIN_VERSION;
  51. pb->version = version;
  52. pb->moduleName = "mysql";
  53. pb->ECL = NULL;
  54. pb->flags = PLUGIN_MULTIPLE_VERSIONS;
  55. pb->description = "MySQL Embed Helper";
  56. return true;
  57. }
  58. namespace mysqlembed {
  59. static void failx(const char *msg, ...) __attribute__((noreturn)) __attribute__((format(printf, 1, 2)));
  60. static void fail(const char *msg) __attribute__((noreturn));
  61. static void failx(const char *message, ...)
  62. {
  63. va_list args;
  64. va_start(args,message);
  65. StringBuffer msg;
  66. msg.append("mysql: ").valist_appendf(message,args);
  67. va_end(args);
  68. rtlFail(0, msg.str());
  69. }
  70. static void fail(const char *message)
  71. {
  72. StringBuffer msg;
  73. msg.append("mysql: ").append(message);
  74. rtlFail(0, msg.str());
  75. }
  76. // Wrappers to MySQL structures that require corresponding releases
  77. class MySQLConnection : public CInterface
  78. {
  79. public:
  80. IMPLEMENT_IINTERFACE;
  81. MySQLConnection(MYSQL *_conn) : conn(_conn)
  82. {
  83. }
  84. ~MySQLConnection()
  85. {
  86. if (conn)
  87. mysql_close(conn);
  88. }
  89. inline operator MYSQL *() const
  90. {
  91. return conn;
  92. }
  93. private:
  94. MySQLConnection(const MySQLConnection &);
  95. MYSQL *conn;
  96. };
  97. class MySQLResult : public CInterface
  98. {
  99. public:
  100. IMPLEMENT_IINTERFACE;
  101. MySQLResult(MYSQL_RES *_res) : res(_res)
  102. {
  103. }
  104. ~MySQLResult()
  105. {
  106. if (res)
  107. mysql_free_result(res);
  108. }
  109. inline operator MYSQL_RES *() const
  110. {
  111. return res;
  112. }
  113. private:
  114. MySQLResult(const MySQLResult &);
  115. MYSQL_RES *res;
  116. };
  117. class MySQLStatement : public CInterface
  118. {
  119. public:
  120. IMPLEMENT_IINTERFACE;
  121. MySQLStatement(MYSQL_STMT *_stmt) : stmt(_stmt)
  122. {
  123. }
  124. ~MySQLStatement()
  125. {
  126. if (stmt)
  127. mysql_stmt_close(stmt);
  128. }
  129. inline operator MYSQL_STMT *() const
  130. {
  131. return stmt;
  132. }
  133. private:
  134. MySQLStatement(const MySQLStatement &);
  135. MYSQL_STMT *stmt;
  136. };
  137. class MySQLBindingArray
  138. {
  139. public:
  140. MySQLBindingArray()
  141. {
  142. columns = 0;
  143. bindinfo = NULL;
  144. is_null = NULL;;
  145. error = NULL;
  146. lengths = NULL;
  147. }
  148. void init(unsigned count)
  149. {
  150. columns = count;
  151. if (columns)
  152. {
  153. bindinfo = new MYSQL_BIND [columns];
  154. is_null = new my_bool [columns];
  155. error = new my_bool [columns];
  156. lengths = new unsigned long [columns];
  157. memset(bindinfo, 0, columns * sizeof(bindinfo[0]));
  158. memset(is_null, 0, columns * sizeof(is_null[0]));
  159. memset(error, 0, columns * sizeof(error[0]));
  160. memset(lengths, 0, columns * sizeof(lengths[0]));
  161. for (int i = 0; i < columns; i++)
  162. {
  163. bindinfo[i].is_null = &is_null[i];
  164. bindinfo[i].length = &lengths[i];
  165. bindinfo[i].error = &error[i];
  166. }
  167. }
  168. }
  169. void bindResults(MYSQL_RES *res)
  170. {
  171. init(mysql_num_fields(res));
  172. for (int i = 0; i < columns; i++)
  173. {
  174. MYSQL_FIELD *col = mysql_fetch_field_direct(res, i);
  175. if (col->type == MYSQL_TYPE_DECIMAL || col->type == MYSQL_TYPE_NEWDECIMAL)
  176. {
  177. bindinfo[i].buffer_type = MYSQL_TYPE_STRING;
  178. bindinfo[i].buffer_length = 100; // MORE - is there a better guess?
  179. }
  180. else
  181. {
  182. bindinfo[i].buffer_type = col->type;
  183. bindinfo[i].buffer_length = col->length;
  184. }
  185. bindinfo[i].buffer = rtlMalloc(bindinfo[i].buffer_length);
  186. }
  187. }
  188. ~MySQLBindingArray()
  189. {
  190. for (int i = 0; i < columns; i++)
  191. {
  192. rtlFree(bindinfo[i].buffer);
  193. }
  194. delete [] bindinfo;
  195. delete [] is_null;
  196. delete [] error;
  197. delete [] lengths;
  198. }
  199. inline int numColumns() const
  200. {
  201. return columns;
  202. }
  203. inline MYSQL_BIND &queryColumn(int colIdx) const
  204. {
  205. if (colIdx >= columns)
  206. fail("Column index out of range");
  207. return bindinfo[colIdx];
  208. }
  209. inline MYSQL_BIND *queryBindings() const
  210. {
  211. return bindinfo;
  212. }
  213. private:
  214. MYSQL_BIND *bindinfo;
  215. my_bool *is_null;
  216. my_bool *error;
  217. unsigned long *lengths;
  218. int columns;
  219. };
  220. class MySQLPreparedStatement : public CInterface
  221. {
  222. public:
  223. IMPLEMENT_IINTERFACE;
  224. MySQLPreparedStatement(MySQLConnection *_conn, MySQLStatement *_stmt)
  225. : conn(_conn), stmt(_stmt)
  226. {
  227. // Create bindings for input parameters
  228. inputBindings.init(mysql_stmt_param_count(*stmt));
  229. // And for results
  230. res.setown(new MySQLResult(mysql_stmt_result_metadata(*stmt)));
  231. if (*res)
  232. {
  233. resultBindings.bindResults(*res);
  234. /* Bind the result buffers */
  235. if (mysql_stmt_bind_result(*stmt, resultBindings.queryBindings()))
  236. fail(mysql_stmt_error(*stmt));
  237. }
  238. else if (mysql_stmt_errno(*stmt)) // SQL actions don't return results...
  239. fail(mysql_stmt_error(*stmt));
  240. }
  241. ~MySQLPreparedStatement()
  242. {
  243. stop();
  244. }
  245. inline void stop()
  246. {
  247. res.clear();
  248. stmt.clear();
  249. }
  250. bool next()
  251. {
  252. if (!stmt)
  253. return false;
  254. int rc = mysql_stmt_fetch(*stmt);
  255. if (rc == MYSQL_NO_DATA)
  256. return false;
  257. else if (rc)
  258. fail(mysql_stmt_error(*stmt));
  259. else
  260. return true;
  261. }
  262. void execute()
  263. {
  264. assertex(stmt && *stmt);
  265. if (inputBindings.numColumns() && mysql_stmt_bind_param(*stmt, inputBindings.queryBindings()))
  266. fail(mysql_stmt_error(*stmt));
  267. if (mysql_stmt_execute(*stmt))
  268. fail(mysql_stmt_error(*stmt));
  269. }
  270. inline const MySQLBindingArray &queryResultBindings() const
  271. {
  272. return resultBindings;
  273. }
  274. inline const MySQLBindingArray &queryInputBindings() const
  275. {
  276. return inputBindings;
  277. }
  278. inline bool hasResult() const
  279. {
  280. return *res != NULL;
  281. }
  282. protected:
  283. Linked<MySQLConnection> conn;
  284. Linked<MySQLStatement> stmt;
  285. Owned<MySQLResult> res;
  286. MySQLBindingArray inputBindings;
  287. MySQLBindingArray resultBindings;
  288. };
  289. // Conversions from MySQL values to ECL data
  290. static void typeError(const char *expected, const RtlFieldInfo *field) __attribute__((noreturn));
  291. static void typeError(const char *expected, const RtlFieldInfo *field)
  292. {
  293. VStringBuffer msg("mysql: type mismatch - %s expected", expected);
  294. if (field)
  295. msg.appendf(" for field %s", field->name->str());
  296. rtlFail(0, msg.str());
  297. }
  298. static bool isInteger(enum_field_types type)
  299. {
  300. switch (type)
  301. {
  302. case MYSQL_TYPE_TINY:
  303. case MYSQL_TYPE_SHORT:
  304. case MYSQL_TYPE_LONG:
  305. case MYSQL_TYPE_LONGLONG:
  306. case MYSQL_TYPE_INT24:
  307. return true;
  308. case MYSQL_TYPE_TIMESTAMP:
  309. case MYSQL_TYPE_DATE:
  310. case MYSQL_TYPE_TIME:
  311. return false; // Who knows...
  312. default:
  313. return false;
  314. }
  315. }
  316. static bool getBooleanResult(const RtlFieldInfo *field, const MYSQL_BIND &bound)
  317. {
  318. if (*bound.is_null)
  319. {
  320. NullFieldProcessor p(field);
  321. return p.boolResult;
  322. }
  323. if (!isInteger(bound.buffer_type))
  324. typeError("boolean", field);
  325. return rtlReadUInt(bound.buffer, *bound.length) != 0;
  326. }
  327. static void getDataResult(const RtlFieldInfo *field, const MYSQL_BIND &bound, size32_t &chars, void * &result)
  328. {
  329. if (*bound.is_null)
  330. {
  331. NullFieldProcessor p(field);
  332. rtlStrToDataX(chars, result, p.resultChars, p.stringResult);
  333. return;
  334. }
  335. if (bound.buffer_type == MYSQL_TYPE_TINY_BLOB ||
  336. bound.buffer_type == MYSQL_TYPE_MEDIUM_BLOB ||
  337. bound.buffer_type == MYSQL_TYPE_LONG_BLOB ||
  338. bound.buffer_type == MYSQL_TYPE_BLOB ||
  339. bound.buffer_type == MYSQL_TYPE_STRING ||
  340. bound.buffer_type == MYSQL_TYPE_VAR_STRING
  341. )
  342. rtlStrToDataX(chars, result, *bound.length, bound.buffer); // This feels like it may not work to me - will preallocate rather larger than we want
  343. else
  344. typeError("blob", field);
  345. }
  346. static __int64 getSignedResult(const RtlFieldInfo *field, const MYSQL_BIND &bound);
  347. static unsigned __int64 getUnsignedResult(const RtlFieldInfo *field, const MYSQL_BIND &bound);
  348. static double getRealResult(const RtlFieldInfo *field, const MYSQL_BIND &bound)
  349. {
  350. if (*bound.is_null)
  351. {
  352. NullFieldProcessor p(field);
  353. return p.doubleResult;
  354. }
  355. if (isInteger(bound.buffer_type))
  356. {
  357. if (bound.is_unsigned)
  358. return (double) getUnsignedResult(field, bound);
  359. else
  360. return (double) getSignedResult(field, bound);
  361. }
  362. else if (bound.buffer_type == MYSQL_TYPE_FLOAT)
  363. return * (float *) bound.buffer;
  364. else if (bound.buffer_type == MYSQL_TYPE_DOUBLE)
  365. return * (double *) bound.buffer;
  366. else
  367. typeError("double", field);
  368. }
  369. static __int64 getSignedResult(const RtlFieldInfo *field, const MYSQL_BIND &bound)
  370. {
  371. if (*bound.is_null)
  372. {
  373. NullFieldProcessor p(field);
  374. return p.intResult;
  375. }
  376. if (isInteger(bound.buffer_type))
  377. {
  378. if (bound.is_unsigned)
  379. return (__int64) rtlReadUInt(bound.buffer, *bound.length);
  380. else
  381. return rtlReadInt(bound.buffer, *bound.length);
  382. }
  383. else
  384. typeError("integer", field);
  385. }
  386. static unsigned __int64 getUnsignedResult(const RtlFieldInfo *field, const MYSQL_BIND &bound)
  387. {
  388. if (*bound.is_null)
  389. {
  390. NullFieldProcessor p(field);
  391. return p.uintResult;
  392. }
  393. if (!isInteger(bound.buffer_type))
  394. typeError("integer", field);
  395. if (bound.is_unsigned)
  396. return rtlReadUInt(bound.buffer, *bound.length);
  397. else
  398. return (unsigned __int64) rtlReadInt(bound.buffer, *bound.length);
  399. }
  400. static void getStringResult(const RtlFieldInfo *field, const MYSQL_BIND &bound, size32_t &chars, char * &result)
  401. {
  402. if (*bound.is_null)
  403. {
  404. NullFieldProcessor p(field);
  405. rtlStrToStrX(chars, result, p.resultChars, p.stringResult);
  406. return;
  407. }
  408. if (bound.buffer_type != MYSQL_TYPE_STRING && bound.buffer_type != MYSQL_TYPE_VAR_STRING)
  409. typeError("string", field);
  410. const char *text = (const char *) bound.buffer;
  411. unsigned long bytes = *bound.length;
  412. unsigned numchars = rtlUtf8Length(bytes, text); // MORE - is it a good assumption that it is utf8 ? Depends how the database is configured I think
  413. rtlUtf8ToStrX(chars, result, numchars, text);
  414. }
  415. static void getUTF8Result(const RtlFieldInfo *field, const MYSQL_BIND &bound, size32_t &chars, char * &result)
  416. {
  417. if (*bound.is_null)
  418. {
  419. NullFieldProcessor p(field);
  420. rtlUtf8ToUtf8X(chars, result, p.resultChars, p.stringResult);
  421. return;
  422. }
  423. if (bound.buffer_type != MYSQL_TYPE_STRING && bound.buffer_type != MYSQL_TYPE_VAR_STRING)
  424. typeError("string", field);
  425. const char *text = (const char *) bound.buffer;
  426. unsigned long bytes = *bound.length;
  427. unsigned numchars = rtlUtf8Length(bytes, text); // MORE - is it a good assumption that it is utf8 ? Depends how the database is configured I think
  428. rtlUtf8ToUtf8X(chars, result, numchars, text);
  429. }
  430. static void getUnicodeResult(const RtlFieldInfo *field, const MYSQL_BIND &bound, size32_t &chars, UChar * &result)
  431. {
  432. if (*bound.is_null)
  433. {
  434. NullFieldProcessor p(field);
  435. rtlUnicodeToUnicodeX(chars, result, p.resultChars, p.unicodeResult);
  436. return;
  437. }
  438. if (bound.buffer_type != MYSQL_TYPE_STRING && bound.buffer_type != MYSQL_TYPE_VAR_STRING)
  439. typeError("string", field);
  440. const char *text = (const char *) bound.buffer;
  441. unsigned long bytes = *bound.length;
  442. unsigned numchars = rtlUtf8Length(bytes, text); // MORE - is it a good assumption that it is utf8 ? Depends how the database is configured I think
  443. rtlUtf8ToUnicodeX(chars, result, numchars, text);
  444. }
  445. static void getDecimalResult(const RtlFieldInfo *field, const MYSQL_BIND &bound, Decimal &value)
  446. {
  447. if (*bound.is_null)
  448. {
  449. NullFieldProcessor p(field);
  450. value.set(p.decimalResult);
  451. return;
  452. }
  453. size32_t chars;
  454. rtlDataAttr result;
  455. mysqlembed::getStringResult(field, bound, chars, result.refstr());
  456. value.setString(chars, result.getstr());
  457. if (field)
  458. {
  459. RtlDecimalTypeInfo *dtype = (RtlDecimalTypeInfo *) field->type;
  460. value.setPrecision(dtype->getDecimalDigits(), dtype->getDecimalPrecision());
  461. }
  462. }
  463. static void createBindBuffer(MYSQL_BIND & bindInfo, enum_field_types sqlType, unsigned size)
  464. {
  465. if (size)
  466. {
  467. if (!bindInfo.buffer)
  468. {
  469. bindInfo.buffer_type = sqlType;
  470. bindInfo.buffer = rtlMalloc(size);
  471. }
  472. else
  473. assertex(bindInfo.buffer_type == sqlType);
  474. }
  475. else
  476. {
  477. // Buffer is reallocated each time - caller is responsible for it.
  478. bindInfo.buffer_type = sqlType;
  479. rtlFree(bindInfo.buffer);
  480. bindInfo.buffer = NULL;
  481. }
  482. }
  483. // A MySQLRowBuilder object is used to construct an ECL row from a MySQL row
  484. class MySQLRowBuilder : public CInterfaceOf<IFieldSource>
  485. {
  486. public:
  487. MySQLRowBuilder(const MySQLBindingArray &_resultInfo)
  488. : resultInfo(_resultInfo), colIdx(-1)
  489. {
  490. }
  491. virtual bool getBooleanResult(const RtlFieldInfo *field)
  492. {
  493. return mysqlembed::getBooleanResult(field, nextField(field));
  494. }
  495. virtual void getDataResult(const RtlFieldInfo *field, size32_t &len, void * &result)
  496. {
  497. mysqlembed::getDataResult(field, nextField(field), len, result);
  498. }
  499. virtual double getRealResult(const RtlFieldInfo *field)
  500. {
  501. return mysqlembed::getRealResult(field, nextField(field));
  502. }
  503. virtual __int64 getSignedResult(const RtlFieldInfo *field)
  504. {
  505. return mysqlembed::getSignedResult(field, nextField(field));
  506. }
  507. virtual unsigned __int64 getUnsignedResult(const RtlFieldInfo *field)
  508. {
  509. return mysqlembed::getUnsignedResult(field, nextField(field));
  510. }
  511. virtual void getStringResult(const RtlFieldInfo *field, size32_t &chars, char * &result)
  512. {
  513. mysqlembed::getStringResult(field, nextField(field), chars, result);
  514. }
  515. virtual void getUTF8Result(const RtlFieldInfo *field, size32_t &chars, char * &result)
  516. {
  517. mysqlembed::getUTF8Result(field, nextField(field), chars, result);
  518. }
  519. virtual void getUnicodeResult(const RtlFieldInfo *field, size32_t &chars, UChar * &result)
  520. {
  521. mysqlembed::getUnicodeResult(field, nextField(field), chars, result);
  522. }
  523. virtual void getDecimalResult(const RtlFieldInfo *field, Decimal &value)
  524. {
  525. mysqlembed::getDecimalResult(field, nextField(field), value);
  526. }
  527. virtual void processBeginSet(const RtlFieldInfo * field, bool &isAll)
  528. {
  529. UNSUPPORTED("SET fields");
  530. }
  531. virtual bool processNextSet(const RtlFieldInfo * field)
  532. {
  533. throwUnexpected();
  534. }
  535. virtual void processBeginDataset(const RtlFieldInfo * field)
  536. {
  537. UNSUPPORTED("Nested datasets");
  538. }
  539. virtual void processBeginRow(const RtlFieldInfo * field)
  540. {
  541. }
  542. virtual bool processNextRow(const RtlFieldInfo * field)
  543. {
  544. throwUnexpected();
  545. }
  546. virtual void processEndSet(const RtlFieldInfo * field)
  547. {
  548. throwUnexpected();
  549. }
  550. virtual void processEndDataset(const RtlFieldInfo * field)
  551. {
  552. throwUnexpected();
  553. }
  554. virtual void processEndRow(const RtlFieldInfo * field)
  555. {
  556. }
  557. protected:
  558. const MYSQL_BIND &nextField(const RtlFieldInfo * field)
  559. {
  560. if (colIdx < resultInfo.numColumns())
  561. colIdx++;
  562. else
  563. fail("Too many fields in ECL output row");
  564. const MYSQL_BIND &column = resultInfo.queryColumn(colIdx);
  565. if (*column.error)
  566. failx("Error fetching column %s", field->name->str());
  567. return column;
  568. }
  569. const MySQLBindingArray &resultInfo;
  570. int colIdx;
  571. };
  572. // Bind MySQL variables from an ECL record
  573. class MySQLRecordBinder : public CInterfaceOf<IFieldProcessor>
  574. {
  575. public:
  576. MySQLRecordBinder(const RtlTypeInfo *_typeInfo, const MySQLBindingArray &_bindings, int _firstParam)
  577. : typeInfo(_typeInfo), bindings(_bindings), firstParam(_firstParam), dummyField("<row>", NULL, typeInfo), thisParam(_firstParam)
  578. {
  579. }
  580. int numFields()
  581. {
  582. int count = 0;
  583. const RtlFieldInfo * const *fields = typeInfo->queryFields();
  584. assertex(fields);
  585. while (*fields++)
  586. count++;
  587. return count;
  588. }
  589. void processRow(const byte *row)
  590. {
  591. thisParam = firstParam;
  592. typeInfo->process(row, row, &dummyField, *this); // Bind the variables for the current row
  593. }
  594. virtual void processString(unsigned len, const char *value, const RtlFieldInfo * field)
  595. {
  596. size32_t utf8chars;
  597. char *utf8;
  598. rtlStrToUtf8X(utf8chars, utf8, len, value);
  599. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_STRING, 0);
  600. bindInfo.buffer = utf8;
  601. bindInfo.buffer_length = rtlUtf8Size(utf8chars, utf8);
  602. bindInfo.length = &bindInfo.buffer_length;
  603. }
  604. virtual void processBool(bool value, const RtlFieldInfo * field)
  605. {
  606. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_TINY, sizeof(value));
  607. * (bool *) bindInfo.buffer = value;
  608. bindInfo.is_unsigned = true;
  609. }
  610. virtual void processData(unsigned len, const void *value, const RtlFieldInfo * field)
  611. {
  612. size32_t bytes;
  613. void *data;
  614. rtlStrToDataX(bytes, data, len, value);
  615. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_BLOB, 0);
  616. bindInfo.buffer = data;
  617. bindInfo.buffer_length = bytes;
  618. bindInfo.length = &bindInfo.buffer_length;
  619. }
  620. virtual void processInt(__int64 value, const RtlFieldInfo * field)
  621. {
  622. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_LONGLONG, sizeof(value));
  623. * (__int64 *) bindInfo.buffer = value;
  624. bindInfo.is_unsigned = false;
  625. }
  626. virtual void processUInt(unsigned __int64 value, const RtlFieldInfo * field)
  627. {
  628. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_LONGLONG, sizeof(value));
  629. * (unsigned __int64 *) bindInfo.buffer = value;
  630. bindInfo.is_unsigned = true;
  631. }
  632. virtual void processReal(double value, const RtlFieldInfo * field)
  633. {
  634. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_DOUBLE, sizeof(value));
  635. * (double *) bindInfo.buffer = value;
  636. }
  637. virtual void processDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo * field)
  638. {
  639. Decimal val;
  640. size32_t bytes;
  641. char *data;
  642. val.setDecimal(digits, precision, value);
  643. val.getStringX(bytes, data);
  644. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_STRING, 0);
  645. bindInfo.buffer = data;
  646. bindInfo.buffer_length = bytes;
  647. bindInfo.length = &bindInfo.buffer_length;
  648. }
  649. virtual void processUDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo * field)
  650. {
  651. Decimal val;
  652. size32_t bytes;
  653. char *data;
  654. val.setUDecimal(digits, precision, value);
  655. val.getStringX(bytes, data);
  656. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_STRING, 0);
  657. bindInfo.buffer = data;
  658. bindInfo.buffer_length = bytes;
  659. bindInfo.length = &bindInfo.buffer_length;
  660. }
  661. virtual void processUnicode(unsigned len, const UChar *value, const RtlFieldInfo * field)
  662. {
  663. size32_t utf8chars;
  664. char *utf8;
  665. rtlUnicodeToUtf8X(utf8chars, utf8, len, value);
  666. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_STRING, 0);
  667. bindInfo.buffer = utf8;
  668. bindInfo.buffer_length = rtlUtf8Size(utf8chars, utf8);
  669. bindInfo.length = &bindInfo.buffer_length;
  670. }
  671. virtual void processQString(unsigned len, const char *value, const RtlFieldInfo * field)
  672. {
  673. size32_t charCount;
  674. rtlDataAttr text;
  675. rtlQStrToStrX(charCount, text.refstr(), len, value);
  676. processString(charCount, text.getstr(), field);
  677. }
  678. virtual void processUtf8(unsigned len, const char *value, const RtlFieldInfo * field)
  679. {
  680. size32_t utf8chars;
  681. char *utf8;
  682. rtlUtf8ToUtf8X(utf8chars, utf8, len, value);
  683. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_STRING, 0);
  684. bindInfo.buffer = utf8;
  685. bindInfo.buffer_length = rtlUtf8Size(utf8chars, utf8);
  686. bindInfo.length = &bindInfo.buffer_length;
  687. }
  688. virtual bool processBeginSet(const RtlFieldInfo * field, unsigned numElements, bool isAll, const byte *data)
  689. {
  690. UNSUPPORTED("SET fields");
  691. return false;
  692. }
  693. virtual bool processBeginDataset(const RtlFieldInfo * field, unsigned numRows)
  694. {
  695. UNSUPPORTED("Nested datasets");
  696. return false;
  697. }
  698. virtual bool processBeginRow(const RtlFieldInfo * field)
  699. {
  700. return true;
  701. }
  702. virtual void processEndSet(const RtlFieldInfo * field)
  703. {
  704. throwUnexpected();
  705. }
  706. virtual void processEndDataset(const RtlFieldInfo * field)
  707. {
  708. throwUnexpected();
  709. }
  710. virtual void processEndRow(const RtlFieldInfo * field)
  711. {
  712. }
  713. protected:
  714. MYSQL_BIND &createBindBuffer(enum_field_types sqlType, unsigned size)
  715. {
  716. MYSQL_BIND &bindInfo = bindings.queryColumn(thisParam++);
  717. mysqlembed::createBindBuffer(bindInfo, sqlType, size);
  718. return bindInfo;
  719. }
  720. const RtlTypeInfo *typeInfo;
  721. const MySQLBindingArray &bindings;
  722. int firstParam;
  723. RtlFieldStrInfo dummyField;
  724. int thisParam;
  725. };
  726. //
  727. class MySQLDatasetBinder : public MySQLRecordBinder
  728. {
  729. public:
  730. MySQLDatasetBinder(IRowStream * _input, const RtlTypeInfo *_typeInfo, const MySQLBindingArray &_bindings, int _firstParam)
  731. : input(_input), MySQLRecordBinder(_typeInfo, _bindings, _firstParam)
  732. {
  733. }
  734. bool bindNext()
  735. {
  736. roxiemem::OwnedConstRoxieRow nextRow = (const byte *) input->ungroupedNextRow();
  737. if (!nextRow)
  738. return false;
  739. processRow((const byte *) nextRow.get()); // Bind the variables for the current row
  740. return true;
  741. }
  742. void executeAll(MySQLPreparedStatement *stmtInfo)
  743. {
  744. while (bindNext())
  745. {
  746. stmtInfo->execute();
  747. }
  748. }
  749. protected:
  750. Owned<IRowStream> input;
  751. };
  752. // A MySQL function that returns a dataset will return a MySQLRowStream object that can be
  753. // interrogated to return each row of the result in turn
  754. class MySQLRowStream : public CInterfaceOf<IRowStream>
  755. {
  756. public:
  757. MySQLRowStream(MySQLDatasetBinder *_inputStream, MySQLPreparedStatement *_stmtInfo, IEngineRowAllocator *_resultAllocator)
  758. : inputStream(_inputStream), stmtInfo(_stmtInfo), resultAllocator(_resultAllocator)
  759. {
  760. executePending = true;
  761. eof = false;
  762. }
  763. virtual const void *nextRow()
  764. {
  765. // A little complex when streaming data in as well as out - want to execute for every input record
  766. if (eof)
  767. return NULL;
  768. loop
  769. {
  770. if (executePending)
  771. {
  772. executePending = false;
  773. if (inputStream && !inputStream->bindNext())
  774. {
  775. noteEOF();
  776. return NULL;
  777. }
  778. stmtInfo->execute();
  779. }
  780. if (stmtInfo->next())
  781. break;
  782. if (inputStream)
  783. executePending = true;
  784. else
  785. {
  786. noteEOF();
  787. return NULL;
  788. }
  789. }
  790. RtlDynamicRowBuilder rowBuilder(resultAllocator);
  791. MySQLRowBuilder mysqlRowBuilder(stmtInfo->queryResultBindings());
  792. const RtlTypeInfo *typeInfo = resultAllocator->queryOutputMeta()->queryTypeInfo();
  793. assertex(typeInfo);
  794. RtlFieldStrInfo dummyField("<row>", NULL, typeInfo);
  795. size32_t len = typeInfo->build(rowBuilder, 0, &dummyField, mysqlRowBuilder);
  796. return rowBuilder.finalizeRowClear(len);
  797. }
  798. virtual void stop()
  799. {
  800. resultAllocator.clear();
  801. stmtInfo->stop();
  802. }
  803. protected:
  804. void noteEOF()
  805. {
  806. if (!eof)
  807. {
  808. eof = true;
  809. stop();
  810. }
  811. }
  812. Linked<MySQLDatasetBinder> inputStream;
  813. Linked<MySQLPreparedStatement> stmtInfo;
  814. Linked<IEngineRowAllocator> resultAllocator;
  815. bool executePending;
  816. bool eof;
  817. };
  818. // Each call to a MySQL function will use a new MySQLEmbedFunctionContext object
  819. class MySQLEmbedFunctionContext : public CInterfaceOf<IEmbedFunctionContext>
  820. {
  821. public:
  822. MySQLEmbedFunctionContext(const char *options)
  823. : nextParam(0)
  824. {
  825. const char *server = "localhost";
  826. const char *user = "";
  827. const char *password = "";
  828. const char *database = "";
  829. unsigned port = 0;
  830. StringArray opts;
  831. opts.appendList(options, ",");
  832. ForEachItemIn(idx, opts)
  833. {
  834. const char *opt = opts.item(idx);
  835. const char *val = strchr(opt, '=');
  836. if (val)
  837. {
  838. StringBuffer optName(val-opt, opt);
  839. val++;
  840. if (stricmp(optName, "server")==0)
  841. server = val; // Note that lifetime of val is adequate for this to be safe
  842. else if (stricmp(optName, "port")==0)
  843. port = atoi(val);
  844. else if (stricmp(optName, "user")==0)
  845. user = val;
  846. else if (stricmp(optName, "password")==0)
  847. password = val;
  848. else if (stricmp(optName, "database")==0)
  849. database = val;
  850. }
  851. }
  852. conn.setown(new MySQLConnection(mysql_init(NULL)));
  853. if (!mysql_real_connect(*conn, server, user, password, database, port, NULL, 0))
  854. {
  855. VStringBuffer err("mysql: failed to connect (%s)", mysql_error(*conn));
  856. rtlFail(0, err.str());
  857. }
  858. }
  859. virtual bool getBooleanResult()
  860. {
  861. bool ret = mysqlembed::getBooleanResult(NULL, getScalarResult());
  862. checkSingleRow();
  863. return ret;
  864. }
  865. virtual void getDataResult(size32_t &len, void * &result)
  866. {
  867. mysqlembed::getDataResult(NULL, getScalarResult(), len, result);
  868. checkSingleRow();
  869. }
  870. virtual double getRealResult()
  871. {
  872. double ret = mysqlembed::getRealResult(NULL, getScalarResult());
  873. checkSingleRow();
  874. return ret;
  875. }
  876. virtual __int64 getSignedResult()
  877. {
  878. __int64 ret = mysqlembed::getSignedResult(NULL, getScalarResult());
  879. checkSingleRow();
  880. return ret;
  881. }
  882. virtual unsigned __int64 getUnsignedResult()
  883. {
  884. unsigned __int64 ret = mysqlembed::getUnsignedResult(NULL, getScalarResult());
  885. checkSingleRow();
  886. return ret;
  887. }
  888. virtual void getStringResult(size32_t &chars, char * &result)
  889. {
  890. mysqlembed::getStringResult(NULL, getScalarResult(), chars, result);
  891. checkSingleRow();
  892. }
  893. virtual void getUTF8Result(size32_t &chars, char * &result)
  894. {
  895. mysqlembed::getUTF8Result(NULL, getScalarResult(), chars, result);
  896. checkSingleRow();
  897. }
  898. virtual void getUnicodeResult(size32_t &chars, UChar * &result)
  899. {
  900. mysqlembed::getUnicodeResult(NULL, getScalarResult(), chars, result);
  901. checkSingleRow();
  902. }
  903. virtual void getDecimalResult(Decimal &value)
  904. {
  905. mysqlembed::getDecimalResult(NULL, getScalarResult(), value);
  906. checkSingleRow();
  907. }
  908. virtual void getSetResult(bool & __isAllResult, size32_t & __resultBytes, void * & __result, int elemType, size32_t elemSize)
  909. {
  910. UNSUPPORTED("SET results");
  911. }
  912. virtual IRowStream *getDatasetResult(IEngineRowAllocator * _resultAllocator)
  913. {
  914. return new MySQLRowStream(inputStream, stmtInfo, _resultAllocator);
  915. }
  916. virtual byte * getRowResult(IEngineRowAllocator * _resultAllocator)
  917. {
  918. if (!stmtInfo->hasResult())
  919. typeError("row", NULL);
  920. lazyExecute();
  921. MySQLRowStream stream(NULL, stmtInfo, _resultAllocator);
  922. roxiemem::OwnedConstRoxieRow ret = stream.nextRow();
  923. roxiemem::OwnedConstRoxieRow ret2 = stream.nextRow();
  924. stream.stop();
  925. if (ret == NULL || ret2 != NULL) // Check for exactly one returned row
  926. typeError("row", NULL);
  927. return (byte *) ret.getClear();
  928. }
  929. virtual size32_t getTransformResult(ARowBuilder & rowBuilder)
  930. {
  931. if (!stmtInfo->next())
  932. typeError("row", NULL);
  933. MySQLRowBuilder mysqlRowBuilder(stmtInfo->queryResultBindings());
  934. const RtlTypeInfo *typeInfo = rowBuilder.queryAllocator()->queryOutputMeta()->queryTypeInfo();
  935. assertex(typeInfo);
  936. RtlFieldStrInfo dummyField("<row>", NULL, typeInfo);
  937. size32_t ret = typeInfo->build(rowBuilder, 0, &dummyField, mysqlRowBuilder);
  938. if (stmtInfo->next())
  939. typeError("row", NULL); // Check that a single row was returned
  940. return ret;
  941. }
  942. virtual void bindRowParam(const char *name, IOutputMetaData & metaVal, byte *val)
  943. {
  944. MySQLRecordBinder binder(metaVal.queryTypeInfo(), stmtInfo->queryInputBindings(), nextParam);
  945. binder.processRow(val);
  946. nextParam += binder.numFields();
  947. }
  948. virtual void bindDatasetParam(const char *name, IOutputMetaData & metaVal, IRowStream * val)
  949. {
  950. // We only support a single dataset parameter...
  951. if (inputStream)
  952. {
  953. fail("At most one dataset parameter supported");
  954. }
  955. inputStream.setown(new MySQLDatasetBinder(LINK(val), metaVal.queryTypeInfo(), stmtInfo->queryInputBindings(), nextParam));
  956. nextParam += inputStream->numFields();
  957. }
  958. virtual void bindBooleanParam(const char *name, bool val)
  959. {
  960. MYSQL_BIND &bindInfo = findParameter(name, MYSQL_TYPE_TINY, sizeof(val));
  961. * (bool *) bindInfo.buffer = val;
  962. bindInfo.is_unsigned = true;
  963. }
  964. virtual void bindDataParam(const char *name, size32_t len, const void *val)
  965. {
  966. size32_t bytes;
  967. void *data;
  968. rtlStrToDataX(bytes, data, len, val);
  969. MYSQL_BIND &bindInfo = findParameter(name, MYSQL_TYPE_BLOB, 0);
  970. bindInfo.buffer = data;
  971. bindInfo.buffer_length = bytes;
  972. bindInfo.length = &bindInfo.buffer_length;
  973. }
  974. virtual void bindFloatParam(const char *name, float val)
  975. {
  976. MYSQL_BIND &bindInfo = findParameter(name, MYSQL_TYPE_FLOAT, sizeof(val));
  977. * (float *) bindInfo.buffer = val;
  978. }
  979. virtual void bindRealParam(const char *name, double val)
  980. {
  981. MYSQL_BIND &bindInfo = findParameter(name, MYSQL_TYPE_DOUBLE, sizeof(val));
  982. * (double *) bindInfo.buffer = val;
  983. }
  984. virtual void bindSignedSizeParam(const char *name, int size, __int64 val)
  985. {
  986. bindSignedParam(name, val);
  987. }
  988. virtual void bindSignedParam(const char *name, __int64 val)
  989. {
  990. MYSQL_BIND &bindInfo = findParameter(name, MYSQL_TYPE_LONGLONG, sizeof(val));
  991. * (__int64 *) bindInfo.buffer = val;
  992. bindInfo.is_unsigned = false;
  993. }
  994. virtual void bindUnsignedSizeParam(const char *name, int size, unsigned __int64 val)
  995. {
  996. bindUnsignedParam(name, val);
  997. }
  998. virtual void bindUnsignedParam(const char *name, unsigned __int64 val)
  999. {
  1000. MYSQL_BIND &bindInfo = findParameter(name, MYSQL_TYPE_LONGLONG, sizeof(val));
  1001. * (unsigned __int64 *) bindInfo.buffer = val;
  1002. bindInfo.is_unsigned = true;
  1003. }
  1004. virtual void bindStringParam(const char *name, size32_t len, const char *val)
  1005. {
  1006. size32_t utf8chars;
  1007. char *utf8;
  1008. rtlStrToUtf8X(utf8chars, utf8, len, val);
  1009. MYSQL_BIND &bindInfo = findParameter(name, MYSQL_TYPE_STRING, 0);
  1010. bindInfo.buffer = utf8;
  1011. bindInfo.buffer_length = rtlUtf8Size(utf8chars, utf8);
  1012. bindInfo.length = &bindInfo.buffer_length;
  1013. }
  1014. virtual void bindVStringParam(const char *name, const char *val)
  1015. {
  1016. bindStringParam(name, strlen(val), val);
  1017. }
  1018. virtual void bindUTF8Param(const char *name, size32_t chars, const char *val)
  1019. {
  1020. size32_t utf8chars;
  1021. char *utf8;
  1022. rtlUtf8ToUtf8X(utf8chars, utf8, chars, val);
  1023. MYSQL_BIND &bindInfo = findParameter(name, MYSQL_TYPE_STRING, 0);
  1024. bindInfo.buffer = utf8;
  1025. bindInfo.buffer_length = rtlUtf8Size(utf8chars, utf8);
  1026. bindInfo.length = &bindInfo.buffer_length;
  1027. }
  1028. virtual void bindUnicodeParam(const char *name, size32_t chars, const UChar *val)
  1029. {
  1030. size32_t utf8chars;
  1031. char *utf8;
  1032. rtlUnicodeToUtf8X(utf8chars, utf8, chars, val);
  1033. MYSQL_BIND &bindInfo = findParameter(name, MYSQL_TYPE_STRING, 0);
  1034. bindInfo.buffer = utf8;
  1035. bindInfo.buffer_length = rtlUtf8Size(utf8chars, utf8);
  1036. bindInfo.length = &bindInfo.buffer_length;
  1037. }
  1038. virtual void bindSetParam(const char *name, int elemType, size32_t elemSize, bool isAll, size32_t totalBytes, void *setData)
  1039. {
  1040. UNSUPPORTED("SET parameters"); // MySQL does support sets, so MIGHT be possible...
  1041. }
  1042. virtual void importFunction(size32_t lenChars, const char *text)
  1043. {
  1044. throwUnexpected();
  1045. }
  1046. virtual void compileEmbeddedScript(size32_t chars, const char *script)
  1047. {
  1048. size32_t len = rtlUtf8Size(chars, script);
  1049. Owned<MySQLStatement> stmt = new MySQLStatement(mysql_stmt_init(*conn));
  1050. if (!*stmt)
  1051. fail("failed to create statement");
  1052. if (mysql_stmt_prepare(*stmt, script, len))
  1053. fail(mysql_stmt_error(*stmt));
  1054. stmtInfo.setown(new MySQLPreparedStatement(conn, stmt));
  1055. }
  1056. virtual void callFunction()
  1057. {
  1058. if (nextParam != stmtInfo->queryInputBindings().numColumns())
  1059. fail("Not enough parameters");
  1060. if (!stmtInfo->hasResult())
  1061. lazyExecute();
  1062. }
  1063. protected:
  1064. void lazyExecute()
  1065. {
  1066. if (inputStream)
  1067. inputStream->executeAll(stmtInfo);
  1068. else
  1069. stmtInfo->execute();
  1070. }
  1071. const MYSQL_BIND &getScalarResult()
  1072. {
  1073. if (!stmtInfo->hasResult() || stmtInfo->queryResultBindings().numColumns() != 1)
  1074. typeError("scalar", NULL);
  1075. lazyExecute(); // MORE this seems wrong to me - or at least needs to check not already executed
  1076. if (!stmtInfo->next())
  1077. typeError("scalar", NULL);
  1078. return stmtInfo->queryResultBindings().queryColumn(0);
  1079. }
  1080. void checkSingleRow()
  1081. {
  1082. if (stmtInfo->next())
  1083. typeError("scalar", NULL);
  1084. }
  1085. inline MYSQL_BIND &findParameter(const char *name, enum_field_types sqlType, unsigned size)
  1086. {
  1087. // Everything is positional in MySQL
  1088. MYSQL_BIND &bindInfo = stmtInfo->queryInputBindings().queryColumn(nextParam++);
  1089. createBindBuffer(bindInfo, sqlType, size);
  1090. return bindInfo;
  1091. }
  1092. Owned<MySQLConnection> conn;
  1093. Owned<MySQLPreparedStatement> stmtInfo;
  1094. Owned<MySQLDatasetBinder> inputStream;
  1095. int nextParam;
  1096. };
  1097. class MySQLEmbedContext : public CInterfaceOf<IEmbedContext>
  1098. {
  1099. public:
  1100. virtual IEmbedFunctionContext *createFunctionContext(unsigned flags, const char *options)
  1101. {
  1102. if (flags & EFimport)
  1103. UNSUPPORTED("IMPORT");
  1104. else
  1105. return new MySQLEmbedFunctionContext(options);
  1106. }
  1107. };
  1108. extern IEmbedContext* getEmbedContext()
  1109. {
  1110. return new MySQLEmbedContext();
  1111. }
  1112. extern bool syntaxCheck(const char *script)
  1113. {
  1114. return true; // MORE
  1115. }
  1116. } // namespace