Rembed.cpp 48 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems®.
  3. Licensed under the GPL, Version 2.0 or later
  4. you may not use this file except in compliance with the License.
  5. Unless required by applicable law or agreed to in writing, software
  6. distributed under the License is distributed on an "AS IS" BASIS,
  7. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  8. See the License for the specific language governing permissions and
  9. limitations under the License.
  10. ############################################################################## */
  11. #include "platform.h"
  12. #ifdef RCPP_HEADER_ONLY
  13. // NOTE - these symbols need to be hidden from being exported from the Rembed .so file as RInside tries to dynamically
  14. // load them from Rcpp.so
  15. // If future versions of Rcpp add any (in Rcpp/routines.h) they may need to be added here too.
  16. #define type2name HIDE_RCPP_type2name
  17. #define enterRNGScope HIDE_RCPP_enterRNGScope
  18. #define exitRNGScope HIDE_RCPP_exitRNGScope
  19. #define get_string_buffer HIDE_RCPP_get_string_buffer
  20. #define get_Rcpp_namespace HIDE_RCPP_get_Rcpp_namespace
  21. #define mktime00 HIDE_RCPP_mktime00_
  22. #define gmtime_ HIDE_RCPP_gmtime_
  23. #define rcpp_get_stack_trace HIDE_RCPP_rcpp_get_stack_trace
  24. #define rcpp_set_stack_trace HIDE_RCPP_rcpp_set_stack_trace
  25. #define demangle HIDE_RCPP_demangle
  26. #define short_file_name HIDE_RCPP_short_file_name
  27. #define stack_trace HIDE_RCPP_stack_trace
  28. #define get_string_elt HIDE_RCPP_get_string_elt
  29. #define char_get_string_elt HIDE_RCPP_char_get_string_elt
  30. #define set_string_elt HIDE_RCPP_set_string_elt
  31. #define char_set_string_elt HIDE_RCPP_char_set_string_elt
  32. #define get_string_ptr HIDE_RCPP_get_string_ptr
  33. #define get_vector_elt HIDE_RCPP_get_vector_elt
  34. #define set_vector_elt HIDE_RCPP_set_vector_elt
  35. #define get_vector_ptr HIDE_RCPP_get_vector_ptr
  36. #define char_nocheck HIDE_RCPP_char_nocheck
  37. #define dataptr HIDE_RCPP_dataptr
  38. #define getCurrentScope HIDE_RCPP_getCurrentScope
  39. #define setCurrentScope HIDE_RCPP_setCurrentScope
  40. #define get_cache HIDE_RCPP_get_cache
  41. #define reset_current_error HIDE_RCPP_reset_current_error
  42. #define error_occured HIDE_RCPP_error_occured
  43. #define rcpp_get_current_error HIDE_RCPP_rcpp_get_current_error
  44. #endif
  45. #include "RInside.h"
  46. #include "Rinterface.h"
  47. #include "jexcept.hpp"
  48. #include "jthread.hpp"
  49. #include "hqlplugins.hpp"
  50. #include "deftype.hpp"
  51. #include "eclrtl.hpp"
  52. #include "eclrtl_imp.hpp"
  53. #include "rtlds_imp.hpp"
  54. #include "rtlfield.hpp"
  55. #include "nbcd.hpp"
  56. #include "enginecontext.hpp"
  // NULL-terminated list of the plugin version strings this helper is compatible with.
  static const char * compatibleVersions[] =
  {
      "R Embed Helper 1.0.0",
      NULL
  };
  // Version string reported to the plugin loader by getECLPluginDefinition().
  static const char *version = "R Embed Helper 1.0.0";
  60. extern "C" DECL_EXPORT bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb)
  61. {
  62. if (pb->size == sizeof(ECLPluginDefinitionBlockEx))
  63. {
  64. ECLPluginDefinitionBlockEx * pbx = (ECLPluginDefinitionBlockEx *) pb;
  65. pbx->compatibleVersions = compatibleVersions;
  66. }
  67. else if (pb->size != sizeof(ECLPluginDefinitionBlock))
  68. return false;
  69. pb->magicVersion = PLUGIN_VERSION;
  70. pb->version = version;
  71. pb->moduleName = "+R+"; // Hack - we don't want to export any ECL, but if we don't export something,
  72. pb->ECL = ""; // Hack - the dll is unloaded at startup when compiling, and the R runtime closes stdin when unloaded
  73. pb->flags = PLUGIN_MULTIPLE_VERSIONS;
  74. pb->description = "R Embed Helper";
  75. return true;
  76. }
  77. #ifdef _WIN32
  78. EXTERN_C IMAGE_DOS_HEADER __ImageBase;
  79. #endif
  80. #define UNSUPPORTED(feature) throw MakeStringException(MSGAUD_user, 0, "Rembed: UNSUPPORTED feature: %s", feature)
  81. #define FAIL(msg) throw MakeStringException(MSGAUD_user, 0, "Rembed: Rcpp error: %s", msg)
  82. using Rcpp::_;
  83. namespace Rembed
  84. {
  85. // Copied from Rcpp 0.12.15's meat/Environment.h, in case an older version of Rcpp is in use
  86. inline Rcpp::Environment _new_env(SEXP parent, int size = 29) {
  87. Rcpp::Function newEnv("new.env", R_BaseNamespace);
  88. return newEnv(_["size"] = size, _["parent"] = parent);
  89. }
  90. __declspec(noreturn) static void failx(const char *msg, ...) __attribute__((format(printf, 1, 2), noreturn));
  91. static void failx(const char *message, ...)
  92. {
  93. va_list args;
  94. va_start(args,message);
  95. StringBuffer msg;
  96. msg.append("rembed: ").valist_appendf(message,args);
  97. va_end(args);
  98. rtlFail(0, msg.str());
  99. }
  100. class OwnedRoxieRowSet : public ConstPointerArray
  101. {
  102. public:
  103. ~OwnedRoxieRowSet()
  104. {
  105. ForEachItemIn(idx, *this)
  106. rtlReleaseRow(item(idx));
  107. }
  108. };
  109. class REnvironment : public CInterfaceOf<IInterface>
  110. {
  111. public:
  112. inline REnvironment(Rcpp::Environment _env)
  113. : env(_env)
  114. {
  115. }
  116. inline Rcpp::Environment &query()
  117. {
  118. return env;
  119. }
  120. private:
  121. REnvironment(const REnvironment &);
  122. Rcpp::Environment env;
  123. };
  124. // Use a global object to ensure that the R instance is initialized only once
  125. // Because of R's dodgy stack checks, we also have to do so on main thread
  126. static class RGlobalState
  127. {
  128. public:
  129. RGlobalState()
  130. {
  131. const char *args[] = {"R", "--slave" };
  132. R = new RInside(2, args, true, false, true); // Setting interactive mode=true prevents R syntax errors from terminating the process
  133. // The R code for checking stack limits assumes that all calls are on the same thread
  134. // as the original context was created on - this will not always be true in ECL (and hardly
  135. // ever true in Roxie
  136. // Setting the stack limit to -1 disables this check
  137. R_CStackLimit = -1;
  138. // Make sure we are never unloaded (as R does not support it)
  139. // we do this by doing a dynamic load of the Rembed library
  140. #ifdef _WIN32
  141. char path[_MAX_PATH];
  142. ::GetModuleFileName((HINSTANCE)&__ImageBase, path, _MAX_PATH);
  143. if (strstr(path, "Rembed"))
  144. {
  145. HINSTANCE h = LoadSharedObject(path, false, false);
  146. DBGLOG("LoadSharedObject returned %p", h);
  147. }
  148. #else
  149. StringBuffer modname;
  150. if (findLoadedModule(modname, "Rembed"))
  151. {
  152. HINSTANCE h = LoadSharedObject(modname, false, false);
  153. // Deliberately leak this handle
  154. }
  155. #endif
  156. }
  157. ~RGlobalState()
  158. {
  159. delete R;
  160. }
  161. REnvironment *getNamedScope(const char *key, bool &isNew)
  162. {
  163. Linked<REnvironment> ret = preservedScopes.getValue(key);
  164. if (!ret)
  165. {
  166. ret.setown(new REnvironment(_new_env(Rcpp::Environment::global_env())));
  167. preservedScopes.setValue(key, ret); // NOTE - links arg
  168. isNew = true;
  169. }
  170. else
  171. isNew = false;
  172. return ret.getClear();
  173. }
  174. void releaseNamedScope(const char *key)
  175. {
  176. preservedScopes.remove(key);
  177. }
  178. static void unregister(const char *key);
  179. RInside *R;
  180. private:
  181. MapStringToMyClass<REnvironment> preservedScopes;
  182. }* globalState = NULL;
  183. static CriticalSection RCrit; // R is single threaded - need to own this before making any call to R
  184. void RGlobalState::unregister(const char *key)
  185. {
  186. CriticalBlock b(RCrit);
  187. if (globalState)
  188. globalState->releaseNamedScope(key);
  189. }
  190. static RGlobalState *queryGlobalState()
  191. {
  192. CriticalBlock b(RCrit);
  193. if (!globalState)
  194. globalState = new RGlobalState;
  195. return globalState;
  196. }
  197. extern void unload()
  198. {
  199. CriticalBlock b(RCrit);
  200. if (globalState)
  201. delete globalState;
  202. globalState = NULL;
  203. }
  204. MODULE_INIT(INIT_PRIORITY_STANDARD)
  205. {
  206. queryGlobalState(); // make sure gets loaded by main thread
  207. return true;
  208. }
  209. MODULE_EXIT()
  210. {
  211. // Don't unload, because R seems to have problems with being reloaded, i.e. crashes on next use
  212. // unload();
  213. }
  214. static void getFieldNames(Rcpp::CharacterVector &namevec, const RtlTypeInfo *typeInfo)
  215. {
  216. const RtlFieldInfo * const *fields = typeInfo->queryFields();
  217. while (*fields)
  218. {
  219. const RtlFieldInfo *child = *fields;
  220. // MORE - nested records may make this interesting
  221. namevec.push_back(child->name);
  222. fields++;
  223. }
  224. }
  225. /*
  226. * Create a blank dataframe of the specified size, ready to fill with data from an ECL dataset
  227. */
  228. static Rcpp::DataFrame createDataFrame(const RtlTypeInfo *typeInfo, unsigned numRows)
  229. {
  230. Rcpp::CharacterVector namevec;
  231. getFieldNames(namevec, typeInfo);
  232. Rcpp::List frame(namevec.length()); // Number of columns
  233. frame.attr("names") = namevec;
  234. const RtlFieldInfo * const *fields = typeInfo->queryFields();
  235. for (int i=0; i< frame.length(); i++)
  236. {
  237. assertex(*fields);
  238. const RtlFieldInfo *child = *fields;
  239. switch (child->type->getType())
  240. {
  241. case type_boolean:
  242. {
  243. Rcpp::LogicalVector column(numRows);
  244. frame[i] = column;
  245. break;
  246. }
  247. case type_int:
  248. {
  249. Rcpp::IntegerVector column(numRows);
  250. frame[i] = column;
  251. break;
  252. }
  253. case type_real:
  254. case type_decimal:
  255. {
  256. Rcpp::NumericVector column(numRows);
  257. frame[i] = column;
  258. break;
  259. }
  260. case type_string:
  261. case type_varstring:
  262. {
  263. Rcpp::StringVector column(numRows);
  264. frame[i] = column;
  265. break;
  266. }
  267. default:
  268. {
  269. Rcpp::List column(numRows);
  270. frame[i] = column;
  271. break;
  272. }
  273. }
  274. fields++;
  275. }
  276. Rcpp::StringVector row_names(numRows);
  277. for (unsigned row = 0; row < numRows; row++)
  278. {
  279. StringBuffer rowname;
  280. rowname.append(row+1);
  281. row_names(row) = rowname.str();
  282. }
  283. frame.attr("class") = "data.frame";
  284. frame.attr("row.names") = row_names;
  285. return frame;
  286. }
  287. /*
  288. * Create a blank list of the specified type, ready to fill with data from an ECL record
  289. */
  290. static Rcpp::List createList(const RtlTypeInfo *typeInfo)
  291. {
  292. Rcpp::CharacterVector namevec;
  293. getFieldNames(namevec, typeInfo);
  294. Rcpp::List childRec(namevec.length());
  295. childRec.attr("names") = namevec;
  296. return childRec;
  297. }
  298. // A RDataFrameAppender object is used append a row to a R dataFrame from an ECL row
  299. class RDataFrameAppender : public CInterfaceOf<IFieldProcessor>
  300. {
  301. public:
  302. RDataFrameAppender(Rcpp::DataFrame &_frame)
  303. {
  304. stack.append(*new DataFramePosition(_frame));
  305. }
  306. RDataFrameAppender(Rcpp::List &_list)
  307. {
  308. stack.append(*new ListPosition(_list, nullptr));
  309. }
  310. virtual void processString(unsigned len, const char *value, const RtlFieldInfo * field) override
  311. {
  312. std::string s(value, len);
  313. if (inSet)
  314. theStringSet[setIndex++] = s;
  315. else
  316. stack.tos().setString(s);
  317. }
  318. virtual void processBool(bool value, const RtlFieldInfo * field) override
  319. {
  320. if (inSet)
  321. theBoolSet[setIndex++] = value;
  322. else
  323. stack.tos().setBool(value);
  324. }
  325. virtual void processData(unsigned len, const void *value, const RtlFieldInfo * field) override
  326. {
  327. std::vector<byte> vval;
  328. const byte *cval = (const byte *) value;
  329. vval.assign(cval, cval+len);
  330. unsigned r;
  331. Rcpp::List l = stack.tos().cell(r);
  332. l[r] = vval;
  333. }
  334. virtual void processInt(__int64 value, const RtlFieldInfo * field) override
  335. {
  336. if (inSet)
  337. theIntSet[setIndex++] = (long int) value;
  338. else
  339. stack.tos().setInt(value);
  340. }
  341. virtual void processUInt(unsigned __int64 value, const RtlFieldInfo * field) override
  342. {
  343. if (inSet)
  344. theIntSet[setIndex++] = (unsigned long int) value;
  345. else
  346. stack.tos().setUInt(value);
  347. }
  348. virtual void processReal(double value, const RtlFieldInfo * field) override
  349. {
  350. if (inSet)
  351. theRealSet[setIndex++] = value;
  352. else
  353. stack.tos().setReal(value);
  354. }
  355. virtual void processDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo * field) override
  356. {
  357. Decimal val;
  358. val.setDecimal(digits, precision, value);
  359. if (inSet)
  360. theRealSet[setIndex++] = val.getReal();
  361. else
  362. stack.tos().setReal(val.getReal());
  363. }
  364. virtual void processUDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo * field) override
  365. {
  366. Decimal val;
  367. val.setUDecimal(digits, precision, value);
  368. if (inSet)
  369. theRealSet[setIndex++] = val.getReal();
  370. else
  371. stack.tos().setReal(val.getReal());
  372. }
  373. virtual void processUnicode(unsigned len, const UChar *value, const RtlFieldInfo * field) override
  374. {
  375. UNSUPPORTED("Unicode/UTF8 fields");
  376. }
  377. virtual void processQString(unsigned len, const char *value, const RtlFieldInfo * field) override
  378. {
  379. size32_t charCount;
  380. rtlDataAttr text;
  381. rtlQStrToStrX(charCount, text.refstr(), len, value);
  382. processString(charCount, text.getstr(), field);
  383. }
  384. virtual void processUtf8(unsigned len, const char *value, const RtlFieldInfo * field) override
  385. {
  386. UNSUPPORTED("Unicode/UTF8 fields");
  387. }
  388. virtual bool processBeginSet(const RtlFieldInfo * field, unsigned elements, bool isAll, const byte *data) override
  389. {
  390. if (isAll)
  391. UNSUPPORTED("ALL sets are not supported");
  392. unsigned r;
  393. Rcpp::List l = stack.tos().cell(r);
  394. switch (field->type->queryChildType()->fieldType & RFTMkind)
  395. {
  396. case type_boolean:
  397. theBoolSet = Rcpp::LogicalVector(elements);
  398. l[r] = theBoolSet;
  399. break;
  400. case type_int:
  401. theIntSet = Rcpp::IntegerVector(elements);
  402. l[r] = theIntSet;
  403. break;
  404. case type_decimal:
  405. case type_real:
  406. theRealSet = Rcpp::NumericVector(elements);
  407. l[r] = theRealSet;
  408. break;
  409. case type_string:
  410. case type_varstring:
  411. theStringSet = Rcpp::StringVector(elements);
  412. l[r] = theStringSet;
  413. break;
  414. default:
  415. UNSUPPORTED("SET types other than BOOLEAN, STRING, INTEGER and REAL");
  416. }
  417. setIndex = 0;
  418. inSet = true;
  419. return true;
  420. }
  421. virtual bool processBeginDataset(const RtlFieldInfo * field, unsigned rows) override
  422. {
  423. Rcpp::DataFrame myFrame = createDataFrame(field->type->queryChildType(), rows);
  424. unsigned r;
  425. Rcpp::List l = stack.tos().cell(r);
  426. l[r] = myFrame;
  427. push(myFrame);
  428. firstField = true;
  429. return true;
  430. }
  431. virtual bool processBeginRow(const RtlFieldInfo * field) override
  432. {
  433. // We see this at the start of each row in a child dataset, but also at the start of a nested record
  434. // If the field is the outer field, ignore...
  435. if (firstField)
  436. firstField = false;
  437. else
  438. {
  439. Rcpp::List childRec = createList(field->type);
  440. unsigned r;
  441. Rcpp::List l = stack.tos().cell(r);
  442. l[r] = childRec;
  443. stack.append(*new ListPosition(childRec, field));
  444. }
  445. stack.tos().nextRow();
  446. return true;
  447. }
  448. virtual void processEndSet(const RtlFieldInfo * field) override
  449. {
  450. inSet = false;
  451. }
  452. virtual void processEndDataset(const RtlFieldInfo * field) override
  453. {
  454. pop();
  455. }
  456. virtual void processEndRow(const RtlFieldInfo * field) override
  457. {
  458. if (stack.tos().isNestedRow(field))
  459. pop();
  460. else
  461. firstField = true;
  462. }
  463. protected:
  464. interface IDataListPosition : public IInterface
  465. {
  466. virtual Rcpp::List cell(unsigned &row) = 0;
  467. virtual void setBool(bool value) = 0;
  468. virtual void setInt(__int64 value) = 0;
  469. virtual void setUInt(unsigned __int64 value) = 0;
  470. virtual void setReal(double value) = 0;
  471. virtual void setString(const std::string &s) = 0;
  472. virtual void nextRow() = 0;
  473. virtual bool isNestedRow(const RtlFieldInfo *_field) const = 0;
  474. };
  475. class DataFramePosition : public CInterfaceOf<IDataListPosition>
  476. {
  477. public:
  478. DataFramePosition(Rcpp::DataFrame _frame) : frame(_frame) {}
  479. virtual Rcpp::List cell(unsigned &row) override
  480. {
  481. row = rowIdx-1; // nextRow is called before the first row, so rowIdx is 1-based
  482. curCell = frame[colIdx++];
  483. return curCell;
  484. }
  485. virtual void setBool(bool value) override
  486. {
  487. unsigned row = rowIdx-1; // nextRow is called before the first row, so rowIdx is 1-based
  488. Rcpp::LogicalVector l = frame[colIdx++];
  489. l[row] = value;
  490. }
  491. virtual void setInt(__int64 value) override
  492. {
  493. unsigned row = rowIdx-1;
  494. Rcpp::IntegerVector l = frame[colIdx++];
  495. l[row] = (long int) value; // Rcpp does not support int64
  496. }
  497. virtual void setUInt(unsigned __int64 value) override
  498. {
  499. unsigned row = rowIdx-1;
  500. Rcpp::IntegerVector l = frame[colIdx++];
  501. l[row] = (unsigned long int) value; // Rcpp does not support int64
  502. }
  503. virtual void setReal(double value) override
  504. {
  505. unsigned row = rowIdx-1;
  506. Rcpp::NumericVector l = frame[colIdx++];
  507. l[row] = value;
  508. }
  509. virtual void setString(const std::string &value) override
  510. {
  511. unsigned row = rowIdx-1;
  512. Rcpp::StringVector l = frame[colIdx++];
  513. l[row] = value;
  514. }
  515. virtual void nextRow() override
  516. {
  517. rowIdx++;
  518. colIdx = 0;
  519. }
  520. bool isNestedRow(const RtlFieldInfo *_field) const override
  521. {
  522. return false;
  523. }
  524. private:
  525. unsigned rowIdx = 0;
  526. unsigned colIdx = 0;
  527. Rcpp::DataFrame frame;
  528. Rcpp::List curCell;
  529. };
  530. class ListPosition : public CInterfaceOf<IDataListPosition>
  531. {
  532. public:
  533. ListPosition(Rcpp::List _list, const RtlFieldInfo *_field)
  534. : list(_list), field(_field)
  535. {}
  536. virtual Rcpp::List cell(unsigned &row) override
  537. {
  538. row = colIdx++;
  539. return list;
  540. }
  541. virtual void setBool(bool value) override
  542. {
  543. list[colIdx++] = value;
  544. }
  545. virtual void setInt(__int64 value) override
  546. {
  547. list[colIdx++] = (long int) value; // Rcpp does not support int64
  548. }
  549. virtual void setUInt(unsigned __int64 value) override
  550. {
  551. list[colIdx++] = (unsigned long int) value; // Rcpp does not support int64
  552. }
  553. virtual void setReal(double value) override
  554. {
  555. list[colIdx++] = value;
  556. }
  557. virtual void setString(const std::string &value) override
  558. {
  559. list[colIdx++] = value;
  560. }
  561. virtual void nextRow() override
  562. {
  563. colIdx = 0;
  564. }
  565. virtual bool isNestedRow(const RtlFieldInfo *_field) const override
  566. {
  567. return field==_field;
  568. }
  569. private:
  570. unsigned colIdx = 0;
  571. Rcpp::List list;
  572. const RtlFieldInfo *field;
  573. };
  574. void push(Rcpp::DataFrame frame)
  575. {
  576. stack.append(*new DataFramePosition(frame));
  577. }
  578. void pop()
  579. {
  580. stack.pop();
  581. }
  582. IArrayOf<IDataListPosition> stack;
  583. Rcpp::IntegerVector theIntSet;
  584. Rcpp::StringVector theStringSet;
  585. Rcpp::NumericVector theRealSet;
  586. Rcpp::LogicalVector theBoolSet;
  587. bool firstField = true;
  588. bool inSet = false;
  589. unsigned setIndex = 0;
  590. };
  591. // A RRowBuilder object is used to construct ECL rows from R dataframes or lists
  592. class RRowBuilder : public CInterfaceOf<IFieldSource>
  593. {
  594. public:
  595. RRowBuilder(Rcpp::DataFrame &_frame, const RtlFieldInfo *_outerRow)
  596. : outerRow(_outerRow)
  597. {
  598. stack.append(*new RowState(_frame));
  599. }
  600. RRowBuilder(Rcpp::List &_list, const RtlFieldInfo *_outerRow)
  601. : outerRow(_outerRow)
  602. {
  603. stack.append(*new ListState(_list, nullptr));
  604. }
  605. virtual bool getBooleanResult(const RtlFieldInfo *field)
  606. {
  607. nextField(field);
  608. return ::Rcpp::as<bool>(elem);
  609. }
  610. virtual void getDataResult(const RtlFieldInfo *field, size32_t &__len, void * &__result)
  611. {
  612. nextField(field);
  613. std::vector<byte> vval = ::Rcpp::as<std::vector<byte> >(elem);
  614. rtlStrToDataX(__len, __result, vval.size(), vval.data());
  615. }
  616. virtual double getRealResult(const RtlFieldInfo *field)
  617. {
  618. nextField(field);
  619. return ::Rcpp::as<double>(elem);
  620. }
  621. virtual __int64 getSignedResult(const RtlFieldInfo *field)
  622. {
  623. nextField(field);
  624. return ::Rcpp::as<long int>(elem); // Should really be long long, but RInside does not support that
  625. }
  626. virtual unsigned __int64 getUnsignedResult(const RtlFieldInfo *field)
  627. {
  628. nextField(field);
  629. return ::Rcpp::as<unsigned long int>(elem); // Should really be long long, but RInside does not support that
  630. }
  631. virtual void getStringResult(const RtlFieldInfo *field, size32_t &__len, char * &__result)
  632. {
  633. nextField(field);
  634. std::string str = ::Rcpp::as<std::string>(elem);
  635. rtlStrToStrX(__len, __result, str.length(), str.data());
  636. }
  637. virtual void getUTF8Result(const RtlFieldInfo *field, size32_t &chars, char * &result)
  638. {
  639. UNSUPPORTED("Unicode/UTF8 fields");
  640. }
  641. virtual void getUnicodeResult(const RtlFieldInfo *field, size32_t &chars, UChar * &result)
  642. {
  643. UNSUPPORTED("Unicode/UTF8 fields");
  644. }
  645. virtual void getDecimalResult(const RtlFieldInfo *field, Decimal &value)
  646. {
  647. nextField(field);
  648. double ret = ::Rcpp::as<double>(elem);
  649. value.setReal(ret);
  650. }
  651. virtual void processBeginSet(const RtlFieldInfo * field, bool &isAll)
  652. {
  653. nextField(field);
  654. isAll = false; // No concept of an 'all' set in R
  655. Rcpp::List childrec = ::Rcpp::as<Rcpp::List>(elem); // MORE - is converting it to a list inefficient? Keeps the code simpler!
  656. stack.append(*new ListState(childrec, field));
  657. }
  658. virtual bool processNextSet(const RtlFieldInfo * field)
  659. {
  660. return stack.tos().moreFields();
  661. }
  662. virtual void processBeginDataset(const RtlFieldInfo * field)
  663. {
  664. nextField(field);
  665. push();
  666. }
  667. virtual void processBeginRow(const RtlFieldInfo * field)
  668. {
  669. // We see this at the start of each row in a child dataset, but also at the start of a nested record
  670. // We want to ignore it if we are expecting the former case...
  671. if (firstField)
  672. firstField = false;
  673. else
  674. {
  675. nextField(field);
  676. Rcpp::List childrec = ::Rcpp::as<Rcpp::List>(elem);
  677. stack.append(*new ListState(childrec, field));
  678. }
  679. }
  680. virtual bool processNextRow(const RtlFieldInfo * field)
  681. {
  682. firstField = true;
  683. IRowState &cur = stack.tos();
  684. return stack.tos().processNextRow();
  685. }
  686. virtual void processEndSet(const RtlFieldInfo * field)
  687. {
  688. pop();
  689. }
  690. virtual void processEndDataset(const RtlFieldInfo * field)
  691. {
  692. pop();
  693. }
  694. virtual void processEndRow(const RtlFieldInfo * field)
  695. {
  696. if (stack.tos().isNestedRow(field))
  697. pop();
  698. }
  699. protected:
  700. interface IRowState : public IInterface
  701. {
  702. virtual Rcpp::RObject nextField() = 0;
  703. virtual bool processNextRow() = 0;
  704. virtual bool isNestedRow(const RtlFieldInfo *_field) const = 0;
  705. virtual bool moreFields() const = 0;
  706. };
  707. class RowState : public CInterfaceOf<IRowState>
  708. {
  709. public:
  710. RowState(Rcpp::DataFrame _frame) : frame(_frame)
  711. {
  712. /* these functions have been renamed in Rcpp 0.2.10, but the old names still work... */
  713. numRows = frame.nrows();
  714. numCols = frame.length();
  715. }
  716. bool moreFields() const override
  717. {
  718. return colIdx < numCols;
  719. }
  720. Rcpp::RObject nextField() override
  721. {
  722. assertex(colIdx < numCols && rowIdx-1 < numRows);
  723. Rcpp::RObject colObject = frame[colIdx];
  724. Rcpp::List column = ::Rcpp::as<Rcpp::List>(colObject); // MORE - this can crash if wrong type came from R. But I can't work out how to test that
  725. Rcpp::RObject elem = column[rowIdx-1]; // processNextRow gets called before first row, so it's 1-based
  726. colIdx++;
  727. return elem;
  728. }
  729. bool processNextRow() override
  730. {
  731. if (rowIdx < numRows)
  732. {
  733. rowIdx++;
  734. colIdx = 0;
  735. return true;
  736. }
  737. return false;
  738. }
  739. bool isNestedRow(const RtlFieldInfo *_field) const override
  740. {
  741. return false;
  742. }
  743. private:
  744. Rcpp::DataFrame frame;
  745. unsigned rowIdx = 0;
  746. unsigned colIdx = 0;
  747. unsigned numRows = 0;
  748. unsigned numCols = 0;
  749. };
  750. class ListState : public CInterfaceOf<IRowState>
  751. {
  752. public:
  753. ListState(Rcpp::List _list, const RtlFieldInfo *_field) : list(_list), field(_field)
  754. {
  755. numCols = list.length();
  756. }
  757. Rcpp::RObject nextField() override
  758. {
  759. assertex (colIdx < numCols);
  760. Rcpp::RObject elem = list[colIdx];
  761. colIdx++;
  762. return elem;
  763. }
  764. bool moreFields() const override
  765. {
  766. return colIdx < numCols;
  767. }
  768. bool processNextRow() override
  769. {
  770. throwUnexpected();
  771. }
  772. bool isNestedRow(const RtlFieldInfo *_field) const override
  773. {
  774. return field==_field;
  775. }
  776. private:
  777. Rcpp::List list;
  778. const RtlFieldInfo *field;
  779. unsigned colIdx = 0;
  780. unsigned numCols = 0;
  781. };
  782. void nextField(const RtlFieldInfo * field)
  783. {
  784. // NOTE - we could put support for looking up columns by name here, but for efficiency reasons we only support matching by position
  785. IRowState &cur = stack.tos();
  786. elem = cur.nextField();
  787. }
  788. void push()
  789. {
  790. stack.append(*new RowState(::Rcpp::as<Rcpp::DataFrame>(elem)));
  791. }
  792. void pop()
  793. {
  794. stack.pop();
  795. }
  796. IArrayOf<IRowState> stack;
  797. Rcpp::RObject elem;
  798. const RtlFieldInfo *outerRow;
  799. bool firstField = true;
  800. };
  801. static size32_t getRowResult(RInside::Proxy &result, ARowBuilder &builder)
  802. {
  803. // To return a single row, we expect a list...
  804. Rcpp::List row = ::Rcpp::as<Rcpp::List>(result);
  805. const RtlTypeInfo *typeInfo = builder.queryAllocator()->queryOutputMeta()->queryTypeInfo();
  806. assertex(typeInfo);
  807. RtlFieldStrInfo dummyField("<row>", NULL, typeInfo);
  808. RRowBuilder myRRowBuilder(row, &dummyField);
  809. return typeInfo->build(builder, 0, &dummyField, myRRowBuilder);
  810. }
  811. // A R function that returns a dataset will return a RRowStream object that can be
  812. // interrogated to return each row of the result in turn
  813. class RRowStream : public CInterfaceOf<IRowStream>
  814. {
  815. public:
  816. RRowStream(RInside::Proxy &_result, IEngineRowAllocator *_resultAllocator, const RtlTypeInfo *_typeInfo)
  817. : dFrame(::Rcpp::as<Rcpp::DataFrame>(_result)),
  818. resultAllocator(_resultAllocator),
  819. typeInfo(_typeInfo),
  820. dummyField("<row>", NULL, typeInfo),
  821. myRRowBuilder(dFrame, &dummyField)
  822. {
  823. }
  824. virtual const void *nextRow()
  825. {
  826. CriticalBlock b(RCrit);
  827. if (!resultAllocator)
  828. return NULL;
  829. try
  830. {
  831. if (!myRRowBuilder.processNextRow(&dummyField))
  832. {
  833. stop();
  834. return NULL;
  835. }
  836. RtlDynamicRowBuilder builder(resultAllocator);
  837. size32_t len = typeInfo->build(builder, 0, &dummyField, myRRowBuilder);
  838. return builder.finalizeRowClear(len);
  839. }
  840. catch (std::exception &E)
  841. {
  842. FAIL(E.what());
  843. }
  844. }
  845. virtual void stop()
  846. {
  847. resultAllocator.clear();
  848. }
  849. protected:
  850. Rcpp::DataFrame dFrame;
  851. Linked<IEngineRowAllocator> resultAllocator;
  852. const RtlTypeInfo *typeInfo;
  853. RtlFieldStrInfo dummyField;
  854. RRowBuilder myRRowBuilder;
  855. };
  856. // Each call to a R function will use a new REmbedFunctionContext object
  857. // This takes care of ensuring that the critsec is locked while we are executing R code,
  858. // and released when we are not
  859. class REmbedFunctionContext: public CInterfaceOf<IEmbedFunctionContext>
  860. {
  861. public:
  862. REmbedFunctionContext(RInside &_R)
  863. : R(_R), block(RCrit), result(R_NilValue)
  864. {
  865. }
  866. void setScopes(ICodeContext *codeCtx, const char *_options)
  867. {
  868. StringArray options;
  869. options.appendList(_options, ",");
  870. StringBuffer scopeKey;
  871. const char *scopeKey2 = nullptr;
  872. bool registerCallback = false;
  873. bool wuidScope = false;
  874. IEngineContext *engine = nullptr;
  875. ForEachItemIn(idx, options)
  876. {
  877. const char *opt = options.item(idx);
  878. const char *val = strchr(opt, '=');
  879. if (val)
  880. {
  881. StringBuffer optName(val-opt, opt);
  882. val++;
  883. if (strieq(optName, "globalscope"))
  884. scopeKey2 = val;
  885. else if (strieq(optName, "persist"))
  886. {
  887. if (scopeKey.length())
  888. failx("persist option specified more than once");
  889. if (strieq(val, "global"))
  890. scopeKey.append("global");
  891. else if (strieq(val, "workunit"))
  892. {
  893. engine = codeCtx->queryEngineContext();
  894. wuidScope = true;
  895. if (!engine)
  896. failx("Persist mode 'workunit' not supported here");
  897. }
  898. else if (strieq(val, "query"))
  899. {
  900. engine = codeCtx->queryEngineContext();
  901. wuidScope = false;
  902. if (!engine)
  903. failx("Persist mode 'query' not supported here");
  904. }
  905. else
  906. failx("Unrecognized persist mode %s", val);
  907. }
  908. else
  909. failx("Unrecognized option %s", optName.str());
  910. }
  911. else
  912. failx("Unrecognized option %s", opt);
  913. }
  914. if (engine)
  915. engine->getQueryId(scopeKey, wuidScope);
  916. if (scopeKey2)
  917. scopeKey.append(':').append(scopeKey2);
  918. if (scopeKey.length())
  919. {
  920. bool isNew;
  921. env.setown(globalState->getNamedScope(scopeKey, isNew));
  922. if (isNew && engine)
  923. engine->onTermination(RGlobalState::unregister, scopeKey.str(), wuidScope);
  924. }
  925. else
  926. env.setown(new REnvironment(_new_env(Rcpp::Environment::global_env())));
  927. }
  928. ~REmbedFunctionContext()
  929. {
  930. }
  931. virtual IInterface *bindParamWriter(IInterface *esdl, const char *esdlservice, const char *esdltype, const char *name)
  932. {
  933. return NULL;
  934. }
  935. virtual void paramWriterCommit(IInterface *writer)
  936. {
  937. }
  938. virtual void writeResult(IInterface *esdl, const char *esdlservice, const char *esdltype, IInterface *writer)
  939. {
  940. }
  941. virtual bool getBooleanResult()
  942. {
  943. try
  944. {
  945. return ::Rcpp::as<bool>(result);
  946. }
  947. catch (std::exception &E)
  948. {
  949. FAIL(E.what());
  950. }
  951. }
  952. virtual void getDataResult(size32_t &__len, void * &__result)
  953. {
  954. try
  955. {
  956. std::vector<byte> vval = ::Rcpp::as<std::vector<byte> >(result);
  957. rtlStrToDataX(__len, __result, vval.size(), vval.data());
  958. }
  959. catch (std::exception &E)
  960. {
  961. FAIL(E.what());
  962. }
  963. }
  964. virtual double getRealResult()
  965. {
  966. try
  967. {
  968. return ::Rcpp::as<double>(result);
  969. }
  970. catch (std::exception &E)
  971. {
  972. FAIL(E.what());
  973. }
  974. }
  975. virtual __int64 getSignedResult()
  976. {
  977. try
  978. {
  979. return ::Rcpp::as<long int>(result); // Should really be long long, but RInside does not support that
  980. }
  981. catch (std::exception &E)
  982. {
  983. FAIL(E.what());
  984. }
  985. }
  986. virtual unsigned __int64 getUnsignedResult()
  987. {
  988. try
  989. {
  990. return ::Rcpp::as<unsigned long int>(result); // Should really be long long, but RInside does not support that
  991. }
  992. catch (std::exception &E)
  993. {
  994. FAIL(E.what());
  995. }
  996. }
  997. virtual void getStringResult(size32_t &__len, char * &__result)
  998. {
  999. try
  1000. {
  1001. std::string str = ::Rcpp::as<std::string>(result);
  1002. rtlStrToStrX(__len, __result, str.length(), str.data());
  1003. }
  1004. catch (std::exception &E)
  1005. {
  1006. FAIL(E.what());
  1007. }
  1008. }
  1009. virtual void getUTF8Result(size32_t &chars, char * &result)
  1010. {
  1011. UNSUPPORTED("Unicode/UTF8 results");
  1012. }
  1013. virtual void getUnicodeResult(size32_t &chars, UChar * &result)
  1014. {
  1015. UNSUPPORTED("Unicode/UTF8 results");
  1016. }
  1017. virtual void getSetResult(bool & __isAllResult, size32_t & __resultBytes, void * & __result, int _elemType, size32_t elemSize)
  1018. {
  1019. try
  1020. {
  1021. type_t elemType = (type_t) _elemType;
  1022. __isAllResult = false;
  1023. switch(elemType)
  1024. {
  1025. #define FETCH_ARRAY(type) \
  1026. { \
  1027. std::vector<type> vval = ::Rcpp::as< std::vector<type> >(result); \
  1028. rtlStrToDataX(__resultBytes, __result, vval.size()*elemSize, (const void *) vval.data()); \
  1029. }
  1030. case type_boolean:
  1031. {
  1032. std::vector<bool> vval = ::Rcpp::as< std::vector<bool> >(result);
  1033. size32_t size = vval.size();
  1034. // Vector of bool is odd, and can't be retrieved via data()
  1035. // Instead we need to iterate, I guess
  1036. rtlDataAttr out(size);
  1037. bool *outData = (bool *) out.getdata();
  1038. for (std::vector<bool>::iterator iter = vval.begin(); iter < vval.end(); iter++)
  1039. {
  1040. *outData++ = *iter;
  1041. }
  1042. __resultBytes = size;
  1043. __result = out.detachdata();
  1044. break;
  1045. }
  1046. case type_int:
  1047. /* if (elemSize == sizeof(signed char)) // rcpp does not seem to support...
  1048. FETCH_ARRAY(signed char)
  1049. else */ if (elemSize == sizeof(short))
  1050. FETCH_ARRAY(short)
  1051. else if (elemSize == sizeof(int))
  1052. FETCH_ARRAY(int)
  1053. else if (elemSize == sizeof(long)) // __int64 / long long does not work...
  1054. FETCH_ARRAY(long)
  1055. else
  1056. rtlFail(0, "Rembed: Unsupported result type");
  1057. break;
  1058. case type_unsigned:
  1059. if (elemSize == sizeof(byte))
  1060. FETCH_ARRAY(byte)
  1061. else if (elemSize == sizeof(unsigned short))
  1062. FETCH_ARRAY(unsigned short)
  1063. else if (elemSize == sizeof(unsigned int))
  1064. FETCH_ARRAY(unsigned int)
  1065. else if (elemSize == sizeof(unsigned long)) // __int64 / long long does not work...
  1066. FETCH_ARRAY(unsigned long)
  1067. else
  1068. rtlFail(0, "Rembed: Unsupported result type");
  1069. break;
  1070. case type_real:
  1071. if (elemSize == sizeof(float))
  1072. FETCH_ARRAY(float)
  1073. else if (elemSize == sizeof(double))
  1074. FETCH_ARRAY(double)
  1075. else
  1076. rtlFail(0, "Rembed: Unsupported result type");
  1077. break;
  1078. case type_string:
  1079. case type_varstring:
  1080. {
  1081. std::vector<std::string> vval = ::Rcpp::as< std::vector<std::string> >(result);
  1082. size32_t numResults = vval.size();
  1083. rtlRowBuilder out;
  1084. byte *outData = NULL;
  1085. size32_t outBytes = 0;
  1086. if (elemSize != UNKNOWN_LENGTH)
  1087. {
  1088. outBytes = numResults * elemSize; // MORE - check for overflow?
  1089. out.ensureAvailable(outBytes);
  1090. outData = out.getbytes();
  1091. }
  1092. for (std::vector<std::string>::iterator iter = vval.begin(); iter < vval.end(); iter++)
  1093. {
  1094. size32_t lenBytes = (*iter).size();
  1095. const char *text = (*iter).data();
  1096. if (elemType == type_string)
  1097. {
  1098. if (elemSize == UNKNOWN_LENGTH)
  1099. {
  1100. out.ensureAvailable(outBytes + lenBytes + sizeof(size32_t));
  1101. outData = out.getbytes() + outBytes;
  1102. * (size32_t *) outData = lenBytes;
  1103. rtlStrToStr(lenBytes, outData+sizeof(size32_t), lenBytes, text);
  1104. outBytes += lenBytes + sizeof(size32_t);
  1105. }
  1106. else
  1107. {
  1108. rtlStrToStr(elemSize, outData, lenBytes, text);
  1109. outData += elemSize;
  1110. }
  1111. }
  1112. else
  1113. {
  1114. if (elemSize == UNKNOWN_LENGTH)
  1115. {
  1116. out.ensureAvailable(outBytes + lenBytes + 1);
  1117. outData = out.getbytes() + outBytes;
  1118. rtlStrToVStr(0, outData, lenBytes, text);
  1119. outBytes += lenBytes + 1;
  1120. }
  1121. else
  1122. {
  1123. rtlStrToVStr(elemSize, outData, lenBytes, text); // Fixed size null terminated strings... weird.
  1124. outData += elemSize;
  1125. }
  1126. }
  1127. }
  1128. __resultBytes = outBytes;
  1129. __result = out.detachdata();
  1130. break;
  1131. }
  1132. default:
  1133. rtlFail(0, "REmbed: Unsupported result type");
  1134. break;
  1135. }
  1136. }
  1137. catch (std::exception &E)
  1138. {
  1139. FAIL(E.what());
  1140. }
  1141. }
  1142. virtual IRowStream *getDatasetResult(IEngineRowAllocator * _resultAllocator)
  1143. {
  1144. try
  1145. {
  1146. return new RRowStream(result, _resultAllocator, _resultAllocator->queryOutputMeta()->queryTypeInfo());
  1147. }
  1148. catch (std::exception &E)
  1149. {
  1150. FAIL(E.what());
  1151. }
  1152. }
  1153. virtual byte * getRowResult(IEngineRowAllocator * _resultAllocator)
  1154. {
  1155. try
  1156. {
  1157. RtlDynamicRowBuilder rowBuilder(_resultAllocator);
  1158. size32_t len = Rembed::getRowResult(result, rowBuilder);
  1159. return (byte *) rowBuilder.finalizeRowClear(len);
  1160. }
  1161. catch (std::exception &E)
  1162. {
  1163. FAIL(E.what());
  1164. }
  1165. }
  1166. virtual size32_t getTransformResult(ARowBuilder & builder)
  1167. {
  1168. try
  1169. {
  1170. return Rembed::getRowResult(result, builder);
  1171. }
  1172. catch (std::exception &E)
  1173. {
  1174. FAIL(E.what());
  1175. }
  1176. }
  1177. virtual void bindBooleanParam(const char *name, bool val)
  1178. {
  1179. env->query()[name] = val;
  1180. }
  1181. virtual void bindDataParam(const char *name, size32_t len, const void *val)
  1182. {
  1183. std::vector<byte> vval;
  1184. const byte *cval = (const byte *) val;
  1185. vval.assign(cval, cval+len);
  1186. env->query()[name] = vval;
  1187. }
  1188. virtual void bindFloatParam(const char *name, float val)
  1189. {
  1190. env->query()[name] = val;
  1191. }
  1192. virtual void bindRealParam(const char *name, double val)
  1193. {
  1194. env->query()[name] = val;
  1195. }
  1196. virtual void bindSignedSizeParam(const char *name, int size, __int64 val)
  1197. {
  1198. env->query()[name] = (long int) val;
  1199. }
  1200. virtual void bindSignedParam(const char *name, __int64 val)
  1201. {
  1202. env->query()[name] = (long int) val;
  1203. }
  1204. virtual void bindUnsignedSizeParam(const char *name, int size, unsigned __int64 val)
  1205. {
  1206. env->query()[name] = (long int) val;
  1207. }
  1208. virtual void bindUnsignedParam(const char *name, unsigned __int64 val)
  1209. {
  1210. env->query()[name] = (unsigned long int) val;
  1211. }
  1212. virtual void bindStringParam(const char *name, size32_t len, const char *val)
  1213. {
  1214. std::string s(val, len);
  1215. env->query()[name] = s;
  1216. }
  1217. virtual void bindVStringParam(const char *name, const char *val)
  1218. {
  1219. env->query()[name] = val;
  1220. }
  1221. virtual void bindUTF8Param(const char *name, size32_t chars, const char *val)
  1222. {
  1223. rtlFail(0, "Rembed: Unsupported parameter type UTF8");
  1224. }
  1225. virtual void bindUnicodeParam(const char *name, size32_t chars, const UChar *val)
  1226. {
  1227. rtlFail(0, "Rembed: Unsupported parameter type UNICODE");
  1228. }
  1229. virtual void bindSetParam(const char *name, int _elemType, size32_t elemSize, bool isAll, size32_t totalBytes, const void *setData)
  1230. {
  1231. if (isAll)
  1232. rtlFail(0, "Rembed: Unsupported parameter type ALL");
  1233. type_t elemType = (type_t) _elemType;
  1234. int numElems = totalBytes / elemSize;
  1235. switch(elemType)
  1236. {
  1237. #define BIND_ARRAY(type) \
  1238. { \
  1239. std::vector<type> vval; \
  1240. const type *start = (const type *) setData; \
  1241. vval.assign(start, start+numElems); \
  1242. env->query()[name] = vval; \
  1243. }
  1244. case type_boolean:
  1245. BIND_ARRAY(bool)
  1246. break;
  1247. case type_int:
  1248. /* if (elemSize == sizeof(signed char)) // No binding exists in rcpp
  1249. BIND_ARRAY(signed char)
  1250. else */ if (elemSize == sizeof(short))
  1251. BIND_ARRAY(short)
  1252. else if (elemSize == sizeof(int))
  1253. BIND_ARRAY(int)
  1254. else if (elemSize == sizeof(long)) // __int64 / long long does not work...
  1255. BIND_ARRAY(long)
  1256. else
  1257. rtlFail(0, "Rembed: Unsupported parameter type");
  1258. break;
  1259. case type_unsigned:
  1260. if (elemSize == sizeof(unsigned char))
  1261. BIND_ARRAY(unsigned char)
  1262. else if (elemSize == sizeof(unsigned short))
  1263. BIND_ARRAY(unsigned short)
  1264. else if (elemSize == sizeof(unsigned int))
  1265. BIND_ARRAY(unsigned int)
  1266. else if (elemSize == sizeof(unsigned long)) // __int64 / long long does not work...
  1267. BIND_ARRAY(unsigned long)
  1268. else
  1269. rtlFail(0, "Rembed: Unsupported parameter type");
  1270. break;
  1271. case type_real:
  1272. if (elemSize == sizeof(float))
  1273. BIND_ARRAY(float)
  1274. else if (elemSize == sizeof(double))
  1275. BIND_ARRAY(double)
  1276. else
  1277. rtlFail(0, "Rembed: Unsupported parameter type");
  1278. break;
  1279. case type_string:
  1280. case type_varstring:
  1281. {
  1282. std::vector<std::string> vval;
  1283. const byte *inData = (const byte *) setData;
  1284. const byte *endData = inData + totalBytes;
  1285. while (inData < endData)
  1286. {
  1287. int thisSize;
  1288. if (elemSize == UNKNOWN_LENGTH)
  1289. {
  1290. if (elemType==type_varstring)
  1291. thisSize = strlen((const char *) inData) + 1;
  1292. else
  1293. {
  1294. thisSize = * (size32_t *) inData;
  1295. inData += sizeof(size32_t);
  1296. }
  1297. }
  1298. else
  1299. thisSize = elemSize;
  1300. std::string s((const char *) inData, thisSize);
  1301. vval.push_back(s);
  1302. inData += thisSize;
  1303. numElems++;
  1304. }
  1305. env->query()[name] = vval;
  1306. break;
  1307. }
  1308. default:
  1309. rtlFail(0, "REmbed: Unsupported parameter type");
  1310. break;
  1311. }
  1312. }
  1313. virtual void bindRowParam(const char *name, IOutputMetaData & metaVal, const byte *row) override
  1314. {
  1315. // We create a list
  1316. const RtlTypeInfo *typeInfo = metaVal.queryTypeInfo();
  1317. assertex(typeInfo);
  1318. RtlFieldStrInfo dummyField("<row>", NULL, typeInfo);
  1319. Rcpp::List myList = createList(typeInfo);
  1320. RDataFrameAppender frameBuilder(myList);
  1321. typeInfo->process(row, row, &dummyField, frameBuilder);
  1322. env->query()[name] = myList;
  1323. }
  1324. virtual void bindDatasetParam(const char *name, IOutputMetaData & metaVal, IRowStream * val)
  1325. {
  1326. OwnedRoxieRowSet rows;
  1327. for (;;)
  1328. {
  1329. const byte *row = (const byte *) val->ungroupedNextRow();
  1330. if (!row)
  1331. break;
  1332. rows.append(row);
  1333. }
  1334. const RtlTypeInfo *typeInfo = metaVal.queryTypeInfo();
  1335. assertex(typeInfo);
  1336. Rcpp::DataFrame frame = createDataFrame(typeInfo, rows.length());
  1337. RDataFrameAppender frameBuilder(frame);
  1338. RtlFieldStrInfo dummyField("<row>", NULL, typeInfo);
  1339. ForEachItemIn(idx, rows)
  1340. {
  1341. const byte * row = (const byte *) rows.item(idx);
  1342. typeInfo->process(row, row, &dummyField, frameBuilder);
  1343. }
  1344. env->query()[name] = frame;
  1345. }
  1346. virtual void importFunction(size32_t lenChars, const char *utf)
  1347. {
  1348. throwUnexpected();
  1349. }
  1350. virtual void compileEmbeddedScript(size32_t lenChars, const char *utf)
  1351. {
  1352. StringBuffer text;
  1353. text.append(rtlUtf8Size(lenChars, utf), utf);
  1354. text.stripChar('\r');
  1355. func.set(text.str());
  1356. }
  1357. virtual void loadCompiledScript(size32_t chars, const void *_script) override
  1358. {
  1359. throwUnexpected();
  1360. }
  1361. virtual void enter() override {}
  1362. virtual void reenter(ICodeContext *codeCtx) override {}
  1363. virtual void exit() override {}
  1364. virtual void callFunction()
  1365. {
  1366. try
  1367. {
  1368. Rcpp::ExpressionVector exp(func) ;
  1369. result = exp.eval(env->query());
  1370. }
  1371. catch (std::exception &E)
  1372. {
  1373. FAIL(E.what());
  1374. }
  1375. }
  1376. private:
  1377. RInside &R;
  1378. RInside::Proxy result;
  1379. StringAttr func;
  1380. CriticalBlock block;
  1381. Owned<REnvironment> env;
  1382. };
  1383. class REmbedContext: public CInterfaceOf<IEmbedContext>
  1384. {
  1385. public:
  1386. virtual IEmbedFunctionContext *createFunctionContext(unsigned flags, const char *options) override
  1387. {
  1388. return createFunctionContextEx(nullptr, nullptr, flags, options);
  1389. }
  1390. virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, const IThorActivityContext *activityCtx, unsigned flags, const char *options) override
  1391. {
  1392. Owned<REmbedFunctionContext> ret = new REmbedFunctionContext(*queryGlobalState()->R);
  1393. ret->setScopes(ctx, options);
  1394. return ret.getClear();
  1395. }
  1396. virtual IEmbedServiceContext *createServiceContext(const char *service, unsigned flags, const char *options) override
  1397. {
  1398. throwUnexpected();
  1399. }
  1400. };
  1401. extern DECL_EXPORT IEmbedContext* getEmbedContext()
  1402. {
  1403. return new REmbedContext;
  1404. }
  1405. extern DECL_EXPORT void syntaxCheck(size32_t & __lenResult, char * & __result, const char *funcname, size32_t charsBody, const char * body, const char *argNames, const char *compilerOptions, const char *persistOptions)
  1406. {
  1407. StringBuffer result;
  1408. try
  1409. {
  1410. Owned<REmbedFunctionContext> ctx = new REmbedFunctionContext(*queryGlobalState()->R);
  1411. // MORE - could check supplied persistOptions are valid
  1412. StringBuffer text;
  1413. text.append(rtlUtf8Size(charsBody, body), body);
  1414. text.stripChar('\r');
  1415. Rcpp::ExpressionVector exp(text);
  1416. }
  1417. catch (std::exception &E)
  1418. {
  1419. result.append("Rembed: Parse error from R while checking embedded code"); // Unfortunately we don't get any info about the error position or nature, just "Parse error."
  1420. }
  1421. __lenResult = result.length();
  1422. __result = result.detach();
  1423. }
  1424. } // namespace