couchbaseembed.cpp 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2016 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "couchbaseembed.hpp"
  14. #include "platform.h"
  15. #include "jexcept.hpp"
  16. #include "jlog.hpp"
  17. #include "hqlplugins.hpp"
  18. #include "deftype.hpp"
  19. #include "eclhelper.hpp"
  20. #include "eclrtl.hpp"
  21. #include "eclrtl_imp.hpp"
  22. #ifdef _WIN32
  23. #define EXPORT __declspec(dllexport)
  24. #else
  25. #define EXPORT
  26. #endif
  27. static const char *g_moduleName = "couchbase";
  28. static const char *g_moduleDescription = "Couchbase Embed Helper";
  29. static const char *g_version = "Couchbase Embed Helper 1.0.0";
  30. static const char *g_compatibleVersions[] = { g_version, nullptr };
  31. static const NullFieldProcessor NULLFIELD(NULL);
  32. COUCHBASEEMBED_PLUGIN_API bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb)
  33. {
  34. if (pb->size == sizeof(ECLPluginDefinitionBlockEx))
  35. {
  36. ECLPluginDefinitionBlockEx * pbx = (ECLPluginDefinitionBlockEx *) pb;
  37. pbx->compatibleVersions = g_compatibleVersions;
  38. }
  39. else if (pb->size != sizeof(ECLPluginDefinitionBlock))
  40. return false;
  41. pb->magicVersion = PLUGIN_VERSION;
  42. pb->version = g_version;
  43. pb->moduleName = g_moduleName;
  44. pb->ECL = NULL;
  45. pb->flags = PLUGIN_IMPLICIT_MODULE;
  46. pb->description = g_moduleDescription;
  47. return true;
  48. }
  49. namespace couchbaseembed
  50. {
  51. //--------------------------------------------------------------------------
  52. // Plugin Classes
  53. //--------------------------------------------------------------------------
  54. void reportIfQueryFailure(Couchbase::Query * query)
  55. {
  56. auto status = query->meta().status();
  57. if (status.errcode())
  58. {
  59. if (status.isNetworkError())
  60. failx("NetworkErr: %s", status.description());
  61. else if (status.isDataError())
  62. failx("DataErr: %s", status.description());
  63. else if (status.isInputError())
  64. failx("InputErr: %s", status.description());
  65. else if (status.isTemporary())
  66. failx("TempErr: %s", status.description());
  67. else
  68. failx("Couchbase err: %s", status.description());
  69. }
  70. //consider parsing json result
  71. if (strstr(query->meta().body().to_string().c_str(), "\"status\": \"errors\""))
  72. failx("Err: %s", query->meta().body().to_string().c_str());
  73. }
  74. CouchbaseRowStream::CouchbaseRowStream(IEngineRowAllocator* resultAllocator, Couchbase::Query * cbaseQuery)
  75. : m_CouchBaseQuery(cbaseQuery),
  76. m_resultAllocator(resultAllocator)
  77. {
  78. m_currentRow = 0;
  79. m_shouldRead = true;
  80. //iterating over result rows and copying them to stringarray
  81. //is there a way to independently step through original result rows?
  82. for (auto cbrow : *m_CouchBaseQuery)
  83. m_Rows.append(cbrow.json().to_string().c_str());
  84. reportIfQueryFailure(m_CouchBaseQuery);
  85. }
  86. CouchbaseRowStream::~CouchbaseRowStream() {}
  87. const void * CouchbaseRowStream::nextRow()
  88. {
  89. const void * result = NULL;
  90. if (m_shouldRead && m_currentRow < m_Rows.length())
  91. {
  92. auto json = m_Rows.item(m_currentRow++);
  93. Owned<IPropertyTree> contentTree = createPTreeFromJSONString(json,ipt_caseInsensitive);
  94. if (contentTree)
  95. {
  96. CouchbaseRowBuilder * cbRowBuilder = new CouchbaseRowBuilder(contentTree);
  97. RtlDynamicRowBuilder rowBuilder(m_resultAllocator);
  98. const RtlTypeInfo *typeInfo = m_resultAllocator->queryOutputMeta()->queryTypeInfo();
  99. assertex(typeInfo);
  100. RtlFieldStrInfo dummyField("<row>", NULL, typeInfo);
  101. size32_t len = typeInfo->build(rowBuilder, 0, &dummyField, *cbRowBuilder);
  102. return rowBuilder.finalizeRowClear(len);
  103. }
  104. else
  105. failx("Error processing result row");
  106. }
  107. return result;
  108. }
  109. void CouchbaseRowStream::stop()
  110. {
  111. m_resultAllocator.clear();
  112. m_shouldRead = false;
  113. }
  114. Couchbase::Query * CouchbaseConnection::query(Couchbase::QueryCommand * qcommand)
  115. {
  116. Couchbase::Status queryStatus;
  117. Couchbase::Query * pQuery = new Couchbase::Query(*m_pCouchbaseClient, *qcommand, queryStatus);
  118. if (!queryStatus)
  119. failx("Couldn't issue query: %s", queryStatus.description());
  120. if (!pQuery->status())
  121. failx("Couldn't execute query, reason: %s\nBody is: ", pQuery->meta().body().data());
  122. if (pQuery->meta().status().errcode() != LCB_SUCCESS )//rows.length() == 0)
  123. failx("Query execution error: %s", m_pQuery->meta().body().data());
  124. return pQuery;
  125. }
  126. extern void UNSUPPORTED(const char *feature)
  127. {
  128. throw MakeStringException(-1, "UNSUPPORTED feature: %s not supported in %s", feature, g_version);
  129. }
  130. extern void failx(const char *message, ...)
  131. {
  132. va_list args;
  133. va_start(args,message);
  134. StringBuffer msg;
  135. msg.appendf("%s: ", g_moduleName).valist_appendf(message,args);
  136. va_end(args);
  137. rtlFail(0, msg.str());
  138. }
  139. extern void fail(const char *message)
  140. {
  141. StringBuffer msg;
  142. msg.appendf("%s: ", g_moduleName).append(message);
  143. rtlFail(0, msg.str());
  144. }
  145. void bindStringParam(unsigned len, const char *value, const RtlFieldInfo * field, Couchbase::QueryCommand * pQcmd)
  146. {
  147. VStringBuffer cbPlaceholder("$%s", field->name->queryStr());
  148. if (pQcmd)
  149. {
  150. size32_t utf8chars;
  151. char *utf8;
  152. rtlStrToUtf8X(utf8chars, utf8, len, value);
  153. auto status = pQcmd->named_param(cbPlaceholder.str(), utf8);
  154. if (!status.success())
  155. failx("Could not bind Param: %s val: %s", cbPlaceholder.str(), utf8);
  156. }
  157. else
  158. failx("Internal error: detected invalid CouchbaseQueryCommand while attempting to bind to field: %s", cbPlaceholder.str());
  159. }
  160. void bindBoolParam(bool value, const RtlFieldInfo * field, Couchbase::QueryCommand * pQcmd)
  161. {
  162. VStringBuffer cbPlaceholder("$%s", field->name->queryStr());
  163. if (pQcmd)
  164. {
  165. StringBuffer serialized;
  166. TokenSerializer tokenSerializer;
  167. tokenSerializer.serialize(value, serialized);
  168. auto status = pQcmd->named_param(cbPlaceholder.str(), serialized.str());
  169. if (!status.success())
  170. failx("Could not bind Param: %s val: %s", cbPlaceholder.str(), serialized.str());
  171. }
  172. else
  173. failx("Internal error: detected invalid CouchbaseQueryCommand while attempting to bind to field: %s", cbPlaceholder.str());
  174. }
  175. void bindDataParam(unsigned len, const void *value, const RtlFieldInfo * field, Couchbase::QueryCommand * pQcmd)
  176. {
  177. VStringBuffer cbPlaceholder("$%s", field->name->queryStr());
  178. if (pQcmd)
  179. {
  180. size32_t bytes;
  181. void *data;
  182. rtlStrToDataX(bytes, data, len, value);
  183. auto status = pQcmd->named_param(cbPlaceholder.str(), (char *)data);
  184. if (!status.success())
  185. failx("Could not bind Param: %s val: %s", cbPlaceholder.str(), (char *)data);
  186. }
  187. else
  188. failx("Internal error: detected invalid CouchbaseQueryCommand while attempting to bind to field: %s", cbPlaceholder.str());
  189. }
  190. void bindIntParam(__int64 value, const RtlFieldInfo * field, Couchbase::QueryCommand * pQcmd)
  191. {
  192. VStringBuffer cbPlaceholder("$%s", field->name->queryStr());
  193. if (pQcmd)
  194. {
  195. StringBuffer serialized;
  196. TokenSerializer tokenSerializer;
  197. tokenSerializer.serialize(value, serialized);
  198. auto status = pQcmd->named_param(cbPlaceholder.str(), serialized.str());
  199. if (!status.success())
  200. failx("Could not bind Param: %s val: %s", cbPlaceholder.str(), serialized.str());
  201. }
  202. else
  203. failx("Internal error: detected invalid CouchbaseQueryCommand while attempting to bind to field: %s", cbPlaceholder.str());
  204. }
  205. void bindUIntParam(unsigned __int64 value, const RtlFieldInfo * field, Couchbase::QueryCommand * pQcmd)
  206. {
  207. VStringBuffer cbPlaceholder("$%s", field->name->queryStr());
  208. if (pQcmd)
  209. {
  210. StringBuffer serialized;
  211. TokenSerializer tokenSerializer;
  212. tokenSerializer.serialize(value, serialized);
  213. auto status = pQcmd->named_param(cbPlaceholder.str(), serialized.str());
  214. if (!status.success())
  215. failx("Could not bind Param: %s val: %s", cbPlaceholder.str(), serialized.str());
  216. }
  217. else
  218. failx("Internal error: detected invalid CouchbaseQueryCommand while attempting to bind to field: %s", cbPlaceholder.str());
  219. }
  220. void bindRealParam(double value, const RtlFieldInfo * field, Couchbase::QueryCommand * pQcmd)
  221. {
  222. VStringBuffer cbPlaceholder("$%s", field->name->queryStr());
  223. if (pQcmd)
  224. {
  225. StringBuffer serialized;
  226. TokenSerializer tokenSerializer;
  227. tokenSerializer.serialize(value, serialized);
  228. auto status = pQcmd->named_param(cbPlaceholder.str(), serialized.str());
  229. if (!status.success())
  230. failx("Could not bind Param: %s val: %s", cbPlaceholder.str(), serialized.str());
  231. }
  232. else
  233. failx("Internal error: detected invalid CouchbaseQueryCommand while attempting to bind to field: %s", cbPlaceholder.str());
  234. }
  235. void bindUnicodeParam(unsigned chars, const UChar *value, const RtlFieldInfo * field, Couchbase::QueryCommand * pQcmd)
  236. {
  237. VStringBuffer cbPlaceholder("$%s", field->name->queryStr());
  238. if (pQcmd)
  239. {
  240. size32_t utf8chars;
  241. char *utf8;
  242. rtlUnicodeToUtf8X(utf8chars, utf8, chars, value);
  243. auto status = pQcmd->named_param(cbPlaceholder.str(), utf8);
  244. if (!status.success())
  245. failx("Could not bind Param: %s val: %s", cbPlaceholder.str(), utf8);
  246. }
  247. else
  248. failx("Internal error: detected invalid CouchbaseQueryCommand while attempting to bind to field: %s", cbPlaceholder.str());
  249. }
  250. int CouchbaseRecordBinder::numFields()
  251. {
  252. int count = 0;
  253. const RtlFieldInfo * const *fields = typeInfo->queryFields();
  254. assertex(fields);
  255. while (*fields++)
  256. count++;
  257. return count;
  258. }
  259. void CouchbaseRecordBinder::processRow(const byte *row)
  260. {
  261. thisParam = firstParam;
  262. typeInfo->process(row, row, &dummyField, *this); // Bind the variables for the current row
  263. }
  264. void CouchbaseRecordBinder::processString(unsigned len, const char *value, const RtlFieldInfo * field)
  265. {
  266. checkNextParam(field);
  267. bindStringParam(len, value, field, m_pQcmd);
  268. }
  269. void CouchbaseRecordBinder::processBool(bool value, const RtlFieldInfo * field)
  270. {
  271. bindBoolParam(value, field, m_pQcmd);
  272. }
  273. void CouchbaseRecordBinder::processData(unsigned len, const void *value, const RtlFieldInfo * field)
  274. {
  275. bindDataParam(len, value, field, m_pQcmd);
  276. }
  277. void CouchbaseRecordBinder::processInt(__int64 value, const RtlFieldInfo * field)
  278. {
  279. bindIntParam(value, field, m_pQcmd);
  280. }
  281. void CouchbaseRecordBinder::processUInt(unsigned __int64 value, const RtlFieldInfo * field)
  282. {
  283. bindUIntParam(value, field,m_pQcmd);
  284. }
  285. void CouchbaseRecordBinder::processReal(double value, const RtlFieldInfo * field)
  286. {
  287. bindRealParam(value, field, m_pQcmd);
  288. }
  289. void CouchbaseRecordBinder::processDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo * field)
  290. {
  291. Decimal val;
  292. size32_t bytes;
  293. rtlDataAttr decText;
  294. val.setDecimal(digits, precision, value);
  295. val.getStringX(bytes, decText.refstr());
  296. processUtf8(bytes, decText.getstr(), field);
  297. }
  298. void CouchbaseRecordBinder::processUnicode(unsigned chars, const UChar *value, const RtlFieldInfo * field)
  299. {
  300. bindUnicodeParam(chars, value, field, m_pQcmd);
  301. }
  302. void CouchbaseRecordBinder::processQString(unsigned len, const char *value, const RtlFieldInfo * field)
  303. {
  304. size32_t charCount;
  305. rtlDataAttr text;
  306. rtlQStrToStrX(charCount, text.refstr(), len, value);
  307. processUtf8(charCount, text.getstr(), field);
  308. }
  309. void CouchbaseRecordBinder::processUtf8(unsigned chars, const char *value, const RtlFieldInfo * field)
  310. {
  311. bindStringParam(strlen(value), value, field, m_pQcmd);
  312. }
  313. unsigned CouchbaseRecordBinder::checkNextParam(const RtlFieldInfo * field)
  314. {
  315. if (logctx.queryTraceLevel() > 4)
  316. logctx.CTXLOG("Binding %s to %d", str(field->name), thisParam);
  317. return thisParam++;
  318. }
  319. CouchbaseEmbedFunctionContext::CouchbaseEmbedFunctionContext(const IContextLogger &_logctx, const char *options, unsigned _flags)
  320. : logctx(_logctx), m_NextRow(), m_nextParam(0), m_numParams(0), m_scriptFlags(_flags)
  321. {
  322. cbQueryIterator = NULL;
  323. m_pCouchbaseClient = nullptr;
  324. m_pQuery = nullptr;
  325. m_pQcmd = nullptr;
  326. const char *server = "localhost";
  327. const char *user = "";
  328. const char *password = "";
  329. const char *bucketname = "default";
  330. unsigned port = 8091;
  331. bool useSSL = false;
  332. StringBuffer connectionOptions;
  333. StringArray inputOptions;
  334. inputOptions.appendList(options, ",");
  335. ForEachItemIn(idx, inputOptions)
  336. {
  337. const char *opt = inputOptions.item(idx);
  338. const char *val = strchr(opt, '=');
  339. if (val)
  340. {
  341. StringBuffer optName(val-opt, opt);
  342. val++;
  343. if (stricmp(optName, "server")==0)
  344. server = val; // Note that lifetime of val is adequate for this to be safe
  345. else if (stricmp(optName, "port")==0)
  346. port = atoi(val);
  347. else if (stricmp(optName, "user")==0)
  348. user = val;
  349. else if (stricmp(optName, "password")==0)
  350. password = val;
  351. else if (stricmp(optName, "bucket")==0)
  352. bucketname = val;
  353. else if (stricmp(optName, "useSSL")==0)
  354. useSSL = clipStrToBool(val);
  355. //Connection String options
  356. else if (stricmp(optName, "detailed_errcodes")==0
  357. || stricmp(optName, "operation_timeout")==0
  358. || stricmp(optName, "config_total_timeout")==0
  359. || stricmp(optName, "http_poolsize")==0
  360. || stricmp(optName, "detailed_errcodes")==0)
  361. connectionOptions.appendf("%s%s=%s", connectionOptions.length() == 0 ? "?" : "&", optName.str(), val);
  362. else
  363. failx("Unknown option %s", optName.str());
  364. }
  365. }
  366. m_oCBConnection.setown(new CouchbaseConnection(useSSL, server, port, bucketname, user, password, connectionOptions.str()));
  367. m_oCBConnection->connect();
  368. }
  369. IPropertyTree * CouchbaseEmbedFunctionContext::nextResultRowTree()
  370. {
  371. for (auto cbrow : *m_pQuery)
  372. {
  373. auto json = cbrow.json().to_string();
  374. Owned<IPropertyTree> contentTree = createPTreeFromJSONString(json.c_str());
  375. return contentTree.getLink();
  376. }
  377. reportIfQueryFailure(m_pQuery);
  378. return nullptr;
  379. }
  380. IPropertyTreeIterator * CouchbaseEmbedFunctionContext::nextResultRowIterator()
  381. {
  382. for (auto cbrow : *m_pQuery)
  383. {
  384. auto json = cbrow.json().to_string();
  385. Owned<IPropertyTree> contentTree = createPTreeFromJSONString(json.c_str());
  386. if (contentTree)
  387. return contentTree->getElements("./*");
  388. failx("Could not fetch next result row.");
  389. break;
  390. }
  391. reportIfQueryFailure(m_pQuery);
  392. return nullptr;
  393. }
  394. const char * CouchbaseEmbedFunctionContext::nextResultScalar()
  395. {
  396. auto resultrow = nextResultRowIterator();
  397. if (resultrow)
  398. {
  399. resultrow->first();
  400. if(resultrow->isValid() == true)
  401. {
  402. if (resultrow->query().hasChildren())
  403. typeError("scalar", "");
  404. return resultrow->query().queryProp("");
  405. }
  406. else
  407. failx("Could not fetch next result column.");
  408. }
  409. else
  410. failx("Could not fetch next result row.");
  411. return nullptr;
  412. }
  413. bool CouchbaseEmbedFunctionContext::getBooleanResult()
  414. {
  415. bool mybool;
  416. auto scalar = nextResultScalar();
  417. handleDeserializeOutcome(m_tokenDeserializer.deserialize(scalar, mybool), "bool", scalar);
  418. return mybool;
  419. }
  420. void CouchbaseEmbedFunctionContext::getDataResult(size32_t &len, void * &result)
  421. {
  422. auto value = nextResultScalar();
  423. if (value && *value)
  424. {
  425. rtlStrToDataX(len, result, strlen(value), value); // This feels like it may not work to me - will preallocate rather larger than we want
  426. }
  427. else
  428. {
  429. rtlStrToDataX(len, result, NULLFIELD.resultChars, NULLFIELD.stringResult);
  430. }
  431. }
  432. double CouchbaseEmbedFunctionContext::getRealResult()
  433. {
  434. double mydouble;
  435. auto value = nextResultScalar();
  436. handleDeserializeOutcome(m_tokenDeserializer.deserialize(value, mydouble), "real", value);
  437. return mydouble;
  438. }
  439. __int64 CouchbaseEmbedFunctionContext::getSignedResult()
  440. {
  441. __int64 myint64;
  442. auto value = nextResultScalar();
  443. handleDeserializeOutcome(m_tokenDeserializer.deserialize(value, myint64), "signed", value);
  444. return myint64;
  445. }
  446. unsigned __int64 CouchbaseEmbedFunctionContext::getUnsignedResult()
  447. {
  448. unsigned __int64 myuint64;
  449. auto value = nextResultScalar();
  450. handleDeserializeOutcome(m_tokenDeserializer.deserialize(value, myuint64), "unsigned", value);
  451. return myuint64;
  452. }
  453. void CouchbaseEmbedFunctionContext::getStringResult(size32_t &chars, char * &result)
  454. {
  455. auto value = nextResultScalar();
  456. if (value && *value)
  457. {
  458. unsigned numchars = rtlUtf8Length(strlen(value), value);
  459. rtlUtf8ToStrX(chars, result, numchars, value);
  460. }
  461. else
  462. {
  463. rtlStrToStrX(chars, result, NULLFIELD.resultChars, NULLFIELD.stringResult);
  464. }
  465. }
  466. void CouchbaseEmbedFunctionContext::getUTF8Result(size32_t &chars, char * &result)
  467. {
  468. getStringResult(chars, result);
  469. }
  470. void CouchbaseEmbedFunctionContext::getUnicodeResult(size32_t &chars, UChar * &result)
  471. {
  472. auto value = nextResultScalar();
  473. if (value && *value)
  474. {
  475. unsigned numchars = rtlUtf8Length(strlen(value), value);
  476. rtlUtf8ToUnicodeX(chars, result, numchars, value);
  477. }
  478. else
  479. {
  480. rtlUnicodeToUnicodeX(chars, result, NULLFIELD.resultChars, NULLFIELD.unicodeResult);
  481. }
  482. }
  483. void CouchbaseEmbedFunctionContext::getDecimalResult(Decimal &value)
  484. {
  485. auto text = nextResultScalar();
  486. if (text && *text)
  487. value.setString(rtlUtf8Length(strlen(text), text), text);
  488. else
  489. value.set(NULLFIELD.decimalResult);
  490. }
  491. IRowStream * CouchbaseEmbedFunctionContext::getDatasetResult(IEngineRowAllocator * _resultAllocator)
  492. {
  493. Owned<CouchbaseRowStream> cbaseRowStream;
  494. cbaseRowStream.set(new CouchbaseRowStream(_resultAllocator, m_pQuery));
  495. return cbaseRowStream.getLink();
  496. }
  497. byte * CouchbaseEmbedFunctionContext::getRowResult(IEngineRowAllocator * _resultAllocator)
  498. {
  499. Owned<CouchbaseRowStream> cbaseRowStream;
  500. cbaseRowStream.set(new CouchbaseRowStream(_resultAllocator, m_pQuery));
  501. return (byte *)cbaseRowStream->nextRow();
  502. }
  503. size32_t CouchbaseEmbedFunctionContext::getTransformResult(ARowBuilder & rowBuilder)
  504. {
  505. execute();
  506. auto resultrow = nextResultRowTree();
  507. if (!resultrow)
  508. fail("Failed to read row");
  509. if (resultrow->getCount("./*") != 1)
  510. typeError("row", "");
  511. CouchbaseRowBuilder couchbaseRowBuilder(resultrow);
  512. const RtlTypeInfo *typeInfo = rowBuilder.queryAllocator()->queryOutputMeta()->queryTypeInfo();
  513. assertex(typeInfo);
  514. RtlFieldStrInfo dummyField("<row>", NULL, typeInfo);
  515. return typeInfo->build(rowBuilder, 0, &dummyField, couchbaseRowBuilder);
  516. }
  517. void CouchbaseEmbedFunctionContext::bindRowParam(const char *name, IOutputMetaData & metaVal, byte *val)
  518. {
  519. CouchbaseRecordBinder binder(logctx, metaVal.queryTypeInfo(), m_pQcmd, m_nextParam);
  520. binder.processRow(val);
  521. m_nextParam += binder.numFields();
  522. }
  523. void CouchbaseEmbedFunctionContext::bindDatasetParam(const char *name, IOutputMetaData & metaVal, IRowStream * val)
  524. {
  525. // We only support a single dataset parameter...
  526. // MORE - look into batch?
  527. if (m_oInputStream)
  528. {
  529. fail("At most one dataset parameter supported");
  530. }
  531. m_oInputStream.setown(new CouchbaseDatasetBinder(logctx, LINK(val), metaVal.queryTypeInfo(), m_pQcmd, m_nextParam));
  532. m_nextParam += m_oInputStream->numFields();
  533. }
  534. void CouchbaseEmbedFunctionContext::bindBooleanParam(const char *name, bool val)
  535. {
  536. checkNextParam(name);
  537. StringBuffer serialized;
  538. m_tokenSerializer.serialize(val, serialized);
  539. VStringBuffer cbPlaceholder("$%s", name);
  540. auto status = m_pQcmd->named_param(cbPlaceholder.str(), serialized.str());
  541. if (!status.success())
  542. failx("Could not bind Param: %s val: %s", cbPlaceholder.str(), serialized.str());
  543. }
  544. void CouchbaseEmbedFunctionContext::bindDataParam(const char *name, size32_t len, const void *val)
  545. {
  546. checkNextParam(name);
  547. VStringBuffer cbPlaceholder("$%s", name);
  548. size32_t bytes;
  549. void *data;
  550. rtlStrToDataX(bytes, data, len, val);
  551. auto status = m_pQcmd->named_param(cbPlaceholder.str(), (char *)data);
  552. if (!status.success())
  553. failx("Could not bind Param: %s val: %s", cbPlaceholder.str(), (char *)data);
  554. }
  555. void CouchbaseEmbedFunctionContext::bindFloatParam(const char *name, float val)
  556. {
  557. checkNextParam(name);
  558. StringBuffer serialized;
  559. m_tokenSerializer.serialize(val, serialized);
  560. VStringBuffer cbPlaceholder("$%s", name);
  561. auto status = m_pQcmd->named_param(cbPlaceholder.str(), serialized.str());
  562. if (!status.success())
  563. failx("Could not bind Param: %s val: %s", cbPlaceholder.str(), serialized.str());
  564. }
  565. void CouchbaseEmbedFunctionContext::bindRealParam(const char *name, double val)
  566. {
  567. checkNextParam(name);
  568. StringBuffer serialized;
  569. m_tokenSerializer.serialize(val, serialized);
  570. VStringBuffer cbPlaceholder("$%s", name);
  571. auto status = m_pQcmd->named_param(cbPlaceholder.str(), serialized.str());
  572. if (!status.success())
  573. failx("Could not bind Param: %s val: %s", cbPlaceholder.str(), serialized.str());
  574. }
  575. void CouchbaseEmbedFunctionContext::bindSignedSizeParam(const char *name, int size, __int64 val)
  576. {
  577. bindSignedParam(name, val);
  578. }
  579. void CouchbaseEmbedFunctionContext::bindSignedParam(const char *name, __int64 val)
  580. {
  581. checkNextParam(name);
  582. StringBuffer serialized;
  583. m_tokenSerializer.serialize(val, serialized);
  584. VStringBuffer cbPlaceholder("$%s", name);
  585. auto status = m_pQcmd->named_param(cbPlaceholder.str(), serialized.str());
  586. if (!status.success())
  587. failx("Could not bind Param: %s val: %s", cbPlaceholder.str(), serialized.str());
  588. }
  589. void CouchbaseEmbedFunctionContext::bindUnsignedSizeParam(const char *name, int size, unsigned __int64 val)
  590. {
  591. bindUnsignedParam(name, val);
  592. }
  593. void CouchbaseEmbedFunctionContext::bindUnsignedParam(const char *name, unsigned __int64 val)
  594. {
  595. checkNextParam(name);
  596. StringBuffer serialized;
  597. m_tokenSerializer.serialize(val, serialized);
  598. VStringBuffer cbPlaceholder("$%s", name);
  599. auto status = m_pQcmd->named_param(cbPlaceholder.str(), serialized.str());
  600. if (!status.success())
  601. failx("Could not bind Param: %s val: %s", cbPlaceholder.str(), serialized.str());
  602. }
  603. void CouchbaseEmbedFunctionContext::bindStringParam(const char *name, size32_t len, const char *val)
  604. {
  605. checkNextParam(name);
  606. VStringBuffer cbPlaceholder("$%s", name);
  607. size32_t utf8chars;
  608. char *utf8;
  609. rtlStrToUtf8X(utf8chars, utf8, len, val);
  610. auto status = m_pQcmd->named_param(cbPlaceholder.str(), utf8);
  611. if (!status.success())
  612. failx("Could not bind Param: %s val: %s", cbPlaceholder.str(), utf8);
  613. }
  614. void CouchbaseEmbedFunctionContext::bindVStringParam(const char *name, const char *val)
  615. {
  616. checkNextParam(name);
  617. bindStringParam(name, strlen(val), val);
  618. }
  619. void CouchbaseEmbedFunctionContext::bindUTF8Param(const char *name, size32_t chars, const char *val)
  620. {
  621. checkNextParam(name);
  622. bindStringParam(name, strlen(val), val);
  623. }
  624. void CouchbaseEmbedFunctionContext::bindUnicodeParam(const char *name, size32_t chars, const UChar *val)
  625. {
  626. checkNextParam(name);
  627. VStringBuffer cbPlaceholder("$%s", name);
  628. size32_t utf8chars;
  629. char *utf8;
  630. rtlUnicodeToUtf8X(utf8chars, utf8, chars, val);
  631. auto status = m_pQcmd->named_param(cbPlaceholder.str(), utf8);
  632. if (!status.success())
  633. failx("Could not bind Param: %s val: %s", cbPlaceholder.str(), utf8);
  634. }
  635. void CouchbaseEmbedFunctionContext::compileEmbeddedScript(size32_t chars, const char *script)
  636. {
  637. if (script && *script)
  638. {
  639. m_pQcmd = new Couchbase::QueryCommand(script);
  640. if ((m_scriptFlags & EFnoparams) == 0)
  641. m_numParams = countParameterPlaceholders(script);
  642. else
  643. m_numParams = 0;
  644. }
  645. else
  646. failx("Empty N1QL query detected");
  647. }
  648. void CouchbaseEmbedFunctionContext::callFunction()
  649. {
  650. execute();
  651. }
  652. void CouchbaseEmbedFunctionContext::execute()
  653. {
  654. if (m_oInputStream)
  655. m_oInputStream->executeAll(m_oCBConnection);
  656. else
  657. {
  658. m_pQuery = m_oCBConnection->query(m_pQcmd);
  659. reportIfQueryFailure(m_pQuery);
  660. }
  661. }
  662. unsigned CouchbaseEmbedFunctionContext::checkNextParam(const char *name)
  663. {
  664. if (m_nextParam == m_numParams)
  665. failx("Too many parameters supplied: No matching $<name> placeholder for parameter %s", name);
  666. return m_nextParam++;
  667. }
  668. bool CouchbaseRowBuilder::getBooleanResult(const RtlFieldInfo *field)
  669. {
  670. const char * value = nextField(field);
  671. if (!value && !*value)
  672. {
  673. NullFieldProcessor p(field);
  674. return p.boolResult;
  675. }
  676. bool mybool;
  677. couchbaseembed::handleDeserializeOutcome(m_tokenDeserializer.deserialize(value, mybool), "bool", value);
  678. return mybool;
  679. }
  680. void CouchbaseRowBuilder::getDataResult(const RtlFieldInfo *field, size32_t &len, void * &result)
  681. {
  682. const char * value = nextField(field);
  683. if (!value || !*value)
  684. {
  685. NullFieldProcessor p(field);
  686. rtlStrToDataX(len, result, p.resultChars, p.stringResult);
  687. return;
  688. }
  689. rtlStrToDataX(len, result, strlen(value), value); // This feels like it may not work to me - will preallocate rather larger than we want
  690. }
  691. double CouchbaseRowBuilder::getRealResult(const RtlFieldInfo *field)
  692. {
  693. const char * value = nextField(field);
  694. if (!value || !*value)
  695. {
  696. NullFieldProcessor p(field);
  697. return p.doubleResult;
  698. }
  699. double mydouble;
  700. couchbaseembed::handleDeserializeOutcome(m_tokenDeserializer.deserialize(value, mydouble), "real", value);
  701. return mydouble;
  702. }
  703. __int64 CouchbaseRowBuilder::getSignedResult(const RtlFieldInfo *field)
  704. {
  705. const char * value = nextField(field);
  706. if (!value || !*value)
  707. {
  708. NullFieldProcessor p(field);
  709. return p.uintResult;
  710. }
  711. __int64 myint64;
  712. couchbaseembed::handleDeserializeOutcome(m_tokenDeserializer.deserialize(value, myint64), "signed", value);
  713. return myint64;
  714. }
  715. unsigned __int64 CouchbaseRowBuilder::getUnsignedResult(const RtlFieldInfo *field)
  716. {
  717. const char * value = nextField(field);
  718. if (!value || !*value)
  719. {
  720. NullFieldProcessor p(field);
  721. return p.uintResult;
  722. }
  723. unsigned __int64 myuint64;
  724. couchbaseembed::handleDeserializeOutcome(m_tokenDeserializer.deserialize(value, myuint64), "unsigned", value);
  725. return myuint64;
  726. }
  727. void CouchbaseRowBuilder::getStringResult(const RtlFieldInfo *field, size32_t &chars, char * &result)
  728. {
  729. const char * value = nextField(field);
  730. if (!value || !*value)
  731. {
  732. NullFieldProcessor p(field);
  733. rtlStrToStrX(chars, result, p.resultChars, p.stringResult);
  734. return;
  735. }
  736. unsigned numchars = rtlUtf8Length(strlen(value), value); // MORE - is it a good assumption that it is utf8 ? Depends how the database is configured I think
  737. rtlUtf8ToStrX(chars, result, numchars, value);
  738. return;
  739. }
  740. void CouchbaseRowBuilder::getUTF8Result(const RtlFieldInfo *field, size32_t &chars, char * &result)
  741. {
  742. getStringResult(field, chars, result);
  743. return;
  744. }
  745. void CouchbaseRowBuilder::getUnicodeResult(const RtlFieldInfo *field, size32_t &chars, UChar * &result)
  746. {
  747. const char * value = nextField(field);
  748. if (!value || !*value)
  749. {
  750. NullFieldProcessor p(field);
  751. rtlUnicodeToUnicodeX(chars, result, p.resultChars, p.unicodeResult);
  752. return;
  753. }
  754. unsigned numchars = rtlUtf8Length(strlen(value), value); // MORE - is it a good assumption that it is utf8 ? Depends how the database is configured I think
  755. rtlUtf8ToUnicodeX(chars, result, numchars, value);
  756. return;
  757. }
  758. void CouchbaseRowBuilder::getDecimalResult(const RtlFieldInfo *field, Decimal &value)
  759. {
  760. const char * dvalue = nextField(field);
  761. if (!dvalue || !*dvalue)
  762. {
  763. NullFieldProcessor p(field);
  764. value.set(p.decimalResult);
  765. return;
  766. }
  767. size32_t chars;
  768. rtlDataAttr result;
  769. value.setString(strlen(dvalue), dvalue);
  770. if (field)
  771. {
  772. RtlDecimalTypeInfo *dtype = (RtlDecimalTypeInfo *) field->type;
  773. value.setPrecision(dtype->getDecimalDigits(), dtype->getDecimalPrecision());
  774. }
  775. }
  776. void CouchbaseRowBuilder::processBeginDataset(const RtlFieldInfo * field)
  777. {
  778. /*
  779. *
  780. *childRec := RECORD real x; real y; END;
  781. *parentRec := RECORD
  782. * childRec child1, <-- flatens out the childrec, this function would receive a field of name x
  783. * dataset(childRec) child2; <-- keeps nested structure, this funciton would receive a field of name child2
  784. *END;
  785. */
  786. if (getNumFields(field->type->queryChildType()) > 0)
  787. m_oNestedField.set(m_oResultRow->queryBranch(field->name->queryStr()));
  788. }
  789. void CouchbaseRowBuilder::processBeginRow(const RtlFieldInfo * field)
  790. {
  791. m_fieldsProcessedCount = 0;
  792. m_rowFieldCount = getNumFields(field->type);
  793. }
  794. bool CouchbaseRowBuilder::processNextRow(const RtlFieldInfo * field)
  795. {
  796. return m_fieldsProcessedCount + 1 == m_rowFieldCount;
  797. }
  798. void CouchbaseRowBuilder::processEndDataset(const RtlFieldInfo * field)
  799. {
  800. if(m_oNestedField)
  801. m_oNestedField.clear();
  802. }
  803. void CouchbaseRowBuilder::processEndRow(const RtlFieldInfo * field)
  804. {
  805. if(m_oNestedField)
  806. m_oNestedField.clear();
  807. }
  808. const char * CouchbaseRowBuilder::nextField(const RtlFieldInfo * field)
  809. {
  810. m_fieldsProcessedCount++;
  811. if (!m_oResultRow)
  812. failx("Missing result row data");
  813. const char * fieldname = field->name->queryStr();
  814. if (!fieldname || !*fieldname)
  815. failx("Missing result column metadata (name)");
  816. if (!m_oResultRow->hasProp(fieldname))
  817. {
  818. VStringBuffer nxpath("locationData/%s", fieldname);
  819. if (m_oNestedField)
  820. {
  821. if (!m_oNestedField->hasProp(fieldname))
  822. {
  823. StringBuffer xml;
  824. toXML(m_oResultRow, xml);
  825. failx("Result row does not contain field: %s: %s", fieldname, xml.str());
  826. }
  827. return m_oNestedField->queryProp(fieldname);
  828. }
  829. }
  830. return m_oResultRow->queryProp(fieldname);
  831. }
  832. class CouchbaseEmbedContext : public CInterfaceOf<IEmbedContext>
  833. {
  834. public:
  835. virtual IEmbedFunctionContext * createFunctionContext(unsigned flags, const char *options)
  836. {
  837. return createFunctionContextEx(NULL, flags, options);
  838. }
  839. virtual IEmbedFunctionContext * createFunctionContextEx(ICodeContext * ctx, unsigned flags, const char *options)
  840. {
  841. if (flags & EFimport)
  842. {
  843. UNSUPPORTED("IMPORT");
  844. return nullptr;
  845. }
  846. else
  847. return new CouchbaseEmbedFunctionContext(ctx ? ctx->queryContextLogger() : queryDummyContextLogger(), options, flags);
  848. }
  849. virtual IEmbedServiceContext * createServiceContext(const char *service, unsigned flags, const char *options)
  850. {
  851. throwUnexpected();
  852. return nullptr;
  853. }
  854. };
  855. extern IEmbedContext* getEmbedContext()
  856. {
  857. return new CouchbaseEmbedContext();
  858. }
  859. extern bool syntaxCheck(const char *script)
  860. {
  861. return true; // TO-DO
  862. }
  863. } // namespace