couchbaseembed.hpp 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2016 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef _COUCHBASEEMBED_INCL
  14. #define _COUCHBASEEMBED_INCL
  15. #ifdef COUCHBASEEMBED_PLUGIN_EXPORTS
  16. #define COUCHBASEEMBED_PLUGIN_API DECL_EXPORT
  17. #else
  18. #define COUCHBASEEMBED_PLUGIN_API DECL_IMPORT
  19. #endif
  20. //Using cpp wrapper from https://github.com/couchbaselabs/libcouchbase-cxx
  21. #include <libcouchbase/couchbase++.h>
  22. #include <libcouchbase/couchbase++/views.h>
  23. #include <libcouchbase/couchbase++/query.h>
  24. #include <libcouchbase/couchbase++/endure.h>
  25. #include <libcouchbase/couchbase++/logging.h>
  26. #include "platform.h"
  27. #include "jthread.hpp"
  28. #include "hqlplugins.hpp"
  29. #include "eclrtl_imp.hpp"
  30. #include "eclhelper.hpp"
  31. #include "rtlembed.hpp"
  32. #include "jptree.hpp"
  33. #include "tokenserialization.hpp"
  34. #include "rtlds_imp.hpp"
  35. #include "rtlfield.hpp"
  36. #include "roxiemem.hpp"
  37. #include <time.h>
  38. #include <vector>
  39. namespace couchbaseembed
  40. {
  41. extern void UNSUPPORTED(const char *feature) __attribute__((noreturn));
  42. extern void failx(const char *msg, ...) __attribute__((noreturn)) __attribute__((format(printf, 1, 2)));
  43. extern void fail(const char *msg) __attribute__((noreturn));
  44. static void typeError(const char *expected, const char * fieldname)
  45. {
  46. VStringBuffer msg("Couchbase: type mismatch - %s expected", expected);
  47. if (fieldname && *fieldname)
  48. msg.appendf(" for field %s", fieldname);
  49. rtlFail(0, msg.str());
  50. }
  51. static void typeError(const char *expected, const RtlFieldInfo *field)
  52. {
  53. typeError(expected, field ? field->name : nullptr);
  54. }
  55. static int getNumFields(const RtlTypeInfo *record)
  56. {
  57. int count = 0;
  58. const RtlFieldInfo * const *fields = record->queryFields();
  59. assertex(fields);
  60. while (*fields++)
  61. count++;
  62. return count;
  63. }
  64. static void handleDeserializeOutcome(DeserializationResult resultcode, const char * targetype, const char * culpritvalue)
  65. {
  66. switch (resultcode)
  67. {
  68. case Deserialization_SUCCESS:
  69. break;
  70. case Deserialization_BAD_TYPE:
  71. failx("Deserialization error (%s): value cannot be const", targetype);
  72. break;
  73. case Deserialization_UNSUPPORTED:
  74. failx("Deserialization error (%s): encountered value type not supported", targetype);
  75. break;
  76. case Deserialization_INVALID_TOKEN:
  77. failx("Deserialization error (%s): token cannot be NULL, empty, or all whitespace", targetype);
  78. break;
  79. case Deserialization_NOT_A_NUMBER:
  80. failx("Deserialization error (%s): non-numeric characters found in numeric conversion: '%s'", targetype, culpritvalue);
  81. break;
  82. case Deserialization_OVERFLOW:
  83. failx("Deserialization error (%s): number too large to be represented by receiving value", targetype);
  84. break;
  85. case Deserialization_UNDERFLOW:
  86. failx("Deserialization error (%s): number too small to be represented by receiving value", targetype);
  87. break;
  88. default:
  89. typeError(targetype, culpritvalue);
  90. break;
  91. }
  92. }
  93. static const char * findUnquotedChar(const char *query, char searchFor)
  94. {
  95. // Note - returns pointer to char AFTER the first occurrence of searchFor outside of quotes
  96. char inStr = '\0';
  97. char ch;
  98. while ((ch = *query++) != 0)
  99. {
  100. if (ch == inStr)
  101. inStr = false;
  102. else switch (ch)
  103. {
  104. case '\'':
  105. case '"':
  106. inStr = ch;
  107. break;
  108. case '\\':
  109. if (inStr && *query)
  110. query++;
  111. break;
  112. case '/':
  113. if (!inStr)
  114. {
  115. if (*query=='/')
  116. {
  117. while (*query && *query != '\n')
  118. query++;
  119. }
  120. else if (*query=='*')
  121. {
  122. query++;
  123. for (;;)
  124. {
  125. if (!*query)
  126. fail("Unterminated comment in query string");
  127. if (*query=='*' && query[1]=='/')
  128. {
  129. query+= 2;
  130. break;
  131. }
  132. query++;
  133. }
  134. }
  135. }
  136. break;
  137. default:
  138. if (!inStr && ch==searchFor)
  139. return query;
  140. break;
  141. }
  142. }
  143. return NULL;
  144. }
  145. static unsigned countParameterPlaceholders(const char *query)
  146. {
  147. unsigned queryCount = 0;
  148. while ((query = findUnquotedChar(query, '$')) != NULL)
  149. queryCount++;
  150. return queryCount;
  151. }
  152. class CouchbaseRowStream : public RtlCInterface, implements IRowStream
  153. {
  154. public:
  155. CouchbaseRowStream(IEngineRowAllocator* _resultAllocator, Couchbase::Query * cbaseQuery);
  156. virtual ~CouchbaseRowStream();
  157. RTLIMPLEMENT_IINTERFACE
  158. virtual const void* nextRow();
  159. virtual void stop();
  160. private:
  161. Linked<IEngineRowAllocator> m_resultAllocator; //!< Pointer to allocator used when building result rows
  162. bool m_shouldRead; //!< If true, we should continue trying to read more messages
  163. StringArray m_Rows; //!< Local copy of result rows
  164. __int64 m_currentRow; //!< Current result row
  165. };
  166. class CouchbaseConnection : public CInterface
  167. {
  168. public:
  169. CouchbaseConnection(bool useSSL, const char * host, unsigned port, const char * bucketname, const char * password, const char * connOptions)
  170. {
  171. StringBuffer connectionString;
  172. makeConnectionString(useSSL, host, port, bucketname, connOptions, connectionString);
  173. m_pCouchbaseClient = new Couchbase::Client(connectionString.str(), password);
  174. timeLastUsed = 0;
  175. }
  176. CouchbaseConnection(const StringBuffer& connectionString, const char * password)
  177. {
  178. m_pCouchbaseClient = new Couchbase::Client(connectionString.str(), password);
  179. timeLastUsed = 0;
  180. }
  181. virtual ~CouchbaseConnection()
  182. {
  183. if (m_pCouchbaseClient)
  184. {
  185. delete m_pCouchbaseClient;
  186. m_pCouchbaseClient = nullptr;
  187. }
  188. }
  189. static void makeConnectionString(bool useSSL, const char * host, unsigned port, const char * bucketname, const char * connOptions, StringBuffer& out)
  190. {
  191. out.setf("couchbase%s://%s:%d/%s%s", useSSL ? "s" : "", host, port, bucketname, connOptions);
  192. }
  193. Couchbase::Query * query(Couchbase::QueryCommand * qcommand);
  194. inline void connect() { m_connectionStatus = m_pCouchbaseClient->connect(); }
  195. inline const Couchbase::Status& getConnectionStatus() const { return m_connectionStatus; }
  196. inline time_t getTimeTouched() const { return timeLastUsed; }
  197. inline void updateTimeTouched() {timeLastUsed = time(NULL); }
  198. private:
  199. Couchbase::Client * m_pCouchbaseClient;
  200. Couchbase::Status m_connectionStatus;
  201. CouchbaseConnection(const CouchbaseConnection &);
  202. time_t timeLastUsed;
  203. };
  204. enum PathNodeType {CPNTScalar, CPNTDataset, CPNTSet};
  205. struct PathTracker
  206. {
  207. StringBuffer nodeName;
  208. PathNodeType nodeType;
  209. unsigned int currentChildIndex;
  210. unsigned int childCount;
  211. unsigned int childrenProcessed;
  212. // Simple constructor
  213. PathTracker()
  214. {}
  215. // Constructor given node name and dataset bool
  216. PathTracker(const StringBuffer& _nodeName, PathNodeType _nodeType)
  217. : nodeName(_nodeName), nodeType(_nodeType), currentChildIndex(0), childCount(0), childrenProcessed(0)
  218. {}
  219. // Copy constructor
  220. PathTracker(const PathTracker& other)
  221. : nodeName(other.nodeName), nodeType(other.nodeType), currentChildIndex(other.currentChildIndex), childCount(other.childCount), childrenProcessed(other.childrenProcessed)
  222. {}
  223. };
  224. class CouchbaseRowBuilder : public CInterfaceOf<IFieldSource>
  225. {
  226. public:
  227. CouchbaseRowBuilder(IPropertyTree * resultrow)
  228. {
  229. m_oResultRow.set(resultrow);
  230. if (!m_oResultRow)
  231. failx("Missing result row data");
  232. m_pathStack.reserve(10);
  233. }
  234. virtual bool getBooleanResult(const RtlFieldInfo *field);
  235. virtual void getDataResult(const RtlFieldInfo *field, size32_t &len, void * &result);
  236. virtual double getRealResult(const RtlFieldInfo *field);
  237. virtual __int64 getSignedResult(const RtlFieldInfo *field);
  238. virtual unsigned __int64 getUnsignedResult(const RtlFieldInfo *field);
  239. virtual void getStringResult(const RtlFieldInfo *field, size32_t &chars, char * &result);
  240. virtual void getUTF8Result(const RtlFieldInfo *field, size32_t &chars, char * &result);
  241. virtual void getUnicodeResult(const RtlFieldInfo *field, size32_t &chars, UChar * &result);
  242. virtual void getDecimalResult(const RtlFieldInfo *field, Decimal &value);
  243. virtual void processBeginSet(const RtlFieldInfo * field, bool &isAll);
  244. virtual bool processNextSet(const RtlFieldInfo * field);
  245. virtual void processBeginDataset(const RtlFieldInfo * field);
  246. virtual void processBeginRow(const RtlFieldInfo * field);
  247. virtual bool processNextRow(const RtlFieldInfo * field);
  248. virtual void processEndSet(const RtlFieldInfo * field);
  249. virtual void processEndDataset(const RtlFieldInfo * field);
  250. virtual void processEndRow(const RtlFieldInfo * field);
  251. protected:
  252. const char * nextField(const RtlFieldInfo * field);
  253. void xpathOrName(StringBuffer & outXPath, const RtlFieldInfo * field) const;
  254. void constructNewXPath(StringBuffer& outXPath, const char * nextNode) const;
  255. private:
  256. TokenDeserializer m_tokenDeserializer;
  257. Owned<IPropertyTree> m_oResultRow;
  258. std::vector<PathTracker> m_pathStack;
  259. };
  260. // Bind Couchbase columns from an ECL record
  261. class CouchbaseRecordBinder : public CInterfaceOf<IFieldProcessor>
  262. {
  263. public:
  264. CouchbaseRecordBinder(const IContextLogger &_logctx, const RtlTypeInfo *_typeInfo, Couchbase::QueryCommand * _pQcmd, int _firstParam)
  265. : logctx(_logctx), typeInfo(_typeInfo), m_pQcmd(_pQcmd), firstParam(_firstParam), dummyField("<row>", NULL, typeInfo), thisParam(_firstParam) {}
  266. int numFields();
  267. void processRow(const byte *row);
  268. virtual void processString(unsigned len, const char *value, const RtlFieldInfo * field);
  269. virtual void processBool(bool value, const RtlFieldInfo * field);
  270. virtual void processData(unsigned len, const void *value, const RtlFieldInfo * field);
  271. virtual void processInt(__int64 value, const RtlFieldInfo * field);
  272. virtual void processUInt(unsigned __int64 value, const RtlFieldInfo * field);
  273. virtual void processReal(double value, const RtlFieldInfo * field);
  274. virtual void processDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo * field);
  275. virtual void processUDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo * field)
  276. {
  277. UNSUPPORTED("UNSIGNED decimals");
  278. }
  279. virtual void processUnicode(unsigned chars, const UChar *value, const RtlFieldInfo * field);
  280. virtual void processQString(unsigned len, const char *value, const RtlFieldInfo * field);
  281. virtual void processUtf8(unsigned chars, const char *value, const RtlFieldInfo * field);
  282. virtual bool processBeginSet(const RtlFieldInfo * field, unsigned numElements, bool isAll, const byte *data)
  283. {
  284. UNSUPPORTED("SET");
  285. return false;
  286. }
  287. virtual bool processBeginDataset(const RtlFieldInfo * field, unsigned numRows)
  288. {
  289. return false;
  290. }
  291. virtual bool processBeginRow(const RtlFieldInfo * field)
  292. {
  293. return true;
  294. }
  295. virtual void processEndSet(const RtlFieldInfo * field)
  296. {
  297. UNSUPPORTED("SET");
  298. }
  299. virtual void processEndDataset(const RtlFieldInfo * field)
  300. {
  301. UNSUPPORTED("DATASET");
  302. }
  303. virtual void processEndRow(const RtlFieldInfo * field)
  304. {
  305. }
  306. protected:
  307. inline unsigned checkNextParam(const RtlFieldInfo * field);
  308. const RtlTypeInfo *typeInfo;
  309. Couchbase::QueryCommand * m_pQcmd;
  310. const IContextLogger &logctx;
  311. int firstParam;
  312. RtlFieldStrInfo dummyField;
  313. int thisParam;
  314. TokenSerializer m_tokenSerializer;
  315. };
  316. class CouchbaseDatasetBinder : public CouchbaseRecordBinder
  317. {
  318. public:
  319. CouchbaseDatasetBinder(const IContextLogger &_logctx, IRowStream * _input, const RtlTypeInfo *_typeInfo, Couchbase::QueryCommand * _pQcmd, int _firstParam)
  320. : input(_input), CouchbaseRecordBinder(_logctx, _typeInfo, _pQcmd, _firstParam)
  321. {
  322. }
  323. bool bindNext()
  324. {
  325. roxiemem::OwnedConstRoxieRow nextRow = (const byte *) input->ungroupedNextRow();
  326. if (!nextRow)
  327. return false;
  328. processRow((const byte *) nextRow.get()); // Bind the variables for the current row
  329. return true;
  330. }
  331. void executeAll(CouchbaseConnection * conn)
  332. {
  333. while (bindNext())
  334. {
  335. std::unique_ptr<Couchbase::Query> query(conn->query(m_pQcmd));
  336. if (query->meta().status().errcode() != LCB_SUCCESS )//rows.length() == 0)
  337. failx("Query execution error: %s", query->meta().body().to_string().c_str());
  338. //consider parsing json result
  339. if (strstr(query->meta().body().data(), "\"status\": \"errors\""))
  340. failx("Err: %s", query->meta().body().data());
  341. }
  342. }
  343. protected:
  344. Owned<IRowStream> input;
  345. };
  346. class CouchbaseEmbedFunctionContext : public CInterfaceOf<IEmbedFunctionContext>
  347. {
  348. public:
  349. CouchbaseEmbedFunctionContext(const IContextLogger &_logctx, const char *options, unsigned _flags);
  350. virtual ~CouchbaseEmbedFunctionContext();
  351. IPropertyTree * nextResultRowTree();
  352. IPropertyTreeIterator * nextResultRowIterator();
  353. const char * nextResultScalar();
  354. virtual bool getBooleanResult();
  355. virtual void getDataResult(size32_t &len, void * &result);
  356. virtual double getRealResult();
  357. virtual __int64 getSignedResult();
  358. virtual unsigned __int64 getUnsignedResult();
  359. virtual void getStringResult(size32_t &chars, char * &result);
  360. virtual void getUTF8Result(size32_t &chars, char * &result);
  361. virtual void getUnicodeResult(size32_t &chars, UChar * &result);
  362. virtual void getDecimalResult(Decimal &value);
  363. virtual void getSetResult(bool & __isAllResult, size32_t & __resultBytes, void * & __result, int elemType, size32_t elemSize)
  364. {
  365. UNSUPPORTED("SET results");
  366. }
  367. virtual IRowStream * getDatasetResult(IEngineRowAllocator * _resultAllocator);
  368. virtual byte * getRowResult(IEngineRowAllocator * _resultAllocator);
  369. virtual size32_t getTransformResult(ARowBuilder & rowBuilder);
  370. virtual void bindRowParam(const char *name, IOutputMetaData & metaVal, const byte *val) override;
  371. virtual void bindDatasetParam(const char *name, IOutputMetaData & metaVal, IRowStream * val);
  372. virtual void bindBooleanParam(const char *name, bool val);
  373. virtual void bindDataParam(const char *name, size32_t len, const void *val);
  374. virtual void bindFloatParam(const char *name, float val);
  375. virtual void bindRealParam(const char *name, double val);
  376. virtual void bindSignedSizeParam(const char *name, int size, __int64 val);
  377. virtual void bindSignedParam(const char *name, __int64 val);
  378. virtual void bindUnsignedSizeParam(const char *name, int size, unsigned __int64 val);
  379. virtual void bindUnsignedParam(const char *name, unsigned __int64 val);
  380. virtual void bindStringParam(const char *name, size32_t len, const char *val);
  381. virtual void bindVStringParam(const char *name, const char *val);
  382. virtual void bindUTF8Param(const char *name, size32_t chars, const char *val);
  383. virtual void bindUnicodeParam(const char *name, size32_t chars, const UChar *val);
  384. virtual void bindSetParam(const char *name, int elemType, size32_t elemSize, bool isAll, size32_t totalBytes, const void *setData)
  385. {
  386. UNSUPPORTED("SET parameters");
  387. }
  388. virtual IInterface *bindParamWriter(IInterface *esdl, const char *esdlservice, const char *esdltype, const char *name)
  389. {
  390. return NULL;
  391. }
  392. virtual void paramWriterCommit(IInterface *writer)
  393. {
  394. UNSUPPORTED("paramWriterCommit");
  395. }
  396. virtual void writeResult(IInterface *esdl, const char *esdlservice, const char *esdltype, IInterface *writer)
  397. {
  398. UNSUPPORTED("writeResult");
  399. }
  400. virtual void importFunction(size32_t lenChars, const char *text)
  401. {
  402. UNSUPPORTED("importFunction");
  403. }
  404. virtual void compileEmbeddedScript(size32_t chars, const char *script);
  405. virtual void callFunction();
  406. virtual void loadCompiledScript(size32_t chars, const void *_script) override
  407. {
  408. UNSUPPORTED("loadCompiledScript");
  409. }
  410. virtual void enter() override {}
  411. virtual void exit() override {}
  412. protected:
  413. void execute();
  414. unsigned countBindings(const char *query);
  415. const char * findUnquoted(const char *query, char searchFor);
  416. unsigned checkNextParam(const char *name);
  417. const IContextLogger &logctx;
  418. CouchbaseConnection * m_oCBConnection;
  419. Couchbase::Query * m_pQuery;
  420. Couchbase::QueryCommand * m_pQcmd;
  421. Owned<IPropertyTreeIterator> m_resultrow;
  422. StringArray m_Rows;
  423. int m_NextRow;
  424. Owned<CouchbaseDatasetBinder> m_oInputStream;
  425. TokenDeserializer m_tokenDeserializer;
  426. TokenSerializer m_tokenSerializer;
  427. unsigned m_nextParam;
  428. unsigned m_numParams;
  429. unsigned m_scriptFlags;
  430. };
  431. } // couchbaseembed namespace
  432. #endif