mysqlembed.cpp 55 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2013 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "mysql.h"
  15. #include "jexcept.hpp"
  16. #include "jthread.hpp"
  17. #include "hqlplugins.hpp"
  18. #include "deftype.hpp"
  19. #include "eclhelper.hpp"
  20. #include "eclrtl.hpp"
  21. #include "eclrtl_imp.hpp"
  22. #include "rtlds_imp.hpp"
  23. #include "rtlfield.hpp"
  24. #include "rtlembed.hpp"
  25. #include "roxiemem.hpp"
  26. #include "nbcd.hpp"
  27. __declspec(noreturn) static void UNSUPPORTED(const char *feature) __attribute__((noreturn));
  28. static unsigned mysqlCacheCheckPeriod = 10000;
  29. static unsigned mysqlCacheTimeoutPeriod = 60000;
  30. static unsigned mysqlConnectionCacheSize = 10;
  31. static void UNSUPPORTED(const char *feature)
  32. {
  33. throw MakeStringException(-1, "UNSUPPORTED feature: %s not supported in mysql plugin", feature);
  34. }
  35. static const char * compatibleVersions[] = {
  36. "MySQL Embed Helper 1.0.0",
  37. NULL };
  38. static const char *version = "MySQL Embed Helper 1.0.0";
  39. extern "C" DECL_EXPORT bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb)
  40. {
  41. if (pb->size == sizeof(ECLPluginDefinitionBlockEx))
  42. {
  43. ECLPluginDefinitionBlockEx * pbx = (ECLPluginDefinitionBlockEx *) pb;
  44. pbx->compatibleVersions = compatibleVersions;
  45. }
  46. else if (pb->size != sizeof(ECLPluginDefinitionBlock))
  47. return false;
  48. pb->magicVersion = PLUGIN_VERSION;
  49. pb->version = version;
  50. pb->moduleName = "mysql";
  51. pb->ECL = NULL;
  52. pb->flags = PLUGIN_MULTIPLE_VERSIONS;
  53. pb->description = "MySQL Embed Helper";
  54. return true;
  55. }
  56. extern "C" DECL_EXPORT void setPluginContextEx(IPluginContextEx * _ctx)
  57. {
  58. mysqlCacheCheckPeriod = _ctx->ctxGetPropInt("@mysqlCacheCheckPeriod", 10000);
  59. mysqlCacheTimeoutPeriod = _ctx->ctxGetPropInt("@mysqlCacheTimeoutPeriod", 60000);
  60. mysqlConnectionCacheSize = _ctx->ctxGetPropInt("@mysqlConnectionCacheSize", 10);
  61. }
  62. namespace mysqlembed {
  63. __declspec(noreturn) static void failx(const char *msg, ...) __attribute__((format(printf, 1, 2), noreturn));
  64. __declspec(noreturn) static void fail(const char *msg) __attribute__((noreturn));
  65. static void failx(const char *message, ...)
  66. {
  67. va_list args;
  68. va_start(args,message);
  69. StringBuffer msg;
  70. msg.append("mysql: ").valist_appendf(message,args);
  71. va_end(args);
  72. rtlFail(0, msg.str());
  73. }
  74. static void fail(const char *message)
  75. {
  76. StringBuffer msg;
  77. msg.append("mysql: ").append(message);
  78. rtlFail(0, msg.str());
  79. }
  80. // Wrappers to MySQL structures that require corresponding releases
  81. class MySQLConnection;
  82. static __thread MySQLConnection *threadCachedConnection = nullptr;
  83. enum MySQLOptionParamType
  84. {
  85. ParamTypeNone,
  86. ParamTypeString,
  87. ParamTypeUInt,
  88. ParamTypeULong,
  89. ParamTypeBool
  90. };
  91. struct MySQLOptionDefinition
  92. {
  93. const char *name;
  94. enum mysql_option option;
  95. MySQLOptionParamType paramType;
  96. };
  97. #define addoption(a,b) { #a, a, b }
  98. MySQLOptionDefinition options[] =
  99. {
  100. addoption(MYSQL_OPT_COMPRESS, ParamTypeNone),
  101. addoption(MYSQL_OPT_CONNECT_TIMEOUT, ParamTypeUInt),
  102. addoption(MYSQL_OPT_GUESS_CONNECTION, ParamTypeNone),
  103. addoption(MYSQL_OPT_LOCAL_INFILE, ParamTypeUInt),
  104. addoption(MYSQL_OPT_NAMED_PIPE, ParamTypeNone),
  105. addoption(MYSQL_OPT_PROTOCOL, ParamTypeUInt),
  106. addoption(MYSQL_OPT_READ_TIMEOUT, ParamTypeUInt),
  107. addoption(MYSQL_OPT_RECONNECT, ParamTypeBool),
  108. addoption(MYSQL_OPT_SSL_VERIFY_SERVER_CERT, ParamTypeBool),
  109. addoption(MYSQL_OPT_USE_EMBEDDED_CONNECTION, ParamTypeNone),
  110. addoption(MYSQL_OPT_USE_REMOTE_CONNECTION, ParamTypeNone),
  111. addoption(MYSQL_OPT_USE_RESULT, ParamTypeNone),
  112. addoption(MYSQL_OPT_WRITE_TIMEOUT, ParamTypeUInt),
  113. addoption(MYSQL_READ_DEFAULT_FILE, ParamTypeString),
  114. addoption(MYSQL_READ_DEFAULT_GROUP, ParamTypeString),
  115. addoption(MYSQL_REPORT_DATA_TRUNCATION, ParamTypeBool),
  116. addoption(MYSQL_SECURE_AUTH, ParamTypeBool),
  117. addoption(MYSQL_SET_CHARSET_DIR, ParamTypeString),
  118. addoption(MYSQL_SET_CHARSET_NAME, ParamTypeString),
  119. addoption(MYSQL_SET_CLIENT_IP, ParamTypeString),
  120. addoption(MYSQL_SHARED_MEMORY_BASE_NAME, ParamTypeString),
  121. #if MYSQL_VERSION_ID >= 50507
  122. addoption(MYSQL_DEFAULT_AUTH, ParamTypeString),
  123. addoption(MYSQL_PLUGIN_DIR, ParamTypeString),
  124. #endif
  125. #if (MYSQL_VERSION_ID >= 50601)
  126. addoption(MYSQL_OPT_BIND, ParamTypeString),
  127. #endif
  128. #if (MYSQL_VERSION_ID >= 50603)
  129. addoption(MYSQL_OPT_SSL_CA, ParamTypeString),
  130. addoption(MYSQL_OPT_SSL_CAPATH, ParamTypeString),
  131. addoption(MYSQL_OPT_SSL_CERT, ParamTypeString),
  132. addoption(MYSQL_OPT_SSL_CIPHER, ParamTypeString),
  133. addoption(MYSQL_OPT_SSL_CRL, ParamTypeString),
  134. addoption(MYSQL_OPT_SSL_CRLPATH, ParamTypeString),
  135. addoption(MYSQL_OPT_SSL_KEY, ParamTypeString),
  136. #endif
  137. #if (MYSQL_VERSION_ID >= 50606)
  138. addoption(MYSQL_SERVER_PUBLIC_KEY, ParamTypeString),
  139. #endif
  140. #if (MYSQL_VERSION_ID >= 50527 && MYSQL_VERSION_ID < 50600) || MYSQL_VERSION_ID >= 50607
  141. addoption(MYSQL_ENABLE_CLEARTEXT_PLUGIN, ParamTypeBool),
  142. #endif
  143. addoption(MYSQL_INIT_COMMAND, ParamTypeString),
  144. #if (MYSQL_VERSION_ID >= 50610)
  145. addoption(MYSQL_OPT_CAN_HANDLE_EXPIRED_PASSWORDS, ParamTypeBool),
  146. #endif
  147. #if (MYSQL_VERSION_ID >= 50703)
  148. addoption(MYSQL_OPT_SSL_ENFORCE, ParamTypeBool),
  149. #endif
  150. #if (MYSQL_VERSION_ID >= 50709)
  151. addoption(MYSQL_OPT_MAX_ALLOWED_PACKET, ParamTypeULong),
  152. addoption(MYSQL_OPT_NET_BUFFER_LENGTH, ParamTypeULong),
  153. #endif
  154. #if (MYSQL_VERSION_ID >= 50710)
  155. addoption(MYSQL_OPT_TLS_VERSION, ParamTypeString),
  156. #endif
  157. #if (MYSQL_VERSION_ID >= 50711)
  158. addoption(MYSQL_OPT_SSL_MODE, ParamTypeUInt),
  159. #endif
  160. { nullptr, (enum mysql_option) 0, ParamTypeNone }
  161. };
  162. static MySQLOptionDefinition &lookupOption(const char *optName)
  163. {
  164. for (MySQLOptionDefinition *optDef = options; optDef->name != nullptr; optDef++)
  165. {
  166. if (stricmp(optName, optDef->name)==0)
  167. return *optDef;
  168. }
  169. failx("Unknown option %s", optName);
  170. }
  171. class MySQLConnectionCloserThread : public Thread
  172. {
  173. virtual int run() override;
  174. public:
  175. static Semaphore closing;
  176. } *connectionCloserThread = nullptr;
  177. class MySQLConnection : public CInterface
  178. {
  179. public:
  180. MySQLConnection(MYSQL *_conn, const char *_cacheOptions, bool _threadCached, bool _globalCached) : conn(_conn), threadCached(_threadCached), globalCached(_globalCached)
  181. {
  182. if (_cacheOptions && (threadCached || globalCached))
  183. cacheOptions = strdup(_cacheOptions);
  184. else
  185. cacheOptions = nullptr;
  186. created = msTick();
  187. }
  188. ~MySQLConnection()
  189. {
  190. if (conn)
  191. {
  192. if (threadCached || globalCached)
  193. {
  194. Owned<MySQLConnection> cacheEntry = new MySQLConnection(*this); // Note - takes ownership of this->cacheOptions and this->conn
  195. cacheOptions = NULL;
  196. conn = NULL;
  197. if (threadCached)
  198. setThreadCache(cacheEntry.getLink());
  199. else // globalCached
  200. {
  201. CriticalBlock b(globalCacheCrit);
  202. if (globalCachedConnections.length()==mysqlConnectionCacheSize)
  203. {
  204. MySQLConnection &goer = globalCachedConnections.popGet();
  205. goer.globalCached = false; // Make sure we don't recache it!
  206. goer.Release();
  207. }
  208. globalCachedConnections.add(*cacheEntry.getClear(), 0);
  209. if (!connectionCloserThread)
  210. {
  211. connectionCloserThread = new MySQLConnectionCloserThread;
  212. connectionCloserThread->start();
  213. }
  214. }
  215. }
  216. else
  217. {
  218. mysql_close(conn);
  219. free((char *) cacheOptions);
  220. }
  221. }
  222. }
  223. inline operator MYSQL *() const
  224. {
  225. return conn;
  226. }
  227. inline bool matches (const char *_options)
  228. {
  229. return _options && cacheOptions && streq(_options, cacheOptions);
  230. }
  231. static MySQLConnection *findCachedConnection(const char *options, bool bypassCache)
  232. {
  233. const char *server = "localhost";
  234. const char *user = "";
  235. const char *password = "";
  236. const char *database = "";
  237. bool hasMySQLOpt = false;
  238. bool threadCache = false;
  239. bool globalCache = true;
  240. unsigned port = 0;
  241. StringArray opts;
  242. opts.appendList(options, ",");
  243. ForEachItemIn(idx, opts)
  244. {
  245. const char *opt = opts.item(idx);
  246. const char *val = strchr(opt, '=');
  247. if (val)
  248. {
  249. StringBuffer optName(val-opt, opt);
  250. val++;
  251. if (stricmp(optName, "server")==0)
  252. server = val; // Note that lifetime of val is adequate for this to be safe
  253. else if (stricmp(optName, "port")==0)
  254. port = atoi(val);
  255. else if (stricmp(optName, "user")==0)
  256. user = val;
  257. else if (stricmp(optName, "password")==0)
  258. password = val;
  259. else if (stricmp(optName, "database")==0)
  260. database = val;
  261. else if (stricmp(optName, "cache")==0)
  262. {
  263. if (clipStrToBool(val) || strieq(val, "thread"))
  264. {
  265. threadCache = true;
  266. globalCache = false;
  267. }
  268. else if (strieq(val, "global"))
  269. globalCache = true;
  270. else if (strieq(val, "none") || strieq(val, "false") || strieq(val, "off") || strieq(val, "0"))
  271. globalCache = false;
  272. else
  273. failx("Unknown cache option %s", val);
  274. }
  275. else if (strnicmp(optName, "MYSQL_", 6)==0)
  276. hasMySQLOpt = true;
  277. else
  278. failx("Unknown option %s", optName.str());
  279. }
  280. }
  281. if (!bypassCache)
  282. {
  283. if (threadCache)
  284. {
  285. if (threadCachedConnection && threadCachedConnection->matches(options))
  286. {
  287. MySQLConnection *ret = threadCachedConnection;
  288. threadCachedConnection = nullptr;
  289. return ret;
  290. }
  291. }
  292. else if (globalCache)
  293. {
  294. CriticalBlock b(globalCacheCrit);
  295. ForEachItemIn(idx, globalCachedConnections)
  296. {
  297. MySQLConnection &cached = globalCachedConnections.item(idx);
  298. if (cached.matches(options))
  299. {
  300. globalCachedConnections.remove(idx, true);
  301. return &cached;
  302. }
  303. }
  304. }
  305. }
  306. MySQLConnection::clearThreadCache();
  307. Owned<MySQLConnection> newConn = new MySQLConnection(mysql_init(NULL), options, threadCache, globalCache);
  308. if (hasMySQLOpt)
  309. {
  310. ForEachItemIn(idx, opts)
  311. {
  312. const char *opt = opts.item(idx);
  313. if (strnicmp(opt, "MYSQL_", 6)==0)
  314. {
  315. const char *val = strchr(opt, '=');
  316. StringBuffer optName(opt);
  317. if (val)
  318. {
  319. optName.setLength(val-opt);
  320. val++;
  321. }
  322. MySQLOptionDefinition &optDef = lookupOption(optName);
  323. int rc;
  324. if (optDef.paramType == ParamTypeNone)
  325. {
  326. if (val)
  327. failx("Option %s does not take a value", optName.str());
  328. rc = mysql_options(*newConn, optDef.option, nullptr);
  329. }
  330. else
  331. {
  332. if (!val)
  333. failx("Option %s requires a value", optName.str());
  334. switch (optDef.paramType)
  335. {
  336. case ParamTypeString:
  337. rc = mysql_options(*newConn, optDef.option, val);
  338. break;
  339. case ParamTypeUInt:
  340. {
  341. unsigned int oval = strtoul(val, nullptr, 10);
  342. rc = mysql_options(*newConn, optDef.option, (const char *) &oval);
  343. break;
  344. }
  345. case ParamTypeULong:
  346. {
  347. unsigned long oval = strtoul(val, nullptr, 10);
  348. rc = mysql_options(*newConn, optDef.option, (const char *) &oval);
  349. break;
  350. }
  351. case ParamTypeBool:
  352. {
  353. my_bool oval = clipStrToBool(val);
  354. rc = mysql_options(*newConn, optDef.option, (const char *) &oval);
  355. break;
  356. }
  357. }
  358. }
  359. if (rc)
  360. failx("Failed to set option %s (%s)", optName.str(), mysql_error(*newConn));
  361. }
  362. }
  363. }
  364. if (!mysql_real_connect(*newConn, server, user, password, database, port, NULL, 0))
  365. failx("Failed to connect (%s)", mysql_error(*newConn));
  366. return newConn.getClear();
  367. }
  368. static void clearThreadCache()
  369. {
  370. ::Release(threadCachedConnection);
  371. threadCachedConnection = NULL;
  372. }
  373. static void setThreadCache(MySQLConnection *connection)
  374. {
  375. clearThreadCache();
  376. threadCachedConnection = connection;
  377. }
  378. bool wasCached() const
  379. {
  380. return reusing;
  381. }
  382. MySQLConnection *reopen()
  383. {
  384. threadCached = false;
  385. globalCached = false;
  386. return findCachedConnection(cacheOptions, true);
  387. }
  388. static void retireCache(unsigned maxAge)
  389. {
  390. CriticalBlock b(globalCacheCrit);
  391. unsigned now = msTick();
  392. ForEachItemInRev(idx, globalCachedConnections)
  393. {
  394. MySQLConnection &cached = globalCachedConnections.item(idx);
  395. if (!maxAge || (now - cached.created > maxAge))
  396. {
  397. cached.globalCached = false; // Make sure we don't re-add it!
  398. globalCachedConnections.remove(idx);
  399. }
  400. }
  401. }
  402. private:
  403. MySQLConnection(const MySQLConnection &from)
  404. {
  405. conn = from.conn; // Taking over ownership
  406. cacheOptions = from.cacheOptions; // Taking over ownership
  407. threadCached = from.threadCached;
  408. globalCached = from.globalCached;
  409. reusing = true;
  410. created = msTick();
  411. }
  412. static CIArrayOf<MySQLConnection> globalCachedConnections;
  413. static CriticalSection globalCacheCrit;
  414. MYSQL *conn;
  415. const char *cacheOptions; // Not done as a StringAttr, in order to avoid reallocation when recaching after use (see copy constructor above)
  416. unsigned created;
  417. bool threadCached;
  418. bool globalCached;
  419. bool reusing = false;
  420. };
  421. CIArrayOf<MySQLConnection> MySQLConnection::globalCachedConnections;
  422. CriticalSection MySQLConnection::globalCacheCrit;
  423. Semaphore MySQLConnectionCloserThread::closing;
  424. int MySQLConnectionCloserThread::run()
  425. {
  426. for (;;)
  427. {
  428. if (closing.wait(mysqlCacheCheckPeriod))
  429. {
  430. break;
  431. }
  432. MySQLConnection::retireCache(mysqlCacheTimeoutPeriod);
  433. }
  434. return 0;
  435. }
  436. MODULE_INIT(INIT_PRIORITY_STANDARD)
  437. {
  438. return true;
  439. }
  440. MODULE_EXIT()
  441. {
  442. if (connectionCloserThread)
  443. {
  444. MySQLConnectionCloserThread::closing.signal();
  445. connectionCloserThread->join();
  446. connectionCloserThread->Release();
  447. }
  448. MySQLConnection::retireCache(0);
  449. }
  450. class MySQLResult : public CInterface
  451. {
  452. public:
  453. MySQLResult(MYSQL_RES *_res) : res(_res)
  454. {
  455. }
  456. ~MySQLResult()
  457. {
  458. if (res)
  459. mysql_free_result(res);
  460. }
  461. inline operator MYSQL_RES *() const
  462. {
  463. return res;
  464. }
  465. private:
  466. MySQLResult(const MySQLResult &);
  467. MYSQL_RES *res;
  468. };
  469. class MySQLStatement : public CInterface
  470. {
  471. public:
  472. MySQLStatement(MYSQL_STMT *_stmt) : stmt(_stmt)
  473. {
  474. }
  475. ~MySQLStatement()
  476. {
  477. if (stmt)
  478. mysql_stmt_close(stmt);
  479. }
  480. inline operator MYSQL_STMT *() const
  481. {
  482. return stmt;
  483. }
  484. private:
  485. MySQLStatement(const MySQLStatement &);
  486. MYSQL_STMT *stmt;
  487. };
  488. class MySQLBindingArray
  489. {
  490. public:
  491. MySQLBindingArray()
  492. {
  493. columns = 0;
  494. bindinfo = NULL;
  495. is_null = NULL;;
  496. error = NULL;
  497. lengths = NULL;
  498. }
  499. void init(unsigned count)
  500. {
  501. columns = count;
  502. if (columns)
  503. {
  504. bindinfo = new MYSQL_BIND [columns];
  505. is_null = new my_bool [columns];
  506. error = new my_bool [columns];
  507. lengths = new unsigned long [columns];
  508. memset(bindinfo, 0, columns * sizeof(bindinfo[0]));
  509. memset(is_null, 0, columns * sizeof(is_null[0]));
  510. memset(error, 0, columns * sizeof(error[0]));
  511. memset(lengths, 0, columns * sizeof(lengths[0]));
  512. for (int i = 0; i < columns; i++)
  513. {
  514. bindinfo[i].is_null = &is_null[i];
  515. bindinfo[i].length = &lengths[i];
  516. bindinfo[i].error = &error[i];
  517. }
  518. }
  519. }
  520. void bindResults(MYSQL_RES *res)
  521. {
  522. init(mysql_num_fields(res));
  523. for (int i = 0; i < columns; i++)
  524. {
  525. MYSQL_FIELD *col = mysql_fetch_field_direct(res, i);
  526. switch (col->type)
  527. {
  528. case MYSQL_TYPE_DECIMAL:
  529. case MYSQL_TYPE_NEWDECIMAL:
  530. bindinfo[i].buffer_type = MYSQL_TYPE_STRING;
  531. bindinfo[i].buffer_length = 100; // MORE - is there a better guess?
  532. break;
  533. case MYSQL_TYPE_TIMESTAMP:
  534. case MYSQL_TYPE_DATETIME:
  535. case MYSQL_TYPE_TIME:
  536. case MYSQL_TYPE_DATE:
  537. bindinfo[i].buffer_type = col->type;
  538. bindinfo[i].buffer_length = sizeof(MYSQL_TIME);
  539. break;
  540. default:
  541. bindinfo[i].buffer_type = col->type;
  542. bindinfo[i].buffer_length = col->length;
  543. break;
  544. }
  545. bindinfo[i].buffer = rtlMalloc(bindinfo[i].buffer_length);
  546. }
  547. }
  548. ~MySQLBindingArray()
  549. {
  550. for (int i = 0; i < columns; i++)
  551. {
  552. rtlFree(bindinfo[i].buffer);
  553. }
  554. delete [] bindinfo;
  555. delete [] is_null;
  556. delete [] error;
  557. delete [] lengths;
  558. }
  559. inline int numColumns() const
  560. {
  561. return columns;
  562. }
  563. inline MYSQL_BIND &queryColumn(int colIdx, const char *name) const
  564. {
  565. if (colIdx >= columns)
  566. {
  567. VStringBuffer error("No matching bound column for parameter %d", colIdx);
  568. if (name)
  569. error.appendf(" (%s)", name);
  570. fail(error);
  571. }
  572. return bindinfo[colIdx];
  573. }
  574. inline MYSQL_BIND *queryBindings() const
  575. {
  576. return bindinfo;
  577. }
  578. private:
  579. MYSQL_BIND *bindinfo;
  580. my_bool *is_null;
  581. my_bool *error;
  582. unsigned long *lengths;
  583. int columns;
  584. };
  585. class MySQLPreparedStatement : public CInterface
  586. {
  587. public:
  588. MySQLPreparedStatement(MySQLConnection *_conn, MySQLStatement *_stmt)
  589. : conn(_conn), stmt(_stmt)
  590. {
  591. // Create bindings for input parameters
  592. inputBindings.init(mysql_stmt_param_count(*stmt));
  593. // And for results
  594. res.setown(new MySQLResult(mysql_stmt_result_metadata(*stmt)));
  595. if (*res)
  596. {
  597. resultBindings.bindResults(*res);
  598. /* Bind the result buffers */
  599. if (mysql_stmt_bind_result(*stmt, resultBindings.queryBindings()))
  600. fail(mysql_stmt_error(*stmt));
  601. }
  602. else if (mysql_stmt_errno(*stmt)) // SQL actions don't return results...
  603. fail(mysql_stmt_error(*stmt));
  604. }
  605. ~MySQLPreparedStatement()
  606. {
  607. stop();
  608. }
  609. inline void stop()
  610. {
  611. res.clear();
  612. stmt.clear();
  613. }
  614. bool next()
  615. {
  616. if (!stmt)
  617. return false;
  618. int rc = mysql_stmt_fetch(*stmt);
  619. if (rc == MYSQL_NO_DATA)
  620. return false;
  621. else if (rc)
  622. fail(mysql_stmt_error(*stmt));
  623. else
  624. return true;
  625. }
  626. void execute()
  627. {
  628. assertex(stmt && *stmt);
  629. if (inputBindings.numColumns() && mysql_stmt_bind_param(*stmt, inputBindings.queryBindings()))
  630. fail(mysql_stmt_error(*stmt));
  631. if (mysql_stmt_execute(*stmt))
  632. fail(mysql_stmt_error(*stmt));
  633. }
  634. inline const MySQLBindingArray &queryResultBindings() const
  635. {
  636. return resultBindings;
  637. }
  638. inline const MySQLBindingArray &queryInputBindings() const
  639. {
  640. return inputBindings;
  641. }
  642. inline bool hasResult() const
  643. {
  644. return *res != NULL;
  645. }
  646. protected:
  647. Linked<MySQLConnection> conn;
  648. Linked<MySQLStatement> stmt;
  649. Owned<MySQLResult> res;
  650. MySQLBindingArray inputBindings;
  651. MySQLBindingArray resultBindings;
  652. };
  653. // Conversions from MySQL values to ECL data
  654. __declspec(noreturn) static void typeError(const char *expected, const RtlFieldInfo *field) __attribute__((noreturn));
  655. static void typeError(const char *expected, const RtlFieldInfo *field)
  656. {
  657. VStringBuffer msg("mysql: type mismatch - %s expected", expected);
  658. if (field)
  659. msg.appendf(" for field %s", field->name);
  660. rtlFail(0, msg.str());
  661. }
  662. static bool isInteger(enum_field_types type)
  663. {
  664. switch (type)
  665. {
  666. case MYSQL_TYPE_TINY:
  667. case MYSQL_TYPE_SHORT:
  668. case MYSQL_TYPE_LONG:
  669. case MYSQL_TYPE_LONGLONG:
  670. case MYSQL_TYPE_INT24:
  671. return true;
  672. default:
  673. return false;
  674. }
  675. }
  676. static bool isDateTime(enum_field_types type)
  677. {
  678. switch (type)
  679. {
  680. case MYSQL_TYPE_TIMESTAMP:
  681. case MYSQL_TYPE_DATETIME:
  682. case MYSQL_TYPE_DATE:
  683. case MYSQL_TYPE_TIME:
  684. return true;
  685. default:
  686. return false;
  687. }
  688. }
  689. static bool isString(enum_field_types type)
  690. {
  691. switch (type)
  692. {
  693. case MYSQL_TYPE_BIT: // Slightly dubious but MySQL seems to represent them as string fields in their gui...
  694. case MYSQL_TYPE_TINY_BLOB:
  695. case MYSQL_TYPE_MEDIUM_BLOB:
  696. case MYSQL_TYPE_LONG_BLOB:
  697. case MYSQL_TYPE_BLOB:
  698. case MYSQL_TYPE_STRING:
  699. case MYSQL_TYPE_VAR_STRING:
  700. return true;
  701. default:
  702. return false;
  703. }
  704. }
  705. static unsigned __int64 getDateTimeValue(const MYSQL_BIND &bound)
  706. {
  707. const MYSQL_TIME * time = (const MYSQL_TIME *) bound.buffer;
  708. switch (bound.buffer_type)
  709. {
  710. case MYSQL_TYPE_TIMESTAMP:
  711. case MYSQL_TYPE_DATETIME:
  712. //What format should this be? Possibly a timestamp_t
  713. return (unsigned __int64)((time->year * 10000) + (time->month * 100) + (time->day)) * 1000000 +
  714. (time->hour * 10000) + (time->minute * 100) + (time->second);
  715. case MYSQL_TYPE_DATE:
  716. return (time->year * 10000) + (time->month * 100) + (time->day);
  717. case MYSQL_TYPE_TIME:
  718. return (time->hour * 10000) + (time->minute * 100) + (time->second);
  719. default:
  720. throwUnexpected();
  721. }
  722. }
  723. static void getDateTimeText(const MYSQL_BIND &bound, size32_t &chars, char * &result)
  724. {
  725. const MYSQL_TIME * time = (const MYSQL_TIME *) bound.buffer;
  726. char temp[20];
  727. switch (bound.buffer_type)
  728. {
  729. case MYSQL_TYPE_TIMESTAMP:
  730. case MYSQL_TYPE_DATETIME:
  731. _snprintf(temp, sizeof(temp), "%4u-%02u-%02u %02u:%02u:%02u", time->year, time->month, time->day, time->hour, time->minute, time->second);
  732. break;
  733. case MYSQL_TYPE_DATE:
  734. _snprintf(temp, sizeof(temp), "%4u-%02u-%02u", time->year, time->month, time->day);
  735. break;
  736. case MYSQL_TYPE_TIME:
  737. _snprintf(temp, sizeof(temp), "%02u:%02u:%02u", time->hour, time->minute, time->second);
  738. break;
  739. default:
  740. throwUnexpected();
  741. }
  742. rtlStrToStrX(chars, result, strlen(temp), temp);
  743. }
  744. static bool getBooleanResult(const RtlFieldInfo *field, const MYSQL_BIND &bound)
  745. {
  746. if (*bound.is_null)
  747. {
  748. NullFieldProcessor p(field);
  749. return p.boolResult;
  750. }
  751. if (!isInteger(bound.buffer_type))
  752. typeError("boolean", field);
  753. return rtlReadUInt(bound.buffer, *bound.length) != 0;
  754. }
  755. static void getDataResult(const RtlFieldInfo *field, const MYSQL_BIND &bound, size32_t &chars, void * &result)
  756. {
  757. if (*bound.is_null)
  758. {
  759. NullFieldProcessor p(field);
  760. rtlStrToDataX(chars, result, p.resultChars, p.stringResult);
  761. return;
  762. }
  763. if (isString(bound.buffer_type))
  764. rtlStrToDataX(chars, result, *bound.length, bound.buffer); // This feels like it may not work to me - will preallocate rather larger than we want
  765. else
  766. typeError("blob", field);
  767. }
  768. static __int64 getSignedResult(const RtlFieldInfo *field, const MYSQL_BIND &bound);
  769. static unsigned __int64 getUnsignedResult(const RtlFieldInfo *field, const MYSQL_BIND &bound);
  770. static double getRealResult(const RtlFieldInfo *field, const MYSQL_BIND &bound)
  771. {
  772. if (*bound.is_null)
  773. {
  774. NullFieldProcessor p(field);
  775. return p.doubleResult;
  776. }
  777. if (bound.buffer_type == MYSQL_TYPE_BIT)
  778. return (double) getUnsignedResult(field, bound);
  779. if (isInteger(bound.buffer_type))
  780. {
  781. if (bound.is_unsigned)
  782. return (double) getUnsignedResult(field, bound);
  783. else
  784. return (double) getSignedResult(field, bound);
  785. }
  786. else if (bound.buffer_type == MYSQL_TYPE_FLOAT)
  787. return * (float *) bound.buffer;
  788. else if (bound.buffer_type == MYSQL_TYPE_DOUBLE)
  789. return * (double *) bound.buffer;
  790. else
  791. typeError("double", field);
  792. }
  793. static __int64 getSignedResult(const RtlFieldInfo *field, const MYSQL_BIND &bound)
  794. {
  795. if (*bound.is_null)
  796. {
  797. NullFieldProcessor p(field);
  798. return p.intResult;
  799. }
  800. if (isDateTime(bound.buffer_type))
  801. return getDateTimeValue(bound);
  802. if (bound.buffer_type == MYSQL_TYPE_BIT)
  803. {
  804. // These are stored as big-endian values it seems...
  805. return (__int64) rtlReadSwapUInt(bound.buffer, *bound.length);
  806. }
  807. if (isInteger(bound.buffer_type))
  808. {
  809. if (bound.is_unsigned)
  810. return (__int64) rtlReadUInt(bound.buffer, *bound.length);
  811. else
  812. return rtlReadInt(bound.buffer, *bound.length);
  813. }
  814. else
  815. typeError("integer", field);
  816. }
  817. static unsigned __int64 getUnsignedResult(const RtlFieldInfo *field, const MYSQL_BIND &bound)
  818. {
  819. if (*bound.is_null)
  820. {
  821. NullFieldProcessor p(field);
  822. return p.uintResult;
  823. }
  824. if (isDateTime(bound.buffer_type))
  825. return getDateTimeValue(bound);
  826. if (bound.buffer_type == MYSQL_TYPE_BIT)
  827. return rtlReadSwapUInt(bound.buffer, *bound.length);
  828. if (!isInteger(bound.buffer_type))
  829. typeError("integer", field);
  830. if (bound.is_unsigned)
  831. return rtlReadUInt(bound.buffer, *bound.length);
  832. else
  833. return (unsigned __int64) rtlReadInt(bound.buffer, *bound.length);
  834. }
  835. static void getStringResult(const RtlFieldInfo *field, const MYSQL_BIND &bound, size32_t &chars, char * &result)
  836. {
  837. if (*bound.is_null)
  838. {
  839. NullFieldProcessor p(field);
  840. rtlStrToStrX(chars, result, p.resultChars, p.stringResult);
  841. return;
  842. }
  843. if (isDateTime(bound.buffer_type))
  844. {
  845. getDateTimeText(bound, chars, result);
  846. return;
  847. }
  848. if (!isString(bound.buffer_type))
  849. typeError("string", field);
  850. const char *text = (const char *) bound.buffer;
  851. unsigned long bytes = *bound.length;
  852. unsigned numchars = rtlUtf8Length(bytes, text); // MORE - is it a good assumption that it is utf8 ? Depends how the database is configured I think
  853. rtlUtf8ToStrX(chars, result, numchars, text);
  854. }
  855. static void getUTF8Result(const RtlFieldInfo *field, const MYSQL_BIND &bound, size32_t &chars, char * &result)
  856. {
  857. if (*bound.is_null)
  858. {
  859. NullFieldProcessor p(field);
  860. rtlUtf8ToUtf8X(chars, result, p.resultChars, p.stringResult);
  861. return;
  862. }
  863. if (isDateTime(bound.buffer_type))
  864. {
  865. getDateTimeText(bound, chars, result);
  866. return;
  867. }
  868. if (!isString(bound.buffer_type))
  869. typeError("string", field);
  870. const char *text = (const char *) bound.buffer;
  871. unsigned long bytes = *bound.length;
  872. unsigned numchars = rtlUtf8Length(bytes, text); // MORE - is it a good assumption that it is utf8 ? Depends how the database is configured I think
  873. rtlUtf8ToUtf8X(chars, result, numchars, text);
  874. }
  875. static void getUnicodeResult(const RtlFieldInfo *field, const MYSQL_BIND &bound, size32_t &chars, UChar * &result)
  876. {
  877. if (*bound.is_null)
  878. {
  879. NullFieldProcessor p(field);
  880. rtlUnicodeToUnicodeX(chars, result, p.resultChars, p.unicodeResult);
  881. return;
  882. }
  883. if (bound.buffer_type != MYSQL_TYPE_STRING && bound.buffer_type != MYSQL_TYPE_VAR_STRING)
  884. typeError("string", field);
  885. const char *text = (const char *) bound.buffer;
  886. unsigned long bytes = *bound.length;
  887. unsigned numchars = rtlUtf8Length(bytes, text); // MORE - is it a good assumption that it is utf8 ? Depends how the database is configured I think
  888. rtlUtf8ToUnicodeX(chars, result, numchars, text);
  889. }
  890. static void getDecimalResult(const RtlFieldInfo *field, const MYSQL_BIND &bound, Decimal &value)
  891. {
  892. if (*bound.is_null)
  893. {
  894. NullFieldProcessor p(field);
  895. value.set(p.decimalResult);
  896. return;
  897. }
  898. size32_t chars;
  899. rtlDataAttr result;
  900. mysqlembed::getStringResult(field, bound, chars, result.refstr());
  901. value.setString(chars, result.getstr());
  902. if (field)
  903. {
  904. RtlDecimalTypeInfo *dtype = (RtlDecimalTypeInfo *) field->type;
  905. value.setPrecision(dtype->getDecimalDigits(), dtype->getDecimalPrecision());
  906. }
  907. }
  908. static void createBindBuffer(MYSQL_BIND & bindInfo, enum_field_types sqlType, unsigned size)
  909. {
  910. if (size)
  911. {
  912. if (!bindInfo.buffer)
  913. {
  914. bindInfo.buffer_type = sqlType;
  915. bindInfo.buffer = rtlMalloc(size);
  916. }
  917. else
  918. assertex(bindInfo.buffer_type == sqlType);
  919. }
  920. else
  921. {
  922. // Buffer is reallocated each time - caller is responsible for it.
  923. bindInfo.buffer_type = sqlType;
  924. rtlFree(bindInfo.buffer);
  925. bindInfo.buffer = NULL;
  926. }
  927. }
  928. // A MySQLRowBuilder object is used to construct an ECL row from a MySQL row
  929. class MySQLRowBuilder : public CInterfaceOf<IFieldSource>
  930. {
  931. public:
  932. MySQLRowBuilder(const MySQLBindingArray &_resultInfo)
  933. : resultInfo(_resultInfo), colIdx(-1)
  934. {
  935. }
  936. virtual bool getBooleanResult(const RtlFieldInfo *field)
  937. {
  938. return mysqlembed::getBooleanResult(field, nextField(field));
  939. }
  940. virtual void getDataResult(const RtlFieldInfo *field, size32_t &len, void * &result)
  941. {
  942. mysqlembed::getDataResult(field, nextField(field), len, result);
  943. }
  944. virtual double getRealResult(const RtlFieldInfo *field)
  945. {
  946. return mysqlembed::getRealResult(field, nextField(field));
  947. }
  948. virtual __int64 getSignedResult(const RtlFieldInfo *field)
  949. {
  950. return mysqlembed::getSignedResult(field, nextField(field));
  951. }
  952. virtual unsigned __int64 getUnsignedResult(const RtlFieldInfo *field)
  953. {
  954. return mysqlembed::getUnsignedResult(field, nextField(field));
  955. }
  956. virtual void getStringResult(const RtlFieldInfo *field, size32_t &chars, char * &result)
  957. {
  958. mysqlembed::getStringResult(field, nextField(field), chars, result);
  959. }
  960. virtual void getUTF8Result(const RtlFieldInfo *field, size32_t &chars, char * &result)
  961. {
  962. mysqlembed::getUTF8Result(field, nextField(field), chars, result);
  963. }
  964. virtual void getUnicodeResult(const RtlFieldInfo *field, size32_t &chars, UChar * &result)
  965. {
  966. mysqlembed::getUnicodeResult(field, nextField(field), chars, result);
  967. }
  968. virtual void getDecimalResult(const RtlFieldInfo *field, Decimal &value)
  969. {
  970. mysqlembed::getDecimalResult(field, nextField(field), value);
  971. }
  972. virtual void processBeginSet(const RtlFieldInfo * field, bool &isAll)
  973. {
  974. UNSUPPORTED("SET fields");
  975. }
  976. virtual bool processNextSet(const RtlFieldInfo * field)
  977. {
  978. throwUnexpected();
  979. }
  980. virtual void processBeginDataset(const RtlFieldInfo * field)
  981. {
  982. UNSUPPORTED("Nested datasets");
  983. }
  984. virtual void processBeginRow(const RtlFieldInfo * field)
  985. {
  986. }
  987. virtual bool processNextRow(const RtlFieldInfo * field)
  988. {
  989. throwUnexpected();
  990. }
  991. virtual void processEndSet(const RtlFieldInfo * field)
  992. {
  993. throwUnexpected();
  994. }
  995. virtual void processEndDataset(const RtlFieldInfo * field)
  996. {
  997. throwUnexpected();
  998. }
  999. virtual void processEndRow(const RtlFieldInfo * field)
  1000. {
  1001. }
  1002. protected:
  1003. const MYSQL_BIND &nextField(const RtlFieldInfo * field)
  1004. {
  1005. if (colIdx < resultInfo.numColumns())
  1006. colIdx++;
  1007. else
  1008. fail("Too many fields in ECL output row");
  1009. const MYSQL_BIND &column = resultInfo.queryColumn(colIdx,field->name);
  1010. if (*column.error)
  1011. failx("Error fetching column %s", field->name);
  1012. return column;
  1013. }
  1014. const MySQLBindingArray &resultInfo;
  1015. int colIdx;
  1016. };
  1017. // Bind MySQL variables from an ECL record
  1018. class MySQLRecordBinder : public CInterfaceOf<IFieldProcessor>
  1019. {
  1020. public:
  1021. MySQLRecordBinder(const RtlTypeInfo *_typeInfo, const MySQLBindingArray &_bindings, int _firstParam)
  1022. : typeInfo(_typeInfo), bindings(_bindings), firstParam(_firstParam), dummyField("<row>", NULL, typeInfo), thisParam(_firstParam)
  1023. {
  1024. }
  1025. int numFields()
  1026. {
  1027. int count = 0;
  1028. const RtlFieldInfo * const *fields = typeInfo->queryFields();
  1029. assertex(fields);
  1030. while (*fields++)
  1031. count++;
  1032. return count;
  1033. }
  1034. void processRow(const byte *row)
  1035. {
  1036. thisParam = firstParam;
  1037. typeInfo->process(row, row, &dummyField, *this); // Bind the variables for the current row
  1038. }
  1039. virtual void processString(unsigned len, const char *value, const RtlFieldInfo * field)
  1040. {
  1041. size32_t utf8chars;
  1042. char *utf8;
  1043. rtlStrToUtf8X(utf8chars, utf8, len, value);
  1044. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_STRING, 0);
  1045. bindInfo.buffer = utf8;
  1046. bindInfo.buffer_length = rtlUtf8Size(utf8chars, utf8);
  1047. bindInfo.length = &bindInfo.buffer_length;
  1048. }
  1049. virtual void processBool(bool value, const RtlFieldInfo * field)
  1050. {
  1051. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_TINY, sizeof(value));
  1052. * (bool *) bindInfo.buffer = value;
  1053. bindInfo.is_unsigned = true;
  1054. }
  1055. virtual void processData(unsigned len, const void *value, const RtlFieldInfo * field)
  1056. {
  1057. size32_t bytes;
  1058. void *data;
  1059. rtlStrToDataX(bytes, data, len, value);
  1060. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_BLOB, 0);
  1061. bindInfo.buffer = data;
  1062. bindInfo.buffer_length = bytes;
  1063. bindInfo.length = &bindInfo.buffer_length;
  1064. }
  1065. virtual void processInt(__int64 value, const RtlFieldInfo * field)
  1066. {
  1067. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_LONGLONG, sizeof(value));
  1068. * (__int64 *) bindInfo.buffer = value;
  1069. bindInfo.is_unsigned = false;
  1070. }
  1071. virtual void processUInt(unsigned __int64 value, const RtlFieldInfo * field)
  1072. {
  1073. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_LONGLONG, sizeof(value));
  1074. * (unsigned __int64 *) bindInfo.buffer = value;
  1075. bindInfo.is_unsigned = true;
  1076. }
  1077. virtual void processReal(double value, const RtlFieldInfo * field)
  1078. {
  1079. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_DOUBLE, sizeof(value));
  1080. * (double *) bindInfo.buffer = value;
  1081. }
  1082. virtual void processDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo * field)
  1083. {
  1084. Decimal val;
  1085. size32_t bytes;
  1086. char *data;
  1087. val.setDecimal(digits, precision, value);
  1088. val.getStringX(bytes, data);
  1089. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_STRING, 0);
  1090. bindInfo.buffer = data;
  1091. bindInfo.buffer_length = bytes;
  1092. bindInfo.length = &bindInfo.buffer_length;
  1093. }
  1094. virtual void processUDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo * field)
  1095. {
  1096. Decimal val;
  1097. size32_t bytes;
  1098. char *data;
  1099. val.setUDecimal(digits, precision, value);
  1100. val.getStringX(bytes, data);
  1101. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_STRING, 0);
  1102. bindInfo.buffer = data;
  1103. bindInfo.buffer_length = bytes;
  1104. bindInfo.length = &bindInfo.buffer_length;
  1105. }
  1106. virtual void processUnicode(unsigned len, const UChar *value, const RtlFieldInfo * field)
  1107. {
  1108. size32_t utf8chars;
  1109. char *utf8;
  1110. rtlUnicodeToUtf8X(utf8chars, utf8, len, value);
  1111. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_STRING, 0);
  1112. bindInfo.buffer = utf8;
  1113. bindInfo.buffer_length = rtlUtf8Size(utf8chars, utf8);
  1114. bindInfo.length = &bindInfo.buffer_length;
  1115. }
  1116. virtual void processQString(unsigned len, const char *value, const RtlFieldInfo * field)
  1117. {
  1118. size32_t charCount;
  1119. rtlDataAttr text;
  1120. rtlQStrToStrX(charCount, text.refstr(), len, value);
  1121. processString(charCount, text.getstr(), field);
  1122. }
  1123. virtual void processUtf8(unsigned len, const char *value, const RtlFieldInfo * field)
  1124. {
  1125. size32_t utf8chars;
  1126. char *utf8;
  1127. rtlUtf8ToUtf8X(utf8chars, utf8, len, value);
  1128. MYSQL_BIND &bindInfo = createBindBuffer(MYSQL_TYPE_STRING, 0);
  1129. bindInfo.buffer = utf8;
  1130. bindInfo.buffer_length = rtlUtf8Size(utf8chars, utf8);
  1131. bindInfo.length = &bindInfo.buffer_length;
  1132. }
  1133. virtual bool processBeginSet(const RtlFieldInfo * field, unsigned numElements, bool isAll, const byte *data)
  1134. {
  1135. UNSUPPORTED("SET fields");
  1136. return false;
  1137. }
  1138. virtual bool processBeginDataset(const RtlFieldInfo * field, unsigned numRows)
  1139. {
  1140. UNSUPPORTED("Nested datasets");
  1141. return false;
  1142. }
  1143. virtual bool processBeginRow(const RtlFieldInfo * field)
  1144. {
  1145. return true;
  1146. }
  1147. virtual void processEndSet(const RtlFieldInfo * field)
  1148. {
  1149. throwUnexpected();
  1150. }
  1151. virtual void processEndDataset(const RtlFieldInfo * field)
  1152. {
  1153. throwUnexpected();
  1154. }
  1155. virtual void processEndRow(const RtlFieldInfo * field)
  1156. {
  1157. }
  1158. protected:
  1159. MYSQL_BIND &createBindBuffer(enum_field_types sqlType, unsigned size)
  1160. {
  1161. MYSQL_BIND &bindInfo = bindings.queryColumn(thisParam++, NULL);
  1162. mysqlembed::createBindBuffer(bindInfo, sqlType, size);
  1163. return bindInfo;
  1164. }
  1165. const RtlTypeInfo *typeInfo;
  1166. const MySQLBindingArray &bindings;
  1167. int firstParam;
  1168. RtlFieldStrInfo dummyField;
  1169. int thisParam;
  1170. };
  1171. //
  1172. class MySQLDatasetBinder : public MySQLRecordBinder
  1173. {
  1174. public:
  1175. MySQLDatasetBinder(IRowStream * _input, const RtlTypeInfo *_typeInfo, const MySQLBindingArray &_bindings, int _firstParam)
  1176. : input(_input), MySQLRecordBinder(_typeInfo, _bindings, _firstParam)
  1177. {
  1178. }
  1179. bool bindNext()
  1180. {
  1181. roxiemem::OwnedConstRoxieRow nextRow = (const byte *) input->ungroupedNextRow();
  1182. if (!nextRow)
  1183. return false;
  1184. processRow((const byte *) nextRow.get()); // Bind the variables for the current row
  1185. return true;
  1186. }
  1187. void executeAll(MySQLPreparedStatement *stmtInfo)
  1188. {
  1189. while (bindNext())
  1190. {
  1191. stmtInfo->execute();
  1192. }
  1193. }
  1194. protected:
  1195. Owned<IRowStream> input;
  1196. };
  1197. // A MySQL function that returns a dataset will return a MySQLRowStream object that can be
  1198. // interrogated to return each row of the result in turn
  1199. class MySQLRowStream : public CInterfaceOf<IRowStream>
  1200. {
  1201. public:
  1202. MySQLRowStream(MySQLDatasetBinder *_inputStream, MySQLPreparedStatement *_stmtInfo, IEngineRowAllocator *_resultAllocator)
  1203. : inputStream(_inputStream), stmtInfo(_stmtInfo), resultAllocator(_resultAllocator)
  1204. {
  1205. executePending = true;
  1206. eof = false;
  1207. }
  1208. virtual const void *nextRow()
  1209. {
  1210. // A little complex when streaming data in as well as out - want to execute for every input record
  1211. if (eof)
  1212. return NULL;
  1213. for (;;)
  1214. {
  1215. if (executePending)
  1216. {
  1217. executePending = false;
  1218. if (inputStream && !inputStream->bindNext())
  1219. {
  1220. noteEOF();
  1221. return NULL;
  1222. }
  1223. stmtInfo->execute();
  1224. }
  1225. if (stmtInfo->next())
  1226. break;
  1227. if (inputStream)
  1228. executePending = true;
  1229. else
  1230. {
  1231. noteEOF();
  1232. return NULL;
  1233. }
  1234. }
  1235. RtlDynamicRowBuilder rowBuilder(resultAllocator);
  1236. MySQLRowBuilder mysqlRowBuilder(stmtInfo->queryResultBindings());
  1237. const RtlTypeInfo *typeInfo = resultAllocator->queryOutputMeta()->queryTypeInfo();
  1238. assertex(typeInfo);
  1239. RtlFieldStrInfo dummyField("<row>", NULL, typeInfo);
  1240. size32_t len = typeInfo->build(rowBuilder, 0, &dummyField, mysqlRowBuilder);
  1241. return rowBuilder.finalizeRowClear(len);
  1242. }
  1243. virtual void stop()
  1244. {
  1245. resultAllocator.clear();
  1246. stmtInfo->stop();
  1247. }
  1248. protected:
  1249. void noteEOF()
  1250. {
  1251. if (!eof)
  1252. {
  1253. eof = true;
  1254. stop();
  1255. }
  1256. }
  1257. Linked<MySQLDatasetBinder> inputStream;
  1258. Linked<MySQLPreparedStatement> stmtInfo;
  1259. Linked<IEngineRowAllocator> resultAllocator;
  1260. bool executePending;
  1261. bool eof;
  1262. };
  1263. // Each call to a MySQL function will use a new MySQLEmbedFunctionContext object
  1264. static __thread ThreadTermFunc threadHookChain;
  1265. static bool mysqlInitialized = false;
  1266. static __thread bool mysqlThreadInitialized = false;
  1267. static CriticalSection initCrit;
  1268. static void terminateMySqlThread()
  1269. {
  1270. MySQLConnection::clearThreadCache();
  1271. mysql_thread_end();
  1272. mysqlThreadInitialized = false; // In case it was a threadpool thread...
  1273. if (threadHookChain)
  1274. {
  1275. (*threadHookChain)();
  1276. threadHookChain = NULL;
  1277. }
  1278. }
  1279. static void initializeMySqlThread()
  1280. {
  1281. if (!mysqlThreadInitialized)
  1282. {
  1283. {
  1284. CriticalBlock b(initCrit);
  1285. if (!mysqlInitialized)
  1286. {
  1287. mysqlInitialized = true;
  1288. mysql_library_init(0, NULL, NULL);
  1289. }
  1290. }
  1291. mysql_thread_init();
  1292. threadHookChain = addThreadTermFunc(terminateMySqlThread);
  1293. mysqlThreadInitialized = true;
  1294. }
  1295. }
  1296. class MySQLEmbedFunctionContext : public CInterfaceOf<IEmbedFunctionContext>
  1297. {
  1298. public:
  1299. MySQLEmbedFunctionContext(const IThorActivityContext *_ctx, const char *options)
  1300. : nextParam(0), activityCtx(_ctx)
  1301. {
  1302. initializeMySqlThread();
  1303. conn.setown(MySQLConnection::findCachedConnection(options, false));
  1304. }
  1305. virtual bool getBooleanResult()
  1306. {
  1307. bool ret = mysqlembed::getBooleanResult(NULL, getScalarResult());
  1308. checkSingleRow();
  1309. return ret;
  1310. }
  1311. virtual void getDataResult(size32_t &len, void * &result)
  1312. {
  1313. mysqlembed::getDataResult(NULL, getScalarResult(), len, result);
  1314. checkSingleRow();
  1315. }
  1316. virtual double getRealResult()
  1317. {
  1318. double ret = mysqlembed::getRealResult(NULL, getScalarResult());
  1319. checkSingleRow();
  1320. return ret;
  1321. }
  1322. virtual __int64 getSignedResult()
  1323. {
  1324. __int64 ret = mysqlembed::getSignedResult(NULL, getScalarResult());
  1325. checkSingleRow();
  1326. return ret;
  1327. }
  1328. virtual unsigned __int64 getUnsignedResult()
  1329. {
  1330. unsigned __int64 ret = mysqlembed::getUnsignedResult(NULL, getScalarResult());
  1331. checkSingleRow();
  1332. return ret;
  1333. }
  1334. virtual void getStringResult(size32_t &chars, char * &result)
  1335. {
  1336. mysqlembed::getStringResult(NULL, getScalarResult(), chars, result);
  1337. checkSingleRow();
  1338. }
  1339. virtual void getUTF8Result(size32_t &chars, char * &result)
  1340. {
  1341. mysqlembed::getUTF8Result(NULL, getScalarResult(), chars, result);
  1342. checkSingleRow();
  1343. }
  1344. virtual void getUnicodeResult(size32_t &chars, UChar * &result)
  1345. {
  1346. mysqlembed::getUnicodeResult(NULL, getScalarResult(), chars, result);
  1347. checkSingleRow();
  1348. }
  1349. virtual void getDecimalResult(Decimal &value)
  1350. {
  1351. mysqlembed::getDecimalResult(NULL, getScalarResult(), value);
  1352. checkSingleRow();
  1353. }
  1354. virtual void getSetResult(bool & __isAllResult, size32_t & __resultBytes, void * & __result, int elemType, size32_t elemSize)
  1355. {
  1356. UNSUPPORTED("SET results");
  1357. }
  1358. virtual IRowStream *getDatasetResult(IEngineRowAllocator * _resultAllocator)
  1359. {
  1360. return new MySQLRowStream(inputStream, stmtInfo, _resultAllocator);
  1361. }
  1362. virtual byte * getRowResult(IEngineRowAllocator * _resultAllocator)
  1363. {
  1364. if (!stmtInfo->hasResult())
  1365. typeError("row", NULL);
  1366. lazyExecute();
  1367. MySQLRowStream stream(NULL, stmtInfo, _resultAllocator);
  1368. roxiemem::OwnedConstRoxieRow ret = stream.nextRow();
  1369. roxiemem::OwnedConstRoxieRow ret2 = stream.nextRow();
  1370. stream.stop();
  1371. if (ret == NULL || ret2 != NULL) // Check for exactly one returned row
  1372. typeError("row", NULL);
  1373. return (byte *) ret.getClear();
  1374. }
  1375. virtual size32_t getTransformResult(ARowBuilder & rowBuilder)
  1376. {
  1377. lazyExecute();
  1378. if (!stmtInfo->next())
  1379. typeError("row", NULL);
  1380. MySQLRowBuilder mysqlRowBuilder(stmtInfo->queryResultBindings());
  1381. const RtlTypeInfo *typeInfo = rowBuilder.queryAllocator()->queryOutputMeta()->queryTypeInfo();
  1382. assertex(typeInfo);
  1383. RtlFieldStrInfo dummyField("<row>", NULL, typeInfo);
  1384. size32_t ret = typeInfo->build(rowBuilder, 0, &dummyField, mysqlRowBuilder);
  1385. if (stmtInfo->next())
  1386. typeError("row", NULL); // Check that a single row was returned
  1387. return ret;
  1388. }
  1389. virtual void bindRowParam(const char *name, IOutputMetaData & metaVal, const byte *val) override
  1390. {
  1391. MySQLRecordBinder binder(metaVal.queryTypeInfo(), stmtInfo->queryInputBindings(), nextParam);
  1392. binder.processRow(val);
  1393. nextParam += binder.numFields();
  1394. }
  1395. virtual void bindDatasetParam(const char *name, IOutputMetaData & metaVal, IRowStream * val)
  1396. {
  1397. // We only support a single dataset parameter...
  1398. if (inputStream)
  1399. {
  1400. fail("At most one dataset parameter supported");
  1401. }
  1402. inputStream.setown(new MySQLDatasetBinder(LINK(val), metaVal.queryTypeInfo(), stmtInfo->queryInputBindings(), nextParam));
  1403. nextParam += inputStream->numFields();
  1404. }
  1405. virtual void bindBooleanParam(const char *name, bool val)
  1406. {
  1407. MYSQL_BIND &bindInfo = findParameter(name, MYSQL_TYPE_TINY, sizeof(val));
  1408. * (bool *) bindInfo.buffer = val;
  1409. bindInfo.is_unsigned = true;
  1410. }
  1411. virtual void bindDataParam(const char *name, size32_t len, const void *val)
  1412. {
  1413. size32_t bytes;
  1414. void *data;
  1415. rtlStrToDataX(bytes, data, len, val);
  1416. MYSQL_BIND &bindInfo = findParameter(name, MYSQL_TYPE_BLOB, 0);
  1417. bindInfo.buffer = data;
  1418. bindInfo.buffer_length = bytes;
  1419. bindInfo.length = &bindInfo.buffer_length;
  1420. }
  1421. virtual void bindFloatParam(const char *name, float val)
  1422. {
  1423. MYSQL_BIND &bindInfo = findParameter(name, MYSQL_TYPE_FLOAT, sizeof(val));
  1424. * (float *) bindInfo.buffer = val;
  1425. }
  1426. virtual void bindRealParam(const char *name, double val)
  1427. {
  1428. MYSQL_BIND &bindInfo = findParameter(name, MYSQL_TYPE_DOUBLE, sizeof(val));
  1429. * (double *) bindInfo.buffer = val;
  1430. }
  1431. virtual void bindSignedSizeParam(const char *name, int size, __int64 val)
  1432. {
  1433. bindSignedParam(name, val);
  1434. }
  1435. virtual void bindSignedParam(const char *name, __int64 val)
  1436. {
  1437. MYSQL_BIND &bindInfo = findParameter(name, MYSQL_TYPE_LONGLONG, sizeof(val));
  1438. * (__int64 *) bindInfo.buffer = val;
  1439. bindInfo.is_unsigned = false;
  1440. }
  1441. virtual void bindUnsignedSizeParam(const char *name, int size, unsigned __int64 val)
  1442. {
  1443. bindUnsignedParam(name, val);
  1444. }
  1445. virtual void bindUnsignedParam(const char *name, unsigned __int64 val)
  1446. {
  1447. MYSQL_BIND &bindInfo = findParameter(name, MYSQL_TYPE_LONGLONG, sizeof(val));
  1448. * (unsigned __int64 *) bindInfo.buffer = val;
  1449. bindInfo.is_unsigned = true;
  1450. }
  1451. virtual void bindStringParam(const char *name, size32_t len, const char *val)
  1452. {
  1453. size32_t utf8chars;
  1454. char *utf8;
  1455. rtlStrToUtf8X(utf8chars, utf8, len, val);
  1456. MYSQL_BIND &bindInfo = findParameter(name, MYSQL_TYPE_STRING, 0);
  1457. bindInfo.buffer = utf8;
  1458. bindInfo.buffer_length = rtlUtf8Size(utf8chars, utf8);
  1459. bindInfo.length = &bindInfo.buffer_length;
  1460. }
  1461. virtual void bindVStringParam(const char *name, const char *val)
  1462. {
  1463. bindStringParam(name, strlen(val), val);
  1464. }
  1465. virtual void bindUTF8Param(const char *name, size32_t chars, const char *val)
  1466. {
  1467. size32_t utf8chars;
  1468. char *utf8;
  1469. rtlUtf8ToUtf8X(utf8chars, utf8, chars, val);
  1470. MYSQL_BIND &bindInfo = findParameter(name, MYSQL_TYPE_STRING, 0);
  1471. bindInfo.buffer = utf8;
  1472. bindInfo.buffer_length = rtlUtf8Size(utf8chars, utf8);
  1473. bindInfo.length = &bindInfo.buffer_length;
  1474. }
  1475. virtual void bindUnicodeParam(const char *name, size32_t chars, const UChar *val)
  1476. {
  1477. size32_t utf8chars;
  1478. char *utf8;
  1479. rtlUnicodeToUtf8X(utf8chars, utf8, chars, val);
  1480. MYSQL_BIND &bindInfo = findParameter(name, MYSQL_TYPE_STRING, 0);
  1481. bindInfo.buffer = utf8;
  1482. bindInfo.buffer_length = rtlUtf8Size(utf8chars, utf8);
  1483. bindInfo.length = &bindInfo.buffer_length;
  1484. }
  1485. virtual void bindSetParam(const char *name, int elemType, size32_t elemSize, bool isAll, size32_t totalBytes, const void *setData)
  1486. {
  1487. UNSUPPORTED("SET parameters"); // MySQL does support sets, so MIGHT be possible...
  1488. }
  1489. virtual IInterface *bindParamWriter(IInterface *esdl, const char *esdlservice, const char *esdltype, const char *name)
  1490. {
  1491. return NULL;
  1492. }
  1493. virtual void paramWriterCommit(IInterface *writer)
  1494. {
  1495. }
  1496. virtual void writeResult(IInterface *esdl, const char *esdlservice, const char *esdltype, IInterface *writer)
  1497. {
  1498. }
  1499. virtual void importFunction(size32_t lenChars, const char *text)
  1500. {
  1501. throwUnexpected();
  1502. }
  1503. virtual void compileEmbeddedScript(size32_t chars, const char *script)
  1504. {
  1505. StringBuffer scriptStr;
  1506. size32_t len;
  1507. if (activityCtx)
  1508. {
  1509. rtlSubstituteActivityContext(scriptStr, activityCtx, chars, script);
  1510. script = scriptStr.str();
  1511. len = scriptStr.length();
  1512. }
  1513. else
  1514. len = rtlUtf8Size(chars, script);
  1515. for (;;)
  1516. {
  1517. Owned<MySQLStatement> stmt = new MySQLStatement(mysql_stmt_init(*conn));
  1518. if (!*stmt)
  1519. fail("failed to create statement");
  1520. if (mysql_stmt_prepare(*stmt, script, len))
  1521. {
  1522. // If we get an error, it could be that the cached connection is stale - retry
  1523. if (conn->wasCached())
  1524. {
  1525. conn.setown(conn->reopen());
  1526. continue;
  1527. }
  1528. fail(mysql_stmt_error(*stmt));
  1529. }
  1530. stmtInfo.setown(new MySQLPreparedStatement(conn, stmt));
  1531. break;
  1532. }
  1533. }
  1534. virtual void callFunction()
  1535. {
  1536. if (nextParam != stmtInfo->queryInputBindings().numColumns())
  1537. failx("Not enough parameters supplied (%d parameters supplied, but statement has %d bound columns)", nextParam, stmtInfo->queryInputBindings().numColumns());
  1538. // We actually do the execute later, when the result is fetched
  1539. // Unless, there is no expected result, in that case execute query now
  1540. if (stmtInfo->queryResultBindings().numColumns() == 0)
  1541. lazyExecute();
  1542. }
  1543. protected:
  1544. void lazyExecute()
  1545. {
  1546. if (inputStream)
  1547. inputStream->executeAll(stmtInfo);
  1548. else
  1549. stmtInfo->execute();
  1550. }
  1551. const MYSQL_BIND &getScalarResult()
  1552. {
  1553. if (!stmtInfo->hasResult() || stmtInfo->queryResultBindings().numColumns() != 1)
  1554. typeError("scalar", NULL);
  1555. lazyExecute();
  1556. if (!stmtInfo->next())
  1557. typeError("scalar", NULL);
  1558. return stmtInfo->queryResultBindings().queryColumn(0, NULL);
  1559. }
  1560. void checkSingleRow()
  1561. {
  1562. if (stmtInfo->next())
  1563. typeError("scalar", NULL);
  1564. }
  1565. inline MYSQL_BIND &findParameter(const char *name, enum_field_types sqlType, unsigned size)
  1566. {
  1567. // Everything is positional in MySQL
  1568. MYSQL_BIND &bindInfo = stmtInfo->queryInputBindings().queryColumn(nextParam++, name);
  1569. createBindBuffer(bindInfo, sqlType, size);
  1570. return bindInfo;
  1571. }
  1572. Owned<MySQLConnection> conn;
  1573. Owned<MySQLPreparedStatement> stmtInfo;
  1574. Owned<MySQLDatasetBinder> inputStream;
  1575. const IThorActivityContext *activityCtx;
  1576. int nextParam;
  1577. };
  1578. class MySQLEmbedContext : public CInterfaceOf<IEmbedContext>
  1579. {
  1580. public:
  1581. virtual IEmbedFunctionContext *createFunctionContext(unsigned flags, const char *options) override
  1582. {
  1583. return createFunctionContextEx(nullptr, nullptr, flags, options);
  1584. }
  1585. virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, const IThorActivityContext *activityCtx, unsigned flags, const char *options) override
  1586. {
  1587. if (flags & EFimport)
  1588. UNSUPPORTED("IMPORT");
  1589. else
  1590. return new MySQLEmbedFunctionContext(activityCtx, options);
  1591. }
  1592. virtual IEmbedServiceContext *createServiceContext(const char *service, unsigned flags, const char *options) override
  1593. {
  1594. throwUnexpected();
  1595. }
  1596. };
  1597. extern DECL_EXPORT IEmbedContext* getEmbedContext()
  1598. {
  1599. return new MySQLEmbedContext();
  1600. }
  1601. extern DECL_EXPORT bool syntaxCheck(const char *script)
  1602. {
  1603. return true; // MORE
  1604. }
  1605. } // namespace