mysqlembed.cpp 56 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2013 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "mysql.h"
  15. #include "mysqld_error.h"
  16. #include "jexcept.hpp"
  17. #include "jthread.hpp"
  18. #include "hqlplugins.hpp"
  19. #include "deftype.hpp"
  20. #include "eclhelper.hpp"
  21. #include "eclrtl.hpp"
  22. #include "eclrtl_imp.hpp"
  23. #include "rtlds_imp.hpp"
  24. #include "rtlfield.hpp"
  25. #include "rtlembed.hpp"
  26. #include "roxiemem.hpp"
  27. #include "nbcd.hpp"
  28. #if (MYSQL_VERSION_ID >= 80000)
  29. typedef bool my_bool;
  30. #endif
  31. __declspec(noreturn) static void UNSUPPORTED(const char *feature) __attribute__((noreturn));
  32. static unsigned mysqlCacheCheckPeriod = 10000;
  33. static unsigned mysqlCacheTimeoutPeriod = 60000;
  34. static unsigned mysqlConnectionCacheSize = 10;
  35. static void UNSUPPORTED(const char *feature)
  36. {
  37. throw MakeStringException(-1, "UNSUPPORTED feature: %s not supported in mysql plugin", feature);
  38. }
  39. static const char * compatibleVersions[] = {
  40. "MySQL Embed Helper 1.0.0",
  41. NULL };
  42. static const char *version = "MySQL Embed Helper 1.0.0";
  43. extern "C" DECL_EXPORT bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb)
  44. {
  45. if (pb->size == sizeof(ECLPluginDefinitionBlockEx))
  46. {
  47. ECLPluginDefinitionBlockEx * pbx = (ECLPluginDefinitionBlockEx *) pb;
  48. pbx->compatibleVersions = compatibleVersions;
  49. }
  50. else if (pb->size != sizeof(ECLPluginDefinitionBlock))
  51. return false;
  52. pb->magicVersion = PLUGIN_VERSION;
  53. pb->version = version;
  54. pb->moduleName = "mysql";
  55. pb->ECL = NULL;
  56. pb->flags = PLUGIN_MULTIPLE_VERSIONS;
  57. pb->description = "MySQL Embed Helper";
  58. return true;
  59. }
  60. extern "C" DECL_EXPORT void setPluginContextEx(IPluginContextEx * _ctx)
  61. {
  62. mysqlCacheCheckPeriod = _ctx->ctxGetPropInt("@mysqlCacheCheckPeriod", 10000);
  63. mysqlCacheTimeoutPeriod = _ctx->ctxGetPropInt("@mysqlCacheTimeoutPeriod", 60000);
  64. mysqlConnectionCacheSize = _ctx->ctxGetPropInt("@mysqlConnectionCacheSize", 10);
  65. }
  66. namespace mysqlembed {
  67. __declspec(noreturn) static void failx(const char *msg, ...) __attribute__((format(printf, 1, 2), noreturn));
  68. __declspec(noreturn) static void fail(const char *msg) __attribute__((noreturn));
  69. static void failx(const char *message, ...)
  70. {
  71. va_list args;
  72. va_start(args,message);
  73. StringBuffer msg;
  74. msg.append("mysql: ").valist_appendf(message,args);
  75. va_end(args);
  76. rtlFail(0, msg.str());
  77. }
  78. static void fail(const char *message)
  79. {
  80. StringBuffer msg;
  81. msg.append("mysql: ").append(message);
  82. rtlFail(0, msg.str());
  83. }
  84. // Wrappers to MySQL structures that require corresponding releases
  85. class MySQLConnection;
  86. static __thread MySQLConnection *threadCachedConnection = nullptr;
  87. enum MySQLOptionParamType
  88. {
  89. ParamTypeNone,
  90. ParamTypeString,
  91. ParamTypeUInt,
  92. ParamTypeULong,
  93. ParamTypeBool
  94. };
  95. struct MySQLOptionDefinition
  96. {
  97. const char *name;
  98. enum mysql_option option;
  99. MySQLOptionParamType paramType;
  100. };
  101. #define addoption(a,b) { #a, a, b }
  102. MySQLOptionDefinition options[] =
  103. {
  104. addoption(MYSQL_OPT_COMPRESS, ParamTypeNone),
  105. addoption(MYSQL_OPT_CONNECT_TIMEOUT, ParamTypeUInt),
  106. #if (MYSQL_VERSION_ID < 80000)
  107. addoption(MYSQL_OPT_GUESS_CONNECTION, ParamTypeNone),
  108. addoption(MYSQL_OPT_SSL_VERIFY_SERVER_CERT, ParamTypeBool),
  109. addoption(MYSQL_OPT_USE_EMBEDDED_CONNECTION, ParamTypeNone),
  110. addoption(MYSQL_OPT_USE_REMOTE_CONNECTION, ParamTypeNone),
  111. addoption(MYSQL_SECURE_AUTH, ParamTypeBool),
  112. addoption(MYSQL_SET_CLIENT_IP, ParamTypeString),
  113. #endif
  114. addoption(MYSQL_OPT_LOCAL_INFILE, ParamTypeUInt),
  115. addoption(MYSQL_OPT_NAMED_PIPE, ParamTypeNone),
  116. addoption(MYSQL_OPT_PROTOCOL, ParamTypeUInt),
  117. addoption(MYSQL_OPT_READ_TIMEOUT, ParamTypeUInt),
  118. addoption(MYSQL_OPT_RECONNECT, ParamTypeBool),
  119. addoption(MYSQL_OPT_USE_RESULT, ParamTypeNone),
  120. addoption(MYSQL_OPT_WRITE_TIMEOUT, ParamTypeUInt),
  121. addoption(MYSQL_READ_DEFAULT_FILE, ParamTypeString),
  122. addoption(MYSQL_READ_DEFAULT_GROUP, ParamTypeString),
  123. addoption(MYSQL_REPORT_DATA_TRUNCATION, ParamTypeBool),
  124. addoption(MYSQL_SET_CHARSET_DIR, ParamTypeString),
  125. addoption(MYSQL_SET_CHARSET_NAME, ParamTypeString),
  126. addoption(MYSQL_SHARED_MEMORY_BASE_NAME, ParamTypeString),
  127. #if MYSQL_VERSION_ID >= 50507
  128. addoption(MYSQL_DEFAULT_AUTH, ParamTypeString),
  129. addoption(MYSQL_PLUGIN_DIR, ParamTypeString),
  130. #endif
  131. #if (MYSQL_VERSION_ID >= 50601)
  132. addoption(MYSQL_OPT_BIND, ParamTypeString),
  133. #endif
  134. #if (MYSQL_VERSION_ID >= 50603)
  135. addoption(MYSQL_OPT_SSL_CA, ParamTypeString),
  136. addoption(MYSQL_OPT_SSL_CAPATH, ParamTypeString),
  137. addoption(MYSQL_OPT_SSL_CERT, ParamTypeString),
  138. addoption(MYSQL_OPT_SSL_CIPHER, ParamTypeString),
  139. addoption(MYSQL_OPT_SSL_CRL, ParamTypeString),
  140. addoption(MYSQL_OPT_SSL_CRLPATH, ParamTypeString),
  141. addoption(MYSQL_OPT_SSL_KEY, ParamTypeString),
  142. #endif
  143. #if (MYSQL_VERSION_ID >= 50606)
  144. addoption(MYSQL_SERVER_PUBLIC_KEY, ParamTypeString),
  145. #endif
  146. #if (MYSQL_VERSION_ID >= 50527 && MYSQL_VERSION_ID < 50600) || MYSQL_VERSION_ID >= 50607
  147. addoption(MYSQL_ENABLE_CLEARTEXT_PLUGIN, ParamTypeBool),
  148. #endif
  149. addoption(MYSQL_INIT_COMMAND, ParamTypeString),
  150. #if (MYSQL_VERSION_ID >= 50610)
  151. addoption(MYSQL_OPT_CAN_HANDLE_EXPIRED_PASSWORDS, ParamTypeBool),
  152. #endif
  153. #if (MYSQL_VERSION_ID >= 50703 && MYSQL_VERSION_ID < 80000)
  154. addoption(MYSQL_OPT_SSL_ENFORCE, ParamTypeBool),
  155. #endif
  156. #if (MYSQL_VERSION_ID >= 50709)
  157. addoption(MYSQL_OPT_MAX_ALLOWED_PACKET, ParamTypeULong),
  158. addoption(MYSQL_OPT_NET_BUFFER_LENGTH, ParamTypeULong),
  159. #endif
  160. #if (MYSQL_VERSION_ID >= 50710)
  161. addoption(MYSQL_OPT_TLS_VERSION, ParamTypeString),
  162. #endif
  163. #if (MYSQL_VERSION_ID >= 50711)
  164. addoption(MYSQL_OPT_SSL_MODE, ParamTypeUInt),
  165. #endif
  166. { nullptr, (enum mysql_option) 0, ParamTypeNone }
  167. };
  168. static MySQLOptionDefinition &lookupOption(const char *optName)
  169. {
  170. for (MySQLOptionDefinition *optDef = options; optDef->name != nullptr; optDef++)
  171. {
  172. if (stricmp(optName, optDef->name)==0)
  173. return *optDef;
  174. }
  175. failx("Unknown option %s", optName);
  176. }
  177. class MySQLConnectionCloserThread : public Thread
  178. {
  179. virtual int run() override;
  180. public:
  181. static Semaphore closing;
  182. } *connectionCloserThread = nullptr;
  183. class MySQLConnection : public CInterface
  184. {
  185. public:
  186. MySQLConnection(MYSQL *_conn, const char *_cacheOptions, bool _threadCached, bool _globalCached) : conn(_conn), threadCached(_threadCached), globalCached(_globalCached)
  187. {
  188. if (_cacheOptions && (threadCached || globalCached))
  189. cacheOptions = strdup(_cacheOptions);
  190. else
  191. cacheOptions = nullptr;
  192. created = msTick();
  193. }
  194. ~MySQLConnection()
  195. {
  196. if (conn)
  197. {
  198. if (threadCached || globalCached)
  199. {
  200. Owned<MySQLConnection> cacheEntry = new MySQLConnection(*this); // Note - takes ownership of this->cacheOptions and this->conn
  201. cacheOptions = NULL;
  202. conn = NULL;
  203. if (threadCached)
  204. setThreadCache(cacheEntry.getLink());
  205. else // globalCached
  206. {
  207. CriticalBlock b(globalCacheCrit);
  208. if (globalCachedConnections.length()==mysqlConnectionCacheSize)
  209. {
  210. MySQLConnection &goer = globalCachedConnections.popGet();
  211. goer.globalCached = false; // Make sure we don't recache it!
  212. goer.Release();
  213. }
  214. globalCachedConnections.add(*cacheEntry.getClear(), 0);
  215. if (!connectionCloserThread)
  216. {
  217. connectionCloserThread = new MySQLConnectionCloserThread;
  218. connectionCloserThread->start();
  219. }
  220. }
  221. }
  222. else
  223. {
  224. mysql_close(conn);
  225. free((char *) cacheOptions);
  226. }
  227. }
  228. }
  229. inline operator MYSQL *() const
  230. {
  231. return conn;
  232. }
  233. inline bool matches (const char *_options)
  234. {
  235. return _options && cacheOptions && streq(_options, cacheOptions);
  236. }
  237. static MySQLConnection *findCachedConnection(const char *options, bool bypassCache)
  238. {
  239. const char *server = "localhost";
  240. const char *user = "";
  241. const char *password = "";
  242. const char *database = "";
  243. bool hasMySQLOpt = false;
  244. bool threadCache = false;
  245. bool globalCache = true;
  246. unsigned port = 0;
  247. StringArray opts;
  248. opts.appendList(options, ",");
  249. ForEachItemIn(idx, opts)
  250. {
  251. const char *opt = opts.item(idx);
  252. const char *val = strchr(opt, '=');
  253. if (val)
  254. {
  255. StringBuffer optName(val-opt, opt);
  256. val++;
  257. if (stricmp(optName, "server")==0)
  258. server = val; // Note that lifetime of val is adequate for this to be safe
  259. else if (stricmp(optName, "port")==0)
  260. port = atoi(val);
  261. else if (stricmp(optName, "user")==0)
  262. user = val;
  263. else if (stricmp(optName, "password")==0)
  264. password = val;
  265. else if (stricmp(optName, "database")==0)
  266. database = val;
  267. else if (stricmp(optName, "cache")==0)
  268. {
  269. if (clipStrToBool(val) || strieq(val, "thread"))
  270. {
  271. threadCache = true;
  272. globalCache = false;
  273. }
  274. else if (strieq(val, "global"))
  275. globalCache = true;
  276. else if (strieq(val, "none") || strieq(val, "false") || strieq(val, "off") || strieq(val, "0"))
  277. globalCache = false;
  278. else
  279. failx("Unknown cache option %s", val);
  280. }
  281. else if (strnicmp(optName, "MYSQL_", 6)==0)
  282. hasMySQLOpt = true;
  283. else
  284. failx("Unknown option %s", optName.str());
  285. }
  286. }
  287. if (!bypassCache)
  288. {
  289. if (threadCache)
  290. {
  291. if (threadCachedConnection && threadCachedConnection->matches(options))
  292. {
  293. MySQLConnection *ret = threadCachedConnection;
  294. threadCachedConnection = nullptr;
  295. return ret;
  296. }
  297. }
  298. else if (globalCache)
  299. {
  300. CriticalBlock b(globalCacheCrit);
  301. ForEachItemIn(idx, globalCachedConnections)
  302. {
  303. MySQLConnection &cached = globalCachedConnections.item(idx);
  304. if (cached.matches(options))
  305. {
  306. globalCachedConnections.remove(idx, true);
  307. return &cached;
  308. }
  309. }
  310. }
  311. }
  312. MySQLConnection::clearThreadCache();
  313. Owned<MySQLConnection> newConn = new MySQLConnection(mysql_init(NULL), options, threadCache, globalCache);
  314. if (hasMySQLOpt)
  315. {
  316. ForEachItemIn(idx, opts)
  317. {
  318. const char *opt = opts.item(idx);
  319. if (strnicmp(opt, "MYSQL_", 6)==0)
  320. {
  321. const char *val = strchr(opt, '=');
  322. StringBuffer optName(opt);
  323. if (val)
  324. {
  325. optName.setLength(val-opt);
  326. val++;
  327. }
  328. MySQLOptionDefinition &optDef = lookupOption(optName);
  329. int rc;
  330. if (optDef.paramType == ParamTypeNone)
  331. {
  332. if (val)
  333. failx("Option %s does not take a value", optName.str());
  334. rc = mysql_options(*newConn, optDef.option, nullptr);
  335. }
  336. else
  337. {
  338. if (!val)
  339. failx("Option %s requires a value", optName.str());
  340. switch (optDef.paramType)
  341. {
  342. case ParamTypeString:
  343. rc = mysql_options(*newConn, optDef.option, val);
  344. break;
  345. case ParamTypeUInt:
  346. {
  347. unsigned int oval = strtoul(val, nullptr, 10);
  348. rc = mysql_options(*newConn, optDef.option, (const char *) &oval);
  349. break;
  350. }
  351. case ParamTypeULong:
  352. {
  353. unsigned long oval = strtoul(val, nullptr, 10);
  354. rc = mysql_options(*newConn, optDef.option, (const char *) &oval);
  355. break;
  356. }
  357. case ParamTypeBool:
  358. {
  359. my_bool oval = clipStrToBool(val);
  360. rc = mysql_options(*newConn, optDef.option, (const char *) &oval);
  361. break;
  362. }
  363. }
  364. }
  365. if (rc)
  366. failx("Failed to set option %s (%s)", optName.str(), mysql_error(*newConn));
  367. }
  368. }
  369. }
  370. if (!mysql_real_connect(*newConn, server, user, password, database, port, NULL, 0))
  371. failx("Failed to connect (%s)", mysql_error(*newConn));
  372. return newConn.getClear();
  373. }
  374. static void clearThreadCache()
  375. {
  376. ::Release(threadCachedConnection);
  377. threadCachedConnection = NULL;
  378. }
  379. static void setThreadCache(MySQLConnection *connection)
  380. {
  381. clearThreadCache();
  382. threadCachedConnection = connection;
  383. }
  384. bool wasCached() const
  385. {
  386. return reusing;
  387. }
  388. MySQLConnection *reopen()
  389. {
  390. threadCached = false;
  391. globalCached = false;
  392. return findCachedConnection(cacheOptions, true);
  393. }
  394. static void retireCache(unsigned maxAge)
  395. {
  396. CriticalBlock b(globalCacheCrit);
  397. unsigned now = msTick();
  398. ForEachItemInRev(idx, globalCachedConnections)
  399. {
  400. MySQLConnection &cached = globalCachedConnections.item(idx);
  401. if (!maxAge || (now - cached.created > maxAge))
  402. {
  403. cached.globalCached = false; // Make sure we don't re-add it!
  404. globalCachedConnections.remove(idx);
  405. }
  406. }
  407. }
  408. private:
  409. MySQLConnection(const MySQLConnection &from)
  410. {
  411. conn = from.conn; // Taking over ownership
  412. cacheOptions = from.cacheOptions; // Taking over ownership
  413. threadCached = from.threadCached;
  414. globalCached = from.globalCached;
  415. reusing = true;
  416. created = msTick();
  417. }
  418. static CIArrayOf<MySQLConnection> globalCachedConnections;
  419. static CriticalSection globalCacheCrit;
  420. MYSQL *conn;
  421. const char *cacheOptions; // Not done as a StringAttr, in order to avoid reallocation when recaching after use (see copy constructor above)
  422. unsigned created;
  423. bool threadCached;
  424. bool globalCached;
  425. bool reusing = false;
  426. };
  427. CIArrayOf<MySQLConnection> MySQLConnection::globalCachedConnections;
  428. CriticalSection MySQLConnection::globalCacheCrit;
  429. Semaphore MySQLConnectionCloserThread::closing;
  430. int MySQLConnectionCloserThread::run()
  431. {
  432. for (;;)
  433. {
  434. if (closing.wait(mysqlCacheCheckPeriod))
  435. {
  436. break;
  437. }
  438. MySQLConnection::retireCache(mysqlCacheTimeoutPeriod);
  439. }
  440. return 0;
  441. }
  442. MODULE_INIT(INIT_PRIORITY_STANDARD)
  443. {
  444. return true;
  445. }
  446. MODULE_EXIT()
  447. {
  448. if (connectionCloserThread)
  449. {
  450. MySQLConnectionCloserThread::closing.signal();
  451. connectionCloserThread->join();
  452. connectionCloserThread->Release();
  453. }
  454. MySQLConnection::retireCache(0);
  455. }
  456. class MySQLResult : public CInterface
  457. {
  458. public:
  459. MySQLResult(MYSQL_RES *_res) : res(_res)
  460. {
  461. }
  462. ~MySQLResult()
  463. {
  464. if (res)
  465. mysql_free_result(res);
  466. }
  467. inline operator MYSQL_RES *() const
  468. {
  469. return res;
  470. }
  471. private:
  472. MySQLResult(const MySQLResult &);
  473. MYSQL_RES *res;
  474. };
  475. class MySQLStatement : public CInterface
  476. {
  477. public:
  478. MySQLStatement(MYSQL_STMT *_stmt) : stmt(_stmt)
  479. {
  480. }
  481. ~MySQLStatement()
  482. {
  483. if (stmt)
  484. mysql_stmt_close(stmt);
  485. }
  486. inline operator MYSQL_STMT *() const
  487. {
  488. return stmt;
  489. }
  490. private:
  491. MySQLStatement(const MySQLStatement &);
  492. MYSQL_STMT *stmt;
  493. };
  494. class MySQLBindingArray
  495. {
  496. public:
  497. MySQLBindingArray()
  498. {
  499. columns = 0;
  500. bindinfo = NULL;
  501. is_null = NULL;;
  502. error = NULL;
  503. lengths = NULL;
  504. }
  505. void init(unsigned count)
  506. {
  507. columns = count;
  508. if (columns)
  509. {
  510. bindinfo = new MYSQL_BIND [columns];
  511. is_null = new my_bool [columns];
  512. error = new my_bool [columns];
  513. lengths = new unsigned long [columns];
  514. memset(bindinfo, 0, columns * sizeof(bindinfo[0]));
  515. memset(is_null, 0, columns * sizeof(is_null[0]));
  516. memset(error, 0, columns * sizeof(error[0]));
  517. memset(lengths, 0, columns * sizeof(lengths[0]));
  518. for (int i = 0; i < columns; i++)
  519. {
  520. bindinfo[i].is_null = &is_null[i];
  521. bindinfo[i].length = &lengths[i];
  522. bindinfo[i].error = &error[i];
  523. }
  524. }
  525. }
  526. void bindResults(MYSQL_RES *res)
  527. {
  528. init(mysql_num_fields(res));
  529. for (int i = 0; i < columns; i++)
  530. {
  531. MYSQL_FIELD *col = mysql_fetch_field_direct(res, i);
  532. switch (col->type)
  533. {
  534. case MYSQL_TYPE_DECIMAL:
  535. case MYSQL_TYPE_NEWDECIMAL:
  536. bindinfo[i].buffer_type = MYSQL_TYPE_STRING;
  537. bindinfo[i].buffer_length = 100; // MORE - is there a better guess?
  538. break;
  539. case MYSQL_TYPE_TIMESTAMP:
  540. case MYSQL_TYPE_DATETIME:
  541. case MYSQL_TYPE_TIME:
  542. case MYSQL_TYPE_DATE:
  543. bindinfo[i].buffer_type = col->type;
  544. bindinfo[i].buffer_length = sizeof(MYSQL_TIME);
  545. break;
  546. default:
  547. bindinfo[i].buffer_type = col->type;
  548. bindinfo[i].buffer_length = col->length;
  549. break;
  550. }
  551. bindinfo[i].buffer = rtlMalloc(bindinfo[i].buffer_length);
  552. }
  553. }
  554. ~MySQLBindingArray()
  555. {
  556. for (int i = 0; i < columns; i++)
  557. {
  558. rtlFree(bindinfo[i].buffer);
  559. }
  560. delete [] bindinfo;
  561. delete [] is_null;
  562. delete [] error;
  563. delete [] lengths;
  564. }
  565. inline int numColumns() const
  566. {
  567. return columns;
  568. }
  569. inline MYSQL_BIND &queryColumn(int colIdx, const char *name) const
  570. {
  571. if (colIdx >= columns)
  572. {
  573. VStringBuffer error("No matching bound column for parameter %d", colIdx);
  574. if (name)
  575. error.appendf(" (%s)", name);
  576. fail(error);
  577. }
  578. return bindinfo[colIdx];
  579. }
  580. inline MYSQL_BIND *queryBindings() const
  581. {
  582. return bindinfo;
  583. }
  584. private:
  585. MYSQL_BIND *bindinfo;
  586. my_bool *is_null;
  587. my_bool *error;
  588. unsigned long *lengths;
  589. int columns;
  590. };
  591. class MySQLPreparedStatement : public CInterface
  592. {
  593. public:
  594. MySQLPreparedStatement(MySQLConnection *_conn, MySQLStatement *_stmt)
  595. : conn(_conn), stmt(_stmt)
  596. {
  597. // Create bindings for input parameters
  598. inputBindings.init(mysql_stmt_param_count(*stmt));
  599. // Bindings for results are created after the execute, as they are not always available until then (e.g. when calling a stored procedure)
  600. if (mysql_stmt_errno(*stmt))
  601. fail(mysql_stmt_error(*stmt));
  602. }
  603. MySQLPreparedStatement(MySQLConnection *_conn, const char *_query, unsigned _len)
  604. : conn(_conn)
  605. {
  606. // Used for cases with no parameters or result, that are not supported in prepared query protocol - e.g. DROP PROCEDURE
  607. query.set(_query, _len);
  608. inputBindings.init(0);
  609. }
  610. ~MySQLPreparedStatement()
  611. {
  612. stop();
  613. }
  614. inline void stop()
  615. {
  616. res.clear();
  617. stmt.clear();
  618. }
  619. bool next()
  620. {
  621. if (!stmt)
  622. return false;
  623. int rc = mysql_stmt_fetch(*stmt);
  624. if (rc == MYSQL_NO_DATA)
  625. return false;
  626. else if (rc)
  627. fail(mysql_stmt_error(*stmt));
  628. else
  629. return true;
  630. }
  631. void execute()
  632. {
  633. if (stmt && *stmt)
  634. {
  635. // NOTE - we ignore all but the first result from stored procedures
  636. #if MYSQL_VERSION_ID >= 50500
  637. while (mysql_stmt_next_result(*stmt)==0)
  638. {
  639. // Ignore and discard any additional results for the current row - typically the status result from a stored procedure call
  640. mysql_stmt_free_result(*stmt);
  641. }
  642. #endif
  643. if (inputBindings.numColumns() && mysql_stmt_bind_param(*stmt, inputBindings.queryBindings()))
  644. fail(mysql_stmt_error(*stmt));
  645. if (mysql_stmt_execute(*stmt))
  646. fail(mysql_stmt_error(*stmt));
  647. res.setown(new MySQLResult(mysql_stmt_result_metadata(*stmt)));
  648. if (*res)
  649. {
  650. resultBindings.bindResults(*res);
  651. /* Bind the result buffers */
  652. if (mysql_stmt_bind_result(*stmt, resultBindings.queryBindings()))
  653. fail(mysql_stmt_error(*stmt));
  654. }
  655. }
  656. else
  657. {
  658. if (mysql_query(*conn, query))
  659. fail(mysql_error(*conn));
  660. }
  661. }
  662. inline const MySQLBindingArray &queryResultBindings() const
  663. {
  664. return resultBindings;
  665. }
  666. inline const MySQLBindingArray &queryInputBindings() const
  667. {
  668. return inputBindings;
  669. }
  670. inline bool hasResult() const
  671. {
  672. return *res != NULL;
  673. }
  674. protected:
  675. Linked<MySQLConnection> conn;
  676. Linked<MySQLStatement> stmt;
  677. Owned<MySQLResult> res;
  678. MySQLBindingArray inputBindings;
  679. MySQLBindingArray resultBindings;
  680. StringAttr query;
  681. };
  682. // Conversions from MySQL values to ECL data
  683. __declspec(noreturn) static void typeError(const char *expected, const RtlFieldInfo *field) __attribute__((noreturn));
  684. static void typeError(const char *expected, const RtlFieldInfo *field)
  685. {
  686. VStringBuffer msg("mysql: type mismatch - %s expected", expected);
  687. if (field)
  688. msg.appendf(" for field %s", field->name);
  689. rtlFail(0, msg.str());
  690. }
  691. static bool isInteger(enum_field_types type)
  692. {
  693. switch (type)
  694. {
  695. case MYSQL_TYPE_TINY:
  696. case MYSQL_TYPE_SHORT:
  697. case MYSQL_TYPE_LONG:
  698. case MYSQL_TYPE_LONGLONG:
  699. case MYSQL_TYPE_INT24:
  700. return true;
  701. default:
  702. return false;
  703. }
  704. }
  705. static bool isDateTime(enum_field_types type)
  706. {
  707. switch (type)
  708. {
  709. case MYSQL_TYPE_TIMESTAMP:
  710. case MYSQL_TYPE_DATETIME:
  711. case MYSQL_TYPE_DATE:
  712. case MYSQL_TYPE_TIME:
  713. return true;
  714. default:
  715. return false;
  716. }
  717. }
  718. static bool isString(enum_field_types type)
  719. {
  720. switch (type)
  721. {
  722. case MYSQL_TYPE_BIT: // Slightly dubious but MySQL seems to represent them as string fields in their gui...
  723. case MYSQL_TYPE_TINY_BLOB:
  724. case MYSQL_TYPE_MEDIUM_BLOB:
  725. case MYSQL_TYPE_LONG_BLOB:
  726. case MYSQL_TYPE_BLOB:
  727. case MYSQL_TYPE_STRING:
  728. case MYSQL_TYPE_VAR_STRING:
  729. return true;
  730. default:
  731. return false;
  732. }
  733. }
  734. static unsigned __int64 getDateTimeValue(const MYSQL_BIND &bound)
  735. {
  736. const MYSQL_TIME * time = (const MYSQL_TIME *) bound.buffer;
  737. switch (bound.buffer_type)
  738. {
  739. case MYSQL_TYPE_TIMESTAMP:
  740. case MYSQL_TYPE_DATETIME:
  741. //What format should this be? Possibly a timestamp_t
  742. return (unsigned __int64)((time->year * 10000) + (time->month * 100) + (time->day)) * 1000000 +
  743. (time->hour * 10000) + (time->minute * 100) + (time->second);
  744. case MYSQL_TYPE_DATE:
  745. return (time->year * 10000) + (time->month * 100) + (time->day);
  746. case MYSQL_TYPE_TIME:
  747. return (time->hour * 10000) + (time->minute * 100) + (time->second);
  748. default:
  749. throwUnexpected();
  750. }
  751. }
  752. static void getDateTimeText(const MYSQL_BIND &bound, size32_t &chars, char * &result)
  753. {
  754. const MYSQL_TIME * time = (const MYSQL_TIME *) bound.buffer;
  755. char temp[20];
  756. switch (bound.buffer_type)
  757. {
  758. case MYSQL_TYPE_TIMESTAMP:
  759. case MYSQL_TYPE_DATETIME:
  760. _snprintf(temp, sizeof(temp), "%4u-%02u-%02u %02u:%02u:%02u", time->year, time->month, time->day, time->hour, time->minute, time->second);
  761. break;
  762. case MYSQL_TYPE_DATE:
  763. _snprintf(temp, sizeof(temp), "%4u-%02u-%02u", time->year, time->month, time->day);
  764. break;
  765. case MYSQL_TYPE_TIME:
  766. _snprintf(temp, sizeof(temp), "%02u:%02u:%02u", time->hour, time->minute, time->second);
  767. break;
  768. default:
  769. throwUnexpected();
  770. }
  771. rtlStrToStrX(chars, result, strlen(temp), temp);
  772. }
  773. static bool getBooleanResult(const RtlFieldInfo *field, const MYSQL_BIND &bound)
  774. {
  775. if (*bound.is_null)
  776. {
  777. NullFieldProcessor p(field);
  778. return p.boolResult;
  779. }
  780. if (!isInteger(bound.buffer_type))
  781. typeError("boolean", field);
  782. return rtlReadUInt(bound.buffer, *bound.length) != 0;
  783. }
  784. static void getDataResult(const RtlFieldInfo *field, const MYSQL_BIND &bound, size32_t &chars, void * &result)
  785. {
  786. if (*bound.is_null)
  787. {
  788. NullFieldProcessor p(field);
  789. rtlStrToDataX(chars, result, p.resultChars, p.stringResult);
  790. return;
  791. }
  792. if (isString(bound.buffer_type))
  793. rtlStrToDataX(chars, result, *bound.length, bound.buffer); // This feels like it may not work to me - will preallocate rather larger than we want
  794. else
  795. typeError("blob", field);
  796. }
  797. static __int64 getSignedResult(const RtlFieldInfo *field, const MYSQL_BIND &bound);
  798. static unsigned __int64 getUnsignedResult(const RtlFieldInfo *field, const MYSQL_BIND &bound);
  799. static double getRealResult(const RtlFieldInfo *field, const MYSQL_BIND &bound)
  800. {
  801. if (*bound.is_null)
  802. {
  803. NullFieldProcessor p(field);
  804. return p.doubleResult;
  805. }
  806. if (bound.buffer_type == MYSQL_TYPE_BIT)
  807. return (double) getUnsignedResult(field, bound);
  808. if (isInteger(bound.buffer_type))
  809. {
  810. if (bound.is_unsigned)
  811. return (double) getUnsignedResult(field, bound);
  812. else
  813. return (double) getSignedResult(field, bound);
  814. }
  815. else if (bound.buffer_type == MYSQL_TYPE_FLOAT)
  816. return * (float *) bound.buffer;
  817. else if (bound.buffer_type == MYSQL_TYPE_DOUBLE)
  818. return * (double *) bound.buffer;
  819. else
  820. typeError("double", field);
  821. }
  822. static __int64 getSignedResult(const RtlFieldInfo *field, const MYSQL_BIND &bound)
  823. {
  824. if (*bound.is_null)
  825. {
  826. NullFieldProcessor p(field);
  827. return p.intResult;
  828. }
  829. if (isDateTime(bound.buffer_type))
  830. return getDateTimeValue(bound);
  831. if (bound.buffer_type == MYSQL_TYPE_BIT)
  832. {
  833. // These are stored as big-endian values it seems...
  834. return (__int64) rtlReadSwapUInt(bound.buffer, *bound.length);
  835. }
  836. if (isInteger(bound.buffer_type))
  837. {
  838. if (bound.is_unsigned)
  839. return (__int64) rtlReadUInt(bound.buffer, *bound.length);
  840. else
  841. return rtlReadInt(bound.buffer, *bound.length);
  842. }
  843. else
  844. typeError("integer", field);
  845. }
  846. static unsigned __int64 getUnsignedResult(const RtlFieldInfo *field, const MYSQL_BIND &bound)
  847. {
  848. if (*bound.is_null)
  849. {
  850. NullFieldProcessor p(field);
  851. return p.uintResult;
  852. }
  853. if (isDateTime(bound.buffer_type))
  854. return getDateTimeValue(bound);
  855. if (bound.buffer_type == MYSQL_TYPE_BIT)
  856. return rtlReadSwapUInt(bound.buffer, *bound.length);
  857. if (!isInteger(bound.buffer_type))
  858. typeError("integer", field);
  859. if (bound.is_unsigned)
  860. return rtlReadUInt(bound.buffer, *bound.length);
  861. else
  862. return (unsigned __int64) rtlReadInt(bound.buffer, *bound.length);
  863. }
  864. static void getStringResult(const RtlFieldInfo *field, const MYSQL_BIND &bound, size32_t &chars, char * &result)
  865. {
  866. if (*bound.is_null)
  867. {
  868. NullFieldProcessor p(field);
  869. rtlStrToStrX(chars, result, p.resultChars, p.stringResult);
  870. return;
  871. }
  872. if (isDateTime(bound.buffer_type))
  873. {
  874. getDateTimeText(bound, chars, result);
  875. return;
  876. }
  877. if (!isString(bound.buffer_type))
  878. typeError("string", field);
  879. const char *text = (const char *) bound.buffer;
  880. unsigned long bytes = *bound.length;
  881. unsigned numchars = rtlUtf8Length(bytes, text); // MORE - is it a good assumption that it is utf8 ? Depends how the database is configured I think
  882. rtlUtf8ToStrX(chars, result, numchars, text);
  883. }
  884. static void getUTF8Result(const RtlFieldInfo *field, const MYSQL_BIND &bound, size32_t &chars, char * &result)
  885. {
  886. if (*bound.is_null)
  887. {
  888. NullFieldProcessor p(field);
  889. rtlUtf8ToUtf8X(chars, result, p.resultChars, p.stringResult);
  890. return;
  891. }
  892. if (isDateTime(bound.buffer_type))
  893. {
  894. getDateTimeText(bound, chars, result);
  895. return;
  896. }
  897. if (!isString(bound.buffer_type))
  898. typeError("string", field);
  899. const char *text = (const char *) bound.buffer;
  900. unsigned long bytes = *bound.length;
  901. unsigned numchars = rtlUtf8Length(bytes, text); // MORE - is it a good assumption that it is utf8 ? Depends how the database is configured I think
  902. rtlUtf8ToUtf8X(chars, result, numchars, text);
  903. }
  904. static void getUnicodeResult(const RtlFieldInfo *field, const MYSQL_BIND &bound, size32_t &chars, UChar * &result)
  905. {
  906. if (*bound.is_null)
  907. {
  908. NullFieldProcessor p(field);
  909. rtlUnicodeToUnicodeX(chars, result, p.resultChars, p.unicodeResult);
  910. return;
  911. }
  912. if (bound.buffer_type != MYSQL_TYPE_STRING && bound.buffer_type != MYSQL_TYPE_VAR_STRING)
  913. typeError("string", field);
  914. const char *text = (const char *) bound.buffer;
  915. unsigned long bytes = *bound.length;
  916. unsigned numchars = rtlUtf8Length(bytes, text); // MORE - is it a good assumption that it is utf8 ? Depends how the database is configured I think
  917. rtlUtf8ToUnicodeX(chars, result, numchars, text);
  918. }
  919. static void getDecimalResult(const RtlFieldInfo *field, const MYSQL_BIND &bound, Decimal &value)
  920. {
  921. if (*bound.is_null)
  922. {
  923. NullFieldProcessor p(field);
  924. value.set(p.decimalResult);
  925. return;
  926. }
  927. size32_t chars;
  928. rtlDataAttr result;
  929. mysqlembed::getStringResult(field, bound, chars, result.refstr());
  930. value.setString(chars, result.getstr());
  931. if (field)
  932. {
  933. RtlDecimalTypeInfo *dtype = (RtlDecimalTypeInfo *) field->type;
  934. value.setPrecision(dtype->getDecimalDigits(), dtype->getDecimalPrecision());
  935. }
  936. }
  937. static void createBindBuffer(MYSQL_BIND & bindInfo, enum_field_types sqlType, unsigned size)
  938. {
  939. if (size)
  940. {
  941. if (!bindInfo.buffer)
  942. {
  943. bindInfo.buffer_type = sqlType;
  944. bindInfo.buffer = rtlMalloc(size);
  945. }
  946. else
  947. assertex(bindInfo.buffer_type == sqlType);
  948. }
  949. else
  950. {
  951. // Buffer is reallocated each time - caller is responsible for it.
  952. bindInfo.buffer_type = sqlType;
  953. rtlFree(bindInfo.buffer);
  954. bindInfo.buffer = NULL;
  955. }
  956. }
  957. // A MySQLRowBuilder object is used to construct an ECL row from a MySQL row
  958. class MySQLRowBuilder : public CInterfaceOf<IFieldSource>
  959. {
  960. public:
  961. MySQLRowBuilder(const MySQLBindingArray &_resultInfo)
  962. : resultInfo(_resultInfo), colIdx(-1)
  963. {
  964. }
  965. virtual bool getBooleanResult(const RtlFieldInfo *field)
  966. {
  967. return mysqlembed::getBooleanResult(field, nextField(field));
  968. }
  969. virtual void getDataResult(const RtlFieldInfo *field, size32_t &len, void * &result)
  970. {
  971. mysqlembed::getDataResult(field, nextField(field), len, result);
  972. }
  973. virtual double getRealResult(const RtlFieldInfo *field)
  974. {
  975. return mysqlembed::getRealResult(field, nextField(field));
  976. }
  977. virtual __int64 getSignedResult(const RtlFieldInfo *field)
  978. {
  979. return mysqlembed::getSignedResult(field, nextField(field));
  980. }
  981. virtual unsigned __int64 getUnsignedResult(const RtlFieldInfo *field)
  982. {
  983. return mysqlembed::getUnsignedResult(field, nextField(field));
  984. }
  985. virtual void getStringResult(const RtlFieldInfo *field, size32_t &chars, char * &result)
  986. {
  987. mysqlembed::getStringResult(field, nextField(field), chars, result);
  988. }
  989. virtual void getUTF8Result(const RtlFieldInfo *field, size32_t &chars, char * &result)
  990. {
  991. mysqlembed::getUTF8Result(field, nextField(field), chars, result);
  992. }
  993. virtual void getUnicodeResult(const RtlFieldInfo *field, size32_t &chars, UChar * &result)
  994. {
  995. mysqlembed::getUnicodeResult(field, nextField(field), chars, result);
  996. }
  997. virtual void getDecimalResult(const RtlFieldInfo *field, Decimal &value)
  998. {
  999. mysqlembed::getDecimalResult(field, nextField(field), value);
  1000. }
  1001. virtual void processBeginSet(const RtlFieldInfo * field, bool &isAll)
  1002. {
  1003. UNSUPPORTED("SET fields");
  1004. }
  1005. virtual bool processNextSet(const RtlFieldInfo * field)
  1006. {
  1007. throwUnexpected();
  1008. }
  1009. virtual void processBeginDataset(const RtlFieldInfo * field)
  1010. {
  1011. UNSUPPORTED("Nested datasets");
  1012. }
  1013. virtual void processBeginRow(const RtlFieldInfo * field)
  1014. {
  1015. }
  1016. virtual bool processNextRow(const RtlFieldInfo * field)
  1017. {
  1018. throwUnexpected();
  1019. }
  1020. virtual void processEndSet(const RtlFieldInfo * field)
  1021. {
  1022. throwUnexpected();
  1023. }
  1024. virtual void processEndDataset(const RtlFieldInfo * field)
  1025. {
  1026. throwUnexpected();
  1027. }
  1028. virtual void processEndRow(const RtlFieldInfo * field)
  1029. {
  1030. }
  1031. protected:
  1032. const MYSQL_BIND &nextField(const RtlFieldInfo * field)
  1033. {
  1034. if (colIdx < resultInfo.numColumns())
  1035. colIdx++;
  1036. else
  1037. fail("Too many fields in ECL output row");
  1038. const MYSQL_BIND &column = resultInfo.queryColumn(colIdx,field->name);
  1039. if (*column.error)
  1040. failx("Error fetching column %s", field->name);
  1041. return column;
  1042. }
  1043. const MySQLBindingArray &resultInfo;
  1044. int colIdx;
  1045. };
  1046. // Bind MySQL variables from an ECL record
  1047. class MySQLRecordBinder : public CInterfaceOf<IFieldProcessor>
  1048. {
  1049. public:
  1050. MySQLRecordBinder(const RtlTypeInfo *_typeInfo, const MySQLBindingArray &_bindings, int _firstParam)
  1051. : typeInfo(_typeInfo), bindings(_bindings), firstParam(_firstParam), dummyField("<row>", NULL, typeInfo), thisParam(_firstParam)
  1052. {
  1053. }
  1054. int numFields()
  1055. {
  1056. int count = 0;
  1057. const RtlFieldInfo * const *fields = typeInfo->queryFields();
  1058. assertex(fields);
  1059. while (*fields++)
  1060. count++;
  1061. return count;
  1062. }
  1063. void processRow(const byte *row)
  1064. {
  1065. thisParam = firstParam;
  1066. typeInfo->process(row, row, &dummyField, *this); // Bind the variables for the current row
  1067. }
  1068. virtual void processString(unsigned len, const char *value, const RtlFieldInfo * field)
  1069. {
  1070. size32_t utf8chars;
  1071. char *utf8;
  1072. rtlStrToUtf8X(utf8chars, utf8, len, value);
  1073. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_STRING, 0);
  1074. bindInfo.buffer = utf8;
  1075. bindInfo.buffer_length = rtlUtf8Size(utf8chars, utf8);
  1076. bindInfo.length = &bindInfo.buffer_length;
  1077. }
  1078. virtual void processBool(bool value, const RtlFieldInfo * field)
  1079. {
  1080. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_TINY, sizeof(value));
  1081. * (bool *) bindInfo.buffer = value;
  1082. bindInfo.is_unsigned = true;
  1083. }
  1084. virtual void processData(unsigned len, const void *value, const RtlFieldInfo * field)
  1085. {
  1086. size32_t bytes;
  1087. void *data;
  1088. rtlStrToDataX(bytes, data, len, value);
  1089. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_BLOB, 0);
  1090. bindInfo.buffer = data;
  1091. bindInfo.buffer_length = bytes;
  1092. bindInfo.length = &bindInfo.buffer_length;
  1093. }
  1094. virtual void processInt(__int64 value, const RtlFieldInfo * field)
  1095. {
  1096. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_LONGLONG, sizeof(value));
  1097. * (__int64 *) bindInfo.buffer = value;
  1098. bindInfo.is_unsigned = false;
  1099. }
  1100. virtual void processUInt(unsigned __int64 value, const RtlFieldInfo * field)
  1101. {
  1102. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_LONGLONG, sizeof(value));
  1103. * (unsigned __int64 *) bindInfo.buffer = value;
  1104. bindInfo.is_unsigned = true;
  1105. }
  1106. virtual void processReal(double value, const RtlFieldInfo * field)
  1107. {
  1108. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_DOUBLE, sizeof(value));
  1109. * (double *) bindInfo.buffer = value;
  1110. }
  1111. virtual void processDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo * field)
  1112. {
  1113. Decimal val;
  1114. size32_t bytes;
  1115. char *data;
  1116. val.setDecimal(digits, precision, value);
  1117. val.getStringX(bytes, data);
  1118. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_STRING, 0);
  1119. bindInfo.buffer = data;
  1120. bindInfo.buffer_length = bytes;
  1121. bindInfo.length = &bindInfo.buffer_length;
  1122. }
  1123. virtual void processUDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo * field)
  1124. {
  1125. Decimal val;
  1126. size32_t bytes;
  1127. char *data;
  1128. val.setUDecimal(digits, precision, value);
  1129. val.getStringX(bytes, data);
  1130. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_STRING, 0);
  1131. bindInfo.buffer = data;
  1132. bindInfo.buffer_length = bytes;
  1133. bindInfo.length = &bindInfo.buffer_length;
  1134. }
  1135. virtual void processUnicode(unsigned len, const UChar *value, const RtlFieldInfo * field)
  1136. {
  1137. size32_t utf8chars;
  1138. char *utf8;
  1139. rtlUnicodeToUtf8X(utf8chars, utf8, len, value);
  1140. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_STRING, 0);
  1141. bindInfo.buffer = utf8;
  1142. bindInfo.buffer_length = rtlUtf8Size(utf8chars, utf8);
  1143. bindInfo.length = &bindInfo.buffer_length;
  1144. }
  1145. virtual void processQString(unsigned len, const char *value, const RtlFieldInfo * field)
  1146. {
  1147. size32_t charCount;
  1148. rtlDataAttr text;
  1149. rtlQStrToStrX(charCount, text.refstr(), len, value);
  1150. processString(charCount, text.getstr(), field);
  1151. }
  1152. virtual void processUtf8(unsigned len, const char *value, const RtlFieldInfo * field)
  1153. {
  1154. size32_t utf8chars;
  1155. char *utf8;
  1156. rtlUtf8ToUtf8X(utf8chars, utf8, len, value);
  1157. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_STRING, 0);
  1158. bindInfo.buffer = utf8;
  1159. bindInfo.buffer_length = rtlUtf8Size(utf8chars, utf8);
  1160. bindInfo.length = &bindInfo.buffer_length;
  1161. }
  1162. virtual bool processBeginSet(const RtlFieldInfo * field, unsigned numElements, bool isAll, const byte *data)
  1163. {
  1164. UNSUPPORTED("SET fields");
  1165. return false;
  1166. }
  1167. virtual bool processBeginDataset(const RtlFieldInfo * field, unsigned numRows)
  1168. {
  1169. UNSUPPORTED("Nested datasets");
  1170. return false;
  1171. }
  1172. virtual bool processBeginRow(const RtlFieldInfo * field)
  1173. {
  1174. return true;
  1175. }
  1176. virtual void processEndSet(const RtlFieldInfo * field)
  1177. {
  1178. throwUnexpected();
  1179. }
  1180. virtual void processEndDataset(const RtlFieldInfo * field)
  1181. {
  1182. throwUnexpected();
  1183. }
  1184. virtual void processEndRow(const RtlFieldInfo * field)
  1185. {
  1186. }
  1187. protected:
  1188. MYSQL_BIND &createBindBuffer(enum_field_types sqlType, unsigned size)
  1189. {
  1190. MYSQL_BIND &bindInfo = bindings.queryColumn(thisParam++, NULL);
  1191. mysqlembed::createBindBuffer(bindInfo, sqlType, size);
  1192. return bindInfo;
  1193. }
  1194. const RtlTypeInfo *typeInfo;
  1195. const MySQLBindingArray &bindings;
  1196. int firstParam;
  1197. RtlFieldStrInfo dummyField;
  1198. int thisParam;
  1199. };
  1200. //
  1201. class MySQLDatasetBinder : public MySQLRecordBinder
  1202. {
  1203. public:
  1204. MySQLDatasetBinder(IRowStream * _input, const RtlTypeInfo *_typeInfo, const MySQLBindingArray &_bindings, int _firstParam)
  1205. : input(_input), MySQLRecordBinder(_typeInfo, _bindings, _firstParam)
  1206. {
  1207. }
  1208. bool bindNext()
  1209. {
  1210. roxiemem::OwnedConstRoxieRow nextRow = (const byte *) input->ungroupedNextRow();
  1211. if (!nextRow)
  1212. return false;
  1213. processRow((const byte *) nextRow.get()); // Bind the variables for the current row
  1214. return true;
  1215. }
  1216. void executeAll(MySQLPreparedStatement *stmtInfo)
  1217. {
  1218. while (bindNext())
  1219. {
  1220. stmtInfo->execute();
  1221. }
  1222. }
  1223. protected:
  1224. Owned<IRowStream> input;
  1225. };
  1226. // A MySQL function that returns a dataset will return a MySQLRowStream object that can be
  1227. // interrogated to return each row of the result in turn
  1228. class MySQLRowStream : public CInterfaceOf<IRowStream>
  1229. {
  1230. public:
  1231. MySQLRowStream(MySQLDatasetBinder *_inputStream, MySQLPreparedStatement *_stmtInfo, IEngineRowAllocator *_resultAllocator)
  1232. : inputStream(_inputStream), stmtInfo(_stmtInfo), resultAllocator(_resultAllocator)
  1233. {
  1234. executePending = true;
  1235. eof = false;
  1236. }
  1237. virtual const void *nextRow()
  1238. {
  1239. // A little complex when streaming data in as well as out - want to execute for every input record
  1240. if (eof)
  1241. return NULL;
  1242. for (;;)
  1243. {
  1244. if (executePending)
  1245. {
  1246. executePending = false;
  1247. if (inputStream && !inputStream->bindNext())
  1248. {
  1249. noteEOF();
  1250. return NULL;
  1251. }
  1252. stmtInfo->execute();
  1253. }
  1254. if (stmtInfo->next())
  1255. break;
  1256. if (inputStream)
  1257. executePending = true;
  1258. else
  1259. {
  1260. noteEOF();
  1261. return NULL;
  1262. }
  1263. }
  1264. RtlDynamicRowBuilder rowBuilder(resultAllocator);
  1265. MySQLRowBuilder mysqlRowBuilder(stmtInfo->queryResultBindings());
  1266. const RtlTypeInfo *typeInfo = resultAllocator->queryOutputMeta()->queryTypeInfo();
  1267. assertex(typeInfo);
  1268. RtlFieldStrInfo dummyField("<row>", NULL, typeInfo);
  1269. size32_t len = typeInfo->build(rowBuilder, 0, &dummyField, mysqlRowBuilder);
  1270. return rowBuilder.finalizeRowClear(len);
  1271. }
  1272. virtual void stop()
  1273. {
  1274. resultAllocator.clear();
  1275. stmtInfo->stop();
  1276. }
  1277. protected:
  1278. void noteEOF()
  1279. {
  1280. if (!eof)
  1281. {
  1282. eof = true;
  1283. stop();
  1284. }
  1285. }
  1286. Linked<MySQLDatasetBinder> inputStream;
  1287. Linked<MySQLPreparedStatement> stmtInfo;
  1288. Linked<IEngineRowAllocator> resultAllocator;
  1289. bool executePending;
  1290. bool eof;
  1291. };
  1292. // Each call to a MySQL function will use a new MySQLEmbedFunctionContext object
  1293. static bool mysqlInitialized = false;
  1294. static __thread bool mysqlThreadInitialized = false;
  1295. static CriticalSection initCrit;
  1296. static bool terminateMySqlThread(bool isPooled)
  1297. {
  1298. MySQLConnection::clearThreadCache();
  1299. mysql_thread_end();
  1300. mysqlThreadInitialized = false; // In case it was a threadpool thread...
  1301. return false;
  1302. }
  1303. static void initializeMySqlThread()
  1304. {
  1305. if (!mysqlThreadInitialized)
  1306. {
  1307. {
  1308. CriticalBlock b(initCrit);
  1309. if (!mysqlInitialized)
  1310. {
  1311. mysqlInitialized = true;
  1312. mysql_library_init(0, NULL, NULL);
  1313. }
  1314. }
  1315. mysql_thread_init();
  1316. addThreadTermFunc(terminateMySqlThread);
  1317. mysqlThreadInitialized = true;
  1318. }
  1319. }
  1320. class MySQLEmbedFunctionContext : public CInterfaceOf<IEmbedFunctionContext>
  1321. {
  1322. public:
  1323. MySQLEmbedFunctionContext(const IThorActivityContext *_ctx, unsigned _flags, const char *options)
  1324. : flags(_flags), nextParam(0), activityCtx(_ctx)
  1325. {
  1326. initializeMySqlThread();
  1327. conn.setown(MySQLConnection::findCachedConnection(options, false));
  1328. }
  1329. virtual bool getBooleanResult()
  1330. {
  1331. bool ret = mysqlembed::getBooleanResult(NULL, getScalarResult());
  1332. checkSingleRow();
  1333. return ret;
  1334. }
  1335. virtual void getDataResult(size32_t &len, void * &result)
  1336. {
  1337. mysqlembed::getDataResult(NULL, getScalarResult(), len, result);
  1338. checkSingleRow();
  1339. }
  1340. virtual double getRealResult()
  1341. {
  1342. double ret = mysqlembed::getRealResult(NULL, getScalarResult());
  1343. checkSingleRow();
  1344. return ret;
  1345. }
  1346. virtual __int64 getSignedResult()
  1347. {
  1348. __int64 ret = mysqlembed::getSignedResult(NULL, getScalarResult());
  1349. checkSingleRow();
  1350. return ret;
  1351. }
  1352. virtual unsigned __int64 getUnsignedResult()
  1353. {
  1354. unsigned __int64 ret = mysqlembed::getUnsignedResult(NULL, getScalarResult());
  1355. checkSingleRow();
  1356. return ret;
  1357. }
  1358. virtual void getStringResult(size32_t &chars, char * &result)
  1359. {
  1360. mysqlembed::getStringResult(NULL, getScalarResult(), chars, result);
  1361. checkSingleRow();
  1362. }
  1363. virtual void getUTF8Result(size32_t &chars, char * &result)
  1364. {
  1365. mysqlembed::getUTF8Result(NULL, getScalarResult(), chars, result);
  1366. checkSingleRow();
  1367. }
  1368. virtual void getUnicodeResult(size32_t &chars, UChar * &result)
  1369. {
  1370. mysqlembed::getUnicodeResult(NULL, getScalarResult(), chars, result);
  1371. checkSingleRow();
  1372. }
  1373. virtual void getDecimalResult(Decimal &value)
  1374. {
  1375. mysqlembed::getDecimalResult(NULL, getScalarResult(), value);
  1376. checkSingleRow();
  1377. }
  1378. virtual void getSetResult(bool & __isAllResult, size32_t & __resultBytes, void * & __result, int elemType, size32_t elemSize)
  1379. {
  1380. UNSUPPORTED("SET results");
  1381. }
  1382. virtual IRowStream *getDatasetResult(IEngineRowAllocator * _resultAllocator)
  1383. {
  1384. return new MySQLRowStream(inputStream, stmtInfo, _resultAllocator);
  1385. }
  1386. virtual byte * getRowResult(IEngineRowAllocator * _resultAllocator)
  1387. {
  1388. lazyExecute();
  1389. if (!stmtInfo->hasResult())
  1390. typeError("row", NULL);
  1391. MySQLRowStream stream(NULL, stmtInfo, _resultAllocator);
  1392. roxiemem::OwnedConstRoxieRow ret = stream.nextRow();
  1393. roxiemem::OwnedConstRoxieRow ret2 = stream.nextRow();
  1394. stream.stop();
  1395. if (ret == NULL || ret2 != NULL) // Check for exactly one returned row
  1396. typeError("row", NULL);
  1397. return (byte *) ret.getClear();
  1398. }
  1399. virtual size32_t getTransformResult(ARowBuilder & rowBuilder)
  1400. {
  1401. lazyExecute();
  1402. if (!stmtInfo->next())
  1403. typeError("row", NULL);
  1404. MySQLRowBuilder mysqlRowBuilder(stmtInfo->queryResultBindings());
  1405. const RtlTypeInfo *typeInfo = rowBuilder.queryAllocator()->queryOutputMeta()->queryTypeInfo();
  1406. assertex(typeInfo);
  1407. RtlFieldStrInfo dummyField("<row>", NULL, typeInfo);
  1408. size32_t ret = typeInfo->build(rowBuilder, 0, &dummyField, mysqlRowBuilder);
  1409. if (stmtInfo->next())
  1410. typeError("row", NULL); // Check that a single row was returned
  1411. return ret;
  1412. }
  1413. virtual void bindRowParam(const char *name, IOutputMetaData & metaVal, const byte *val) override
  1414. {
  1415. MySQLRecordBinder binder(metaVal.queryTypeInfo(), stmtInfo->queryInputBindings(), nextParam);
  1416. binder.processRow(val);
  1417. nextParam += binder.numFields();
  1418. }
  1419. virtual void bindDatasetParam(const char *name, IOutputMetaData & metaVal, IRowStream * val)
  1420. {
  1421. // We only support a single dataset parameter...
  1422. if (inputStream)
  1423. {
  1424. fail("At most one dataset parameter supported");
  1425. }
  1426. inputStream.setown(new MySQLDatasetBinder(LINK(val), metaVal.queryTypeInfo(), stmtInfo->queryInputBindings(), nextParam));
  1427. nextParam += inputStream->numFields();
  1428. }
  1429. virtual void bindBooleanParam(const char *name, bool val)
  1430. {
  1431. MYSQL_BIND &bindInfo = findParameter(name, MYSQL_TYPE_TINY, sizeof(val));
  1432. * (bool *) bindInfo.buffer = val;
  1433. bindInfo.is_unsigned = true;
  1434. }
  1435. virtual void bindDataParam(const char *name, size32_t len, const void *val)
  1436. {
  1437. size32_t bytes;
  1438. void *data;
  1439. rtlStrToDataX(bytes, data, len, val);
  1440. MYSQL_BIND &bindInfo = findParameter(name, MYSQL_TYPE_BLOB, 0);
  1441. bindInfo.buffer = data;
  1442. bindInfo.buffer_length = bytes;
  1443. bindInfo.length = &bindInfo.buffer_length;
  1444. }
  1445. virtual void bindFloatParam(const char *name, float val)
  1446. {
  1447. MYSQL_BIND &bindInfo = findParameter(name, MYSQL_TYPE_FLOAT, sizeof(val));
  1448. * (float *) bindInfo.buffer = val;
  1449. }
  1450. virtual void bindRealParam(const char *name, double val)
  1451. {
  1452. MYSQL_BIND &bindInfo = findParameter(name, MYSQL_TYPE_DOUBLE, sizeof(val));
  1453. * (double *) bindInfo.buffer = val;
  1454. }
  1455. virtual void bindSignedSizeParam(const char *name, int size, __int64 val)
  1456. {
  1457. bindSignedParam(name, val);
  1458. }
  1459. virtual void bindSignedParam(const char *name, __int64 val)
  1460. {
  1461. MYSQL_BIND &bindInfo = findParameter(name, MYSQL_TYPE_LONGLONG, sizeof(val));
  1462. * (__int64 *) bindInfo.buffer = val;
  1463. bindInfo.is_unsigned = false;
  1464. }
  1465. virtual void bindUnsignedSizeParam(const char *name, int size, unsigned __int64 val)
  1466. {
  1467. bindUnsignedParam(name, val);
  1468. }
  1469. virtual void bindUnsignedParam(const char *name, unsigned __int64 val)
  1470. {
  1471. MYSQL_BIND &bindInfo = findParameter(name, MYSQL_TYPE_LONGLONG, sizeof(val));
  1472. * (unsigned __int64 *) bindInfo.buffer = val;
  1473. bindInfo.is_unsigned = true;
  1474. }
  1475. virtual void bindStringParam(const char *name, size32_t len, const char *val)
  1476. {
  1477. size32_t utf8chars;
  1478. char *utf8;
  1479. rtlStrToUtf8X(utf8chars, utf8, len, val);
  1480. MYSQL_BIND &bindInfo = findParameter(name, MYSQL_TYPE_STRING, 0);
  1481. bindInfo.buffer = utf8;
  1482. bindInfo.buffer_length = rtlUtf8Size(utf8chars, utf8);
  1483. bindInfo.length = &bindInfo.buffer_length;
  1484. }
  1485. virtual void bindVStringParam(const char *name, const char *val)
  1486. {
  1487. bindStringParam(name, strlen(val), val);
  1488. }
  1489. virtual void bindUTF8Param(const char *name, size32_t chars, const char *val)
  1490. {
  1491. size32_t utf8chars;
  1492. char *utf8;
  1493. rtlUtf8ToUtf8X(utf8chars, utf8, chars, val);
  1494. MYSQL_BIND &bindInfo = findParameter(name, MYSQL_TYPE_STRING, 0);
  1495. bindInfo.buffer = utf8;
  1496. bindInfo.buffer_length = rtlUtf8Size(utf8chars, utf8);
  1497. bindInfo.length = &bindInfo.buffer_length;
  1498. }
  1499. virtual void bindUnicodeParam(const char *name, size32_t chars, const UChar *val)
  1500. {
  1501. size32_t utf8chars;
  1502. char *utf8;
  1503. rtlUnicodeToUtf8X(utf8chars, utf8, chars, val);
  1504. MYSQL_BIND &bindInfo = findParameter(name, MYSQL_TYPE_STRING, 0);
  1505. bindInfo.buffer = utf8;
  1506. bindInfo.buffer_length = rtlUtf8Size(utf8chars, utf8);
  1507. bindInfo.length = &bindInfo.buffer_length;
  1508. }
  1509. virtual void bindSetParam(const char *name, int elemType, size32_t elemSize, bool isAll, size32_t totalBytes, const void *setData)
  1510. {
  1511. UNSUPPORTED("SET parameters"); // MySQL does support sets, so MIGHT be possible...
  1512. }
  1513. virtual IInterface *bindParamWriter(IInterface *esdl, const char *esdlservice, const char *esdltype, const char *name)
  1514. {
  1515. return NULL;
  1516. }
  1517. virtual void paramWriterCommit(IInterface *writer)
  1518. {
  1519. }
  1520. virtual void writeResult(IInterface *esdl, const char *esdlservice, const char *esdltype, IInterface *writer)
  1521. {
  1522. }
  1523. virtual void importFunction(size32_t lenChars, const char *text)
  1524. {
  1525. throwUnexpected();
  1526. }
  1527. virtual void compileEmbeddedScript(size32_t chars, const char *script)
  1528. {
  1529. StringBuffer scriptStr;
  1530. size32_t len;
  1531. if (activityCtx)
  1532. {
  1533. rtlSubstituteActivityContext(scriptStr, activityCtx, chars, script);
  1534. script = scriptStr.str();
  1535. len = scriptStr.length();
  1536. }
  1537. else
  1538. len = rtlUtf8Size(chars, script);
  1539. for (;;)
  1540. {
  1541. Owned<MySQLStatement> stmt = new MySQLStatement(mysql_stmt_init(*conn));
  1542. if (!*stmt)
  1543. fail("failed to create statement");
  1544. if (mysql_stmt_prepare(*stmt, script, len))
  1545. {
  1546. int rc = mysql_stmt_errno(*stmt);
  1547. if (rc == ER_UNSUPPORTED_PS)
  1548. {
  1549. // Some functions are not supported in prepared statements, but are still handy to be able to call
  1550. // So long as they have no bound vars and no return value, we can probably call them ok
  1551. if ((flags & (EFnoreturn|EFnoparams)) == (EFnoreturn|EFnoparams))
  1552. {
  1553. stmtInfo.setown(new MySQLPreparedStatement(conn, script, len));
  1554. break;
  1555. }
  1556. fail(mysql_stmt_error(*stmt));
  1557. }
  1558. // If we get an error, it could be that the cached connection is stale - retry
  1559. if (conn->wasCached())
  1560. {
  1561. conn.setown(conn->reopen());
  1562. continue;
  1563. }
  1564. fail(mysql_stmt_error(*stmt));
  1565. }
  1566. stmtInfo.setown(new MySQLPreparedStatement(conn, stmt));
  1567. break;
  1568. }
  1569. }
  1570. virtual void callFunction()
  1571. {
  1572. if (nextParam != stmtInfo->queryInputBindings().numColumns())
  1573. failx("Not enough parameters supplied (%d parameters supplied, but statement has %d bound columns)", nextParam, stmtInfo->queryInputBindings().numColumns());
  1574. // We actually do the execute later, when the result is fetched
  1575. // Unless, there is no expected result, in that case execute query now
  1576. if (flags & EFnoreturn)
  1577. lazyExecute();
  1578. }
  1579. virtual void loadCompiledScript(size32_t chars, const void *_script) override
  1580. {
  1581. throwUnexpected();
  1582. }
  1583. virtual void enter() override {}
  1584. virtual void reenter(ICodeContext *codeCtx) override {}
  1585. virtual void exit() override {}
  1586. protected:
  1587. void lazyExecute()
  1588. {
  1589. if (inputStream)
  1590. inputStream->executeAll(stmtInfo);
  1591. else
  1592. stmtInfo->execute();
  1593. }
  1594. const MYSQL_BIND &getScalarResult()
  1595. {
  1596. lazyExecute();
  1597. if (!stmtInfo->hasResult() || stmtInfo->queryResultBindings().numColumns() != 1)
  1598. typeError("scalar", NULL);
  1599. if (!stmtInfo->next())
  1600. typeError("scalar", NULL);
  1601. return stmtInfo->queryResultBindings().queryColumn(0, NULL);
  1602. }
  1603. void checkSingleRow()
  1604. {
  1605. if (stmtInfo->next())
  1606. typeError("scalar", NULL);
  1607. }
  1608. inline MYSQL_BIND &findParameter(const char *name, enum_field_types sqlType, unsigned size)
  1609. {
  1610. // Everything is positional in MySQL
  1611. MYSQL_BIND &bindInfo = stmtInfo->queryInputBindings().queryColumn(nextParam++, name);
  1612. createBindBuffer(bindInfo, sqlType, size);
  1613. return bindInfo;
  1614. }
  1615. Owned<MySQLConnection> conn;
  1616. Owned<MySQLPreparedStatement> stmtInfo;
  1617. Owned<MySQLDatasetBinder> inputStream;
  1618. const IThorActivityContext *activityCtx;
  1619. unsigned flags;
  1620. int nextParam;
  1621. };
  1622. class MySQLEmbedContext : public CInterfaceOf<IEmbedContext>
  1623. {
  1624. public:
  1625. virtual IEmbedFunctionContext *createFunctionContext(unsigned flags, const char *options) override
  1626. {
  1627. return createFunctionContextEx(nullptr, nullptr, flags, options);
  1628. }
  1629. virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, const IThorActivityContext *activityCtx, unsigned flags, const char *options) override
  1630. {
  1631. if (flags & EFimport)
  1632. UNSUPPORTED("IMPORT");
  1633. else
  1634. return new MySQLEmbedFunctionContext(activityCtx, flags, options);
  1635. }
  1636. virtual IEmbedServiceContext *createServiceContext(const char *service, unsigned flags, const char *options) override
  1637. {
  1638. throwUnexpected();
  1639. }
  1640. };
  1641. extern DECL_EXPORT IEmbedContext* getEmbedContext()
  1642. {
  1643. return new MySQLEmbedContext();
  1644. }
  1645. extern DECL_EXPORT bool syntaxCheck(const char *script)
  1646. {
  1647. return true; // MORE
  1648. }
  1649. } // namespace