couchbaseembed.cpp 54 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2016 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "couchbaseembed.hpp"
  14. #include "platform.h"
  15. #include "jexcept.hpp"
  16. #include "jlog.hpp"
  17. #include "hqlplugins.hpp"
  18. #include "deftype.hpp"
  19. #include "eclhelper.hpp"
  20. #include "eclrtl.hpp"
  21. #include "eclrtl_imp.hpp"
  22. #include <map>
  23. #include <mutex>
  24. #include <thread>
  25. static const char *g_moduleName = "couchbase";
  26. static const char *g_moduleDescription = "Couchbase Embed Helper";
  27. static const char *g_version = "Couchbase Embed Helper 1.0.0";
  28. static const char *g_compatibleVersions[] = { g_version, nullptr };
  29. static const NullFieldProcessor NULLFIELD(NULL);
  30. extern "C" COUCHBASEEMBED_PLUGIN_API bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb)
  31. {
  32. if (pb->size == sizeof(ECLPluginDefinitionBlockEx))
  33. {
  34. ECLPluginDefinitionBlockEx * pbx = (ECLPluginDefinitionBlockEx *) pb;
  35. pbx->compatibleVersions = g_compatibleVersions;
  36. }
  37. else if (pb->size != sizeof(ECLPluginDefinitionBlock))
  38. return false;
  39. pb->magicVersion = PLUGIN_VERSION;
  40. pb->version = g_version;
  41. pb->moduleName = g_moduleName;
  42. pb->ECL = nullptr;
  43. pb->flags = PLUGIN_IMPLICIT_MODULE;
  44. pb->description = g_moduleDescription;
  45. return true;
  46. }
  47. namespace couchbaseembed
  48. {
  49. const time_t OBJECT_EXPIRE_TIMEOUT_SECONDS = 60 * 2; // Two minutes
  50. static std::once_flag connectionCacheInitFlag;
  51. //--------------------------------------------------------------------------
  52. // Plugin Classes
  53. //--------------------------------------------------------------------------
  54. void reportIfQueryFailure(Couchbase::Query * query)
  55. {
  56. auto status = query->meta().status();
  57. if (status.errcode())
  58. {
  59. if (status.isNetworkError())
  60. failx("NetworkErr: %s", status.description());
  61. else if (status.isDataError())
  62. failx("DataErr: %s", status.description());
  63. else if (status.isInputError())
  64. failx("InputErr: %s", status.description());
  65. else if (status.isTemporary())
  66. failx("TempErr: %s", status.description());
  67. else
  68. failx("Couchbase err: %s (%d)", status.description(), status.errcode());
  69. }
  70. //consider parsing json result
  71. if (strstr(query->meta().body().to_string().c_str(), "\"status\": \"errors\""))
  72. failx("Err: %s", query->meta().body().to_string().c_str());
  73. }
  74. CouchbaseRowStream::CouchbaseRowStream(IEngineRowAllocator* resultAllocator, Couchbase::Query * cbaseQuery)
  75. : m_resultAllocator(resultAllocator)
  76. {
  77. m_currentRow = 0;
  78. m_shouldRead = true;
  79. //iterating over result rows and copying them to stringarray
  80. //is there a way to independently step through original result rows?
  81. for (auto cbrow : *cbaseQuery)
  82. m_Rows.append(cbrow.json().to_string().c_str());
  83. reportIfQueryFailure(cbaseQuery);
  84. }
  85. CouchbaseRowStream::~CouchbaseRowStream() {}
  86. const void * CouchbaseRowStream::nextRow()
  87. {
  88. const void * result = nullptr;
  89. if (m_shouldRead && m_currentRow < m_Rows.length())
  90. {
  91. auto json = m_Rows.item(m_currentRow++);
  92. Owned<IPropertyTree> contentTree = createPTreeFromJSONString(json,ipt_caseInsensitive);
  93. if (contentTree)
  94. {
  95. CouchbaseRowBuilder cbRowBuilder(contentTree);
  96. RtlDynamicRowBuilder rowBuilder(m_resultAllocator);
  97. const RtlTypeInfo *typeInfo = m_resultAllocator->queryOutputMeta()->queryTypeInfo();
  98. assertex(typeInfo);
  99. RtlFieldStrInfo dummyField("<row>", NULL, typeInfo);
  100. size32_t len = typeInfo->build(rowBuilder, 0, &dummyField, cbRowBuilder);
  101. return rowBuilder.finalizeRowClear(len);
  102. }
  103. else
  104. failx("Error processing result row");
  105. }
  106. return result;
  107. }
  108. void CouchbaseRowStream::stop()
  109. {
  110. m_resultAllocator.clear();
  111. m_shouldRead = false;
  112. }
  113. Couchbase::Query * CouchbaseConnection::query(Couchbase::QueryCommand * qcommand)
  114. {
  115. Couchbase::Status queryStatus;
  116. Couchbase::Query * pQuery = new Couchbase::Query(*m_pCouchbaseClient, *qcommand, queryStatus); // will be owned by method caller
  117. if (!queryStatus)
  118. failx("Couldn't issue query: %s", queryStatus.description());
  119. if (!pQuery->status())
  120. failx("Couldn't execute query, reason: %s\nBody is: ", pQuery->meta().body().data());
  121. if (pQuery->meta().status().errcode() != LCB_SUCCESS )//rows.length() == 0)
  122. failx("Query execution error: %s", pQuery->meta().body().data());
  123. return pQuery;
  124. }
  125. extern void UNSUPPORTED(const char *feature)
  126. {
  127. throw MakeStringException(-1, "UNSUPPORTED feature: %s not supported in %s", feature, g_version);
  128. }
  129. extern void failx(const char *message, ...)
  130. {
  131. va_list args;
  132. va_start(args,message);
  133. StringBuffer msg;
  134. msg.appendf("%s: ", g_moduleName).valist_appendf(message,args);
  135. va_end(args);
  136. rtlFail(0, msg.str());
  137. }
  138. extern void fail(const char *message)
  139. {
  140. StringBuffer msg;
  141. msg.appendf("%s: ", g_moduleName).append(message);
  142. rtlFail(0, msg.str());
  143. }
  144. void bindStringParam(unsigned len, const char *value, const RtlFieldInfo * field, Couchbase::QueryCommand * pQcmd)
  145. {
  146. VStringBuffer cbPlaceholder("$%s", field->name->queryStr());
  147. if (pQcmd)
  148. {
  149. size32_t utf8chars;
  150. char *utf8;
  151. rtlStrToUtf8X(utf8chars, utf8, len, value);
  152. auto status = pQcmd->named_param(cbPlaceholder.str(), utf8);
  153. if (!status.success())
  154. failx("Could not bind Param: %s val: %s", cbPlaceholder.str(), utf8);
  155. }
  156. else
  157. failx("Internal error: detected invalid CouchbaseQueryCommand while attempting to bind to field: %s", cbPlaceholder.str());
  158. }
  159. void bindBoolParam(bool value, const RtlFieldInfo * field, Couchbase::QueryCommand * pQcmd)
  160. {
  161. VStringBuffer cbPlaceholder("$%s", field->name->queryStr());
  162. if (pQcmd)
  163. {
  164. StringBuffer serialized;
  165. TokenSerializer tokenSerializer;
  166. tokenSerializer.serialize(value, serialized);
  167. auto status = pQcmd->named_param(cbPlaceholder.str(), serialized.str());
  168. if (!status.success())
  169. failx("Could not bind Param: %s val: %s", cbPlaceholder.str(), serialized.str());
  170. }
  171. else
  172. failx("Internal error: detected invalid CouchbaseQueryCommand while attempting to bind to field: %s", cbPlaceholder.str());
  173. }
  174. void bindDataParam(unsigned len, const void *value, const RtlFieldInfo * field, Couchbase::QueryCommand * pQcmd)
  175. {
  176. VStringBuffer cbPlaceholder("$%s", field->name->queryStr());
  177. if (pQcmd)
  178. {
  179. size32_t bytes;
  180. void *data;
  181. rtlStrToDataX(bytes, data, len, value);
  182. auto status = pQcmd->named_param(cbPlaceholder.str(), (char *)data);
  183. if (!status.success())
  184. failx("Could not bind Param: %s val: %s", cbPlaceholder.str(), (char *)data);
  185. }
  186. else
  187. failx("Internal error: detected invalid CouchbaseQueryCommand while attempting to bind to field: %s", cbPlaceholder.str());
  188. }
  189. void bindIntParam(__int64 value, const RtlFieldInfo * field, Couchbase::QueryCommand * pQcmd)
  190. {
  191. VStringBuffer cbPlaceholder("$%s", field->name->queryStr());
  192. if (pQcmd)
  193. {
  194. StringBuffer serialized;
  195. TokenSerializer tokenSerializer;
  196. tokenSerializer.serialize(value, serialized);
  197. auto status = pQcmd->named_param(cbPlaceholder.str(), serialized.str());
  198. if (!status.success())
  199. failx("Could not bind Param: %s val: %s", cbPlaceholder.str(), serialized.str());
  200. }
  201. else
  202. failx("Internal error: detected invalid CouchbaseQueryCommand while attempting to bind to field: %s", cbPlaceholder.str());
  203. }
  204. void bindUIntParam(unsigned __int64 value, const RtlFieldInfo * field, Couchbase::QueryCommand * pQcmd)
  205. {
  206. VStringBuffer cbPlaceholder("$%s", field->name->queryStr());
  207. if (pQcmd)
  208. {
  209. StringBuffer serialized;
  210. TokenSerializer tokenSerializer;
  211. tokenSerializer.serialize(value, serialized);
  212. auto status = pQcmd->named_param(cbPlaceholder.str(), serialized.str());
  213. if (!status.success())
  214. failx("Could not bind Param: %s val: %s", cbPlaceholder.str(), serialized.str());
  215. }
  216. else
  217. failx("Internal error: detected invalid CouchbaseQueryCommand while attempting to bind to field: %s", cbPlaceholder.str());
  218. }
  219. void bindRealParam(double value, const RtlFieldInfo * field, Couchbase::QueryCommand * pQcmd)
  220. {
  221. VStringBuffer cbPlaceholder("$%s", field->name->queryStr());
  222. if (pQcmd)
  223. {
  224. StringBuffer serialized;
  225. TokenSerializer tokenSerializer;
  226. tokenSerializer.serialize(value, serialized);
  227. auto status = pQcmd->named_param(cbPlaceholder.str(), serialized.str());
  228. if (!status.success())
  229. failx("Could not bind Param: %s val: %s", cbPlaceholder.str(), serialized.str());
  230. }
  231. else
  232. failx("Internal error: detected invalid CouchbaseQueryCommand while attempting to bind to field: %s", cbPlaceholder.str());
  233. }
  234. void bindUnicodeParam(unsigned chars, const UChar *value, const RtlFieldInfo * field, Couchbase::QueryCommand * pQcmd)
  235. {
  236. VStringBuffer cbPlaceholder("$%s", field->name->queryStr());
  237. if (pQcmd)
  238. {
  239. size32_t utf8chars;
  240. char *utf8;
  241. rtlUnicodeToUtf8X(utf8chars, utf8, chars, value);
  242. auto status = pQcmd->named_param(cbPlaceholder.str(), utf8);
  243. if (!status.success())
  244. failx("Could not bind Param: %s val: %s", cbPlaceholder.str(), utf8);
  245. }
  246. else
  247. failx("Internal error: detected invalid CouchbaseQueryCommand while attempting to bind to field: %s", cbPlaceholder.str());
  248. }
  249. int CouchbaseRecordBinder::numFields()
  250. {
  251. int count = 0;
  252. const RtlFieldInfo * const *fields = typeInfo->queryFields();
  253. assertex(fields);
  254. while (*fields++)
  255. count++;
  256. return count;
  257. }
  258. void CouchbaseRecordBinder::processRow(const byte *row)
  259. {
  260. thisParam = firstParam;
  261. typeInfo->process(row, row, &dummyField, *this); // Bind the variables for the current row
  262. }
  263. void CouchbaseRecordBinder::processString(unsigned len, const char *value, const RtlFieldInfo * field)
  264. {
  265. checkNextParam(field);
  266. bindStringParam(len, value, field, m_pQcmd);
  267. }
  268. void CouchbaseRecordBinder::processBool(bool value, const RtlFieldInfo * field)
  269. {
  270. bindBoolParam(value, field, m_pQcmd);
  271. }
  272. void CouchbaseRecordBinder::processData(unsigned len, const void *value, const RtlFieldInfo * field)
  273. {
  274. bindDataParam(len, value, field, m_pQcmd);
  275. }
  276. void CouchbaseRecordBinder::processInt(__int64 value, const RtlFieldInfo * field)
  277. {
  278. bindIntParam(value, field, m_pQcmd);
  279. }
  280. void CouchbaseRecordBinder::processUInt(unsigned __int64 value, const RtlFieldInfo * field)
  281. {
  282. bindUIntParam(value, field,m_pQcmd);
  283. }
  284. void CouchbaseRecordBinder::processReal(double value, const RtlFieldInfo * field)
  285. {
  286. bindRealParam(value, field, m_pQcmd);
  287. }
  288. void CouchbaseRecordBinder::processDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo * field)
  289. {
  290. Decimal val;
  291. size32_t bytes;
  292. rtlDataAttr decText;
  293. val.setDecimal(digits, precision, value);
  294. val.getStringX(bytes, decText.refstr());
  295. processUtf8(bytes, decText.getstr(), field);
  296. }
  297. void CouchbaseRecordBinder::processUnicode(unsigned chars, const UChar *value, const RtlFieldInfo * field)
  298. {
  299. bindUnicodeParam(chars, value, field, m_pQcmd);
  300. }
  301. void CouchbaseRecordBinder::processQString(unsigned len, const char *value, const RtlFieldInfo * field)
  302. {
  303. size32_t charCount;
  304. rtlDataAttr text;
  305. rtlQStrToStrX(charCount, text.refstr(), len, value);
  306. processUtf8(charCount, text.getstr(), field);
  307. }
  308. void CouchbaseRecordBinder::processUtf8(unsigned chars, const char *value, const RtlFieldInfo * field)
  309. {
  310. bindStringParam(strlen(value), value, field, m_pQcmd);
  311. }
  312. unsigned CouchbaseRecordBinder::checkNextParam(const RtlFieldInfo * field)
  313. {
  314. if (logctx.queryTraceLevel() > 4)
  315. logctx.CTXLOG("Binding %s to %d", str(field->name), thisParam);
  316. return thisParam++;
  317. }
  318. static class ConnectionCacheObj
  319. {
  320. private:
  321. typedef std::vector<CouchbaseConnection*> ConnectionList;
  322. typedef std::map<hash64_t, ConnectionList> ObjMap;
  323. public:
  324. ConnectionCacheObj(int _traceLevel)
  325. : traceLevel(_traceLevel)
  326. {
  327. }
  328. ~ConnectionCacheObj()
  329. {
  330. deleteAll();
  331. }
  332. void deleteAll()
  333. {
  334. CriticalBlock block(cacheLock);
  335. // Delete all idle connection objects
  336. for (ObjMap::iterator keyIter = idleConnections.begin(); keyIter != idleConnections.end(); keyIter++)
  337. {
  338. for (ConnectionList::iterator connectionIter = keyIter->second.begin(); connectionIter != keyIter->second.end(); connectionIter++)
  339. {
  340. if (*connectionIter)
  341. {
  342. delete(*connectionIter);
  343. }
  344. }
  345. }
  346. idleConnections.clear();
  347. // Delete all active connection objects
  348. for (ObjMap::iterator keyIter = activeConnections.begin(); keyIter != activeConnections.end(); keyIter++)
  349. {
  350. for (ConnectionList::iterator connectionIter = keyIter->second.begin(); connectionIter != keyIter->second.end(); connectionIter++)
  351. {
  352. if (*connectionIter)
  353. {
  354. delete(*connectionIter);
  355. }
  356. }
  357. }
  358. activeConnections.clear();
  359. }
  360. void releaseActive(CouchbaseConnection* connectionPtr)
  361. {
  362. CriticalBlock block(cacheLock);
  363. // Find given connection in our active list and move it to our
  364. // idle list
  365. for (ObjMap::iterator keyIter = activeConnections.begin(); keyIter != activeConnections.end(); keyIter++)
  366. {
  367. for (ConnectionList::iterator connectionIter = keyIter->second.begin(); connectionIter != keyIter->second.end(); connectionIter++)
  368. {
  369. if (*connectionIter == connectionPtr)
  370. {
  371. connectionPtr->updateTimeTouched();
  372. keyIter->second.erase(connectionIter);
  373. idleConnections[keyIter->first].push_back(connectionPtr);
  374. if (traceLevel > 4)
  375. {
  376. DBGLOG("Couchbase: Released connection object %p", connectionPtr);
  377. }
  378. return;
  379. }
  380. }
  381. }
  382. }
  383. void expire()
  384. {
  385. if (!idleConnections.empty())
  386. {
  387. CriticalBlock block(cacheLock);
  388. time_t oldestAllowedTime = time(NULL) - OBJECT_EXPIRE_TIMEOUT_SECONDS;
  389. __int32 expireCount = 0;
  390. for (ObjMap::iterator keyIter = idleConnections.begin(); keyIter != idleConnections.end(); keyIter++)
  391. {
  392. ConnectionList::iterator connectionIter = keyIter->second.begin();
  393. while (connectionIter != keyIter->second.end())
  394. {
  395. if (*connectionIter)
  396. {
  397. if ((*connectionIter)->getTimeTouched() < oldestAllowedTime)
  398. {
  399. delete(*connectionIter);
  400. connectionIter = keyIter->second.erase(connectionIter);
  401. ++expireCount;
  402. }
  403. else
  404. {
  405. ++connectionIter;
  406. }
  407. }
  408. else
  409. {
  410. connectionIter = keyIter->second.erase(connectionIter);
  411. }
  412. }
  413. }
  414. if (traceLevel > 4 && expireCount > 0)
  415. {
  416. DBGLOG("Couchbase: Expired %d cached connection%s", expireCount, (expireCount == 1 ? "" : "s"));
  417. }
  418. }
  419. }
  420. CouchbaseConnection* getConnection(bool useSSL, const char * host, unsigned port, const char * bucketname, const char * password, const char * connOptions, unsigned int maxConnections)
  421. {
  422. CouchbaseConnection* connectionObjPtr = nullptr;
  423. StringBuffer connectionString;
  424. CouchbaseConnection::makeConnectionString(useSSL, host, port, bucketname, connOptions, connectionString);
  425. // Use a hash of the connection string as the key to finding
  426. // any idle connection objects
  427. hash64_t key = rtlHash64VStr(connectionString.str(), 0);
  428. while (true)
  429. {
  430. {
  431. CriticalBlock block(cacheLock);
  432. ConnectionList& idleConnectionList = idleConnections[key];
  433. if (!idleConnectionList.empty())
  434. {
  435. // We have at least one idle connection; use that
  436. connectionObjPtr = idleConnectionList.back();
  437. idleConnectionList.pop_back();
  438. connectionObjPtr->updateTimeTouched();
  439. // Push the connection object onto our active list
  440. activeConnections[key].push_back(connectionObjPtr);
  441. if (traceLevel > 4)
  442. {
  443. DBGLOG("Couchbase: Using cached connection object %p: %s", connectionObjPtr, connectionString.str());
  444. }
  445. break;
  446. }
  447. else if (maxConnections == 0 || activeConnections[key].size() < maxConnections)
  448. {
  449. // No idle connections but we don't have to wait for
  450. // one; exit the loop and create a new connection
  451. break;
  452. }
  453. }
  454. // We can't exit the loop and allow a new connection to
  455. // be created because there are too many active
  456. // connections already; wait for a short while
  457. // and try again
  458. std::this_thread::sleep_for(std::chrono::microseconds(10));
  459. }
  460. if (!connectionObjPtr)
  461. {
  462. // An idle connection for that particular combination of
  463. // options does not exist so we need to create one;
  464. // use a small loop to retry connections if necessary
  465. unsigned int connectAttempt = 0;
  466. unsigned int MAX_ATTEMPTS = 10;
  467. // coverity[DC.WEAK_CRYPTO]
  468. useconds_t SLEEP_TIME = 100 + (rand() % 200); // Add jitter to sleep time
  469. while (true)
  470. {
  471. connectionObjPtr = new CouchbaseConnection(connectionString, password);
  472. connectionObjPtr->connect();
  473. if (connectionObjPtr->getConnectionStatus().success())
  474. {
  475. {
  476. // Push new connection object onto our active list
  477. CriticalBlock block(cacheLock);
  478. connectionObjPtr->updateTimeTouched();
  479. ConnectionList& activeConnectionList = activeConnections[key];
  480. activeConnectionList.push_back(connectionObjPtr);
  481. }
  482. if (traceLevel > 4)
  483. {
  484. DBGLOG("Couchbase: Created and cached new connection object %p: %s", connectionObjPtr, connectionString.str());
  485. }
  486. break;
  487. }
  488. else if (connectionObjPtr->getConnectionStatus().isTemporary())
  489. {
  490. ++connectAttempt;
  491. if (connectAttempt < MAX_ATTEMPTS)
  492. {
  493. // According to libcouchbase-cxx, we need
  494. // to destroy the connection object if
  495. // there has been a failure of any kind
  496. delete(connectionObjPtr);
  497. connectionObjPtr = nullptr;
  498. std::this_thread::sleep_for(std::chrono::microseconds(SLEEP_TIME));
  499. }
  500. else
  501. {
  502. // Capture the final failure reason and
  503. // destroy the connection object before
  504. // throwing an error
  505. std::string reason = connectionObjPtr->getConnectionStatus().description();
  506. delete(connectionObjPtr);
  507. connectionObjPtr = nullptr;
  508. failx("Failed to connect to couchbase instance: %s Reason: '%s'", connectionString.str(), reason.c_str());
  509. }
  510. }
  511. else
  512. {
  513. // Capture the final failure reason and
  514. // destroy the connection object before
  515. // throwing an error
  516. std::string reason = connectionObjPtr->getConnectionStatus().description();
  517. delete(connectionObjPtr);
  518. connectionObjPtr = nullptr;
  519. failx("Failed to connect to couchbase instance: %s Reason: '%s'", connectionString.str(), reason.c_str());
  520. }
  521. }
  522. }
  523. return connectionObjPtr;
  524. }
  525. private:
  526. ObjMap idleConnections; //!< std::map of created CouchbaseConnection object pointers
  527. ObjMap activeConnections; //!< std::map of created CouchbaseConnection object pointers
  528. CriticalSection cacheLock; //!< Mutex guarding modifications to connection pools
  529. int traceLevel; //!< The current logging level
  530. } *connectionCache;
  531. static class ConnectionCacheExpirerObj : public Thread
  532. {
  533. public:
  534. ConnectionCacheExpirerObj()
  535. : Thread("Couchbase::ConnectionCacheExpirer"),
  536. shouldRun(false)
  537. {
  538. }
  539. virtual void start()
  540. {
  541. if (!isAlive())
  542. {
  543. shouldRun = true;
  544. Thread::start();
  545. }
  546. }
  547. virtual void stop()
  548. {
  549. if (isAlive())
  550. {
  551. shouldRun = false;
  552. join();
  553. }
  554. }
  555. virtual int run()
  556. {
  557. // Periodically delete connections that have been idle too long
  558. while (shouldRun)
  559. {
  560. if (connectionCache)
  561. {
  562. connectionCache->expire();
  563. }
  564. std::this_thread::sleep_for(std::chrono::microseconds(1000));
  565. }
  566. return 0;
  567. }
  568. private:
  569. std::atomic_bool shouldRun; //!< If true, we should execute our thread's main event loop
  570. } *connectionCacheExpirer;
  571. static void setupConnectionCache(int traceLevel)
  572. {
  573. couchbaseembed::connectionCache = new couchbaseembed::ConnectionCacheObj(traceLevel);
  574. couchbaseembed::connectionCacheExpirer = new couchbaseembed::ConnectionCacheExpirerObj;
  575. couchbaseembed::connectionCacheExpirer->start();
  576. }
  577. CouchbaseEmbedFunctionContext::CouchbaseEmbedFunctionContext(const IContextLogger &_logctx, const char *options, unsigned _flags)
  578. : logctx(_logctx), m_NextRow(), m_nextParam(0), m_numParams(0), m_scriptFlags(_flags)
  579. {
  580. m_pQuery = nullptr;
  581. m_pQcmd = nullptr;
  582. const char *server = "localhost";
  583. const char *user = "";
  584. const char *password = "";
  585. const char *bucketname = "default";
  586. unsigned port = 8091;
  587. bool useSSL = false;
  588. StringBuffer connectionOptions;
  589. unsigned int maxConnections = 0;
  590. StringArray inputOptions;
  591. inputOptions.appendList(options, ",");
  592. ForEachItemIn(idx, inputOptions)
  593. {
  594. const char *opt = inputOptions.item(idx);
  595. const char *val = strchr(opt, '=');
  596. if (val)
  597. {
  598. StringBuffer optName(val-opt, opt);
  599. val++;
  600. if (stricmp(optName, "server")==0)
  601. server = val; // Note that lifetime of val is adequate for this to be safe
  602. else if (stricmp(optName, "port")==0)
  603. port = atoi(val);
  604. else if (stricmp(optName, "user")==0)
  605. user = val; // This is not used but retained for backwards-compatibility
  606. else if (stricmp(optName, "password")==0)
  607. password = val;
  608. else if (stricmp(optName, "bucket")==0)
  609. bucketname = val;
  610. else if (stricmp(optName, "useSSL")==0)
  611. useSSL = clipStrToBool(val);
  612. else if (stricmp(optName, "max_connections")==0)
  613. maxConnections = atoi(val);
  614. //Connection String options
  615. else if (stricmp(optName, "detailed_errcodes")==0
  616. || stricmp(optName, "operation_timeout")==0
  617. || stricmp(optName, "config_total_timeout")==0
  618. || stricmp(optName, "http_poolsize")==0)
  619. connectionOptions.appendf("%s%s=%s", connectionOptions.length() == 0 ? "?" : "&", optName.str(), val);
  620. else
  621. failx("Unknown option %s", optName.str());
  622. }
  623. }
  624. std::call_once(connectionCacheInitFlag, setupConnectionCache, logctx.queryTraceLevel());
  625. // Get a cached idle connection or create a new one
  626. m_oCBConnection = connectionCache->getConnection(useSSL, server, port, bucketname, password, connectionOptions.str(), maxConnections);
  627. }
  628. CouchbaseEmbedFunctionContext::~CouchbaseEmbedFunctionContext()
  629. {
  630. if (m_pQcmd)
  631. {
  632. delete m_pQcmd;
  633. m_pQcmd = nullptr;
  634. }
  635. if (m_pQuery)
  636. {
  637. delete m_pQuery;
  638. m_pQuery = nullptr;
  639. }
  640. if (m_oCBConnection)
  641. {
  642. // When the context is deleted we should return any connection
  643. // object back to idle status
  644. connectionCache->releaseActive(m_oCBConnection);
  645. m_oCBConnection = nullptr;
  646. }
  647. }
  648. IPropertyTree * CouchbaseEmbedFunctionContext::nextResultRowTree()
  649. {
  650. for (auto cbrow : *m_pQuery)
  651. {
  652. auto json = cbrow.json().to_string();
  653. Owned<IPropertyTree> contentTree = createPTreeFromJSONString(json.c_str());
  654. return contentTree.getLink();
  655. }
  656. reportIfQueryFailure(m_pQuery);
  657. return nullptr;
  658. }
  659. IPropertyTreeIterator * CouchbaseEmbedFunctionContext::nextResultRowIterator()
  660. {
  661. for (auto cbrow : *m_pQuery)
  662. {
  663. auto json = cbrow.json().to_string();
  664. Owned<IPropertyTree> contentTree = createPTreeFromJSONString(json.c_str());
  665. if (contentTree)
  666. return contentTree->getElements("./*");
  667. failx("Could not fetch next result row.");
  668. break;
  669. }
  670. reportIfQueryFailure(m_pQuery);
  671. return nullptr;
  672. }
  673. const char * CouchbaseEmbedFunctionContext::nextResultScalar()
  674. {
  675. m_resultrow.setown(nextResultRowIterator());
  676. if (m_resultrow)
  677. {
  678. m_resultrow->first();
  679. if(m_resultrow->isValid() == true)
  680. {
  681. if (m_resultrow->query().hasChildren())
  682. typeError("scalar", "");
  683. return m_resultrow->query().queryProp("");
  684. }
  685. else
  686. failx("Could not fetch next result column.");
  687. }
  688. else
  689. failx("Could not fetch next result row.");
  690. return nullptr;
  691. }
  692. bool CouchbaseEmbedFunctionContext::getBooleanResult()
  693. {
  694. bool mybool;
  695. auto scalar = nextResultScalar();
  696. handleDeserializeOutcome(m_tokenDeserializer.deserialize(scalar, mybool), "bool", scalar);
  697. return mybool;
  698. }
  699. void CouchbaseEmbedFunctionContext::getDataResult(size32_t &len, void * &result)
  700. {
  701. auto value = nextResultScalar();
  702. if (value && *value)
  703. {
  704. rtlStrToDataX(len, result, strlen(value), value); // This feels like it may not work to me - will preallocate rather larger than we want
  705. }
  706. else
  707. {
  708. rtlStrToDataX(len, result, NULLFIELD.resultChars, NULLFIELD.stringResult);
  709. }
  710. }
  711. double CouchbaseEmbedFunctionContext::getRealResult()
  712. {
  713. double mydouble = 0.0;
  714. auto value = nextResultScalar();
  715. handleDeserializeOutcome(m_tokenDeserializer.deserialize(value, mydouble), "real", value);
  716. return mydouble;
  717. }
  718. __int64 CouchbaseEmbedFunctionContext::getSignedResult()
  719. {
  720. __int64 myint64 = 0;
  721. auto value = nextResultScalar();
  722. handleDeserializeOutcome(m_tokenDeserializer.deserialize(value, myint64), "signed", value);
  723. return myint64;
  724. }
  725. unsigned __int64 CouchbaseEmbedFunctionContext::getUnsignedResult()
  726. {
  727. unsigned __int64 myuint64 = 0;
  728. auto value = nextResultScalar();
  729. handleDeserializeOutcome(m_tokenDeserializer.deserialize(value, myuint64), "unsigned", value);
  730. return myuint64;
  731. }
  732. void CouchbaseEmbedFunctionContext::getStringResult(size32_t &chars, char * &result)
  733. {
  734. auto value = nextResultScalar();
  735. if (value && *value)
  736. {
  737. unsigned numchars = rtlUtf8Length(strlen(value), value);
  738. rtlUtf8ToStrX(chars, result, numchars, value);
  739. }
  740. else
  741. {
  742. rtlStrToStrX(chars, result, NULLFIELD.resultChars, NULLFIELD.stringResult);
  743. }
  744. }
  745. void CouchbaseEmbedFunctionContext::getUTF8Result(size32_t &chars, char * &result)
  746. {
  747. getStringResult(chars, result);
  748. }
  749. void CouchbaseEmbedFunctionContext::getUnicodeResult(size32_t &chars, UChar * &result)
  750. {
  751. auto value = nextResultScalar();
  752. if (value && *value)
  753. {
  754. unsigned numchars = rtlUtf8Length(strlen(value), value);
  755. rtlUtf8ToUnicodeX(chars, result, numchars, value);
  756. }
  757. else
  758. {
  759. rtlUnicodeToUnicodeX(chars, result, NULLFIELD.resultChars, NULLFIELD.unicodeResult);
  760. }
  761. }
  762. void CouchbaseEmbedFunctionContext::getDecimalResult(Decimal &value)
  763. {
  764. auto text = nextResultScalar();
  765. if (text && *text)
  766. value.setString(rtlUtf8Length(strlen(text), text), text);
  767. else
  768. value.set(NULLFIELD.decimalResult);
  769. }
  770. IRowStream * CouchbaseEmbedFunctionContext::getDatasetResult(IEngineRowAllocator * _resultAllocator)
  771. {
  772. Owned<CouchbaseRowStream> cbaseRowStream;
  773. cbaseRowStream.setown(new CouchbaseRowStream(_resultAllocator, m_pQuery));
  774. return cbaseRowStream.getLink();
  775. }
  776. byte * CouchbaseEmbedFunctionContext::getRowResult(IEngineRowAllocator * _resultAllocator)
  777. {
  778. Owned<CouchbaseRowStream> cbaseRowStream;
  779. cbaseRowStream.setown(new CouchbaseRowStream(_resultAllocator, m_pQuery));
  780. return (byte *)cbaseRowStream->nextRow();
  781. }
  782. size32_t CouchbaseEmbedFunctionContext::getTransformResult(ARowBuilder & rowBuilder)
  783. {
  784. execute();
  785. auto resultrow = nextResultRowTree();
  786. if (!resultrow)
  787. fail("Failed to read row");
  788. if (resultrow->getCount("./*") != 1)
  789. typeError("row", "");
  790. CouchbaseRowBuilder couchbaseRowBuilder(resultrow);
  791. const RtlTypeInfo *typeInfo = rowBuilder.queryAllocator()->queryOutputMeta()->queryTypeInfo();
  792. assertex(typeInfo);
  793. RtlFieldStrInfo dummyField("<row>", NULL, typeInfo);
  794. return typeInfo->build(rowBuilder, 0, &dummyField, couchbaseRowBuilder);
  795. }
  796. void CouchbaseEmbedFunctionContext::bindRowParam(const char *name, IOutputMetaData & metaVal, byte *val)
  797. {
  798. CouchbaseRecordBinder binder(logctx, metaVal.queryTypeInfo(), m_pQcmd, m_nextParam);
  799. binder.processRow(val);
  800. m_nextParam += binder.numFields();
  801. }
  802. void CouchbaseEmbedFunctionContext::bindDatasetParam(const char *name, IOutputMetaData & metaVal, IRowStream * val)
  803. {
  804. // We only support a single dataset parameter...
  805. // MORE - look into batch?
  806. if (m_oInputStream)
  807. {
  808. fail("At most one dataset parameter supported");
  809. }
  810. m_oInputStream.setown(new CouchbaseDatasetBinder(logctx, LINK(val), metaVal.queryTypeInfo(), m_pQcmd, m_nextParam));
  811. m_nextParam += m_oInputStream->numFields();
  812. }
  813. void CouchbaseEmbedFunctionContext::bindBooleanParam(const char *name, bool val)
  814. {
  815. checkNextParam(name);
  816. StringBuffer serialized;
  817. m_tokenSerializer.serialize(val, serialized);
  818. VStringBuffer cbPlaceholder("$%s", name);
  819. auto status = m_pQcmd->named_param(cbPlaceholder.str(), serialized.str());
  820. if (!status.success())
  821. failx("Could not bind Param: %s val: %s", cbPlaceholder.str(), serialized.str());
  822. }
  823. void CouchbaseEmbedFunctionContext::bindDataParam(const char *name, size32_t len, const void *val)
  824. {
  825. checkNextParam(name);
  826. VStringBuffer cbPlaceholder("$%s", name);
  827. size32_t bytes;
  828. void *data;
  829. rtlStrToDataX(bytes, data, len, val);
  830. auto status = m_pQcmd->named_param(cbPlaceholder.str(), (char *)data);
  831. if (!status.success())
  832. failx("Could not bind Param: %s val: %s", cbPlaceholder.str(), (char *)data);
  833. }
  834. void CouchbaseEmbedFunctionContext::bindFloatParam(const char *name, float val)
  835. {
  836. checkNextParam(name);
  837. StringBuffer serialized;
  838. m_tokenSerializer.serialize(val, serialized);
  839. VStringBuffer cbPlaceholder("$%s", name);
  840. auto status = m_pQcmd->named_param(cbPlaceholder.str(), serialized.str());
  841. if (!status.success())
  842. failx("Could not bind Param: %s val: %s", cbPlaceholder.str(), serialized.str());
  843. }
  844. void CouchbaseEmbedFunctionContext::bindRealParam(const char *name, double val)
  845. {
  846. checkNextParam(name);
  847. StringBuffer serialized;
  848. m_tokenSerializer.serialize(val, serialized);
  849. VStringBuffer cbPlaceholder("$%s", name);
  850. auto status = m_pQcmd->named_param(cbPlaceholder.str(), serialized.str());
  851. if (!status.success())
  852. failx("Could not bind Param: %s val: %s", cbPlaceholder.str(), serialized.str());
  853. }
  854. void CouchbaseEmbedFunctionContext::bindSignedSizeParam(const char *name, int size, __int64 val)
  855. {
  856. bindSignedParam(name, val);
  857. }
  858. void CouchbaseEmbedFunctionContext::bindSignedParam(const char *name, __int64 val)
  859. {
  860. checkNextParam(name);
  861. StringBuffer serialized;
  862. m_tokenSerializer.serialize(val, serialized);
  863. VStringBuffer cbPlaceholder("$%s", name);
  864. auto status = m_pQcmd->named_param(cbPlaceholder.str(), serialized.str());
  865. if (!status.success())
  866. failx("Could not bind Param: %s val: %s", cbPlaceholder.str(), serialized.str());
  867. }
  868. void CouchbaseEmbedFunctionContext::bindUnsignedSizeParam(const char *name, int size, unsigned __int64 val)
  869. {
  870. bindUnsignedParam(name, val);
  871. }
  872. void CouchbaseEmbedFunctionContext::bindUnsignedParam(const char *name, unsigned __int64 val)
  873. {
  874. checkNextParam(name);
  875. StringBuffer serialized;
  876. m_tokenSerializer.serialize(val, serialized);
  877. VStringBuffer cbPlaceholder("$%s", name);
  878. auto status = m_pQcmd->named_param(cbPlaceholder.str(), serialized.str());
  879. if (!status.success())
  880. failx("Could not bind Param: %s val: %s", cbPlaceholder.str(), serialized.str());
  881. }
  882. void CouchbaseEmbedFunctionContext::bindStringParam(const char *name, size32_t len, const char *val)
  883. {
  884. checkNextParam(name);
  885. VStringBuffer cbPlaceholder("$%s", name);
  886. size32_t utf8chars;
  887. char *utf8;
  888. rtlStrToUtf8X(utf8chars, utf8, len, val);
  889. auto status = m_pQcmd->named_param(cbPlaceholder.str(), utf8);
  890. if (!status.success())
  891. failx("Could not bind Param: %s val: %s", cbPlaceholder.str(), utf8);
  892. }
  893. void CouchbaseEmbedFunctionContext::bindVStringParam(const char *name, const char *val)
  894. {
  895. checkNextParam(name);
  896. bindStringParam(name, strlen(val), val);
  897. }
  898. void CouchbaseEmbedFunctionContext::bindUTF8Param(const char *name, size32_t chars, const char *val)
  899. {
  900. checkNextParam(name);
  901. bindStringParam(name, strlen(val), val);
  902. }
  903. void CouchbaseEmbedFunctionContext::bindUnicodeParam(const char *name, size32_t chars, const UChar *val)
  904. {
  905. checkNextParam(name);
  906. VStringBuffer cbPlaceholder("$%s", name);
  907. size32_t utf8chars;
  908. char *utf8;
  909. rtlUnicodeToUtf8X(utf8chars, utf8, chars, val);
  910. auto status = m_pQcmd->named_param(cbPlaceholder.str(), utf8);
  911. if (!status.success())
  912. failx("Could not bind Param: %s val: %s", cbPlaceholder.str(), utf8);
  913. }
  914. void CouchbaseEmbedFunctionContext::compileEmbeddedScript(size32_t chars, const char *script)
  915. {
  916. if (script && *script)
  917. {
  918. // Incoming script is not necessarily null terminated. Note that the chars refers to utf8 characters and not bytes.
  919. size32_t len = rtlUtf8Size(chars, script);
  920. if (len > 0)
  921. {
  922. StringAttr queryScript;
  923. queryScript.set(script, len);
  924. const char * terminatedScript = queryScript.get(); // Now null terminated
  925. if (m_pQcmd)
  926. delete m_pQcmd;
  927. m_pQcmd = new Couchbase::QueryCommand(terminatedScript);
  928. if ((m_scriptFlags & EFnoparams) == 0)
  929. m_numParams = countParameterPlaceholders(terminatedScript);
  930. else
  931. m_numParams = 0;
  932. }
  933. else
  934. failx("Empty N1QL query detected");
  935. }
  936. else
  937. failx("Empty N1QL query detected");
  938. }
  939. void CouchbaseEmbedFunctionContext::callFunction()
  940. {
  941. execute();
  942. }
  943. void CouchbaseEmbedFunctionContext::execute()
  944. {
  945. if (m_oInputStream)
  946. m_oInputStream->executeAll(m_oCBConnection);
  947. else
  948. {
  949. if (m_pQuery)
  950. delete m_pQuery;
  951. m_pQuery = m_oCBConnection->query(m_pQcmd);
  952. reportIfQueryFailure(m_pQuery);
  953. }
  954. }
  955. unsigned CouchbaseEmbedFunctionContext::checkNextParam(const char *name)
  956. {
  957. if (m_nextParam == m_numParams)
  958. failx("Too many parameters supplied: No matching $<name> placeholder for parameter %s", name);
  959. return m_nextParam++;
  960. }
  961. bool CouchbaseRowBuilder::getBooleanResult(const RtlFieldInfo *field)
  962. {
  963. const char * value = nextField(field);
  964. if (!value || !*value)
  965. {
  966. NullFieldProcessor p(field);
  967. return p.boolResult;
  968. }
  969. bool mybool;
  970. couchbaseembed::handleDeserializeOutcome(m_tokenDeserializer.deserialize(value, mybool), "bool", value);
  971. return mybool;
  972. }
  973. void CouchbaseRowBuilder::getDataResult(const RtlFieldInfo *field, size32_t &len, void * &result)
  974. {
  975. const char * value = nextField(field);
  976. if (!value || !*value)
  977. {
  978. NullFieldProcessor p(field);
  979. rtlStrToDataX(len, result, p.resultChars, p.stringResult);
  980. return;
  981. }
  982. rtlStrToDataX(len, result, strlen(value), value); // This feels like it may not work to me - will preallocate rather larger than we want
  983. }
  984. double CouchbaseRowBuilder::getRealResult(const RtlFieldInfo *field)
  985. {
  986. const char * value = nextField(field);
  987. if (!value || !*value)
  988. {
  989. NullFieldProcessor p(field);
  990. return p.doubleResult;
  991. }
  992. double mydouble = 0.0;
  993. couchbaseembed::handleDeserializeOutcome(m_tokenDeserializer.deserialize(value, mydouble), "real", value);
  994. return mydouble;
  995. }
  996. __int64 CouchbaseRowBuilder::getSignedResult(const RtlFieldInfo *field)
  997. {
  998. const char * value = nextField(field);
  999. if (!value || !*value)
  1000. {
  1001. NullFieldProcessor p(field);
  1002. return p.uintResult;
  1003. }
  1004. __int64 myint64 = 0;
  1005. couchbaseembed::handleDeserializeOutcome(m_tokenDeserializer.deserialize(value, myint64), "signed", value);
  1006. return myint64;
  1007. }
  1008. unsigned __int64 CouchbaseRowBuilder::getUnsignedResult(const RtlFieldInfo *field)
  1009. {
  1010. const char * value = nextField(field);
  1011. if (!value || !*value)
  1012. {
  1013. NullFieldProcessor p(field);
  1014. return p.uintResult;
  1015. }
  1016. unsigned __int64 myuint64 = 0;
  1017. couchbaseembed::handleDeserializeOutcome(m_tokenDeserializer.deserialize(value, myuint64), "unsigned", value);
  1018. return myuint64;
  1019. }
  1020. void CouchbaseRowBuilder::getStringResult(const RtlFieldInfo *field, size32_t &chars, char * &result)
  1021. {
  1022. const char * value = nextField(field);
  1023. if (!value || !*value)
  1024. {
  1025. NullFieldProcessor p(field);
  1026. rtlStrToStrX(chars, result, p.resultChars, p.stringResult);
  1027. return;
  1028. }
  1029. unsigned numchars = rtlUtf8Length(strlen(value), value); // MORE - is it a good assumption that it is utf8 ? Depends how the database is configured I think
  1030. rtlUtf8ToStrX(chars, result, numchars, value);
  1031. return;
  1032. }
  1033. void CouchbaseRowBuilder::getUTF8Result(const RtlFieldInfo *field, size32_t &chars, char * &result)
  1034. {
  1035. getStringResult(field, chars, result);
  1036. return;
  1037. }
  1038. void CouchbaseRowBuilder::getUnicodeResult(const RtlFieldInfo *field, size32_t &chars, UChar * &result)
  1039. {
  1040. const char * value = nextField(field);
  1041. if (!value || !*value)
  1042. {
  1043. NullFieldProcessor p(field);
  1044. rtlUnicodeToUnicodeX(chars, result, p.resultChars, p.unicodeResult);
  1045. return;
  1046. }
  1047. unsigned numchars = rtlUtf8Length(strlen(value), value); // MORE - is it a good assumption that it is utf8 ? Depends how the database is configured I think
  1048. rtlUtf8ToUnicodeX(chars, result, numchars, value);
  1049. return;
  1050. }
  1051. void CouchbaseRowBuilder::getDecimalResult(const RtlFieldInfo *field, Decimal &value)
  1052. {
  1053. const char * dvalue = nextField(field);
  1054. if (!dvalue || !*dvalue)
  1055. {
  1056. NullFieldProcessor p(field);
  1057. value.set(p.decimalResult);
  1058. return;
  1059. }
  1060. size32_t chars;
  1061. rtlDataAttr result;
  1062. value.setString(strlen(dvalue), dvalue);
  1063. if (field)
  1064. {
  1065. RtlDecimalTypeInfo *dtype = (RtlDecimalTypeInfo *) field->type;
  1066. value.setPrecision(dtype->getDecimalDigits(), dtype->getDecimalPrecision());
  1067. }
  1068. }
  1069. void CouchbaseRowBuilder::processBeginSet(const RtlFieldInfo * field, bool &isAll)
  1070. {
  1071. isAll = false; // ALL not supported
  1072. StringBuffer xpath;
  1073. xpathOrName(xpath, field);
  1074. if (!xpath.isEmpty())
  1075. {
  1076. PathTracker newPathNode(xpath, CPNTSet);
  1077. StringBuffer newXPath;
  1078. constructNewXPath(newXPath, xpath.str());
  1079. newPathNode.childCount = m_oResultRow->getCount(newXPath);
  1080. m_pathStack.push_back(newPathNode);
  1081. }
  1082. else
  1083. {
  1084. failx("processBeginSet: Field name or xpath missing");
  1085. }
  1086. }
  1087. bool CouchbaseRowBuilder::processNextSet(const RtlFieldInfo * field)
  1088. {
  1089. return m_pathStack.back().childrenProcessed < m_pathStack.back().childCount;
  1090. }
  1091. void CouchbaseRowBuilder::processBeginDataset(const RtlFieldInfo * field)
  1092. {
  1093. StringBuffer xpath;
  1094. xpathOrName(xpath, field);
  1095. if (!xpath.isEmpty())
  1096. {
  1097. PathTracker newPathNode(xpath, CPNTDataset);
  1098. StringBuffer newXPath;
  1099. constructNewXPath(newXPath, xpath.str());
  1100. newPathNode.childCount = m_oResultRow->getCount(newXPath);
  1101. m_pathStack.push_back(newPathNode);
  1102. }
  1103. else
  1104. {
  1105. failx("processBeginDataset: Field name or xpath missing");
  1106. }
  1107. }
  1108. void CouchbaseRowBuilder::processBeginRow(const RtlFieldInfo * field)
  1109. {
  1110. StringBuffer xpath;
  1111. xpathOrName(xpath, field);
  1112. if (!xpath.isEmpty())
  1113. {
  1114. if (strncmp(xpath.str(), "<nested row>", 12) == 0)
  1115. {
  1116. // Row within child dataset
  1117. if (m_pathStack.back().nodeType == CPNTDataset)
  1118. {
  1119. m_pathStack.back().currentChildIndex++;
  1120. }
  1121. else
  1122. {
  1123. failx("<nested row> received with no outer dataset designated");
  1124. }
  1125. }
  1126. else
  1127. {
  1128. m_pathStack.push_back(PathTracker(xpath, CPNTScalar));
  1129. }
  1130. }
  1131. else
  1132. {
  1133. failx("processBeginRow: Field name or xpath missing");
  1134. }
  1135. }
  1136. bool CouchbaseRowBuilder::processNextRow(const RtlFieldInfo * field)
  1137. {
  1138. return m_pathStack.back().childrenProcessed < m_pathStack.back().childCount;
  1139. }
  1140. void CouchbaseRowBuilder::processEndSet(const RtlFieldInfo * field)
  1141. {
  1142. StringBuffer xpath;
  1143. xpathOrName(xpath, field);
  1144. if (!xpath.isEmpty() && !m_pathStack.empty() && strcmp(xpath.str(), m_pathStack.back().nodeName.str()) == 0)
  1145. {
  1146. m_pathStack.pop_back();
  1147. }
  1148. }
  1149. void CouchbaseRowBuilder::processEndDataset(const RtlFieldInfo * field)
  1150. {
  1151. StringBuffer xpath;
  1152. xpathOrName(xpath, field);
  1153. if (!xpath.isEmpty())
  1154. {
  1155. if (!m_pathStack.empty() && strcmp(xpath.str(), m_pathStack.back().nodeName.str()) == 0)
  1156. {
  1157. m_pathStack.pop_back();
  1158. }
  1159. }
  1160. else
  1161. {
  1162. failx("processEndDataset: Field name or xpath missing");
  1163. }
  1164. }
  1165. void CouchbaseRowBuilder::processEndRow(const RtlFieldInfo * field)
  1166. {
  1167. StringBuffer xpath;
  1168. xpathOrName(xpath, field);
  1169. if (!xpath.isEmpty())
  1170. {
  1171. if (!m_pathStack.empty())
  1172. {
  1173. if (m_pathStack.back().nodeType == CPNTDataset)
  1174. {
  1175. m_pathStack.back().childrenProcessed++;
  1176. }
  1177. else if (strcmp(xpath.str(), m_pathStack.back().nodeName.str()) == 0)
  1178. {
  1179. m_pathStack.pop_back();
  1180. }
  1181. }
  1182. }
  1183. else
  1184. {
  1185. failx("processEndRow: Field name or xpath missing");
  1186. }
  1187. }
  1188. const char * CouchbaseRowBuilder::nextField(const RtlFieldInfo * field)
  1189. {
  1190. StringBuffer xpath;
  1191. xpathOrName(xpath, field);
  1192. if (xpath.isEmpty())
  1193. {
  1194. failx("nextField: Field name or xpath missing");
  1195. }
  1196. StringBuffer fullXPath;
  1197. if (!m_pathStack.empty() && m_pathStack.back().nodeType == CPNTSet && strncmp(xpath.str(), "<set element>", 13) == 0)
  1198. {
  1199. m_pathStack.back().currentChildIndex++;
  1200. constructNewXPath(fullXPath, NULL);
  1201. m_pathStack.back().childrenProcessed++;
  1202. }
  1203. else
  1204. {
  1205. constructNewXPath(fullXPath, xpath.str());
  1206. }
  1207. return m_oResultRow->queryProp(fullXPath.str());
  1208. }
  1209. void CouchbaseRowBuilder::xpathOrName(StringBuffer & outXPath, const RtlFieldInfo * field) const
  1210. {
  1211. outXPath.clear();
  1212. if (field->xpath)
  1213. {
  1214. if (field->xpath[0] == xpathCompoundSeparatorChar)
  1215. {
  1216. outXPath.append(field->xpath + 1);
  1217. }
  1218. else
  1219. {
  1220. const char * sep = strchr(field->xpath, xpathCompoundSeparatorChar);
  1221. if (!sep)
  1222. {
  1223. outXPath.append(field->xpath);
  1224. }
  1225. else
  1226. {
  1227. outXPath.append(field->xpath, 0, static_cast<size32_t>(sep - field->xpath));
  1228. }
  1229. }
  1230. }
  1231. else
  1232. {
  1233. outXPath.append(str(field->name));
  1234. }
  1235. }
  1236. void CouchbaseRowBuilder::constructNewXPath(StringBuffer& outXPath, const char * nextNode) const
  1237. {
  1238. bool nextNodeIsFromRoot = (nextNode && *nextNode == '/');
  1239. outXPath.clear();
  1240. if (!nextNodeIsFromRoot)
  1241. {
  1242. // Build up full parent xpath using our previous components
  1243. for (std::vector<PathTracker>::const_iterator iter = m_pathStack.begin(); iter != m_pathStack.end(); iter++)
  1244. {
  1245. if (strncmp(iter->nodeName, "<row>", 5) != 0)
  1246. {
  1247. if (!outXPath.isEmpty())
  1248. {
  1249. outXPath.append("/");
  1250. }
  1251. outXPath.append(iter->nodeName);
  1252. if (iter->nodeType == CPNTDataset || iter->nodeType == CPNTSet)
  1253. {
  1254. outXPath.appendf("[%d]", iter->currentChildIndex);
  1255. }
  1256. }
  1257. }
  1258. }
  1259. if (nextNode && *nextNode)
  1260. {
  1261. if (!outXPath.isEmpty())
  1262. {
  1263. outXPath.append("/");
  1264. }
  1265. outXPath.append(nextNode);
  1266. }
  1267. }
  1268. class CouchbaseEmbedContext : public CInterfaceOf<IEmbedContext>
  1269. {
  1270. public:
  1271. virtual IEmbedFunctionContext * createFunctionContext(unsigned flags, const char *options)
  1272. {
  1273. return createFunctionContextEx(NULL, flags, options);
  1274. }
  1275. virtual IEmbedFunctionContext * createFunctionContextEx(ICodeContext * ctx, unsigned flags, const char *options)
  1276. {
  1277. if (flags & EFimport)
  1278. {
  1279. UNSUPPORTED("IMPORT");
  1280. return nullptr;
  1281. }
  1282. else
  1283. return new CouchbaseEmbedFunctionContext(ctx ? ctx->queryContextLogger() : queryDummyContextLogger(), options, flags);
  1284. }
  1285. virtual IEmbedServiceContext * createServiceContext(const char *service, unsigned flags, const char *options)
  1286. {
  1287. throwUnexpected();
  1288. return nullptr;
  1289. }
  1290. };
  1291. extern DECL_EXPORT IEmbedContext* getEmbedContext()
  1292. {
  1293. return new CouchbaseEmbedContext();
  1294. }
  1295. extern DECL_EXPORT bool syntaxCheck(const char *script)
  1296. {
  1297. return true; // TO-DO
  1298. }
  1299. } // namespace
  1300. MODULE_INIT(INIT_PRIORITY_STANDARD)
  1301. {
  1302. couchbaseembed::connectionCache = nullptr;
  1303. couchbaseembed::connectionCacheExpirer = nullptr;
  1304. return true;
  1305. }
  1306. MODULE_EXIT()
  1307. {
  1308. // Delete the background thread expiring items from the CouchbaseConnection
  1309. // cache before deleting the connection cache
  1310. if (couchbaseembed::connectionCacheExpirer)
  1311. {
  1312. couchbaseembed::connectionCacheExpirer->stop();
  1313. delete(couchbaseembed::connectionCacheExpirer);
  1314. couchbaseembed::connectionCacheExpirer = nullptr;
  1315. }
  1316. if (couchbaseembed::connectionCache)
  1317. {
  1318. couchbaseembed::connectionCache->deleteAll();
  1319. delete(couchbaseembed::connectionCache);
  1320. couchbaseembed::connectionCache = nullptr;
  1321. }
  1322. }